Initial draft implementation of a signed VSP checker

This commit is contained in:
Brandon Rozek 2025-11-05 13:21:19 -05:00
parent 0a0b62f3a0
commit 4179345956
2 changed files with 563 additions and 1 deletions

220
svspursuer.py Executable file
View file

@ -0,0 +1,220 @@
#!/usr/bin/env python3
"""
Experimental Runner file to find the signed variable sharing property
"""
from datetime import datetime
from typing import Dict, Iterator, Optional, Tuple
from queue import Empty as QueueEmpty
import argparse
import multiprocessing as mp
from logic import Negation, Implication, Operation, Conjunction, Disjunction
from model import Model, ModelFunction
from parse_magic import SourceFile, parse_matrices
from vsp import has_svsp, SVSP_Result
def print_with_timestamp(message):
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{current_time}] {message}", flush=True)
def restructure_solutions(solutions: Iterator[Tuple[Model, Dict[Operation, ModelFunction]]], skip_to: Optional[str]) -> \
Iterator[Tuple[Model, ModelFunction, Optional[ModelFunction], Optional[ModelFunction], Optional[ModelFunction]]]:
"""
When subprocess gets spawned, the logical operations will
have a different memory address than what's expected in interpretation.
Therefore, we need to pass the model functions directly instead.
While we're at it, filter out models until we get to --skip-to
if applicable.
"""
start_processing = skip_to is None
for model, interpretation in solutions:
# If skip_to is defined, then don't process models
# until then.
if not start_processing and model.name != skip_to:
continue
start_processing = True
# NOTE: Implication must be defined, the rest may not
impfunction = interpretation[Implication]
conjfn = interpretation.get(Conjunction)
disjfn = interpretation.get(Disjunction)
negfn = interpretation.get(Negation)
yield (model, impfunction, conjfn, disjfn, negfn)
def has_svsp_plus_model(model, impfn, conjfn, disjfn, negfn) -> Tuple[Optional[Model], SVSP_Result]:
"""
Wrapper which also stores the models along with its vsp result
"""
svsp_result = has_svsp(model, impfn, conjfn, disjfn, negfn)
# NOTE: Memory optimization - Don't return model if it doesn't have SVSP
model = model if svsp_result.has_svsp else None
return (model, svsp_result)
def worker_vsp(task_queue: mp.Queue, result_queue: mp.Queue):
"""
Worker process which processes models from the task
queue and adds the result to the result_queue.
Adds the sentinal value None when exception occurs and when there's
no more to process.
"""
try:
while True:
task = task_queue.get()
# If sentinal value, break
if task is None:
break
(model, impfn, conjfn, disjfn, negfn) = task
result = has_svsp_plus_model(model, impfn, conjfn, disjfn, negfn)
result_queue.put(result)
finally:
# Either an exception occured or the worker finished
# Push sentinal value
result_queue.put(None)
def worker_parser(data_file_path: str, num_sentinal_values: int, task_queue: mp.Queue, skip_to: Optional[str]):
"""
Function which parses the MaGIC file
and adds models to the task_queue.
Intended to be deployed with a dedicated process.
"""
try:
data_file = open(data_file_path, "r")
solutions = parse_matrices(SourceFile(data_file))
solutions = restructure_solutions(solutions, skip_to)
while True:
try:
item = next(solutions)
task_queue.put(item)
except StopIteration:
break
finally:
data_file.close()
for _ in range(num_sentinal_values):
task_queue.put(None)
def multi_process_runner(num_cpu: int, data_file_path: str, skip_to: Optional[str]):
"""
Run SVSPursuer in a multi-process configuration.
"""
assert num_cpu > 1
num_tested = 0
num_has_svsp = 0
num_workers = num_cpu - 1
# Create queues
task_queue = mp.Queue(maxsize=1000)
result_queue = mp.Queue()
# Create dedicated process to parse the MaGIC file
process_parser = mp.Process(target=worker_parser, args=(data_file_path, num_workers, task_queue, skip_to))
process_parser.start()
# Create dedicated processes which check SVSP
process_workers = []
for _ in range(num_workers):
p = mp.Process(target=worker_vsp, args=(task_queue, result_queue))
process_workers.append(p)
p.start()
# Check results and add new tasks until finished
result_sentinal_count = 0
while True:
# Read a result
try:
result = result_queue.get(True, 60)
except QueueEmpty:
if all((not p.is_alive() for p in process_workers)):
# All workers finished without us receiving all the
# sentinal values.
break
task_queue_size = 0
try:
task_queue_size = task_queue.qsize()
except NotImplementedError:
# MacOS doesn't implement this
pass
if task_queue_size == 0 and not process_parser.is_alive():
# For Linux/Windows this means that the process_parser
# died before sending the sentinal values.
# For Mac, this doesn't guarentee anything but might
# as well push more sentinal values.
for _ in range(num_workers):
task_queue.put(None)
# Don't do anymore work, wait again for a result
continue
# When we receive None, it means a child process has finished
if result is None:
result_sentinal_count += 1
# If all workers have finished break
if result_sentinal_count == len(process_workers):
break
continue
# Process result
model, vsp_result = result
print_with_timestamp(vsp_result)
num_tested += 1
if vsp_result.has_svsp:
print(model)
if vsp_result.has_svsp:
num_has_svsp += 1
print_with_timestamp(f"Tested {num_tested} models, {num_has_svsp} of which satisfy SVSP")
def single_process_runner(data_file_path: str, skip_to: Optional[str]):
num_tested = 0
num_has_svsp = 0
data_file = open(data_file_path, "r")
solutions = parse_matrices(SourceFile(data_file))
solutions = restructure_solutions(solutions, skip_to)
for model, impfn, conjfn, disjfn, negfn in solutions:
model, svsp_result = has_svsp_plus_model(model, impfn, conjfn, disjfn, negfn)
print_with_timestamp(svsp_result)
num_tested += 1
if svsp_result.has_svsp:
print(model)
if svsp_result.has_svsp:
num_has_svsp += 1
print_with_timestamp(f"Tested {num_tested} models, {num_has_svsp} of which satisfy SVSP")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="SVSP Checker")
parser.add_argument("--verbose", action='store_true', help="Print out all parsed matrices")
parser.add_argument("-i", type=str, help="Path to MaGIC ugly data file")
parser.add_argument("-c", type=int, help="Number of CPUs to use. Default: 1")
parser.add_argument("--skip-to", type=str, help="Skip until a model name is found and process from then onwards.")
args = vars(parser.parse_args())
data_file_path = args.get("i")
if data_file_path is None:
data_file_path = input("Path to MaGIC Ugly Data File: ")
num_cpu = args.get("c")
if num_cpu is None:
num_cpu = 1
if num_cpu == 1:
single_process_runner(data_file_path, args.get("skip_to"))
else:
multi_process_runner(num_cpu, data_file_path, args.get("skip_to"))

