diff --git a/chat.py b/chat.py index 13f076b..c8479b8 100644 --- a/chat.py +++ b/chat.py @@ -2,8 +2,8 @@ import sys import asyncio import random import syndicate -from syndicate import patterns as P, actor, dataspace -from syndicate.schema import simpleChatProtocol, gatekeeper, sturdy +from syndicate import patterns as P, actor, dataspace, gatekeeper +from syndicate.schema import simpleChatProtocol, sturdy from syndicate.during import During Present = simpleChatProtocol.Present @@ -13,51 +13,30 @@ conn_str = '' cap_str = '' cap = sturdy.SturdyRef.decode(syndicate.parse(cap_str)) -# sys.stderr.write( -# 'Usage: chat.py [ | | ]\n') -# sys.exit(1) - -me = 'user_' + str(random.randint(10, 1000)) - -_print = print -def print(*items): - _print(*items) - sys.stdout.flush() - -def main_facet(turn, root_facet, ds): - print('main_facet', ds) - f = turn._facet - turn.publish(ds, Present(me)) - - @dataspace.during(turn, ds, P.rec('Present', P.CAPTURE)) - def on_presence(turn, who): - print('%s joined' % (who,)) - return lambda turn: print('%s left' % (who,)) - - @dataspace.on_message(turn, ds, P.rec('Says', P.CAPTURE, P.CAPTURE)) - def on_says(turn, who, what): - print('%s says %r' % (who, what)) - - @turn.linked_task() - async def accept_input(): - reader = asyncio.StreamReader() - await actor.find_loop().connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), sys.stdin) - while line := (await reader.readline()).decode('utf-8'): - actor.Turn.external(f, lambda turn: turn.send(ds, Says(me, line.strip()))) - actor.Turn.external(f, lambda turn: turn.stop(root_facet)) - def main(turn): root_facet = turn._facet - @During().add_handler - def handle_gatekeeper(turn, gk): - @During().add_handler - def handle_ds(turn, ds): - return turn.facet(lambda turn: main_facet(turn, root_facet, ds.embeddedValue)) - turn.publish(gk.embeddedValue, gatekeeper.Resolve(cap, turn.ref(handle_ds))) + @syndicate.relay.connect(turn, conn_str, cap) + def on_connected(turn, ds): + me = 'user_' + str(random.randint(10, 1000)) - conn = syndicate.relay.TunnelRelay.from_str(turn, - conn_str, - gatekeeper_peer = turn.ref(handle_gatekeeper)) + turn.publish(ds, Present(me)) + + @dataspace.during(turn, ds, P.rec('Present', P.CAPTURE), inert_ok=True) + def on_presence(turn, who): + print('%s joined' % (who,)) + turn.on_stop(lambda turn: print('%s left' % (who,))) + + @dataspace.on_message(turn, ds, P.rec('Says', P.CAPTURE, P.CAPTURE)) + def on_says(turn, who, what): + print('%s says %r' % (who, what)) + + @turn.linked_task() + async def accept_input(f): + reader = asyncio.StreamReader() + await actor.find_loop().connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), sys.stdin) + while line := (await reader.readline()).decode('utf-8'): + actor.Turn.external(f, lambda turn: turn.send(ds, Says(me, line.strip()))) + actor.Turn.external(f, lambda turn: turn.stop(root_facet)) actor.start_actor_system(main, name = 'chat', debug = False) diff --git a/syndicate/actor.py b/syndicate/actor.py index faa492f..667c39f 100644 --- a/syndicate/actor.py +++ b/syndicate/actor.py @@ -101,14 +101,14 @@ class Actor: queue_task(finish_termination) class Facet: - def __init__(self, actor, parent, initial_assertions = {}): + def __init__(self, actor, parent, initial_assertions = None): self.id = next(_next_facet_id) self.actor = actor self.parent = parent if parent: parent.children.add(self) self.children = set() - self.outbound = initial_assertions + self.outbound = initial_assertions or {} self.shutdown_actions = [] self.linked_tasks = [] self.alive = True @@ -154,7 +154,7 @@ class Facet: self.inert_check_preventers = self.inert_check_preventers - 1 return disarm - def linked_task(self, coro, loop = None): + def linked_task(self, coro_fn, loop = None): task = None def cancel_linked_task(turn): nonlocal task @@ -166,7 +166,7 @@ class Facet: self.actor.cancel_at_exit(cancel_linked_task) async def guarded_task(): try: - await coro + await coro_fn(self) finally: Turn.external(self, cancel_linked_task) task = find_loop(loop).create_task(guarded_task()) @@ -267,6 +267,7 @@ class Turn: def ref(self, entity): return Ref(self._facet, entity) + # this actually can work as a decorator as well as a normal method! def facet(self, boot_proc): new_facet = Facet(self._facet.actor, self._facet) with ActiveFacet(self, new_facet): @@ -278,7 +279,7 @@ class Turn: # decorator def linked_task(self, loop = None): - return lambda thunk: self._facet.linked_task(thunk(), loop = loop) + return lambda coro_fn: self._facet.linked_task(coro_fn, loop = loop) def stop(self, facet = None, continuation = None): if facet is None: @@ -289,6 +290,10 @@ class Turn: continuation(turn) self._enqueue(facet.parent, action) + # can also be used as a decorator + def on_stop(self, a): + self._facet.on_stop(a) + def spawn(self, boot_proc, name = None, initial_assertions = None, daemon = False): def action(turn): new_outbound = {} @@ -404,6 +409,10 @@ class OutboundAssertion: self.ref = ref self.established = False + def __repr__(self): + return '' % \ + (self.handle, self.ref, ' established' if self.established else '') + # Can act as a mixin class Entity: def on_publish(self, turn, v, handle): diff --git a/syndicate/dataspace.py b/syndicate/dataspace.py index 0ca7deb..6bfa461 100644 --- a/syndicate/dataspace.py +++ b/syndicate/dataspace.py @@ -9,9 +9,9 @@ def observe(turn, ds, pattern): return publish_observer # decorator -def on_message(turn, ds, pattern): - return lambda on_msg: observe(turn, ds, pattern)(During().msg_handler(on_msg)) +def on_message(turn, ds, pattern, *args, **kwargs): + return lambda on_msg: observe(turn, ds, pattern)(During(*args, **kwargs).msg_handler(on_msg)) # decorator -def during(turn, ds, pattern): - return lambda on_add: observe(turn, ds, pattern)(During().add_handler(on_add)) +def during(turn, ds, pattern, *args, **kwargs): + return lambda on_add: observe(turn, ds, pattern)(During(*args, **kwargs).add_handler(on_add)) diff --git a/syndicate/during.py b/syndicate/during.py index aafa67e..d6279c2 100644 --- a/syndicate/during.py +++ b/syndicate/during.py @@ -7,12 +7,13 @@ def _default_sync(turn, peer): turn.send(peer, True) class During(actor.Entity): - def __init__(self, on_add=None, on_msg=None, on_sync=None, name=None): - self.retract_handlers = {} + def __init__(self, on_add=None, on_msg=None, on_sync=None, name=None, inert_ok=False): + self.facets = {} self._on_add = on_add or _ignore self._on_msg = on_msg or _ignore self._on_sync = on_sync or _default_sync self.name = name + self.inert_ok = inert_ok self.flatten_arg = True def __repr__(self): @@ -24,20 +25,15 @@ class During(actor.Entity): return v if self.flatten_arg and isinstance(v, tuple) else (v,) def on_publish(self, turn, v, handle): - retract_handler = self._on_add(turn, *self._wrap(v)) - if retract_handler is not None: - if isinstance(retract_handler, actor.Facet): - self.retract_handlers[handle] = lambda turn: turn.stop(retract_handler) - elif callable(retract_handler): - self.retract_handlers[handle] = retract_handler - else: - raise ValueError('Non-callable retract_handler', { - 'retract_handler': retract_handler, - 'on_add': self._on_add, - }) + facet = turn.facet(lambda turn: self._on_add(turn, *self._wrap(v))) + if self.inert_ok: + facet.prevent_inert_check() + self.facets[handle] = facet def on_retract(self, turn, handle): - self.retract_handlers.pop(handle, lambda turn: ())(turn) + facet = self.facets.pop(handle, None) + if facet is not None: + turn.stop(facet) def on_message(self, turn, v): self._on_msg(turn, *self._wrap(v)) diff --git a/syndicate/gatekeeper.py b/syndicate/gatekeeper.py new file mode 100644 index 0000000..961c8e5 --- /dev/null +++ b/syndicate/gatekeeper.py @@ -0,0 +1,17 @@ +from .schema import gatekeeper +from .during import During + +# decorator +def resolve(turn, gk, cap, *args, **kwargs): + def configure_handler(handler): + def unwrapping_handler(turn, wrapped_ref): + return handler(turn, wrapped_ref.embeddedValue) + return _resolve(turn, gk, cap)(During(*args, **kwargs).add_handler(unwrapping_handler)) + return configure_handler + +# decorator +def _resolve(turn, gk, cap): + def publish_resolution_request(entity): + turn.publish(gk, gatekeeper.Resolve(cap, turn.ref(entity))) + return entity + return publish_resolution_request diff --git a/syndicate/relay.py b/syndicate/relay.py index d131619..6122301 100644 --- a/syndicate/relay.py +++ b/syndicate/relay.py @@ -5,7 +5,8 @@ import logging from preserves import Embedded, stringify from preserves.fold import map_embeddeds -from . import actor, encode, transport, Decoder +from . import actor, encode, transport, Decoder, gatekeeper +from .during import During from .actor import _inert_ref, Turn from .idgen import IdGenerator from .schema import externalProtocol as protocol, sturdy, transportAddress @@ -70,9 +71,10 @@ class TunnelRelay: self.gatekeeper_peer = gatekeeper_peer self.gatekeeper_oid = gatekeeper_oid self._reset() - self.facet.linked_task(self._reconnecting_main(asyncio.get_running_loop(), - on_connected = on_connected, - on_disconnected = on_disconnected)) + self.facet.linked_task( + lambda facet: self._reconnecting_main(asyncio.get_running_loop(), + on_connected = on_connected, + on_disconnected = on_disconnected)) def _reset(self): self.inbound_assertions = {} # map remote handle to InboundAssertion @@ -237,8 +239,21 @@ class TunnelRelay: should_run = await (on_disconnected or _default_on_disconnected)(self, did_connect) @staticmethod - def from_str(turn, s, **kwargs): - return transport.connection_from_str(turn, s, **kwargs) + def from_str(turn, conn_str, **kwargs): + return transport.connection_from_str(turn, conn_str, **kwargs) + +# decorator +def connect(turn, conn_str, cap, **kwargs): + def prepare_resolution_handler(handler): + @During().add_handler + def handle_gatekeeper(turn, gk): + gatekeeper.resolve(turn, gk.embeddedValue, cap)(handler) + return transport.connection_from_str( + turn, + conn_str, + gatekeeper_peer = turn.ref(handle_gatekeeper), + **kwargs) + return prepare_resolution_handler class RelayEntity(actor.Entity): def __init__(self, relay, oid):