From 82e5a2c07b25bd7e5ac23ef9daec23854583ef00 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 19 Aug 2021 15:50:21 -0400 Subject: [PATCH] Utility decorators --- chat.py | 26 ++++++++++++-------------- syndicate/actor.py | 6 +++++- syndicate/dataspace.py | 8 ++++++++ syndicate/during.py | 23 +++++++++++++++++++++-- 4 files changed, 46 insertions(+), 17 deletions(-) create mode 100644 syndicate/dataspace.py diff --git a/chat.py b/chat.py index 485d5c9..4ead1b8 100644 --- a/chat.py +++ b/chat.py @@ -2,8 +2,8 @@ import sys import asyncio import random import syndicate -from syndicate import patterns as P, actor -from syndicate.schema import simpleChatProtocol, gatekeeper, sturdy, dataspace +from syndicate import patterns as P, actor, dataspace +from syndicate.schema import simpleChatProtocol, gatekeeper, sturdy from syndicate.during import During Present = simpleChatProtocol.Present @@ -29,35 +29,33 @@ def main_facet(turn, root_facet, ds): f = turn._facet turn.publish(ds, Present(me)) + @During().observe(turn, ds, P.rec('Present', P.CAPTURE)) def on_presence(turn, who): print('%s joined' % (who,)) return lambda turn: print('%s left' % (who,)) - turn.publish(ds, dataspace.Observe(P.rec('Present', P.CAPTURE), - During(turn, on_add = on_presence).ref)) + @dataspace.observe(turn, ds, P.rec('Says', P.CAPTURE, P.CAPTURE)) + @During().msg_handler def on_says(turn, who, what): print('%s says %r' % (who, what)) - turn.publish(ds, dataspace.Observe(P.rec('Says', P.CAPTURE, P.CAPTURE), - During(turn, on_msg = on_says).ref)) + @turn.linked_task() async def accept_input(): reader = asyncio.StreamReader() await actor.find_loop().connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), sys.stdin) while line := (await reader.readline()).decode('utf-8'): actor.Turn.external(f, lambda turn: turn.send(ds, Says(me, line.strip()))) actor.Turn.external(f, lambda turn: turn.stop(root_facet)) - turn.linked_task(accept_input()) def main(turn): root_facet = turn._facet + @During().add_handler def handle_gatekeeper(turn, gk): - turn.publish(gk.embeddedValue, gatekeeper.Resolve(cap, ds_receiver)) - gk_receiver = During(turn, on_add = handle_gatekeeper).ref - - def handle_ds(turn, ds): - return turn.facet(lambda turn: main_facet(turn, root_facet, ds.embeddedValue)) - ds_receiver = During(turn, on_add = handle_ds).ref + @During().add_handler + def handle_ds(turn, ds): + return turn.facet(lambda turn: main_facet(turn, root_facet, ds.embeddedValue)) + turn.publish(gk.embeddedValue, gatekeeper.Resolve(cap, turn.ref(handle_ds))) disarm = turn.prevent_inert_check() async def on_connected(tr): @@ -72,7 +70,7 @@ def main(turn): conn = syndicate.relay.TunnelRelay.from_str(turn, conn_str, - gatekeeper_peer = gk_receiver, + gatekeeper_peer = turn.ref(handle_gatekeeper), on_connected = on_connected, on_disconnected = on_disconnected) diff --git a/syndicate/actor.py b/syndicate/actor.py index 4452078..82ba4f4 100644 --- a/syndicate/actor.py +++ b/syndicate/actor.py @@ -251,7 +251,11 @@ class Turn: def prevent_inert_check(self): return self._facet.prevent_inert_check() - def linked_task(self, coro, loop = None): + # decorator + def linked_task(self, loop = None): + return lambda thunk: self._linked_task(thunk(), loop = loop) + + def _linked_task(self, coro, loop = None): task = None def cancel_linked_task(turn): nonlocal task diff --git a/syndicate/dataspace.py b/syndicate/dataspace.py new file mode 100644 index 0000000..94ad165 --- /dev/null +++ b/syndicate/dataspace.py @@ -0,0 +1,8 @@ +from .schema import dataspace + +# decorator +def observe(turn, ds, pattern): + def publish_observer(entity): + turn.publish(ds, dataspace.Observe(pattern, turn.ref(entity))) + return entity + return publish_observer diff --git a/syndicate/during.py b/syndicate/during.py index e56375e..07ec0b6 100644 --- a/syndicate/during.py +++ b/syndicate/during.py @@ -1,4 +1,5 @@ from . import actor +from . import dataspace def _ignore(*args, **kwargs): pass @@ -7,8 +8,7 @@ def _default_sync(turn, peer): turn.send(peer, True) class During(actor.Entity): - def __init__(self, turn, on_add=None, on_msg=None, on_sync=None, name=None): - self.ref = turn.ref(self) + def __init__(self, on_add=None, on_msg=None, on_sync=None, name=None): self.retract_handlers = {} self._on_add = on_add or _ignore self._on_msg = on_msg or _ignore @@ -45,3 +45,22 @@ class During(actor.Entity): def on_sync(self, turn, peer): self._on_sync(turn, peer) + + # decorator + def add_handler(self, on_add): + self._on_add = on_add + return self + + # decorator + def msg_handler(self, on_msg): + self._on_msg = on_msg + return self + + # decorator + def sync_handler(self, on_sync): + self._on_sync = on_sync + return self + + # decorator + def observe(self, turn, ds, pattern): + return lambda on_add: dataspace.observe(turn, ds, pattern)(self.add_handler(on_add))