Compare commits

...

37 commits

Author SHA1 Message Date
6d7fc9094a Add flag to ignore constants during model equivalence 2025-06-17 22:05:45 -04:00
7305b358a9 Added utility scripts 2025-05-26 17:38:48 -04:00
b16376e35c Added model equivalence check 2025-05-26 17:36:03 -04:00
94a01dd3da Fixed misusage of Model constructor 2025-05-26 17:35:05 -04:00
bd0d836204
(#40) Consider less subalgebras 2025-05-14 20:37:24 -04:00
7b652f36eb Transformed subalgebra generation from exponential to linear 2025-05-13 13:22:28 -04:00
214e9ba658
(#38) Redid parallel implementation
Queue-based parallel implementation with dedicated file reader node and dedicated VSP verifier nodes.

Additionally, cleaned up the other files.
2025-05-13 12:58:22 -04:00
ec451e007c Removed comment 2025-05-13 12:55:24 -04:00
4cccdc85b9 Flush print 2025-05-13 12:53:46 -04:00
6639344280 Stopped bug triggered during timeout 2025-05-04 21:07:20 -04:00
cd084812cc Removed a useless optimization and added one when negation is defined 2025-05-04 20:29:02 -04:00
01204a9551 Code cleanup 2025-05-03 16:42:15 -04:00
fa9e5026ca [Draft] Changing OrderTable (Currently non-functional) 2025-05-03 12:54:37 -04:00
6f5074584b Cleanup and small optimizations 2025-05-02 12:08:15 -04:00
de8637bca4 Dedicated process for file parsing 2025-04-09 13:00:08 -04:00
cb00a82c67 Attempt at keeping workers busy 2025-04-09 10:59:02 -04:00
806309b1ae Adding more work to task queue and more detailed healthcheck 2025-04-09 09:51:19 -04:00
502252676f Rewrote multiprocessing as the prior approach was unreliable 2025-04-08 21:10:28 -04:00
154cee0349 Switched to cpu_count 2025-03-19 09:31:25 -04:00
c554c53141 Removed dead code 2025-03-19 09:22:54 -04:00
4412b6c2da Redid parallel implementation
- Made parse_matrices into a generator
- Keep track of num_proccesses results and spawn new ones when done
2025-03-18 18:08:28 -04:00
b1452ac672
(#36) Optimize MaGIC Input Parsing 2025-03-18 15:50:56 -04:00
1a4857429f Use partition instead of split/join 2025-02-18 13:38:39 -05:00
2d8540f5c2
Discarding Order-Dependent Subalgebras (#14) 2025-02-09 11:29:57 -05:00
d431030b41 Added case for when negation is defined 2025-02-09 11:28:09 -05:00
db30799c4f
Removed unused inequality rules 2025-02-09 11:19:23 -05:00
b1a046b70b Removed unused inequality rules 2025-02-09 11:18:52 -05:00
d76a7fc35d
Implemented feature #31
Skip VSP check until a certain model name is found
2025-02-09 11:16:02 -05:00
cad3e85cd0 Implemented feature #31 2025-02-09 11:14:27 -05:00
54dc6e503c Configurable CPU usage 2025-02-09 11:05:40 -05:00
2d50cd9479
Implemented optimization #29
Discard subalgebras that don't have at least one designated value
2025-02-09 11:01:14 -05:00
f9d307969e Added small note 2025-02-09 11:00:39 -05:00
2ff9f8134c Implemented optimization #29 2025-02-09 10:57:52 -05:00
4b907281a5 Implementing optimization #14
Discard subalgebras which are order-dependent
2025-01-31 17:16:25 -05:00
9f80fb8bba
Fix Multiprocessing on Windows (#27) 2024-12-10 18:39:03 -05:00
70cd1cfa7f Small cleanup 2024-12-10 18:34:43 -05:00
b06dd8ee01 Don't rely on shared memory for logic operators 2024-12-10 17:17:14 -08:00
10 changed files with 724 additions and 406 deletions

12
R.py
View file

@ -12,7 +12,7 @@ from logic import (
) )
from model import Model, ModelFunction, ModelValue, satisfiable from model import Model, ModelFunction, ModelValue, satisfiable
from generate_model import generate_model from generate_model import generate_model
from vsp import has_vsp # from vsp import has_vsp
# =================================================== # ===================================================
@ -106,7 +106,7 @@ designated_values = {a1}
logical_operations = { logical_operations = {
mnegation, mimplication, mconjunction, mdisjunction mnegation, mimplication, mconjunction, mdisjunction
} }
R_model_2 = Model(carrier_set, logical_operations, designated_values, "R2") R_model_2 = Model(carrier_set, logical_operations, designated_values, None, "R2")
interpretation = { interpretation = {
Negation: mnegation, Negation: mnegation,
@ -132,8 +132,8 @@ solutions = generate_model(R_logic, model_size, print_model=False)
print(f"Found {len(solutions)} satisfiable models") print(f"Found {len(solutions)} satisfiable models")
for model, interpretation in solutions: # for model, interpretation in solutions:
print(has_vsp(model, interpretation)) # print(has_vsp(model, interpretation))
print("*" * 30) print("*" * 30)
@ -301,7 +301,7 @@ mdisjunction = ModelFunction(2, {
logical_operations = { logical_operations = {
mnegation, mimplication, mconjunction, mdisjunction mnegation, mimplication, mconjunction, mdisjunction
} }
R_model_6 = Model(carrier_set, logical_operations, designated_values, "R6") R_model_6 = Model(carrier_set, logical_operations, designated_values, None, "R6")
interpretation = { interpretation = {
Negation: mnegation, Negation: mnegation,
@ -312,4 +312,4 @@ interpretation = {
print(R_model_6) print(R_model_6)
print(f"Model {R_model_6.name} satisfies logic {R_logic.name}?", satisfiable(R_logic, R_model_6, interpretation)) print(f"Model {R_model_6.name} satisfies logic {R_logic.name}?", satisfiable(R_logic, R_model_6, interpretation))
print(has_vsp(R_model_6, interpretation)) # print(has_vsp(R_model_6, interpretation))

View file

@ -27,8 +27,6 @@ class Operation:
class Term: class Term:
def __init__(self): def __init__(self):
pass pass
def __lt__(self, y):
return Inequation(self, y)
class PropositionalVariable(Term): class PropositionalVariable(Term):
def __init__(self, name): def __init__(self, name):
@ -70,23 +68,6 @@ Disjunction = Operation("", 2)
Implication = Operation("", 2) Implication = Operation("", 2)
Necessitation = Operation("!", 1) Necessitation = Operation("!", 1)
class Inequation:
def __init__(self, antecedant : Term, consequent: Term):
self.antecedant = antecedant
self.consequent = consequent
def __str__(self):
return str(self.antecedant) + "" + str(self.consequent)
class InequalityRule:
def __init__(self, premises : Set[Inequation], conclusion: Inequation):
self.premises = premises
self.conclusion = conclusion
def __str__(self):
str_premises = [str(p) for p in self.premises]
str_premises2 = "{" + ",".join(str_premises) + "}"
return str(str_premises2) + "=>" + str(self.conclusion)
class Rule: class Rule:
def __init__(self, premises : Set[Term], conclusion: Term): def __init__(self, premises : Set[Term], conclusion: Term):
self.premises = premises self.premises = premises

198
model.py
View file

@ -9,16 +9,19 @@ from logic import (
) )
from collections import defaultdict from collections import defaultdict
from functools import cached_property, lru_cache, reduce from functools import cached_property, lru_cache, reduce
from itertools import chain, combinations_with_replacement, permutations, product from itertools import (
chain, combinations_with_replacement,
permutations, product
)
from typing import Dict, List, Optional, Set, Tuple from typing import Dict, List, Optional, Set, Tuple
__all__ = ['ModelValue', 'ModelFunction', 'Model', 'Interpretation'] __all__ = ['ModelValue', 'ModelFunction', 'Model', 'Interpretation']
class ModelValue: class ModelValue:
def __init__(self, name): def __init__(self, name: str, hashed_value: Optional[int] = None):
self.name = name self.name = name
self.hashed_value = hash(self.name) self.hashed_value = hashed_value if hashed_value is not None else hash(self.name)
self.__setattr__ = immutable self.__setattr__ = immutable
def __str__(self): def __str__(self):
return self.name return self.name
@ -27,7 +30,7 @@ class ModelValue:
def __eq__(self, other): def __eq__(self, other):
return isinstance(other, ModelValue) and self.name == other.name return isinstance(other, ModelValue) and self.name == other.name
def __deepcopy__(self, _): def __deepcopy__(self, _):
return ModelValue(self.name) return ModelValue(self.name, self.hashed_value)
class ModelFunction: class ModelFunction:
def __init__(self, arity: int, mapping, operation_name = ""): def __init__(self, arity: int, mapping, operation_name = ""):
@ -103,18 +106,94 @@ def binary_function_str(f: ModelFunction) -> str:
Interpretation = Dict[Operation, ModelFunction] Interpretation = Dict[Operation, ModelFunction]
class OrderTable:
def __init__(self):
# a : {x | x <= a }
self.le_map: Dict[ModelValue, Set[ModelValue]] = defaultdict(set)
# a : {x | x >= a}
self.ge_map: Dict[ModelValue, Set[ModelValue]] = defaultdict(set)
def add(self, x, y):
"""
Add x <= y
"""
self.le_map[y].add(x)
self.ge_map[x].add(y)
def is_lt(self, x, y):
return x in self.le_map[y]
def meet(self, x, y) -> Optional[ModelValue]:
X = self.le_map[x]
Y = self.le_map[y]
candidates = X.intersection(Y)
# Grab all elements greater than each of the candidates
candidate_ge_maps = (self.ge_map[candidate] for candidate in candidates)
common_ge_values = reduce(set.intersection, candidate_ge_maps)
# Intersect with candidates to get the values that satisfy
# the meet properties
result_set = candidates.intersection(common_ge_values)
# NOTE: The meet may not exist, in which case return None
result = next(iter(result_set), None)
return result
def join(self, x, y) -> Optional[ModelValue]:
X = self.ge_map[x]
Y = self.ge_map[y]
candidates = X.intersection(Y)
# Grab all elements smaller than each of the candidates
candidate_le_maps = (self.le_map[candidate] for candidate in candidates)
common_le_values = reduce(set.intersection, candidate_le_maps)
# Intersect with candidatse to get the values that satisfy
# the join properties
result_set = candidates.intersection(common_le_values)
# NOTE: The join may not exist, in which case return None
result = next(iter(result_set), None)
return result
def top(self) -> Optional[ModelValue]:
ge_maps = (self.ge_map[candidate] for candidate in self.ge_map)
result_set = reduce(set.intersection, ge_maps)
# Either not unique or does not exist
if len(result_set) != 1:
return None
return next(iter(result_set))
def bottom(self) -> Optional[ModelValue]:
le_maps = (self.le_map[candidate] for candidate in self.le_map)
result_set = reduce(set.intersection, le_maps)
# Either not unique or does not exist
if len(result_set) != 1:
return None
return next(iter(result_set))
class Model: class Model:
def __init__( def __init__(
self, self,
carrier_set: Set[ModelValue], carrier_set: Set[ModelValue],
logical_operations: Set[ModelFunction], logical_operations: Set[ModelFunction],
designated_values: Set[ModelValue], designated_values: Set[ModelValue],
ordering: Optional[OrderTable] = None,
name: Optional[str] = None name: Optional[str] = None
): ):
assert designated_values <= carrier_set assert designated_values <= carrier_set
self.carrier_set = carrier_set self.carrier_set = carrier_set
self.logical_operations = logical_operations self.logical_operations = logical_operations
self.designated_values = designated_values self.designated_values = designated_values
self.ordering = ordering
self.name = str(abs(hash(( self.name = str(abs(hash((
frozenset(carrier_set), frozenset(carrier_set),
frozenset(logical_operations), frozenset(logical_operations),
@ -215,36 +294,47 @@ def satisfiable(logic: Logic, model: Model, interpretation: Dict[Operation, Mode
return True return True
def model_closure(initial_set: Set[ModelValue], mfunctions: Set[ModelFunction], forbidden_element: Optional[ModelValue]) -> Set[ModelValue]: def model_closure(initial_set: Set[ModelValue], mfunctions: Set[ModelFunction], forbidden_element: Optional[ModelValue]) -> Set[ModelValue]:
""" """
Given an initial set of model values and a set of model functions, Given an initial set of model values and a set of model functions,
compute the complete set of model values that are closed compute the complete set of model values that are closed
under the operations. under the operations.
If top or bottom is encountered, then we end the saturation procedure early. If the forbidden element is encountered, then we end the saturation procedure early.
""" """
closure_set: Set[ModelValue] = initial_set closure_set: Set[ModelValue] = initial_set
last_new: Set[ModelValue] = initial_set last_new: Set[ModelValue] = initial_set
changed: bool = True changed: bool = True
forbidden_found = False forbidden_found = False
arities = set()
for mfun in mfunctions:
arities.add(mfun.arity)
while changed: while changed:
changed = False changed = False
new_elements: Set[ModelValue] = set() new_elements: Set[ModelValue] = set()
old_closure: Set[ModelValue] = closure_set - last_new old_closure: Set[ModelValue] = closure_set - last_new
# arity -> args # arity -> args
cached_args = defaultdict(list) args_by_arity = defaultdict(list)
# Pass elements into each model function # Motivation: We want to only compute arguments that we have not
# seen before
for arity in arities:
for num_new in range(1, arity + 1):
new_args = combinations_with_replacement(last_new, r=num_new)
old_args = combinations_with_replacement(old_closure, r=arity - num_new)
# Determine every possible ordering of the concatenated new and
# old model values.
for new_arg, old_arg in product(new_args, old_args):
for combined_args in permutations(new_arg + old_arg):
args_by_arity[arity].append(combined_args)
# Pass each argument into each model function
for mfun in mfunctions: for mfun in mfunctions:
for args in args_by_arity[mfun.arity]:
# If a previous function shared the same arity,
# we'll use the same set of computed arguments
# to pass into the model functions.
if mfun.arity in cached_args:
for args in cached_args[mfun.arity]:
# Compute the new elements # Compute the new elements
# given the cached arguments. # given the cached arguments.
element = mfun(*args) element = mfun(*args)
@ -260,42 +350,6 @@ def model_closure(initial_set: Set[ModelValue], mfunctions: Set[ModelFunction],
if forbidden_found: if forbidden_found:
break break
# We don't need to compute the arguments
# thanks to the cache, so move onto the
# next function.
continue
# At this point, we don't have cached arguments, so we need
# to compute this set.
# Each argument must have at least one new element to not repeat
# work. We'll range over the number of new model values within our
# argument.
for num_new in range(1, mfun.arity + 1):
new_args = combinations_with_replacement(last_new, r=num_new)
old_args = combinations_with_replacement(old_closure, r=mfun.arity - num_new)
# Determine every possible ordering of the concatenated
# new and old model values.
for new_arg, old_arg in product(new_args, old_args):
for args in permutations(new_arg + old_arg):
cached_args[mfun.arity].append(args)
element = mfun(*args)
if element not in closure_set:
new_elements.add(element)
# Optimization: Break out of computation
# early when forbidden element is found
if forbidden_element is not None and element == forbidden_element:
forbidden_found = True
break
if forbidden_found:
break
if forbidden_found:
break
closure_set.update(new_elements) closure_set.update(new_elements)
changed = len(new_elements) > 0 changed = len(new_elements) > 0
last_new = new_elements last_new = new_elements
@ -304,3 +358,47 @@ def model_closure(initial_set: Set[ModelValue], mfunctions: Set[ModelFunction],
break break
return closure_set return closure_set
def model_equivalence(model1: Model, model2: Model, ignore_constants: bool = False) -> bool:
"""
Takes two models and determines if they are equivalent.
Assumes for the model to be equilvalent that their
value names are equivalent as well.
"""
if model1.carrier_set != model2.carrier_set:
return False
if model1.designated_values != model2.designated_values:
return False
model1_fn_names = set()
for fn in model1.logical_operations:
if fn.arity == 0 and ignore_constants:
continue
model1_fn_names.add(fn.operation_name)
model2_fn_names = set()
for fn in model2.logical_operations:
if fn.arity == 0 and ignore_constants:
continue
model2_fn_names.add(fn.operation_name)
if model1_fn_names != model2_fn_names:
return False
for fn_name in model1_fn_names:
fn1 = next((fn for fn in model1.logical_operations if fn.operation_name == fn_name))
fn2 = next((fn for fn in model2.logical_operations if fn.operation_name == fn_name))
if fn1.arity != fn2.arity:
return False
# Check for functional equilvance
# That is for all inputs in the carrier set, the outputs are the same
for args in product(model1.carrier_set, repeat=fn1.arity):
if fn1(*args) != fn2(*args):
return False
return True

View file

@ -4,9 +4,10 @@ Parses the Magic Ugly Data File Format
Assumes the base logic is R with no extra connectives Assumes the base logic is R with no extra connectives
""" """
import re import re
from typing import TextIO, List, Optional, Tuple, Set, Dict from typing import TextIO, List, Iterator, Optional, Tuple, Set, Dict
from itertools import product
from model import Model, ModelValue, ModelFunction from model import Model, ModelValue, ModelFunction, OrderTable
from logic import ( from logic import (
Implication, Implication,
Conjunction, Conjunction,
@ -19,7 +20,7 @@ from logic import (
class SourceFile: class SourceFile:
def __init__(self, fileobj: TextIO): def __init__(self, fileobj: TextIO):
self.fileobj = fileobj self.fileobj = fileobj
self.current_line = 0 self.line_in_file = 0
self.reststr = "" self.reststr = ""
def next_line(self): def next_line(self):
@ -34,7 +35,7 @@ class SourceFile:
return reststr return reststr
contents = next(self.fileobj).strip() contents = next(self.fileobj).strip()
self.current_line += 1 self.line_in_file += 1
return contents return contents
def __next__(self): def __next__(self):
@ -43,12 +44,9 @@ class SourceFile:
""" """
if self.reststr == "": if self.reststr == "":
self.reststr = next(self.fileobj).strip() self.reststr = next(self.fileobj).strip()
self.current_line += 1 self.line_in_file += 1
tokens = self.reststr.split(" ")
next_token = tokens[0]
self.reststr = " ".join(tokens[1:])
next_token, _, self.reststr = self.reststr.partition(" ")
return next_token return next_token
class UglyHeader: class UglyHeader:
@ -63,8 +61,9 @@ class UglyHeader:
class ModelBuilder: class ModelBuilder:
def __init__(self): def __init__(self):
self.size : int = 0 self.size : int = 0
self.carrier_set : Set[ModelValue] = set() self.carrier_list : List[ModelValue] = []
self.mnegation: Optional[ModelFunction] = None self.mnegation: Optional[ModelFunction] = None
self.ordering: Optional[OrderTable] = None
self.mconjunction: Optional[ModelFunction] = None self.mconjunction: Optional[ModelFunction] = None
self.mdisjunction: Optional[ModelFunction] = None self.mdisjunction: Optional[ModelFunction] = None
self.designated_values: Set[ModelValue] = set() self.designated_values: Set[ModelValue] = set()
@ -73,6 +72,45 @@ class ModelBuilder:
# Map symbol to model function # Map symbol to model function
self.custom_model_functions: Dict[str, ModelFunction] = {} self.custom_model_functions: Dict[str, ModelFunction] = {}
def build(self, model_name: str) -> Tuple[Model, Dict[Operation, ModelFunction]]:
"""Create Model"""
assert self.size > 0
assert self.size + 1 == len(self.carrier_list)
assert len(self.designated_values) <= len(self.carrier_list)
assert self.mimplication is not None
# Implication is required to be present
logical_operations = { self.mimplication }
interpretation = {
Implication: self.mimplication
}
# Other model functions and logical
# operations are optional
if self.mnegation is not None:
logical_operations.add(self.mnegation)
interpretation[Negation] = self.mnegation
if self.mconjunction is not None:
logical_operations.add(self.mconjunction)
interpretation[Conjunction] = self.mconjunction
if self.mdisjunction is not None:
logical_operations.add(self.mdisjunction)
interpretation[Disjunction] = self.mdisjunction
if self.mnecessitation is not None:
logical_operations.add(self.mnecessitation)
interpretation[Necessitation] = self.mnecessitation
# Custom model function definitions
for custom_mf in self.custom_model_functions.values():
if custom_mf is not None:
logical_operations.add(custom_mf)
op = Operation(custom_mf.operation_name, custom_mf.arity)
interpretation[op] = custom_mf
model = Model(set(self.carrier_list), logical_operations, self.designated_values, ordering=self.ordering, name=model_name)
return (model, interpretation)
class Stage: class Stage:
def __init__(self, name: str): def __init__(self, name: str):
self.name = name self.name = name
@ -99,8 +137,6 @@ class Stages:
def add(self, name: str): def add(self, name: str):
stage = Stage(name) stage = Stage(name)
stage.next = stage
stage.previous = self.last_added_stage stage.previous = self.last_added_stage
# End stage is a sink so don't # End stage is a sink so don't
@ -117,11 +153,16 @@ class Stages:
def reset_after(self, name): def reset_after(self, name):
""" """
Resets the stage counters after a given stage. Resets the counters of every stage after
This is to accurately reflect the name of the a given stage.
model within MaGIC.
This is to accurately reflect how names are
generated within magic.
Example: 1.1, 1.2, (reset after 1), 2.1, 2.2
""" """
stage = self.stages[name] stage = self.stages[name]
# NOTE: The process_model stage doesn't
# have a counter associated with it.
while stage.name != "process_model": while stage.name != "process_model":
stage.reset() stage.reset()
stage = stage.next stage = stage.next
@ -130,15 +171,26 @@ class Stages:
return self.stages[name] return self.stages[name]
def name(self): def name(self):
"""
Get the full name of where we are within
the parsing process. Takes into account
the stage number of all the stages seen
so far.
"""
# Stages haven't been added yet
result = "" result = ""
stage = self.first_stage stage = self.first_stage
if stage is None: if stage is None:
return "" return ""
# There's only one stage
result = f"{stage.num}" result = f"{stage.num}"
if stage.next == "process_model": if stage.next == "process_model":
return result return result
# Add every subsequent stage counter
# by appending .stage_num
stage = stage.next stage = stage.next
while stage is not None: while stage is not None:
result += f".{stage.num}" result += f".{stage.num}"
@ -169,19 +221,19 @@ def derive_stages(header: UglyHeader) -> Stages:
return stages return stages
def parse_matrices(infile: SourceFile) -> List[Tuple[Model, Dict]]: def parse_matrices(infile: SourceFile) -> Iterator[Tuple[Model, Dict[Operation, ModelFunction]]]:
solutions = []
header = parse_header(infile) header = parse_header(infile)
stages = derive_stages(header) stages = derive_stages(header)
first_run = True first_run = True
current_model_parts = ModelBuilder() current_model_parts = ModelBuilder()
stage = stages.first_stage stage = stages.first_stage
while True: while True:
match stage.name: match stage.name:
case "end": case "end":
break break
case "process_model": case "process_model":
process_model(stages.name(), current_model_parts, solutions) yield current_model_parts.build(stages.name())
stage = stage.next stage = stage.next
case "size": case "size":
processed = process_sizes(infile, current_model_parts, first_run) processed = process_sizes(infile, current_model_parts, first_run)
@ -247,25 +299,25 @@ def parse_matrices(infile: SourceFile) -> List[Tuple[Model, Dict]]:
stages.reset_after(stage.name) stages.reset_after(stage.name)
stage = stage.previous stage = stage.previous
return solutions
def process_sizes(infile: SourceFile, current_model_parts: ModelBuilder, first_run: bool) -> bool: def process_sizes(infile: SourceFile, current_model_parts: ModelBuilder, first_run: bool) -> bool:
size: Optional[int] = None
try: try:
size = parse_size(infile, first_run) size = parse_size(infile, first_run)
except StopIteration: except StopIteration:
return False pass
if size is None: if size is None:
return False return False
carrier_set = carrier_set_from_size(size) carrier_list = carrier_list_from_size(size)
current_model_parts.carrier_set = carrier_set current_model_parts.carrier_list = carrier_list
current_model_parts.size = size current_model_parts.size = size
return True return True
def process_negations(infile: SourceFile, current_model_parts: ModelBuilder) -> bool: def process_negations(infile: SourceFile, current_model_parts: ModelBuilder) -> bool:
"""Stage 2 (Optional)""" """Stage 2 (Optional)"""
mnegation = parse_single_monadic_connective(infile, "¬", current_model_parts.size) mnegation = parse_single_monadic_connective(infile, "¬", current_model_parts.size, current_model_parts.carrier_list)
if mnegation is None: if mnegation is None:
return False return False
@ -274,11 +326,12 @@ def process_negations(infile: SourceFile, current_model_parts: ModelBuilder) ->
def process_orders(infile: SourceFile, current_model_parts: ModelBuilder) -> bool: def process_orders(infile: SourceFile, current_model_parts: ModelBuilder) -> bool:
"""Stage 3""" """Stage 3"""
result = parse_single_order(infile, current_model_parts.size) result = parse_single_order(infile, current_model_parts.size, current_model_parts.carrier_list)
if result is None: if result is None:
return False return False
mconjunction, mdisjunction = result ordering, mconjunction, mdisjunction = result
current_model_parts.ordering = ordering
current_model_parts.mconjunction = mconjunction current_model_parts.mconjunction = mconjunction
current_model_parts.mdisjunction = mdisjunction current_model_parts.mdisjunction = mdisjunction
@ -286,7 +339,7 @@ def process_orders(infile: SourceFile, current_model_parts: ModelBuilder) -> boo
def process_designateds(infile: SourceFile, current_model_parts: ModelBuilder) -> bool: def process_designateds(infile: SourceFile, current_model_parts: ModelBuilder) -> bool:
"""Stage 4""" """Stage 4"""
designated_values = parse_single_designated(infile, current_model_parts.size) designated_values = parse_single_designated(infile, current_model_parts.size, current_model_parts.carrier_list)
if designated_values is None: if designated_values is None:
return False return False
@ -295,7 +348,7 @@ def process_designateds(infile: SourceFile, current_model_parts: ModelBuilder) -
def process_implications(infile: SourceFile, current_model_parts: ModelBuilder) -> bool: def process_implications(infile: SourceFile, current_model_parts: ModelBuilder) -> bool:
"""Stage 5""" """Stage 5"""
mimplication = parse_single_dyadic_connective(infile, "", current_model_parts.size) mimplication = parse_single_dyadic_connective(infile, "", current_model_parts.size, current_model_parts.carrier_list)
if mimplication is None: if mimplication is None:
return False return False
@ -303,7 +356,7 @@ def process_implications(infile: SourceFile, current_model_parts: ModelBuilder)
return True return True
def process_necessitations(infile: SourceFile, current_model_parts: ModelBuilder) -> bool: def process_necessitations(infile: SourceFile, current_model_parts: ModelBuilder) -> bool:
mnecessitation = parse_single_monadic_connective(infile, "!", current_model_parts.size) mnecessitation = parse_single_monadic_connective(infile, "!", current_model_parts.size, current_model_parts.carrier_list)
if mnecessitation is None: if mnecessitation is None:
return False return False
@ -314,9 +367,9 @@ def process_custom_connective(infile: SourceFile, symbol: str, adicity: int, cur
if adicity == 0: if adicity == 0:
mfunction = parse_single_nullary_connective(infile, symbol) mfunction = parse_single_nullary_connective(infile, symbol)
elif adicity == 1: elif adicity == 1:
mfunction = parse_single_monadic_connective(infile, symbol, current_model_parts.size) mfunction = parse_single_monadic_connective(infile, symbol, current_model_parts.size, current_model_parts.carrier_list)
elif adicity == 2: elif adicity == 2:
mfunction = parse_single_dyadic_connective(infile, symbol, current_model_parts.size) mfunction = parse_single_dyadic_connective(infile, symbol, current_model_parts.size, current_model_parts.carrier_list)
else: else:
raise NotImplementedError("Unable to process connectives of adicity greater than 2") raise NotImplementedError("Unable to process connectives of adicity greater than 2")
@ -326,38 +379,6 @@ def process_custom_connective(infile: SourceFile, symbol: str, adicity: int, cur
current_model_parts.custom_model_functions[symbol] = mfunction current_model_parts.custom_model_functions[symbol] = mfunction
return True return True
def process_model(model_name: str, mp: ModelBuilder, solutions: List[Tuple[Model, Dict]]):
"""Create Model"""
assert mp.size > 0
assert mp.size + 1 == len(mp.carrier_set)
assert len(mp.designated_values) <= len(mp.carrier_set)
assert mp.mimplication is not None
logical_operations = { mp.mimplication }
model = Model(mp.carrier_set, logical_operations, mp.designated_values, name=model_name)
interpretation = {
Implication: mp.mimplication
}
if mp.mnegation is not None:
logical_operations.add(mp.mnegation)
interpretation[Negation] = mp.mnegation
if mp.mconjunction is not None:
logical_operations.add(mp.mconjunction)
interpretation[Conjunction] = mp.mconjunction
if mp.mdisjunction is not None:
logical_operations.add(mp.mdisjunction)
interpretation[Disjunction] = mp.mdisjunction
if mp.mnecessitation is not None:
logical_operations.add(mp.mnecessitation)
interpretation[Necessitation] = mp.mnecessitation
for custom_mf in mp.custom_model_functions.values():
if custom_mf is not None:
logical_operations.add(custom_mf)
op = Operation(custom_mf.operation_name, custom_mf.arity)
interpretation[op] = custom_mf
solutions.append((model, interpretation))
def parse_header(infile: SourceFile) -> UglyHeader: def parse_header(infile: SourceFile) -> UglyHeader:
""" """
@ -378,20 +399,19 @@ def parse_header(infile: SourceFile) -> UglyHeader:
custom_model_functions.append((arity, symbol)) custom_model_functions.append((arity, symbol))
return UglyHeader(negation_defined, necessitation_defined, custom_model_functions) return UglyHeader(negation_defined, necessitation_defined, custom_model_functions)
def carrier_set_from_size(size: int) -> Set[ModelValue]: def carrier_list_from_size(size: int) -> List[ModelValue]:
""" """
Construct a carrier set of model values Construct a carrier set of model values
based on the desired size. based on the desired size.
""" """
return { return [
mvalue_from_index(i) for i in range(size + 1) mvalue_from_index(i) for i in range(size + 1)
} ]
def parse_size(infile: SourceFile, first_run: bool) -> Optional[int]: def parse_size(infile: SourceFile, first_run: bool) -> Optional[int]:
""" """
Parse the line representing the matrix size. Parse the line representing the matrix size.
""" """
size = int(infile.next_line()) size = int(infile.next_line())
# HACK: When necessitation and custom connectives are enabled # HACK: When necessitation and custom connectives are enabled
@ -402,7 +422,9 @@ def parse_size(infile: SourceFile, first_run: bool) -> Optional[int]:
if size == -1: if size == -1:
return None return None
assert size > 0, f"Unexpected size at line {infile.current_line}"
assert size > 0, f"Unexpected size at line {infile.line_in_file}"
return size return size
def mvalue_from_index(i: int) -> ModelValue: def mvalue_from_index(i: int) -> ModelValue:
@ -418,55 +440,9 @@ def parse_mvalue(x: str) -> ModelValue:
""" """
return mvalue_from_index(int(x)) return mvalue_from_index(int(x))
def determine_cresult(size: int, ordering: Dict[ModelValue, ModelValue], a: ModelValue, b: ModelValue) -> ModelValue:
"""
Determine what a b should be given the ordering table.
"""
for i in range(size + 1):
c = mvalue_from_index(i)
if not ordering[(c, a)]: def parse_single_order(infile: SourceFile, size: int, carrier_list: List[ModelValue]) -> Optional[
continue Tuple[OrderTable, Optional[ModelFunction], Optional[ModelFunction]]]:
if not ordering[(c, b)]:
continue
invalid = False
for j in range(size + 1):
d = mvalue_from_index(j)
if c == d:
continue
if ordering[(c, d)]:
if ordering[(d, a)] and ordering [(d, b)]:
invalid = True
if not invalid:
return c
def determine_dresult(size: int, ordering: Dict[ModelValue, ModelValue], a: ModelValue, b: ModelValue) -> ModelValue:
"""
Determine what a b should be given the ordering table.
"""
for i in range(size + 1):
c = mvalue_from_index(i)
if not ordering[(a, c)]:
continue
if not ordering[(b, c)]:
continue
invalid = False
for j in range(size + 1):
d = mvalue_from_index(j)
if d == c:
continue
if ordering[(d, c)]:
if ordering[(a, d)] and ordering[(b, d)]:
invalid = True
if not invalid:
return c
def parse_single_order(infile: SourceFile, size: int) -> Optional[Tuple[ModelFunction, ModelFunction]]:
""" """
Parse the line representing the ordering table Parse the line representing the ordering table
""" """
@ -476,47 +452,38 @@ def parse_single_order(infile: SourceFile, size: int) -> Optional[Tuple[ModelFun
table = line.split(" ") table = line.split(" ")
assert len(table) == (size + 1)**2, f"Order table doesn't match expected size at line {infile.current_line}" assert len(table) == (size + 1)**2, f"Order table doesn't match expected size at line {infile.line_in_file}"
ordering = OrderTable()
omapping = {} omapping = {}
table_i = 0 table_i = 0
for i in range(size + 1): for x, y in product(carrier_list, carrier_list):
x = mvalue_from_index(i)
for j in range(size + 1):
y = mvalue_from_index(j)
omapping[(x, y)] = table[table_i] == '1' omapping[(x, y)] = table[table_i] == '1'
if table[table_i] == '1':
ordering.add(x, y)
table_i += 1 table_i += 1
cmapping = {} cmapping = {}
dmapping = {} dmapping = {}
for i in range(size + 1): for x, y in product(carrier_list, carrier_list):
x = mvalue_from_index(i) cresult = ordering.meet(x, y)
for j in range(size + 1):
y = mvalue_from_index(j)
cresult = determine_cresult(size, omapping, x, y)
if cresult is None: if cresult is None:
print("[Warning] Conjunction and Disjunction are not well-defined") return ordering, None, None
print(f"{x}{y} = ??")
return None, None
cmapping[(x, y)] = cresult cmapping[(x, y)] = cresult
dresult = determine_dresult(size, omapping, x, y) dresult = ordering.join(x, y)
if dresult is None: if dresult is None:
print("[Warning] Conjunction and Disjunction are not well-defined") return ordering, None, None
print(f"{x} {y} = ??")
return None, None
dmapping[(x, y)] = dresult dmapping[(x, y)] = dresult
mconjunction = ModelFunction(2, cmapping, "") mconjunction = ModelFunction(2, cmapping, "")
mdisjunction = ModelFunction(2, dmapping, "") mdisjunction = ModelFunction(2, dmapping, "")
return mconjunction, mdisjunction return ordering, mconjunction, mdisjunction
def parse_single_designated(infile: SourceFile, size: int) -> Optional[Set[ModelValue]]: def parse_single_designated(infile: SourceFile, size: int, carrier_list: List[ModelValue]) -> Optional[Set[ModelValue]]:
""" """
Parse the line representing which model values are designated. Parse the line representing which model values are designated.
""" """
@ -525,13 +492,12 @@ def parse_single_designated(infile: SourceFile, size: int) -> Optional[Set[Model
return None return None
row = line.split(" ") row = line.split(" ")
assert len(row) == size + 1, f"Designated table doesn't match expected size at line {infile.current_line}" assert len(row) == size + 1, f"Designated table doesn't match expected size at line {infile.line_in_file}"
designated_values = set() designated_values = set()
for i, j in zip(range(size + 1), row): for x, j in zip(carrier_list, row):
if j == '1': if j == '1':
x = mvalue_from_index(i)
designated_values.add(x) designated_values.add(x)
return designated_values return designated_values
@ -543,29 +509,28 @@ def parse_single_nullary_connective(infile: SourceFile, symbol: str) -> Optional
return None return None
row = line.split(" ") row = line.split(" ")
assert len(row) == 1, f"More than one assignment for a nullary connective was provided at line {infile.current_line}" assert len(row) == 1, f"More than one assignment for a nullary connective was provided at line {infile.line_in_file}"
mapping = {} mapping = {}
mapping[()] = parse_mvalue(row[0]) mapping[()] = parse_mvalue(row[0])
return ModelFunction(0, mapping, symbol) return ModelFunction(0, mapping, symbol)
def parse_single_monadic_connective(infile: SourceFile, symbol: str, size: int) -> Optional[ModelFunction]: def parse_single_monadic_connective(infile: SourceFile, symbol: str, size: int, carrier_list: List[ModelValue]) -> Optional[ModelFunction]:
line = infile.next_line() line = infile.next_line()
if line == '-1': if line == '-1':
return None return None
row = line.split(" ") row = line.split(" ")
assert len(row) == size + 1, f"{symbol} table doesn't match size at line {infile.current_line}" assert len(row) == size + 1, f"{symbol} table doesn't match size at line {infile.line_in_file}"
mapping = {} mapping = {}
for i, j in zip(range(size + 1), row): for x, j in zip(carrier_list, row):
x = mvalue_from_index(i)
y = parse_mvalue(j) y = parse_mvalue(j)
mapping[(x, )] = y mapping[(x, )] = y
return ModelFunction(1, mapping, symbol) return ModelFunction(1, mapping, symbol)
def parse_single_dyadic_connective(infile: SourceFile, symbol: str, size: int) -> Optional[ModelFunction]: def parse_single_dyadic_connective(infile: SourceFile, symbol: str, size: int, carrier_list: List[ModelValue]) -> Optional[ModelFunction]:
first_token = next(infile) first_token = next(infile)
if first_token == "-1": if first_token == "-1":
return None return None
@ -574,20 +539,14 @@ def parse_single_dyadic_connective(infile: SourceFile, symbol: str, size: int) -
try: try:
table = [first_token] + [next(infile) for _ in range((size + 1)**2 - 1)] table = [first_token] + [next(infile) for _ in range((size + 1)**2 - 1)]
except StopIteration: except StopIteration:
pass raise Exception(f"{symbol} table does not match expected size at line {infile.line_in_file}")
assert len(table) == (size + 1)**2, f"{symbol} table does not match expected size at line {infile.current_line}"
mapping = {} mapping = {}
table_i = 0 table_i = 0
for i in range(size + 1):
x = mvalue_from_index(i)
for j in range(size + 1):
y = mvalue_from_index(j)
for x, y in product(carrier_list, carrier_list):
r = parse_mvalue(table[table_i]) r = parse_mvalue(table[table_i])
table_i += 1 table_i += 1
mapping[(x, y)] = r mapping[(x, y)] = r
return ModelFunction(2, mapping, symbol) return ModelFunction(2, mapping, symbol)

1
utils/README.md Normal file
View file

@ -0,0 +1 @@
This folder contains scripts that may be used during experimentation. These are intended to be used ad-hoc and we do not guarentee the maitainance of these scripts.

View file

@ -0,0 +1,105 @@
"""
Given two MaGIC ugly data files that correspond to
the same logic. Report any differences in the models
that exhibit VSP.
Overall process:
- Determine which models in file 1 have VSP
- Print if model does not exist in file 2
- For models in file 2 that were not already encountered for,
check if they have VSP.
- Print models that do
"""
import argparse
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import argparse
from model import model_equivalence
from parse_magic import SourceFile, parse_matrices
from vsp import has_vsp
from vspursuer import restructure_solutions
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Compare models that have VSP in two ugly files")
parser.add_argument("ugly1", type=str, help="First ugly data file")
parser.add_argument("ugly2", type=str, help="Second ugly data file")
parser.add_argument("--ignore-constants", action='store_true', help="When it comes to model equivalance, ignore constants")
args = vars(parser.parse_args())
data_file1 = open(args['ugly1'], "r")
solutions1 = parse_matrices(SourceFile(data_file1))
solutions1 = restructure_solutions(solutions1, None)
data_file2 = open(args['ugly2'], "r")
solutions2 = parse_matrices(SourceFile(data_file2))
solutions2 = list(restructure_solutions(solutions2, None))
ignore_constants = args.get("ignore_constants", False)
# Total count of models
total_models1 = 0
total_models2 = 0
# Models that exhibit VSP
good_models1 = 0
good_models2 = 0
# Models that don't exhibit VSP
bad_models1 = 0
bad_models2 = 0
# Models that exhibit VSP but does
# not exist in the other file.
extra_models1 = 0
extra_models2 = 0
for model, impfunction, negation_defined in solutions1:
total_models1 += 1
vsp_result = has_vsp(model, impfunction, negation_defined)
if vsp_result.has_vsp:
good_models1 += 1
# Check to see if model exists in file 2
match_found_index = (False, -1)
for i in range(len(solutions2) - 1, -1, -1):
if model_equivalence(model, solutions2[i][0], ignore_constants):
match_found_index = (True, i)
break
if match_found_index[0]:
# If so, remove the model from the second set
total_models2 += 1
good_models2 += 1
del solutions2[match_found_index[1]]
else:
extra_models1 += 1
print(f"VSP Model {model.name} not found in file 2.")
print(model)
else:
bad_models1 += 1
# Check through the remaining models in the second set
for model, impfunction, negation_defined in solutions2:
total_models2 += 1
vsp_result = has_vsp(model, impfunction, negation_defined)
if not vsp_result.has_vsp:
bad_models2 += 1
else:
print("VSP model", model.name, "does not appear in file 1")
good_models2 += 1
extra_models2 += 1
print("File 1 has a total of", total_models1, "models.")
print("Out of which,", good_models1, "exhibit VSP while", bad_models1, "do not.")
print("File 1 has a total of", extra_models1, "which exhibit VSP but do not appear in file 2.")
print("")
print("File 2 has a total of", total_models2, "models")
print("Out of which,", good_models2, "exhibit VSP while", bad_models2, "do not.")
print("File 2 has a total of", extra_models2, "which exhibit VSP but do not appear in file 1.")

54
utils/hasse.py Normal file
View file

@ -0,0 +1,54 @@
"""
Given a model, create a Hasse diagram.
Note: This has a dependency on the hasse-diagram library
https://pypi.org/project/hasse-diagram/
"""
import argparse
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from model import Model
from parse_magic import SourceFile, parse_matrices
import numpy as np
import hassediagram
__all__ = ['plot_model_hassee']
def plot_model_hassee(model: Model):
assert model.ordering is not None
carrier_list = list(model.carrier_set)
hasse_ordering = []
for elem1 in carrier_list:
elem_ordering = []
for elem2 in carrier_list:
elem_ordering.append(
1 if model.ordering.is_lt(elem1, elem2) else 0
)
hasse_ordering.append(elem_ordering)
hassediagram.plot_hasse(np.array(hasse_ordering), carrier_list)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Show hassee diagram for model")
parser.add_argument("uglyfile", type=str, help="Path to ugly data file")
parser.add_argument("modelname", type=str, help="Name of model within file")
args = vars(parser.parse_args())
data_file = open(args['uglyfile'], "r")
solutions = parse_matrices(SourceFile(data_file))
requested_model = None
for model, _ in solutions:
if model.name == args['modelname']:
requested_model = model
break
if requested_model is None:
print("Model name", args['modelname'], "not found.")
sys.exit(0)
plot_model_hassee(requested_model)

30
utils/print_model.py Normal file
View file

@ -0,0 +1,30 @@
"""
Print a model given it's name
and ugly data file
"""
import argparse
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from parse_magic import SourceFile, parse_matrices
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Show hassee diagram for model")
parser.add_argument("uglyfile", type=str, help="Path to ugly data file")
parser.add_argument("modelname", type=str, help="Name of model within file")
args = vars(parser.parse_args())
data_file = open(args['uglyfile'], "r")
solutions = parse_matrices(SourceFile(data_file))
model_found = False
for model, _ in solutions:
if model.name == args['modelname']:
model_found = True
print(model)
break
if not model_found:
print("Model", args['modelname'], "not found.")

181
vsp.py
View file

@ -2,83 +2,12 @@
Check to see if the model has the variable Check to see if the model has the variable
sharing property. sharing property.
""" """
from itertools import chain, combinations, product from itertools import product
from typing import Dict, List, Optional, Set, Tuple from typing import List, Optional, Set, Tuple
from common import set_to_str from common import set_to_str
from model import ( from model import (
Model, model_closure, ModelFunction, ModelValue Model, model_closure, ModelFunction, ModelValue
) )
from logic import Conjunction, Disjunction, Implication, Operation
def preseed(
initial_set: Set[ModelValue],
cache:List[Tuple[Set[ModelValue], Set[ModelValue]]]):
"""
Given a cache of previous model_closure calls,
use this to compute an initial model closure
set based on the initial set.
Basic Idea:
Let {1, 2, 3} -> X be in the cache.
If {1,2,3} is a subset of initial set,
then X is the subset of the output of model_closure.
This is used to speed up subsequent calls to model_closure
"""
candidate_preseed: Tuple[Set[ModelValue], int] = (None, None)
for i, o in cache:
if i < initial_set:
cost = len(initial_set - i)
# If i is a subset with less missing elements than
# the previous candidate, then it's the new candidate.
if candidate_preseed[1] is None or cost < candidate_preseed[1]:
candidate_preseed = o, cost
same_set = candidate_preseed[1] == 0
return candidate_preseed[0], same_set
def find_top(algebra: Set[ModelValue], mconjunction: Optional[ModelFunction], mdisjunction: Optional[ModelFunction]) -> Optional[ModelValue]:
"""
Find the top of the order lattice.
T || a = T, T && a = a for all a in the carrier set
"""
if mconjunction is None or mdisjunction is None:
return None
for x in algebra:
is_top = True
for y in algebra:
if mdisjunction(x, y) != x or mconjunction(x, y) != y:
is_top = False
break
if is_top:
return x
print("[Warning] Failed to find the top of the lattice")
return None
def find_bottom(algebra: Set[ModelValue], mconjunction: Optional[ModelFunction], mdisjunction: Optional[ModelFunction]) -> Optional[ModelValue]:
"""
Find the bottom of the order lattice
F || a = a, F && a = F for all a in the carrier set
"""
if mconjunction is None or mdisjunction is None:
return None
for x in algebra:
is_bottom = True
for y in algebra:
if mdisjunction(x, y) != y or mconjunction(x, y) != x:
is_bottom = False
break
if is_bottom:
return x
print("[Warning] Failed to find the bottom of the lattice")
return None
class VSP_Result: class VSP_Result:
def __init__( def __init__(
@ -98,98 +27,90 @@ Subalgebra 1: {set_to_str(self.subalgebra1)}
Subalgebra 2: {set_to_str(self.subalgebra2)} Subalgebra 2: {set_to_str(self.subalgebra2)}
""" """
def has_vsp(model: Model, interpretation: Dict[Operation, ModelFunction]) -> VSP_Result: def has_vsp(model: Model, impfunction: ModelFunction,
negation_defined: bool) -> VSP_Result:
""" """
Checks whether a model has the variable Checks whether a model has the variable
sharing property. sharing property.
""" """
impfunction = interpretation[Implication]
mconjunction = interpretation.get(Conjunction)
mdisjunction = interpretation.get(Disjunction)
top = find_top(model.carrier_set, mconjunction, mdisjunction)
bottom = find_bottom(model.carrier_set, mconjunction, mdisjunction)
# NOTE: No models with only one designated # NOTE: No models with only one designated
# value satisfies VSP # value satisfies VSP
if len(model.designated_values) == 1: if len(model.designated_values) == 1:
return VSP_Result(False, model.name) return VSP_Result(False, model.name)
assert model.ordering is not None, "Expected ordering table in model"
top = model.ordering.top()
bottom = model.ordering.bottom()
# Compute I the set of tuples (x, y) where # Compute I the set of tuples (x, y) where
# x -> y does not take a designiated value # x -> y does not take a designiated value
I: Set[Tuple[ModelValue, ModelValue]] = set() I: List[Tuple[ModelValue, ModelValue]] = []
for (x, y) in product(model.carrier_set, model.carrier_set): for (x, y) in product(model.carrier_set, model.carrier_set):
if impfunction(x, y) not in model.designated_values: if impfunction(x, y) not in model.designated_values:
I.add((x, y)) I.append((x, y))
# Construct the powerset of I without the empty set
s = list(I)
I_power = chain.from_iterable(combinations(s, r) for r in range(1, len(s) + 1))
# ((x1, y1)), ((x1, y1), (x2, y2)), ...
# Closure cache
closure_cache: List[Tuple[Set[ModelValue], Set[ModelValue]]] = []
# Find the subalgebras which falsify implication # Find the subalgebras which falsify implication
for xys in I_power: for xys in I:
xs = {xy[0] for xy in xys} xi = xys[0]
orig_xs = xs
cached_xs = preseed(xs, closure_cache)
if cached_xs[0] is not None:
xs |= cached_xs[0]
ys = {xy[1] for xy in xys} # Discard ({⊥} A', B) subalgebras
orig_ys = ys if bottom is not None and xi == bottom:
cached_ys = preseed(ys, closure_cache)
if cached_ys[0] is not None:
ys |= cached_ys[0]
# NOTE: Optimziation before model_closure
# If the two subalgebras intersect, move
# onto the next pair
if len(xs & ys) > 0:
continue continue
# NOTE: Optimization # Discard ({} A', B) subalgebras when negation is defined
# If the left subalgebra contains bottom if top is not None and negation_defined and xi == top:
# or the right subalgebra contains top
# skip this pair
if top is not None and top in ys:
continue
if bottom is not None and bottom in xs:
continue continue
# Compute the closure of all operations yi = xys[1]
# with just the xs
carrier_set_left: Set[ModelValue] = model_closure(xs, model.logical_operations, bottom)
# Save to cache # Discard (A, {} B') subalgebras
if cached_xs[0] is not None and not cached_ys[1]: if top is not None and yi == top:
closure_cache.append((orig_xs, carrier_set_left)) continue
# Discard (A, {⊥} B') subalgebras when negation is defined
if bottom is not None and negation_defined and yi == bottom:
continue
# Discard ({a} A', {b} B') subalgebras when a <= b
if model.ordering.is_lt(xi, yi):
continue
# Discard ({a} A', {b} B') subalgebras when b <= a and negation is defined
if negation_defined and model.ordering.is_lt(yi, xi):
continue
# Compute the left closure of the set containing xi under all the operations
carrier_set_left: Set[ModelValue] = model_closure({xi,}, model.logical_operations, bottom)
# Discard ({⊥} A', B) subalgebras
if bottom is not None and bottom in carrier_set_left: if bottom is not None and bottom in carrier_set_left:
continue continue
# Discard ({} A', B) subalgebras when negation is defined
if top is not None and negation_defined and top in carrier_set_left:
continue
# Compute the closure of all operations # Compute the closure of all operations
# with just the ys # with just the ys
carrier_set_right: Set[ModelValue] = model_closure(ys, model.logical_operations, top) carrier_set_right: Set[ModelValue] = model_closure({yi,}, model.logical_operations, top)
# Save to cache
if cached_ys[0] is not None and not cached_ys[1]:
closure_cache.append((orig_ys, carrier_set_right))
# Discard (A, {} B') subalgebras
if top is not None and top in carrier_set_right: if top is not None and top in carrier_set_right:
continue continue
# If the carrier set intersects, then move on to the next # Discard (A, {⊥} B') subalgebras when negation is defined
# subalgebra if bottom is not None and negation_defined and bottom in carrier_set_right:
if len(carrier_set_left & carrier_set_right) > 0:
continue continue
# See if for all pairs in the subalgebras, that # Discard subalgebras that intersect
# implication is falsified if not carrier_set_left.isdisjoint(carrier_set_right):
continue
# Check whether for all pairs in the subalgebra,
# that implication is falsified.
falsified = True falsified = True
for (x2, y2) in product(carrier_set_left, carrier_set_right): for (x2, y2) in product(carrier_set_left, carrier_set_right):
if impfunction(x2, y2) in model.designated_values: if impfunction(x2, y2) in model.designated_values:

View file

@ -1,45 +1,214 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from os import cpu_count
import argparse
import multiprocessing
from parse_magic import ( from datetime import datetime
SourceFile, from typing import Dict, Iterator, Optional, Tuple
parse_matrices from queue import Empty as QueueEmpty
) import argparse
import multiprocessing as mp
from logic import Negation, Implication, Operation
from model import Model, ModelFunction
from parse_magic import SourceFile, parse_matrices
from vsp import has_vsp, VSP_Result from vsp import has_vsp, VSP_Result
def print_with_timestamp(message):
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{current_time}] {message}", flush=True)
def restructure_solutions(solutions: Iterator[Tuple[Model, Dict[Operation, ModelFunction]]], skip_to: Optional[str]) -> \
Iterator[Tuple[Model, ModelFunction, Optional[ModelFunction], Optional[ModelFunction], Optional[ModelFunction]]]:
"""
When subprocess gets spawned, the logical operations will
have a different memory address than what's expected in interpretation.
Therefore, we need to pass the model functions directly instead.
While we're at it, filter out models until we get to --skip-to
if applicable.
"""
start_processing = skip_to is None
for model, interpretation in solutions:
# If skip_to is defined, then don't process models
# until then.
if not start_processing and model.name != skip_to:
continue
start_processing = True
# NOTE: Implication must be defined, the rest may not
impfunction = interpretation[Implication]
negation_defined = Negation in interpretation
yield (model, impfunction, negation_defined)
def has_vsp_plus_model(model, impfunction, negation_defined) -> Tuple[Optional[Model], VSP_Result]:
"""
Wrapper which also stores the models along with its vsp result
"""
vsp_result = has_vsp(model, impfunction, negation_defined)
# NOTE: Memory optimization - Don't return model if it doesn't have VSP
model = model if vsp_result.has_vsp else None
return (model, vsp_result)
def worker_vsp(task_queue: mp.Queue, result_queue: mp.Queue):
"""
Worker process which processes models from the task
queue and adds the result to the result_queue.
Adds the sentinal value None when exception occurs and when there's
no more to process.
"""
try:
while True:
task = task_queue.get()
# If sentinal value, break
if task is None:
break
(model, impfunction, negation_defined) = task
result = has_vsp_plus_model(model, impfunction, negation_defined)
result_queue.put(result)
finally:
# Either an exception occured or the worker finished
# Push sentinal value
result_queue.put(None)
def worker_parser(data_file_path: str, num_sentinal_values: int, task_queue: mp.Queue, skip_to: Optional[str]):
"""
Function which parses the MaGIC file
and adds models to the task_queue.
Intended to be deployed with a dedicated process.
"""
try:
data_file = open(data_file_path, "r")
solutions = parse_matrices(SourceFile(data_file))
solutions = restructure_solutions(solutions, skip_to)
while True:
try:
item = next(solutions)
task_queue.put(item)
except StopIteration:
break
finally:
data_file.close()
for _ in range(num_sentinal_values):
task_queue.put(None)
def multi_process_runner(num_cpu: int, data_file_path: str, skip_to: Optional[str]):
"""
Run VSPursuer in a multi-process configuration.
"""
assert num_cpu > 1
num_tested = 0
num_has_vsp = 0
num_workers = num_cpu - 1
# Create queues
task_queue = mp.Queue(maxsize=1000)
result_queue = mp.Queue()
# Create dedicated process to parse the MaGIC file
process_parser = mp.Process(target=worker_parser, args=(data_file_path, num_workers, task_queue, skip_to))
process_parser.start()
# Create dedicated processes which check VSP
process_workers = []
for _ in range(num_workers):
p = mp.Process(target=worker_vsp, args=(task_queue, result_queue))
process_workers.append(p)
p.start()
# Check results and add new tasks until finished
result_sentinal_count = 0
while True:
# Read a result
try:
result = result_queue.get(True, 60)
except QueueEmpty:
if all((not p.is_alive() for p in process_workers)):
# All workers finished without us receiving all the
# sentinal values.
break
task_queue_size = 0
try:
task_queue_size = task_queue.qsize()
except NotImplementedError:
# MacOS doesn't implement this
pass
if task_queue_size == 0 and not process_parser.is_alive():
# For Linux/Windows this means that the process_parser
# died before sending the sentinal values.
# For Mac, this doesn't guarentee anything but might
# as well push more sentinal values.
for _ in range(num_workers):
task_queue.put(None)
# Don't do anymore work, wait again for a result
continue
# When we receive None, it means a child process has finished
if result is None:
result_sentinal_count += 1
# If all workers have finished break
if result_sentinal_count == len(process_workers):
break
continue
# Process result
model, vsp_result = result
print_with_timestamp(vsp_result)
num_tested += 1
if vsp_result.has_vsp:
print(model)
if vsp_result.has_vsp:
num_has_vsp += 1
print_with_timestamp(f"Tested {num_tested} models, {num_has_vsp} of which satisfy VSP")
def single_process_runner(data_file_path: str, skip_to: Optional[str]):
num_tested = 0
num_has_vsp = 0
data_file = open(data_file_path, "r")
solutions = parse_matrices(SourceFile(data_file))
solutions = restructure_solutions(solutions, skip_to)
for model, impfunction, negation_defined in solutions:
model, vsp_result = has_vsp_plus_model(model, impfunction, negation_defined)
print_with_timestamp(vsp_result)
num_tested += 1
if vsp_result.has_vsp:
print(model)
if vsp_result.has_vsp:
num_has_vsp += 1
print_with_timestamp(f"Tested {num_tested} models, {num_has_vsp} of which satisfy VSP")
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser(description="VSP Checker") parser = argparse.ArgumentParser(description="VSP Checker")
parser.add_argument("--verbose", action='store_true', help="Print out all parsed matrices") parser.add_argument("--verbose", action='store_true', help="Print out all parsed matrices")
parser.add_argument("-i", type=str, help="Path to MaGIC ugly data file") parser.add_argument("-i", type=str, help="Path to MaGIC ugly data file")
parser.add_argument("-c", type=int, help="Number of CPUs to use. Default: 1")
parser.add_argument("--skip-to", type=str, help="Skip until a model name is found and process from then onwards.")
args = vars(parser.parse_args()) args = vars(parser.parse_args())
data_file_path = args.get("i") data_file_path = args.get("i")
if data_file_path is None: if data_file_path is None:
data_file_path = input("Path to MaGIC Ugly Data File: ") data_file_path = input("Path to MaGIC Ugly Data File: ")
solutions = [] num_cpu = args.get("c")
with open(data_file_path, "r") as data_file: if num_cpu is None:
solutions = parse_matrices(SourceFile(data_file)) num_cpu = 1
print(f"Parsed {len(solutions)} matrices")
num_has_vsp = 0 if num_cpu == 1:
with multiprocessing.Pool(processes=max(cpu_count() - 2, 1)) as pool: single_process_runner(data_file_path, args.get("skip_to"))
results = [ else:
pool.apply_async(has_vsp, (model, interpretation,)) multi_process_runner(num_cpu, data_file_path, args.get("skip_to"))
for model, interpretation in solutions
]
for i, result in enumerate(results):
vsp_result: VSP_Result = result.get()
print(vsp_result)
if args['verbose'] or vsp_result.has_vsp:
model = solutions[i][0]
print(model)
if vsp_result.has_vsp:
num_has_vsp += 1
print(f"Tested {len(solutions)} models, {num_has_vsp} of which satisfy VSP")