From 4412b6c2dab59313fa7eb39e7de4cde24b29c172 Mon Sep 17 00:00:00 2001 From: Brandon Rozek Date: Tue, 18 Mar 2025 18:08:28 -0400 Subject: [PATCH] Redid parallel implementation - Made parse_matrices into a generator - Keep track of num_processes results and spawn new ones when done --- parse_magic.py | 16 +++--- vspursuer.py | 152 ++++++++++++++++++++++++++++++++++++------------- 2 files changed, 121 insertions(+), 47 deletions(-) diff --git a/parse_magic.py b/parse_magic.py index 7980741..b693c31 100644 --- a/parse_magic.py +++ b/parse_magic.py @@ -4,7 +4,7 @@ Parses the Magic Ugly Data File Format Assumes the base logic is R with no extra connectives """ import re -from typing import TextIO, List, Optional, Tuple, Set, Dict +from typing import TextIO, List, Iterator, Optional, Tuple, Set, Dict from model import Model, ModelValue, ModelFunction, OrderTable from logic import ( @@ -167,8 +167,7 @@ def derive_stages(header: UglyHeader) -> Stages: return stages -def parse_matrices(infile: SourceFile) -> List[Tuple[Model, Dict]]: - solutions = [] +def parse_matrices(infile: SourceFile) -> Iterator[Tuple[Model, Dict[Operation, ModelFunction]]]: header = parse_header(infile) stages = derive_stages(header) first_run = True @@ -179,7 +178,7 @@ def parse_matrices(infile: SourceFile) -> List[Tuple[Model, Dict]]: case "end": break case "process_model": - process_model(stages.name(), current_model_parts, solutions) + yield process_model(stages.name(), current_model_parts) stage = stage.next case "size": processed = process_sizes(infile, current_model_parts, first_run) @@ -245,8 +244,6 @@ def parse_matrices(infile: SourceFile) -> List[Tuple[Model, Dict]]: stages.reset_after(stage.name) stage = stage.previous - return solutions - def process_sizes(infile: SourceFile, current_model_parts: ModelBuilder, first_run: bool) -> bool: try: size = parse_size(infile, first_run) @@ -325,7 +322,7 @@ def process_custom_connective(infile: SourceFile, symbol: str, adicity: int, cur 
current_model_parts.custom_model_functions[symbol] = mfunction return True -def process_model(model_name: str, mp: ModelBuilder, solutions: List[Tuple[Model, Dict]]): +def process_model(model_name: str, mp: ModelBuilder) -> Tuple[Model, Dict[Operation, ModelFunction]]: """Create Model""" assert mp.size > 0 assert mp.size + 1 == len(mp.carrier_set) @@ -333,7 +330,6 @@ def process_model(model_name: str, mp: ModelBuilder, solutions: List[Tuple[Mode assert mp.mimplication is not None logical_operations = { mp.mimplication } - model = Model(mp.carrier_set, logical_operations, mp.designated_values, ordering=mp.ordering, name=model_name) interpretation = { Implication: mp.mimplication } @@ -355,8 +351,10 @@ def process_model(model_name: str, mp: ModelBuilder, solutions: List[Tuple[Mode logical_operations.add(custom_mf) op = Operation(custom_mf.operation_name, custom_mf.arity) interpretation[op] = custom_mf + + model = Model(mp.carrier_set, logical_operations, mp.designated_values, ordering=mp.ordering, name=model_name) + return (model, interpretation) - solutions.append((model, interpretation)) def parse_header(infile: SourceFile) -> UglyHeader: """ diff --git a/vspursuer.py b/vspursuer.py index fc4d5f8..864568d 100755 --- a/vspursuer.py +++ b/vspursuer.py @@ -1,11 +1,69 @@ #!/usr/bin/env python3 -from os import cpu_count +from os import process_cpu_count +from time import sleep +from typing import Dict, Iterator, Optional, Tuple import argparse import multiprocessing as mp -from logic import Conjunction, Disjunction, Negation, Implication +from logic import Conjunction, Disjunction, Negation, Implication, Operation +from model import Model, ModelFunction from parse_magic import SourceFile, parse_matrices -from vsp import has_vsp, VSP_Result +from vsp import has_vsp + +def restructure_solutions(solutions: Iterator[Tuple[Model, Dict[Operation, ModelFunction]]], args) -> \ + Iterator[Tuple[Model, ModelFunction, Optional[ModelFunction], Optional[ModelFunction], 
Optional[ModelFunction]]]: + """ + When subprocess gets spawned, the logical operations will + have a different memory address than what's expected in interpretation. + Therefore, we need to pass the model functions directly instead. + + While we're at it, filter out models until we get to --skip-to + if applicable. + """ + start_processing = args.get("skip_to") is None + for model, interpretation in solutions: + # If skip_to is defined, then don't process models + # until then. + if not start_processing and model.name == args.get("skip_to"): + start_processing = True + if not start_processing: + continue + impfunction = interpretation[Implication] + mconjunction = interpretation.get(Conjunction) + mdisjunction = interpretation.get(Disjunction) + mnegation = interpretation.get(Negation) + yield (model, impfunction, mconjunction, mdisjunction, mnegation) + +def has_vsp_plus_model(model, impfunction, mconjunction, mdisjunction, mnegation): + """ + Wrapper so that we can save the model that satisfies VSP. + NOTE: At the time of writing, models that don't satisfy VSP + get discarded from memory for efficiency sake. + """ + vsp_result = has_vsp(model, impfunction, mconjunction, mdisjunction, mnegation) + if vsp_result.has_vsp: + return (model, vsp_result) + return (None, vsp_result) + +def create_chunks(data, chunk_size: int): + """ + Takes a stream of data and creates a new + stream where each element is a "chunk" of + several primitive elements. 
+ Ex: create_chunks((1, 2, 3, 4, 5, 6), 2) -> + ((1, 2), (3, 4), (5, 6)) + """ + chunk = [] + for item in data: + chunk.append(item) + if len(chunk) == chunk_size: + yield tuple(chunk) + chunk = [] + + if len(chunk) > 0: + yield tuple(chunk) + + if __name__ == "__main__": parser = argparse.ArgumentParser(description="VSP Checker") @@ -19,47 +77,65 @@ if __name__ == "__main__": if data_file_path is None: data_file_path = input("Path to MaGIC Ugly Data File: ") - solutions = [] - with open(data_file_path, "r") as data_file: - solutions = parse_matrices(SourceFile(data_file)) - print(f"Parsed {len(solutions)} matrices") + data_file = open(data_file_path, "r") - start_processing = args.get("skip_to") is None + solutions = parse_matrices(SourceFile(data_file)) + solutions = restructure_solutions(solutions, args) - # NOTE: When subprocess gets spawned, the logical operations will - # have a different memory address than what's expected in interpretation. - # Therefore, we need to pass the model functions directly instead. - solutions_expanded = [] - for model, interpretation in solutions: - # If skip_to is defined, then don't process models - # until then. 
- if not start_processing and model.name == args.get("skip_to"): - start_processing = True - if not start_processing: - continue - impfunction = interpretation[Implication] - mconjunction = interpretation.get(Conjunction) - mdisjunction = interpretation.get(Disjunction) - mnegation = interpretation.get(Negation) - solutions_expanded.append((model, impfunction, mconjunction, mdisjunction, mnegation)) + num_cpu = args.get("c") + if num_cpu is None: + num_cpu = max(process_cpu_count() - 2, 1) + # solution_chunks = create_chunks(solutions, num_cpu * 2) + + # Set up parallel verification + num_tested = 0 num_has_vsp = 0 - num_cpu = args.get("c", max(cpu_count() - 2, 1)) with mp.Pool(processes=num_cpu) as pool: - results = [ - pool.apply_async(has_vsp, (model, impfunction, mconjunction, mdisjunction, mnegation)) - for model, impfunction, mconjunction, mdisjunction, mnegation in solutions_expanded - ] + task_pool = [] + done_parsing = False - for i, result in enumerate(results): - vsp_result: VSP_Result = result.get() - print(vsp_result) + # Populate initial task pool + for _ in range(num_cpu): + try: + model, impfunction, mconjunction, mdisjunction, mnegation = next(solutions) + except StopIteration: + done_parsing = True + break + result_async = pool.apply_async(has_vsp_plus_model, (model, impfunction, mconjunction, mdisjunction, mnegation)) + task_pool.append(result_async) - if args['verbose'] or vsp_result.has_vsp: - model = solutions_expanded[i][0] - print(model) + while len(task_pool) > 0: + next_task_pool = [] + # Check the status of all the tasks, and spawn + # new ones if finished + for result_async in task_pool: + if result_async.ready(): + model, vsp_result = result_async.get() + print(vsp_result) + num_tested += 1 - if vsp_result.has_vsp: - num_has_vsp += 1 + if args['verbose'] or vsp_result.has_vsp: + print(model) - print(f"Tested {len(solutions_expanded)} models, {num_has_vsp} of which satisfy VSP") + if vsp_result.has_vsp: + num_has_vsp += 1 + + if 
done_parsing: + continue + + # Submit new task if available + try: + model, impfunction, mconjunction, mdisjunction, mnegation = next(solutions) + next_result_async = pool.apply_async(has_vsp_plus_model, (model, impfunction, mconjunction, mdisjunction, mnegation)) + next_task_pool.append(next_result_async) + except StopIteration: + done_parsing = True + else: + next_task_pool.append(result_async) + task_pool = next_task_pool + sleep(0.01) + + print(f"Tested {num_tested} models, {num_has_vsp} of which satisfy VSP") + + data_file.close()