From 2fa8aa9c158d217bec2e19207c4e04536a344eba Mon Sep 17 00:00:00 2001 From: Brandon Rozek Date: Sun, 12 May 2024 13:03:28 -0400 Subject: [PATCH] Updates - Parses multiple implication tables from magic - Speed improvements to model closure - Make use of prior model_closure computations --- model.py | 93 +++++++++++++++++----------------------- parse_magic.py | 72 +++++++++++++++---------------- vsp.py | 114 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 187 insertions(+), 92 deletions(-) create mode 100644 vsp.py diff --git a/model.py b/model.py index 5c07d1a..63cd62e 100644 --- a/model.py +++ b/model.py @@ -8,7 +8,7 @@ Operation, Conjunction, Disjunction, Implication ) from typing import Set, Dict, Tuple, Optional from functools import lru_cache -from itertools import combinations, chain, product +from itertools import combinations, chain, product, permutations from copy import deepcopy @@ -32,6 +32,8 @@ class ModelValue: def __lt__(self, other): assert isinstance(other, ModelValue) return ModelOrderConstraint(self, other) + def __deepcopy__(self, memo): + return ModelValue(self.name) class ModelFunction: @@ -195,66 +197,47 @@ def satisfiable(logic: Logic, model: Model, interpretation: Dict[Operation, Mode return True +from itertools import combinations_with_replacement +from collections import defaultdict def model_closure(initial_set: Set[ModelValue], mfunctions: Set[ModelFunction]): - last_set: Set[ModelValue] = set() - current_set: Set[ModelValue] = initial_set + closure_set: Set[ModelValue] = initial_set + last_new = initial_set + changed = True - while last_set != current_set: - last_set = deepcopy(current_set) + while changed: + changed = False + new_elements = set() + old_closure = closure_set - last_new + + # arity -> args + cached_args = defaultdict(list) for mfun in mfunctions: - # Get output for every possible input configuration - # from last_set - for args in product(last_set, repeat=mfun.arity): - current_set.add(mfun(*args)) - return current_set + # Use cached args if this arity was looked at before + if mfun.arity in cached_args: + for args in cached_args[mfun.arity]: + element = mfun(*args) + if element not in closure_set: + new_elements.add(element) + # Move onto next function + continue -def has_vsp(model: Model, interpretation: Dict[Operation, ModelFunction]) -> bool: - """ - Tells you whether a model violates the - variable sharing property. - """ + # Iterate over how many new elements would be within the arguments + # NOTE: To not repeat work, there must be at least one new element + for num_new in range(1, mfun.arity + 1): + new_args = combinations_with_replacement(last_new, r=num_new) + old_args = combinations_with_replacement(old_closure, r=mfun.arity - num_new) + for new_arg, old_arg in product(new_args, old_args): + for args in permutations(new_arg + old_arg): + cached_args[mfun.arity].append(args) + element = mfun(*args) + if element not in closure_set: + new_elements.add(element) - impfunction = interpretation[Implication] - - # Compute I the set of tuples (x, y) where - # x -> y does not take a designiated value - I: Set[Tuple[ModelValue, ModelValue]] = set() - - for (x, y) in product(model.carrier_set, model.carrier_set): - if impfunction(x, y) not in model.designated_values: - I.add((x, y)) - - # Construct the powerset without the empty set - s = list(I) - I_power = chain.from_iterable(combinations(s, r) for r in range(1, len(s) + 1)) - # ((x1, y1)), ((x1, y1), (x2, y2)), ... - - for xys in I_power: - # Compute the closure of all operations - # with just the xs - xs = {xy[0] for xy in xys} - carrier_set_left: Set[ModelValue] = model_closure(xs, model.logical_operations) - - # Compute the closure of all operations - # with just the ys - ys = {xy[1] for xy in xys} - carrier_set_right: Set[ModelValue] = model_closure(ys, model.logical_operations) - - # If the carrier set intersects, then we violate VSP - if len(carrier_set_left & carrier_set_right) > 0: - continue - - invalid = False - for (x2, y2) in product(carrier_set_left, carrier_set_right): - if impfunction(x2, y2) in model.designated_values: - invalid = True - break - - if not invalid: - return True - - return False + closure_set.update(new_elements) + changed = len(new_elements) > 0 + last_new = new_elements + return closure_set diff --git a/parse_magic.py b/parse_magic.py index 5dd9f99..4b95ba4 100644 --- a/parse_magic.py +++ b/parse_magic.py @@ -13,6 +13,7 @@ from logic import ( Negation, Disjunction ) +from vsp import has_vsp def parse_matrices(infile: TextIO) -> List[Tuple[Model, Dict]]: next(infile) # Skip header line @@ -42,12 +43,11 @@ def parse_matrices(infile: TextIO) -> List[Tuple[Model, Dict]]: if designated_values is None: break - while True: - result = parse_implication(infile, size) - if result is None: - break - mimplication, hasnext = result + results = parse_implication(infile, size) + if result is None: + break + for mimplication in results: logical_operations = { mnegation, mconjunction, mdisjunction, mimplication @@ -60,10 +60,7 @@ def parse_matrices(infile: TextIO) -> List[Tuple[Model, Dict]]: Implication: mimplication } solutions.append((model, interpretation)) - print(f"Parsed {len(solutions)} so far") - if not hasnext: - break return solutions def carrier_set_from_size(size: int): @@ -78,7 +75,7 @@ def parse_size(infile: TextIO) -> Optional[int]: return None assert size > 0, "Unexpected size" return size - + def parse_negation(infile: TextIO, size: int) -> Optional[ModelFunction]: line = next(infile).strip() if line == '-1': @@ -87,12 +84,12 @@ def parse_negation(infile: TextIO, size: int) -> Optional[ModelFunction]: row = line.split(" ") assert len(row) == size + 1, "Negation table doesn't match size" mapping = {} - + for i, j in zip(range(size + 1), row): x = mvalue_from_index(i) y = parse_mvalue(j) mapping[(x, )] = y - + return ModelFunction(1, mapping, "Negation") @@ -118,7 +115,7 @@ def determine_cresult(size: int, ordering: Dict[ModelValue, ModelValue], a: Mode if ordering[(c, d)]: if ordering[(d, a)] and ordering [(d, b)]: invalid = True - + if not invalid: return c @@ -131,7 +128,7 @@ def determine_dresult(size: int, ordering: Dict[ModelValue, ModelValue], a: Mode continue if not ordering[(b, c)]: continue - + invalid = False for j in range(size + 1): @@ -148,8 +145,8 @@ def determine_dresult(size: int, ordering: Dict[ModelValue, ModelValue], a: Mode def parse_order(infile: TextIO, size: int) -> Optional[Tuple[ModelFunction, ModelFunction]]: line = next(infile).strip() if line == '-1': - return None - + return None + table = line.split(" ") assert len(table) == (size + 1)**2 @@ -186,7 +183,7 @@ def parse_designated(infile: TextIO, size: int) -> Optional[Set[ModelValue]]: line = next(infile).strip() if line == '-1': return None - + row = line.split(" ") assert len(row) == size + 1, "Designated table doesn't match size" @@ -200,44 +197,45 @@ def parse_designated(infile: TextIO, size: int) -> Optional[Set[ModelValue]]: return designated_values -def parse_implication(infile: TextIO, size: int) -> Optional[Tuple[ModelFunction, bool]]: +def parse_implication(infile: TextIO, size: int) -> Optional[List[ModelFunction]]: line = next(infile).strip() if line == '-1': return None - - table = line.split(" ") - has_next = True - if table[-1] == '-1': - has_next = False - table = table[:-1] - assert len(table) == (size + 1)**2 + # Split and remove the last '-1' character + table = line.split(" ")[:-1] - mapping = {} + assert len(table) % (size + 1)**2 == 0 table_i = 0 + mimplications: List[ModelFunction] = [] - for i in range(size + 1): - x = mvalue_from_index(i) - for j in range(size + 1): - y = mvalue_from_index(j) + for _ in range(len(table) // (size + 1)**2): + mapping = {} - r = parse_mvalue(table[table_i]) - table_i += 1 + for i in range(size + 1): + x = mvalue_from_index(i) + for j in range(size + 1): + y = mvalue_from_index(j) - mapping[(x, y)] = r + r = parse_mvalue(table[table_i]) + table_i += 1 - mimplication = ModelFunction(2, mapping, "Implication") - return mimplication, has_next + mapping[(x, y)] = r + + mimplication = ModelFunction(2, mapping, "Implication") + mimplications.append(mimplication) + + return mimplications if __name__ == "__main__": - from model import has_vsp - solutions: List[Model] = parse_matrices(sys.stdin) print(f"Parsed {len(solutions)} matrices") - for model, interpretation in solutions: + for i, (model, interpretation) in enumerate(solutions): # print(model) if has_vsp(model, interpretation): print(model) print("Has VSP") + else: + print("Model", i, "does not have VSP") diff --git a/vsp.py b/vsp.py new file mode 100644 index 0000000..ff142af --- /dev/null +++ b/vsp.py @@ -0,0 +1,114 @@ +""" +Check to see if the model has the variable +sharing property. +""" +from itertools import chain, combinations, product +from typing import Dict, Set, Tuple, List +from model import ( + Model, ModelFunction, ModelValue, model_closure +) +from logic import Implication, Operation + +def preseed(initial_set: Set[ModelValue], cache:List[Tuple[Set[ModelValue], Set[ModelValue]]]): + """ + Cache contains caches of model closure calls: + Ex: + {1, 2, 3} -> {....} + + If {1,2,3} is a subset of initial set, + then {....} is the subset of the output of model_closure. + + We'll use the output to speed up the saturation procedure + """ + candidate_preseed: Tuple[Set[ModelValue], int] = (None, None) + + for i, o in cache: + if i < initial_set: + cost = len(initial_set - i) + if candidate_preseed[1] is None or cost < candidate_preseed[1]: + candidate_preseed = o, cost + + same_set = candidate_preseed[1] == 0 + return candidate_preseed[0], same_set + +def has_vsp(model: Model, interpretation: Dict[Operation, ModelFunction]) -> bool: + """ + Tells you whether a model has the + variable sharing property. + """ + + impfunction = interpretation[Implication] + + # Compute I the set of tuples (x, y) where + # x -> y does not take a designiated value + I: Set[Tuple[ModelValue, ModelValue]] = set() + + for (x, y) in product(model.carrier_set, model.carrier_set): + if impfunction(x, y) not in model.designated_values: + I.add((x, y)) + + # Construct the powerset of I without the empty set + s = list(I) + I_power = chain.from_iterable(combinations(s, r) for r in range(1, len(s) + 1)) + # ((x1, y1)), ((x1, y1), (x2, y2)), ... + + # Closure cache + closure_cache: List[Tuple[Set[ModelValue], Set[ModelValue]]] = [] + + # Find the subalgebras which falsify implication + for xys in I_power: + + xs = {xy[0] for xy in xys} + orig_xs = xs + cached_xs = preseed(xs, closure_cache) + if cached_xs[0] is not None: + xs |= cached_xs[0] + + ys = {xy[1] for xy in xys} + orig_ys = ys + cached_ys = preseed(ys, closure_cache) + if cached_ys[0] is not None: + ys |= cached_ys[0] + + + # NOTE: Optimziation before model_closure + # If the carrier set intersects, then move on to the next + # subalgebra + if len(xs & ys) > 0: + continue + + # Compute the closure of all operations + # with just the xs + carrier_set_left: Set[ModelValue] = model_closure(xs, model.logical_operations) + + # Save to cache + if cached_xs[0] is not None and not cached_ys[1]: + closure_cache.append((orig_xs, carrier_set_left)) + + + # Compute the closure of all operations + # with just the ys + carrier_set_right: Set[ModelValue] = model_closure(ys, model.logical_operations) + + # Save to cache + if cached_ys[0] is not None and not cached_ys[1]: + closure_cache.append((orig_ys, carrier_set_right)) + + + # If the carrier set intersects, then move on to the next + # subalgebra + if len(carrier_set_left & carrier_set_right) > 0: + continue + + # See if for all pairs in the subalgebras, that + # implication is falsified + falsified = True + for (x2, y2) in product(carrier_set_left, carrier_set_right): + if impfunction(x2, y2) in model.designated_values: + falsified = False + break + + if falsified: + return True + + return False