344
vsp.py
View file

@ -2,7 +2,7 @@
Check to see if the model has the variable Check to see if the model has the variable
sharing property. sharing property.
""" """
from itertools import product from itertools import product, chain, combinations
from typing import List, Optional, Set, Tuple from typing import List, Optional, Set, Tuple
from common import set_to_str from common import set_to_str
from model import ( from model import (
@ -121,3 +121,345 @@ def has_vsp(model: Model, impfunction: ModelFunction,
return VSP_Result(True, model.name, carrier_set_left, carrier_set_right) return VSP_Result(True, model.name, carrier_set_left, carrier_set_right)
return VSP_Result(False, model.name) return VSP_Result(False, model.name)
class SVSP_Result:
def __init__(
self, has_svsp: bool, model_name: Optional[str] = None,
subalgebra1: Optional[Set[ModelValue]] = None,
subalgebra2: Optional[Set[ModelValue]] = None,
U: Optional[Set[ModelValue]] = None,
L: Optional[Set[ModelValue]] = None):
self.has_svsp = has_svsp
self.model_name = model_name
self.subalgebra1 = subalgebra1
self.subalgebra2 = subalgebra2
self.U = U
self.L = L
def __str__(self):
if not self.has_svsp:
return f"Model {self.model_name} does not have the signed variable sharing property."
return f"""Model {self.model_name} has the signed variable sharing property.
Subalgebra 1: {set_to_str(self.subalgebra1)}
Subalgebra 2: {set_to_str(self.subalgebra2)}
U: {set_to_str(self.U)}
L: {set_to_str(self.L)}
"""
def powerset_minus_empty(s):
return chain.from_iterable(combinations(s, r) for r in range(1, len(s) + 1))
def find_k1_k2(model, impfunction: ModelFunction,
negation_defined: bool) -> List[Tuple[Set[ModelValue], Set[ModelValue]]]:
"""
Returns a list of possible subalgebra pairs (K1, K2)
"""
assert model.ordering is not None, "Expected ordering table in model"
result = []
top = model.ordering.top()
bottom = model.ordering.bottom()
# Compute I the set of tuples (x, y) where
# x -> y does not take a designiated value
I: List[Tuple[ModelValue, ModelValue]] = []
for (x, y) in product(model.designated_values, model.designated_values):
if impfunction(x, y) not in model.designated_values:
I.append((x, y))
# Find the subalgebras which falsify implication
for xys in I:
xi = xys[0]
# Discard ({⊥} A', B) subalgebras
if bottom is not None and xi == bottom:
continue
# Discard ({} A', B) subalgebras when negation is defined
if top is not None and negation_defined and xi == top:
continue
yi = xys[1]
# Discard (A, {} B') subalgebras
if top is not None and yi == top:
continue
# Discard (A, {⊥} B') subalgebras when negation is defined
if bottom is not None and negation_defined and yi == bottom:
continue
# Discard ({a} A', {b} B') subalgebras when a <= b
if model.ordering.is_lt(xi, yi):
continue
# Discard ({a} A', {b} B') subalgebras when b <= a and negation is defined
if negation_defined and model.ordering.is_lt(yi, xi):
continue
# Compute the left closure of the set containing xi under all the operations
carrier_set_left: Set[ModelValue] = model_closure({xi,}, model.logical_operations, bottom)
# Discard ({⊥} A', B) subalgebras
if bottom is not None and bottom in carrier_set_left:
continue
# Discard ({} A', B) subalgebras when negation is defined
if top is not None and negation_defined and top in carrier_set_left:
continue
# Compute the closure of all operations
# with just the ys
carrier_set_right: Set[ModelValue] = model_closure({yi,}, model.logical_operations, top)
# Discard (A, {} B') subalgebras
if top is not None and top in carrier_set_right:
continue
# Discard (A, {⊥} B') subalgebras when negation is defined
if bottom is not None and negation_defined and bottom in carrier_set_right:
continue
# Discard subalgebras that intersect
if not carrier_set_left.isdisjoint(carrier_set_right):
continue
# Check whether for all pairs in the subalgebra,
# that implication is falsified.
falsified = True
for (x2, y2) in product(carrier_set_left, carrier_set_right):
if impfunction(x2, y2) in model.designated_values:
falsified = False
break
if falsified:
result.append((carrier_set_left, carrier_set_right))
return result
def find_candidate_u_l(model: Model, impfn: ModelFunction, negfn: Optional[ModelFunction]) -> List[Tuple[Set[ModelValue], Set[ModelValue]]]:
result: List[Tuple[Set[ModelValue], Set[ModelValue]]] = []
F = model.carrier_set - model.designated_values
Us = powerset_minus_empty(model.carrier_set)
Ls = powerset_minus_empty(model.carrier_set)
for (U, L) in product(Us, Ls):
unsat = False
U = set(U)
L = set(L)
LFi = F.intersection(L)
# Required property: ∀x ∈ U, y ∈ L(x → y ∈ L ∩ F)
for (x, y) in product(U, L):
if impfn(x, y) not in LFi:
unsat = True
break
if unsat:
continue
# Required Property: ∀x ∈ L, y ∈ U(x → y ∈ U)
for (x, y) in product(L, U):
if impfn(x, y) not in U:
unsat = True
break
if unsat:
continue
if negfn is not None:
for x in L:
# Required Property: ∀x(x ∈ L ⇒ ¬x ∈ U)
if negfn(x) not in U:
unsat = True
break
if unsat:
continue
for x in U:
# Required Property: ∀x(x ∈ U ⇒ ¬x ∈ L)
if negfn(x) not in L:
unsat = True
break
if unsat:
continue
# Passed all required properties
result.append((U, L))
return result
def has_svsp(model: Model, impfn: ModelFunction,
conjfn: Optional[ModelFunction],
disjfn: Optional[ModelFunction],
negfn: Optional[ModelFunction]) -> SVSP_Result:
"""
Checks whether a model has the signed
variable sharing property.
"""
# NOTE: No models with only one designated
# value satisfies SVSP
if len(model.designated_values) == 1:
return SVSP_Result(False, model.name)
F = model.carrier_set - model.designated_values
starops = [conjfn, disjfn]
K1K2s = find_k1_k2(model, impfn, negfn is not None)
ULs = find_candidate_u_l(model, impfn, negfn)
candidates = ((k1, k2, u, l) for (k1, k2), (u, l) in product(K1K2s, ULs))
for K1, K2, U, L in candidates:
unsat = False
K1Uu = K1 | U
K1Lu = K1 | L
K1LuFi = K1Lu.intersection(F) # (K1 L) ∩ F
K2Uu = K2 | U
K2Lu = K2 | L
K2LuFi = K2Lu.intersection(F) # (K2 L) ∩ F
# (6)
for x, y in product(K1, U):
# b) x → y ∈ K1 U
if impfn(x, y) not in K1Uu:
unsat = True
break
# c) y → x ∈ K1 L
if impfn(y, x) not in K1Lu:
unsat = True
break
# a) x y, y x, y z ∈ K1 U
for z in U:
for op in starops:
if op is not None:
if op(x, y) not in K1Uu:
unsat = True
break
if op(y, x) not in K1Uu:
unsat = True
break
if op(y, z) not in K1Uu:
unsat = True
break
if unsat:
break
if unsat:
# Verification for these set of matrices failed
break
if unsat:
# Move onto the next candidates K1, K2, U, L
continue
# (7)
for x, y in product(K1, L):
# b) x → y ∈ (K1 L) ∩ F
if impfn(x, y) not in K1LuFi:
unsat = True
break
# c) y → x ∈ K1 U
if impfn(y, x) not in K1Uu:
unsat = True
break
# a) x y, y x, y z ∈ K1 L
for z in L:
for op in starops:
if op is not None:
if op(x, y) not in K1Lu:
unsat = True
break
if op(y, x) not in K1Lu:
unsat = True
break
if op(y, z) not in K1Lu:
unsat = True
break
if unsat:
break
if unsat:
break
if unsat:
continue
# (8)
for x, y in product(K2, U):
# b) x → y ∈ K2 U
if impfn(x, y) not in K2Uu:
unsat = True
break
# c) y → x ∈ (K2 L) ∩ F
if impfn(y, x) not in K2LuFi:
unsat = True
break
# a) x y, y x, y z ∈ K2 U
for z in U:
for op in starops:
if op is not None:
if op(x, y) not in K2Uu:
unsat = True
break
if op(y, x) not in K2Uu:
unsat = True
break
if op(y, z) not in K2Uu:
unsat = True
break
if unsat:
break
if unsat:
break
if unsat:
continue
# (9)
for x, y in product(K2, L):
# b) x → y ∈ K2 L
if impfn(x, y) not in K2Lu:
unsat = True
break
# c) y → x ∈ K2 U
if impfn(y, x) not in K2Uu:
unsat = True
break
# a) x y, y x, y z ∈ K2 L
for z in L:
for op in starops:
if op is not None:
if op(x, y) not in K2Lu:
unsat = True
break
if op(y, x) not in K2Lu:
unsat = True
break
if op(y, z) not in K2Lu:
unsat = True
break
if unsat:
break
if unsat:
break
if not unsat:
return SVSP_Result(True, model.name, K1, K2, U, L)
return SVSP_Result(False, model.name)