mirror of
https://github.com/Brandon-Rozek/matmod.git
synced 2025-07-29 20:52:01 +00:00
Cleanup and small optimizations
This commit is contained in:
parent
de8637bca4
commit
6f5074584b
3 changed files with 85 additions and 70 deletions
3
model.py
3
model.py
|
@ -103,6 +103,9 @@ def binary_function_str(f: ModelFunction) -> str:
|
|||
|
||||
Interpretation = Dict[Operation, ModelFunction]
|
||||
|
||||
# TODO: Replace with a nicer representation
|
||||
# Include meet and join functions
|
||||
# Something like x : (all elements less than x)
|
||||
class OrderTable:
|
||||
def __init__(self):
|
||||
self.ordering = set()
|
||||
|
|
127
parse_magic.py
127
parse_magic.py
|
@ -5,6 +5,7 @@ Assumes the base logic is R with no extra connectives
|
|||
"""
|
||||
import re
|
||||
from typing import TextIO, List, Iterator, Optional, Tuple, Set, Dict
|
||||
from itertools import product
|
||||
|
||||
from model import Model, ModelValue, ModelFunction, OrderTable
|
||||
from logic import (
|
||||
|
@ -19,7 +20,7 @@ from logic import (
|
|||
class SourceFile:
|
||||
def __init__(self, fileobj: TextIO):
|
||||
self.fileobj = fileobj
|
||||
self.current_line = 0
|
||||
self.line_in_file = 0
|
||||
self.reststr = ""
|
||||
|
||||
def next_line(self):
|
||||
|
@ -34,7 +35,7 @@ class SourceFile:
|
|||
return reststr
|
||||
|
||||
contents = next(self.fileobj).strip()
|
||||
self.current_line += 1
|
||||
self.line_in_file += 1
|
||||
return contents
|
||||
|
||||
def __next__(self):
|
||||
|
@ -43,7 +44,7 @@ class SourceFile:
|
|||
"""
|
||||
if self.reststr == "":
|
||||
self.reststr = next(self.fileobj).strip()
|
||||
self.current_line += 1
|
||||
self.line_in_file += 1
|
||||
|
||||
next_token, _, self.reststr = self.reststr.partition(" ")
|
||||
return next_token
|
||||
|
@ -60,7 +61,7 @@ class UglyHeader:
|
|||
class ModelBuilder:
|
||||
def __init__(self):
|
||||
self.size : int = 0
|
||||
self.carrier_set : Set[ModelValue] = set()
|
||||
self.carrier_list : List[ModelValue] = []
|
||||
self.mnegation: Optional[ModelFunction] = None
|
||||
self.ordering: Optional[OrderTable] = None
|
||||
self.mconjunction: Optional[ModelFunction] = None
|
||||
|
@ -97,8 +98,6 @@ class Stages:
|
|||
|
||||
def add(self, name: str):
|
||||
stage = Stage(name)
|
||||
stage.next = stage
|
||||
|
||||
stage.previous = self.last_added_stage
|
||||
|
||||
# End stage is a sink so don't
|
||||
|
@ -115,11 +114,16 @@ class Stages:
|
|||
|
||||
def reset_after(self, name):
|
||||
"""
|
||||
Resets the stage counters after a given stage.
|
||||
This is to accurately reflect the name of the
|
||||
model within MaGIC.
|
||||
Resets the counters of every stage after
|
||||
a given stage.
|
||||
|
||||
This is to accurately reflect how names are
|
||||
generated within magic.
|
||||
Example: 1.1, 1.2, (reset after 1), 2.1, 2.2
|
||||
"""
|
||||
stage = self.stages[name]
|
||||
# NOTE: The process_model stage doesn't
|
||||
# have a counter associated with it.
|
||||
while stage.name != "process_model":
|
||||
stage.reset()
|
||||
stage = stage.next
|
||||
|
@ -128,15 +132,26 @@ class Stages:
|
|||
return self.stages[name]
|
||||
|
||||
def name(self):
|
||||
"""
|
||||
Get the full name of where we are within
|
||||
the parsing process. Takes into account
|
||||
the stage number of all the stages seen
|
||||
so far.
|
||||
"""
|
||||
|
||||
# Stages haven't been added yet
|
||||
result = ""
|
||||
stage = self.first_stage
|
||||
if stage is None:
|
||||
return ""
|
||||
|
||||
# There's only one stage
|
||||
result = f"{stage.num}"
|
||||
if stage.next == "process_model":
|
||||
return result
|
||||
|
||||
# Add every subsequent stage counter
|
||||
# by appending .stage_num
|
||||
stage = stage.next
|
||||
while stage is not None:
|
||||
result += f".{stage.num}"
|
||||
|
@ -172,6 +187,7 @@ def parse_matrices(infile: SourceFile) -> Iterator[Tuple[Model, Dict[Operation,
|
|||
stages = derive_stages(header)
|
||||
first_run = True
|
||||
current_model_parts = ModelBuilder()
|
||||
|
||||
stage = stages.first_stage
|
||||
while True:
|
||||
match stage.name:
|
||||
|
@ -245,22 +261,24 @@ def parse_matrices(infile: SourceFile) -> Iterator[Tuple[Model, Dict[Operation,
|
|||
stage = stage.previous
|
||||
|
||||
def process_sizes(infile: SourceFile, current_model_parts: ModelBuilder, first_run: bool) -> bool:
|
||||
size: Optional[int] = None
|
||||
try:
|
||||
size = parse_size(infile, first_run)
|
||||
except StopIteration:
|
||||
return False
|
||||
pass
|
||||
|
||||
if size is None:
|
||||
return False
|
||||
|
||||
carrier_set = carrier_set_from_size(size)
|
||||
current_model_parts.carrier_set = carrier_set
|
||||
carrier_list = carrier_list_from_size(size)
|
||||
current_model_parts.carrier_list = carrier_list
|
||||
current_model_parts.size = size
|
||||
|
||||
return True
|
||||
|
||||
def process_negations(infile: SourceFile, current_model_parts: ModelBuilder) -> bool:
|
||||
"""Stage 2 (Optional)"""
|
||||
mnegation = parse_single_monadic_connective(infile, "¬", current_model_parts.size)
|
||||
mnegation = parse_single_monadic_connective(infile, "¬", current_model_parts.size, current_model_parts.carrier_list)
|
||||
if mnegation is None:
|
||||
return False
|
||||
|
||||
|
@ -269,7 +287,7 @@ def process_negations(infile: SourceFile, current_model_parts: ModelBuilder) ->
|
|||
|
||||
def process_orders(infile: SourceFile, current_model_parts: ModelBuilder) -> bool:
|
||||
"""Stage 3"""
|
||||
result = parse_single_order(infile, current_model_parts.size)
|
||||
result = parse_single_order(infile, current_model_parts.size, current_model_parts.carrier_list)
|
||||
if result is None:
|
||||
return False
|
||||
|
||||
|
@ -299,7 +317,7 @@ def process_implications(infile: SourceFile, current_model_parts: ModelBuilder)
|
|||
return True
|
||||
|
||||
def process_necessitations(infile: SourceFile, current_model_parts: ModelBuilder) -> bool:
|
||||
mnecessitation = parse_single_monadic_connective(infile, "!", current_model_parts.size)
|
||||
mnecessitation = parse_single_monadic_connective(infile, "!", current_model_parts.size, current_model_parts.carrier_list)
|
||||
if mnecessitation is None:
|
||||
return False
|
||||
|
||||
|
@ -310,7 +328,7 @@ def process_custom_connective(infile: SourceFile, symbol: str, adicity: int, cur
|
|||
if adicity == 0:
|
||||
mfunction = parse_single_nullary_connective(infile, symbol)
|
||||
elif adicity == 1:
|
||||
mfunction = parse_single_monadic_connective(infile, symbol, current_model_parts.size)
|
||||
mfunction = parse_single_monadic_connective(infile, symbol, current_model_parts.size, current_model_parts.carrier_list)
|
||||
elif adicity == 2:
|
||||
mfunction = parse_single_dyadic_connective(infile, symbol, current_model_parts.size)
|
||||
else:
|
||||
|
@ -325,8 +343,8 @@ def process_custom_connective(infile: SourceFile, symbol: str, adicity: int, cur
|
|||
def process_model(model_name: str, mp: ModelBuilder) -> Tuple[Model, Dict[Operation, ModelFunction]]:
|
||||
"""Create Model"""
|
||||
assert mp.size > 0
|
||||
assert mp.size + 1 == len(mp.carrier_set)
|
||||
assert len(mp.designated_values) <= len(mp.carrier_set)
|
||||
assert mp.size + 1 == len(mp.carrier_list)
|
||||
assert len(mp.designated_values) <= len(mp.carrier_list)
|
||||
assert mp.mimplication is not None
|
||||
|
||||
logical_operations = { mp.mimplication }
|
||||
|
@ -351,8 +369,8 @@ def process_model(model_name: str, mp: ModelBuilder) -> Tuple[Model, Dict[Operat
|
|||
logical_operations.add(custom_mf)
|
||||
op = Operation(custom_mf.operation_name, custom_mf.arity)
|
||||
interpretation[op] = custom_mf
|
||||
|
||||
model = Model(mp.carrier_set, logical_operations, mp.designated_values, ordering=mp.ordering, name=model_name)
|
||||
|
||||
model = Model(set(mp.carrier_list), logical_operations, mp.designated_values, ordering=mp.ordering, name=model_name)
|
||||
return (model, interpretation)
|
||||
|
||||
|
||||
|
@ -375,14 +393,14 @@ def parse_header(infile: SourceFile) -> UglyHeader:
|
|||
custom_model_functions.append((arity, symbol))
|
||||
return UglyHeader(negation_defined, necessitation_defined, custom_model_functions)
|
||||
|
||||
def carrier_set_from_size(size: int) -> Set[ModelValue]:
|
||||
def carrier_list_from_size(size: int) -> List[ModelValue]:
|
||||
"""
|
||||
Construct a carrier set of model values
|
||||
based on the desired size.
|
||||
"""
|
||||
return {
|
||||
return [
|
||||
mvalue_from_index(i) for i in range(size + 1)
|
||||
}
|
||||
]
|
||||
|
||||
def parse_size(infile: SourceFile, first_run: bool) -> Optional[int]:
|
||||
"""
|
||||
|
@ -399,7 +417,7 @@ def parse_size(infile: SourceFile, first_run: bool) -> Optional[int]:
|
|||
|
||||
if size == -1:
|
||||
return None
|
||||
assert size > 0, f"Unexpected size at line {infile.current_line}"
|
||||
assert size > 0, f"Unexpected size at line {infile.line_in_file}"
|
||||
return size
|
||||
|
||||
def mvalue_from_index(i: int) -> ModelValue:
|
||||
|
@ -463,7 +481,7 @@ def determine_dresult(size: int, ordering: Dict[ModelValue, ModelValue], a: Mode
|
|||
if not invalid:
|
||||
return c
|
||||
|
||||
def parse_single_order(infile: SourceFile, size: int) -> Optional[Tuple[OrderTable, ModelFunction, ModelFunction]]:
|
||||
def parse_single_order(infile: SourceFile, size: int, carrier_list: List[ModelValue]) -> Optional[Tuple[OrderTable, ModelFunction, ModelFunction]]:
|
||||
"""
|
||||
Parse the line representing the ordering table
|
||||
"""
|
||||
|
@ -473,43 +491,35 @@ def parse_single_order(infile: SourceFile, size: int) -> Optional[Tuple[OrderTab
|
|||
|
||||
table = line.split(" ")
|
||||
|
||||
assert len(table) == (size + 1)**2, f"Order table doesn't match expected size at line {infile.current_line}"
|
||||
assert len(table) == (size + 1)**2, f"Order table doesn't match expected size at line {infile.line_in_file}"
|
||||
|
||||
ordering = OrderTable()
|
||||
omapping = {}
|
||||
table_i = 0
|
||||
|
||||
for i in range(size + 1):
|
||||
x = mvalue_from_index(i)
|
||||
for j in range(size + 1):
|
||||
y = mvalue_from_index(j)
|
||||
omapping[(x, y)] = table[table_i] == '1'
|
||||
if table[table_i] == '1':
|
||||
ordering.add(x, y)
|
||||
table_i += 1
|
||||
for x, y in product(carrier_list, carrier_list):
|
||||
omapping[(x, y)] = table[table_i] == '1'
|
||||
if table[table_i] == '1':
|
||||
ordering.add(x, y)
|
||||
table_i += 1
|
||||
|
||||
cmapping = {}
|
||||
dmapping = {}
|
||||
|
||||
for i in range(size + 1):
|
||||
x = mvalue_from_index(i)
|
||||
for j in range(size + 1):
|
||||
y = mvalue_from_index(j)
|
||||
|
||||
cresult = determine_cresult(size, omapping, x, y)
|
||||
if cresult is None:
|
||||
print("[Warning] Conjunction and Disjunction are not well-defined")
|
||||
print(f"{x} ∧ {y} = ??")
|
||||
return None, None
|
||||
cmapping[(x, y)] = cresult
|
||||
|
||||
dresult = determine_dresult(size, omapping, x, y)
|
||||
if dresult is None:
|
||||
print("[Warning] Conjunction and Disjunction are not well-defined")
|
||||
print(f"{x} ∨ {y} = ??")
|
||||
return None, None
|
||||
dmapping[(x, y)] = dresult
|
||||
for x, y in product(carrier_list, carrier_list):
|
||||
cresult = determine_cresult(size, omapping, x, y)
|
||||
if cresult is None:
|
||||
print("[Warning] Conjunction and Disjunction are not well-defined")
|
||||
print(f"{x} ∧ {y} = ??")
|
||||
return None, None
|
||||
cmapping[(x, y)] = cresult
|
||||
|
||||
dresult = determine_dresult(size, omapping, x, y)
|
||||
if dresult is None:
|
||||
print("[Warning] Conjunction and Disjunction are not well-defined")
|
||||
print(f"{x} ∨ {y} = ??")
|
||||
return None, None
|
||||
dmapping[(x, y)] = dresult
|
||||
|
||||
mconjunction = ModelFunction(2, cmapping, "∧")
|
||||
mdisjunction = ModelFunction(2, dmapping, "∨")
|
||||
|
@ -525,7 +535,7 @@ def parse_single_designated(infile: SourceFile, size: int) -> Optional[Set[Model
|
|||
return None
|
||||
|
||||
row = line.split(" ")
|
||||
assert len(row) == size + 1, f"Designated table doesn't match expected size at line {infile.current_line}"
|
||||
assert len(row) == size + 1, f"Designated table doesn't match expected size at line {infile.line_in_file}"
|
||||
|
||||
designated_values = set()
|
||||
|
||||
|
@ -543,23 +553,22 @@ def parse_single_nullary_connective(infile: SourceFile, symbol: str) -> Optional
|
|||
return None
|
||||
|
||||
row = line.split(" ")
|
||||
assert len(row) == 1, f"More than one assignment for a nullary connective was provided at line {infile.current_line}"
|
||||
assert len(row) == 1, f"More than one assignment for a nullary connective was provided at line {infile.line_in_file}"
|
||||
|
||||
mapping = {}
|
||||
mapping[()] = parse_mvalue(row[0])
|
||||
return ModelFunction(0, mapping, symbol)
|
||||
|
||||
def parse_single_monadic_connective(infile: SourceFile, symbol: str, size: int) -> Optional[ModelFunction]:
|
||||
def parse_single_monadic_connective(infile: SourceFile, symbol: str, size: int, carrier_list: List[ModelValue]) -> Optional[ModelFunction]:
|
||||
line = infile.next_line()
|
||||
if line == '-1':
|
||||
return None
|
||||
|
||||
row = line.split(" ")
|
||||
assert len(row) == size + 1, f"{symbol} table doesn't match size at line {infile.current_line}"
|
||||
assert len(row) == size + 1, f"{symbol} table doesn't match size at line {infile.line_in_file}"
|
||||
mapping = {}
|
||||
|
||||
for i, j in zip(range(size + 1), row):
|
||||
x = mvalue_from_index(i)
|
||||
for x, j in zip(carrier_list, row):
|
||||
y = parse_mvalue(j)
|
||||
mapping[(x, )] = y
|
||||
|
||||
|
@ -576,7 +585,7 @@ def parse_single_dyadic_connective(infile: SourceFile, symbol: str, size: int) -
|
|||
except StopIteration:
|
||||
pass
|
||||
|
||||
assert len(table) == (size + 1)**2, f"{symbol} table does not match expected size at line {infile.current_line}"
|
||||
assert len(table) == (size + 1)**2, f"{symbol} table does not match expected size at line {infile.line_in_file}"
|
||||
|
||||
mapping = {}
|
||||
table_i = 0
|
||||
|
|
25
vspursuer.py
25
vspursuer.py
|
@ -10,9 +10,9 @@ import multiprocessing as mp
|
|||
from logic import Conjunction, Disjunction, Negation, Implication, Operation
|
||||
from model import Model, ModelFunction
|
||||
from parse_magic import SourceFile, parse_matrices
|
||||
from vsp import has_vsp
|
||||
from vsp import has_vsp, VSP_Result
|
||||
|
||||
def restructure_solutions(solutions: Iterator[Tuple[Model, Dict[Operation, ModelFunction]]], args) -> \
|
||||
def restructure_solutions(solutions: Iterator[Tuple[Model, Dict[Operation, ModelFunction]]], skip_to: Optional[str]) -> \
|
||||
Iterator[Tuple[Model, ModelFunction, Optional[ModelFunction], Optional[ModelFunction], Optional[ModelFunction]]]:
|
||||
"""
|
||||
When subprocess gets spawned, the logical operations will
|
||||
|
@ -22,28 +22,31 @@ def restructure_solutions(solutions: Iterator[Tuple[Model, Dict[Operation, Model
|
|||
While we're at it, filter out models until we get to --skip-to
|
||||
if applicable.
|
||||
"""
|
||||
start_processing = args.get("skip_to") is None
|
||||
start_processing = skip_to is None
|
||||
for model, interpretation in solutions:
|
||||
# If skip_to is defined, then don't process models
|
||||
# until then.
|
||||
if not start_processing and model.name == args.get("skip_to"):
|
||||
start_processing = True
|
||||
if not start_processing:
|
||||
if not start_processing and model.name != skip_to:
|
||||
continue
|
||||
start_processing = True
|
||||
|
||||
# NOTE: Implication must be defined, the rest may not
|
||||
impfunction = interpretation[Implication]
|
||||
mconjunction = interpretation.get(Conjunction)
|
||||
mdisjunction = interpretation.get(Disjunction)
|
||||
mnegation = interpretation.get(Negation)
|
||||
yield (model, impfunction, mconjunction, mdisjunction, mnegation)
|
||||
|
||||
def has_vsp_plus_model(model, impfunction, mconjunction, mdisjunction, mnegation):
|
||||
def has_vsp_plus_model(model, impfunction, mconjunction, mdisjunction, mnegation) -> Tuple[Optional[Model], VSP_Result]:
|
||||
"""
|
||||
Wrapper which also stores the models along with its vsp result
|
||||
"""
|
||||
vsp_result = has_vsp(model, impfunction, mconjunction, mdisjunction, mnegation)
|
||||
# NOTE: Memory optimization - Don't return model if it doens't have VSP
|
||||
model = model if vsp_result.has_vsp else None
|
||||
return (model, vsp_result)
|
||||
|
||||
def worker_vsp(task_queue, result_queue):
|
||||
def worker_vsp(task_queue: mp.Queue, result_queue: mp.Queue):
|
||||
"""
|
||||
Worker process which processes models from the task
|
||||
queue and adds the result to the result_queue.
|
||||
|
@ -65,7 +68,7 @@ def worker_vsp(task_queue, result_queue):
|
|||
# Push sentinal value
|
||||
result_queue.put(None)
|
||||
|
||||
def worker_parser(data_file_path, num_sentinal_values, task_queue):
|
||||
def worker_parser(data_file_path: str, num_sentinal_values: int, task_queue: mp.Queue, skip_to: Optional[str]):
|
||||
"""
|
||||
Function which parses the MaGIC file
|
||||
and adds models to the task_queue.
|
||||
|
@ -75,7 +78,7 @@ def worker_parser(data_file_path, num_sentinal_values, task_queue):
|
|||
try:
|
||||
data_file = open(data_file_path, "r")
|
||||
solutions = parse_matrices(SourceFile(data_file))
|
||||
solutions = restructure_solutions(solutions, args)
|
||||
solutions = restructure_solutions(solutions, skip_to)
|
||||
|
||||
while True:
|
||||
try:
|
||||
|
@ -118,7 +121,7 @@ if __name__ == "__main__":
|
|||
result_queue = mp.Queue()
|
||||
|
||||
# Create dedicated process to parse the MaGIC file
|
||||
process_parser = mp.Process(target=worker_parser, args=(data_file_path, num_workers, task_queue))
|
||||
process_parser = mp.Process(target=worker_parser, args=(data_file_path, num_workers, task_queue, args.get("skip_to")))
|
||||
process_parser.start()
|
||||
|
||||
# Create dedicated processes which check VSP
|
||||
|
|
Loading…
Add table
Reference in a new issue