Redid parallel implementation

- Made parse_matrices into a generator
- Keep track of num_proccesses results and spawn new ones when done
This commit is contained in:
Brandon Rozek 2025-03-18 18:08:28 -04:00
parent b1452ac672
commit 4412b6c2da
2 changed files with 121 additions and 47 deletions

View file

@ -4,7 +4,7 @@ Parses the Magic Ugly Data File Format
Assumes the base logic is R with no extra connectives
"""
import re
from typing import TextIO, List, Optional, Tuple, Set, Dict
from typing import TextIO, List, Iterator, Optional, Tuple, Set, Dict
from model import Model, ModelValue, ModelFunction, OrderTable
from logic import (
@ -167,8 +167,7 @@ def derive_stages(header: UglyHeader) -> Stages:
return stages
def parse_matrices(infile: SourceFile) -> List[Tuple[Model, Dict]]:
solutions = []
def parse_matrices(infile: SourceFile) -> Iterator[Tuple[Model, Dict[Operation, ModelFunction]]]:
header = parse_header(infile)
stages = derive_stages(header)
first_run = True
@ -179,7 +178,7 @@ def parse_matrices(infile: SourceFile) -> List[Tuple[Model, Dict]]:
case "end":
break
case "process_model":
process_model(stages.name(), current_model_parts, solutions)
yield process_model(stages.name(), current_model_parts)
stage = stage.next
case "size":
processed = process_sizes(infile, current_model_parts, first_run)
@ -245,8 +244,6 @@ def parse_matrices(infile: SourceFile) -> List[Tuple[Model, Dict]]:
stages.reset_after(stage.name)
stage = stage.previous
return solutions
def process_sizes(infile: SourceFile, current_model_parts: ModelBuilder, first_run: bool) -> bool:
try:
size = parse_size(infile, first_run)
@ -325,7 +322,7 @@ def process_custom_connective(infile: SourceFile, symbol: str, adicity: int, cur
current_model_parts.custom_model_functions[symbol] = mfunction
return True
def process_model(model_name: str, mp: ModelBuilder, solutions: List[Tuple[Model, Dict]]):
def process_model(model_name: str, mp: ModelBuilder) -> Tuple[Model, Dict[Operation, ModelFunction]]:
"""Create Model"""
assert mp.size > 0
assert mp.size + 1 == len(mp.carrier_set)
@ -333,7 +330,6 @@ def process_model(model_name: str, mp: ModelBuilder, solutions: List[Tuple[Mode
assert mp.mimplication is not None
logical_operations = { mp.mimplication }
model = Model(mp.carrier_set, logical_operations, mp.designated_values, ordering=mp.ordering, name=model_name)
interpretation = {
Implication: mp.mimplication
}
@ -355,8 +351,10 @@ def process_model(model_name: str, mp: ModelBuilder, solutions: List[Tuple[Mode
logical_operations.add(custom_mf)
op = Operation(custom_mf.operation_name, custom_mf.arity)
interpretation[op] = custom_mf
model = Model(mp.carrier_set, logical_operations, mp.designated_values, ordering=mp.ordering, name=model_name)
return (model, interpretation)
solutions.append((model, interpretation))
def parse_header(infile: SourceFile) -> UglyHeader:
"""

View file

