diff --git a/svspursuer.py b/svspursuer.py new file mode 100755 index 0000000..3cbaa70 --- /dev/null +++ b/svspursuer.py @@ -0,0 +1,220 @@ +#!/usr/bin/env python3 +""" +Experimental Runner file to find the signed variable sharing property +""" + + +from datetime import datetime +from typing import Dict, Iterator, Optional, Tuple +from queue import Empty as QueueEmpty +import argparse +import multiprocessing as mp + +from logic import Negation, Implication, Operation, Conjunction, Disjunction +from model import Model, ModelFunction +from parse_magic import SourceFile, parse_matrices +from vsp import has_svsp, SVSP_Result + +def print_with_timestamp(message): + current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + print(f"[{current_time}] {message}", flush=True) + +def restructure_solutions(solutions: Iterator[Tuple[Model, Dict[Operation, ModelFunction]]], skip_to: Optional[str]) -> \ + Iterator[Tuple[Model, ModelFunction, Optional[ModelFunction], Optional[ModelFunction], Optional[ModelFunction]]]: + """ + When subprocess gets spawned, the logical operations will + have a different memory address than what's expected in interpretation. + Therefore, we need to pass the model functions directly instead. + + While we're at it, filter out models until we get to --skip-to + if applicable. + """ + start_processing = skip_to is None + for model, interpretation in solutions: + # If skip_to is defined, then don't process models + # until then. + if not start_processing and model.name != skip_to: + continue + start_processing = True + + # NOTE: Implication must be defined, the rest may not + impfunction = interpretation[Implication] + conjfn = interpretation.get(Conjunction) + disjfn = interpretation.get(Disjunction) + negfn = interpretation.get(Negation) + yield (model, impfunction, conjfn, disjfn, negfn) + +def has_svsp_plus_model(model, impfn, conjfn, disjfn, negfn) -> Tuple[Optional[Model], SVSP_Result]: + """ + Wrapper which also stores the models along with its vsp result + """ + svsp_result = has_svsp(model, impfn, conjfn, disjfn, negfn) + # NOTE: Memory optimization - Don't return model if it doesn't have SVSP + model = model if svsp_result.has_svsp else None + return (model, svsp_result) + +def worker_vsp(task_queue: mp.Queue, result_queue: mp.Queue): + """ + Worker process which processes models from the task + queue and adds the result to the result_queue. + + Adds the sentinal value None when exception occurs and when there's + no more to process. + """ + try: + while True: + task = task_queue.get() + # If sentinal value, break + if task is None: + break + (model, impfn, conjfn, disjfn, negfn) = task + result = has_svsp_plus_model(model, impfn, conjfn, disjfn, negfn) + result_queue.put(result) + finally: + # Either an exception occured or the worker finished + # Push sentinal value + result_queue.put(None) + +def worker_parser(data_file_path: str, num_sentinal_values: int, task_queue: mp.Queue, skip_to: Optional[str]): + """ + Function which parses the MaGIC file + and adds models to the task_queue. + + Intended to be deployed with a dedicated process. + """ + try: + data_file = open(data_file_path, "r") + solutions = parse_matrices(SourceFile(data_file)) + solutions = restructure_solutions(solutions, skip_to) + + while True: + try: + item = next(solutions) + task_queue.put(item) + except StopIteration: + break + finally: + data_file.close() + for _ in range(num_sentinal_values): + task_queue.put(None) + +def multi_process_runner(num_cpu: int, data_file_path: str, skip_to: Optional[str]): + """ + Run SVSPursuer in a multi-process configuration. + """ + assert num_cpu > 1 + + num_tested = 0 + num_has_svsp = 0 + num_workers = num_cpu - 1 + + # Create queues + task_queue = mp.Queue(maxsize=1000) + result_queue = mp.Queue() + + # Create dedicated process to parse the MaGIC file + process_parser = mp.Process(target=worker_parser, args=(data_file_path, num_workers, task_queue, skip_to)) + process_parser.start() + + # Create dedicated processes which check SVSP + process_workers = [] + for _ in range(num_workers): + p = mp.Process(target=worker_vsp, args=(task_queue, result_queue)) + process_workers.append(p) + p.start() + + + # Check results and add new tasks until finished + result_sentinal_count = 0 + while True: + # Read a result + try: + result = result_queue.get(True, 60) + except QueueEmpty: + if all((not p.is_alive() for p in process_workers)): + # All workers finished without us receiving all the + # sentinal values. + break + + task_queue_size = 0 + try: + task_queue_size = task_queue.qsize() + except NotImplementedError: + # MacOS doesn't implement this + pass + + if task_queue_size == 0 and not process_parser.is_alive(): + # For Linux/Windows this means that the process_parser + # died before sending the sentinal values. + # For Mac, this doesn't guarentee anything but might + # as well push more sentinal values. + for _ in range(num_workers): + task_queue.put(None) + + # Don't do anymore work, wait again for a result + continue + + + # When we receive None, it means a child process has finished + if result is None: + result_sentinal_count += 1 + # If all workers have finished break + if result_sentinal_count == len(process_workers): + break + continue + + # Process result + model, vsp_result = result + print_with_timestamp(vsp_result) + num_tested += 1 + + if vsp_result.has_svsp: + print(model) + + if vsp_result.has_svsp: + num_has_svsp += 1 + + print_with_timestamp(f"Tested {num_tested} models, {num_has_svsp} of which satisfy SVSP") + +def single_process_runner(data_file_path: str, skip_to: Optional[str]): + num_tested = 0 + num_has_svsp = 0 + + data_file = open(data_file_path, "r") + solutions = parse_matrices(SourceFile(data_file)) + solutions = restructure_solutions(solutions, skip_to) + + for model, impfn, conjfn, disjfn, negfn in solutions: + model, svsp_result = has_svsp_plus_model(model, impfn, conjfn, disjfn, negfn) + print_with_timestamp(svsp_result) + num_tested += 1 + + if svsp_result.has_svsp: + print(model) + + if svsp_result.has_svsp: + num_has_svsp += 1 + + print_with_timestamp(f"Tested {num_tested} models, {num_has_svsp} of which satisfy SVSP") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="SVSP Checker") + parser.add_argument("--verbose", action='store_true', help="Print out all parsed matrices") + parser.add_argument("-i", type=str, help="Path to MaGIC ugly data file") + parser.add_argument("-c", type=int, help="Number of CPUs to use. Default: 1") + parser.add_argument("--skip-to", type=str, help="Skip until a model name is found and process from then onwards.") + args = vars(parser.parse_args()) + + data_file_path = args.get("i") + if data_file_path is None: + data_file_path = input("Path to MaGIC Ugly Data File: ") + + num_cpu = args.get("c") + if num_cpu is None: + num_cpu = 1 + + if num_cpu == 1: + single_process_runner(data_file_path, args.get("skip_to")) + else: + multi_process_runner(num_cpu, data_file_path, args.get("skip_to")) diff --git a/vsp.py b/vsp.py index 034005f..8a5c87c 100644 --- a/vsp.py +++ b/vsp.py @@ -2,7 +2,7 @@ Check to see if the model has the variable sharing property. """ -from itertools import product +from itertools import product, chain, combinations from typing import List, Optional, Set, Tuple from common import set_to_str from model import ( @@ -121,3 +121,345 @@ def has_vsp(model: Model, impfunction: ModelFunction, return VSP_Result(True, model.name, carrier_set_left, carrier_set_right) return VSP_Result(False, model.name) + + +class SVSP_Result: + def __init__( + self, has_svsp: bool, model_name: Optional[str] = None, + subalgebra1: Optional[Set[ModelValue]] = None, + subalgebra2: Optional[Set[ModelValue]] = None, + U: Optional[Set[ModelValue]] = None, + L: Optional[Set[ModelValue]] = None): + self.has_svsp = has_svsp + self.model_name = model_name + self.subalgebra1 = subalgebra1 + self.subalgebra2 = subalgebra2 + self.U = U + self.L = L + + def __str__(self): + if not self.has_svsp: + return f"Model {self.model_name} does not have the signed variable sharing property." + return f"""Model {self.model_name} has the signed variable sharing property. +Subalgebra 1: {set_to_str(self.subalgebra1)} +Subalgebra 2: {set_to_str(self.subalgebra2)} +U: {set_to_str(self.U)} +L: {set_to_str(self.L)} +""" + +def powerset_minus_empty(s): + return chain.from_iterable(combinations(s, r) for r in range(1, len(s) + 1)) + +def find_k1_k2(model, impfunction: ModelFunction, + negation_defined: bool) -> List[Tuple[Set[ModelValue], Set[ModelValue]]]: + """ + Returns a list of possible subalgebra pairs (K1, K2) + """ + assert model.ordering is not None, "Expected ordering table in model" + + result = [] + top = model.ordering.top() + bottom = model.ordering.bottom() + + # Compute I the set of tuples (x, y) where + # x -> y does not take a designiated value + I: List[Tuple[ModelValue, ModelValue]] = [] + + for (x, y) in product(model.designated_values, model.designated_values): + if impfunction(x, y) not in model.designated_values: + I.append((x, y)) + + # Find the subalgebras which falsify implication + for xys in I: + + xi = xys[0] + + # Discard ({⊥} ∪ A', B) subalgebras + if bottom is not None and xi == bottom: + continue + + # Discard ({⊤} ∪ A', B) subalgebras when negation is defined + if top is not None and negation_defined and xi == top: + continue + + yi = xys[1] + + # Discard (A, {⊤} ∪ B') subalgebras + if top is not None and yi == top: + continue + + # Discard (A, {⊥} ∪ B') subalgebras when negation is defined + if bottom is not None and negation_defined and yi == bottom: + continue + + # Discard ({a} ∪ A', {b} ∪ B') subalgebras when a <= b + if model.ordering.is_lt(xi, yi): + continue + + # Discard ({a} ∪ A', {b} ∪ B') subalgebras when b <= a and negation is defined + if negation_defined and model.ordering.is_lt(yi, xi): + continue + + # Compute the left closure of the set containing xi under all the operations + carrier_set_left: Set[ModelValue] = model_closure({xi,}, model.logical_operations, bottom) + + # Discard ({⊥} ∪ A', B) subalgebras + if bottom is not None and bottom in carrier_set_left: + continue + + # Discard ({⊤} ∪ A', B) subalgebras when negation is defined + if top is not None and negation_defined and top in carrier_set_left: + continue + + # Compute the closure of all operations + # with just the ys + carrier_set_right: Set[ModelValue] = model_closure({yi,}, model.logical_operations, top) + + # Discard (A, {⊤} ∪ B') subalgebras + if top is not None and top in carrier_set_right: + continue + + # Discard (A, {⊥} ∪ B') subalgebras when negation is defined + if bottom is not None and negation_defined and bottom in carrier_set_right: + continue + + # Discard subalgebras that intersect + if not carrier_set_left.isdisjoint(carrier_set_right): + continue + + # Check whether for all pairs in the subalgebra, + # that implication is falsified. + falsified = True + for (x2, y2) in product(carrier_set_left, carrier_set_right): + if impfunction(x2, y2) in model.designated_values: + falsified = False + break + + if falsified: + result.append((carrier_set_left, carrier_set_right)) + + return result + +def find_candidate_u_l(model: Model, impfn: ModelFunction, negfn: Optional[ModelFunction]) -> List[Tuple[Set[ModelValue], Set[ModelValue]]]: + result: List[Tuple[Set[ModelValue], Set[ModelValue]]] = [] + F = model.carrier_set - model.designated_values + Us = powerset_minus_empty(model.carrier_set) + Ls = powerset_minus_empty(model.carrier_set) + for (U, L) in product(Us, Ls): + unsat = False + U = set(U) + L = set(L) + LFi = F.intersection(L) + # Required property: ∀x ∈ U, y ∈ L(x → y ∈ L ∩ F) + for (x, y) in product(U, L): + if impfn(x, y) not in LFi: + unsat = True + break + + if unsat: + continue + + # Required Property: ∀x ∈ L, y ∈ U(x → y ∈ U) + for (x, y) in product(L, U): + if impfn(x, y) not in U: + unsat = True + break + + if unsat: + continue + + if negfn is not None: + for x in L: + # Required Property: ∀x(x ∈ L ⇒ ¬x ∈ U) + if negfn(x) not in U: + unsat = True + break + + if unsat: + continue + + for x in U: + # Required Property: ∀x(x ∈ U ⇒ ¬x ∈ L) + if negfn(x) not in L: + unsat = True + break + + if unsat: + continue + + # Passed all required properties + result.append((U, L)) + + return result + +def has_svsp(model: Model, impfn: ModelFunction, + conjfn: Optional[ModelFunction], + disjfn: Optional[ModelFunction], + negfn: Optional[ModelFunction]) -> SVSP_Result: + """ + Checks whether a model has the signed + variable sharing property. + """ + # NOTE: No models with only one designated + # value satisfies SVSP + if len(model.designated_values) == 1: + return SVSP_Result(False, model.name) + + F = model.carrier_set - model.designated_values + starops = [conjfn, disjfn] + + K1K2s = find_k1_k2(model, impfn, negfn is not None) + ULs = find_candidate_u_l(model, impfn, negfn) + + candidates = ((k1, k2, u, l) for (k1, k2), (u, l) in product(K1K2s, ULs)) + for K1, K2, U, L in candidates: + unsat = False + K1Uu = K1 | U + K1Lu = K1 | L + K1LuFi = K1Lu.intersection(F) # (K1 ∪ L) ∩ F + K2Uu = K2 | U + K2Lu = K2 | L + K2LuFi = K2Lu.intersection(F) # (K2 ∪ L) ∩ F + + # (6) + for x, y in product(K1, U): + # b) x → y ∈ K1 ∪ U + if impfn(x, y) not in K1Uu: + unsat = True + break + + # c) y → x ∈ K1 ∪ L + if impfn(y, x) not in K1Lu: + unsat = True + break + + # a) x ∗ y, y ∗ x, y ∗ z ∈ K1 ∪ U + for z in U: + for op in starops: + if op is not None: + if op(x, y) not in K1Uu: + unsat = True + break + if op(y, x) not in K1Uu: + unsat = True + break + if op(y, z) not in K1Uu: + unsat = True + break + if unsat: + break + + if unsat: + # Verification for these set of matrices failed + break + + if unsat: + # Move onto the next candidates K1, K2, U, L + continue + + # (7) + for x, y in product(K1, L): + # b) x → y ∈ (K1 ∪ L) ∩ F + if impfn(x, y) not in K1LuFi: + unsat = True + break + + # c) y → x ∈ K1 ∪ U + if impfn(y, x) not in K1Uu: + unsat = True + break + + # a) x ∗ y, y ∗ x, y ∗ z ∈ K1 ∪ L + for z in L: + for op in starops: + if op is not None: + if op(x, y) not in K1Lu: + unsat = True + break + + if op(y, x) not in K1Lu: + unsat = True + break + + if op(y, z) not in K1Lu: + unsat = True + break + if unsat: + break + + if unsat: + break + + if unsat: + continue + + # (8) + for x, y in product(K2, U): + # b) x → y ∈ K2 ∪ U + if impfn(x, y) not in K2Uu: + unsat = True + break + + # c) y → x ∈ (K2 ∪ L) ∩ F + if impfn(y, x) not in K2LuFi: + unsat = True + break + + # a) x ∗ y, y ∗ x, y ∗ z ∈ K2 ∪ U + for z in U: + for op in starops: + if op is not None: + if op(x, y) not in K2Uu: + unsat = True + break + if op(y, x) not in K2Uu: + unsat = True + break + if op(y, z) not in K2Uu: + unsat = True + break + if unsat: + break + + if unsat: + break + + if unsat: + continue + + # (9) + for x, y in product(K2, L): + # b) x → y ∈ K2 ∪ L + if impfn(x, y) not in K2Lu: + unsat = True + break + + # c) y → x ∈ K2 ∪ U + if impfn(y, x) not in K2Uu: + unsat = True + break + + # a) x ∗ y, y ∗ x, y ∗ z ∈ K2 ∪ L + for z in L: + for op in starops: + if op is not None: + if op(x, y) not in K2Lu: + unsat = True + break + if op(y, x) not in K2Lu: + unsat = True + break + if op(y, z) not in K2Lu: + unsat = True + break + if unsat: + break + + if unsat: + break + + + if not unsat: + return SVSP_Result(True, model.name, K1, K2, U, L) + + return SVSP_Result(False, model.name)