diff --git a/chat.py b/chat.py index 4ead1b8..38b06f9 100644 --- a/chat.py +++ b/chat.py @@ -29,13 +29,12 @@ def main_facet(turn, root_facet, ds): f = turn._facet turn.publish(ds, Present(me)) - @During().observe(turn, ds, P.rec('Present', P.CAPTURE)) + @dataspace.during(turn, ds, P.rec('Present', P.CAPTURE)) def on_presence(turn, who): print('%s joined' % (who,)) return lambda turn: print('%s left' % (who,)) - @dataspace.observe(turn, ds, P.rec('Says', P.CAPTURE, P.CAPTURE)) - @During().msg_handler + @dataspace.on_message(turn, ds, P.rec('Says', P.CAPTURE, P.CAPTURE)) def on_says(turn, who, what): print('%s says %r' % (who, what)) @@ -57,9 +56,7 @@ def main(turn): return turn.facet(lambda turn: main_facet(turn, root_facet, ds.embeddedValue)) turn.publish(gk.embeddedValue, gatekeeper.Resolve(cap, turn.ref(handle_ds))) - disarm = turn.prevent_inert_check() async def on_connected(tr): - disarm() print('-'*50, 'Connected') async def on_disconnected(tr, did_connect): if did_connect: diff --git a/syndicate/actor.py b/syndicate/actor.py index 82ba4f4..faa492f 100644 --- a/syndicate/actor.py +++ b/syndicate/actor.py @@ -110,6 +110,7 @@ class Facet: self.children = set() self.outbound = initial_assertions self.shutdown_actions = [] + self.linked_tasks = [] self.alive = True self.inert_check_preventers = 0 @@ -137,7 +138,11 @@ class Facet: remove_noerror(self.shutdown_actions, a) def isinert(self): - return len(self.children) == 0 and len(self.outbound) == 0 and self.inert_check_preventers == 0 + return \ + len(self.children) == 0 and \ + len(self.outbound) == 0 and \ + len(self.linked_tasks) == 0 and \ + self.inert_check_preventers == 0 def prevent_inert_check(self): armed = True @@ -149,6 +154,26 @@ class Facet: self.inert_check_preventers = self.inert_check_preventers - 1 return disarm + def linked_task(self, coro, loop = None): + task = None + def cancel_linked_task(turn): + nonlocal task + if task is not None: + remove_noerror(self.linked_tasks, task) + task.cancel() + task = None + self.cancel_on_stop(cancel_linked_task) + self.actor.cancel_at_exit(cancel_linked_task) + async def guarded_task(): + try: + await coro + finally: + Turn.external(self, cancel_linked_task) + task = find_loop(loop).create_task(guarded_task()) + self.linked_tasks.append(task) + self.on_stop(cancel_linked_task) + self.actor.at_exit(cancel_linked_task) + def _terminate(self, turn, orderly): if not self.alive: return self.log.debug('%s terminating %r', 'orderly' if orderly else 'disorderly', self) @@ -253,25 +278,7 @@ class Turn: # decorator def linked_task(self, loop = None): - return lambda thunk: self._linked_task(thunk(), loop = loop) - - def _linked_task(self, coro, loop = None): - task = None - def cancel_linked_task(turn): - nonlocal task - if task is not None: - task.cancel() - task = None - self._facet.cancel_on_stop(cancel_linked_task) - self._facet.actor.cancel_at_exit(cancel_linked_task) - async def guarded_task(): - try: - await coro - finally: - Turn.external(self._facet, cancel_linked_task) - task = find_loop(loop).create_task(guarded_task()) - self._facet.on_stop(cancel_linked_task) - self._facet.actor.at_exit(cancel_linked_task) + return lambda thunk: self._facet.linked_task(thunk(), loop = loop) def stop(self, facet = None, continuation = None): if facet is None: diff --git a/syndicate/dataspace.py b/syndicate/dataspace.py index 94ad165..0ca7deb 100644 --- a/syndicate/dataspace.py +++ b/syndicate/dataspace.py @@ -1,4 +1,5 @@ from .schema import dataspace +from .during import During # decorator def observe(turn, ds, pattern): @@ -6,3 +7,11 @@ def observe(turn, ds, pattern): turn.publish(ds, dataspace.Observe(pattern, turn.ref(entity))) return entity return publish_observer + +# decorator +def on_message(turn, ds, pattern): + return lambda on_msg: observe(turn, ds, pattern)(During().msg_handler(on_msg)) + +# decorator +def during(turn, ds, pattern): + return lambda on_add: observe(turn, ds, pattern)(During().add_handler(on_add)) diff --git a/syndicate/during.py b/syndicate/during.py index 07ec0b6..aafa67e 100644 --- a/syndicate/during.py +++ b/syndicate/during.py @@ -1,5 +1,4 @@ from . import actor -from . import dataspace def _ignore(*args, **kwargs): pass @@ -60,7 +59,3 @@ class During(actor.Entity): def sync_handler(self, on_sync): self._on_sync = on_sync return self - - # decorator - def observe(self, turn, ds, pattern): - return lambda on_add: dataspace.observe(turn, ds, pattern)(self.add_handler(on_add)) diff --git a/syndicate/relay.py b/syndicate/relay.py index b7f60f4..d131619 100644 --- a/syndicate/relay.py +++ b/syndicate/relay.py @@ -6,7 +6,7 @@ from preserves import Embedded, stringify from preserves.fold import map_embeddeds from . import actor, encode, transport, Decoder -from .actor import _inert_ref, Turn, adjust_engine_inhabitant_count +from .actor import _inert_ref, Turn from .idgen import IdGenerator from .schema import externalProtocol as protocol, sturdy, transportAddress @@ -64,16 +64,15 @@ class TunnelRelay: on_connected = None, on_disconnected = None, ): - self.ref = turn.ref(self) self.facet = turn._facet self.facet.on_stop(self._shutdown) self.address = address self.gatekeeper_peer = gatekeeper_peer self.gatekeeper_oid = gatekeeper_oid self._reset() - actor.queue_task(lambda: self._reconnecting_main(asyncio.get_running_loop(), - on_connected = on_connected, - on_disconnected = on_disconnected)) + self.facet.linked_task(self._reconnecting_main(asyncio.get_running_loop(), + on_connected = on_connected, + on_disconnected = on_disconnected)) def _reset(self): self.inbound_assertions = {} # map remote handle to InboundAssertion @@ -232,12 +231,10 @@ class TunnelRelay: raise Exception('subclassresponsibility') async def _reconnecting_main(self, loop, on_connected=None, on_disconnected=None): - adjust_engine_inhabitant_count(1) should_run = True while should_run and self.facet.alive: did_connect = await self.main(loop, on_connected=(on_connected or _default_on_connected)) should_run = await (on_disconnected or _default_on_disconnected)(self, did_connect) - adjust_engine_inhabitant_count(-1) @staticmethod def from_str(turn, s, **kwargs):