This commit is contained in:
Tony Garnock-Jones 2021-12-25 11:35:11 -05:00
parent 46b5889ea4
commit fd36cff912
3 changed files with 141 additions and 1 deletions

View File

@ -7,6 +7,7 @@ import traceback
from preserves import Embedded, preserve
from .idgen import IdGenerator
from .dataflow import Graph, Field
log = logging.getLogger(__name__)
@ -57,6 +58,7 @@ class Actor:
self.exit_reason = None # None -> running, True -> terminated OK, exn -> error
self.exit_hooks = []
self._log = None
self._dataflow_graph = None
Turn.run(Facet(self, self.root, set(self.outbound.keys())), stop_if_inert_after(boot_proc))
def __repr__(self):
@ -82,12 +84,22 @@ class Actor:
self._log = logging.getLogger('syndicate.Actor.%s' % (self.name,))
return self._log
@property
def dataflow_graph(self):
if self._dataflow_graph is None:
self._dataflow_graph = Graph()
return self._dataflow_graph
def at_exit(self, hook):
self.exit_hooks.append(hook)
def cancel_at_exit(self, hook):
remove_noerror(self.exit_hooks, hook)
def _repair_dataflow_graph(self, turn):
if self._dataflow_graph is not None:
self._dataflow_graph.repair_damage(lambda a: a(turn))
def _terminate(self, turn, exit_reason):
if self.exit_reason is not None: return
self.log.debug('Terminating %r with exit_reason %r', self, exit_reason)
@ -262,6 +274,7 @@ class Turn:
turn = cls(facet)
try:
action(turn)
facet.actor._repair_dataflow_graph(turn)
except:
ei = sys.exc_info()
facet.log.error('%s', ''.join(traceback.format_exception(*ei)))
@ -331,6 +344,24 @@ class Turn:
def crash(self, exn):
self._enqueue(self._facet.actor.root, lambda turn: self._facet.actor._terminate(turn, exn))
def field(self, initial_value=None, name=None):
return Field(self._facet.actor.dataflow_graph, initial_value, name)
# can also be used as a decorator
def dataflow(self, a):
f = self._facet
f.prevent_inert_check()
def subject(turn):
if not f.alive: return
with ActiveFacet(turn, f):
a(turn)
f.on_stop(lambda turn: f.actor.dataflow_graph.forget_subject(subject))
f.actor.dataflow_graph.with_subject(subject, lambda: subject(self))
def publish_dataflow(self, assertion_function):
endpoint = DataflowPublication(assertion_function)
self.dataflow(lambda turn: endpoint.update(turn))
def publish(self, ref, assertion):
handle = next(_next_handle)
self._publish(ref, assertion, handle)
@ -356,7 +387,10 @@ class Turn:
self._retract(e)
def replace(self, ref, handle, assertion):
new_handle = None if assertion is None else self.publish(ref, assertion)
if assertion is None or ref is None:
new_handle = None
else:
new_handle = self.publish(ref, assertion)
self.retract(handle)
return new_handle
@ -421,6 +455,20 @@ def stop_if_inert_after(action):
turn._enqueue(turn._facet, check_action)
return wrapped_action
class DataflowPublication:
def __init__(self, assertion_function):
self.assertion_function = assertion_function
self.handle = None
self.target = None
self.assertion = None
def update(self, turn):
(next_target, next_assertion) = self.assertion_function(turn) or (None, None)
if next_target != self.target or next_assertion != self.assertion_function:
self.target = next_target
self.assertion = next_assertion
self.handle = turn.replace(self.target, self.handle, self.assertion)
class Ref:
def __init__(self, facet, entity):
self.facet = facet

78
syndicate/dataflow.py Normal file
View File

@ -0,0 +1,78 @@
from . import mapset
class Graph:
def __init__(self):
self.edges_forward = {}
self.edges_reverse = {}
self.damaged_nodes = set()
self.active_subject = None
def with_subject(self, subject_id, f):
old_subject = self.active_subject
self.active_subject = subject_id
try:
return f()
finally:
self.active_subject = old_subject
def record_observation(self, object_id):
if self.active_subject is not None:
mapset.add(self.edges_forward, object_id, self.active_subject)
mapset.add(self.edges_reverse, self.active_subject, object_id)
def record_damage(self, object_id):
self.damaged_nodes.add(object_id)
def forget_subject(self, subject_id):
for oid in self.edges_reverse.pop(subject_id, set()):
mapset.discard(self.edges_forward, oid, subject_id)
def observers_of(self, object_id):
return list(self.edges_forward.get(object_id, []))
def repair_damage(self, repair_fn):
repaired_this_round = set()
while True:
workset = self.damaged_nodes - repaired_this_round
self.damaged_nodes = set()
if not workset:
break
repaired_this_round = repaired_this_round | workset
for object_id in workset:
for subject_id in self.observers_of(object_id):
self.forget_subject(subject_id)
self.with_subject(subject_id, lambda: repair_fn(subject_id))
__nextFieldId = 0
class Field:
def __init__(self, graph, initial=None, name=None):
global __nextFieldId
self.id = name
if self.id is None:
self.id = str(__nextFieldId)
__nextFieldId = __nextFieldId + 1
self.graph = graph
self._value = initial
@property
def value(self):
self.graph.record_observation(self)
return self._value
@value.setter
def value(self, new_value):
if self._value != new_value:
self.graph.record_damage(self)
self._value = new_value
@property
def update(self):
self.graph.record_damage(self)
return self.value
def changed(self):
self.graph.record_damage(self)

14
syndicate/mapset.py Normal file
View File

@ -0,0 +1,14 @@
def add(m, k, v):
s = m.get(k)
if s is None:
s = set()
m[k] = s
s.add(v)
def discard(m, k, v):
s = m.get(k)
if s is None:
return
s.discard(v)
if not s:
m.pop(k)