@ -1,11 +1,69 @@
#!/usr/bin/env python3
from os import cpu_count
from os import process_cpu_count
from time import sleep
from typing import Dict, Iterator, Optional, Tuple
import argparse
import multiprocessing as mp
from logic import Conjunction, Disjunction, Negation, Implication
from logic import Conjunction, Disjunction, Negation, Implication, Operation
from model import Model, ModelFunction
from parse_magic import SourceFile, parse_matrices
from vsp import has_vsp, VSP_Result
from vsp import has_vsp
def restructure_solutions(solutions: Iterator[Tuple[Model, Dict[Operation, ModelFunction]]], args) -> \
Iterator[Tuple[Model, ModelFunction, Optional[ModelFunction], Optional[ModelFunction], Optional[ModelFunction]]]:
"""
When subprocess gets spawned, the logical operations will
have a different memory address than what's expected in interpretation.
Therefore, we need to pass the model functions directly instead.
While we're at it, filter out models until we get to --skip-to
if applicable.
"""
start_processing = args.get("skip_to") is None
for model, interpretation in solutions:
# If skip_to is defined, then don't process models
# until then.
if not start_processing and model.name == args.get("skip_to"):
start_processing = True
if not start_processing:
continue
impfunction = interpretation[Implication]
mconjunction = interpretation.get(Conjunction)
mdisjunction = interpretation.get(Disjunction)
mnegation = interpretation.get(Negation)
yield (model, impfunction, mconjunction, mdisjunction, mnegation)
def has_vsp_plus_model(model, impfunction, mconjunction, mdisjunction, mnegation):
"""
Wrapper so that we can save the model that satisfies VSP.
NOTE: At the time of writing, models that don't satisfy VSP
get discarded from memory for efficiency sake.
"""
vsp_result = has_vsp(model, impfunction, mconjunction, mdisjunction, mnegation)
if vsp_result.has_vsp:
return (model, vsp_result)
return (None, vsp_result)
def create_chunks(data, chunk_size: int):
"""
Takes a stream of data and creates a new
stream where each element is a "chunk" of
several primitive elements.
Ex: create_chunks((1, 2, 3, 4, 5, 6), 2) ->
((1, 2), (3, 4), (5, 6))
"""
chunk = []
for item in data:
chunk.append(item)
if len(chunk) == chunk_size:
yield tuple(chunk)
chunk = []
if len(chunk) > 0:
yield tuple(chunk)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="VSP Checker")
@ -19,47 +77,65 @@ if __name__ == "__main__":
if data_file_path is None:
data_file_path = input("Path to MaGIC Ugly Data File: ")
solutions = []
with open(data_file_path, "r") as data_file:
solutions = parse_matrices(SourceFile(data_file))
print(f"Parsed {len(solutions)} matrices")
data_file = open(data_file_path, "r")
start_processing = args.get("skip_to") is None
solutions = parse_matrices(SourceFile(data_file))
solutions = restructure_solutions(solutions, args)
# NOTE: When subprocess gets spawned, the logical operations will
# have a different memory address than what's expected in interpretation.
# Therefore, we need to pass the model functions directly instead.
solutions_expanded = []
for model, interpretation in solutions:
# If skip_to is defined, then don't process models
# until then.
if not start_processing and model.name == args.get("skip_to"):
start_processing = True
if not start_processing:
continue
impfunction = interpretation[Implication]
mconjunction = interpretation.get(Conjunction)
mdisjunction = interpretation.get(Disjunction)
mnegation = interpretation.get(Negation)
solutions_expanded.append((model, impfunction, mconjunction, mdisjunction, mnegation))
num_cpu = args.get("c")
if num_cpu is None:
num_cpu = max(process_cpu_count() - 2, 1)
# solution_chunks = create_chunks(solutions, num_cpu * 2)
# Set up parallel verification
num_tested = 0
num_has_vsp = 0
num_cpu = args.get("c", max(cpu_count() - 2, 1))
with mp.Pool(processes=num_cpu) as pool:
results = [
pool.apply_async(has_vsp, (model, impfunction, mconjunction, mdisjunction, mnegation))
for model, impfunction, mconjunction, mdisjunction, mnegation in solutions_expanded
]
task_pool = []
done_parsing = False
for i, result in enumerate(results):
vsp_result: VSP_Result = result.get()
print(vsp_result)
# Populate initial task pool
for _ in range(num_cpu):
try:
model, impfunction, mconjunction, mdisjunction, mnegation = next(solutions)
except StopIteration:
done_parsing = True
break
result_async = pool.apply_async(has_vsp_plus_model, (model, impfunction, mconjunction, mdisjunction, mnegation))
task_pool.append(result_async)
if args['verbose'] or vsp_result.has_vsp:
model = solutions_expanded[i][0]
print(model)
while len(task_pool) > 0:
next_task_pool = []
# Check the status of all the tasks, and spawn
# new ones if finished
for result_async in task_pool:
if result_async.ready():
model, vsp_result = result_async.get()
print(vsp_result)
num_tested += 1
if vsp_result.has_vsp:
num_has_vsp += 1
if args['verbose'] or vsp_result.has_vsp:
print(model)
print(f"Tested {len(solutions_expanded)} models, {num_has_vsp} of which satisfy VSP")
if vsp_result.has_vsp:
num_has_vsp += 1
if done_parsing:
continue
# Submit new task if available
try:
model, impfunction, mconjunction, mdisjunction, mnegation = next(solutions)
next_result_async = pool.apply_async(has_vsp_plus_model, (model, impfunction, mconjunction, mdisjunction, mnegation))
next_task_pool.append(next_result_async)
except StopIteration:
done_parsing = True
else:
next_task_pool.append(result_async)
task_pool = next_task_pool
sleep(0.01)
print(f"Tested {num_tested} models, {num_has_vsp} of which satisfy VSP")
data_file.close()