diff --git a/vspursuer.py b/vspursuer.py index 7e42368..be04ebc 100755 --- a/vspursuer.py +++ b/vspursuer.py @@ -2,8 +2,8 @@ # NOTE: Perhaps we should use process_cpu_count but that's not available to all Python versions from os import cpu_count -from time import sleep from typing import Dict, Iterator, Optional, Tuple +from queue import Empty as QueueEmpty import argparse import multiprocessing as mp @@ -38,14 +38,51 @@ def restructure_solutions(solutions: Iterator[Tuple[Model, Dict[Operation, Model def has_vsp_plus_model(model, impfunction, mconjunction, mdisjunction, mnegation): """ - Wrapper so that we can save the model that satisfies VSP. - NOTE: At the time of writing, models that don't satisfy VSP - get discarded from memory for efficiency sake. + Wrapper which also stores the models along with its vsp result """ vsp_result = has_vsp(model, impfunction, mconjunction, mdisjunction, mnegation) - if vsp_result.has_vsp: - return (model, vsp_result) - return (None, vsp_result) + return (model, vsp_result) + +def worker(task_queue, result_queue): + """ + Worker process which processes models from the task + queue and adds the result to the result_queue. + + Adds the sentinal value None when exception occurs and when there's + no more to process. + """ + try: + while True: + task = task_queue.get() + # If sentinal value, break + if task is None: + result_queue.put(None) + break + (model, impfunction, mconjunction, mdisjunction, mnegation) = task + result = has_vsp_plus_model(model, impfunction, mconjunction, mdisjunction, mnegation) + result_queue.put(result) + except Exception: + # Process failed somehow, push sentinal value + result_queue.put(None) + +def add_to_queue(gen, queue, num_sentinal_values) -> bool: + """ + Consumes an item from gen and puts + it in the queue. + + If there are no items left in gen, + return false and send sentinal values, + otherwise true. + """ + try: + item = next(gen) + queue.put(item) + return True + except StopIteration: + for _ in range(num_sentinal_values): + queue.put(None) + return False + if __name__ == "__main__": parser = argparse.ArgumentParser(description="VSP Checker") @@ -71,52 +108,68 @@ if __name__ == "__main__": # Set up parallel verification num_tested = 0 num_has_vsp = 0 - with mp.Pool(processes=num_cpu) as pool: - task_pool = [] - done_parsing = False - # Populate initial task pool - for _ in range(num_cpu): - try: - model, impfunction, mconjunction, mdisjunction, mnegation = next(solutions) - except StopIteration: - done_parsing = True + # Create queues + task_queue = mp.Queue() + result_queue = mp.Queue() + + # Create worker processes + processes = [] + for _ in range(num_cpu): + p = mp.Process(target=worker, args=(task_queue, result_queue)) + processes.append(p) + p.start() + + # Populate initial task queue + done_parsing = False + for _ in range(num_cpu): + added = add_to_queue(solutions, task_queue, num_cpu) + if not added: + done_parsing = True + break + + # Check results and add new tasks until finished + result_sentinal_count = 0 + while True: + + # Read a result + try: + result = result_queue.get(True, 60) + except QueueEmpty: + # Health check in case processes crashed + if all((not p.is_alive() for p in processes)): + print("[WARNING] No child processes remain") break - result_async = pool.apply_async(has_vsp_plus_model, (model, impfunction, mconjunction, mdisjunction, mnegation)) - task_pool.append(result_async) + # Otherwise continue + continue - while len(task_pool) > 0: - next_task_pool = [] - # Check the status of all the tasks, and spawn - # new ones if finished - for result_async in task_pool: - if result_async.ready(): - model, vsp_result = result_async.get() - print(vsp_result) - num_tested += 1 + # When we receive None, it means a child process has finished + if result is None: + result_sentinal_count += 1 + # If all workers have finished break + if result_sentinal_count == num_cpu: + break + continue - if args['verbose'] or vsp_result.has_vsp: - print(model) + # Process result + model, vsp_result = result + print(vsp_result) + num_tested += 1 - if vsp_result.has_vsp: - num_has_vsp += 1 + if vsp_result.has_vsp: + print(model) - if done_parsing: - continue + if vsp_result.has_vsp: + num_has_vsp += 1 + + # Submit new task if available + if done_parsing: + continue + + added = add_to_queue(solutions, task_queue, num_cpu) + if not added: + done_parsing = True - # Submit new task if available - try: - model, impfunction, mconjunction, mdisjunction, mnegation = next(solutions) - next_result_async = pool.apply_async(has_vsp_plus_model, (model, impfunction, mconjunction, mdisjunction, mnegation)) - next_task_pool.append(next_result_async) - except StopIteration: - done_parsing = True - else: - # Otherwise the task is still working, - # add it to the next task pool - next_task_pool.append(result_async) - task_pool = next_task_pool - sleep(0.01) print(f"Tested {num_tested} models, {num_has_vsp} of which satisfy VSP")