From 01204a9551bb873f22116745182b77d620dc91a6 Mon Sep 17 00:00:00 2001 From: Brandon Rozek Date: Sat, 3 May 2025 16:42:15 -0400 Subject: [PATCH] Code cleanup --- model.py | 171 ++++++++++++++++++++---------------------- parse_magic.py | 169 ++++++++++++++--------------------------- vsp.py | 200 +++++++++++++++++++++---------------------------- vspursuer.py | 99 +++++++++++++++--------- 4 files changed, 286 insertions(+), 353 deletions(-) diff --git a/model.py b/model.py index 39498f7..b9b3af1 100644 --- a/model.py +++ b/model.py @@ -16,9 +16,9 @@ from typing import Dict, List, Optional, Set, Tuple __all__ = ['ModelValue', 'ModelFunction', 'Model', 'Interpretation'] class ModelValue: - def __init__(self, name): + def __init__(self, name: str, hashed_value: Optional[int] = None): self.name = name - self.hashed_value = hash(self.name) + self.hashed_value = hashed_value if hashed_value is not None else hash(self.name) self.__setattr__ = immutable def __str__(self): return self.name @@ -27,7 +27,7 @@ class ModelValue: def __eq__(self, other): return isinstance(other, ModelValue) and self.name == other.name def __deepcopy__(self, _): - return ModelValue(self.name) + return ModelValue(self.name, self.hashed_value) class ModelFunction: def __init__(self, arity: int, mapping, operation_name = ""): @@ -109,57 +109,75 @@ Interpretation = Dict[Operation, ModelFunction] class OrderTable: def __init__(self): # a : {x | x <= a } - self.ordering: Dict[ModelValue, Set[ModelValue]] = defaultdict(set) + self.le_map: Dict[ModelValue, Set[ModelValue]] = defaultdict(set) + # a : {x | x >= a} + self.ge_map: Dict[ModelValue, Set[ModelValue]] = defaultdict(set) def add(self, x, y): """ Add x <= y """ - self.ordering[y].add(x) + self.le_map[y].add(x) + self.ge_map[x].add(y) def is_lt(self, x, y): - return y in self.ordering[x] + return x in self.le_map[y] def meet(self, x, y) -> Optional[ModelValue]: - X = self.ordering[x] - Y = self.ordering[y] + X = self.le_map[x] + Y = self.le_map[y] candidates = X.intersection(Y) - for m in candidates: - gt_all_candidates = True - for w in candidates: - if not self.is_lt(w, m): - gt_all_candidates = False - break + # Grab all elements greater than each of the candidates + candidate_ge_maps = (self.ge_map[candidate] for candidate in candidates) + common_ge_values = reduce(set.intersection, candidate_ge_maps) - if gt_all_candidates: - return m + # Intersect with candidates to get the values that satisfy + # the meet properties + result_set = candidates.intersection(common_ge_values) - # Otherwise the meet does not exist - print("Meet does not exist", (x, y), candidates) - return None + # NOTE: The meet may not exist, in which case return None + result = next(iter(result_set), None) + return result def join(self, x, y) -> Optional[ModelValue]: - # Grab the collection of elements greater than x and y - candidates = set() - for w in self.ordering: - if self.is_lt(x, w) and self.is_lt(y, w): - candidates.add(w) + X = self.ge_map[x] + Y = self.ge_map[y] - for j in candidates: - lt_all_candidates = True - for w in candidates: - if not self.is_lt(j, w): - lt_all_candidates = False - break + candidates = X.intersection(Y) - if lt_all_candidates: - return j + # Grab all elements smaller than each of the candidates + candidate_le_maps = (self.le_map[candidate] for candidate in candidates) + common_le_values = reduce(set.intersection, candidate_le_maps) - # Otherwise the join does not exist - print("Join does not exist", (x, y), candidates) - return None + # Intersect with candidatse to get the values that satisfy + # the join properties + result_set = candidates.intersection(common_le_values) + + # NOTE: The join may not exist, in which case return None + result = next(iter(result_set), None) + return result + + def top(self) -> Optional[ModelValue]: + ge_maps = (self.ge_map[candidate] for candidate in self.ge_map) + result_set = reduce(set.intersection, ge_maps) + + # Either not unique or does not exist + if len(result_set) != 1: + return None + + return next(iter(result_set)) + + def bottom(self) -> Optional[ModelValue]: + le_maps = (self.le_map[candidate] for candidate in self.le_map) + result_set = reduce(set.intersection, le_maps) + + # Either not unique or does not exist + if len(result_set) != 1: + return None + + return next(iter(result_set)) class Model: @@ -276,86 +294,61 @@ def satisfiable(logic: Logic, model: Model, interpretation: Dict[Operation, Mode return True - def model_closure(initial_set: Set[ModelValue], mfunctions: Set[ModelFunction], forbidden_element: Optional[ModelValue]) -> Set[ModelValue]: """ Given an initial set of model values and a set of model functions, compute the complete set of model values that are closed under the operations. - If top or bottom is encountered, then we end the saturation procedure early. + If the forbidden element is encountered, then we end the saturation procedure early. """ closure_set: Set[ModelValue] = initial_set last_new: Set[ModelValue] = initial_set changed: bool = True forbidden_found = False + arities = set() + for mfun in mfunctions: + arities.add(mfun.arity) + while changed: changed = False new_elements: Set[ModelValue] = set() old_closure: Set[ModelValue] = closure_set - last_new # arity -> args - cached_args = defaultdict(list) + args_by_arity = defaultdict(list) - # Pass elements into each model function - for mfun in mfunctions: - - # If a previous function shared the same arity, - # we'll use the same set of computed arguments - # to pass into the model functions. - if mfun.arity in cached_args: - for args in cached_args[mfun.arity]: - # Compute the new elements - # given the cached arguments. - element = mfun(*args) - if element not in closure_set: - new_elements.add(element) - - # Optimization: Break out of computation - # early when forbidden element is found - if forbidden_element is not None and element == forbidden_element: - forbidden_found = True - break - - if forbidden_found: - break - - # We don't need to compute the arguments - # thanks to the cache, so move onto the - # next function. - continue - - # At this point, we don't have cached arguments, so we need - # to compute this set. - - # Each argument must have at least one new element to not repeat - # work. We'll range over the number of new model values within our - # argument. - for num_new in range(1, mfun.arity + 1): + # Motivation: We want to only compute arguments that we have not + # seen before + for arity in arities: + for num_new in range(1, arity + 1): new_args = combinations_with_replacement(last_new, r=num_new) - old_args = combinations_with_replacement(old_closure, r=mfun.arity - num_new) - # Determine every possible ordering of the concatenated - # new and old model values. + old_args = combinations_with_replacement(old_closure, r=arity - num_new) + # Determine every possible ordering of the concatenated new and + # old model values. for new_arg, old_arg in product(new_args, old_args): - for args in permutations(new_arg + old_arg): - cached_args[mfun.arity].append(args) - element = mfun(*args) - if element not in closure_set: - new_elements.add(element) + for combined_args in permutations(new_arg + old_arg): + args_by_arity[arity].append(combined_args) - # Optimization: Break out of computation - # early when forbidden element is found - if forbidden_element is not None and element == forbidden_element: - forbidden_found = True - break - if forbidden_found: - break + # Pass each argument into each model function + for mfun in mfunctions: + for args in args_by_arity[mfun.arity]: + # Compute the new elements + # given the cached arguments. + element = mfun(*args) + if element not in closure_set: + new_elements.add(element) - if forbidden_found: + # Optimization: Break out of computation + # early when forbidden element is found + if forbidden_element is not None and element == forbidden_element: + forbidden_found = True break + if forbidden_found: + break closure_set.update(new_elements) changed = len(new_elements) > 0 diff --git a/parse_magic.py b/parse_magic.py index 758d735..78fc495 100644 --- a/parse_magic.py +++ b/parse_magic.py @@ -72,6 +72,45 @@ class ModelBuilder: # Map symbol to model function self.custom_model_functions: Dict[str, ModelFunction] = {} + def build(self, model_name: str) -> Tuple[Model, Dict[Operation, ModelFunction]]: + """Create Model""" + assert self.size > 0 + assert self.size + 1 == len(self.carrier_list) + assert len(self.designated_values) <= len(self.carrier_list) + assert self.mimplication is not None + + # Implication is required to be present + logical_operations = { self.mimplication } + interpretation = { + Implication: self.mimplication + } + + # Other model functions and logical + # operations are optional + if self.mnegation is not None: + logical_operations.add(self.mnegation) + interpretation[Negation] = self.mnegation + if self.mconjunction is not None: + logical_operations.add(self.mconjunction) + interpretation[Conjunction] = self.mconjunction + if self.mdisjunction is not None: + logical_operations.add(self.mdisjunction) + interpretation[Disjunction] = self.mdisjunction + if self.mnecessitation is not None: + logical_operations.add(self.mnecessitation) + interpretation[Necessitation] = self.mnecessitation + + # Custom model function definitions + for custom_mf in self.custom_model_functions.values(): + if custom_mf is not None: + logical_operations.add(custom_mf) + op = Operation(custom_mf.operation_name, custom_mf.arity) + interpretation[op] = custom_mf + + model = Model(set(self.carrier_list), logical_operations, self.designated_values, ordering=self.ordering, name=model_name) + return (model, interpretation) + + class Stage: def __init__(self, name: str): self.name = name @@ -194,7 +233,7 @@ def parse_matrices(infile: SourceFile) -> Iterator[Tuple[Model, Dict[Operation, case "end": break case "process_model": - yield process_model(stages.name(), current_model_parts) + yield current_model_parts.build(stages.name()) stage = stage.next case "size": processed = process_sizes(infile, current_model_parts, first_run) @@ -300,7 +339,7 @@ def process_orders(infile: SourceFile, current_model_parts: ModelBuilder) -> boo def process_designateds(infile: SourceFile, current_model_parts: ModelBuilder) -> bool: """Stage 4""" - designated_values = parse_single_designated(infile, current_model_parts.size) + designated_values = parse_single_designated(infile, current_model_parts.size, current_model_parts.carrier_list) if designated_values is None: return False @@ -309,7 +348,7 @@ def process_designateds(infile: SourceFile, current_model_parts: ModelBuilder) - def process_implications(infile: SourceFile, current_model_parts: ModelBuilder) -> bool: """Stage 5""" - mimplication = parse_single_dyadic_connective(infile, "→", current_model_parts.size) + mimplication = parse_single_dyadic_connective(infile, "→", current_model_parts.size, current_model_parts.carrier_list) if mimplication is None: return False @@ -330,7 +369,7 @@ def process_custom_connective(infile: SourceFile, symbol: str, adicity: int, cur elif adicity == 1: mfunction = parse_single_monadic_connective(infile, symbol, current_model_parts.size, current_model_parts.carrier_list) elif adicity == 2: - mfunction = parse_single_dyadic_connective(infile, symbol, current_model_parts.size) + mfunction = parse_single_dyadic_connective(infile, symbol, current_model_parts.size, current_model_parts.carrier_list) else: raise NotImplementedError("Unable to process connectives of adicity greater than 2") @@ -340,39 +379,6 @@ def process_custom_connective(infile: SourceFile, symbol: str, adicity: int, cur current_model_parts.custom_model_functions[symbol] = mfunction return True -def process_model(model_name: str, mp: ModelBuilder) -> Tuple[Model, Dict[Operation, ModelFunction]]: - """Create Model""" - assert mp.size > 0 - assert mp.size + 1 == len(mp.carrier_list) - assert len(mp.designated_values) <= len(mp.carrier_list) - assert mp.mimplication is not None - - logical_operations = { mp.mimplication } - interpretation = { - Implication: mp.mimplication - } - if mp.mnegation is not None: - logical_operations.add(mp.mnegation) - interpretation[Negation] = mp.mnegation - if mp.mconjunction is not None: - logical_operations.add(mp.mconjunction) - interpretation[Conjunction] = mp.mconjunction - if mp.mdisjunction is not None: - logical_operations.add(mp.mdisjunction) - interpretation[Disjunction] = mp.mdisjunction - if mp.mnecessitation is not None: - logical_operations.add(mp.mnecessitation) - interpretation[Necessitation] = mp.mnecessitation - - for custom_mf in mp.custom_model_functions.values(): - if custom_mf is not None: - logical_operations.add(custom_mf) - op = Operation(custom_mf.operation_name, custom_mf.arity) - interpretation[op] = custom_mf - - model = Model(set(mp.carrier_list), logical_operations, mp.designated_values, ordering=mp.ordering, name=model_name) - return (model, interpretation) - def parse_header(infile: SourceFile) -> UglyHeader: """ @@ -406,7 +412,6 @@ def parse_size(infile: SourceFile, first_run: bool) -> Optional[int]: """ Parse the line representing the matrix size. """ - size = int(infile.next_line()) # HACK: When necessitation and custom connectives are enabled @@ -417,7 +422,9 @@ def parse_size(infile: SourceFile, first_run: bool) -> Optional[int]: if size == -1: return None + assert size > 0, f"Unexpected size at line {infile.line_in_file}" + return size def mvalue_from_index(i: int) -> ModelValue: @@ -433,55 +440,9 @@ def parse_mvalue(x: str) -> ModelValue: """ return mvalue_from_index(int(x)) -def determine_cresult(size: int, ordering: Dict[ModelValue, ModelValue], a: ModelValue, b: ModelValue) -> ModelValue: - """ - Determine what a ∧ b should be given the ordering table. - """ - for i in range(size + 1): - c = mvalue_from_index(i) - if not ordering[(c, a)]: - continue - if not ordering[(c, b)]: - continue - - invalid = False - for j in range(size + 1): - d = mvalue_from_index(j) - if c == d: - continue - if ordering[(c, d)]: - if ordering[(d, a)] and ordering [(d, b)]: - invalid = True - - if not invalid: - return c - -def determine_dresult(size: int, ordering: Dict[ModelValue, ModelValue], a: ModelValue, b: ModelValue) -> ModelValue: - """ - Determine what a ∨ b should be given the ordering table. - """ - for i in range(size + 1): - c = mvalue_from_index(i) - if not ordering[(a, c)]: - continue - if not ordering[(b, c)]: - continue - - invalid = False - - for j in range(size + 1): - d = mvalue_from_index(j) - if d == c: - continue - if ordering[(d, c)]: - if ordering[(a, d)] and ordering[(b, d)]: - invalid = True - - if not invalid: - return c - -def parse_single_order(infile: SourceFile, size: int, carrier_list: List[ModelValue]) -> Optional[Tuple[OrderTable, ModelFunction, ModelFunction]]: +def parse_single_order(infile: SourceFile, size: int, carrier_list: List[ModelValue]) -> Optional[ + Tuple[OrderTable, Optional[ModelFunction], Optional[ModelFunction]]]: """ Parse the line representing the ordering table """ @@ -509,21 +470,12 @@ def parse_single_order(infile: SourceFile, size: int, carrier_list: List[ModelVa for x, y in product(carrier_list, carrier_list): cresult = ordering.meet(x, y) if cresult is None: - print("[Warning] Conjunction and Disjunction are not well-defined") - print(f"{x} ∧ {y} = ??") - return None, None - else: - print(f"{x} ∧ {y} = {cresult}") + return ordering, None, None cmapping[(x, y)] = cresult dresult = ordering.join(x, y) - # dresult = determine_dresult(size, omapping, x, y) if dresult is None: - print("[Warning] Conjunction and Disjunction are not well-defined") - print(f"{x} ∨ {y} = ??") - return None, None - else: - print(f"{x} ∨ {y} = {dresult}") + return ordering, None, None dmapping[(x, y)] = dresult mconjunction = ModelFunction(2, cmapping, "∧") @@ -531,7 +483,7 @@ def parse_single_order(infile: SourceFile, size: int, carrier_list: List[ModelVa return ordering, mconjunction, mdisjunction -def parse_single_designated(infile: SourceFile, size: int) -> Optional[Set[ModelValue]]: +def parse_single_designated(infile: SourceFile, size: int, carrier_list: List[ModelValue]) -> Optional[Set[ModelValue]]: """ Parse the line representing which model values are designated. """ @@ -544,9 +496,8 @@ def parse_single_designated(infile: SourceFile, size: int) -> Optional[Set[Model designated_values = set() - for i, j in zip(range(size + 1), row): + for x, j in zip(carrier_list, row): if j == '1': - x = mvalue_from_index(i) designated_values.add(x) return designated_values @@ -579,7 +530,7 @@ def parse_single_monadic_connective(infile: SourceFile, symbol: str, size: int, return ModelFunction(1, mapping, symbol) -def parse_single_dyadic_connective(infile: SourceFile, symbol: str, size: int) -> Optional[ModelFunction]: +def parse_single_dyadic_connective(infile: SourceFile, symbol: str, size: int, carrier_list: List[ModelValue]) -> Optional[ModelFunction]: first_token = next(infile) if first_token == "-1": return None @@ -588,20 +539,14 @@ def parse_single_dyadic_connective(infile: SourceFile, symbol: str, size: int) - try: table = [first_token] + [next(infile) for _ in range((size + 1)**2 - 1)] except StopIteration: - pass - - assert len(table) == (size + 1)**2, f"{symbol} table does not match expected size at line {infile.line_in_file}" + raise Exception(f"{symbol} table does not match expected size at line {infile.line_in_file}") mapping = {} table_i = 0 - for i in range(size + 1): - x = mvalue_from_index(i) - for j in range(size + 1): - y = mvalue_from_index(j) - r = parse_mvalue(table[table_i]) - table_i += 1 - - mapping[(x, y)] = r + for x, y in product(carrier_list, carrier_list): + r = parse_mvalue(table[table_i]) + table_i += 1 + mapping[(x, y)] = r return ModelFunction(2, mapping, symbol) diff --git a/vsp.py b/vsp.py index 19b3d5d..1efd84c 100644 --- a/vsp.py +++ b/vsp.py @@ -2,6 +2,7 @@ Check to see if the model has the variable sharing property. """ +from collections import defaultdict from itertools import chain, combinations, product from typing import List, Optional, Set, Tuple from common import set_to_str @@ -9,75 +10,36 @@ from model import ( Model, model_closure, ModelFunction, ModelValue, OrderTable ) -def preseed( - initial_set: Set[ModelValue], - cache:List[Tuple[Set[ModelValue], Set[ModelValue]]]): - """ - Given a cache of previous model_closure calls, - use this to compute an initial model closure - set based on the initial set. +class Cache: + def __init__(self): + # input size -> cached (inputs, outputs) + self.c = defaultdict(list) - Basic Idea: - Let {1, 2, 3} -> X be in the cache. - If {1,2,3} is a subset of initial set, - then X is the subset of the output of model_closure. + def add(self, i: Set[ModelValue], o: Set[ModelValue]): + self.c[len(i)].append((i, o)) - This is used to speed up subsequent calls to model_closure - """ - candidate_preseed: Tuple[Set[ModelValue], int] = (None, None) + def get_closest(self, initial_set: Set[ModelValue]) -> Optional[Tuple[Set[ModelValue], bool]]: + """ + Iterate through our cache starting with the cached + inputs closest in size to the initial_set and + find the one that's a subset of initial_set. - for i, o in cache: - if i < initial_set: - cost = len(initial_set - i) - # If i is a subset with less missing elements than - # the previous candidate, then it's the new candidate. - if candidate_preseed[1] is None or cost < candidate_preseed[1]: - candidate_preseed = o, cost + Returns cached_output, and whether the initial_set is the same + as the cached_input. + """ + initial_set_size = len(initial_set) + sizes = range(initial_set_size, 0, -1) - same_set = candidate_preseed[1] == 0 - return candidate_preseed[0], same_set + for size in sizes: + if size not in self.c: + continue + for cached_input, cached_output in self.c[size]: + if cached_input <= initial_set: + return cached_output, size == initial_set_size -def find_top(algebra: Set[ModelValue], mconjunction: Optional[ModelFunction], mdisjunction: Optional[ModelFunction]) -> Optional[ModelValue]: - """ - Find the top of the order lattice. - T || a = T, T && a = a for all a in the carrier set - """ - if mconjunction is None or mdisjunction is None: return None - for x in algebra: - is_top = True - for y in algebra: - if mdisjunction(x, y) != x or mconjunction(x, y) != y: - is_top = False - break - if is_top: - return x - - print("[Warning] Failed to find the top of the lattice") - return None - -def find_bottom(algebra: Set[ModelValue], mconjunction: Optional[ModelFunction], mdisjunction: Optional[ModelFunction]) -> Optional[ModelValue]: - """ - Find the bottom of the order lattice - F || a = a, F && a = F for all a in the carrier set - """ - if mconjunction is None or mdisjunction is None: - return None - - for x in algebra: - is_bottom = True - for y in algebra: - if mdisjunction(x, y) != y or mconjunction(x, y) != x: - is_bottom = False - break - if is_bottom: - return x - - print("[Warning] Failed to find the bottom of the lattice") - return None - def order_dependent(subalgebra1: Set[ModelValue], subalegbra2: Set[ModelValue], ordering: OrderTable): """ Returns true if there exists a value in subalgebra1 that's less than a value in subalgebra2 @@ -106,17 +68,49 @@ Subalgebra 1: {set_to_str(self.subalgebra1)} Subalgebra 2: {set_to_str(self.subalgebra2)} """ +def quick_vsp_unsat_incomplete(xs, ys, model, top, bottom, negation_defined) -> bool: + """ + Return True if VSP cannot be satisfied + through some incomplete checks. + """ + # If the left subalgebra contains bottom + # or the right subalgebra contains top + # skip this pair + if top is not None and top in ys: + return True + if bottom is not None and bottom in xs: + return True + + # If a subalgebra doesn't have at least one + # designated value, move onto the next pair. + # Depends on no intersection between xs and ys + if xs.isdisjoint(model.designated_values): + return True + + if ys.isdisjoint(model.designated_values): + return True + + # If the two subalgebras intersect, move + # onto the next pair. + if not xs.isdisjoint(ys): + return True + + # If the subalgebras are order-dependent, skip this pair + if order_dependent(xs, ys, model.ordering): + return True + if negation_defined and order_dependent(ys, xs, model.ordering): + return True + + # We can't immediately rule out that these + # subalgebras don't exhibit VSP + return False + def has_vsp(model: Model, impfunction: ModelFunction, - mconjunction: Optional[ModelFunction] = None, - mdisjunction: Optional[ModelFunction] = None, - mnegation: Optional[ModelFunction] = None) -> VSP_Result: + negation_defined: bool) -> VSP_Result: """ Checks whether a model has the variable sharing property. """ - top = find_top(model.carrier_set, mconjunction, mdisjunction) - bottom = find_bottom(model.carrier_set, mconjunction, mdisjunction) - # NOTE: No models with only one designated # value satisfies VSP if len(model.designated_values) == 1: @@ -124,68 +118,44 @@ def has_vsp(model: Model, impfunction: ModelFunction, assert model.ordering is not None, "Expected ordering table in model" + top = model.ordering.top() + bottom = model.ordering.bottom() + # Compute I the set of tuples (x, y) where # x -> y does not take a designiated value - I: Set[Tuple[ModelValue, ModelValue]] = set() + I: List[Tuple[ModelValue, ModelValue]] = [] for (x, y) in product(model.carrier_set, model.carrier_set): if impfunction(x, y) not in model.designated_values: - I.add((x, y)) + I.append((x, y)) # Construct the powerset of I without the empty set - s = list(I) - I_power = chain.from_iterable(combinations(s, r) for r in range(1, len(s) + 1)) + I_power = chain.from_iterable(combinations(I, r) for r in range(1, len(I) + 1)) # ((x1, y1)), ((x1, y1), (x2, y2)), ... - # Closure cache - closure_cache: List[Tuple[Set[ModelValue], Set[ModelValue]]] = [] + closure_cache = Cache() # Find the subalgebras which falsify implication for xys in I_power: - xs = {xy[0] for xy in xys} + xs = { xy[0] for xy in xys } + ys = { xy[1] for xy in xys } + + if quick_vsp_unsat_incomplete(xs, ys, model, top, bottom, negation_defined): + continue + orig_xs = xs - cached_xs = preseed(xs, closure_cache) - if cached_xs[0] is not None: + cached_xs = closure_cache.get_closest(xs) + if cached_xs is not None: xs |= cached_xs[0] - ys = {xy[1] for xy in xys} orig_ys = ys - cached_ys = preseed(ys, closure_cache) - if cached_ys[0] is not None: + cached_ys = closure_cache.get_closest(ys) + if cached_ys is not None: ys |= cached_ys[0] - - # NOTE: Optimziation before model_closure - # If the two subalgebras intersect, move - # onto the next pair. - if len(xs & ys) > 0: - continue - - # NOTE: Optimization - # If a subalgebra doesn't have at least one - # designated value, move onto the next pair. - # Depends on no intersection between xs and ys - if len(xs & model.designated_values) == 0: - continue - - if len(ys & model.designated_values) == 0: - continue - - # NOTE: Optimization - # If the left subalgebra contains bottom - # or the right subalgebra contains top - # skip this pair - if top is not None and top in ys: - continue - if bottom is not None and bottom in xs: - continue - - # NOTE: Optimization - # If the subalgebras are order-dependent, skip this pair - if order_dependent(xs, ys, model.ordering): - continue - if mnegation is not None and order_dependent(ys, xs, model.ordering): + xs_ys_updated = cached_xs is not None or cached_ys is not None + if xs_ys_updated and quick_vsp_unsat_incomplete(xs, ys, model, top, bottom, negation_defined): continue # Compute the closure of all operations @@ -193,8 +163,8 @@ def has_vsp(model: Model, impfunction: ModelFunction, carrier_set_left: Set[ModelValue] = model_closure(xs, model.logical_operations, bottom) # Save to cache - if cached_xs[0] is not None and not cached_ys[1]: - closure_cache.append((orig_xs, carrier_set_left)) + if cached_xs is None or (cached_xs is not None and not cached_xs[1]): + closure_cache.add(orig_xs, carrier_set_left) if bottom is not None and bottom in carrier_set_left: continue @@ -204,15 +174,15 @@ def has_vsp(model: Model, impfunction: ModelFunction, carrier_set_right: Set[ModelValue] = model_closure(ys, model.logical_operations, top) # Save to cache - if cached_ys[0] is not None and not cached_ys[1]: - closure_cache.append((orig_ys, carrier_set_right)) + if cached_ys is None or (cached_ys is not None and not cached_ys[1]): + closure_cache.add(orig_ys, carrier_set_right) if top is not None and top in carrier_set_right: continue # If the carrier set intersects, then move on to the next # subalgebra - if len(carrier_set_left & carrier_set_right) > 0: + if not carrier_set_left.isdisjoint(carrier_set_right): continue # See if for all pairs in the subalgebras, that diff --git a/vspursuer.py b/vspursuer.py index 284c0ef..02047e3 100755 --- a/vspursuer.py +++ b/vspursuer.py @@ -1,17 +1,20 @@ #!/usr/bin/env python3 -# NOTE: Perhaps we should use process_cpu_count but that's not available to all Python versions -from os import cpu_count +from datetime import datetime from typing import Dict, Iterator, Optional, Tuple from queue import Empty as QueueEmpty import argparse import multiprocessing as mp -from logic import Conjunction, Disjunction, Negation, Implication, Operation +from logic import Negation, Implication, Operation from model import Model, ModelFunction from parse_magic import SourceFile, parse_matrices from vsp import has_vsp, VSP_Result +def print_with_timestamp(message): + current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + print(f"[{current_time}] {message}") + def restructure_solutions(solutions: Iterator[Tuple[Model, Dict[Operation, ModelFunction]]], skip_to: Optional[str]) -> \ Iterator[Tuple[Model, ModelFunction, Optional[ModelFunction], Optional[ModelFunction], Optional[ModelFunction]]]: """ @@ -32,17 +35,15 @@ def restructure_solutions(solutions: Iterator[Tuple[Model, Dict[Operation, Model # NOTE: Implication must be defined, the rest may not impfunction = interpretation[Implication] - mconjunction = interpretation.get(Conjunction) - mdisjunction = interpretation.get(Disjunction) - mnegation = interpretation.get(Negation) - yield (model, impfunction, mconjunction, mdisjunction, mnegation) + negation_defined = Negation in interpretation + yield (model, impfunction, negation_defined) -def has_vsp_plus_model(model, impfunction, mconjunction, mdisjunction, mnegation) -> Tuple[Optional[Model], VSP_Result]: +def has_vsp_plus_model(model, impfunction, negation_defined) -> Tuple[Optional[Model], VSP_Result]: """ Wrapper which also stores the models along with its vsp result """ - vsp_result = has_vsp(model, impfunction, mconjunction, mdisjunction, mnegation) - # NOTE: Memory optimization - Don't return model if it doens't have VSP + vsp_result = has_vsp(model, impfunction, negation_defined) + # NOTE: Memory optimization - Don't return model if it doesn't have VSP model = model if vsp_result.has_vsp else None return (model, vsp_result) @@ -60,8 +61,8 @@ def worker_vsp(task_queue: mp.Queue, result_queue: mp.Queue): # If sentinal value, break if task is None: break - (model, impfunction, mconjunction, mdisjunction, mnegation) = task - result = has_vsp_plus_model(model, impfunction, mconjunction, mdisjunction, mnegation) + (model, impfunction, negation_defined) = task + result = has_vsp_plus_model(model, impfunction, negation_defined) result_queue.put(result) finally: # Either an exception occured or the worker finished @@ -91,37 +92,22 @@ def worker_parser(data_file_path: str, num_sentinal_values: int, task_queue: mp. for _ in range(num_sentinal_values): task_queue.put(None) +def multi_process_runner(num_cpu: int, data_file_path: str, skip_to: Optional[str]): + """ + Run VSPursuer in a multi-process configuration. + """ + assert num_cpu > 1 - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="VSP Checker") - parser.add_argument("--verbose", action='store_true', help="Print out all parsed matrices") - parser.add_argument("-i", type=str, help="Path to MaGIC ugly data file") - parser.add_argument("-c", type=int, help="Number of CPUs to use. Default: MAX - 2.") - parser.add_argument("--skip-to", type=str, help="Skip until a model name is found and process from then onwards.") - args = vars(parser.parse_args()) - - data_file_path = args.get("i") - if data_file_path is None: - data_file_path = input("Path to MaGIC Ugly Data File: ") - - - num_cpu = args.get("c") - if num_cpu is None: - num_cpu = max(cpu_count() - 2, 1) - - # Set up parallel verification num_tested = 0 num_has_vsp = 0 - num_workers = max(num_cpu - 1, 1) + num_workers = num_cpu - 1 # Create queues task_queue = mp.Queue(maxsize=1000) result_queue = mp.Queue() # Create dedicated process to parse the MaGIC file - process_parser = mp.Process(target=worker_parser, args=(data_file_path, num_workers, task_queue, args.get("skip_to"))) + process_parser = mp.Process(target=worker_parser, args=(data_file_path, num_workers, task_queue, skip_to)) process_parser.start() # Create dedicated processes which check VSP @@ -135,7 +121,6 @@ if __name__ == "__main__": # Check results and add new tasks until finished result_sentinal_count = 0 while True: - # Read a result try: result = result_queue.get(True, 60) @@ -171,7 +156,7 @@ if __name__ == "__main__": # Process result model, vsp_result = result - print(vsp_result) + print_with_timestamp(vsp_result) num_tested += 1 if vsp_result.has_vsp: @@ -180,7 +165,47 @@ if __name__ == "__main__": if vsp_result.has_vsp: num_has_vsp += 1 + print_with_timestamp(f"Tested {num_tested} models, {num_has_vsp} of which satisfy VSP") - print(f"Tested {num_tested} models, {num_has_vsp} of which satisfy VSP") +def single_process_runner(data_file_path: str, skip_to: Optional[str]): + num_tested = 0 + num_has_vsp = 0 + + data_file = open(data_file_path, "r") + solutions = parse_matrices(SourceFile(data_file)) + solutions = restructure_solutions(solutions, skip_to) + + for model, impfunction, negation_defined in solutions: + model, vsp_result = has_vsp_plus_model(model, impfunction, negation_defined) + print_with_timestamp(vsp_result) + num_tested += 1 + + if vsp_result.has_vsp: + print(model) + + if vsp_result.has_vsp: + num_has_vsp += 1 + + print_with_timestamp(f"Tested {num_tested} models, {num_has_vsp} of which satisfy VSP") +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="VSP Checker") + parser.add_argument("--verbose", action='store_true', help="Print out all parsed matrices") + parser.add_argument("-i", type=str, help="Path to MaGIC ugly data file") + parser.add_argument("-c", type=int, help="Number of CPUs to use. Default: 1") + parser.add_argument("--skip-to", type=str, help="Skip until a model name is found and process from then onwards.") + args = vars(parser.parse_args()) + + data_file_path = args.get("i") + if data_file_path is None: + data_file_path = input("Path to MaGIC Ugly Data File: ") + + num_cpu = args.get("c") + if num_cpu is None: + num_cpu = 1 + + if num_cpu == 1: + single_process_runner(data_file_path, args.get("skip_to")) + else: + multi_process_runner(num_cpu, data_file_path, args.get("skip_to"))