From de8637bca4a71cd2466bcaad07738414ee87e907 Mon Sep 17 00:00:00 2001 From: Brandon Rozek Date: Wed, 9 Apr 2025 13:00:08 -0400 Subject: [PATCH] Dedicated process for file parsing --- vspursuer.py | 125 +++++++++++++++++++++++---------------------------- 1 file changed, 55 insertions(+), 70 deletions(-) diff --git a/vspursuer.py b/vspursuer.py index 34c219d..9c6537b 100755 --- a/vspursuer.py +++ b/vspursuer.py @@ -43,7 +43,7 @@ def has_vsp_plus_model(model, impfunction, mconjunction, mdisjunction, mnegation vsp_result = has_vsp(model, impfunction, mconjunction, mdisjunction, mnegation) return (model, vsp_result) -def worker(task_queue, result_queue): +def worker_vsp(task_queue, result_queue): """ Worker process which processes models from the task queue and adds the result to the result_queue. @@ -56,32 +56,39 @@ def worker(task_queue, result_queue): task = task_queue.get() # If sentinal value, break if task is None: - result_queue.put(None) break (model, impfunction, mconjunction, mdisjunction, mnegation) = task result = has_vsp_plus_model(model, impfunction, mconjunction, mdisjunction, mnegation) result_queue.put(result) - except Exception: - # Process failed somehow, push sentinal value + finally: + # Either an exception occured or the worker finished + # Push sentinal value result_queue.put(None) -def add_to_queue(gen, queue, num_sentinal_values) -> bool: +def worker_parser(data_file_path, num_sentinal_values, task_queue): """ - Consumes an item from gen and puts - it in the queue. + Function which parses the MaGIC file + and adds models to the task_queue. - If there are no items left in gen, - return false and send sentinal values, - otherwise true. + Intended to be deployed with a dedicated process. """ try: - item = next(gen) - queue.put(item) - return True - except StopIteration: + data_file = open(data_file_path, "r") + solutions = parse_matrices(SourceFile(data_file)) + solutions = restructure_solutions(solutions, args) + + while True: + try: + item = next(solutions) + task_queue.put(item) + except StopIteration: + break + finally: + data_file.close() for _ in range(num_sentinal_values): - queue.put(None) - return False + task_queue.put(None) + + if __name__ == "__main__": @@ -96,10 +103,6 @@ if __name__ == "__main__": if data_file_path is None: data_file_path = input("Path to MaGIC Ugly Data File: ") - data_file = open(data_file_path, "r") - - solutions = parse_matrices(SourceFile(data_file)) - solutions = restructure_solutions(solutions, args) num_cpu = args.get("c") if num_cpu is None: @@ -108,27 +111,23 @@ if __name__ == "__main__": # Set up parallel verification num_tested = 0 num_has_vsp = 0 + num_workers = max(num_cpu - 1, 1) # Create queues - task_queue = mp.Queue() + task_queue = mp.Queue(maxsize=1000) result_queue = mp.Queue() - # Create worker processes - processes = [] - for _ in range(num_cpu): - p = mp.Process(target=worker, args=(task_queue, result_queue)) - processes.append(p) + # Create dedicated process to parse the MaGIC file + process_parser = mp.Process(target=worker_parser, args=(data_file_path, num_workers, task_queue)) + process_parser.start() + + # Create dedicated processes which check VSP + process_workers = [] + for _ in range(num_workers): + p = mp.Process(target=worker_vsp, args=(task_queue, result_queue)) + process_workers.append(p) p.start() - # Populate initial task queue - # NOTE: Adding more than number of processes - # to make sure there's always work to do. - done_parsing = False - for _ in range(num_cpu * 2): - added = add_to_queue(solutions, task_queue, num_cpu) - if not added: - done_parsing = True - break # Check results and add new tasks until finished result_sentinal_count = 0 @@ -138,24 +137,32 @@ if __name__ == "__main__": try: result = result_queue.get(True, 60) except QueueEmpty: - # Health check in case processes crashed - num_dead = 0 - for p in processes: - if not p.is_alive(): - num_dead += 1 - if num_dead == len(processes): - print("[ERROR] No child processes remain") + if all((not p.is_alive() for p in process_workers)): + # All workers finished without us receiving all the + # sentinal values. break - elif num_dead > 0: - print("[WARNING] Number of dead processes:", num_dead) - # Otherwise continue - continue + + task_queue_size = 0 + try: + task_queue_size = task_queue.qsize() + except NotImplementedError: + # MacOS doesn't implement this + pass + + if task_queue_size == 0 and not process_parser.is_alive(): + # For Linux/Windows this means that the process_parser + # died before sending the sentinal values. + # For Mac, this doesn't guarentee anything but might + # as well push more sentinal values. + for _ in range(num_workers): + task_queue.put(None) + # When we receive None, it means a child process has finished if result is None: result_sentinal_count += 1 # If all workers have finished break - if result_sentinal_count == num_cpu: + if result_sentinal_count == len(process_workers): break continue @@ -170,29 +177,7 @@ if __name__ == "__main__": if vsp_result.has_vsp: num_has_vsp += 1 - # Submit new task if available - if done_parsing: - continue - - # NOTE: We should attempt to maintain a decent amount - # of work in the task queue so that workers stay busy - task_queue_size: Optional[int] = None - try: - task_queue_size = task_queue.qsize() - except NotImplementedError: - # On MacOS this isn't implemented - pass - - num_new_tasks = 1 - if task_queue_size is not None and task_queue_size < num_cpu * 2: - num_new_tasks = (num_cpu * 2) - task_queue_size - - for _ in range(num_new_tasks): - added = add_to_queue(solutions, task_queue, num_cpu) - if not added: - done_parsing = True - break print(f"Tested {num_tested} models, {num_has_vsp} of which satisfy VSP") - data_file.close() +