Compare commits

..

No commits in common. "6d7fc9094af0a136172fc52bc325580620385410" and "29526dbec38afbd3f5a19f9a7df1478ed96fd5c5" have entirely different histories.

10 changed files with 408 additions and 726 deletions

12
R.py
View file

@ -12,7 +12,7 @@ from logic import (
) )
from model import Model, ModelFunction, ModelValue, satisfiable from model import Model, ModelFunction, ModelValue, satisfiable
from generate_model import generate_model from generate_model import generate_model
# from vsp import has_vsp from vsp import has_vsp
# =================================================== # ===================================================
@ -106,7 +106,7 @@ designated_values = {a1}
logical_operations = { logical_operations = {
mnegation, mimplication, mconjunction, mdisjunction mnegation, mimplication, mconjunction, mdisjunction
} }
R_model_2 = Model(carrier_set, logical_operations, designated_values, None, "R2") R_model_2 = Model(carrier_set, logical_operations, designated_values, "R2")
interpretation = { interpretation = {
Negation: mnegation, Negation: mnegation,
@ -132,8 +132,8 @@ solutions = generate_model(R_logic, model_size, print_model=False)
print(f"Found {len(solutions)} satisfiable models") print(f"Found {len(solutions)} satisfiable models")
# for model, interpretation in solutions: for model, interpretation in solutions:
# print(has_vsp(model, interpretation)) print(has_vsp(model, interpretation))
print("*" * 30) print("*" * 30)
@ -301,7 +301,7 @@ mdisjunction = ModelFunction(2, {
logical_operations = { logical_operations = {
mnegation, mimplication, mconjunction, mdisjunction mnegation, mimplication, mconjunction, mdisjunction
} }
R_model_6 = Model(carrier_set, logical_operations, designated_values, None, "R6") R_model_6 = Model(carrier_set, logical_operations, designated_values, "R6")
interpretation = { interpretation = {
Negation: mnegation, Negation: mnegation,
@ -312,4 +312,4 @@ interpretation = {
print(R_model_6) print(R_model_6)
print(f"Model {R_model_6.name} satisfies logic {R_logic.name}?", satisfiable(R_logic, R_model_6, interpretation)) print(f"Model {R_model_6.name} satisfies logic {R_logic.name}?", satisfiable(R_logic, R_model_6, interpretation))
# print(has_vsp(R_model_6, interpretation)) print(has_vsp(R_model_6, interpretation))

View file

@ -27,6 +27,8 @@ class Operation:
class Term: class Term:
def __init__(self): def __init__(self):
pass pass
def __lt__(self, y):
return Inequation(self, y)
class PropositionalVariable(Term): class PropositionalVariable(Term):
def __init__(self, name): def __init__(self, name):
@ -68,6 +70,23 @@ Disjunction = Operation("", 2)
Implication = Operation("", 2) Implication = Operation("", 2)
Necessitation = Operation("!", 1) Necessitation = Operation("!", 1)
class Inequation:
def __init__(self, antecedant : Term, consequent: Term):
self.antecedant = antecedant
self.consequent = consequent
def __str__(self):
return str(self.antecedant) + "" + str(self.consequent)
class InequalityRule:
def __init__(self, premises : Set[Inequation], conclusion: Inequation):
self.premises = premises
self.conclusion = conclusion
def __str__(self):
str_premises = [str(p) for p in self.premises]
str_premises2 = "{" + ",".join(str_premises) + "}"
return str(str_premises2) + "=>" + str(self.conclusion)
class Rule: class Rule:
def __init__(self, premises : Set[Term], conclusion: Term): def __init__(self, premises : Set[Term], conclusion: Term):
self.premises = premises self.premises = premises

198
model.py
View file

@ -9,19 +9,16 @@ from logic import (
) )
from collections import defaultdict from collections import defaultdict
from functools import cached_property, lru_cache, reduce from functools import cached_property, lru_cache, reduce
from itertools import ( from itertools import chain, combinations_with_replacement, permutations, product
chain, combinations_with_replacement,
permutations, product
)
from typing import Dict, List, Optional, Set, Tuple from typing import Dict, List, Optional, Set, Tuple
__all__ = ['ModelValue', 'ModelFunction', 'Model', 'Interpretation'] __all__ = ['ModelValue', 'ModelFunction', 'Model', 'Interpretation']
class ModelValue: class ModelValue:
def __init__(self, name: str, hashed_value: Optional[int] = None): def __init__(self, name):
self.name = name self.name = name
self.hashed_value = hashed_value if hashed_value is not None else hash(self.name) self.hashed_value = hash(self.name)
self.__setattr__ = immutable self.__setattr__ = immutable
def __str__(self): def __str__(self):
return self.name return self.name
@ -30,7 +27,7 @@ class ModelValue:
def __eq__(self, other): def __eq__(self, other):
return isinstance(other, ModelValue) and self.name == other.name return isinstance(other, ModelValue) and self.name == other.name
def __deepcopy__(self, _): def __deepcopy__(self, _):
return ModelValue(self.name, self.hashed_value) return ModelValue(self.name)
class ModelFunction: class ModelFunction:
def __init__(self, arity: int, mapping, operation_name = ""): def __init__(self, arity: int, mapping, operation_name = ""):
@ -106,94 +103,18 @@ def binary_function_str(f: ModelFunction) -> str:
Interpretation = Dict[Operation, ModelFunction] Interpretation = Dict[Operation, ModelFunction]
class OrderTable:
def __init__(self):
# a : {x | x <= a }
self.le_map: Dict[ModelValue, Set[ModelValue]] = defaultdict(set)
# a : {x | x >= a}
self.ge_map: Dict[ModelValue, Set[ModelValue]] = defaultdict(set)
def add(self, x, y):
"""
Add x <= y
"""
self.le_map[y].add(x)
self.ge_map[x].add(y)
def is_lt(self, x, y):
return x in self.le_map[y]
def meet(self, x, y) -> Optional[ModelValue]:
X = self.le_map[x]
Y = self.le_map[y]
candidates = X.intersection(Y)
# Grab all elements greater than each of the candidates
candidate_ge_maps = (self.ge_map[candidate] for candidate in candidates)
common_ge_values = reduce(set.intersection, candidate_ge_maps)
# Intersect with candidates to get the values that satisfy
# the meet properties
result_set = candidates.intersection(common_ge_values)
# NOTE: The meet may not exist, in which case return None
result = next(iter(result_set), None)
return result
def join(self, x, y) -> Optional[ModelValue]:
X = self.ge_map[x]
Y = self.ge_map[y]
candidates = X.intersection(Y)
# Grab all elements smaller than each of the candidates
candidate_le_maps = (self.le_map[candidate] for candidate in candidates)
common_le_values = reduce(set.intersection, candidate_le_maps)
# Intersect with candidates to get the values that satisfy
# the join properties
result_set = candidates.intersection(common_le_values)
# NOTE: The join may not exist, in which case return None
result = next(iter(result_set), None)
return result
def top(self) -> Optional[ModelValue]:
ge_maps = (self.ge_map[candidate] for candidate in self.ge_map)
result_set = reduce(set.intersection, ge_maps)
# Either not unique or does not exist
if len(result_set) != 1:
return None
return next(iter(result_set))
def bottom(self) -> Optional[ModelValue]:
le_maps = (self.le_map[candidate] for candidate in self.le_map)
result_set = reduce(set.intersection, le_maps)
# Either not unique or does not exist
if len(result_set) != 1:
return None
return next(iter(result_set))
class Model: class Model:
def __init__( def __init__(
self, self,
carrier_set: Set[ModelValue], carrier_set: Set[ModelValue],
logical_operations: Set[ModelFunction], logical_operations: Set[ModelFunction],
designated_values: Set[ModelValue], designated_values: Set[ModelValue],
ordering: Optional[OrderTable] = None,
name: Optional[str] = None name: Optional[str] = None
): ):
assert designated_values <= carrier_set assert designated_values <= carrier_set
self.carrier_set = carrier_set self.carrier_set = carrier_set
self.logical_operations = logical_operations self.logical_operations = logical_operations
self.designated_values = designated_values self.designated_values = designated_values
self.ordering = ordering
self.name = str(abs(hash(( self.name = str(abs(hash((
frozenset(carrier_set), frozenset(carrier_set),
frozenset(logical_operations), frozenset(logical_operations),
@ -294,47 +215,36 @@ def satisfiable(logic: Logic, model: Model, interpretation: Dict[Operation, Mode
return True return True
def model_closure(initial_set: Set[ModelValue], mfunctions: Set[ModelFunction], forbidden_element: Optional[ModelValue]) -> Set[ModelValue]: def model_closure(initial_set: Set[ModelValue], mfunctions: Set[ModelFunction], forbidden_element: Optional[ModelValue]) -> Set[ModelValue]:
""" """
Given an initial set of model values and a set of model functions, Given an initial set of model values and a set of model functions,
compute the complete set of model values that are closed compute the complete set of model values that are closed
under the operations. under the operations.
If the forbidden element is encountered, then we end the saturation procedure early. If top or bottom is encountered, then we end the saturation procedure early.
""" """
closure_set: Set[ModelValue] = initial_set closure_set: Set[ModelValue] = initial_set
last_new: Set[ModelValue] = initial_set last_new: Set[ModelValue] = initial_set
changed: bool = True changed: bool = True
forbidden_found = False forbidden_found = False
arities = set()
for mfun in mfunctions:
arities.add(mfun.arity)
while changed: while changed:
changed = False changed = False
new_elements: Set[ModelValue] = set() new_elements: Set[ModelValue] = set()
old_closure: Set[ModelValue] = closure_set - last_new old_closure: Set[ModelValue] = closure_set - last_new
# arity -> args # arity -> args
args_by_arity = defaultdict(list) cached_args = defaultdict(list)
# Motivation: We want to only compute arguments that we have not # Pass elements into each model function
# seen before
for arity in arities:
for num_new in range(1, arity + 1):
new_args = combinations_with_replacement(last_new, r=num_new)
old_args = combinations_with_replacement(old_closure, r=arity - num_new)
# Determine every possible ordering of the concatenated new and
# old model values.
for new_arg, old_arg in product(new_args, old_args):
for combined_args in permutations(new_arg + old_arg):
args_by_arity[arity].append(combined_args)
# Pass each argument into each model function
for mfun in mfunctions: for mfun in mfunctions:
for args in args_by_arity[mfun.arity]:
# If a previous function shared the same arity,
# we'll use the same set of computed arguments
# to pass into the model functions.
if mfun.arity in cached_args:
for args in cached_args[mfun.arity]:
# Compute the new elements # Compute the new elements
# given the cached arguments. # given the cached arguments.
element = mfun(*args) element = mfun(*args)
@ -350,6 +260,42 @@ def model_closure(initial_set: Set[ModelValue], mfunctions: Set[ModelFunction],
if forbidden_found: if forbidden_found:
break break
# We don't need to compute the arguments
# thanks to the cache, so move onto the
# next function.
continue
# At this point, we don't have cached arguments, so we need
# to compute this set.
# Each argument must have at least one new element to not repeat
# work. We'll range over the number of new model values within our
# argument.
for num_new in range(1, mfun.arity + 1):
new_args = combinations_with_replacement(last_new, r=num_new)
old_args = combinations_with_replacement(old_closure, r=mfun.arity - num_new)
# Determine every possible ordering of the concatenated
# new and old model values.
for new_arg, old_arg in product(new_args, old_args):
for args in permutations(new_arg + old_arg):
cached_args[mfun.arity].append(args)
element = mfun(*args)
if element not in closure_set:
new_elements.add(element)
# Optimization: Break out of computation
# early when forbidden element is found
if forbidden_element is not None and element == forbidden_element:
forbidden_found = True
break
if forbidden_found:
break
if forbidden_found:
break
closure_set.update(new_elements) closure_set.update(new_elements)
changed = len(new_elements) > 0 changed = len(new_elements) > 0
last_new = new_elements last_new = new_elements
@ -358,47 +304,3 @@ def model_closure(initial_set: Set[ModelValue], mfunctions: Set[ModelFunction],
break break
return closure_set return closure_set
def model_equivalence(model1: Model, model2: Model, ignore_constants: bool = False) -> bool:
"""
Takes two models and determines if they are equivalent.
Assumes for the model to be equivalent that their
value names are equivalent as well.
"""
if model1.carrier_set != model2.carrier_set:
return False
if model1.designated_values != model2.designated_values:
return False
model1_fn_names = set()
for fn in model1.logical_operations:
if fn.arity == 0 and ignore_constants:
continue
model1_fn_names.add(fn.operation_name)
model2_fn_names = set()
for fn in model2.logical_operations:
if fn.arity == 0 and ignore_constants:
continue
model2_fn_names.add(fn.operation_name)
if model1_fn_names != model2_fn_names:
return False
for fn_name in model1_fn_names:
fn1 = next((fn for fn in model1.logical_operations if fn.operation_name == fn_name))
fn2 = next((fn for fn in model2.logical_operations if fn.operation_name == fn_name))
if fn1.arity != fn2.arity:
return False
# Check for functional equivalence
# That is for all inputs in the carrier set, the outputs are the same
for args in product(model1.carrier_set, repeat=fn1.arity):
if fn1(*args) != fn2(*args):
return False
return True

View file

@ -4,10 +4,9 @@ Parses the Magic Ugly Data File Format
Assumes the base logic is R with no extra connectives Assumes the base logic is R with no extra connectives
""" """
import re import re
from typing import TextIO, List, Iterator, Optional, Tuple, Set, Dict from typing import TextIO, List, Optional, Tuple, Set, Dict
from itertools import product
from model import Model, ModelValue, ModelFunction, OrderTable from model import Model, ModelValue, ModelFunction
from logic import ( from logic import (
Implication, Implication,
Conjunction, Conjunction,
@ -20,7 +19,7 @@ from logic import (
class SourceFile: class SourceFile:
def __init__(self, fileobj: TextIO): def __init__(self, fileobj: TextIO):
self.fileobj = fileobj self.fileobj = fileobj
self.line_in_file = 0 self.current_line = 0
self.reststr = "" self.reststr = ""
def next_line(self): def next_line(self):
@ -35,7 +34,7 @@ class SourceFile:
return reststr return reststr
contents = next(self.fileobj).strip() contents = next(self.fileobj).strip()
self.line_in_file += 1 self.current_line += 1
return contents return contents
def __next__(self): def __next__(self):
@ -44,9 +43,12 @@ class SourceFile:
""" """
if self.reststr == "": if self.reststr == "":
self.reststr = next(self.fileobj).strip() self.reststr = next(self.fileobj).strip()
self.line_in_file += 1 self.current_line += 1
tokens = self.reststr.split(" ")
next_token = tokens[0]
self.reststr = " ".join(tokens[1:])
next_token, _, self.reststr = self.reststr.partition(" ")
return next_token return next_token
class UglyHeader: class UglyHeader:
@ -61,9 +63,8 @@ class UglyHeader:
class ModelBuilder: class ModelBuilder:
def __init__(self): def __init__(self):
self.size : int = 0 self.size : int = 0
self.carrier_list : List[ModelValue] = [] self.carrier_set : Set[ModelValue] = set()
self.mnegation: Optional[ModelFunction] = None self.mnegation: Optional[ModelFunction] = None
self.ordering: Optional[OrderTable] = None
self.mconjunction: Optional[ModelFunction] = None self.mconjunction: Optional[ModelFunction] = None
self.mdisjunction: Optional[ModelFunction] = None self.mdisjunction: Optional[ModelFunction] = None
self.designated_values: Set[ModelValue] = set() self.designated_values: Set[ModelValue] = set()
@ -72,45 +73,6 @@ class ModelBuilder:
# Map symbol to model function # Map symbol to model function
self.custom_model_functions: Dict[str, ModelFunction] = {} self.custom_model_functions: Dict[str, ModelFunction] = {}
def build(self, model_name: str) -> Tuple[Model, Dict[Operation, ModelFunction]]:
"""Create Model"""
assert self.size > 0
assert self.size + 1 == len(self.carrier_list)
assert len(self.designated_values) <= len(self.carrier_list)
assert self.mimplication is not None
# Implication is required to be present
logical_operations = { self.mimplication }
interpretation = {
Implication: self.mimplication
}
# Other model functions and logical
# operations are optional
if self.mnegation is not None:
logical_operations.add(self.mnegation)
interpretation[Negation] = self.mnegation
if self.mconjunction is not None:
logical_operations.add(self.mconjunction)
interpretation[Conjunction] = self.mconjunction
if self.mdisjunction is not None:
logical_operations.add(self.mdisjunction)
interpretation[Disjunction] = self.mdisjunction
if self.mnecessitation is not None:
logical_operations.add(self.mnecessitation)
interpretation[Necessitation] = self.mnecessitation
# Custom model function definitions
for custom_mf in self.custom_model_functions.values():
if custom_mf is not None:
logical_operations.add(custom_mf)
op = Operation(custom_mf.operation_name, custom_mf.arity)
interpretation[op] = custom_mf
model = Model(set(self.carrier_list), logical_operations, self.designated_values, ordering=self.ordering, name=model_name)
return (model, interpretation)
class Stage: class Stage:
def __init__(self, name: str): def __init__(self, name: str):
self.name = name self.name = name
@ -137,6 +99,8 @@ class Stages:
def add(self, name: str): def add(self, name: str):
stage = Stage(name) stage = Stage(name)
stage.next = stage
stage.previous = self.last_added_stage stage.previous = self.last_added_stage
# End stage is a sink so don't # End stage is a sink so don't
@ -153,16 +117,11 @@ class Stages:
def reset_after(self, name): def reset_after(self, name):
""" """
Resets the counters of every stage after Resets the stage counters after a given stage.
a given stage. This is to accurately reflect the name of the
model within MaGIC.
This is to accurately reflect how names are
generated within magic.
Example: 1.1, 1.2, (reset after 1), 2.1, 2.2
""" """
stage = self.stages[name] stage = self.stages[name]
# NOTE: The process_model stage doesn't
# have a counter associated with it.
while stage.name != "process_model": while stage.name != "process_model":
stage.reset() stage.reset()
stage = stage.next stage = stage.next
@ -171,26 +130,15 @@ class Stages:
return self.stages[name] return self.stages[name]
def name(self): def name(self):
"""
Get the full name of where we are within
the parsing process. Takes into account
the stage number of all the stages seen
so far.
"""
# Stages haven't been added yet
result = "" result = ""
stage = self.first_stage stage = self.first_stage
if stage is None: if stage is None:
return "" return ""
# There's only one stage
result = f"{stage.num}" result = f"{stage.num}"
if stage.next == "process_model": if stage.next == "process_model":
return result return result
# Add every subsequent stage counter
# by appending .stage_num
stage = stage.next stage = stage.next
while stage is not None: while stage is not None:
result += f".{stage.num}" result += f".{stage.num}"
@ -221,19 +169,19 @@ def derive_stages(header: UglyHeader) -> Stages:
return stages return stages
def parse_matrices(infile: SourceFile) -> Iterator[Tuple[Model, Dict[Operation, ModelFunction]]]: def parse_matrices(infile: SourceFile) -> List[Tuple[Model, Dict]]:
solutions = []
header = parse_header(infile) header = parse_header(infile)
stages = derive_stages(header) stages = derive_stages(header)
first_run = True first_run = True
current_model_parts = ModelBuilder() current_model_parts = ModelBuilder()
stage = stages.first_stage stage = stages.first_stage
while True: while True:
match stage.name: match stage.name:
case "end": case "end":
break break
case "process_model": case "process_model":
yield current_model_parts.build(stages.name()) process_model(stages.name(), current_model_parts, solutions)
stage = stage.next stage = stage.next
case "size": case "size":
processed = process_sizes(infile, current_model_parts, first_run) processed = process_sizes(infile, current_model_parts, first_run)
@ -299,25 +247,25 @@ def parse_matrices(infile: SourceFile) -> Iterator[Tuple[Model, Dict[Operation,
stages.reset_after(stage.name) stages.reset_after(stage.name)
stage = stage.previous stage = stage.previous
return solutions
def process_sizes(infile: SourceFile, current_model_parts: ModelBuilder, first_run: bool) -> bool: def process_sizes(infile: SourceFile, current_model_parts: ModelBuilder, first_run: bool) -> bool:
size: Optional[int] = None
try: try:
size = parse_size(infile, first_run) size = parse_size(infile, first_run)
except StopIteration: except StopIteration:
pass return False
if size is None: if size is None:
return False return False
carrier_list = carrier_list_from_size(size) carrier_set = carrier_set_from_size(size)
current_model_parts.carrier_list = carrier_list current_model_parts.carrier_set = carrier_set
current_model_parts.size = size current_model_parts.size = size
return True return True
def process_negations(infile: SourceFile, current_model_parts: ModelBuilder) -> bool: def process_negations(infile: SourceFile, current_model_parts: ModelBuilder) -> bool:
"""Stage 2 (Optional)""" """Stage 2 (Optional)"""
mnegation = parse_single_monadic_connective(infile, "¬", current_model_parts.size, current_model_parts.carrier_list) mnegation = parse_single_monadic_connective(infile, "¬", current_model_parts.size)
if mnegation is None: if mnegation is None:
return False return False
@ -326,12 +274,11 @@ def process_negations(infile: SourceFile, current_model_parts: ModelBuilder) ->
def process_orders(infile: SourceFile, current_model_parts: ModelBuilder) -> bool: def process_orders(infile: SourceFile, current_model_parts: ModelBuilder) -> bool:
"""Stage 3""" """Stage 3"""
result = parse_single_order(infile, current_model_parts.size, current_model_parts.carrier_list) result = parse_single_order(infile, current_model_parts.size)
if result is None: if result is None:
return False return False
ordering, mconjunction, mdisjunction = result mconjunction, mdisjunction = result
current_model_parts.ordering = ordering
current_model_parts.mconjunction = mconjunction current_model_parts.mconjunction = mconjunction
current_model_parts.mdisjunction = mdisjunction current_model_parts.mdisjunction = mdisjunction
@ -339,7 +286,7 @@ def process_orders(infile: SourceFile, current_model_parts: ModelBuilder) -> boo
def process_designateds(infile: SourceFile, current_model_parts: ModelBuilder) -> bool: def process_designateds(infile: SourceFile, current_model_parts: ModelBuilder) -> bool:
"""Stage 4""" """Stage 4"""
designated_values = parse_single_designated(infile, current_model_parts.size, current_model_parts.carrier_list) designated_values = parse_single_designated(infile, current_model_parts.size)
if designated_values is None: if designated_values is None:
return False return False
@ -348,7 +295,7 @@ def process_designateds(infile: SourceFile, current_model_parts: ModelBuilder) -
def process_implications(infile: SourceFile, current_model_parts: ModelBuilder) -> bool: def process_implications(infile: SourceFile, current_model_parts: ModelBuilder) -> bool:
"""Stage 5""" """Stage 5"""
mimplication = parse_single_dyadic_connective(infile, "", current_model_parts.size, current_model_parts.carrier_list) mimplication = parse_single_dyadic_connective(infile, "", current_model_parts.size)
if mimplication is None: if mimplication is None:
return False return False
@ -356,7 +303,7 @@ def process_implications(infile: SourceFile, current_model_parts: ModelBuilder)
return True return True
def process_necessitations(infile: SourceFile, current_model_parts: ModelBuilder) -> bool: def process_necessitations(infile: SourceFile, current_model_parts: ModelBuilder) -> bool:
mnecessitation = parse_single_monadic_connective(infile, "!", current_model_parts.size, current_model_parts.carrier_list) mnecessitation = parse_single_monadic_connective(infile, "!", current_model_parts.size)
if mnecessitation is None: if mnecessitation is None:
return False return False
@ -367,9 +314,9 @@ def process_custom_connective(infile: SourceFile, symbol: str, adicity: int, cur
if adicity == 0: if adicity == 0:
mfunction = parse_single_nullary_connective(infile, symbol) mfunction = parse_single_nullary_connective(infile, symbol)
elif adicity == 1: elif adicity == 1:
mfunction = parse_single_monadic_connective(infile, symbol, current_model_parts.size, current_model_parts.carrier_list) mfunction = parse_single_monadic_connective(infile, symbol, current_model_parts.size)
elif adicity == 2: elif adicity == 2:
mfunction = parse_single_dyadic_connective(infile, symbol, current_model_parts.size, current_model_parts.carrier_list) mfunction = parse_single_dyadic_connective(infile, symbol, current_model_parts.size)
else: else:
raise NotImplementedError("Unable to process connectives of adicity greater than 2") raise NotImplementedError("Unable to process connectives of adicity greater than 2")
@ -379,6 +326,38 @@ def process_custom_connective(infile: SourceFile, symbol: str, adicity: int, cur
current_model_parts.custom_model_functions[symbol] = mfunction current_model_parts.custom_model_functions[symbol] = mfunction
return True return True
def process_model(model_name: str, mp: ModelBuilder, solutions: List[Tuple[Model, Dict]]):
"""Create Model"""
assert mp.size > 0
assert mp.size + 1 == len(mp.carrier_set)
assert len(mp.designated_values) <= len(mp.carrier_set)
assert mp.mimplication is not None
logical_operations = { mp.mimplication }
model = Model(mp.carrier_set, logical_operations, mp.designated_values, name=model_name)
interpretation = {
Implication: mp.mimplication
}
if mp.mnegation is not None:
logical_operations.add(mp.mnegation)
interpretation[Negation] = mp.mnegation
if mp.mconjunction is not None:
logical_operations.add(mp.mconjunction)
interpretation[Conjunction] = mp.mconjunction
if mp.mdisjunction is not None:
logical_operations.add(mp.mdisjunction)
interpretation[Disjunction] = mp.mdisjunction
if mp.mnecessitation is not None:
logical_operations.add(mp.mnecessitation)
interpretation[Necessitation] = mp.mnecessitation
for custom_mf in mp.custom_model_functions.values():
if custom_mf is not None:
logical_operations.add(custom_mf)
op = Operation(custom_mf.operation_name, custom_mf.arity)
interpretation[op] = custom_mf
solutions.append((model, interpretation))
def parse_header(infile: SourceFile) -> UglyHeader: def parse_header(infile: SourceFile) -> UglyHeader:
""" """
@ -399,19 +378,20 @@ def parse_header(infile: SourceFile) -> UglyHeader:
custom_model_functions.append((arity, symbol)) custom_model_functions.append((arity, symbol))
return UglyHeader(negation_defined, necessitation_defined, custom_model_functions) return UglyHeader(negation_defined, necessitation_defined, custom_model_functions)
def carrier_list_from_size(size: int) -> List[ModelValue]: def carrier_set_from_size(size: int) -> Set[ModelValue]:
""" """
Construct a carrier set of model values Construct a carrier set of model values
based on the desired size. based on the desired size.
""" """
return [ return {
mvalue_from_index(i) for i in range(size + 1) mvalue_from_index(i) for i in range(size + 1)
] }
def parse_size(infile: SourceFile, first_run: bool) -> Optional[int]: def parse_size(infile: SourceFile, first_run: bool) -> Optional[int]:
""" """
Parse the line representing the matrix size. Parse the line representing the matrix size.
""" """
size = int(infile.next_line()) size = int(infile.next_line())
# HACK: When necessitation and custom connectives are enabled # HACK: When necessitation and custom connectives are enabled
@ -422,9 +402,7 @@ def parse_size(infile: SourceFile, first_run: bool) -> Optional[int]:
if size == -1: if size == -1:
return None return None
assert size > 0, f"Unexpected size at line {infile.current_line}"
assert size > 0, f"Unexpected size at line {infile.line_in_file}"
return size return size
def mvalue_from_index(i: int) -> ModelValue: def mvalue_from_index(i: int) -> ModelValue:
@ -440,9 +418,55 @@ def parse_mvalue(x: str) -> ModelValue:
""" """
return mvalue_from_index(int(x)) return mvalue_from_index(int(x))
def determine_cresult(size: int, ordering: Dict[ModelValue, ModelValue], a: ModelValue, b: ModelValue) -> ModelValue:
"""
Determine what a b should be given the ordering table.
"""
for i in range(size + 1):
c = mvalue_from_index(i)
def parse_single_order(infile: SourceFile, size: int, carrier_list: List[ModelValue]) -> Optional[ if not ordering[(c, a)]:
Tuple[OrderTable, Optional[ModelFunction], Optional[ModelFunction]]]: continue
if not ordering[(c, b)]:
continue
invalid = False
for j in range(size + 1):
d = mvalue_from_index(j)
if c == d:
continue
if ordering[(c, d)]:
if ordering[(d, a)] and ordering [(d, b)]:
invalid = True
if not invalid:
return c
def determine_dresult(size: int, ordering: Dict[ModelValue, ModelValue], a: ModelValue, b: ModelValue) -> ModelValue:
"""
Determine what a b should be given the ordering table.
"""
for i in range(size + 1):
c = mvalue_from_index(i)
if not ordering[(a, c)]:
continue
if not ordering[(b, c)]:
continue
invalid = False
for j in range(size + 1):
d = mvalue_from_index(j)
if d == c:
continue
if ordering[(d, c)]:
if ordering[(a, d)] and ordering[(b, d)]:
invalid = True
if not invalid:
return c
def parse_single_order(infile: SourceFile, size: int) -> Optional[Tuple[ModelFunction, ModelFunction]]:
""" """
Parse the line representing the ordering table Parse the line representing the ordering table
""" """
@ -452,38 +476,47 @@ def parse_single_order(infile: SourceFile, size: int, carrier_list: List[ModelVa
table = line.split(" ") table = line.split(" ")
assert len(table) == (size + 1)**2, f"Order table doesn't match expected size at line {infile.line_in_file}" assert len(table) == (size + 1)**2, f"Order table doesn't match expected size at line {infile.current_line}"
ordering = OrderTable()
omapping = {} omapping = {}
table_i = 0 table_i = 0
for x, y in product(carrier_list, carrier_list): for i in range(size + 1):
x = mvalue_from_index(i)
for j in range(size + 1):
y = mvalue_from_index(j)
omapping[(x, y)] = table[table_i] == '1' omapping[(x, y)] = table[table_i] == '1'
if table[table_i] == '1':
ordering.add(x, y)
table_i += 1 table_i += 1
cmapping = {} cmapping = {}
dmapping = {} dmapping = {}
for x, y in product(carrier_list, carrier_list): for i in range(size + 1):
cresult = ordering.meet(x, y) x = mvalue_from_index(i)
for j in range(size + 1):
y = mvalue_from_index(j)
cresult = determine_cresult(size, omapping, x, y)
if cresult is None: if cresult is None:
return ordering, None, None print("[Warning] Conjunction and Disjunction are not well-defined")
print(f"{x}{y} = ??")
return None, None
cmapping[(x, y)] = cresult cmapping[(x, y)] = cresult
dresult = ordering.join(x, y) dresult = determine_dresult(size, omapping, x, y)
if dresult is None: if dresult is None:
return ordering, None, None print("[Warning] Conjunction and Disjunction are not well-defined")
print(f"{x} {y} = ??")
return None, None
dmapping[(x, y)] = dresult dmapping[(x, y)] = dresult
mconjunction = ModelFunction(2, cmapping, "") mconjunction = ModelFunction(2, cmapping, "")
mdisjunction = ModelFunction(2, dmapping, "") mdisjunction = ModelFunction(2, dmapping, "")
return ordering, mconjunction, mdisjunction return mconjunction, mdisjunction
def parse_single_designated(infile: SourceFile, size: int, carrier_list: List[ModelValue]) -> Optional[Set[ModelValue]]: def parse_single_designated(infile: SourceFile, size: int) -> Optional[Set[ModelValue]]:
""" """
Parse the line representing which model values are designated. Parse the line representing which model values are designated.
""" """
@ -492,12 +525,13 @@ def parse_single_designated(infile: SourceFile, size: int, carrier_list: List[Mo
return None return None
row = line.split(" ") row = line.split(" ")
assert len(row) == size + 1, f"Designated table doesn't match expected size at line {infile.line_in_file}" assert len(row) == size + 1, f"Designated table doesn't match expected size at line {infile.current_line}"
designated_values = set() designated_values = set()
for x, j in zip(carrier_list, row): for i, j in zip(range(size + 1), row):
if j == '1': if j == '1':
x = mvalue_from_index(i)
designated_values.add(x) designated_values.add(x)
return designated_values return designated_values
@ -509,28 +543,29 @@ def parse_single_nullary_connective(infile: SourceFile, symbol: str) -> Optional
return None return None
row = line.split(" ") row = line.split(" ")
assert len(row) == 1, f"More than one assignment for a nullary connective was provided at line {infile.line_in_file}" assert len(row) == 1, f"More than one assignment for a nullary connective was provided at line {infile.current_line}"
mapping = {} mapping = {}
mapping[()] = parse_mvalue(row[0]) mapping[()] = parse_mvalue(row[0])
return ModelFunction(0, mapping, symbol) return ModelFunction(0, mapping, symbol)
def parse_single_monadic_connective(infile: SourceFile, symbol: str, size: int, carrier_list: List[ModelValue]) -> Optional[ModelFunction]: def parse_single_monadic_connective(infile: SourceFile, symbol: str, size: int) -> Optional[ModelFunction]:
line = infile.next_line() line = infile.next_line()
if line == '-1': if line == '-1':
return None return None
row = line.split(" ") row = line.split(" ")
assert len(row) == size + 1, f"{symbol} table doesn't match size at line {infile.line_in_file}" assert len(row) == size + 1, f"{symbol} table doesn't match size at line {infile.current_line}"
mapping = {} mapping = {}
for x, j in zip(carrier_list, row): for i, j in zip(range(size + 1), row):
x = mvalue_from_index(i)
y = parse_mvalue(j) y = parse_mvalue(j)
mapping[(x, )] = y mapping[(x, )] = y
return ModelFunction(1, mapping, symbol) return ModelFunction(1, mapping, symbol)
def parse_single_dyadic_connective(infile: SourceFile, symbol: str, size: int, carrier_list: List[ModelValue]) -> Optional[ModelFunction]: def parse_single_dyadic_connective(infile: SourceFile, symbol: str, size: int) -> Optional[ModelFunction]:
first_token = next(infile) first_token = next(infile)
if first_token == "-1": if first_token == "-1":
return None return None
@ -539,14 +574,20 @@ def parse_single_dyadic_connective(infile: SourceFile, symbol: str, size: int, c
try: try:
table = [first_token] + [next(infile) for _ in range((size + 1)**2 - 1)] table = [first_token] + [next(infile) for _ in range((size + 1)**2 - 1)]
except StopIteration: except StopIteration:
raise Exception(f"{symbol} table does not match expected size at line {infile.line_in_file}") pass
assert len(table) == (size + 1)**2, f"{symbol} table does not match expected size at line {infile.current_line}"
mapping = {} mapping = {}
table_i = 0 table_i = 0
for i in range(size + 1):
x = mvalue_from_index(i)
for j in range(size + 1):
y = mvalue_from_index(j)
for x, y in product(carrier_list, carrier_list):
r = parse_mvalue(table[table_i]) r = parse_mvalue(table[table_i])
table_i += 1 table_i += 1
mapping[(x, y)] = r mapping[(x, y)] = r
return ModelFunction(2, mapping, symbol) return ModelFunction(2, mapping, symbol)

View file

@ -1 +0,0 @@
This folder contains scripts that may be used during experimentation. These are intended to be used ad-hoc and we do not guarentee the maitainance of these scripts.

View file

@ -1,105 +0,0 @@
"""
Given two MaGIC ugly data files that correspond to
the same logic. Report any differences in the models
that exhibit VSP.
Overall process:
- Determine which models in file 1 have VSP
- Print if model does not exist in file 2
- For models in file 2 that were not already encountered for,
check if they have VSP.
- Print models that do
"""
import argparse
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import argparse
from model import model_equivalence
from parse_magic import SourceFile, parse_matrices
from vsp import has_vsp
from vspursuer import restructure_solutions
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Compare models that have VSP in two ugly files")
parser.add_argument("ugly1", type=str, help="First ugly data file")
parser.add_argument("ugly2", type=str, help="Second ugly data file")
parser.add_argument("--ignore-constants", action='store_true', help="When it comes to model equivalance, ignore constants")
args = vars(parser.parse_args())
data_file1 = open(args['ugly1'], "r")
solutions1 = parse_matrices(SourceFile(data_file1))
solutions1 = restructure_solutions(solutions1, None)
data_file2 = open(args['ugly2'], "r")
solutions2 = parse_matrices(SourceFile(data_file2))
solutions2 = list(restructure_solutions(solutions2, None))
ignore_constants = args.get("ignore_constants", False)
# Total count of models
total_models1 = 0
total_models2 = 0
# Models that exhibit VSP
good_models1 = 0
good_models2 = 0
# Models that don't exhibit VSP
bad_models1 = 0
bad_models2 = 0
# Models that exhibit VSP but does
# not exist in the other file.
extra_models1 = 0
extra_models2 = 0
for model, impfunction, negation_defined in solutions1:
total_models1 += 1
vsp_result = has_vsp(model, impfunction, negation_defined)
if vsp_result.has_vsp:
good_models1 += 1
# Check to see if model exists in file 2
match_found_index = (False, -1)
for i in range(len(solutions2) - 1, -1, -1):
if model_equivalence(model, solutions2[i][0], ignore_constants):
match_found_index = (True, i)
break
if match_found_index[0]:
# If so, remove the model from the second set
total_models2 += 1
good_models2 += 1
del solutions2[match_found_index[1]]
else:
extra_models1 += 1
print(f"VSP Model {model.name} not found in file 2.")
print(model)
else:
bad_models1 += 1
# Check through the remaining models in the second set
for model, impfunction, negation_defined in solutions2:
total_models2 += 1
vsp_result = has_vsp(model, impfunction, negation_defined)
if not vsp_result.has_vsp:
bad_models2 += 1
else:
print("VSP model", model.name, "does not appear in file 1")
good_models2 += 1
extra_models2 += 1
print("File 1 has a total of", total_models1, "models.")
print("Out of which,", good_models1, "exhibit VSP while", bad_models1, "do not.")
print("File 1 has a total of", extra_models1, "which exhibit VSP but do not appear in file 2.")
print("")
print("File 2 has a total of", total_models2, "models")
print("Out of which,", good_models2, "exhibit VSP while", bad_models2, "do not.")
print("File 2 has a total of", extra_models2, "which exhibit VSP but do not appear in file 1.")

View file

@ -1,54 +0,0 @@
"""
Given a model, create a Hasse diagram.
Note: This has a dependency on the hasse-diagram library
https://pypi.org/project/hasse-diagram/
"""
import argparse
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from model import Model
from parse_magic import SourceFile, parse_matrices
import numpy as np
import hassediagram
__all__ = ['plot_model_hassee']
def plot_model_hassee(model: Model):
assert model.ordering is not None
carrier_list = list(model.carrier_set)
hasse_ordering = []
for elem1 in carrier_list:
elem_ordering = []
for elem2 in carrier_list:
elem_ordering.append(
1 if model.ordering.is_lt(elem1, elem2) else 0
)
hasse_ordering.append(elem_ordering)
hassediagram.plot_hasse(np.array(hasse_ordering), carrier_list)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Show hassee diagram for model")
parser.add_argument("uglyfile", type=str, help="Path to ugly data file")
parser.add_argument("modelname", type=str, help="Name of model within file")
args = vars(parser.parse_args())
data_file = open(args['uglyfile'], "r")
solutions = parse_matrices(SourceFile(data_file))
requested_model = None
for model, _ in solutions:
if model.name == args['modelname']:
requested_model = model
break
if requested_model is None:
print("Model name", args['modelname'], "not found.")
sys.exit(0)
plot_model_hassee(requested_model)

View file

@ -1,30 +0,0 @@
"""
Print a model given it's name
and ugly data file
"""
import argparse
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from parse_magic import SourceFile, parse_matrices
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Show hassee diagram for model")
parser.add_argument("uglyfile", type=str, help="Path to ugly data file")
parser.add_argument("modelname", type=str, help="Name of model within file")
args = vars(parser.parse_args())
data_file = open(args['uglyfile'], "r")
solutions = parse_matrices(SourceFile(data_file))
model_found = False
for model, _ in solutions:
if model.name == args['modelname']:
model_found = True
print(model)
break
if not model_found:
print("Model", args['modelname'], "not found.")

181
vsp.py
View file

@ -2,12 +2,83 @@
Check to see if the model has the variable Check to see if the model has the variable
sharing property. sharing property.
""" """
from itertools import product from itertools import chain, combinations, product
from typing import List, Optional, Set, Tuple from typing import Dict, List, Optional, Set, Tuple
from common import set_to_str from common import set_to_str
from model import ( from model import (
Model, model_closure, ModelFunction, ModelValue Model, model_closure, ModelFunction, ModelValue
) )
from logic import Conjunction, Disjunction, Implication, Operation
def preseed(
initial_set: Set[ModelValue],
cache:List[Tuple[Set[ModelValue], Set[ModelValue]]]):
"""
Given a cache of previous model_closure calls,
use this to compute an initial model closure
set based on the initial set.
Basic Idea:
Let {1, 2, 3} -> X be in the cache.
If {1,2,3} is a subset of initial set,
then X is the subset of the output of model_closure.
This is used to speed up subsequent calls to model_closure
"""
candidate_preseed: Tuple[Set[ModelValue], int] = (None, None)
for i, o in cache:
if i < initial_set:
cost = len(initial_set - i)
# If i is a subset with less missing elements than
# the previous candidate, then it's the new candidate.
if candidate_preseed[1] is None or cost < candidate_preseed[1]:
candidate_preseed = o, cost
same_set = candidate_preseed[1] == 0
return candidate_preseed[0], same_set
def find_top(algebra: Set[ModelValue], mconjunction: Optional[ModelFunction], mdisjunction: Optional[ModelFunction]) -> Optional[ModelValue]:
"""
Find the top of the order lattice.
T || a = T, T && a = a for all a in the carrier set
"""
if mconjunction is None or mdisjunction is None:
return None
for x in algebra:
is_top = True
for y in algebra:
if mdisjunction(x, y) != x or mconjunction(x, y) != y:
is_top = False
break
if is_top:
return x
print("[Warning] Failed to find the top of the lattice")
return None
def find_bottom(algebra: Set[ModelValue], mconjunction: Optional[ModelFunction], mdisjunction: Optional[ModelFunction]) -> Optional[ModelValue]:
"""
Find the bottom of the order lattice
F || a = a, F && a = F for all a in the carrier set
"""
if mconjunction is None or mdisjunction is None:
return None
for x in algebra:
is_bottom = True
for y in algebra:
if mdisjunction(x, y) != y or mconjunction(x, y) != x:
is_bottom = False
break
if is_bottom:
return x
print("[Warning] Failed to find the bottom of the lattice")
return None
class VSP_Result: class VSP_Result:
def __init__( def __init__(
@ -27,90 +98,98 @@ Subalgebra 1: {set_to_str(self.subalgebra1)}
Subalgebra 2: {set_to_str(self.subalgebra2)} Subalgebra 2: {set_to_str(self.subalgebra2)}
""" """
def has_vsp(model: Model, impfunction: ModelFunction, def has_vsp(model: Model, interpretation: Dict[Operation, ModelFunction]) -> VSP_Result:
negation_defined: bool) -> VSP_Result:
""" """
Checks whether a model has the variable Checks whether a model has the variable
sharing property. sharing property.
""" """
impfunction = interpretation[Implication]
mconjunction = interpretation.get(Conjunction)
mdisjunction = interpretation.get(Disjunction)
top = find_top(model.carrier_set, mconjunction, mdisjunction)
bottom = find_bottom(model.carrier_set, mconjunction, mdisjunction)
# NOTE: No models with only one designated # NOTE: No models with only one designated
# value satisfies VSP # value satisfies VSP
if len(model.designated_values) == 1: if len(model.designated_values) == 1:
return VSP_Result(False, model.name) return VSP_Result(False, model.name)
assert model.ordering is not None, "Expected ordering table in model"
top = model.ordering.top()
bottom = model.ordering.bottom()
# Compute I the set of tuples (x, y) where # Compute I the set of tuples (x, y) where
# x -> y does not take a designiated value # x -> y does not take a designiated value
I: List[Tuple[ModelValue, ModelValue]] = [] I: Set[Tuple[ModelValue, ModelValue]] = set()
for (x, y) in product(model.carrier_set, model.carrier_set): for (x, y) in product(model.carrier_set, model.carrier_set):
if impfunction(x, y) not in model.designated_values: if impfunction(x, y) not in model.designated_values:
I.append((x, y)) I.add((x, y))
# Construct the powerset of I without the empty set
s = list(I)
I_power = chain.from_iterable(combinations(s, r) for r in range(1, len(s) + 1))
# ((x1, y1)), ((x1, y1), (x2, y2)), ...
# Closure cache
closure_cache: List[Tuple[Set[ModelValue], Set[ModelValue]]] = []
# Find the subalgebras which falsify implication # Find the subalgebras which falsify implication
for xys in I: for xys in I_power:
xi = xys[0] xs = {xy[0] for xy in xys}
orig_xs = xs
cached_xs = preseed(xs, closure_cache)
if cached_xs[0] is not None:
xs |= cached_xs[0]
# Discard ({⊥} A', B) subalgebras ys = {xy[1] for xy in xys}
if bottom is not None and xi == bottom: orig_ys = ys
cached_ys = preseed(ys, closure_cache)
if cached_ys[0] is not None:
ys |= cached_ys[0]
# NOTE: Optimziation before model_closure
# If the two subalgebras intersect, move
# onto the next pair
if len(xs & ys) > 0:
continue continue
# Discard ({} A', B) subalgebras when negation is defined # NOTE: Optimization
if top is not None and negation_defined and xi == top: # If the left subalgebra contains bottom
# or the right subalgebra contains top
# skip this pair
if top is not None and top in ys:
continue
if bottom is not None and bottom in xs:
continue continue
yi = xys[1] # Compute the closure of all operations
# with just the xs
carrier_set_left: Set[ModelValue] = model_closure(xs, model.logical_operations, bottom)
# Discard (A, {} B') subalgebras # Save to cache
if top is not None and yi == top: if cached_xs[0] is not None and not cached_ys[1]:
continue closure_cache.append((orig_xs, carrier_set_left))
# Discard (A, {⊥} B') subalgebras when negation is defined
if bottom is not None and negation_defined and yi == bottom:
continue
# Discard ({a} A', {b} B') subalgebras when a <= b
if model.ordering.is_lt(xi, yi):
continue
# Discard ({a} A', {b} B') subalgebras when b <= a and negation is defined
if negation_defined and model.ordering.is_lt(yi, xi):
continue
# Compute the left closure of the set containing xi under all the operations
carrier_set_left: Set[ModelValue] = model_closure({xi,}, model.logical_operations, bottom)
# Discard ({⊥} A', B) subalgebras
if bottom is not None and bottom in carrier_set_left: if bottom is not None and bottom in carrier_set_left:
continue continue
# Discard ({} A', B) subalgebras when negation is defined
if top is not None and negation_defined and top in carrier_set_left:
continue
# Compute the closure of all operations # Compute the closure of all operations
# with just the ys # with just the ys
carrier_set_right: Set[ModelValue] = model_closure({yi,}, model.logical_operations, top) carrier_set_right: Set[ModelValue] = model_closure(ys, model.logical_operations, top)
# Save to cache
if cached_ys[0] is not None and not cached_ys[1]:
closure_cache.append((orig_ys, carrier_set_right))
# Discard (A, {} B') subalgebras
if top is not None and top in carrier_set_right: if top is not None and top in carrier_set_right:
continue continue
# Discard (A, {⊥} B') subalgebras when negation is defined # If the carrier set intersects, then move on to the next
if bottom is not None and negation_defined and bottom in carrier_set_right: # subalgebra
if len(carrier_set_left & carrier_set_right) > 0:
continue continue
# Discard subalgebras that intersect # See if for all pairs in the subalgebras, that
if not carrier_set_left.isdisjoint(carrier_set_right): # implication is falsified
continue
# Check whether for all pairs in the subalgebra,
# that implication is falsified.
falsified = True falsified = True
for (x2, y2) in product(carrier_set_left, carrier_set_right): for (x2, y2) in product(carrier_set_left, carrier_set_right):
if impfunction(x2, y2) in model.designated_values: if impfunction(x2, y2) in model.designated_values:

View file

@ -1,214 +1,45 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from os import cpu_count
from datetime import datetime
from typing import Dict, Iterator, Optional, Tuple
from queue import Empty as QueueEmpty
import argparse import argparse
import multiprocessing as mp import multiprocessing
from logic import Negation, Implication, Operation from parse_magic import (
from model import Model, ModelFunction SourceFile,
from parse_magic import SourceFile, parse_matrices parse_matrices
)
from vsp import has_vsp, VSP_Result from vsp import has_vsp, VSP_Result
def print_with_timestamp(message):
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{current_time}] {message}", flush=True)
def restructure_solutions(solutions: Iterator[Tuple[Model, Dict[Operation, ModelFunction]]], skip_to: Optional[str]) -> \
Iterator[Tuple[Model, ModelFunction, Optional[ModelFunction], Optional[ModelFunction], Optional[ModelFunction]]]:
"""
When subprocess gets spawned, the logical operations will
have a different memory address than what's expected in interpretation.
Therefore, we need to pass the model functions directly instead.
While we're at it, filter out models until we get to --skip-to
if applicable.
"""
start_processing = skip_to is None
for model, interpretation in solutions:
# If skip_to is defined, then don't process models
# until then.
if not start_processing and model.name != skip_to:
continue
start_processing = True
# NOTE: Implication must be defined, the rest may not
impfunction = interpretation[Implication]
negation_defined = Negation in interpretation
yield (model, impfunction, negation_defined)
def has_vsp_plus_model(model, impfunction, negation_defined) -> Tuple[Optional[Model], VSP_Result]:
"""
Wrapper which also stores the models along with its vsp result
"""
vsp_result = has_vsp(model, impfunction, negation_defined)
# NOTE: Memory optimization - Don't return model if it doesn't have VSP
model = model if vsp_result.has_vsp else None
return (model, vsp_result)
def worker_vsp(task_queue: mp.Queue, result_queue: mp.Queue):
"""
Worker process which processes models from the task
queue and adds the result to the result_queue.
Adds the sentinal value None when exception occurs and when there's
no more to process.
"""
try:
while True:
task = task_queue.get()
# If sentinal value, break
if task is None:
break
(model, impfunction, negation_defined) = task
result = has_vsp_plus_model(model, impfunction, negation_defined)
result_queue.put(result)
finally:
# Either an exception occured or the worker finished
# Push sentinal value
result_queue.put(None)
def worker_parser(data_file_path: str, num_sentinal_values: int, task_queue: mp.Queue, skip_to: Optional[str]):
"""
Function which parses the MaGIC file
and adds models to the task_queue.
Intended to be deployed with a dedicated process.
"""
try:
data_file = open(data_file_path, "r")
solutions = parse_matrices(SourceFile(data_file))
solutions = restructure_solutions(solutions, skip_to)
while True:
try:
item = next(solutions)
task_queue.put(item)
except StopIteration:
break
finally:
data_file.close()
for _ in range(num_sentinal_values):
task_queue.put(None)
def multi_process_runner(num_cpu: int, data_file_path: str, skip_to: Optional[str]):
"""
Run VSPursuer in a multi-process configuration.
"""
assert num_cpu > 1
num_tested = 0
num_has_vsp = 0
num_workers = num_cpu - 1
# Create queues
task_queue = mp.Queue(maxsize=1000)
result_queue = mp.Queue()
# Create dedicated process to parse the MaGIC file
process_parser = mp.Process(target=worker_parser, args=(data_file_path, num_workers, task_queue, skip_to))
process_parser.start()
# Create dedicated processes which check VSP
process_workers = []
for _ in range(num_workers):
p = mp.Process(target=worker_vsp, args=(task_queue, result_queue))
process_workers.append(p)
p.start()
# Check results and add new tasks until finished
result_sentinal_count = 0
while True:
# Read a result
try:
result = result_queue.get(True, 60)
except QueueEmpty:
if all((not p.is_alive() for p in process_workers)):
# All workers finished without us receiving all the
# sentinal values.
break
task_queue_size = 0
try:
task_queue_size = task_queue.qsize()
except NotImplementedError:
# MacOS doesn't implement this
pass
if task_queue_size == 0 and not process_parser.is_alive():
# For Linux/Windows this means that the process_parser
# died before sending the sentinal values.
# For Mac, this doesn't guarentee anything but might
# as well push more sentinal values.
for _ in range(num_workers):
task_queue.put(None)
# Don't do anymore work, wait again for a result
continue
# When we receive None, it means a child process has finished
if result is None:
result_sentinal_count += 1
# If all workers have finished break
if result_sentinal_count == len(process_workers):
break
continue
# Process result
model, vsp_result = result
print_with_timestamp(vsp_result)
num_tested += 1
if vsp_result.has_vsp:
print(model)
if vsp_result.has_vsp:
num_has_vsp += 1
print_with_timestamp(f"Tested {num_tested} models, {num_has_vsp} of which satisfy VSP")
def single_process_runner(data_file_path: str, skip_to: Optional[str]):
num_tested = 0
num_has_vsp = 0
data_file = open(data_file_path, "r")
solutions = parse_matrices(SourceFile(data_file))
solutions = restructure_solutions(solutions, skip_to)
for model, impfunction, negation_defined in solutions:
model, vsp_result = has_vsp_plus_model(model, impfunction, negation_defined)
print_with_timestamp(vsp_result)
num_tested += 1
if vsp_result.has_vsp:
print(model)
if vsp_result.has_vsp:
num_has_vsp += 1
print_with_timestamp(f"Tested {num_tested} models, {num_has_vsp} of which satisfy VSP")
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser(description="VSP Checker") parser = argparse.ArgumentParser(description="VSP Checker")
parser.add_argument("--verbose", action='store_true', help="Print out all parsed matrices") parser.add_argument("--verbose", action='store_true', help="Print out all parsed matrices")
parser.add_argument("-i", type=str, help="Path to MaGIC ugly data file") parser.add_argument("-i", type=str, help="Path to MaGIC ugly data file")
parser.add_argument("-c", type=int, help="Number of CPUs to use. Default: 1")
parser.add_argument("--skip-to", type=str, help="Skip until a model name is found and process from then onwards.")
args = vars(parser.parse_args()) args = vars(parser.parse_args())
data_file_path = args.get("i") data_file_path = args.get("i")
if data_file_path is None: if data_file_path is None:
data_file_path = input("Path to MaGIC Ugly Data File: ") data_file_path = input("Path to MaGIC Ugly Data File: ")
num_cpu = args.get("c") solutions = []
if num_cpu is None: with open(data_file_path, "r") as data_file:
num_cpu = 1 solutions = parse_matrices(SourceFile(data_file))
print(f"Parsed {len(solutions)} matrices")
if num_cpu == 1: num_has_vsp = 0
single_process_runner(data_file_path, args.get("skip_to")) with multiprocessing.Pool(processes=max(cpu_count() - 2, 1)) as pool:
else: results = [
multi_process_runner(num_cpu, data_file_path, args.get("skip_to")) pool.apply_async(has_vsp, (model, interpretation,))
for model, interpretation in solutions
]
for i, result in enumerate(results):
vsp_result: VSP_Result = result.get()
print(vsp_result)
if args['verbose'] or vsp_result.has_vsp:
model = solutions[i][0]
print(model)
if vsp_result.has_vsp:
num_has_vsp += 1
print(f"Tested {len(solutions)} models, {num_has_vsp} of which satisfy VSP")