Source code for mpbn

"""
This module provides a simple implementation of Most Permissive Boolean Networks
(MPBNs) for computing reachability properties, attractors, reachable attractors.
Attractors of MPBNs are the *minimal* trap spaces of the underlying Boolean maps.
See http://dx.doi.org/10.1101/2020.03.22.998377 and https://arxiv.org/abs/1808.10240 for technical details.

It relies on clingo Answer-Set Programming solver
(https://github.com/potassco/clingo).

Examples are available at https://nbviewer.jupyter.org/github/pauleve/mpbn/tree/master/examples/

Quick example:

>>> mbn = mpbn.MPBooleanNetwork({
        "a": "!b",
        "b": "!a",
        "c": "!a & b"})
>>> list(mbn.attractors()) # minimal trap spaces
[{'a': 0, 'b': 1, 'c': 1}, {'a': 1, 'b': 0, 'c': 0}]
>>> mbn.reachability({'a': 0, 'b': 1, 'c': 1}, {'a': 1, 'b': 0, 'c': 0})
False
>>> mbn.reachability({'a': 0, 'b': 0, 'c': 0}, {'a': 1, 'b': 1, 'c': 1})
True
>>> list(mbn.attractors(reachable_from={'a': 0, 'b': 1, 'c': 0}))
[{'a': 0, 'b': 1, 'c': 1}]
"""

import os
import sys
from colomoto import minibn

from boolean import boolean
import clingo

from pyeda.boolalg import bdd
import pyeda.boolalg.expr
from pyeda.boolalg.expr import expr
sys.setrecursionlimit(max(100000, sys.getrecursionlimit()))

__asplibdir__ = os.path.realpath(os.path.join(os.path.dirname(__file__), "asplib"))

clingo_options = ["-W", "no-atom-undefined"]
if hasattr(clingo, "version") and clingo.version() >= (5,5,0):
    clingo_options.append("--single-shot")

def aspf(basename):
    return os.path.join(__asplibdir__, basename)

def clingo_subsets(limit=0):
    s = clingo.Control(clingo_options)
    s.configuration.solve.models = limit
    s.configuration.solve.project = 1
    s.configuration.solve.enum_mode = "domRec"
    s.configuration.solver[0].heuristic = "Domain"
    s.configuration.solver[0].dom_mod = "5,16"
    return s

def clingo_supsets(limit=0):
    s = clingo.Control(clingo_options)
    s.configuration.solve.models = limit
    s.configuration.solve.project = 1
    s.configuration.solve.enum_mode = "domRec"
    s.configuration.solver[0].heuristic = "Domain"
    s.configuration.solver[0].dom_mod = "3,16"
    return s

def clingo_exists():
    s = clingo.Control(clingo_options)
    s.configuration.solve.models = 1
    return s

def clingo_enum(project=True, limit=0):
    s = clingo.Control(clingo_options)
    if project:
        s.configuration.solve.project = 1
    s.configuration.solve.models = limit
    return s

def s2v(s):
    return 1 if s > 0 else -1
def v2s(v):
    return 1 if v > 0 else 0

def is_unate(ba, f):
    pos_lits = set()
    neg_lits = set()
    def is_lit(f):
        if isinstance(f, ba.Symbol):
            pos_lits.add(f.obj)
            return True
        if isinstance(f, ba.NOT) \
                and isinstance(f.args[0], ba.Symbol):
            neg_lits.add(f.args[0].obj)
            return True
        return False

    def is_clause(f):
        if is_lit(f):
            return True
        if isinstance(f, ba.AND):
            for g in f.args:
                if not is_lit(g):
                    return False
            return True
        return False

    def test_monotonicity():
        both = pos_lits.intersection(neg_lits)
        return not both

    if f in [ba.TRUE, ba.FALSE]:
        return True
    if is_clause(f):
        return test_monotonicity()
    if isinstance(f, ba.OR):
        for g in f.args:
            if not is_clause(g):
                return False
        return test_monotonicity()
    return False

