"""
This module provides a simple implementation of Most Permissive Boolean Networks
(MPBNs) for computing reachability properties, attractors, reachable attractors.
Attractors of MPBNs are the *minimal* trap spaces of the underlying Boolean maps.
See http://dx.doi.org/10.1101/2020.03.22.998377 and https://arxiv.org/abs/1808.10240 for technical details.
It relies on clingo Answer-Set Programming solver
(https://github.com/potassco/clingo).
Examples are available at https://nbviewer.jupyter.org/github/pauleve/mpbn/tree/master/examples/
Quick example:
>>> mbn = mpbn.MPBooleanNetwork({
"a": "!b",
"b": "!a",
"c": "!a & b"})
>>> list(mbn.attractors()) # minimal trap spaces
[{'a': 0, 'b': 1, 'c': 1}, {'a': 1, 'b': 0, 'c': 0}]
>>> mbn.reachability({'a': 0, 'b': 1, 'c': 1}, {'a': 1, 'b': 0, 'c': 0})
False
>>> mbn.reachability({'a': 0, 'b': 0, 'c': 0}, {'a': 1, 'b': 1, 'c': 1})
True
>>> list(mbn.attractors(reachable_from={'a': 0, 'b': 1, 'c': 0}))
[{'a': 0, 'b': 1, 'c': 1}]
"""
import os
import sys
from colomoto import minibn
from boolean import boolean
import clingo
sys.setrecursionlimit(max(100000, sys.getrecursionlimit()))
__asplibdir__ = os.path.realpath(os.path.join(os.path.dirname(__file__), "asplib"))
clingo_options = ["-W", "no-atom-undefined"]
if hasattr(clingo, "version") and clingo.version() >= (5,5,0):
clingo_options.append("--single-shot")
def aspf(basename):
return os.path.join(__asplibdir__, basename)
def _clingo_domrec(mod, limit=0, project=False, extra_opts=[]):
s = clingo.Control(clingo_options + extra_opts)
s.configuration.solve.models = limit
if project:
s.configuration.solve.project = 1
s.configuration.solve.enum_mode = "domRec"
s.configuration.solver[0].heuristic = "Domain"
s.configuration.solver[0].dom_mod = f"{mod},{16 if project else 0}"
return s
def clingo_subsets(**opts):
return _clingo_domrec(5, **opts)
def clingo_supsets(**opts):
return _clingo_domrec(3, **opts)
def clingo_exists():
s = clingo.Control(clingo_options)
s.configuration.solve.models = 1
return s
def clingo_enum(project=True, limit=0):
s = clingo.Control(clingo_options)
if project:
s.configuration.solve.project = 1
s.configuration.solve.models = limit
return s
def s2v(s):
return 1 if s > 0 else -1
def v2s(v):
return 1 if v > 0 else 0
def is_dnf_unate(ba, f):
pos_lits = set()
neg_lits = set()
def is_lit(f):
if isinstance(f, ba.Symbol):
pos_lits.add(f.obj)
return True
if isinstance(f, ba.NOT) \
and isinstance(f.args[0], ba.Symbol):
neg_lits.add(f.args[0].obj)
return True
return False
def is_clause(f):
if is_lit(f):
return True
if isinstance(f, ba.AND):
for g in f.args:
if not is_lit(g):
return False
return True
return False
def test_monotonicity():
both = pos_lits.intersection(neg_lits)
return not both
if f in [ba.TRUE, ba.FALSE]:
return True
if is_clause(f):
return test_monotonicity()
if isinstance(f, ba.OR):
for g in f.args:
if not is_clause(g):
return False
return test_monotonicity()
return False
def circuitasp_of_boolfunc(f, i, ba):
atoms = []
fid = clingo.String(i)
def encode(expr):
if expr == ba.TRUE:
nodeid = "(constant,1)"
atoms.append(f"circuit({nodeid}).")
elif expr == ba.FALSE:
nodeid = "(constant,-1)"
atoms.append(f"circuit({nodeid}).")
elif isinstance(expr, ba.Symbol):
nodeid = f"(var,{clingo.String(expr.obj)})"
atoms.append(f"circuit({fid},{nodeid}).")
else:
nodeid = f"n{id(expr)}"
if isinstance(expr, ba.NOT):
nodetype = "neg"
elif isinstance(expr, ba.AND):
nodetype = "and"
elif isinstance(expr, ba.OR):
nodetype = "or"
else:
raise NotImplementedError(type(expr))
atoms.append(f"circuit({fid},{nodeid},{nodetype}).")
for child in expr.args:
cid = encode(child)
atoms.append(f"circuitedge({fid},{nodeid},{cid}).")
return nodeid
root = encode(f)
atoms.append(f"circuit({fid},root,{root}).\n")
return "\n".join(atoms)
DEFAULT_ENCODING = "mixed-dnf-bdd"
DEFAULT_BOOLFUNCLIB = os.environ.get("MPBN_BOOLFUNCLIB", "aeon")
SUPPORTED_BOOLFUNCLIBS = ["aeon", "pyeda"]
[docs]
class MPBooleanNetwork(minibn.BooleanNetwork):
"""
Most Permissive Boolean Network
Extends ``colomoto.minibn.BooleanNetwork`` class by adding methods for
computing reachable and attractor properties with the Most Permissive
update mode.
"""
supported_encodings = [
"unate-dnf", "bdd", "circuit",
"dnf-bdd", "mixed-dnf-bdd",
"force-unate-dnf"]
dnf_encodings = ["dnf-bdd", "unate-dnf", "force-unate-dnf",
"mixed-dnf-bdd"]
nonpc_encodings = ["circuit"]
def __init__(self, bn=minibn.BooleanNetwork(), auto_dnf=True,
simplify=False,
try_unate_hard=False,
encoding=DEFAULT_ENCODING,
boolfunclib=DEFAULT_BOOLFUNCLIB):
"""
Constructor for :py:class:`.MPBoooleanNetwork`.
:param bn: Boolean network to copy from
:type bn: :py:class:`colomoto.minibn.BooleanNetwork` or any type accepted by
:py:class:`colomoto.minibn.BooleanNetwork` constructor
:param bool auto_dnf: if ``False``, turns off automatic DNF
transformation of local functions
:param str boolfunlib: library to use for Boolean function manipulation
among ``"aeon"`` (default) or ``"pyeda"``. Default can be overriden with
``MPBN_BOOLFUNCLIB`` environment variable.
Examples:
>>> mbn = MPBooleanNetwork("network.bnet")
>>> bn = BooleanNetwork()
>>> bn["a"] = ".."; ...
>>> mbn = MPBooleanNetwork(bn)
"""
assert encoding in self.supported_encodings
assert boolfunclib in SUPPORTED_BOOLFUNCLIBS
self.auto_dnf = auto_dnf and encoding in self.dnf_encodings
self.encoding = encoding
self.try_unate_hard = try_unate_hard
self._simplify = simplify
self._is_unate = dict()
self._boolfunclib = boolfunclib
__boolfunclib_symbols = (
"make_dnf_boolfunc",
"bddasp_of_boolfunc",
)
self._bf_impl = __import__(f"mpbn.boolfunclib.{boolfunclib}_impl",
fromlist=__boolfunclib_symbols)
super(MPBooleanNetwork, self).__init__(bn)
def __setitem__(self, a, f):
"""
Assigns the Boolean function ``f`` to component ``a``.
Unless :py:attr:`.auto_dnf` is ``False``, ``f`` is converted into DNF
form first.
"""
if isinstance(f, str):
f = self.ba.parse(f)
f = self._autobool(f)
if self.auto_dnf:
f = self._bf_impl.make_dnf_boolfunc(self.ba, f,
simplify=self._simplify,
try_unate_hard=self.try_unate_hard)
a = self._autokey(a)
if self.encoding in self.dnf_encodings:
self._is_unate[a] = is_dnf_unate(self.ba, f)
if self.encoding == "unate-dnf":
assert self._is_unate[a], f"'{f}' seems not unate. Try simplify()?"
return super().__setitem__(a, f)
def asp_of_bn(self, encoding=None):
if encoding is None:
encoding = self.encoding
def clauses_of_dnf(f):
if isinstance(f, boolean.OR):
return f.args
else:
return [f]
def literals_of_clause(c):
def make_literal(l):
if isinstance(l, boolean.NOT):
return (l.args[0].obj, -1)
else:
return (l.obj, 1)
lits = c.args if isinstance(c, boolean.AND) else [c]
return map(make_literal, lits)
def encode_dnf(f):
facts = []
for cid, c in enumerate(clauses_of_dnf(f)):
for m, v in literals_of_clause(c):
facts.append(" clause(\"{}\",{},\"{}\",{}).".format(n, cid, m, v))
return facts
facts = []
for n, f in self.items():
facts.append("node(\"{}\").".format(n))
if encoding in ["unate-dnf", "force-unate-dnf"]:
f_encoding = "dnf"
elif encoding == "dnf-bdd":
f_encoding = "dnf" if self._is_unate[n] else "bdd"
else:
f_encoding = encoding
if f == self.ba.FALSE:
f = False
elif f == self.ba.TRUE:
f = True
if isinstance(f, bool):
facts.append("constant(\"{}\",{}).".format(n, s2v(f)))
elif f_encoding == "dnf":
facts.extend(encode_dnf(f))
elif f_encoding == "bdd":
facts.append(self._bf_impl.bddasp_of_boolfunc(self.ba, f, n))
elif f_encoding == "mixed-dnf-bdd":
facts.extend(encode_dnf(f))
if self._is_unate[n]:
facts.append(f"unate(\"{n}\").")
else:
facts.append(self._bf_impl.bddasp_of_boolfunc(self.ba, f, n))
elif f_encoding == "circuit":
facts.append(circuitasp_of_boolfunc(f, n, self.ba))
return "".join(facts)
def _file_eval(self):
if self.encoding == "circuit":
f = aspf("eval_circuit.asp")
elif self.encoding == "mixed-dnf-bdd":
f = aspf("eval_mixed.asp")
else:
f = aspf("mp_eval.asp")
return f
def rules_eval(self):
f = self._file_eval()
with open(f, "r") as fp:
return fp.read()
def load_eval(self, solver):
f = self._file_eval()
solver.load(f)
def assert_pc_encoding(self):
assert self.encoding not in self.nonpc_encodings, "Unsupported encoding"
def asp_of_cfg(self, e, t, c):
facts = ["timepoint({},{}).".format(e,t)]
facts += [" mp_state({},{},\"{}\",{}).".format(e,t,n,s2v(s))
for (n,s) in c.items()]
facts += [f"1 {{mp_state({e},{t},N,(-1;1))}} 1 :- node(N)."]
#facts += [" 1 {{mp_state({},{},\"{}\",(-1;1))}} 1 :- node(N).".format(e,t,n)
# for n in self if n not in c]
return "".join(facts)
[docs]
def reachability(self, x, y):
"""
Returns ``True`` whenever the configuration `y` is reachable from `x`
with the Most Permissive update mode.
Configurations can be partially defined.
In that case, returns ``True`` whenever there exists a configuration
matching with `y` which is reachable with at least one configuration
matching with `x`
:param dict[str,int] x: initial configuration
:param dict[str,int] y: target configuration
"""
self.assert_pc_encoding()
s = clingo_exists()
self.load_eval(s)
s.load(aspf("mp_positivereach-np.asp"))
s.add("base", [], self.asp_of_bn())
e = "default"
t1 = 0
t2 = 1
s.add("base", [], self.asp_of_cfg(e,t1,x))
s.add("base", [], self.asp_of_cfg(e,t2,y))
s.add("base", [], "is_reachable({},{},{}).".format(e,t1,t2))
s.ground([("base",[])])
res = s.solve()
return res.satisfiable
def _ground_rules(self, ctl, rules):
rules = "\n".join(rules)
ctl.add("base", [], rules)
ctl.ground([("base",[])])
def _fixedpoints(self, reachable_from=None, constraints={}, limit=0):
e = "fp"
t2 = "fp"
rules = [self.asp_of_cfg(e, t2, constraints)]
rules.append(f"mp_reach({e},{t2},N,V) :- mp_state({e},{t2},N,V).")
rules.append(f":- mp_state({e},{t2},N,V), mp_eval({e},{t2},N,-V).")
rules.append(self.asp_of_bn())
if reachable_from:
self.assert_pc_encoding()
t1 = "0"
rules.append(open(aspf("mp_positivereach-np.asp")).read())
rules.append(self.asp_of_cfg(e,t1,reachable_from))
rules.append("is_reachable({},{},{}).".format(e,t1,t2))
rules.append(f"#show. #show fixpoint(N,V) : mp_state({e},{t2},N,V).")
rules.append(open(aspf("mp_eval.asp")).read())
project = reachable_from and set(self.keys()).difference(reachable_from)
s = clingo_enum(limit=limit, project=project)
self._ground_rules(s, rules)
return s
[docs]
def fixedpoints(self, reachable_from=None, constraints={}, limit=0):
"""
Iterator over fixed points of the MPBN (i.e., of f)
:param dict[str,int] reachable_from: restrict to the attractors
reachable from the given configuration. Whenever partial, restrict
attractors to the one reachable by at least one matching
configuration.
:param dict[str,int] constraints: consider only attractors matching with
the given constraints.
:param int limit: maximum number of solutions, ``0`` for unlimited.
"""
s = self._fixedpoints(reachable_from=reachable_from,
constraints=constraints, limit=limit)
for sol in s.solve(yield_=True):
x = {n: None for n in self}
data = sol.symbols(shown=True)
for d in data:
if d.name != "fixpoint":
continue
(n, v) = d.arguments
n = n.string
v = v.number
v = 1 if v == 1 else 0
x[n] = v
yield x
[docs]
def count_fixedpoints(self, reachable_from=None, constraints={}, limit=0):
"""
Returns number of fixed points
:param dict[str,int] reachable_from: restrict to the attractors
reachable from the given configuration. Whenever partial, restrict
attractors to the one reachable by at least one matching
configuration.
:param dict[str,int] constraints: consider only attractors matching with
the given constraints.
:param int limit: maximum number of solutions, ``0`` for unlimited.
"""
s = self._fixedpoints(reachable_from=reachable_from,
constraints=constraints, limit=limit)
return sum((1 for _ in s.solve(yield_=True)))
def _trapspaces(self, reachable_from=None, subcube={}, limit=0,
mode="min", exclude_full=False):
self.assert_pc_encoding()
rules = []
rules.append(self.asp_of_bn())
rules.append(self.rules_eval())
rules.append(open(aspf("mp_attractor.asp")).read())
rules.append("#show attractor/2.")
e = "__a"
t2 = "final"
if exclude_full and not subcube:
rules.append(f"{{ mp_reach({e},{t2},N,(-1;1)): node(N) }} {len(self)*2-1}.")
if reachable_from:
t1 = "0"
rules.append(open(aspf("mp_positivereach-np.asp")).read())
rules.append(self.asp_of_cfg(e,t1,reachable_from))
rules.append("is_reachable({},{},{}).".format(e,t1,t2))
rules.append("mp_state({},{},N,V) :- attractor(N,V).".format(e,t2))
for n, b in subcube.items():
if isinstance(b, str):
b = int(b)
if b not in [0,1]:
continue
rules.append(":- mp_reach({},{},\"{}\",{}).".format(e,t2,n,s2v(1-b)))
project = reachable_from and set(self.keys()).difference(reachable_from)
solver = clingo_subsets if mode == "min" else clingo_supsets
s = solver(limit=limit, project=project)
self._ground_rules(s, rules)
return s
def _yield_trapspaces(self, *args, star="*", **kwargs):
s = self._trapspaces(*args, **kwargs)
for sol in s.solve(yield_=True):
attractor = {n: None for n in self}
data = sol.symbols(shown=True)
for d in data:
if d.name != "attractor":
continue
(n, v) = d.arguments
n = n.string
v = v.number
if v == 2:
v = star
else:
v = 1 if v == 1 else 0
if attractor[n] is not None:
if star is not None:
attractor[n] = star
else:
del attractor[n]
else:
attractor[n] = v
yield attractor
def _count_trapspaces(self, *args, **kwargs):
s = self._trapspaces(*args, **kwargs)
return sum((1 for _ in s.solve(yield_=True)))
[docs]
def attractors(self, reachable_from=None, constraints={}, limit=0, star='*'):
"""
Iterator over attractors of the MPBN (minimal trap spaces of the BN).
An attractor is an hypercube, represented by a dictionnary mapping every
component of the network to either ``0``, ``1``, or ``star``.
:param dict[str,int] reachable_from: restrict to the attractors
reachable from the given configuration. Whenever partial, restrict
attractors to the one reachable by at least one matching
configuration.
:param dict[str,int] constraints: consider only attractors matching with
the given constraints.
:param int limit: maximum number of solutions, ``0`` for unlimited.
:param str star: value to use for components which are free in the
attractor
"""
return self._yield_trapspaces(reachable_from=reachable_from,
subcube=constraints, limit=limit, star=star,
mode="min")
minimal_trapspaces = attractors
def maximal_trapspaces(self, limit=0, subcube={}, star="*",
exclude_full=True):
return self._yield_trapspaces(subcube=subcube, limit=limit, star=star,
mode="max", exclude_full=exclude_full)
[docs]
def count_attractors(self, reachable_from=None, constraints={}, limit=0):
"""
Returns number of attractors of the MPBN (minimal trap spaces of the BN).
:param dict[str,int] reachable_from: restrict to the attractors
reachable from the given configuration. Whenever partial, restrict
attractors to the one reachable by at least one matching
configuration.
:param dict[str,int] constraints: consider only attractors matching with
the given constraints.
:param int limit: maximum number of solutions, ``0`` for unlimited.
"""
return self._count_trapspaces(reachable_from=reachable_from,
subcube=constraints, limit=limit,
mode="min")
count_minimal_trapspaces = count_attractors
[docs]
def count_maximal_trapspaces(self, reachable_from=None, constraints={}, limit=0):
"""
Returns number of attractors of the MPBN (minimal trap spaces of the BN).
:param dict[str,int] reachable_from: restrict to the attractors
reachable from the given configuration. Whenever partial, restrict
attractors to the one reachable by at least one matching
configuration.
:param dict[str,int] constraints: consider only attractors matching with
the given constraints.
:param int limit: maximum number of solutions, ``0`` for unlimited.
"""
return self._count_trapspaces(reachable_from=reachable_from,
subcube=constraints, limit=limit,
mode="max")
def has_cyclic_attractor(self):
for a in self.attractors():
if "*" in a.values():
return True
return False
[docs]
def reachable_from(self, x, reversed=False):
"""
Returns an iterator over the configurations reachable from ``x`` with the
Most Permissive update mode.
Configuration ``x`` can be partially defined: in that case a configuration
is yielded whnever it is reachable from at least one configuration
matching with ``x``.
Whenever ``reversed`` is ``True``, yields over the configurations that can
reach `x` instead.
"""
self.assert_pc_encoding()
s = clingo_enum()
self.load_eval(s)
s.load(aspf("mp_positivereach-np.asp"))
s.add("base", [], self.asp_of_bn())
e = "default"
t1 = 0
t2 = 1
s.add("base", [], self.asp_of_cfg(e,t1,x if not reversed else {}))
s.add("base", [], self.asp_of_cfg(e,t2,{} if not reversed else x))
s.add("base", [], "is_reachable({},{},{}).".format(e,t1,t2))
t = t2 if not reversed else t1
s.add("base", [], "#show." \
f"#show mp_state(E,T,N,V) : mp_state(E,T,N,V), E={e}, T={t}.")
s.ground([("base",[])])
def cfg_of_asp(atoms):
return {a.arguments[2].string: v2s(a.arguments[3].number) for a in atoms}
for sol in s.solve(yield_=True):
data = sol.symbols(shown=True)
yield cfg_of_asp(data)
[docs]
def dynamics(self, update_mode="mp", **kwargs):
"""
Returns a :py:class:`networkx.DiGraph` object representing the transitions between
the configurations using the Most Permissive update mode by default.
See :py:meth:`colomoto.minibn.BooleanNetwork.dynamics`.
"""
if update_mode in ["mp", "most-permissive"]:
update_mode = MostPermissiveDynamics
return super().dynamics(update_mode=update_mode, **kwargs)
[docs]
def load(filename, **opts):
"""
Create a :py:class:`.MPBooleanNetwork` object from ``filename`` in BoolNet
format; ``filename`` can be a local file or an URL.
"""
return MPBooleanNetwork.load(filename, **opts)
[docs]
class MostPermissiveDynamics(minibn.UpdateModeDynamics):
def __init__(self, model, **opts):
if not isinstance(model, MPBooleanNetwork)\
and isinstance(model, minibn.BooleanNetwork):
model = MPBooleanNetwork(model)
super().__init__(model, **opts)
def __call__(self, x):
return self.model.reachable_from(x)
__all__ = ["load", "MPBooleanNetwork", "MostPermissiveDynamics"]