mirror of
https://github.com/Brandon-Rozek/matmod.git
synced 2025-07-29 20:52:01 +00:00
Code cleanup
This commit is contained in:
parent
fa9e5026ca
commit
01204a9551
4 changed files with 286 additions and 353 deletions
171
model.py
171
model.py
|
@ -16,9 +16,9 @@ from typing import Dict, List, Optional, Set, Tuple
|
|||
__all__ = ['ModelValue', 'ModelFunction', 'Model', 'Interpretation']
|
||||
|
||||
class ModelValue:
|
||||
def __init__(self, name):
|
||||
def __init__(self, name: str, hashed_value: Optional[int] = None):
|
||||
self.name = name
|
||||
self.hashed_value = hash(self.name)
|
||||
self.hashed_value = hashed_value if hashed_value is not None else hash(self.name)
|
||||
self.__setattr__ = immutable
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
@ -27,7 +27,7 @@ class ModelValue:
|
|||
def __eq__(self, other):
|
||||
return isinstance(other, ModelValue) and self.name == other.name
|
||||
def __deepcopy__(self, _):
|
||||
return ModelValue(self.name)
|
||||
return ModelValue(self.name, self.hashed_value)
|
||||
|
||||
class ModelFunction:
|
||||
def __init__(self, arity: int, mapping, operation_name = ""):
|
||||
|
@ -109,57 +109,75 @@ Interpretation = Dict[Operation, ModelFunction]
|
|||
class OrderTable:
|
||||
def __init__(self):
|
||||
# a : {x | x <= a }
|
||||
self.ordering: Dict[ModelValue, Set[ModelValue]] = defaultdict(set)
|
||||
self.le_map: Dict[ModelValue, Set[ModelValue]] = defaultdict(set)
|
||||
# a : {x | x >= a}
|
||||
self.ge_map: Dict[ModelValue, Set[ModelValue]] = defaultdict(set)
|
||||
|
||||
def add(self, x, y):
|
||||
"""
|
||||
Add x <= y
|
||||
"""
|
||||
self.ordering[y].add(x)
|
||||
self.le_map[y].add(x)
|
||||
self.ge_map[x].add(y)
|
||||
|
||||
def is_lt(self, x, y):
|
||||
return y in self.ordering[x]
|
||||
return x in self.le_map[y]
|
||||
|
||||
def meet(self, x, y) -> Optional[ModelValue]:
|
||||
X = self.ordering[x]
|
||||
Y = self.ordering[y]
|
||||
X = self.le_map[x]
|
||||
Y = self.le_map[y]
|
||||
|
||||
candidates = X.intersection(Y)
|
||||
|
||||
for m in candidates:
|
||||
gt_all_candidates = True
|
||||
for w in candidates:
|
||||
if not self.is_lt(w, m):
|
||||
gt_all_candidates = False
|
||||
break
|
||||
# Grab all elements greater than each of the candidates
|
||||
candidate_ge_maps = (self.ge_map[candidate] for candidate in candidates)
|
||||
common_ge_values = reduce(set.intersection, candidate_ge_maps)
|
||||
|
||||
if gt_all_candidates:
|
||||
return m
|
||||
# Intersect with candidates to get the values that satisfy
|
||||
# the meet properties
|
||||
result_set = candidates.intersection(common_ge_values)
|
||||
|
||||
# Otherwise the meet does not exist
|
||||
print("Meet does not exist", (x, y), candidates)
|
||||
return None
|
||||
# NOTE: The meet may not exist, in which case return None
|
||||
result = next(iter(result_set), None)
|
||||
return result
|
||||
|
||||
def join(self, x, y) -> Optional[ModelValue]:
|
||||
# Grab the collection of elements greater than x and y
|
||||
candidates = set()
|
||||
for w in self.ordering:
|
||||
if self.is_lt(x, w) and self.is_lt(y, w):
|
||||
candidates.add(w)
|
||||
X = self.ge_map[x]
|
||||
Y = self.ge_map[y]
|
||||
|
||||
for j in candidates:
|
||||
lt_all_candidates = True
|
||||
for w in candidates:
|
||||
if not self.is_lt(j, w):
|
||||
lt_all_candidates = False
|
||||
break
|
||||
candidates = X.intersection(Y)
|
||||
|
||||
if lt_all_candidates:
|
||||
return j
|
||||
# Grab all elements smaller than each of the candidates
|
||||
candidate_le_maps = (self.le_map[candidate] for candidate in candidates)
|
||||
common_le_values = reduce(set.intersection, candidate_le_maps)
|
||||
|
||||
# Otherwise the join does not exist
|
||||
print("Join does not exist", (x, y), candidates)
|
||||
return None
|
||||
# Intersect with candidatse to get the values that satisfy
|
||||
# the join properties
|
||||
result_set = candidates.intersection(common_le_values)
|
||||
|
||||
# NOTE: The join may not exist, in which case return None
|
||||
result = next(iter(result_set), None)
|
||||
return result
|
||||
|
||||
def top(self) -> Optional[ModelValue]:
|
||||
ge_maps = (self.ge_map[candidate] for candidate in self.ge_map)
|
||||
result_set = reduce(set.intersection, ge_maps)
|
||||
|
||||
# Either not unique or does not exist
|
||||
if len(result_set) != 1:
|
||||
return None
|
||||
|
||||
return next(iter(result_set))
|
||||
|
||||
def bottom(self) -> Optional[ModelValue]:
|
||||
le_maps = (self.le_map[candidate] for candidate in self.le_map)
|
||||
result_set = reduce(set.intersection, le_maps)
|
||||
|
||||
# Either not unique or does not exist
|
||||
if len(result_set) != 1:
|
||||
return None
|
||||
|
||||
return next(iter(result_set))
|
||||
|
||||
|
||||
class Model:
|
||||
|
@ -276,86 +294,61 @@ def satisfiable(logic: Logic, model: Model, interpretation: Dict[Operation, Mode
|
|||
return True
|
||||
|
||||
|
||||
|
||||
def model_closure(initial_set: Set[ModelValue], mfunctions: Set[ModelFunction], forbidden_element: Optional[ModelValue]) -> Set[ModelValue]:
|
||||
"""
|
||||
Given an initial set of model values and a set of model functions,
|
||||
compute the complete set of model values that are closed
|
||||
under the operations.
|
||||
|
||||
If top or bottom is encountered, then we end the saturation procedure early.
|
||||
If the forbidden element is encountered, then we end the saturation procedure early.
|
||||
"""
|
||||
closure_set: Set[ModelValue] = initial_set
|
||||
last_new: Set[ModelValue] = initial_set
|
||||
changed: bool = True
|
||||
forbidden_found = False
|
||||
|
||||
arities = set()
|
||||
for mfun in mfunctions:
|
||||
arities.add(mfun.arity)
|
||||
|
||||
while changed:
|
||||
changed = False
|
||||
new_elements: Set[ModelValue] = set()
|
||||
old_closure: Set[ModelValue] = closure_set - last_new
|
||||
|
||||
# arity -> args
|
||||
cached_args = defaultdict(list)
|
||||
args_by_arity = defaultdict(list)
|
||||
|
||||
# Pass elements into each model function
|
||||
for mfun in mfunctions:
|
||||
|
||||
# If a previous function shared the same arity,
|
||||
# we'll use the same set of computed arguments
|
||||
# to pass into the model functions.
|
||||
if mfun.arity in cached_args:
|
||||
for args in cached_args[mfun.arity]:
|
||||
# Compute the new elements
|
||||
# given the cached arguments.
|
||||
element = mfun(*args)
|
||||
if element not in closure_set:
|
||||
new_elements.add(element)
|
||||
|
||||
# Optimization: Break out of computation
|
||||
# early when forbidden element is found
|
||||
if forbidden_element is not None and element == forbidden_element:
|
||||
forbidden_found = True
|
||||
break
|
||||
|
||||
if forbidden_found:
|
||||
break
|
||||
|
||||
# We don't need to compute the arguments
|
||||
# thanks to the cache, so move onto the
|
||||
# next function.
|
||||
continue
|
||||
|
||||
# At this point, we don't have cached arguments, so we need
|
||||
# to compute this set.
|
||||
|
||||
# Each argument must have at least one new element to not repeat
|
||||
# work. We'll range over the number of new model values within our
|
||||
# argument.
|
||||
for num_new in range(1, mfun.arity + 1):
|
||||
# Motivation: We want to only compute arguments that we have not
|
||||
# seen before
|
||||
for arity in arities:
|
||||
for num_new in range(1, arity + 1):
|
||||
new_args = combinations_with_replacement(last_new, r=num_new)
|
||||
old_args = combinations_with_replacement(old_closure, r=mfun.arity - num_new)
|
||||
# Determine every possible ordering of the concatenated
|
||||
# new and old model values.
|
||||
old_args = combinations_with_replacement(old_closure, r=arity - num_new)
|
||||
# Determine every possible ordering of the concatenated new and
|
||||
# old model values.
|
||||
for new_arg, old_arg in product(new_args, old_args):
|
||||
for args in permutations(new_arg + old_arg):
|
||||
cached_args[mfun.arity].append(args)
|
||||
element = mfun(*args)
|
||||
if element not in closure_set:
|
||||
new_elements.add(element)
|
||||
for combined_args in permutations(new_arg + old_arg):
|
||||
args_by_arity[arity].append(combined_args)
|
||||
|
||||
# Optimization: Break out of computation
|
||||
# early when forbidden element is found
|
||||
if forbidden_element is not None and element == forbidden_element:
|
||||
forbidden_found = True
|
||||
break
|
||||
|
||||
if forbidden_found:
|
||||
break
|
||||
# Pass each argument into each model function
|
||||
for mfun in mfunctions:
|
||||
for args in args_by_arity[mfun.arity]:
|
||||
# Compute the new elements
|
||||
# given the cached arguments.
|
||||
element = mfun(*args)
|
||||
if element not in closure_set:
|
||||
new_elements.add(element)
|
||||
|
||||
if forbidden_found:
|
||||
# Optimization: Break out of computation
|
||||
# early when forbidden element is found
|
||||
if forbidden_element is not None and element == forbidden_element:
|
||||
forbidden_found = True
|
||||
break
|
||||
|
||||
if forbidden_found:
|
||||
break
|
||||
|
||||
closure_set.update(new_elements)
|
||||
changed = len(new_elements) > 0
|
||||
|
|
169
parse_magic.py
169
parse_magic.py
|
@ -72,6 +72,45 @@ class ModelBuilder:
|
|||
# Map symbol to model function
|
||||
self.custom_model_functions: Dict[str, ModelFunction] = {}
|
||||
|
||||
def build(self, model_name: str) -> Tuple[Model, Dict[Operation, ModelFunction]]:
|
||||
"""Create Model"""
|
||||
assert self.size > 0
|
||||
assert self.size + 1 == len(self.carrier_list)
|
||||
assert len(self.designated_values) <= len(self.carrier_list)
|
||||
assert self.mimplication is not None
|
||||
|
||||
# Implication is required to be present
|
||||
logical_operations = { self.mimplication }
|
||||
interpretation = {
|
||||
Implication: self.mimplication
|
||||
}
|
||||
|
||||
# Other model functions and logical
|
||||
# operations are optional
|
||||
if self.mnegation is not None:
|
||||
logical_operations.add(self.mnegation)
|
||||
interpretation[Negation] = self.mnegation
|
||||
if self.mconjunction is not None:
|
||||
logical_operations.add(self.mconjunction)
|
||||
interpretation[Conjunction] = self.mconjunction
|
||||
if self.mdisjunction is not None:
|
||||
logical_operations.add(self.mdisjunction)
|
||||
interpretation[Disjunction] = self.mdisjunction
|
||||
if self.mnecessitation is not None:
|
||||
logical_operations.add(self.mnecessitation)
|
||||
interpretation[Necessitation] = self.mnecessitation
|
||||
|
||||
# Custom model function definitions
|
||||
for custom_mf in self.custom_model_functions.values():
|
||||
if custom_mf is not None:
|
||||
logical_operations.add(custom_mf)
|
||||
op = Operation(custom_mf.operation_name, custom_mf.arity)
|
||||
interpretation[op] = custom_mf
|
||||
|
||||
model = Model(set(self.carrier_list), logical_operations, self.designated_values, ordering=self.ordering, name=model_name)
|
||||
return (model, interpretation)
|
||||
|
||||
|
||||
class Stage:
|
||||
def __init__(self, name: str):
|
||||
self.name = name
|
||||
|
@ -194,7 +233,7 @@ def parse_matrices(infile: SourceFile) -> Iterator[Tuple[Model, Dict[Operation,
|
|||
case "end":
|
||||
break
|
||||
case "process_model":
|
||||
yield process_model(stages.name(), current_model_parts)
|
||||
yield current_model_parts.build(stages.name())
|
||||
stage = stage.next
|
||||
case "size":
|
||||
processed = process_sizes(infile, current_model_parts, first_run)
|
||||
|
@ -300,7 +339,7 @@ def process_orders(infile: SourceFile, current_model_parts: ModelBuilder) -> boo
|
|||
|
||||
def process_designateds(infile: SourceFile, current_model_parts: ModelBuilder) -> bool:
|
||||
"""Stage 4"""
|
||||
designated_values = parse_single_designated(infile, current_model_parts.size)
|
||||
designated_values = parse_single_designated(infile, current_model_parts.size, current_model_parts.carrier_list)
|
||||
if designated_values is None:
|
||||
return False
|
||||
|
||||
|
@ -309,7 +348,7 @@ def process_designateds(infile: SourceFile, current_model_parts: ModelBuilder) -
|
|||
|
||||
def process_implications(infile: SourceFile, current_model_parts: ModelBuilder) -> bool:
|
||||
"""Stage 5"""
|
||||
mimplication = parse_single_dyadic_connective(infile, "→", current_model_parts.size)
|
||||
mimplication = parse_single_dyadic_connective(infile, "→", current_model_parts.size, current_model_parts.carrier_list)
|
||||
if mimplication is None:
|
||||
return False
|
||||
|
||||
|
@ -330,7 +369,7 @@ def process_custom_connective(infile: SourceFile, symbol: str, adicity: int, cur
|
|||
elif adicity == 1:
|
||||
mfunction = parse_single_monadic_connective(infile, symbol, current_model_parts.size, current_model_parts.carrier_list)
|
||||
elif adicity == 2:
|
||||
mfunction = parse_single_dyadic_connective(infile, symbol, current_model_parts.size)
|
||||
mfunction = parse_single_dyadic_connective(infile, symbol, current_model_parts.size, current_model_parts.carrier_list)
|
||||
else:
|
||||
raise NotImplementedError("Unable to process connectives of adicity greater than 2")
|
||||
|
||||
|
@ -340,39 +379,6 @@ def process_custom_connective(infile: SourceFile, symbol: str, adicity: int, cur
|
|||
current_model_parts.custom_model_functions[symbol] = mfunction
|
||||
return True
|
||||
|
||||
def process_model(model_name: str, mp: ModelBuilder) -> Tuple[Model, Dict[Operation, ModelFunction]]:
|
||||
"""Create Model"""
|
||||
assert mp.size > 0
|
||||
assert mp.size + 1 == len(mp.carrier_list)
|
||||
assert len(mp.designated_values) <= len(mp.carrier_list)
|
||||
assert mp.mimplication is not None
|
||||
|
||||
logical_operations = { mp.mimplication }
|
||||
interpretation = {
|
||||
Implication: mp.mimplication
|
||||
}
|
||||
if mp.mnegation is not None:
|
||||
logical_operations.add(mp.mnegation)
|
||||
interpretation[Negation] = mp.mnegation
|
||||
if mp.mconjunction is not None:
|
||||
logical_operations.add(mp.mconjunction)
|
||||
interpretation[Conjunction] = mp.mconjunction
|
||||
if mp.mdisjunction is not None:
|
||||
logical_operations.add(mp.mdisjunction)
|
||||
interpretation[Disjunction] = mp.mdisjunction
|
||||
if mp.mnecessitation is not None:
|
||||
logical_operations.add(mp.mnecessitation)
|
||||
interpretation[Necessitation] = mp.mnecessitation
|
||||
|
||||
for custom_mf in mp.custom_model_functions.values():
|
||||
if custom_mf is not None:
|
||||
logical_operations.add(custom_mf)
|
||||
op = Operation(custom_mf.operation_name, custom_mf.arity)
|
||||
interpretation[op] = custom_mf
|
||||
|
||||
model = Model(set(mp.carrier_list), logical_operations, mp.designated_values, ordering=mp.ordering, name=model_name)
|
||||
return (model, interpretation)
|
||||
|
||||
|
||||
def parse_header(infile: SourceFile) -> UglyHeader:
|
||||
"""
|
||||
|
@ -406,7 +412,6 @@ def parse_size(infile: SourceFile, first_run: bool) -> Optional[int]:
|
|||
"""
|
||||
Parse the line representing the matrix size.
|
||||
"""
|
||||
|
||||
size = int(infile.next_line())
|
||||
|
||||
# HACK: When necessitation and custom connectives are enabled
|
||||
|
@ -417,7 +422,9 @@ def parse_size(infile: SourceFile, first_run: bool) -> Optional[int]:
|
|||
|
||||
if size == -1:
|
||||
return None
|
||||
|
||||
assert size > 0, f"Unexpected size at line {infile.line_in_file}"
|
||||
|
||||
return size
|
||||
|
||||
def mvalue_from_index(i: int) -> ModelValue:
|
||||
|
@ -433,55 +440,9 @@ def parse_mvalue(x: str) -> ModelValue:
|
|||
"""
|
||||
return mvalue_from_index(int(x))
|
||||
|
||||
def determine_cresult(size: int, ordering: Dict[ModelValue, ModelValue], a: ModelValue, b: ModelValue) -> ModelValue:
|
||||
"""
|
||||
Determine what a ∧ b should be given the ordering table.
|
||||
"""
|
||||
for i in range(size + 1):
|
||||
c = mvalue_from_index(i)
|
||||
|
||||
if not ordering[(c, a)]:
|
||||
continue
|
||||
if not ordering[(c, b)]:
|
||||
continue
|
||||
|
||||
invalid = False
|
||||
for j in range(size + 1):
|
||||
d = mvalue_from_index(j)
|
||||
if c == d:
|
||||
continue
|
||||
if ordering[(c, d)]:
|
||||
if ordering[(d, a)] and ordering [(d, b)]:
|
||||
invalid = True
|
||||
|
||||
if not invalid:
|
||||
return c
|
||||
|
||||
def determine_dresult(size: int, ordering: Dict[ModelValue, ModelValue], a: ModelValue, b: ModelValue) -> ModelValue:
|
||||
"""
|
||||
Determine what a ∨ b should be given the ordering table.
|
||||
"""
|
||||
for i in range(size + 1):
|
||||
c = mvalue_from_index(i)
|
||||
if not ordering[(a, c)]:
|
||||
continue
|
||||
if not ordering[(b, c)]:
|
||||
continue
|
||||
|
||||
invalid = False
|
||||
|
||||
for j in range(size + 1):
|
||||
d = mvalue_from_index(j)
|
||||
if d == c:
|
||||
continue
|
||||
if ordering[(d, c)]:
|
||||
if ordering[(a, d)] and ordering[(b, d)]:
|
||||
invalid = True
|
||||
|
||||
if not invalid:
|
||||
return c
|
||||
|
||||
def parse_single_order(infile: SourceFile, size: int, carrier_list: List[ModelValue]) -> Optional[Tuple[OrderTable, ModelFunction, ModelFunction]]:
|
||||
def parse_single_order(infile: SourceFile, size: int, carrier_list: List[ModelValue]) -> Optional[
|
||||
Tuple[OrderTable, Optional[ModelFunction], Optional[ModelFunction]]]:
|
||||
"""
|
||||
Parse the line representing the ordering table
|
||||
"""
|
||||
|
@ -509,21 +470,12 @@ def parse_single_order(infile: SourceFile, size: int, carrier_list: List[ModelVa
|
|||
for x, y in product(carrier_list, carrier_list):
|
||||
cresult = ordering.meet(x, y)
|
||||
if cresult is None:
|
||||
print("[Warning] Conjunction and Disjunction are not well-defined")
|
||||
print(f"{x} ∧ {y} = ??")
|
||||
return None, None
|
||||
else:
|
||||
print(f"{x} ∧ {y} = {cresult}")
|
||||
return ordering, None, None
|
||||
cmapping[(x, y)] = cresult
|
||||
|
||||
dresult = ordering.join(x, y)
|
||||
# dresult = determine_dresult(size, omapping, x, y)
|
||||
if dresult is None:
|
||||
print("[Warning] Conjunction and Disjunction are not well-defined")
|
||||
print(f"{x} ∨ {y} = ??")
|
||||
return None, None
|
||||
else:
|
||||
print(f"{x} ∨ {y} = {dresult}")
|
||||
return ordering, None, None
|
||||
dmapping[(x, y)] = dresult
|
||||
|
||||
mconjunction = ModelFunction(2, cmapping, "∧")
|
||||
|
@ -531,7 +483,7 @@ def parse_single_order(infile: SourceFile, size: int, carrier_list: List[ModelVa
|
|||
|
||||
return ordering, mconjunction, mdisjunction
|
||||
|
||||
def parse_single_designated(infile: SourceFile, size: int) -> Optional[Set[ModelValue]]:
|
||||
def parse_single_designated(infile: SourceFile, size: int, carrier_list: List[ModelValue]) -> Optional[Set[ModelValue]]:
|
||||
"""
|
||||
Parse the line representing which model values are designated.
|
||||
"""
|
||||
|
@ -544,9 +496,8 @@ def parse_single_designated(infile: SourceFile, size: int) -> Optional[Set[Model
|
|||
|
||||
designated_values = set()
|
||||
|
||||
for i, j in zip(range(size + 1), row):
|
||||
for x, j in zip(carrier_list, row):
|
||||
if j == '1':
|
||||
x = mvalue_from_index(i)
|
||||
designated_values.add(x)
|
||||
|
||||
return designated_values
|
||||
|
@ -579,7 +530,7 @@ def parse_single_monadic_connective(infile: SourceFile, symbol: str, size: int,
|
|||
|
||||
return ModelFunction(1, mapping, symbol)
|
||||
|
||||
def parse_single_dyadic_connective(infile: SourceFile, symbol: str, size: int) -> Optional[ModelFunction]:
|
||||
def parse_single_dyadic_connective(infile: SourceFile, symbol: str, size: int, carrier_list: List[ModelValue]) -> Optional[ModelFunction]:
|
||||
first_token = next(infile)
|
||||
if first_token == "-1":
|
||||
return None
|
||||
|
@ -588,20 +539,14 @@ def parse_single_dyadic_connective(infile: SourceFile, symbol: str, size: int) -
|
|||
try:
|
||||
table = [first_token] + [next(infile) for _ in range((size + 1)**2 - 1)]
|
||||
except StopIteration:
|
||||
pass
|
||||
|
||||
assert len(table) == (size + 1)**2, f"{symbol} table does not match expected size at line {infile.line_in_file}"
|
||||
raise Exception(f"{symbol} table does not match expected size at line {infile.line_in_file}")
|
||||
|
||||
mapping = {}
|
||||
table_i = 0
|
||||
for i in range(size + 1):
|
||||
x = mvalue_from_index(i)
|
||||
for j in range(size + 1):
|
||||
y = mvalue_from_index(j)
|
||||
|
||||
r = parse_mvalue(table[table_i])
|
||||
table_i += 1
|
||||
|
||||
mapping[(x, y)] = r
|
||||
for x, y in product(carrier_list, carrier_list):
|
||||
r = parse_mvalue(table[table_i])
|
||||
table_i += 1
|
||||
mapping[(x, y)] = r
|
||||
|
||||
return ModelFunction(2, mapping, symbol)
|
||||
|
|
200
vsp.py
200
vsp.py
|
@ -2,6 +2,7 @@
|
|||
Check to see if the model has the variable
|
||||
sharing property.
|
||||
"""
|
||||
from collections import defaultdict
|
||||
from itertools import chain, combinations, product
|
||||
from typing import List, Optional, Set, Tuple
|
||||
from common import set_to_str
|
||||
|
@ -9,75 +10,36 @@ from model import (
|
|||
Model, model_closure, ModelFunction, ModelValue, OrderTable
|
||||
)
|
||||
|
||||
def preseed(
|
||||
initial_set: Set[ModelValue],
|
||||
cache:List[Tuple[Set[ModelValue], Set[ModelValue]]]):
|
||||
"""
|
||||
Given a cache of previous model_closure calls,
|
||||
use this to compute an initial model closure
|
||||
set based on the initial set.
|
||||
class Cache:
|
||||
def __init__(self):
|
||||
# input size -> cached (inputs, outputs)
|
||||
self.c = defaultdict(list)
|
||||
|
||||
Basic Idea:
|
||||
Let {1, 2, 3} -> X be in the cache.
|
||||
If {1,2,3} is a subset of initial set,
|
||||
then X is the subset of the output of model_closure.
|
||||
def add(self, i: Set[ModelValue], o: Set[ModelValue]):
|
||||
self.c[len(i)].append((i, o))
|
||||
|
||||
This is used to speed up subsequent calls to model_closure
|
||||
"""
|
||||
candidate_preseed: Tuple[Set[ModelValue], int] = (None, None)
|
||||
def get_closest(self, initial_set: Set[ModelValue]) -> Optional[Tuple[Set[ModelValue], bool]]:
|
||||
"""
|
||||
Iterate through our cache starting with the cached
|
||||
inputs closest in size to the initial_set and
|
||||
find the one that's a subset of initial_set.
|
||||
|
||||
for i, o in cache:
|
||||
if i < initial_set:
|
||||
cost = len(initial_set - i)
|
||||
# If i is a subset with less missing elements than
|
||||
# the previous candidate, then it's the new candidate.
|
||||
if candidate_preseed[1] is None or cost < candidate_preseed[1]:
|
||||
candidate_preseed = o, cost
|
||||
Returns cached_output, and whether the initial_set is the same
|
||||
as the cached_input.
|
||||
"""
|
||||
initial_set_size = len(initial_set)
|
||||
sizes = range(initial_set_size, 0, -1)
|
||||
|
||||
same_set = candidate_preseed[1] == 0
|
||||
return candidate_preseed[0], same_set
|
||||
for size in sizes:
|
||||
if size not in self.c:
|
||||
continue
|
||||
|
||||
for cached_input, cached_output in self.c[size]:
|
||||
if cached_input <= initial_set:
|
||||
return cached_output, size == initial_set_size
|
||||
|
||||
def find_top(algebra: Set[ModelValue], mconjunction: Optional[ModelFunction], mdisjunction: Optional[ModelFunction]) -> Optional[ModelValue]:
|
||||
"""
|
||||
Find the top of the order lattice.
|
||||
T || a = T, T && a = a for all a in the carrier set
|
||||
"""
|
||||
if mconjunction is None or mdisjunction is None:
|
||||
return None
|
||||
|
||||
for x in algebra:
|
||||
is_top = True
|
||||
for y in algebra:
|
||||
if mdisjunction(x, y) != x or mconjunction(x, y) != y:
|
||||
is_top = False
|
||||
break
|
||||
if is_top:
|
||||
return x
|
||||
|
||||
print("[Warning] Failed to find the top of the lattice")
|
||||
return None
|
||||
|
||||
def find_bottom(algebra: Set[ModelValue], mconjunction: Optional[ModelFunction], mdisjunction: Optional[ModelFunction]) -> Optional[ModelValue]:
|
||||
"""
|
||||
Find the bottom of the order lattice
|
||||
F || a = a, F && a = F for all a in the carrier set
|
||||
"""
|
||||
if mconjunction is None or mdisjunction is None:
|
||||
return None
|
||||
|
||||
for x in algebra:
|
||||
is_bottom = True
|
||||
for y in algebra:
|
||||
if mdisjunction(x, y) != y or mconjunction(x, y) != x:
|
||||
is_bottom = False
|
||||
break
|
||||
if is_bottom:
|
||||
return x
|
||||
|
||||
print("[Warning] Failed to find the bottom of the lattice")
|
||||
return None
|
||||
|
||||
def order_dependent(subalgebra1: Set[ModelValue], subalegbra2: Set[ModelValue], ordering: OrderTable):
|
||||
"""
|
||||
Returns true if there exists a value in subalgebra1 that's less than a value in subalgebra2
|
||||
|
@ -106,17 +68,49 @@ Subalgebra 1: {set_to_str(self.subalgebra1)}
|
|||
Subalgebra 2: {set_to_str(self.subalgebra2)}
|
||||
"""
|
||||
|
||||
def quick_vsp_unsat_incomplete(xs, ys, model, top, bottom, negation_defined) -> bool:
|
||||
"""
|
||||
Return True if VSP cannot be satisfied
|
||||
through some incomplete checks.
|
||||
"""
|
||||
# If the left subalgebra contains bottom
|
||||
# or the right subalgebra contains top
|
||||
# skip this pair
|
||||
if top is not None and top in ys:
|
||||
return True
|
||||
if bottom is not None and bottom in xs:
|
||||
return True
|
||||
|
||||
# If a subalgebra doesn't have at least one
|
||||
# designated value, move onto the next pair.
|
||||
# Depends on no intersection between xs and ys
|
||||
if xs.isdisjoint(model.designated_values):
|
||||
return True
|
||||
|
||||
if ys.isdisjoint(model.designated_values):
|
||||
return True
|
||||
|
||||
# If the two subalgebras intersect, move
|
||||
# onto the next pair.
|
||||
if not xs.isdisjoint(ys):
|
||||
return True
|
||||
|
||||
# If the subalgebras are order-dependent, skip this pair
|
||||
if order_dependent(xs, ys, model.ordering):
|
||||
return True
|
||||
if negation_defined and order_dependent(ys, xs, model.ordering):
|
||||
return True
|
||||
|
||||
# We can't immediately rule out that these
|
||||
# subalgebras don't exhibit VSP
|
||||
return False
|
||||
|
||||
def has_vsp(model: Model, impfunction: ModelFunction,
|
||||
mconjunction: Optional[ModelFunction] = None,
|
||||
mdisjunction: Optional[ModelFunction] = None,
|
||||
mnegation: Optional[ModelFunction] = None) -> VSP_Result:
|
||||
negation_defined: bool) -> VSP_Result:
|
||||
"""
|
||||
Checks whether a model has the variable
|
||||
sharing property.
|
||||
"""
|
||||
top = find_top(model.carrier_set, mconjunction, mdisjunction)
|
||||
bottom = find_bottom(model.carrier_set, mconjunction, mdisjunction)
|
||||
|
||||
# NOTE: No models with only one designated
|
||||
# value satisfies VSP
|
||||
if len(model.designated_values) == 1:
|
||||
|
@ -124,68 +118,44 @@ def has_vsp(model: Model, impfunction: ModelFunction,
|
|||
|
||||
assert model.ordering is not None, "Expected ordering table in model"
|
||||
|
||||
top = model.ordering.top()
|
||||
bottom = model.ordering.bottom()
|
||||
|
||||
# Compute I the set of tuples (x, y) where
|
||||
# x -> y does not take a designiated value
|
||||
I: Set[Tuple[ModelValue, ModelValue]] = set()
|
||||
I: List[Tuple[ModelValue, ModelValue]] = []
|
||||
|
||||
for (x, y) in product(model.carrier_set, model.carrier_set):
|
||||
if impfunction(x, y) not in model.designated_values:
|
||||
I.add((x, y))
|
||||
I.append((x, y))
|
||||
|
||||
# Construct the powerset of I without the empty set
|
||||
s = list(I)
|
||||
I_power = chain.from_iterable(combinations(s, r) for r in range(1, len(s) + 1))
|
||||
I_power = chain.from_iterable(combinations(I, r) for r in range(1, len(I) + 1))
|
||||
# ((x1, y1)), ((x1, y1), (x2, y2)), ...
|
||||
|
||||
# Closure cache
|
||||
closure_cache: List[Tuple[Set[ModelValue], Set[ModelValue]]] = []
|
||||
closure_cache = Cache()
|
||||
|
||||
# Find the subalgebras which falsify implication
|
||||
for xys in I_power:
|
||||
|
||||
xs = {xy[0] for xy in xys}
|
||||
xs = { xy[0] for xy in xys }
|
||||
ys = { xy[1] for xy in xys }
|
||||
|
||||
if quick_vsp_unsat_incomplete(xs, ys, model, top, bottom, negation_defined):
|
||||
continue
|
||||
|
||||
orig_xs = xs
|
||||
cached_xs = preseed(xs, closure_cache)
|
||||
if cached_xs[0] is not None:
|
||||
cached_xs = closure_cache.get_closest(xs)
|
||||
if cached_xs is not None:
|
||||
xs |= cached_xs[0]
|
||||
|
||||
ys = {xy[1] for xy in xys}
|
||||
orig_ys = ys
|
||||
cached_ys = preseed(ys, closure_cache)
|
||||
if cached_ys[0] is not None:
|
||||
cached_ys = closure_cache.get_closest(ys)
|
||||
if cached_ys is not None:
|
||||
ys |= cached_ys[0]
|
||||
|
||||
|
||||
# NOTE: Optimziation before model_closure
|
||||
# If the two subalgebras intersect, move
|
||||
# onto the next pair.
|
||||
if len(xs & ys) > 0:
|
||||
continue
|
||||
|
||||
# NOTE: Optimization
|
||||
# If a subalgebra doesn't have at least one
|
||||
# designated value, move onto the next pair.
|
||||
# Depends on no intersection between xs and ys
|
||||
if len(xs & model.designated_values) == 0:
|
||||
continue
|
||||
|
||||
if len(ys & model.designated_values) == 0:
|
||||
continue
|
||||
|
||||
# NOTE: Optimization
|
||||
# If the left subalgebra contains bottom
|
||||
# or the right subalgebra contains top
|
||||
# skip this pair
|
||||
if top is not None and top in ys:
|
||||
continue
|
||||
if bottom is not None and bottom in xs:
|
||||
continue
|
||||
|
||||
# NOTE: Optimization
|
||||
# If the subalgebras are order-dependent, skip this pair
|
||||
if order_dependent(xs, ys, model.ordering):
|
||||
continue
|
||||
if mnegation is not None and order_dependent(ys, xs, model.ordering):
|
||||
xs_ys_updated = cached_xs is not None or cached_ys is not None
|
||||
if xs_ys_updated and quick_vsp_unsat_incomplete(xs, ys, model, top, bottom, negation_defined):
|
||||
continue
|
||||
|
||||
# Compute the closure of all operations
|
||||
|
@ -193,8 +163,8 @@ def has_vsp(model: Model, impfunction: ModelFunction,
|
|||
carrier_set_left: Set[ModelValue] = model_closure(xs, model.logical_operations, bottom)
|
||||
|
||||
# Save to cache
|
||||
if cached_xs[0] is not None and not cached_ys[1]:
|
||||
closure_cache.append((orig_xs, carrier_set_left))
|
||||
if cached_xs is None or (cached_xs is not None and not cached_xs[1]):
|
||||
closure_cache.add(orig_xs, carrier_set_left)
|
||||
|
||||
if bottom is not None and bottom in carrier_set_left:
|
||||
continue
|
||||
|
@ -204,15 +174,15 @@ def has_vsp(model: Model, impfunction: ModelFunction,
|
|||
carrier_set_right: Set[ModelValue] = model_closure(ys, model.logical_operations, top)
|
||||
|
||||
# Save to cache
|
||||
if cached_ys[0] is not None and not cached_ys[1]:
|
||||
closure_cache.append((orig_ys, carrier_set_right))
|
||||
if cached_ys is None or (cached_ys is not None and not cached_ys[1]):
|
||||
closure_cache.add(orig_ys, carrier_set_right)
|
||||
|
||||
if top is not None and top in carrier_set_right:
|
||||
continue
|
||||
|
||||
# If the carrier set intersects, then move on to the next
|
||||
# subalgebra
|
||||
if len(carrier_set_left & carrier_set_right) > 0:
|
||||
if not carrier_set_left.isdisjoint(carrier_set_right):
|
||||
continue
|
||||
|
||||
# See if for all pairs in the subalgebras, that
|
||||
|
|
99
vspursuer.py
99
vspursuer.py
|
@ -1,17 +1,20 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
# NOTE: Perhaps we should use process_cpu_count but that's not available to all Python versions
|
||||
from os import cpu_count
|
||||
from datetime import datetime
|
||||
from typing import Dict, Iterator, Optional, Tuple
|
||||
from queue import Empty as QueueEmpty
|
||||
import argparse
|
||||
import multiprocessing as mp
|
||||
|
||||
from logic import Conjunction, Disjunction, Negation, Implication, Operation
|
||||
from logic import Negation, Implication, Operation
|
||||
from model import Model, ModelFunction
|
||||
from parse_magic import SourceFile, parse_matrices
|
||||
from vsp import has_vsp, VSP_Result
|
||||
|
||||
def print_with_timestamp(message):
|
||||
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
print(f"[{current_time}] {message}")
|
||||
|
||||
def restructure_solutions(solutions: Iterator[Tuple[Model, Dict[Operation, ModelFunction]]], skip_to: Optional[str]) -> \
|
||||
Iterator[Tuple[Model, ModelFunction, Optional[ModelFunction], Optional[ModelFunction], Optional[ModelFunction]]]:
|
||||
"""
|
||||
|
@ -32,17 +35,15 @@ def restructure_solutions(solutions: Iterator[Tuple[Model, Dict[Operation, Model
|
|||
|
||||
# NOTE: Implication must be defined, the rest may not
|
||||
impfunction = interpretation[Implication]
|
||||
mconjunction = interpretation.get(Conjunction)
|
||||
mdisjunction = interpretation.get(Disjunction)
|
||||
mnegation = interpretation.get(Negation)
|
||||
yield (model, impfunction, mconjunction, mdisjunction, mnegation)
|
||||
negation_defined = Negation in interpretation
|
||||
yield (model, impfunction, negation_defined)
|
||||
|
||||
def has_vsp_plus_model(model, impfunction, mconjunction, mdisjunction, mnegation) -> Tuple[Optional[Model], VSP_Result]:
|
||||
def has_vsp_plus_model(model, impfunction, negation_defined) -> Tuple[Optional[Model], VSP_Result]:
|
||||
"""
|
||||
Wrapper which also stores the models along with its vsp result
|
||||
"""
|
||||
vsp_result = has_vsp(model, impfunction, mconjunction, mdisjunction, mnegation)
|
||||
# NOTE: Memory optimization - Don't return model if it doens't have VSP
|
||||
vsp_result = has_vsp(model, impfunction, negation_defined)
|
||||
# NOTE: Memory optimization - Don't return model if it doesn't have VSP
|
||||
model = model if vsp_result.has_vsp else None
|
||||
return (model, vsp_result)
|
||||
|
||||
|
@ -60,8 +61,8 @@ def worker_vsp(task_queue: mp.Queue, result_queue: mp.Queue):
|
|||
# If sentinal value, break
|
||||
if task is None:
|
||||
break
|
||||
(model, impfunction, mconjunction, mdisjunction, mnegation) = task
|
||||
result = has_vsp_plus_model(model, impfunction, mconjunction, mdisjunction, mnegation)
|
||||
(model, impfunction, negation_defined) = task
|
||||
result = has_vsp_plus_model(model, impfunction, negation_defined)
|
||||
result_queue.put(result)
|
||||
finally:
|
||||
# Either an exception occured or the worker finished
|
||||
|
@ -91,37 +92,22 @@ def worker_parser(data_file_path: str, num_sentinal_values: int, task_queue: mp.
|
|||
for _ in range(num_sentinal_values):
|
||||
task_queue.put(None)
|
||||
|
||||
def multi_process_runner(num_cpu: int, data_file_path: str, skip_to: Optional[str]):
|
||||
"""
|
||||
Run VSPursuer in a multi-process configuration.
|
||||
"""
|
||||
assert num_cpu > 1
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="VSP Checker")
|
||||
parser.add_argument("--verbose", action='store_true', help="Print out all parsed matrices")
|
||||
parser.add_argument("-i", type=str, help="Path to MaGIC ugly data file")
|
||||
parser.add_argument("-c", type=int, help="Number of CPUs to use. Default: MAX - 2.")
|
||||
parser.add_argument("--skip-to", type=str, help="Skip until a model name is found and process from then onwards.")
|
||||
args = vars(parser.parse_args())
|
||||
|
||||
data_file_path = args.get("i")
|
||||
if data_file_path is None:
|
||||
data_file_path = input("Path to MaGIC Ugly Data File: ")
|
||||
|
||||
|
||||
num_cpu = args.get("c")
|
||||
if num_cpu is None:
|
||||
num_cpu = max(cpu_count() - 2, 1)
|
||||
|
||||
# Set up parallel verification
|
||||
num_tested = 0
|
||||
num_has_vsp = 0
|
||||
num_workers = max(num_cpu - 1, 1)
|
||||
num_workers = num_cpu - 1
|
||||
|
||||
# Create queues
|
||||
task_queue = mp.Queue(maxsize=1000)
|
||||
result_queue = mp.Queue()
|
||||
|
||||
# Create dedicated process to parse the MaGIC file
|
||||
process_parser = mp.Process(target=worker_parser, args=(data_file_path, num_workers, task_queue, args.get("skip_to")))
|
||||
process_parser = mp.Process(target=worker_parser, args=(data_file_path, num_workers, task_queue, skip_to))
|
||||
process_parser.start()
|
||||
|
||||
# Create dedicated processes which check VSP
|
||||
|
@ -135,7 +121,6 @@ if __name__ == "__main__":
|
|||
# Check results and add new tasks until finished
|
||||
result_sentinal_count = 0
|
||||
while True:
|
||||
|
||||
# Read a result
|
||||
try:
|
||||
result = result_queue.get(True, 60)
|
||||
|
@ -171,7 +156,7 @@ if __name__ == "__main__":
|
|||
|
||||
# Process result
|
||||
model, vsp_result = result
|
||||
print(vsp_result)
|
||||
print_with_timestamp(vsp_result)
|
||||
num_tested += 1
|
||||
|
||||
if vsp_result.has_vsp:
|
||||
|
@ -180,7 +165,47 @@ if __name__ == "__main__":
|
|||
if vsp_result.has_vsp:
|
||||
num_has_vsp += 1
|
||||
|
||||
print_with_timestamp(f"Tested {num_tested} models, {num_has_vsp} of which satisfy VSP")
|
||||
|
||||
print(f"Tested {num_tested} models, {num_has_vsp} of which satisfy VSP")
|
||||
def single_process_runner(data_file_path: str, skip_to: Optional[str]):
|
||||
num_tested = 0
|
||||
num_has_vsp = 0
|
||||
|
||||
data_file = open(data_file_path, "r")
|
||||
solutions = parse_matrices(SourceFile(data_file))
|
||||
solutions = restructure_solutions(solutions, skip_to)
|
||||
|
||||
for model, impfunction, negation_defined in solutions:
|
||||
model, vsp_result = has_vsp_plus_model(model, impfunction, negation_defined)
|
||||
print_with_timestamp(vsp_result)
|
||||
num_tested += 1
|
||||
|
||||
if vsp_result.has_vsp:
|
||||
print(model)
|
||||
|
||||
if vsp_result.has_vsp:
|
||||
num_has_vsp += 1
|
||||
|
||||
print_with_timestamp(f"Tested {num_tested} models, {num_has_vsp} of which satisfy VSP")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="VSP Checker")
|
||||
parser.add_argument("--verbose", action='store_true', help="Print out all parsed matrices")
|
||||
parser.add_argument("-i", type=str, help="Path to MaGIC ugly data file")
|
||||
parser.add_argument("-c", type=int, help="Number of CPUs to use. Default: 1")
|
||||
parser.add_argument("--skip-to", type=str, help="Skip until a model name is found and process from then onwards.")
|
||||
args = vars(parser.parse_args())
|
||||
|
||||
data_file_path = args.get("i")
|
||||
if data_file_path is None:
|
||||
data_file_path = input("Path to MaGIC Ugly Data File: ")
|
||||
|
||||
num_cpu = args.get("c")
|
||||
if num_cpu is None:
|
||||
num_cpu = 1
|
||||
|
||||
if num_cpu == 1:
|
||||
single_process_runner(data_file_path, args.get("skip_to"))
|
||||
else:
|
||||
multi_process_runner(num_cpu, data_file_path, args.get("skip_to"))
|
||||
|
|
Loading…
Add table
Reference in a new issue