def asp_of_bdd(bid, b):
    _rules = dict()
    def register(node, nid=None):
        if node is bdd.BDDNODEONE:
            if nid is not None:
                _rules[bid] = f"bdd({clingo.String(nid)},1)"
            return 1
        elif node is bdd.BDDNODEZERO:
            if nid is not None:
                _rules[bid] = f"bdd({clingo.String(nid)},-1)"
            return -1
        nid = clingo.String(f"{bid}_n{id(node)}" if nid is None else nid)
        if nid not in _rules:
            var = clingo.String(bdd._VARS[node.root].qualname)
            lo = register(node.lo)
            hi = register(node.hi)
            a = f"bdd({nid},{var},{lo},{hi})"
            _rules[nid] = a
        return nid
    register(b.node, bid)
    return _rules.values()

def bddasp_of_boolfunc(f, i):
    e = expr(str(f).replace("!","~"))
    b = bdd.expr2bdd(e)
    atoms = asp_of_bdd(i, b)
    return "\n".join((f"{a}." for a in atoms))

def circuitasp_of_boolfunc(f, i, ba):
    atoms = []
    fid = clingo.String(i)
    def encode(expr):
        if expr == ba.TRUE:
            nodeid = "(constant,1)"
            atoms.append(f"circuit({nodeid}).")
        elif expr == ba.FALSE:
            nodeid = "(constant,-1)"
            atoms.append(f"circuit({nodeid}).")
        elif isinstance(expr, ba.Symbol):
            nodeid = f"(var,{clingo.String(expr.obj)})"
            atoms.append(f"circuit({fid},{nodeid}).")
        else:
            nodeid = f"n{id(expr)}"
            if isinstance(expr, ba.NOT):
                nodetype = "neg"
            elif isinstance(expr, ba.AND):
                nodetype = "and"
            elif isinstance(expr, ba.OR):
                nodetype = "or"
            else:
                raise NotImplementedError(type(expr))
            atoms.append(f"circuit({fid},{nodeid},{nodetype}).")
            for child in expr.args:
                cid = encode(child)
                atoms.append(f"circuitedge({fid},{nodeid},{cid}).")
        return nodeid
    root = encode(f)
    atoms.append(f"circuit({fid},root,{root}).\n")
    return "\n".join(atoms)


def expr2bpy(ex, ba):
    """
    converts a pyeda Boolean expression into a boolean.py one
    """
    if isinstance(ex, pyeda.boolalg.expr.Variable):
        return ba.Symbol(str(ex))
    elif isinstance(ex, pyeda.boolalg.expr._One):
        return ba.TRUE
    elif isinstance(ex, pyeda.boolalg.expr._Zero):
        return ba.FALSE
    elif isinstance(ex, pyeda.boolalg.expr.Complement):
        return ba.NOT(ba.Symbol(str(ex.__invert__())))
    elif isinstance(ex, pyeda.boolalg.expr.NotOp):
        return ba.NOT(expr2bpy(ex.x, ba))
    elif isinstance(ex, pyeda.boolalg.expr.OrOp):
        return ba.OR(*(expr2bpy(x, ba) for x in ex.xs))
    elif isinstance(ex, pyeda.boolalg.expr.AndOp):
        return ba.AND(*(expr2bpy(x, ba) for x in ex.xs))
    raise NotImplementedError(str(ex), type(ex))

DEFAULT_ENCODING = "mixed-dnf-bdd"

