diff --git a/syndicate/actor.py b/syndicate/actor.py index 76152b2..f714817 100644 --- a/syndicate/actor.py +++ b/syndicate/actor.py @@ -7,6 +7,7 @@ import traceback from preserves import Embedded, preserve from .idgen import IdGenerator +from .dataflow import Graph, Field log = logging.getLogger(__name__) @@ -57,6 +58,7 @@ class Actor: self.exit_reason = None # None -> running, True -> terminated OK, exn -> error self.exit_hooks = [] self._log = None + self._dataflow_graph = None Turn.run(Facet(self, self.root, set(self.outbound.keys())), stop_if_inert_after(boot_proc)) def __repr__(self): @@ -82,12 +84,22 @@ class Actor: self._log = logging.getLogger('syndicate.Actor.%s' % (self.name,)) return self._log + @property + def dataflow_graph(self): + if self._dataflow_graph is None: + self._dataflow_graph = Graph() + return self._dataflow_graph + def at_exit(self, hook): self.exit_hooks.append(hook) def cancel_at_exit(self, hook): remove_noerror(self.exit_hooks, hook) + def _repair_dataflow_graph(self, turn): + if self._dataflow_graph is not None: + self._dataflow_graph.repair_damage(lambda a: a(turn)) + def _terminate(self, turn, exit_reason): if self.exit_reason is not None: return self.log.debug('Terminating %r with exit_reason %r', self, exit_reason) @@ -262,6 +274,7 @@ class Turn: turn = cls(facet) try: action(turn) + facet.actor._repair_dataflow_graph(turn) except: ei = sys.exc_info() facet.log.error('%s', ''.join(traceback.format_exception(*ei))) @@ -331,6 +344,24 @@ class Turn: def crash(self, exn): self._enqueue(self._facet.actor.root, lambda turn: self._facet.actor._terminate(turn, exn)) + def field(self, initial_value=None, name=None): + return Field(self._facet.actor.dataflow_graph, initial_value, name) + + # can also be used as a decorator + def dataflow(self, a): + f = self._facet + f.prevent_inert_check() + def subject(turn): + if not f.alive: return + with ActiveFacet(turn, f): + a(turn) + f.on_stop(lambda turn: f.actor.dataflow_graph.forget_subject(subject)) + f.actor.dataflow_graph.with_subject(subject, lambda: subject(self)) + + def publish_dataflow(self, assertion_function): + endpoint = DataflowPublication(assertion_function) + self.dataflow(lambda turn: endpoint.update(turn)) + def publish(self, ref, assertion): handle = next(_next_handle) self._publish(ref, assertion, handle) @@ -356,7 +387,10 @@ class Turn: self._retract(e) def replace(self, ref, handle, assertion): - new_handle = None if assertion is None else self.publish(ref, assertion) + if assertion is None or ref is None: + new_handle = None + else: + new_handle = self.publish(ref, assertion) self.retract(handle) return new_handle @@ -421,6 +455,20 @@ def stop_if_inert_after(action): turn._enqueue(turn._facet, check_action) return wrapped_action +class DataflowPublication: + def __init__(self, assertion_function): + self.assertion_function = assertion_function + self.handle = None + self.target = None + self.assertion = None + + def update(self, turn): + (next_target, next_assertion) = self.assertion_function(turn) or (None, None) + if next_target != self.target or next_assertion != self.assertion_function: + self.target = next_target + self.assertion = next_assertion + self.handle = turn.replace(self.target, self.handle, self.assertion) + class Ref: def __init__(self, facet, entity): self.facet = facet diff --git a/syndicate/dataflow.py b/syndicate/dataflow.py new file mode 100644 index 0000000..64ce7cb --- /dev/null +++ b/syndicate/dataflow.py @@ -0,0 +1,78 @@ +from . import mapset + +class Graph: + def __init__(self): + self.edges_forward = {} + self.edges_reverse = {} + self.damaged_nodes = set() + self.active_subject = None + + def with_subject(self, subject_id, f): + old_subject = self.active_subject + self.active_subject = subject_id + try: + return f() + finally: + self.active_subject = old_subject + + def record_observation(self, object_id): + if self.active_subject is not None: + mapset.add(self.edges_forward, object_id, self.active_subject) + mapset.add(self.edges_reverse, self.active_subject, object_id) + + def record_damage(self, object_id): + self.damaged_nodes.add(object_id) + + def forget_subject(self, subject_id): + for oid in self.edges_reverse.pop(subject_id, set()): + mapset.discard(self.edges_forward, oid, subject_id) + + def observers_of(self, object_id): + return list(self.edges_forward.get(object_id, [])) + + def repair_damage(self, repair_fn): + repaired_this_round = set() + while True: + workset = self.damaged_nodes - repaired_this_round + self.damaged_nodes = set() + + if not workset: + break + + repaired_this_round = repaired_this_round | workset + + for object_id in workset: + for subject_id in self.observers_of(object_id): + self.forget_subject(subject_id) + self.with_subject(subject_id, lambda: repair_fn(subject_id)) + +__nextFieldId = 0 + +class Field: + def __init__(self, graph, initial=None, name=None): + global __nextFieldId + self.id = name + if self.id is None: + self.id = str(__nextFieldId) + __nextFieldId = __nextFieldId + 1 + self.graph = graph + self._value = initial + + @property + def value(self): + self.graph.record_observation(self) + return self._value + + @value.setter + def value(self, new_value): + if self._value != new_value: + self.graph.record_damage(self) + self._value = new_value + + @property + def update(self): + self.graph.record_damage(self) + return self.value + + def changed(self): + self.graph.record_damage(self) diff --git a/syndicate/mapset.py b/syndicate/mapset.py new file mode 100644 index 0000000..735001f --- /dev/null +++ b/syndicate/mapset.py @@ -0,0 +1,14 @@ +def add(m, k, v): + s = m.get(k) + if s is None: + s = set() + m[k] = s + s.add(v) + +def discard(m, k, v): + s = m.get(k) + if s is None: + return + s.discard(v) + if not s: + m.pop(k)