Rewrote multiprocessing as the prior approach was unreliable

This commit is contained in:
Brandon Rozek 2025-04-08 21:10:28 -04:00
parent 154cee0349
commit 502252676f

View file

@ -2,8 +2,8 @@
# NOTE: Perhaps we should use process_cpu_count but that's not available to all Python versions # NOTE: Perhaps we should use process_cpu_count but that's not available to all Python versions
from os import cpu_count from os import cpu_count
from time import sleep
from typing import Dict, Iterator, Optional, Tuple from typing import Dict, Iterator, Optional, Tuple
from queue import Empty as QueueEmpty
import argparse import argparse
import multiprocessing as mp import multiprocessing as mp
@ -38,14 +38,51 @@ def restructure_solutions(solutions: Iterator[Tuple[Model, Dict[Operation, Model
def has_vsp_plus_model(model, impfunction, mconjunction, mdisjunction, mnegation): def has_vsp_plus_model(model, impfunction, mconjunction, mdisjunction, mnegation):
""" """
Wrapper so that we can save the model that satisfies VSP. Wrapper which also stores the models along with its vsp result
NOTE: At the time of writing, models that don't satisfy VSP
get discarded from memory for efficiency sake.
""" """
vsp_result = has_vsp(model, impfunction, mconjunction, mdisjunction, mnegation) vsp_result = has_vsp(model, impfunction, mconjunction, mdisjunction, mnegation)
if vsp_result.has_vsp: return (model, vsp_result)
return (model, vsp_result)
return (None, vsp_result) def worker(task_queue, result_queue):
"""
Worker process which processes models from the task
queue and adds the result to the result_queue.
Adds the sentinal value None when exception occurs and when there's
no more to process.
"""
try:
while True:
task = task_queue.get()
# If sentinal value, break
if task is None:
result_queue.put(None)
break
(model, impfunction, mconjunction, mdisjunction, mnegation) = task
result = has_vsp_plus_model(model, impfunction, mconjunction, mdisjunction, mnegation)
result_queue.put(result)
except Exception:
# Process failed somehow, push sentinal value
result_queue.put(None)
def add_to_queue(gen, queue, num_sentinal_values) -> bool:
"""
Consumes an item from gen and puts
it in the queue.
If there are no items left in gen,
return false and send sentinal values,
otherwise true.
"""
try:
item = next(gen)
queue.put(item)
return True
except StopIteration:
for _ in range(num_sentinal_values):
queue.put(None)
return False
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser(description="VSP Checker") parser = argparse.ArgumentParser(description="VSP Checker")
@ -71,52 +108,68 @@ if __name__ == "__main__":
# Set up parallel verification # Set up parallel verification
num_tested = 0 num_tested = 0
num_has_vsp = 0 num_has_vsp = 0
with mp.Pool(processes=num_cpu) as pool:
task_pool = []
done_parsing = False
# Populate initial task pool # Create queues
for _ in range(num_cpu): task_queue = mp.Queue()
try: result_queue = mp.Queue()
model, impfunction, mconjunction, mdisjunction, mnegation = next(solutions)
except StopIteration: # Create worker processes
done_parsing = True processes = []
for _ in range(num_cpu):
p = mp.Process(target=worker, args=(task_queue, result_queue))
processes.append(p)
p.start()
# Populate initial task queue
done_parsing = False
for _ in range(num_cpu):
added = add_to_queue(solutions, task_queue, num_cpu)
if not added:
done_parsing = True
break
# Check results and add new tasks until finished
result_sentinal_count = 0
while True:
# Read a result
try:
result = result_queue.get(True, 60)
except QueueEmpty:
# Health check in case processes crashed
if all((not p.is_alive() for p in processes)):
print("[WARNING] No child processes remain")
break break
result_async = pool.apply_async(has_vsp_plus_model, (model, impfunction, mconjunction, mdisjunction, mnegation)) # Otherwise continue
task_pool.append(result_async) continue
while len(task_pool) > 0: # When we receive None, it means a child process has finished
next_task_pool = [] if result is None:
# Check the status of all the tasks, and spawn result_sentinal_count += 1
# new ones if finished # If all workers have finished break
for result_async in task_pool: if result_sentinal_count == num_cpu:
if result_async.ready(): break
model, vsp_result = result_async.get() continue
print(vsp_result)
num_tested += 1
if args['verbose'] or vsp_result.has_vsp: # Process result
print(model) model, vsp_result = result
print(vsp_result)
num_tested += 1
if vsp_result.has_vsp: if vsp_result.has_vsp:
num_has_vsp += 1 print(model)
if done_parsing: if vsp_result.has_vsp:
continue num_has_vsp += 1
# Submit new task if available
if done_parsing:
continue
added = add_to_queue(solutions, task_queue, num_cpu)
if not added:
done_parsing = True
# Submit new task if available
try:
model, impfunction, mconjunction, mdisjunction, mnegation = next(solutions)
next_result_async = pool.apply_async(has_vsp_plus_model, (model, impfunction, mconjunction, mdisjunction, mnegation))
next_task_pool.append(next_result_async)
except StopIteration:
done_parsing = True
else:
# Otherwise the task is still working,
# add it to the next task pool
next_task_pool.append(result_async)
task_pool = next_task_pool
sleep(0.01)
print(f"Tested {num_tested} models, {num_has_vsp} of which satisfy VSP") print(f"Tested {num_tested} models, {num_has_vsp} of which satisfy VSP")