[docs]class MPBooleanNetwork(minibn.BooleanNetwork): """ Most Permissive Boolean Network Extends ``colomoto.minibn.BooleanNetwork`` class by adding methods for computing reachable and attractor properties with the Most Permissive update mode. """ supported_encodings = [ "unate-dnf", "bdd", "circuit", "dnf-bdd", "mixed-dnf-bdd", "force-unate-dnf"] dnf_encodings = ["dnf-bdd", "unate-dnf", "force-unate-dnf", "mixed-dnf-bdd"] nonpc_encodings = ["circuit"] def __init__(self, bn=minibn.BooleanNetwork(), auto_dnf=True, simplify=False, try_unate_hard=False, encoding=DEFAULT_ENCODING): """ Constructor for :py:class:`.MPBoooleanNetwork`. :param bn: Boolean network to copy from :type bn: :py:class:`colomoto.minibn.BooleanNetwork` or any type accepted by :py:class:`colomoto.minibn.BooleanNetwork` constructor :param bool auto_dnf: if ``False``, turns off automatic DNF transformation of local functions Examples: >>> mbn = MPBooleanNetwork("network.bnet") >>> bn = BooleanNetwork() >>> bn["a"] = ".."; ... >>> mbn = MPBooleanNetwork(bn) """ assert encoding in self.supported_encodings self.auto_dnf = auto_dnf and encoding in self.dnf_encodings self.encoding = encoding self.try_unate_hard = try_unate_hard self._simplify = simplify self._is_unate = dict() super(MPBooleanNetwork, self).__init__(bn) def __setitem__(self, a, f): """ Assigns the Boolean function ``f`` to component ``a``. Unless :py:attr:`.auto_dnf` is ``False``, ``f`` is converted into DNF form first. """ if isinstance(f, str): f = self.ba.parse(f) f = self._autobool(f) if self.auto_dnf: e = expr(str(f).replace("!","~")) e = e.to_dnf() if self._simplify is not None: e = e.simplify() f = expr2bpy(e, self.ba) if self.try_unate_hard: f = minibn.simplify_dnf(self.ba, f) elif self._simplify: f = f.simplify() a = self._autokey(a) if self.encoding in self.dnf_encodings: self._is_unate[a] = is_unate(self.ba, f) if self.encoding == "unate-dnf": assert self._is_unate[a], f"'{f}' seems not unate. Try simplify()?" return super().__setitem__(a, f) def asp_of_bn(self, encoding=None): if encoding is None: encoding = self.encoding def clauses_of_dnf(f): if isinstance(f, boolean.OR): return f.args else: return [f] def literals_of_clause(c): def make_literal(l): if isinstance(l, boolean.NOT): return (l.args[0].obj, -1) else: return (l.obj, 1) lits = c.args if isinstance(c, boolean.AND) else [c] return map(make_literal, lits) def encode_dnf(f): facts = [] for cid, c in enumerate(clauses_of_dnf(f)): for m, v in literals_of_clause(c): facts.append(" clause(\"{}\",{},\"{}\",{}).".format(n, cid, m, v)) return facts facts = [] for n, f in self.items(): facts.append("node(\"{}\").".format(n)) if encoding in ["unate-dnf", "force-unate-dnf"]: f_encoding = "dnf" elif encoding == "dnf-bdd": f_encoding = "dnf" if self._is_unate[n] else "bdd" else: f_encoding = encoding if f == self.ba.FALSE: f = False elif f == self.ba.TRUE: f = True if isinstance(f, bool): facts.append("constant(\"{}\",{}).".format(n, s2v(f))) elif f_encoding == "dnf": facts.extend(encode_dnf(f)) elif f_encoding == "bdd": facts.append(bddasp_of_boolfunc(f, n)) elif f_encoding == "mixed-dnf-bdd": facts.extend(encode_dnf(f)) if self._is_unate[n]: facts.append(f"unate(\"{n}\").") else: facts.append(bddasp_of_boolfunc(f, n)) elif f_encoding == "circuit": facts.append(circuitasp_of_boolfunc(f, n, self.ba)) return "".join(facts) def load_eval(self, solver): if self.encoding == "circuit": solver.load(aspf("eval_circuit.asp")) elif self.encoding == "mixed-dnf-bdd": solver.load(aspf("eval_mixed.asp")) else: solver.load(aspf("mp_eval.asp")) def assert_pc_encoding(self): assert self.encoding not in self.nonpc_encodings, "Unsupported encoding" def asp_of_cfg(self, e, t, c): facts = ["timepoint({},{}).".format(e,t)] facts += [" mp_state({},{},\"{}\",{}).".format(e,t,n,s2v(s)) for (n,s) in c.items()] facts += [" 1 {{mp_state({},{},\"{}\",(-1;1))}} 1.".format(e,t,n) for n in self if n not in c] return "".join(facts)
[docs] def reachability(self, x, y): """ Returns ``True`` whenever the configuration `y` is reachable from `x` with the Most Permissive update mode. Configurations can be partially defined. In that case, returns ``True`` whenever there exists a configuration matching with `y` which is reachable with at least one configuration matching with `x` :param dict[str,int] x: initial configuration :param dict[str,int] y: target configuration """ self.assert_pc_encoding() s = clingo_exists() self.load_eval(s) s.load(aspf("mp_positivereach-np.asp")) s.add("base", [], self.asp_of_bn()) e = "default" t1 = 0 t2 = 1 s.add("base", [], self.asp_of_cfg(e,t1,x)) s.add("base", [], self.asp_of_cfg(e,t2,y)) s.add("base", [], "is_reachable({},{},{}).".format(e,t1,t2)) s.ground([("base",[])]) res = s.solve() return res.satisfiable
[docs] def fixedpoints(self, reachable_from=None, constraints={}, limit=0): """ Iterator over fixed points of the MPBN (i.e., of f) :param dict[str,int] reachable_from: restrict to the attractors reachable from the given configuration. Whenever partial, restrict attractors to the one reachable by at least one matching configuration. :param dict[str,int] constraints: consider only attractors matching with the given constraints. :param int limit: maximum number of solutions, ``0`` for unlimited. """ s = clingo_enum(limit=limit) s.add("base", [], self.asp_of_bn()) e = "fp" t2 = "fp" self.load_eval(s) s.add("base", [], self.asp_of_cfg(e, t2, constraints)) s.add("base", [], f"mp_reach({e},{t2},N,V) :- mp_state({e},{t2},N,V).") s.add("base", [], f":- mp_state({e},{t2},N,V), mp_eval({e},{t2},N,-V).") if reachable_from: self.assert_pc_encoding() t1 = "0" s.load(aspf("mp_positivereach-np.asp")) s.add("base", [], self.asp_of_cfg(e,t1,reachable_from)) s.add("base", [], "is_reachable({},{},{}).".format(e,t1,t2)) s.add("base", [], f"#show. #show fixpoint(N,V) : mp_state({e},{t2},N,V).") s.ground([("base",[])]) for sol in s.solve(yield_=True): x = {n: None for n in self} data = sol.symbols(shown=True) for d in data: if d.name != "fixpoint": continue (n, v) = d.arguments n = n.string v = v.number v = 1 if v == 1 else 0 x[n] = v yield x
def _trapspaces(self, reachable_from=None, subcube={}, limit=0, star="*", mode="min", exclude_full=False): self.assert_pc_encoding() solver = clingo_subsets if mode == "min" else clingo_supsets s = solver(limit=limit) self.load_eval(s) s.load(aspf("mp_attractor.asp")) s.add("base", [], self.asp_of_bn()) e = "__a" t2 = "final" if exclude_full and not subcube: s.add("base", [], f"{{ mp_reach({e},{t2},N,(-1;1)): node(N) }} {len(self)*2-1}.") if reachable_from: t1 = "0" s.load(aspf("mp_positivereach-np.asp")) s.add("base", [], self.asp_of_cfg(e,t1,reachable_from)) s.add("base", [], "is_reachable({},{},{}).".format(e,t1,t2)) s.add("base", [], "mp_state({},{},N,V) :- attractor(N,V).".format(e,t2)) for n, b in subcube.items(): if isinstance(b, str): b = int(b) if b not in [0,1]: continue s.add("base", [], ":- mp_reach({},{},\"{}\",{}).".format(e,t2,n,s2v(1-b))) s.add("base", [], "#show attractor/2.") s.ground([("base",[])]) for sol in s.solve(yield_=True): attractor = {n: None for n in self} data = sol.symbols(shown=True) for d in data: if d.name != "attractor": continue (n, v) = d.arguments n = n.string v = v.number if v == 2: v = star else: v = 1 if v == 1 else 0 if attractor[n] is not None: if star is not None: attractor[n] = star else: del attractor[n] else: attractor[n] = v yield attractor
[docs] def attractors(self, reachable_from=None, constraints={}, limit=0, star='*'): """ Iterator over attractors of the MPBN (minimal trap spaces of the BN). An attractor is an hypercube, represented by a dictionnary mapping every component of the network to either ``0``, ``1``, or ``star``. :param dict[str,int] reachable_from: restrict to the attractors reachable from the given configuration. Whenever partial, restrict attractors to the one reachable by at least one matching configuration. :param dict[str,int] constraints: consider only attractors matching with the given constraints. :param int limit: maximum number of solutions, ``0`` for unlimited. :param str star: value to use for components which are free in the attractor """ return self._trapspaces(reachable_from=reachable_from, subcube=constraints, limit=limit, star=star, mode="min")
minimal_trapspaces = attractors def maximal_trapspaces(self, limit=0, subcube={}, star="*", exclude_full=True): return self._trapspaces(subcube=subcube, limit=limit, star=star, mode="max", exclude_full=exclude_full) def has_cyclic_attractor(self): for a in self.attractors(): if "*" in a.values(): return True return False
[docs] def reachable_from(self, x, reversed=False): """ Returns an iterator over the configurations reachable from ``x`` with the Most Permissive update mode. Configuration ``x`` can be partially defined: in that case a configuration is yielded whnever it is reachable from at least one configuration matching with ``x``. Whenever ``reversed`` is ``True``, yields over the configurations that can reach `x` instead. """ self.assert_pc_encoding() s = clingo_enum() self.load_eval(s) s.load(aspf("mp_positivereach-np.asp")) s.add("base", [], self.asp_of_bn()) e = "default" t1 = 0 t2 = 1 s.add("base", [], self.asp_of_cfg(e,t1,x if not reversed else {})) s.add("base", [], self.asp_of_cfg(e,t2,{} if not reversed else x)) s.add("base", [], "is_reachable({},{},{}).".format(e,t1,t2)) t = t2 if not reversed else t1 s.add("base", [], "#show." \ f"#show mp_state(E,T,N,V) : mp_state(E,T,N,V), E={e}, T={t}.") s.ground([("base",[])]) def cfg_of_asp(atoms): return {a.arguments[2].string: v2s(a.arguments[3].number) for a in atoms} for sol in s.solve(yield_=True): data = sol.symbols(shown=True) yield cfg_of_asp(data)
[docs] def dynamics(self, update_mode="mp", **kwargs): """ Returns a :py:class:`networkx.DiGraph` object representing the transitions between the configurations using the Most Permissive update mode by default. See :py:meth:`colomoto.minibn.BooleanNetwork.dynamics`. """ if update_mode in ["mp", "most-permissive"]: update_mode = MostPermissiveDynamics return super().dynamics(update_mode=update_mode, **kwargs)
[docs]def load(filename, **opts): """ Create a :py:class:`.MPBooleanNetwork` object from ``filename`` in BoolNet format; ``filename`` can be a local file or an URL. """ return MPBooleanNetwork.load(filename, **opts)
[docs]class MostPermissiveDynamics(minibn.UpdateModeDynamics): def __init__(self, model, **opts): if not isinstance(model, MPBooleanNetwork)\ and isinstance(model, minibn.BooleanNetwork): model = MPBooleanNetwork(model) super().__init__(model, **opts) def __call__(self, x): return self.model.reachable_from(x)
__all__ = ["load", "MPBooleanNetwork", "MostPermissiveDynamics"]