mirror of
https://github.com/Brandon-Rozek/matmod.git
synced 2025-09-18 00:02:00 +00:00
Compare commits
No commits in common. "6d7fc9094af0a136172fc52bc325580620385410" and "29526dbec38afbd3f5a19f9a7df1478ed96fd5c5" have entirely different histories.
6d7fc9094a
...
29526dbec3
10 changed files with 408 additions and 726 deletions
12
R.py
12
R.py
|
@ -12,7 +12,7 @@ from logic import (
|
||||||
)
|
)
|
||||||
from model import Model, ModelFunction, ModelValue, satisfiable
|
from model import Model, ModelFunction, ModelValue, satisfiable
|
||||||
from generate_model import generate_model
|
from generate_model import generate_model
|
||||||
# from vsp import has_vsp
|
from vsp import has_vsp
|
||||||
|
|
||||||
|
|
||||||
# ===================================================
|
# ===================================================
|
||||||
|
@ -106,7 +106,7 @@ designated_values = {a1}
|
||||||
logical_operations = {
|
logical_operations = {
|
||||||
mnegation, mimplication, mconjunction, mdisjunction
|
mnegation, mimplication, mconjunction, mdisjunction
|
||||||
}
|
}
|
||||||
R_model_2 = Model(carrier_set, logical_operations, designated_values, None, "R2")
|
R_model_2 = Model(carrier_set, logical_operations, designated_values, "R2")
|
||||||
|
|
||||||
interpretation = {
|
interpretation = {
|
||||||
Negation: mnegation,
|
Negation: mnegation,
|
||||||
|
@ -132,8 +132,8 @@ solutions = generate_model(R_logic, model_size, print_model=False)
|
||||||
|
|
||||||
print(f"Found {len(solutions)} satisfiable models")
|
print(f"Found {len(solutions)} satisfiable models")
|
||||||
|
|
||||||
# for model, interpretation in solutions:
|
for model, interpretation in solutions:
|
||||||
# print(has_vsp(model, interpretation))
|
print(has_vsp(model, interpretation))
|
||||||
|
|
||||||
print("*" * 30)
|
print("*" * 30)
|
||||||
|
|
||||||
|
@ -301,7 +301,7 @@ mdisjunction = ModelFunction(2, {
|
||||||
logical_operations = {
|
logical_operations = {
|
||||||
mnegation, mimplication, mconjunction, mdisjunction
|
mnegation, mimplication, mconjunction, mdisjunction
|
||||||
}
|
}
|
||||||
R_model_6 = Model(carrier_set, logical_operations, designated_values, None, "R6")
|
R_model_6 = Model(carrier_set, logical_operations, designated_values, "R6")
|
||||||
|
|
||||||
interpretation = {
|
interpretation = {
|
||||||
Negation: mnegation,
|
Negation: mnegation,
|
||||||
|
@ -312,4 +312,4 @@ interpretation = {
|
||||||
|
|
||||||
print(R_model_6)
|
print(R_model_6)
|
||||||
print(f"Model {R_model_6.name} satisfies logic {R_logic.name}?", satisfiable(R_logic, R_model_6, interpretation))
|
print(f"Model {R_model_6.name} satisfies logic {R_logic.name}?", satisfiable(R_logic, R_model_6, interpretation))
|
||||||
# print(has_vsp(R_model_6, interpretation))
|
print(has_vsp(R_model_6, interpretation))
|
||||||
|
|
19
logic.py
19
logic.py
|
@ -27,6 +27,8 @@ class Operation:
|
||||||
class Term:
|
class Term:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
pass
|
pass
|
||||||
|
def __lt__(self, y):
|
||||||
|
return Inequation(self, y)
|
||||||
|
|
||||||
class PropositionalVariable(Term):
|
class PropositionalVariable(Term):
|
||||||
def __init__(self, name):
|
def __init__(self, name):
|
||||||
|
@ -68,6 +70,23 @@ Disjunction = Operation("∨", 2)
|
||||||
Implication = Operation("→", 2)
|
Implication = Operation("→", 2)
|
||||||
Necessitation = Operation("!", 1)
|
Necessitation = Operation("!", 1)
|
||||||
|
|
||||||
|
class Inequation:
|
||||||
|
def __init__(self, antecedant : Term, consequent: Term):
|
||||||
|
self.antecedant = antecedant
|
||||||
|
self.consequent = consequent
|
||||||
|
def __str__(self):
|
||||||
|
return str(self.antecedant) + "≤" + str(self.consequent)
|
||||||
|
|
||||||
|
class InequalityRule:
|
||||||
|
def __init__(self, premises : Set[Inequation], conclusion: Inequation):
|
||||||
|
self.premises = premises
|
||||||
|
self.conclusion = conclusion
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
str_premises = [str(p) for p in self.premises]
|
||||||
|
str_premises2 = "{" + ",".join(str_premises) + "}"
|
||||||
|
return str(str_premises2) + "=>" + str(self.conclusion)
|
||||||
|
|
||||||
class Rule:
|
class Rule:
|
||||||
def __init__(self, premises : Set[Term], conclusion: Term):
|
def __init__(self, premises : Set[Term], conclusion: Term):
|
||||||
self.premises = premises
|
self.premises = premises
|
||||||
|
|
220
model.py
220
model.py
|
@ -9,19 +9,16 @@ from logic import (
|
||||||
)
|
)
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from functools import cached_property, lru_cache, reduce
|
from functools import cached_property, lru_cache, reduce
|
||||||
from itertools import (
|
from itertools import chain, combinations_with_replacement, permutations, product
|
||||||
chain, combinations_with_replacement,
|
|
||||||
permutations, product
|
|
||||||
)
|
|
||||||
from typing import Dict, List, Optional, Set, Tuple
|
from typing import Dict, List, Optional, Set, Tuple
|
||||||
|
|
||||||
|
|
||||||
__all__ = ['ModelValue', 'ModelFunction', 'Model', 'Interpretation']
|
__all__ = ['ModelValue', 'ModelFunction', 'Model', 'Interpretation']
|
||||||
|
|
||||||
class ModelValue:
|
class ModelValue:
|
||||||
def __init__(self, name: str, hashed_value: Optional[int] = None):
|
def __init__(self, name):
|
||||||
self.name = name
|
self.name = name
|
||||||
self.hashed_value = hashed_value if hashed_value is not None else hash(self.name)
|
self.hashed_value = hash(self.name)
|
||||||
self.__setattr__ = immutable
|
self.__setattr__ = immutable
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return self.name
|
return self.name
|
||||||
|
@ -30,7 +27,7 @@ class ModelValue:
|
||||||
def __eq__(self, other):
|
def __eq__(self, other):
|
||||||
return isinstance(other, ModelValue) and self.name == other.name
|
return isinstance(other, ModelValue) and self.name == other.name
|
||||||
def __deepcopy__(self, _):
|
def __deepcopy__(self, _):
|
||||||
return ModelValue(self.name, self.hashed_value)
|
return ModelValue(self.name)
|
||||||
|
|
||||||
class ModelFunction:
|
class ModelFunction:
|
||||||
def __init__(self, arity: int, mapping, operation_name = ""):
|
def __init__(self, arity: int, mapping, operation_name = ""):
|
||||||
|
@ -106,94 +103,18 @@ def binary_function_str(f: ModelFunction) -> str:
|
||||||
|
|
||||||
Interpretation = Dict[Operation, ModelFunction]
|
Interpretation = Dict[Operation, ModelFunction]
|
||||||
|
|
||||||
class OrderTable:
|
|
||||||
def __init__(self):
|
|
||||||
# a : {x | x <= a }
|
|
||||||
self.le_map: Dict[ModelValue, Set[ModelValue]] = defaultdict(set)
|
|
||||||
# a : {x | x >= a}
|
|
||||||
self.ge_map: Dict[ModelValue, Set[ModelValue]] = defaultdict(set)
|
|
||||||
|
|
||||||
def add(self, x, y):
|
|
||||||
"""
|
|
||||||
Add x <= y
|
|
||||||
"""
|
|
||||||
self.le_map[y].add(x)
|
|
||||||
self.ge_map[x].add(y)
|
|
||||||
|
|
||||||
def is_lt(self, x, y):
|
|
||||||
return x in self.le_map[y]
|
|
||||||
|
|
||||||
def meet(self, x, y) -> Optional[ModelValue]:
|
|
||||||
X = self.le_map[x]
|
|
||||||
Y = self.le_map[y]
|
|
||||||
|
|
||||||
candidates = X.intersection(Y)
|
|
||||||
|
|
||||||
# Grab all elements greater than each of the candidates
|
|
||||||
candidate_ge_maps = (self.ge_map[candidate] for candidate in candidates)
|
|
||||||
common_ge_values = reduce(set.intersection, candidate_ge_maps)
|
|
||||||
|
|
||||||
# Intersect with candidates to get the values that satisfy
|
|
||||||
# the meet properties
|
|
||||||
result_set = candidates.intersection(common_ge_values)
|
|
||||||
|
|
||||||
# NOTE: The meet may not exist, in which case return None
|
|
||||||
result = next(iter(result_set), None)
|
|
||||||
return result
|
|
||||||
|
|
||||||
def join(self, x, y) -> Optional[ModelValue]:
|
|
||||||
X = self.ge_map[x]
|
|
||||||
Y = self.ge_map[y]
|
|
||||||
|
|
||||||
candidates = X.intersection(Y)
|
|
||||||
|
|
||||||
# Grab all elements smaller than each of the candidates
|
|
||||||
candidate_le_maps = (self.le_map[candidate] for candidate in candidates)
|
|
||||||
common_le_values = reduce(set.intersection, candidate_le_maps)
|
|
||||||
|
|
||||||
# Intersect with candidatse to get the values that satisfy
|
|
||||||
# the join properties
|
|
||||||
result_set = candidates.intersection(common_le_values)
|
|
||||||
|
|
||||||
# NOTE: The join may not exist, in which case return None
|
|
||||||
result = next(iter(result_set), None)
|
|
||||||
return result
|
|
||||||
|
|
||||||
def top(self) -> Optional[ModelValue]:
|
|
||||||
ge_maps = (self.ge_map[candidate] for candidate in self.ge_map)
|
|
||||||
result_set = reduce(set.intersection, ge_maps)
|
|
||||||
|
|
||||||
# Either not unique or does not exist
|
|
||||||
if len(result_set) != 1:
|
|
||||||
return None
|
|
||||||
|
|
||||||
return next(iter(result_set))
|
|
||||||
|
|
||||||
def bottom(self) -> Optional[ModelValue]:
|
|
||||||
le_maps = (self.le_map[candidate] for candidate in self.le_map)
|
|
||||||
result_set = reduce(set.intersection, le_maps)
|
|
||||||
|
|
||||||
# Either not unique or does not exist
|
|
||||||
if len(result_set) != 1:
|
|
||||||
return None
|
|
||||||
|
|
||||||
return next(iter(result_set))
|
|
||||||
|
|
||||||
|
|
||||||
class Model:
|
class Model:
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
carrier_set: Set[ModelValue],
|
carrier_set: Set[ModelValue],
|
||||||
logical_operations: Set[ModelFunction],
|
logical_operations: Set[ModelFunction],
|
||||||
designated_values: Set[ModelValue],
|
designated_values: Set[ModelValue],
|
||||||
ordering: Optional[OrderTable] = None,
|
|
||||||
name: Optional[str] = None
|
name: Optional[str] = None
|
||||||
):
|
):
|
||||||
assert designated_values <= carrier_set
|
assert designated_values <= carrier_set
|
||||||
self.carrier_set = carrier_set
|
self.carrier_set = carrier_set
|
||||||
self.logical_operations = logical_operations
|
self.logical_operations = logical_operations
|
||||||
self.designated_values = designated_values
|
self.designated_values = designated_values
|
||||||
self.ordering = ordering
|
|
||||||
self.name = str(abs(hash((
|
self.name = str(abs(hash((
|
||||||
frozenset(carrier_set),
|
frozenset(carrier_set),
|
||||||
frozenset(logical_operations),
|
frozenset(logical_operations),
|
||||||
|
@ -294,61 +215,86 @@ def satisfiable(logic: Logic, model: Model, interpretation: Dict[Operation, Mode
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def model_closure(initial_set: Set[ModelValue], mfunctions: Set[ModelFunction], forbidden_element: Optional[ModelValue]) -> Set[ModelValue]:
|
def model_closure(initial_set: Set[ModelValue], mfunctions: Set[ModelFunction], forbidden_element: Optional[ModelValue]) -> Set[ModelValue]:
|
||||||
"""
|
"""
|
||||||
Given an initial set of model values and a set of model functions,
|
Given an initial set of model values and a set of model functions,
|
||||||
compute the complete set of model values that are closed
|
compute the complete set of model values that are closed
|
||||||
under the operations.
|
under the operations.
|
||||||
|
|
||||||
If the forbidden element is encountered, then we end the saturation procedure early.
|
If top or bottom is encountered, then we end the saturation procedure early.
|
||||||
"""
|
"""
|
||||||
closure_set: Set[ModelValue] = initial_set
|
closure_set: Set[ModelValue] = initial_set
|
||||||
last_new: Set[ModelValue] = initial_set
|
last_new: Set[ModelValue] = initial_set
|
||||||
changed: bool = True
|
changed: bool = True
|
||||||
forbidden_found = False
|
forbidden_found = False
|
||||||
|
|
||||||
arities = set()
|
|
||||||
for mfun in mfunctions:
|
|
||||||
arities.add(mfun.arity)
|
|
||||||
|
|
||||||
while changed:
|
while changed:
|
||||||
changed = False
|
changed = False
|
||||||
new_elements: Set[ModelValue] = set()
|
new_elements: Set[ModelValue] = set()
|
||||||
old_closure: Set[ModelValue] = closure_set - last_new
|
old_closure: Set[ModelValue] = closure_set - last_new
|
||||||
|
|
||||||
# arity -> args
|
# arity -> args
|
||||||
args_by_arity = defaultdict(list)
|
cached_args = defaultdict(list)
|
||||||
|
|
||||||
# Motivation: We want to only compute arguments that we have not
|
# Pass elements into each model function
|
||||||
# seen before
|
|
||||||
for arity in arities:
|
|
||||||
for num_new in range(1, arity + 1):
|
|
||||||
new_args = combinations_with_replacement(last_new, r=num_new)
|
|
||||||
old_args = combinations_with_replacement(old_closure, r=arity - num_new)
|
|
||||||
# Determine every possible ordering of the concatenated new and
|
|
||||||
# old model values.
|
|
||||||
for new_arg, old_arg in product(new_args, old_args):
|
|
||||||
for combined_args in permutations(new_arg + old_arg):
|
|
||||||
args_by_arity[arity].append(combined_args)
|
|
||||||
|
|
||||||
|
|
||||||
# Pass each argument into each model function
|
|
||||||
for mfun in mfunctions:
|
for mfun in mfunctions:
|
||||||
for args in args_by_arity[mfun.arity]:
|
|
||||||
# Compute the new elements
|
|
||||||
# given the cached arguments.
|
|
||||||
element = mfun(*args)
|
|
||||||
if element not in closure_set:
|
|
||||||
new_elements.add(element)
|
|
||||||
|
|
||||||
# Optimization: Break out of computation
|
# If a previous function shared the same arity,
|
||||||
# early when forbidden element is found
|
# we'll use the same set of computed arguments
|
||||||
if forbidden_element is not None and element == forbidden_element:
|
# to pass into the model functions.
|
||||||
forbidden_found = True
|
if mfun.arity in cached_args:
|
||||||
|
for args in cached_args[mfun.arity]:
|
||||||
|
# Compute the new elements
|
||||||
|
# given the cached arguments.
|
||||||
|
element = mfun(*args)
|
||||||
|
if element not in closure_set:
|
||||||
|
new_elements.add(element)
|
||||||
|
|
||||||
|
# Optimization: Break out of computation
|
||||||
|
# early when forbidden element is found
|
||||||
|
if forbidden_element is not None and element == forbidden_element:
|
||||||
|
forbidden_found = True
|
||||||
|
break
|
||||||
|
|
||||||
|
if forbidden_found:
|
||||||
|
break
|
||||||
|
|
||||||
|
# We don't need to compute the arguments
|
||||||
|
# thanks to the cache, so move onto the
|
||||||
|
# next function.
|
||||||
|
continue
|
||||||
|
|
||||||
|
# At this point, we don't have cached arguments, so we need
|
||||||
|
# to compute this set.
|
||||||
|
|
||||||
|
# Each argument must have at least one new element to not repeat
|
||||||
|
# work. We'll range over the number of new model values within our
|
||||||
|
# argument.
|
||||||
|
for num_new in range(1, mfun.arity + 1):
|
||||||
|
new_args = combinations_with_replacement(last_new, r=num_new)
|
||||||
|
old_args = combinations_with_replacement(old_closure, r=mfun.arity - num_new)
|
||||||
|
# Determine every possible ordering of the concatenated
|
||||||
|
# new and old model values.
|
||||||
|
for new_arg, old_arg in product(new_args, old_args):
|
||||||
|
for args in permutations(new_arg + old_arg):
|
||||||
|
cached_args[mfun.arity].append(args)
|
||||||
|
element = mfun(*args)
|
||||||
|
if element not in closure_set:
|
||||||
|
new_elements.add(element)
|
||||||
|
|
||||||
|
# Optimization: Break out of computation
|
||||||
|
# early when forbidden element is found
|
||||||
|
if forbidden_element is not None and element == forbidden_element:
|
||||||
|
forbidden_found = True
|
||||||
|
break
|
||||||
|
|
||||||
|
if forbidden_found:
|
||||||
|
break
|
||||||
|
|
||||||
|
if forbidden_found:
|
||||||
break
|
break
|
||||||
|
|
||||||
if forbidden_found:
|
|
||||||
break
|
|
||||||
|
|
||||||
closure_set.update(new_elements)
|
closure_set.update(new_elements)
|
||||||
changed = len(new_elements) > 0
|
changed = len(new_elements) > 0
|
||||||
|
@ -358,47 +304,3 @@ def model_closure(initial_set: Set[ModelValue], mfunctions: Set[ModelFunction],
|
||||||
break
|
break
|
||||||
|
|
||||||
return closure_set
|
return closure_set
|
||||||
|
|
||||||
|
|
||||||
def model_equivalence(model1: Model, model2: Model, ignore_constants: bool = False) -> bool:
|
|
||||||
"""
|
|
||||||
Takes two models and determines if they are equivalent.
|
|
||||||
Assumes for the model to be equilvalent that their
|
|
||||||
value names are equivalent as well.
|
|
||||||
"""
|
|
||||||
|
|
||||||
if model1.carrier_set != model2.carrier_set:
|
|
||||||
return False
|
|
||||||
|
|
||||||
if model1.designated_values != model2.designated_values:
|
|
||||||
return False
|
|
||||||
|
|
||||||
model1_fn_names = set()
|
|
||||||
for fn in model1.logical_operations:
|
|
||||||
if fn.arity == 0 and ignore_constants:
|
|
||||||
continue
|
|
||||||
model1_fn_names.add(fn.operation_name)
|
|
||||||
|
|
||||||
model2_fn_names = set()
|
|
||||||
for fn in model2.logical_operations:
|
|
||||||
if fn.arity == 0 and ignore_constants:
|
|
||||||
continue
|
|
||||||
model2_fn_names.add(fn.operation_name)
|
|
||||||
|
|
||||||
if model1_fn_names != model2_fn_names:
|
|
||||||
return False
|
|
||||||
|
|
||||||
for fn_name in model1_fn_names:
|
|
||||||
fn1 = next((fn for fn in model1.logical_operations if fn.operation_name == fn_name))
|
|
||||||
fn2 = next((fn for fn in model2.logical_operations if fn.operation_name == fn_name))
|
|
||||||
|
|
||||||
if fn1.arity != fn2.arity:
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Check for functional equilvance
|
|
||||||
# That is for all inputs in the carrier set, the outputs are the same
|
|
||||||
for args in product(model1.carrier_set, repeat=fn1.arity):
|
|
||||||
if fn1(*args) != fn2(*args):
|
|
||||||
return False
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
285
parse_magic.py
285
parse_magic.py
|
@ -4,10 +4,9 @@ Parses the Magic Ugly Data File Format
|
||||||
Assumes the base logic is R with no extra connectives
|
Assumes the base logic is R with no extra connectives
|
||||||
"""
|
"""
|
||||||
import re
|
import re
|
||||||
from typing import TextIO, List, Iterator, Optional, Tuple, Set, Dict
|
from typing import TextIO, List, Optional, Tuple, Set, Dict
|
||||||
from itertools import product
|
|
||||||
|
|
||||||
from model import Model, ModelValue, ModelFunction, OrderTable
|
from model import Model, ModelValue, ModelFunction
|
||||||
from logic import (
|
from logic import (
|
||||||
Implication,
|
Implication,
|
||||||
Conjunction,
|
Conjunction,
|
||||||
|
@ -20,7 +19,7 @@ from logic import (
|
||||||
class SourceFile:
|
class SourceFile:
|
||||||
def __init__(self, fileobj: TextIO):
|
def __init__(self, fileobj: TextIO):
|
||||||
self.fileobj = fileobj
|
self.fileobj = fileobj
|
||||||
self.line_in_file = 0
|
self.current_line = 0
|
||||||
self.reststr = ""
|
self.reststr = ""
|
||||||
|
|
||||||
def next_line(self):
|
def next_line(self):
|
||||||
|
@ -35,7 +34,7 @@ class SourceFile:
|
||||||
return reststr
|
return reststr
|
||||||
|
|
||||||
contents = next(self.fileobj).strip()
|
contents = next(self.fileobj).strip()
|
||||||
self.line_in_file += 1
|
self.current_line += 1
|
||||||
return contents
|
return contents
|
||||||
|
|
||||||
def __next__(self):
|
def __next__(self):
|
||||||
|
@ -44,9 +43,12 @@ class SourceFile:
|
||||||
"""
|
"""
|
||||||
if self.reststr == "":
|
if self.reststr == "":
|
||||||
self.reststr = next(self.fileobj).strip()
|
self.reststr = next(self.fileobj).strip()
|
||||||
self.line_in_file += 1
|
self.current_line += 1
|
||||||
|
|
||||||
|
tokens = self.reststr.split(" ")
|
||||||
|
next_token = tokens[0]
|
||||||
|
self.reststr = " ".join(tokens[1:])
|
||||||
|
|
||||||
next_token, _, self.reststr = self.reststr.partition(" ")
|
|
||||||
return next_token
|
return next_token
|
||||||
|
|
||||||
class UglyHeader:
|
class UglyHeader:
|
||||||
|
@ -61,9 +63,8 @@ class UglyHeader:
|
||||||
class ModelBuilder:
|
class ModelBuilder:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.size : int = 0
|
self.size : int = 0
|
||||||
self.carrier_list : List[ModelValue] = []
|
self.carrier_set : Set[ModelValue] = set()
|
||||||
self.mnegation: Optional[ModelFunction] = None
|
self.mnegation: Optional[ModelFunction] = None
|
||||||
self.ordering: Optional[OrderTable] = None
|
|
||||||
self.mconjunction: Optional[ModelFunction] = None
|
self.mconjunction: Optional[ModelFunction] = None
|
||||||
self.mdisjunction: Optional[ModelFunction] = None
|
self.mdisjunction: Optional[ModelFunction] = None
|
||||||
self.designated_values: Set[ModelValue] = set()
|
self.designated_values: Set[ModelValue] = set()
|
||||||
|
@ -72,45 +73,6 @@ class ModelBuilder:
|
||||||
# Map symbol to model function
|
# Map symbol to model function
|
||||||
self.custom_model_functions: Dict[str, ModelFunction] = {}
|
self.custom_model_functions: Dict[str, ModelFunction] = {}
|
||||||
|
|
||||||
def build(self, model_name: str) -> Tuple[Model, Dict[Operation, ModelFunction]]:
|
|
||||||
"""Create Model"""
|
|
||||||
assert self.size > 0
|
|
||||||
assert self.size + 1 == len(self.carrier_list)
|
|
||||||
assert len(self.designated_values) <= len(self.carrier_list)
|
|
||||||
assert self.mimplication is not None
|
|
||||||
|
|
||||||
# Implication is required to be present
|
|
||||||
logical_operations = { self.mimplication }
|
|
||||||
interpretation = {
|
|
||||||
Implication: self.mimplication
|
|
||||||
}
|
|
||||||
|
|
||||||
# Other model functions and logical
|
|
||||||
# operations are optional
|
|
||||||
if self.mnegation is not None:
|
|
||||||
logical_operations.add(self.mnegation)
|
|
||||||
interpretation[Negation] = self.mnegation
|
|
||||||
if self.mconjunction is not None:
|
|
||||||
logical_operations.add(self.mconjunction)
|
|
||||||
interpretation[Conjunction] = self.mconjunction
|
|
||||||
if self.mdisjunction is not None:
|
|
||||||
logical_operations.add(self.mdisjunction)
|
|
||||||
interpretation[Disjunction] = self.mdisjunction
|
|
||||||
if self.mnecessitation is not None:
|
|
||||||
logical_operations.add(self.mnecessitation)
|
|
||||||
interpretation[Necessitation] = self.mnecessitation
|
|
||||||
|
|
||||||
# Custom model function definitions
|
|
||||||
for custom_mf in self.custom_model_functions.values():
|
|
||||||
if custom_mf is not None:
|
|
||||||
logical_operations.add(custom_mf)
|
|
||||||
op = Operation(custom_mf.operation_name, custom_mf.arity)
|
|
||||||
interpretation[op] = custom_mf
|
|
||||||
|
|
||||||
model = Model(set(self.carrier_list), logical_operations, self.designated_values, ordering=self.ordering, name=model_name)
|
|
||||||
return (model, interpretation)
|
|
||||||
|
|
||||||
|
|
||||||
class Stage:
|
class Stage:
|
||||||
def __init__(self, name: str):
|
def __init__(self, name: str):
|
||||||
self.name = name
|
self.name = name
|
||||||
|
@ -137,6 +99,8 @@ class Stages:
|
||||||
|
|
||||||
def add(self, name: str):
|
def add(self, name: str):
|
||||||
stage = Stage(name)
|
stage = Stage(name)
|
||||||
|
stage.next = stage
|
||||||
|
|
||||||
stage.previous = self.last_added_stage
|
stage.previous = self.last_added_stage
|
||||||
|
|
||||||
# End stage is a sink so don't
|
# End stage is a sink so don't
|
||||||
|
@ -153,16 +117,11 @@ class Stages:
|
||||||
|
|
||||||
def reset_after(self, name):
|
def reset_after(self, name):
|
||||||
"""
|
"""
|
||||||
Resets the counters of every stage after
|
Resets the stage counters after a given stage.
|
||||||
a given stage.
|
This is to accurately reflect the name of the
|
||||||
|
model within MaGIC.
|
||||||
This is to accurately reflect how names are
|
|
||||||
generated within magic.
|
|
||||||
Example: 1.1, 1.2, (reset after 1), 2.1, 2.2
|
|
||||||
"""
|
"""
|
||||||
stage = self.stages[name]
|
stage = self.stages[name]
|
||||||
# NOTE: The process_model stage doesn't
|
|
||||||
# have a counter associated with it.
|
|
||||||
while stage.name != "process_model":
|
while stage.name != "process_model":
|
||||||
stage.reset()
|
stage.reset()
|
||||||
stage = stage.next
|
stage = stage.next
|
||||||
|
@ -171,26 +130,15 @@ class Stages:
|
||||||
return self.stages[name]
|
return self.stages[name]
|
||||||
|
|
||||||
def name(self):
|
def name(self):
|
||||||
"""
|
|
||||||
Get the full name of where we are within
|
|
||||||
the parsing process. Takes into account
|
|
||||||
the stage number of all the stages seen
|
|
||||||
so far.
|
|
||||||
"""
|
|
||||||
|
|
||||||
# Stages haven't been added yet
|
|
||||||
result = ""
|
result = ""
|
||||||
stage = self.first_stage
|
stage = self.first_stage
|
||||||
if stage is None:
|
if stage is None:
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
# There's only one stage
|
|
||||||
result = f"{stage.num}"
|
result = f"{stage.num}"
|
||||||
if stage.next == "process_model":
|
if stage.next == "process_model":
|
||||||
return result
|
return result
|
||||||
|
|
||||||
# Add every subsequent stage counter
|
|
||||||
# by appending .stage_num
|
|
||||||
stage = stage.next
|
stage = stage.next
|
||||||
while stage is not None:
|
while stage is not None:
|
||||||
result += f".{stage.num}"
|
result += f".{stage.num}"
|
||||||
|
@ -221,19 +169,19 @@ def derive_stages(header: UglyHeader) -> Stages:
|
||||||
|
|
||||||
return stages
|
return stages
|
||||||
|
|
||||||
def parse_matrices(infile: SourceFile) -> Iterator[Tuple[Model, Dict[Operation, ModelFunction]]]:
|
def parse_matrices(infile: SourceFile) -> List[Tuple[Model, Dict]]:
|
||||||
|
solutions = []
|
||||||
header = parse_header(infile)
|
header = parse_header(infile)
|
||||||
stages = derive_stages(header)
|
stages = derive_stages(header)
|
||||||
first_run = True
|
first_run = True
|
||||||
current_model_parts = ModelBuilder()
|
current_model_parts = ModelBuilder()
|
||||||
|
|
||||||
stage = stages.first_stage
|
stage = stages.first_stage
|
||||||
while True:
|
while True:
|
||||||
match stage.name:
|
match stage.name:
|
||||||
case "end":
|
case "end":
|
||||||
break
|
break
|
||||||
case "process_model":
|
case "process_model":
|
||||||
yield current_model_parts.build(stages.name())
|
process_model(stages.name(), current_model_parts, solutions)
|
||||||
stage = stage.next
|
stage = stage.next
|
||||||
case "size":
|
case "size":
|
||||||
processed = process_sizes(infile, current_model_parts, first_run)
|
processed = process_sizes(infile, current_model_parts, first_run)
|
||||||
|
@ -299,25 +247,25 @@ def parse_matrices(infile: SourceFile) -> Iterator[Tuple[Model, Dict[Operation,
|
||||||
stages.reset_after(stage.name)
|
stages.reset_after(stage.name)
|
||||||
stage = stage.previous
|
stage = stage.previous
|
||||||
|
|
||||||
|
return solutions
|
||||||
|
|
||||||
def process_sizes(infile: SourceFile, current_model_parts: ModelBuilder, first_run: bool) -> bool:
|
def process_sizes(infile: SourceFile, current_model_parts: ModelBuilder, first_run: bool) -> bool:
|
||||||
size: Optional[int] = None
|
|
||||||
try:
|
try:
|
||||||
size = parse_size(infile, first_run)
|
size = parse_size(infile, first_run)
|
||||||
except StopIteration:
|
except StopIteration:
|
||||||
pass
|
return False
|
||||||
|
|
||||||
if size is None:
|
if size is None:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
carrier_list = carrier_list_from_size(size)
|
carrier_set = carrier_set_from_size(size)
|
||||||
current_model_parts.carrier_list = carrier_list
|
current_model_parts.carrier_set = carrier_set
|
||||||
current_model_parts.size = size
|
current_model_parts.size = size
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def process_negations(infile: SourceFile, current_model_parts: ModelBuilder) -> bool:
|
def process_negations(infile: SourceFile, current_model_parts: ModelBuilder) -> bool:
|
||||||
"""Stage 2 (Optional)"""
|
"""Stage 2 (Optional)"""
|
||||||
mnegation = parse_single_monadic_connective(infile, "¬", current_model_parts.size, current_model_parts.carrier_list)
|
mnegation = parse_single_monadic_connective(infile, "¬", current_model_parts.size)
|
||||||
if mnegation is None:
|
if mnegation is None:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@ -326,12 +274,11 @@ def process_negations(infile: SourceFile, current_model_parts: ModelBuilder) ->
|
||||||
|
|
||||||
def process_orders(infile: SourceFile, current_model_parts: ModelBuilder) -> bool:
|
def process_orders(infile: SourceFile, current_model_parts: ModelBuilder) -> bool:
|
||||||
"""Stage 3"""
|
"""Stage 3"""
|
||||||
result = parse_single_order(infile, current_model_parts.size, current_model_parts.carrier_list)
|
result = parse_single_order(infile, current_model_parts.size)
|
||||||
if result is None:
|
if result is None:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
ordering, mconjunction, mdisjunction = result
|
mconjunction, mdisjunction = result
|
||||||
current_model_parts.ordering = ordering
|
|
||||||
current_model_parts.mconjunction = mconjunction
|
current_model_parts.mconjunction = mconjunction
|
||||||
current_model_parts.mdisjunction = mdisjunction
|
current_model_parts.mdisjunction = mdisjunction
|
||||||
|
|
||||||
|
@ -339,7 +286,7 @@ def process_orders(infile: SourceFile, current_model_parts: ModelBuilder) -> boo
|
||||||
|
|
||||||
def process_designateds(infile: SourceFile, current_model_parts: ModelBuilder) -> bool:
|
def process_designateds(infile: SourceFile, current_model_parts: ModelBuilder) -> bool:
|
||||||
"""Stage 4"""
|
"""Stage 4"""
|
||||||
designated_values = parse_single_designated(infile, current_model_parts.size, current_model_parts.carrier_list)
|
designated_values = parse_single_designated(infile, current_model_parts.size)
|
||||||
if designated_values is None:
|
if designated_values is None:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@ -348,7 +295,7 @@ def process_designateds(infile: SourceFile, current_model_parts: ModelBuilder) -
|
||||||
|
|
||||||
def process_implications(infile: SourceFile, current_model_parts: ModelBuilder) -> bool:
|
def process_implications(infile: SourceFile, current_model_parts: ModelBuilder) -> bool:
|
||||||
"""Stage 5"""
|
"""Stage 5"""
|
||||||
mimplication = parse_single_dyadic_connective(infile, "→", current_model_parts.size, current_model_parts.carrier_list)
|
mimplication = parse_single_dyadic_connective(infile, "→", current_model_parts.size)
|
||||||
if mimplication is None:
|
if mimplication is None:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@ -356,7 +303,7 @@ def process_implications(infile: SourceFile, current_model_parts: ModelBuilder)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def process_necessitations(infile: SourceFile, current_model_parts: ModelBuilder) -> bool:
|
def process_necessitations(infile: SourceFile, current_model_parts: ModelBuilder) -> bool:
|
||||||
mnecessitation = parse_single_monadic_connective(infile, "!", current_model_parts.size, current_model_parts.carrier_list)
|
mnecessitation = parse_single_monadic_connective(infile, "!", current_model_parts.size)
|
||||||
if mnecessitation is None:
|
if mnecessitation is None:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@ -367,9 +314,9 @@ def process_custom_connective(infile: SourceFile, symbol: str, adicity: int, cur
|
||||||
if adicity == 0:
|
if adicity == 0:
|
||||||
mfunction = parse_single_nullary_connective(infile, symbol)
|
mfunction = parse_single_nullary_connective(infile, symbol)
|
||||||
elif adicity == 1:
|
elif adicity == 1:
|
||||||
mfunction = parse_single_monadic_connective(infile, symbol, current_model_parts.size, current_model_parts.carrier_list)
|
mfunction = parse_single_monadic_connective(infile, symbol, current_model_parts.size)
|
||||||
elif adicity == 2:
|
elif adicity == 2:
|
||||||
mfunction = parse_single_dyadic_connective(infile, symbol, current_model_parts.size, current_model_parts.carrier_list)
|
mfunction = parse_single_dyadic_connective(infile, symbol, current_model_parts.size)
|
||||||
else:
|
else:
|
||||||
raise NotImplementedError("Unable to process connectives of adicity greater than 2")
|
raise NotImplementedError("Unable to process connectives of adicity greater than 2")
|
||||||
|
|
||||||
|
@ -379,6 +326,38 @@ def process_custom_connective(infile: SourceFile, symbol: str, adicity: int, cur
|
||||||
current_model_parts.custom_model_functions[symbol] = mfunction
|
current_model_parts.custom_model_functions[symbol] = mfunction
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
def process_model(model_name: str, mp: ModelBuilder, solutions: List[Tuple[Model, Dict]]):
|
||||||
|
"""Create Model"""
|
||||||
|
assert mp.size > 0
|
||||||
|
assert mp.size + 1 == len(mp.carrier_set)
|
||||||
|
assert len(mp.designated_values) <= len(mp.carrier_set)
|
||||||
|
assert mp.mimplication is not None
|
||||||
|
|
||||||
|
logical_operations = { mp.mimplication }
|
||||||
|
model = Model(mp.carrier_set, logical_operations, mp.designated_values, name=model_name)
|
||||||
|
interpretation = {
|
||||||
|
Implication: mp.mimplication
|
||||||
|
}
|
||||||
|
if mp.mnegation is not None:
|
||||||
|
logical_operations.add(mp.mnegation)
|
||||||
|
interpretation[Negation] = mp.mnegation
|
||||||
|
if mp.mconjunction is not None:
|
||||||
|
logical_operations.add(mp.mconjunction)
|
||||||
|
interpretation[Conjunction] = mp.mconjunction
|
||||||
|
if mp.mdisjunction is not None:
|
||||||
|
logical_operations.add(mp.mdisjunction)
|
||||||
|
interpretation[Disjunction] = mp.mdisjunction
|
||||||
|
if mp.mnecessitation is not None:
|
||||||
|
logical_operations.add(mp.mnecessitation)
|
||||||
|
interpretation[Necessitation] = mp.mnecessitation
|
||||||
|
|
||||||
|
for custom_mf in mp.custom_model_functions.values():
|
||||||
|
if custom_mf is not None:
|
||||||
|
logical_operations.add(custom_mf)
|
||||||
|
op = Operation(custom_mf.operation_name, custom_mf.arity)
|
||||||
|
interpretation[op] = custom_mf
|
||||||
|
|
||||||
|
solutions.append((model, interpretation))
|
||||||
|
|
||||||
def parse_header(infile: SourceFile) -> UglyHeader:
|
def parse_header(infile: SourceFile) -> UglyHeader:
|
||||||
"""
|
"""
|
||||||
|
@ -399,19 +378,20 @@ def parse_header(infile: SourceFile) -> UglyHeader:
|
||||||
custom_model_functions.append((arity, symbol))
|
custom_model_functions.append((arity, symbol))
|
||||||
return UglyHeader(negation_defined, necessitation_defined, custom_model_functions)
|
return UglyHeader(negation_defined, necessitation_defined, custom_model_functions)
|
||||||
|
|
||||||
def carrier_list_from_size(size: int) -> List[ModelValue]:
|
def carrier_set_from_size(size: int) -> Set[ModelValue]:
|
||||||
"""
|
"""
|
||||||
Construct a carrier set of model values
|
Construct a carrier set of model values
|
||||||
based on the desired size.
|
based on the desired size.
|
||||||
"""
|
"""
|
||||||
return [
|
return {
|
||||||
mvalue_from_index(i) for i in range(size + 1)
|
mvalue_from_index(i) for i in range(size + 1)
|
||||||
]
|
}
|
||||||
|
|
||||||
def parse_size(infile: SourceFile, first_run: bool) -> Optional[int]:
|
def parse_size(infile: SourceFile, first_run: bool) -> Optional[int]:
|
||||||
"""
|
"""
|
||||||
Parse the line representing the matrix size.
|
Parse the line representing the matrix size.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
size = int(infile.next_line())
|
size = int(infile.next_line())
|
||||||
|
|
||||||
# HACK: When necessitation and custom connectives are enabled
|
# HACK: When necessitation and custom connectives are enabled
|
||||||
|
@ -422,9 +402,7 @@ def parse_size(infile: SourceFile, first_run: bool) -> Optional[int]:
|
||||||
|
|
||||||
if size == -1:
|
if size == -1:
|
||||||
return None
|
return None
|
||||||
|
assert size > 0, f"Unexpected size at line {infile.current_line}"
|
||||||
assert size > 0, f"Unexpected size at line {infile.line_in_file}"
|
|
||||||
|
|
||||||
return size
|
return size
|
||||||
|
|
||||||
def mvalue_from_index(i: int) -> ModelValue:
|
def mvalue_from_index(i: int) -> ModelValue:
|
||||||
|
@ -440,9 +418,55 @@ def parse_mvalue(x: str) -> ModelValue:
|
||||||
"""
|
"""
|
||||||
return mvalue_from_index(int(x))
|
return mvalue_from_index(int(x))
|
||||||
|
|
||||||
|
def determine_cresult(size: int, ordering: Dict[ModelValue, ModelValue], a: ModelValue, b: ModelValue) -> ModelValue:
|
||||||
|
"""
|
||||||
|
Determine what a ∧ b should be given the ordering table.
|
||||||
|
"""
|
||||||
|
for i in range(size + 1):
|
||||||
|
c = mvalue_from_index(i)
|
||||||
|
|
||||||
def parse_single_order(infile: SourceFile, size: int, carrier_list: List[ModelValue]) -> Optional[
|
if not ordering[(c, a)]:
|
||||||
Tuple[OrderTable, Optional[ModelFunction], Optional[ModelFunction]]]:
|
continue
|
||||||
|
if not ordering[(c, b)]:
|
||||||
|
continue
|
||||||
|
|
||||||
|
invalid = False
|
||||||
|
for j in range(size + 1):
|
||||||
|
d = mvalue_from_index(j)
|
||||||
|
if c == d:
|
||||||
|
continue
|
||||||
|
if ordering[(c, d)]:
|
||||||
|
if ordering[(d, a)] and ordering [(d, b)]:
|
||||||
|
invalid = True
|
||||||
|
|
||||||
|
if not invalid:
|
||||||
|
return c
|
||||||
|
|
||||||
|
def determine_dresult(size: int, ordering: Dict[ModelValue, ModelValue], a: ModelValue, b: ModelValue) -> ModelValue:
|
||||||
|
"""
|
||||||
|
Determine what a ∨ b should be given the ordering table.
|
||||||
|
"""
|
||||||
|
for i in range(size + 1):
|
||||||
|
c = mvalue_from_index(i)
|
||||||
|
if not ordering[(a, c)]:
|
||||||
|
continue
|
||||||
|
if not ordering[(b, c)]:
|
||||||
|
continue
|
||||||
|
|
||||||
|
invalid = False
|
||||||
|
|
||||||
|
for j in range(size + 1):
|
||||||
|
d = mvalue_from_index(j)
|
||||||
|
if d == c:
|
||||||
|
continue
|
||||||
|
if ordering[(d, c)]:
|
||||||
|
if ordering[(a, d)] and ordering[(b, d)]:
|
||||||
|
invalid = True
|
||||||
|
|
||||||
|
if not invalid:
|
||||||
|
return c
|
||||||
|
|
||||||
|
def parse_single_order(infile: SourceFile, size: int) -> Optional[Tuple[ModelFunction, ModelFunction]]:
|
||||||
"""
|
"""
|
||||||
Parse the line representing the ordering table
|
Parse the line representing the ordering table
|
||||||
"""
|
"""
|
||||||
|
@ -452,38 +476,47 @@ def parse_single_order(infile: SourceFile, size: int, carrier_list: List[ModelVa
|
||||||
|
|
||||||
table = line.split(" ")
|
table = line.split(" ")
|
||||||
|
|
||||||
assert len(table) == (size + 1)**2, f"Order table doesn't match expected size at line {infile.line_in_file}"
|
assert len(table) == (size + 1)**2, f"Order table doesn't match expected size at line {infile.current_line}"
|
||||||
|
|
||||||
ordering = OrderTable()
|
|
||||||
omapping = {}
|
omapping = {}
|
||||||
table_i = 0
|
table_i = 0
|
||||||
|
|
||||||
for x, y in product(carrier_list, carrier_list):
|
for i in range(size + 1):
|
||||||
omapping[(x, y)] = table[table_i] == '1'
|
x = mvalue_from_index(i)
|
||||||
if table[table_i] == '1':
|
for j in range(size + 1):
|
||||||
ordering.add(x, y)
|
y = mvalue_from_index(j)
|
||||||
table_i += 1
|
omapping[(x, y)] = table[table_i] == '1'
|
||||||
|
table_i += 1
|
||||||
|
|
||||||
cmapping = {}
|
cmapping = {}
|
||||||
dmapping = {}
|
dmapping = {}
|
||||||
|
|
||||||
for x, y in product(carrier_list, carrier_list):
|
for i in range(size + 1):
|
||||||
cresult = ordering.meet(x, y)
|
x = mvalue_from_index(i)
|
||||||
if cresult is None:
|
for j in range(size + 1):
|
||||||
return ordering, None, None
|
y = mvalue_from_index(j)
|
||||||
cmapping[(x, y)] = cresult
|
|
||||||
|
cresult = determine_cresult(size, omapping, x, y)
|
||||||
|
if cresult is None:
|
||||||
|
print("[Warning] Conjunction and Disjunction are not well-defined")
|
||||||
|
print(f"{x} ∧ {y} = ??")
|
||||||
|
return None, None
|
||||||
|
cmapping[(x, y)] = cresult
|
||||||
|
|
||||||
|
dresult = determine_dresult(size, omapping, x, y)
|
||||||
|
if dresult is None:
|
||||||
|
print("[Warning] Conjunction and Disjunction are not well-defined")
|
||||||
|
print(f"{x} ∨ {y} = ??")
|
||||||
|
return None, None
|
||||||
|
dmapping[(x, y)] = dresult
|
||||||
|
|
||||||
dresult = ordering.join(x, y)
|
|
||||||
if dresult is None:
|
|
||||||
return ordering, None, None
|
|
||||||
dmapping[(x, y)] = dresult
|
|
||||||
|
|
||||||
mconjunction = ModelFunction(2, cmapping, "∧")
|
mconjunction = ModelFunction(2, cmapping, "∧")
|
||||||
mdisjunction = ModelFunction(2, dmapping, "∨")
|
mdisjunction = ModelFunction(2, dmapping, "∨")
|
||||||
|
|
||||||
return ordering, mconjunction, mdisjunction
|
return mconjunction, mdisjunction
|
||||||
|
|
||||||
def parse_single_designated(infile: SourceFile, size: int, carrier_list: List[ModelValue]) -> Optional[Set[ModelValue]]:
|
def parse_single_designated(infile: SourceFile, size: int) -> Optional[Set[ModelValue]]:
|
||||||
"""
|
"""
|
||||||
Parse the line representing which model values are designated.
|
Parse the line representing which model values are designated.
|
||||||
"""
|
"""
|
||||||
|
@ -492,12 +525,13 @@ def parse_single_designated(infile: SourceFile, size: int, carrier_list: List[Mo
|
||||||
return None
|
return None
|
||||||
|
|
||||||
row = line.split(" ")
|
row = line.split(" ")
|
||||||
assert len(row) == size + 1, f"Designated table doesn't match expected size at line {infile.line_in_file}"
|
assert len(row) == size + 1, f"Designated table doesn't match expected size at line {infile.current_line}"
|
||||||
|
|
||||||
designated_values = set()
|
designated_values = set()
|
||||||
|
|
||||||
for x, j in zip(carrier_list, row):
|
for i, j in zip(range(size + 1), row):
|
||||||
if j == '1':
|
if j == '1':
|
||||||
|
x = mvalue_from_index(i)
|
||||||
designated_values.add(x)
|
designated_values.add(x)
|
||||||
|
|
||||||
return designated_values
|
return designated_values
|
||||||
|
@ -509,28 +543,29 @@ def parse_single_nullary_connective(infile: SourceFile, symbol: str) -> Optional
|
||||||
return None
|
return None
|
||||||
|
|
||||||
row = line.split(" ")
|
row = line.split(" ")
|
||||||
assert len(row) == 1, f"More than one assignment for a nullary connective was provided at line {infile.line_in_file}"
|
assert len(row) == 1, f"More than one assignment for a nullary connective was provided at line {infile.current_line}"
|
||||||
|
|
||||||
mapping = {}
|
mapping = {}
|
||||||
mapping[()] = parse_mvalue(row[0])
|
mapping[()] = parse_mvalue(row[0])
|
||||||
return ModelFunction(0, mapping, symbol)
|
return ModelFunction(0, mapping, symbol)
|
||||||
|
|
||||||
def parse_single_monadic_connective(infile: SourceFile, symbol: str, size: int, carrier_list: List[ModelValue]) -> Optional[ModelFunction]:
|
def parse_single_monadic_connective(infile: SourceFile, symbol: str, size: int) -> Optional[ModelFunction]:
|
||||||
line = infile.next_line()
|
line = infile.next_line()
|
||||||
if line == '-1':
|
if line == '-1':
|
||||||
return None
|
return None
|
||||||
|
|
||||||
row = line.split(" ")
|
row = line.split(" ")
|
||||||
assert len(row) == size + 1, f"{symbol} table doesn't match size at line {infile.line_in_file}"
|
assert len(row) == size + 1, f"{symbol} table doesn't match size at line {infile.current_line}"
|
||||||
mapping = {}
|
mapping = {}
|
||||||
|
|
||||||
for x, j in zip(carrier_list, row):
|
for i, j in zip(range(size + 1), row):
|
||||||
|
x = mvalue_from_index(i)
|
||||||
y = parse_mvalue(j)
|
y = parse_mvalue(j)
|
||||||
mapping[(x, )] = y
|
mapping[(x, )] = y
|
||||||
|
|
||||||
return ModelFunction(1, mapping, symbol)
|
return ModelFunction(1, mapping, symbol)
|
||||||
|
|
||||||
def parse_single_dyadic_connective(infile: SourceFile, symbol: str, size: int, carrier_list: List[ModelValue]) -> Optional[ModelFunction]:
|
def parse_single_dyadic_connective(infile: SourceFile, symbol: str, size: int) -> Optional[ModelFunction]:
|
||||||
first_token = next(infile)
|
first_token = next(infile)
|
||||||
if first_token == "-1":
|
if first_token == "-1":
|
||||||
return None
|
return None
|
||||||
|
@ -539,14 +574,20 @@ def parse_single_dyadic_connective(infile: SourceFile, symbol: str, size: int, c
|
||||||
try:
|
try:
|
||||||
table = [first_token] + [next(infile) for _ in range((size + 1)**2 - 1)]
|
table = [first_token] + [next(infile) for _ in range((size + 1)**2 - 1)]
|
||||||
except StopIteration:
|
except StopIteration:
|
||||||
raise Exception(f"{symbol} table does not match expected size at line {infile.line_in_file}")
|
pass
|
||||||
|
|
||||||
|
assert len(table) == (size + 1)**2, f"{symbol} table does not match expected size at line {infile.current_line}"
|
||||||
|
|
||||||
mapping = {}
|
mapping = {}
|
||||||
table_i = 0
|
table_i = 0
|
||||||
|
for i in range(size + 1):
|
||||||
|
x = mvalue_from_index(i)
|
||||||
|
for j in range(size + 1):
|
||||||
|
y = mvalue_from_index(j)
|
||||||
|
|
||||||
for x, y in product(carrier_list, carrier_list):
|
r = parse_mvalue(table[table_i])
|
||||||
r = parse_mvalue(table[table_i])
|
table_i += 1
|
||||||
table_i += 1
|
|
||||||
mapping[(x, y)] = r
|
mapping[(x, y)] = r
|
||||||
|
|
||||||
return ModelFunction(2, mapping, symbol)
|
return ModelFunction(2, mapping, symbol)
|
||||||
|
|
|
@ -1 +0,0 @@
|
||||||
This folder contains scripts that may be used during experimentation. These are intended to be used ad-hoc and we do not guarentee the maitainance of these scripts.
|
|
|
@ -1,105 +0,0 @@
|
||||||
"""
|
|
||||||
Given two MaGIC ugly data files that correspond to
|
|
||||||
the same logic. Report any differences in the models
|
|
||||||
that exhibit VSP.
|
|
||||||
|
|
||||||
Overall process:
|
|
||||||
- Determine which models in file 1 have VSP
|
|
||||||
- Print if model does not exist in file 2
|
|
||||||
- For models in file 2 that were not already encountered for,
|
|
||||||
check if they have VSP.
|
|
||||||
- Print models that do
|
|
||||||
"""
|
|
||||||
import argparse
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
||||||
|
|
||||||
import argparse
|
|
||||||
|
|
||||||
from model import model_equivalence
|
|
||||||
from parse_magic import SourceFile, parse_matrices
|
|
||||||
from vsp import has_vsp
|
|
||||||
from vspursuer import restructure_solutions
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
parser = argparse.ArgumentParser(description="Compare models that have VSP in two ugly files")
|
|
||||||
parser.add_argument("ugly1", type=str, help="First ugly data file")
|
|
||||||
parser.add_argument("ugly2", type=str, help="Second ugly data file")
|
|
||||||
parser.add_argument("--ignore-constants", action='store_true', help="When it comes to model equivalance, ignore constants")
|
|
||||||
args = vars(parser.parse_args())
|
|
||||||
|
|
||||||
data_file1 = open(args['ugly1'], "r")
|
|
||||||
solutions1 = parse_matrices(SourceFile(data_file1))
|
|
||||||
solutions1 = restructure_solutions(solutions1, None)
|
|
||||||
|
|
||||||
data_file2 = open(args['ugly2'], "r")
|
|
||||||
solutions2 = parse_matrices(SourceFile(data_file2))
|
|
||||||
solutions2 = list(restructure_solutions(solutions2, None))
|
|
||||||
|
|
||||||
ignore_constants = args.get("ignore_constants", False)
|
|
||||||
|
|
||||||
# Total count of models
|
|
||||||
total_models1 = 0
|
|
||||||
total_models2 = 0
|
|
||||||
|
|
||||||
# Models that exhibit VSP
|
|
||||||
good_models1 = 0
|
|
||||||
good_models2 = 0
|
|
||||||
|
|
||||||
# Models that don't exhibit VSP
|
|
||||||
bad_models1 = 0
|
|
||||||
bad_models2 = 0
|
|
||||||
|
|
||||||
# Models that exhibit VSP but does
|
|
||||||
# not exist in the other file.
|
|
||||||
extra_models1 = 0
|
|
||||||
extra_models2 = 0
|
|
||||||
|
|
||||||
for model, impfunction, negation_defined in solutions1:
|
|
||||||
total_models1 += 1
|
|
||||||
vsp_result = has_vsp(model, impfunction, negation_defined)
|
|
||||||
|
|
||||||
if vsp_result.has_vsp:
|
|
||||||
good_models1 += 1
|
|
||||||
# Check to see if model exists in file 2
|
|
||||||
match_found_index = (False, -1)
|
|
||||||
for i in range(len(solutions2) - 1, -1, -1):
|
|
||||||
if model_equivalence(model, solutions2[i][0], ignore_constants):
|
|
||||||
match_found_index = (True, i)
|
|
||||||
break
|
|
||||||
|
|
||||||
if match_found_index[0]:
|
|
||||||
# If so, remove the model from the second set
|
|
||||||
total_models2 += 1
|
|
||||||
good_models2 += 1
|
|
||||||
del solutions2[match_found_index[1]]
|
|
||||||
else:
|
|
||||||
extra_models1 += 1
|
|
||||||
print(f"VSP Model {model.name} not found in file 2.")
|
|
||||||
print(model)
|
|
||||||
else:
|
|
||||||
bad_models1 += 1
|
|
||||||
|
|
||||||
|
|
||||||
# Check through the remaining models in the second set
|
|
||||||
for model, impfunction, negation_defined in solutions2:
|
|
||||||
total_models2 += 1
|
|
||||||
vsp_result = has_vsp(model, impfunction, negation_defined)
|
|
||||||
|
|
||||||
if not vsp_result.has_vsp:
|
|
||||||
bad_models2 += 1
|
|
||||||
else:
|
|
||||||
print("VSP model", model.name, "does not appear in file 1")
|
|
||||||
good_models2 += 1
|
|
||||||
extra_models2 += 1
|
|
||||||
|
|
||||||
|
|
||||||
print("File 1 has a total of", total_models1, "models.")
|
|
||||||
print("Out of which,", good_models1, "exhibit VSP while", bad_models1, "do not.")
|
|
||||||
print("File 1 has a total of", extra_models1, "which exhibit VSP but do not appear in file 2.")
|
|
||||||
|
|
||||||
print("")
|
|
||||||
print("File 2 has a total of", total_models2, "models")
|
|
||||||
print("Out of which,", good_models2, "exhibit VSP while", bad_models2, "do not.")
|
|
||||||
print("File 2 has a total of", extra_models2, "which exhibit VSP but do not appear in file 1.")
|
|
|
@ -1,54 +0,0 @@
|
||||||
"""
|
|
||||||
Given a model, create a Hasse diagram.
|
|
||||||
|
|
||||||
Note: This has a dependency on the hasse-diagram library
|
|
||||||
https://pypi.org/project/hasse-diagram/
|
|
||||||
"""
|
|
||||||
import argparse
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
||||||
|
|
||||||
from model import Model
|
|
||||||
from parse_magic import SourceFile, parse_matrices
|
|
||||||
|
|
||||||
import numpy as np
|
|
||||||
import hassediagram
|
|
||||||
|
|
||||||
__all__ = ['plot_model_hassee']
|
|
||||||
|
|
||||||
def plot_model_hassee(model: Model):
|
|
||||||
assert model.ordering is not None
|
|
||||||
carrier_list = list(model.carrier_set)
|
|
||||||
hasse_ordering = []
|
|
||||||
for elem1 in carrier_list:
|
|
||||||
elem_ordering = []
|
|
||||||
for elem2 in carrier_list:
|
|
||||||
elem_ordering.append(
|
|
||||||
1 if model.ordering.is_lt(elem1, elem2) else 0
|
|
||||||
)
|
|
||||||
hasse_ordering.append(elem_ordering)
|
|
||||||
hassediagram.plot_hasse(np.array(hasse_ordering), carrier_list)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
parser = argparse.ArgumentParser(description="Show hassee diagram for model")
|
|
||||||
parser.add_argument("uglyfile", type=str, help="Path to ugly data file")
|
|
||||||
parser.add_argument("modelname", type=str, help="Name of model within file")
|
|
||||||
args = vars(parser.parse_args())
|
|
||||||
|
|
||||||
data_file = open(args['uglyfile'], "r")
|
|
||||||
solutions = parse_matrices(SourceFile(data_file))
|
|
||||||
|
|
||||||
requested_model = None
|
|
||||||
|
|
||||||
for model, _ in solutions:
|
|
||||||
if model.name == args['modelname']:
|
|
||||||
requested_model = model
|
|
||||||
break
|
|
||||||
|
|
||||||
if requested_model is None:
|
|
||||||
print("Model name", args['modelname'], "not found.")
|
|
||||||
sys.exit(0)
|
|
||||||
|
|
||||||
plot_model_hassee(requested_model)
|
|
|
@ -1,30 +0,0 @@
|
||||||
"""
|
|
||||||
Print a model given it's name
|
|
||||||
and ugly data file
|
|
||||||
"""
|
|
||||||
import argparse
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
||||||
|
|
||||||
from parse_magic import SourceFile, parse_matrices
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
parser = argparse.ArgumentParser(description="Show hassee diagram for model")
|
|
||||||
parser.add_argument("uglyfile", type=str, help="Path to ugly data file")
|
|
||||||
parser.add_argument("modelname", type=str, help="Name of model within file")
|
|
||||||
args = vars(parser.parse_args())
|
|
||||||
|
|
||||||
data_file = open(args['uglyfile'], "r")
|
|
||||||
solutions = parse_matrices(SourceFile(data_file))
|
|
||||||
|
|
||||||
model_found = False
|
|
||||||
|
|
||||||
for model, _ in solutions:
|
|
||||||
if model.name == args['modelname']:
|
|
||||||
model_found = True
|
|
||||||
print(model)
|
|
||||||
break
|
|
||||||
|
|
||||||
if not model_found:
|
|
||||||
print("Model", args['modelname'], "not found.")
|
|
181
vsp.py
181
vsp.py
|
@ -2,12 +2,83 @@
|
||||||
Check to see if the model has the variable
|
Check to see if the model has the variable
|
||||||
sharing property.
|
sharing property.
|
||||||
"""
|
"""
|
||||||
from itertools import product
|
from itertools import chain, combinations, product
|
||||||
from typing import List, Optional, Set, Tuple
|
from typing import Dict, List, Optional, Set, Tuple
|
||||||
from common import set_to_str
|
from common import set_to_str
|
||||||
from model import (
|
from model import (
|
||||||
Model, model_closure, ModelFunction, ModelValue
|
Model, model_closure, ModelFunction, ModelValue
|
||||||
)
|
)
|
||||||
|
from logic import Conjunction, Disjunction, Implication, Operation
|
||||||
|
|
||||||
|
def preseed(
|
||||||
|
initial_set: Set[ModelValue],
|
||||||
|
cache:List[Tuple[Set[ModelValue], Set[ModelValue]]]):
|
||||||
|
"""
|
||||||
|
Given a cache of previous model_closure calls,
|
||||||
|
use this to compute an initial model closure
|
||||||
|
set based on the initial set.
|
||||||
|
|
||||||
|
Basic Idea:
|
||||||
|
Let {1, 2, 3} -> X be in the cache.
|
||||||
|
If {1,2,3} is a subset of initial set,
|
||||||
|
then X is the subset of the output of model_closure.
|
||||||
|
|
||||||
|
This is used to speed up subsequent calls to model_closure
|
||||||
|
"""
|
||||||
|
candidate_preseed: Tuple[Set[ModelValue], int] = (None, None)
|
||||||
|
|
||||||
|
for i, o in cache:
|
||||||
|
if i < initial_set:
|
||||||
|
cost = len(initial_set - i)
|
||||||
|
# If i is a subset with less missing elements than
|
||||||
|
# the previous candidate, then it's the new candidate.
|
||||||
|
if candidate_preseed[1] is None or cost < candidate_preseed[1]:
|
||||||
|
candidate_preseed = o, cost
|
||||||
|
|
||||||
|
same_set = candidate_preseed[1] == 0
|
||||||
|
return candidate_preseed[0], same_set
|
||||||
|
|
||||||
|
|
||||||
|
def find_top(algebra: Set[ModelValue], mconjunction: Optional[ModelFunction], mdisjunction: Optional[ModelFunction]) -> Optional[ModelValue]:
|
||||||
|
"""
|
||||||
|
Find the top of the order lattice.
|
||||||
|
T || a = T, T && a = a for all a in the carrier set
|
||||||
|
"""
|
||||||
|
if mconjunction is None or mdisjunction is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
for x in algebra:
|
||||||
|
is_top = True
|
||||||
|
for y in algebra:
|
||||||
|
if mdisjunction(x, y) != x or mconjunction(x, y) != y:
|
||||||
|
is_top = False
|
||||||
|
break
|
||||||
|
if is_top:
|
||||||
|
return x
|
||||||
|
|
||||||
|
print("[Warning] Failed to find the top of the lattice")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def find_bottom(algebra: Set[ModelValue], mconjunction: Optional[ModelFunction], mdisjunction: Optional[ModelFunction]) -> Optional[ModelValue]:
|
||||||
|
"""
|
||||||
|
Find the bottom of the order lattice
|
||||||
|
F || a = a, F && a = F for all a in the carrier set
|
||||||
|
"""
|
||||||
|
if mconjunction is None or mdisjunction is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
for x in algebra:
|
||||||
|
is_bottom = True
|
||||||
|
for y in algebra:
|
||||||
|
if mdisjunction(x, y) != y or mconjunction(x, y) != x:
|
||||||
|
is_bottom = False
|
||||||
|
break
|
||||||
|
if is_bottom:
|
||||||
|
return x
|
||||||
|
|
||||||
|
print("[Warning] Failed to find the bottom of the lattice")
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
class VSP_Result:
|
class VSP_Result:
|
||||||
def __init__(
|
def __init__(
|
||||||
|
@ -27,90 +98,98 @@ Subalgebra 1: {set_to_str(self.subalgebra1)}
|
||||||
Subalgebra 2: {set_to_str(self.subalgebra2)}
|
Subalgebra 2: {set_to_str(self.subalgebra2)}
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def has_vsp(model: Model, impfunction: ModelFunction,
|
def has_vsp(model: Model, interpretation: Dict[Operation, ModelFunction]) -> VSP_Result:
|
||||||
negation_defined: bool) -> VSP_Result:
|
|
||||||
"""
|
"""
|
||||||
Checks whether a model has the variable
|
Checks whether a model has the variable
|
||||||
sharing property.
|
sharing property.
|
||||||
"""
|
"""
|
||||||
|
impfunction = interpretation[Implication]
|
||||||
|
mconjunction = interpretation.get(Conjunction)
|
||||||
|
mdisjunction = interpretation.get(Disjunction)
|
||||||
|
top = find_top(model.carrier_set, mconjunction, mdisjunction)
|
||||||
|
bottom = find_bottom(model.carrier_set, mconjunction, mdisjunction)
|
||||||
|
|
||||||
# NOTE: No models with only one designated
|
# NOTE: No models with only one designated
|
||||||
# value satisfies VSP
|
# value satisfies VSP
|
||||||
if len(model.designated_values) == 1:
|
if len(model.designated_values) == 1:
|
||||||
return VSP_Result(False, model.name)
|
return VSP_Result(False, model.name)
|
||||||
|
|
||||||
assert model.ordering is not None, "Expected ordering table in model"
|
|
||||||
|
|
||||||
top = model.ordering.top()
|
|
||||||
bottom = model.ordering.bottom()
|
|
||||||
|
|
||||||
# Compute I the set of tuples (x, y) where
|
# Compute I the set of tuples (x, y) where
|
||||||
# x -> y does not take a designiated value
|
# x -> y does not take a designiated value
|
||||||
I: List[Tuple[ModelValue, ModelValue]] = []
|
I: Set[Tuple[ModelValue, ModelValue]] = set()
|
||||||
|
|
||||||
for (x, y) in product(model.carrier_set, model.carrier_set):
|
for (x, y) in product(model.carrier_set, model.carrier_set):
|
||||||
if impfunction(x, y) not in model.designated_values:
|
if impfunction(x, y) not in model.designated_values:
|
||||||
I.append((x, y))
|
I.add((x, y))
|
||||||
|
|
||||||
|
# Construct the powerset of I without the empty set
|
||||||
|
s = list(I)
|
||||||
|
I_power = chain.from_iterable(combinations(s, r) for r in range(1, len(s) + 1))
|
||||||
|
# ((x1, y1)), ((x1, y1), (x2, y2)), ...
|
||||||
|
|
||||||
|
# Closure cache
|
||||||
|
closure_cache: List[Tuple[Set[ModelValue], Set[ModelValue]]] = []
|
||||||
|
|
||||||
# Find the subalgebras which falsify implication
|
# Find the subalgebras which falsify implication
|
||||||
for xys in I:
|
for xys in I_power:
|
||||||
|
|
||||||
xi = xys[0]
|
xs = {xy[0] for xy in xys}
|
||||||
|
orig_xs = xs
|
||||||
|
cached_xs = preseed(xs, closure_cache)
|
||||||
|
if cached_xs[0] is not None:
|
||||||
|
xs |= cached_xs[0]
|
||||||
|
|
||||||
# Discard ({⊥} ∪ A', B) subalgebras
|
ys = {xy[1] for xy in xys}
|
||||||
if bottom is not None and xi == bottom:
|
orig_ys = ys
|
||||||
|
cached_ys = preseed(ys, closure_cache)
|
||||||
|
if cached_ys[0] is not None:
|
||||||
|
ys |= cached_ys[0]
|
||||||
|
|
||||||
|
|
||||||
|
# NOTE: Optimziation before model_closure
|
||||||
|
# If the two subalgebras intersect, move
|
||||||
|
# onto the next pair
|
||||||
|
if len(xs & ys) > 0:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Discard ({⊤} ∪ A', B) subalgebras when negation is defined
|
# NOTE: Optimization
|
||||||
if top is not None and negation_defined and xi == top:
|
# If the left subalgebra contains bottom
|
||||||
|
# or the right subalgebra contains top
|
||||||
|
# skip this pair
|
||||||
|
if top is not None and top in ys:
|
||||||
|
continue
|
||||||
|
if bottom is not None and bottom in xs:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
yi = xys[1]
|
# Compute the closure of all operations
|
||||||
|
# with just the xs
|
||||||
|
carrier_set_left: Set[ModelValue] = model_closure(xs, model.logical_operations, bottom)
|
||||||
|
|
||||||
# Discard (A, {⊤} ∪ B') subalgebras
|
# Save to cache
|
||||||
if top is not None and yi == top:
|
if cached_xs[0] is not None and not cached_ys[1]:
|
||||||
continue
|
closure_cache.append((orig_xs, carrier_set_left))
|
||||||
|
|
||||||
# Discard (A, {⊥} ∪ B') subalgebras when negation is defined
|
|
||||||
if bottom is not None and negation_defined and yi == bottom:
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Discard ({a} ∪ A', {b} ∪ B') subalgebras when a <= b
|
|
||||||
if model.ordering.is_lt(xi, yi):
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Discard ({a} ∪ A', {b} ∪ B') subalgebras when b <= a and negation is defined
|
|
||||||
if negation_defined and model.ordering.is_lt(yi, xi):
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Compute the left closure of the set containing xi under all the operations
|
|
||||||
carrier_set_left: Set[ModelValue] = model_closure({xi,}, model.logical_operations, bottom)
|
|
||||||
|
|
||||||
# Discard ({⊥} ∪ A', B) subalgebras
|
|
||||||
if bottom is not None and bottom in carrier_set_left:
|
if bottom is not None and bottom in carrier_set_left:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Discard ({⊤} ∪ A', B) subalgebras when negation is defined
|
|
||||||
if top is not None and negation_defined and top in carrier_set_left:
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Compute the closure of all operations
|
# Compute the closure of all operations
|
||||||
# with just the ys
|
# with just the ys
|
||||||
carrier_set_right: Set[ModelValue] = model_closure({yi,}, model.logical_operations, top)
|
carrier_set_right: Set[ModelValue] = model_closure(ys, model.logical_operations, top)
|
||||||
|
|
||||||
|
# Save to cache
|
||||||
|
if cached_ys[0] is not None and not cached_ys[1]:
|
||||||
|
closure_cache.append((orig_ys, carrier_set_right))
|
||||||
|
|
||||||
# Discard (A, {⊤} ∪ B') subalgebras
|
|
||||||
if top is not None and top in carrier_set_right:
|
if top is not None and top in carrier_set_right:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Discard (A, {⊥} ∪ B') subalgebras when negation is defined
|
# If the carrier set intersects, then move on to the next
|
||||||
if bottom is not None and negation_defined and bottom in carrier_set_right:
|
# subalgebra
|
||||||
|
if len(carrier_set_left & carrier_set_right) > 0:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Discard subalgebras that intersect
|
# See if for all pairs in the subalgebras, that
|
||||||
if not carrier_set_left.isdisjoint(carrier_set_right):
|
# implication is falsified
|
||||||
continue
|
|
||||||
|
|
||||||
# Check whether for all pairs in the subalgebra,
|
|
||||||
# that implication is falsified.
|
|
||||||
falsified = True
|
falsified = True
|
||||||
for (x2, y2) in product(carrier_set_left, carrier_set_right):
|
for (x2, y2) in product(carrier_set_left, carrier_set_right):
|
||||||
if impfunction(x2, y2) in model.designated_values:
|
if impfunction(x2, y2) in model.designated_values:
|
||||||
|
|
227
vspursuer.py
227
vspursuer.py
|
@ -1,214 +1,45 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
from os import cpu_count
|
||||||
from datetime import datetime
|
|
||||||
from typing import Dict, Iterator, Optional, Tuple
|
|
||||||
from queue import Empty as QueueEmpty
|
|
||||||
import argparse
|
import argparse
|
||||||
import multiprocessing as mp
|
import multiprocessing
|
||||||
|
|
||||||
from logic import Negation, Implication, Operation
|
from parse_magic import (
|
||||||
from model import Model, ModelFunction
|
SourceFile,
|
||||||
from parse_magic import SourceFile, parse_matrices
|
parse_matrices
|
||||||
|
)
|
||||||
from vsp import has_vsp, VSP_Result
|
from vsp import has_vsp, VSP_Result
|
||||||
|
|
||||||
def print_with_timestamp(message):
|
|
||||||
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
||||||
print(f"[{current_time}] {message}", flush=True)
|
|
||||||
|
|
||||||
def restructure_solutions(solutions: Iterator[Tuple[Model, Dict[Operation, ModelFunction]]], skip_to: Optional[str]) -> \
|
|
||||||
Iterator[Tuple[Model, ModelFunction, Optional[ModelFunction], Optional[ModelFunction], Optional[ModelFunction]]]:
|
|
||||||
"""
|
|
||||||
When subprocess gets spawned, the logical operations will
|
|
||||||
have a different memory address than what's expected in interpretation.
|
|
||||||
Therefore, we need to pass the model functions directly instead.
|
|
||||||
|
|
||||||
While we're at it, filter out models until we get to --skip-to
|
|
||||||
if applicable.
|
|
||||||
"""
|
|
||||||
start_processing = skip_to is None
|
|
||||||
for model, interpretation in solutions:
|
|
||||||
# If skip_to is defined, then don't process models
|
|
||||||
# until then.
|
|
||||||
if not start_processing and model.name != skip_to:
|
|
||||||
continue
|
|
||||||
start_processing = True
|
|
||||||
|
|
||||||
# NOTE: Implication must be defined, the rest may not
|
|
||||||
impfunction = interpretation[Implication]
|
|
||||||
negation_defined = Negation in interpretation
|
|
||||||
yield (model, impfunction, negation_defined)
|
|
||||||
|
|
||||||
def has_vsp_plus_model(model, impfunction, negation_defined) -> Tuple[Optional[Model], VSP_Result]:
|
|
||||||
"""
|
|
||||||
Wrapper which also stores the models along with its vsp result
|
|
||||||
"""
|
|
||||||
vsp_result = has_vsp(model, impfunction, negation_defined)
|
|
||||||
# NOTE: Memory optimization - Don't return model if it doesn't have VSP
|
|
||||||
model = model if vsp_result.has_vsp else None
|
|
||||||
return (model, vsp_result)
|
|
||||||
|
|
||||||
def worker_vsp(task_queue: mp.Queue, result_queue: mp.Queue):
|
|
||||||
"""
|
|
||||||
Worker process which processes models from the task
|
|
||||||
queue and adds the result to the result_queue.
|
|
||||||
|
|
||||||
Adds the sentinal value None when exception occurs and when there's
|
|
||||||
no more to process.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
while True:
|
|
||||||
task = task_queue.get()
|
|
||||||
# If sentinal value, break
|
|
||||||
if task is None:
|
|
||||||
break
|
|
||||||
(model, impfunction, negation_defined) = task
|
|
||||||
result = has_vsp_plus_model(model, impfunction, negation_defined)
|
|
||||||
result_queue.put(result)
|
|
||||||
finally:
|
|
||||||
# Either an exception occured or the worker finished
|
|
||||||
# Push sentinal value
|
|
||||||
result_queue.put(None)
|
|
||||||
|
|
||||||
def worker_parser(data_file_path: str, num_sentinal_values: int, task_queue: mp.Queue, skip_to: Optional[str]):
|
|
||||||
"""
|
|
||||||
Function which parses the MaGIC file
|
|
||||||
and adds models to the task_queue.
|
|
||||||
|
|
||||||
Intended to be deployed with a dedicated process.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
data_file = open(data_file_path, "r")
|
|
||||||
solutions = parse_matrices(SourceFile(data_file))
|
|
||||||
solutions = restructure_solutions(solutions, skip_to)
|
|
||||||
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
item = next(solutions)
|
|
||||||
task_queue.put(item)
|
|
||||||
except StopIteration:
|
|
||||||
break
|
|
||||||
finally:
|
|
||||||
data_file.close()
|
|
||||||
for _ in range(num_sentinal_values):
|
|
||||||
task_queue.put(None)
|
|
||||||
|
|
||||||
def multi_process_runner(num_cpu: int, data_file_path: str, skip_to: Optional[str]):
|
|
||||||
"""
|
|
||||||
Run VSPursuer in a multi-process configuration.
|
|
||||||
"""
|
|
||||||
assert num_cpu > 1
|
|
||||||
|
|
||||||
num_tested = 0
|
|
||||||
num_has_vsp = 0
|
|
||||||
num_workers = num_cpu - 1
|
|
||||||
|
|
||||||
# Create queues
|
|
||||||
task_queue = mp.Queue(maxsize=1000)
|
|
||||||
result_queue = mp.Queue()
|
|
||||||
|
|
||||||
# Create dedicated process to parse the MaGIC file
|
|
||||||
process_parser = mp.Process(target=worker_parser, args=(data_file_path, num_workers, task_queue, skip_to))
|
|
||||||
process_parser.start()
|
|
||||||
|
|
||||||
# Create dedicated processes which check VSP
|
|
||||||
process_workers = []
|
|
||||||
for _ in range(num_workers):
|
|
||||||
p = mp.Process(target=worker_vsp, args=(task_queue, result_queue))
|
|
||||||
process_workers.append(p)
|
|
||||||
p.start()
|
|
||||||
|
|
||||||
|
|
||||||
# Check results and add new tasks until finished
|
|
||||||
result_sentinal_count = 0
|
|
||||||
while True:
|
|
||||||
# Read a result
|
|
||||||
try:
|
|
||||||
result = result_queue.get(True, 60)
|
|
||||||
except QueueEmpty:
|
|
||||||
if all((not p.is_alive() for p in process_workers)):
|
|
||||||
# All workers finished without us receiving all the
|
|
||||||
# sentinal values.
|
|
||||||
break
|
|
||||||
|
|
||||||
task_queue_size = 0
|
|
||||||
try:
|
|
||||||
task_queue_size = task_queue.qsize()
|
|
||||||
except NotImplementedError:
|
|
||||||
# MacOS doesn't implement this
|
|
||||||
pass
|
|
||||||
|
|
||||||
if task_queue_size == 0 and not process_parser.is_alive():
|
|
||||||
# For Linux/Windows this means that the process_parser
|
|
||||||
# died before sending the sentinal values.
|
|
||||||
# For Mac, this doesn't guarentee anything but might
|
|
||||||
# as well push more sentinal values.
|
|
||||||
for _ in range(num_workers):
|
|
||||||
task_queue.put(None)
|
|
||||||
|
|
||||||
# Don't do anymore work, wait again for a result
|
|
||||||
continue
|
|
||||||
|
|
||||||
|
|
||||||
# When we receive None, it means a child process has finished
|
|
||||||
if result is None:
|
|
||||||
result_sentinal_count += 1
|
|
||||||
# If all workers have finished break
|
|
||||||
if result_sentinal_count == len(process_workers):
|
|
||||||
break
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Process result
|
|
||||||
model, vsp_result = result
|
|
||||||
print_with_timestamp(vsp_result)
|
|
||||||
num_tested += 1
|
|
||||||
|
|
||||||
if vsp_result.has_vsp:
|
|
||||||
print(model)
|
|
||||||
|
|
||||||
if vsp_result.has_vsp:
|
|
||||||
num_has_vsp += 1
|
|
||||||
|
|
||||||
print_with_timestamp(f"Tested {num_tested} models, {num_has_vsp} of which satisfy VSP")
|
|
||||||
|
|
||||||
def single_process_runner(data_file_path: str, skip_to: Optional[str]):
|
|
||||||
num_tested = 0
|
|
||||||
num_has_vsp = 0
|
|
||||||
|
|
||||||
data_file = open(data_file_path, "r")
|
|
||||||
solutions = parse_matrices(SourceFile(data_file))
|
|
||||||
solutions = restructure_solutions(solutions, skip_to)
|
|
||||||
|
|
||||||
for model, impfunction, negation_defined in solutions:
|
|
||||||
model, vsp_result = has_vsp_plus_model(model, impfunction, negation_defined)
|
|
||||||
print_with_timestamp(vsp_result)
|
|
||||||
num_tested += 1
|
|
||||||
|
|
||||||
if vsp_result.has_vsp:
|
|
||||||
print(model)
|
|
||||||
|
|
||||||
if vsp_result.has_vsp:
|
|
||||||
num_has_vsp += 1
|
|
||||||
|
|
||||||
print_with_timestamp(f"Tested {num_tested} models, {num_has_vsp} of which satisfy VSP")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
parser = argparse.ArgumentParser(description="VSP Checker")
|
parser = argparse.ArgumentParser(description="VSP Checker")
|
||||||
parser.add_argument("--verbose", action='store_true', help="Print out all parsed matrices")
|
parser.add_argument("--verbose", action='store_true', help="Print out all parsed matrices")
|
||||||
parser.add_argument("-i", type=str, help="Path to MaGIC ugly data file")
|
parser.add_argument("-i", type=str, help="Path to MaGIC ugly data file")
|
||||||
parser.add_argument("-c", type=int, help="Number of CPUs to use. Default: 1")
|
|
||||||
parser.add_argument("--skip-to", type=str, help="Skip until a model name is found and process from then onwards.")
|
|
||||||
args = vars(parser.parse_args())
|
args = vars(parser.parse_args())
|
||||||
|
|
||||||
data_file_path = args.get("i")
|
data_file_path = args.get("i")
|
||||||
if data_file_path is None:
|
if data_file_path is None:
|
||||||
data_file_path = input("Path to MaGIC Ugly Data File: ")
|
data_file_path = input("Path to MaGIC Ugly Data File: ")
|
||||||
|
|
||||||
num_cpu = args.get("c")
|
solutions = []
|
||||||
if num_cpu is None:
|
with open(data_file_path, "r") as data_file:
|
||||||
num_cpu = 1
|
solutions = parse_matrices(SourceFile(data_file))
|
||||||
|
print(f"Parsed {len(solutions)} matrices")
|
||||||
|
|
||||||
if num_cpu == 1:
|
num_has_vsp = 0
|
||||||
single_process_runner(data_file_path, args.get("skip_to"))
|
with multiprocessing.Pool(processes=max(cpu_count() - 2, 1)) as pool:
|
||||||
else:
|
results = [
|
||||||
multi_process_runner(num_cpu, data_file_path, args.get("skip_to"))
|
pool.apply_async(has_vsp, (model, interpretation,))
|
||||||
|
for model, interpretation in solutions
|
||||||
|
]
|
||||||
|
|
||||||
|
for i, result in enumerate(results):
|
||||||
|
vsp_result: VSP_Result = result.get()
|
||||||
|
print(vsp_result)
|
||||||
|
|
||||||
|
if args['verbose'] or vsp_result.has_vsp:
|
||||||
|
model = solutions[i][0]
|
||||||
|
print(model)
|
||||||
|
|
||||||
|
if vsp_result.has_vsp:
|
||||||
|
num_has_vsp += 1
|
||||||
|
|
||||||
|
print(f"Tested {len(solutions)} models, {num_has_vsp} of which satisfy VSP")
|
||||||
|
|
Loading…
Add table
Reference in a new issue