From 27b6b57661808602c4bc3eaf1b96c60a7899b5e9 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sat, 25 Dec 2021 16:59:21 -0500 Subject: [PATCH] Use thread-local storage instead of an explicit turn argument --- bidi-gc.py | 49 ++++++++++----- chat.py | 24 ++++---- inf.py | 8 +-- ovlinfo.py | 32 ---------- syndicate/actor.py | 133 +++++++++++++++++++++++----------------- syndicate/dataspace.py | 11 ++-- syndicate/during.py | 28 ++++----- syndicate/gatekeeper.py | 11 ++-- syndicate/metapy.py | 15 +++++ syndicate/relay.py | 106 +++++++++++++++----------------- syndicate/transport.py | 4 +- syndicate/turn.py | 32 ++++++++++ 12 files changed, 253 insertions(+), 200 deletions(-) delete mode 100644 ovlinfo.py create mode 100644 syndicate/metapy.py create mode 100644 syndicate/turn.py diff --git a/bidi-gc.py b/bidi-gc.py index e8cf018..8418818 100644 --- a/bidi-gc.py +++ b/bidi-gc.py @@ -3,7 +3,7 @@ import argparse import asyncio import random import syndicate -from syndicate import patterns as P, actor, dataspace, Record, Embedded +from syndicate import patterns as P, actor, dataspace, Record, Embedded, turn from syndicate.during import Handler from syndicate.schema import sturdy @@ -11,7 +11,7 @@ parser = argparse.ArgumentParser(description='Test bidirectional object referenc formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('--address', metavar='\'\'', help='transport address of the server', - default='') + default='') parser.add_argument('--cap', metavar='\'\'', help='capability for the dataspace on the server', default='') @@ -41,46 +41,65 @@ args = parser.parse_args() # # ----Three()---> +# +# Here's a trace from a live session of this running against syndicate-rs: +# +# B --> server: [[1, 3>]] +# +# A --> server: [[1, >]> #!⌜151/422:00007f3e50025090⌝> 3>]] +# A <-- server: [[1, ]] +# A --> server: [[2, 5>]] +# +# B <-- server: [[1, 643>]] +# B --> server: [[1, ], [2, 5>]] +# +# A <-- server: [[2, 653>]] +# A <-- server: [[1, ]] +# A --> server: [[2, >]] +# +# B <-- server: [[1, >]] +# + Boot = Record.makeConstructor('Boot', 'b') One = Record.makeConstructor('One', 'a') Two = Record.makeConstructor('Two', '') Three = Record.makeConstructor('Three', '') @actor.run_system(name = 'bidi-gc', debug = False) -def main(turn): - root_facet = turn._facet +def main(): + root_facet = turn.active_facet() - @syndicate.relay.connect(turn, args.address, sturdy.SturdyRef.decode(syndicate.parse(args.cap)), + @syndicate.relay.connect(args.address, sturdy.SturdyRef.decode(syndicate.parse(args.cap)), on_disconnected = lambda _relay, _did_connect: sys.exit(1)) - def on_connected(turn, ds): + def on_connected(ds): if args.start: # We are "A". - @dataspace.observe(turn, ds, P.rec('Boot', P.CAPTURE)) + @dataspace.observe(ds, P.rec('Boot', P.CAPTURE)) @Handler().add_handler - def on_b(turn, b): + def on_b(b): print('A got B', b) @Handler().add_handler - def a(turn, two): + def a(two): print('A got assertion:', two) turn.send(b.embeddedValue, Three()) - def on_two_retracted(turn): + def on_two_retracted(): print('Assertion', two, 'from B went') turn.retract(one_handle) return on_two_retracted one_handle = turn.publish(b.embeddedValue, One(Embedded(turn.ref(a)))) - return lambda turn: print('B\'s Boot record went') + return lambda: print('B\'s Boot record went') else: # We are "B". @Handler().add_handler - def b(turn, one): + def b(one): print('B got assertion:', one) print('boot_handle =', boot_handle) turn.retract(boot_handle) turn.publish(One._a(one).embeddedValue, Two()) - return lambda turn: print('B facet stopping') + return lambda: print('B facet stopping') @b.msg_handler - def b_msg(turn, three): - print('B got message: ', three) + def b_msg(three): + print('B got message:', three) boot_handle = turn.publish(ds, Boot(Embedded(turn.ref(b)))) diff --git a/chat.py b/chat.py index a07a9ba..1f22397 100644 --- a/chat.py +++ b/chat.py @@ -3,7 +3,7 @@ import argparse import asyncio import random import syndicate -from syndicate import patterns as P, actor, dataspace +from syndicate import patterns as P, actor, dataspace, turn from syndicate.schema import simpleChatProtocol, sturdy parser = argparse.ArgumentParser(description='Simple dataspace-server-mediated text chat.', @@ -20,22 +20,22 @@ Present = simpleChatProtocol.Present Says = simpleChatProtocol.Says @actor.run_system(name = 'chat', debug = False) -def main(turn): - root_facet = turn._facet +def main(): + root_facet = turn.active_facet() - @syndicate.relay.connect(turn, args.address, sturdy.SturdyRef.decode(syndicate.parse(args.cap))) - def on_connected(turn, ds): + @syndicate.relay.connect(args.address, sturdy.SturdyRef.decode(syndicate.parse(args.cap))) + def on_connected(ds): me = 'user_' + str(random.randint(10, 1000)) turn.publish(ds, Present(me)) - @dataspace.during(turn, ds, P.rec('Present', P.CAPTURE), inert_ok=True) - def on_presence(turn, who): + @dataspace.during(ds, P.rec('Present', P.CAPTURE), inert_ok=True) + def on_presence(who): print('%s joined' % (who,)) - turn.on_stop(lambda turn: print('%s left' % (who,))) + turn.on_stop(lambda: print('%s left' % (who,))) - @dataspace.on_message(turn, ds, P.rec('Says', P.CAPTURE, P.CAPTURE)) - def on_says(turn, who, what): + @dataspace.on_message(ds, P.rec('Says', P.CAPTURE, P.CAPTURE)) + def on_says(who, what): print('%s says %r' % (who, what)) @turn.linked_task() @@ -43,5 +43,5 @@ def main(turn): reader = asyncio.StreamReader() await actor.find_loop().connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), sys.stdin) while line := (await reader.readline()).decode('utf-8'): - actor.Turn.external(f, lambda turn: turn.send(ds, Says(me, line.strip()))) - actor.Turn.external(f, lambda turn: turn.stop(root_facet)) + turn.external(f, lambda: turn.send(ds, Says(me, line.strip()))) + turn.external(f, lambda: turn.stop(root_facet)) diff --git a/inf.py b/inf.py index 2ed092b..e00db06 100644 --- a/inf.py +++ b/inf.py @@ -1,9 +1,9 @@ -from syndicate import relay +from syndicate import relay, Turn from syndicate.during import During import logging @relay.service(name='inf', debug=True) @During().add_handler -def main(turn, args): - logging.info(f'in main {turn}, {args}') - turn.on_stop(lambda turn: logging.info(f'args retracted {args}')) +def main(args): + logging.info(f'in main {args}') + Turn.active.on_stop(lambda: logging.info(f'args retracted {args}')) diff --git a/ovlinfo.py b/ovlinfo.py deleted file mode 100644 index f3e0c8c..0000000 --- a/ovlinfo.py +++ /dev/null @@ -1,32 +0,0 @@ -import sys -import asyncio -import random -import threading -import syndicate.mini.core as S - -OverlayLink = S.Record.makeConstructor('OverlayLink', 'downNode upNode') - -conn = S.Connection.from_url(sys.argv[1]) - -uplinks = {} -def add_uplink(turn, src, tgt): - uplinks[src] = tgt - summarise_uplinks() -def del_uplink(turn, src, tgt): - del uplinks[src] - summarise_uplinks() -def summarise_uplinks(): - print(repr(uplinks)) - -with conn.turn() as t: - with conn.actor().react(t) as facet: - facet.add(S.Observe(OverlayLink(S.CAPTURE, S.CAPTURE)), - on_add=add_uplink, - on_del=del_uplink) - -loop = asyncio.get_event_loop() -loop.set_debug(True) -loop.run_until_complete(conn.reconnecting_main(loop)) -loop.stop() -loop.run_forever() -loop.close() diff --git a/syndicate/actor.py b/syndicate/actor.py index f714817..36edca6 100644 --- a/syndicate/actor.py +++ b/syndicate/actor.py @@ -3,10 +3,12 @@ import inspect import logging import sys import traceback +import threading from preserves import Embedded, preserve from .idgen import IdGenerator +from .metapy import staticproperty from .dataflow import Graph, Field log = logging.getLogger(__name__) @@ -15,6 +17,9 @@ _next_actor_number = IdGenerator() _next_handle = IdGenerator() _next_facet_id = IdGenerator() +_active = threading.local() +_active.turn = None + # decorator def run_system(**kwargs): return lambda boot_proc: start_actor_system(boot_proc, **kwargs) @@ -96,19 +101,19 @@ class Actor: def cancel_at_exit(self, hook): remove_noerror(self.exit_hooks, hook) - def _repair_dataflow_graph(self, turn): + def _repair_dataflow_graph(self): if self._dataflow_graph is not None: - self._dataflow_graph.repair_damage(lambda a: a(turn)) + self._dataflow_graph.repair_damage(lambda a: a()) - def _terminate(self, turn, exit_reason): + def _terminate(self, exit_reason): if self.exit_reason is not None: return self.log.debug('Terminating %r with exit_reason %r', self, exit_reason) self.exit_reason = exit_reason if exit_reason != True: self.log.error('crashed: %s' % (exit_reason,)) for h in self.exit_hooks: - h(turn) - self.root._terminate(turn, exit_reason == True) + h() + self.root._terminate(exit_reason == True) if not self._daemon: adjust_engine_inhabitant_count(-1) @@ -122,6 +127,10 @@ class Actor: return e class Facet: + @staticproperty + def active(): + return _active.turn._facet + def __init__(self, actor, parent, initial_handles=None): self.id = next(_next_facet_id) self.actor = actor @@ -184,7 +193,7 @@ class Facet: def linked_task(self, coro_fn, loop = None): task = None - def cancel_linked_task(turn): + def cancel_linked_task(): nonlocal task if task is not None: remove_noerror(self.linked_tasks, task) @@ -202,7 +211,7 @@ class Facet: self.on_stop(cancel_linked_task) self.actor.at_exit(cancel_linked_task) - def _terminate(self, turn, orderly): + def _terminate(self, orderly): if not self.alive: return self.log.debug('%s terminating %r', 'orderly' if orderly else 'disorderly', self) self.alive = False @@ -211,13 +220,14 @@ class Facet: if parent: parent.children.remove(self) - with ActiveFacet(turn, self): + with ActiveFacet(self): for child in list(self.children): - child._terminate(turn, orderly) + child._terminate(orderly) if orderly: - with ActiveFacet(turn, self.parent or self): + with ActiveFacet(self.parent or self): for h in self.shutdown_actions: - h(turn) + h() + turn = Turn.active for h in self.handles: # Optimization: don't clear from source facet, the source facet is us and we're # about to clear our handles in one fell swoop. @@ -227,13 +237,13 @@ class Facet: if orderly: if parent: if parent.isinert(): - parent._terminate(turn, True) + parent._terminate(True) else: - self.actor._terminate(turn, True) + self.actor._terminate(True) class ActiveFacet: - def __init__(self, turn, facet): - self.turn = turn + def __init__(self, facet): + self.turn = Turn.active self.outer_facet = None self.inner_facet = facet @@ -266,6 +276,10 @@ def queue_task_threadsafe(thunk, loop = None): return asyncio.run_coroutine_threadsafe(task(), find_loop(loop)) class Turn: + @staticproperty + def active(): + return _active.turn + @classmethod def run(cls, facet, action, zombie_turn = False): if not zombie_turn: @@ -273,12 +287,17 @@ class Turn: if not facet.alive: return turn = cls(facet) try: - action(turn) - facet.actor._repair_dataflow_graph(turn) + saved = Turn.active + _active.turn = turn + try: + action() + facet.actor._repair_dataflow_graph() + finally: + _active.turn = saved except: ei = sys.exc_info() facet.log.error('%s', ''.join(traceback.format_exception(*ei))) - Turn.run(facet.actor.root, lambda turn: facet.actor._terminate(turn, ei[1])) + Turn.run(facet.actor.root, lambda: facet.actor._terminate(ei[1])) else: turn._deliver() @@ -300,8 +319,8 @@ class Turn: # this actually can work as a decorator as well as a normal method! def facet(self, boot_proc): new_facet = Facet(self._facet.actor, self._facet) - with ActiveFacet(self, new_facet): - stop_if_inert_after(boot_proc)(self) + with ActiveFacet(new_facet): + stop_if_inert_after(boot_proc)() return new_facet def prevent_inert_check(self): @@ -319,14 +338,14 @@ class Turn: else: if continuation is not None: facet.on_stop(continuation) - facet._terminate(self, True) + facet._terminate(True) # can also be used as a decorator def on_stop(self, a): self._facet.on_stop(a) def spawn(self, boot_proc, name = None, initial_handles = None, daemon = False): - def action(turn): + def action(): new_outbound = {} if initial_handles is not None: for handle in initial_handles: @@ -339,10 +358,10 @@ class Turn: self._enqueue(self._facet, action) def stop_actor(self): - self._enqueue(self._facet.actor.root, lambda turn: self._facet.actor._terminate(turn, True)) + self._enqueue(self._facet.actor.root, lambda: self._facet.actor._terminate(True)) def crash(self, exn): - self._enqueue(self._facet.actor.root, lambda turn: self._facet.actor._terminate(turn, exn)) + self._enqueue(self._facet.actor.root, lambda: self._facet.actor._terminate(exn)) def field(self, initial_value=None, name=None): return Field(self._facet.actor.dataflow_graph, initial_value, name) @@ -351,16 +370,16 @@ class Turn: def dataflow(self, a): f = self._facet f.prevent_inert_check() - def subject(turn): + def subject(): if not f.alive: return - with ActiveFacet(turn, f): - a(turn) - f.on_stop(lambda turn: f.actor.dataflow_graph.forget_subject(subject)) + with ActiveFacet(f): + a() + f.on_stop(lambda: f.actor.dataflow_graph.forget_subject(subject)) f.actor.dataflow_graph.with_subject(subject, lambda: subject(self)) def publish_dataflow(self, assertion_function): endpoint = DataflowPublication(assertion_function) - self.dataflow(lambda turn: endpoint.update(turn)) + self.dataflow(lambda: endpoint.update()) def publish(self, ref, assertion): handle = next(_next_handle) @@ -374,10 +393,10 @@ class Turn: e = OutboundAssertion(facet, handle, ref) facet.actor.outbound[handle] = e facet.handles.add(handle) - def action(turn): + def action(): e.established = True self.log.debug('%r <-- publish %r handle %r', ref, assertion, handle) - ref.entity.on_publish(turn, assertion, handle) + ref.entity.on_publish(assertion, handle) self._enqueue(ref.facet, action) def retract(self, handle): @@ -397,32 +416,32 @@ class Turn: def _retract(self, e): # Assumes e has already been removed from self._facet.actor.outbound and the # appropriate set of handles - def action(turn): + def action(): if e.established: e.established = False self.log.debug('%r <-- retract handle %r', e.ref, e.handle) - e.ref.entity.on_retract(turn, e.handle) + e.ref.entity.on_retract(e.handle) self._enqueue(e.ref.facet, action) def sync(self, ref, k): class SyncContinuation(Entity): - def on_message(self, turn, _value): - k(turn) + def on_message(self, _value): + k() self._sync(ref, self.ref(SyncContinuation())) def _sync(self, ref, peer): peer = preserve(peer) - def action(turn): + def action(): self.log.debug('%r <-- sync peer %r', ref, peer) - ref.entity.on_sync(turn, peer) + ref.entity.on_sync(peer) self._enqueue(ref.facet, action) def send(self, ref, message): # TODO: attenuation message = preserve(message) - def action(turn): + def action(): self.log.debug('%r <-- message %r', ref, message) - ref.entity.on_message(turn, message) + ref.entity.on_message(message) self._enqueue(ref.facet, action) def _enqueue(self, target_facet, action): @@ -435,20 +454,22 @@ class Turn: for (actor, q) in self.queues.items(): # Stupid python scoping bites again def make_deliver_q(actor, q): # gratuitous - def deliver_q(turn): + def deliver_q(): + turn = Turn.active saved_facet = turn._facet for (facet, action) in q: turn._facet = facet - action(turn) + action() turn._facet = saved_facet return lambda: Turn.run(actor.root, deliver_q) queue_task(make_deliver_q(actor, q)) self.queues = {} def stop_if_inert_after(action): - def wrapped_action(turn): - action(turn) - def check_action(turn): + def wrapped_action(): + turn = Turn.active + action() + def check_action(): if (turn._facet.parent is not None and not turn._facet.parent.alive) \ or turn._facet.isinert(): turn.stop() @@ -462,12 +483,12 @@ class DataflowPublication: self.target = None self.assertion = None - def update(self, turn): - (next_target, next_assertion) = self.assertion_function(turn) or (None, None) + def update(self): + (next_target, next_assertion) = self.assertion_function() or (None, None) if next_target != self.target or next_assertion != self.assertion_function: self.target = next_target self.assertion = next_assertion - self.handle = turn.replace(self.target, self.handle, self.assertion) + self.handle = Turn.active.replace(self.target, self.handle, self.assertion) class Ref: def __init__(self, facet, entity): @@ -490,27 +511,27 @@ class OutboundAssertion: # Can act as a mixin class Entity: - def on_publish(self, turn, v, handle): + def on_publish(self, v, handle): pass - def on_retract(self, turn, handle): + def on_retract(self, handle): pass - def on_message(self, turn, v): + def on_message(self, v): pass - def on_sync(self, turn, peer): - turn.send(peer, True) + def on_sync(self, peer): + Turn.active.send(peer, True) _inert_actor = None _inert_facet = None _inert_ref = None _inert_entity = Entity() -def __boot_inert(turn): +def __boot_inert(): global _inert_actor, _inert_facet, _inert_ref - _inert_actor = turn._facet.actor - _inert_facet = turn._facet - _inert_ref = turn.ref(_inert_entity) + _inert_actor = Turn.active._facet.actor + _inert_facet = Turn.active._facet + _inert_ref = Turn.active.ref(_inert_entity) async def __run_inert(): Actor(__boot_inert, name = '_inert_actor') asyncio.get_event_loop().run_until_complete(__run_inert()) diff --git a/syndicate/dataspace.py b/syndicate/dataspace.py index 6bfa461..52e4e69 100644 --- a/syndicate/dataspace.py +++ b/syndicate/dataspace.py @@ -1,17 +1,18 @@ from .schema import dataspace from .during import During +from . import turn # decorator -def observe(turn, ds, pattern): +def observe(ds, pattern): def publish_observer(entity): turn.publish(ds, dataspace.Observe(pattern, turn.ref(entity))) return entity return publish_observer # decorator -def on_message(turn, ds, pattern, *args, **kwargs): - return lambda on_msg: observe(turn, ds, pattern)(During(*args, **kwargs).msg_handler(on_msg)) +def on_message(ds, pattern, *args, **kwargs): + return lambda on_msg: observe(ds, pattern)(During(*args, **kwargs).msg_handler(on_msg)) # decorator -def during(turn, ds, pattern, *args, **kwargs): - return lambda on_add: observe(turn, ds, pattern)(During(*args, **kwargs).add_handler(on_add)) +def during(ds, pattern, *args, **kwargs): + return lambda on_add: observe(ds, pattern)(During(*args, **kwargs).add_handler(on_add)) diff --git a/syndicate/during.py b/syndicate/during.py index 9c1ada1..c0bd7ac 100644 --- a/syndicate/during.py +++ b/syndicate/during.py @@ -1,9 +1,9 @@ -from . import actor +from . import turn, actor def _ignore(*args, **kwargs): pass -def _default_sync(turn, peer): +def _default_sync(peer): turn.send(peer, True) class Handler(actor.Entity): @@ -27,21 +27,21 @@ class Handler(actor.Entity): def _wrap_add_handler(self, handler): return handler - def on_publish(self, turn, v, handle): - retraction_handler = self._on_add(turn, *self._wrap(v)) + def on_publish(self, v, handle): + retraction_handler = self._on_add(*self._wrap(v)) if retraction_handler is not None: self.retraction_handlers[handle] = retraction_handler - def on_retract(self, turn, handle): + def on_retract(self, handle): retraction_handler = self.retraction_handlers.pop(handle, None) if retraction_handler is not None: - retraction_handler(turn) + retraction_handler() - def on_message(self, turn, v): - self._on_msg(turn, *self._wrap(v)) + def on_message(self, v): + self._on_msg(*self._wrap(v)) - def on_sync(self, turn, peer): - self._on_sync(turn, peer) + def on_sync(self, peer): + self._on_sync(peer) # decorator def add_handler(self, on_add): @@ -60,13 +60,13 @@ class Handler(actor.Entity): class During(Handler): def _wrap_add_handler(self, handler): - def facet_handler(turn, *args): + def facet_handler(*args): @turn.facet - def facet(turn): + def facet(): if self.inert_ok: turn.prevent_inert_check() - maybe_stop_action = handler(turn, *args) + maybe_stop_action = handler(*args) if maybe_stop_action is not None: turn.on_stop(maybe_stop_action) - return lambda turn: turn.stop(facet) + return lambda: turn.stop(facet) return facet_handler diff --git a/syndicate/gatekeeper.py b/syndicate/gatekeeper.py index 961c8e5..37fdda2 100644 --- a/syndicate/gatekeeper.py +++ b/syndicate/gatekeeper.py @@ -1,16 +1,17 @@ from .schema import gatekeeper from .during import During +from . import turn # decorator -def resolve(turn, gk, cap, *args, **kwargs): +def resolve(gk, cap, *args, **kwargs): def configure_handler(handler): - def unwrapping_handler(turn, wrapped_ref): - return handler(turn, wrapped_ref.embeddedValue) - return _resolve(turn, gk, cap)(During(*args, **kwargs).add_handler(unwrapping_handler)) + def unwrapping_handler(wrapped_ref): + return handler(wrapped_ref.embeddedValue) + return _resolve(gk, cap)(During(*args, **kwargs).add_handler(unwrapping_handler)) return configure_handler # decorator -def _resolve(turn, gk, cap): +def _resolve(gk, cap): def publish_resolution_request(entity): turn.publish(gk, gatekeeper.Resolve(cap, turn.ref(entity))) return entity diff --git a/syndicate/metapy.py b/syndicate/metapy.py new file mode 100644 index 0000000..1b7ee0d --- /dev/null +++ b/syndicate/metapy.py @@ -0,0 +1,15 @@ +class staticproperty: + """For use as @staticproperty, like @property, but for static properties of classes. + Read-only for now.""" + def __init__(self, getter): + self.getter = getter + def __get__(self, inst, cls=None): + return self.getter() + +class classproperty: + """For use as @classproperty, like @property, but for class-side properties of classes. + Read-only for now.""" + def __init__(self, getter): + self.getter = getter + def __get__(self, inst, cls=None): + return self.getter(cls) diff --git a/syndicate/relay.py b/syndicate/relay.py index f87d75f..ff0388b 100644 --- a/syndicate/relay.py +++ b/syndicate/relay.py @@ -6,9 +6,9 @@ import logging from preserves import Embedded, stringify from preserves.fold import map_embeddeds -from . import actor, encode, transport, Decoder, gatekeeper +from . import actor, encode, transport, Decoder, gatekeeper, turn from .during import During -from .actor import _inert_ref, Turn +from .actor import _inert_ref from .idgen import IdGenerator from .schema import protocol, sturdy, transportAddress @@ -66,7 +66,6 @@ def drop_all(wss): # There are other kinds of relay. This one has exactly two participants connected to each other. class TunnelRelay: def __init__(self, - turn, address, gatekeeper_peer = None, gatekeeper_oid = 0, @@ -75,7 +74,7 @@ class TunnelRelay: on_connected = None, on_disconnected = None, ): - self.facet = turn._facet + self.facet = turn.active_facet() self.facet.on_stop(self._shutdown) self.address = address self.gatekeeper_peer = gatekeeper_peer @@ -108,7 +107,7 @@ class TunnelRelay: def connected(self): return self._connected - def _shutdown(self, turn): + def _shutdown(self): self._disconnect() def deregister(self, handle): @@ -143,13 +142,13 @@ class TunnelRelay: self.exported_references)) return sturdy.WireRef.mine(sturdy.Oid(ws.oid)) - def rewrite_in(self, turn, assertion, pins): + def rewrite_in(self, assertion, pins): rewritten = map_embeddeds( - lambda wire_ref: Embedded(self.rewrite_ref_in(turn, wire_ref, pins)), + lambda wire_ref: Embedded(self.rewrite_ref_in(wire_ref, pins)), assertion) return rewritten - def rewrite_ref_in(self, turn, wire_ref, pins): + def rewrite_ref_in(self, wire_ref, pins): if wire_ref.VARIANT.name == 'mine': oid = wire_ref.oid.value ws = self.imported_references.get_oid( @@ -166,59 +165,57 @@ class TunnelRelay: def _on_disconnected(self): self._connected = False - def retract_inbound(turn): + def retract_inbound(): for ia in self.inbound_assertions.values(): turn.retract(ia.local_handle) if self.gatekeeper_handle is not None: turn.retract(self.gatekeeper_handle) self._reset() - Turn.run(self.facet, retract_inbound) + turn.run(self.facet, retract_inbound) self._disconnect() def _on_connected(self): self._connected = True if self.gatekeeper_peer is not None: - def connected_action(turn): - gk = self.rewrite_ref_in(turn, - sturdy.WireRef.mine(sturdy.Oid(self.gatekeeper_oid)), - []) + def connected_action(): + gk = self.rewrite_ref_in(sturdy.WireRef.mine(sturdy.Oid(self.gatekeeper_oid)), []) self.gatekeeper_handle = turn.publish(self.gatekeeper_peer, Embedded(gk)) - Turn.run(self.facet, connected_action) + turn.run(self.facet, connected_action) def _on_event(self, v): - Turn.run(self.facet, lambda turn: self._handle_event(turn, v)) + turn.run(self.facet, lambda: self._handle_event(v)) - def _handle_event(self, turn, v): + def _handle_event(self, v): packet = protocol.Packet.decode(v) variant = packet.VARIANT.name - if variant == 'Turn': self._handle_turn_events(turn, packet.value.value) - elif variant == 'Error': self._on_error(turn, packet.value.message, packet.value.detail) + if variant == 'Turn': self._handle_turn_events(packet.value.value) + elif variant == 'Error': self._on_error(packet.value.message, packet.value.detail) - def _on_error(self, turn, message, detail): + def _on_error(self, message, detail): self.facet.log.error('Error from server: %r (detail: %r)', message, detail) self._disconnect() - def _handle_turn_events(self, turn, events): + def _handle_turn_events(self, events): for e in events: pins = [] ref = self._lookup_exported_oid(e.oid.value, pins) event = e.event variant = event.VARIANT.name if variant == 'Assert': - self._handle_publish(pins, turn, ref, event.value.assertion.value, event.value.handle.value) + self._handle_publish(pins, ref, event.value.assertion.value, event.value.handle.value) elif variant == 'Retract': - self._handle_retract(pins, turn, ref, event.value.handle.value) + self._handle_retract(pins, ref, event.value.handle.value) elif variant == 'Message': - self._handle_message(pins, turn, ref, event.value.body.value) + self._handle_message(pins, ref, event.value.body.value) elif variant == 'Sync': - self._handle_sync(pins, turn, ref, event.value.peer) + self._handle_sync(pins, ref, event.value.peer) - def _handle_publish(self, pins, turn, ref, assertion, remote_handle): - assertion = self.rewrite_in(turn, assertion, pins) + def _handle_publish(self, pins, ref, assertion, remote_handle): + assertion = self.rewrite_in(assertion, pins) self.inbound_assertions[remote_handle] = \ InboundAssertion(remote_handle, turn.publish(ref, assertion), pins) - def _handle_retract(self, pins, turn, ref, remote_handle): + def _handle_retract(self, pins, ref, remote_handle): ia = self.inbound_assertions.pop(remote_handle, None) if ia is None: raise ValueError('Peer retracted invalid handle %s' % (remote_handle,)) @@ -226,28 +223,28 @@ class TunnelRelay: drop_all(pins) turn.retract(ia.local_handle) - def _handle_message(self, pins, turn, ref, message): - message = self.rewrite_in(turn, message, pins) + def _handle_message(self, pins, ref, message): + message = self.rewrite_in(message, pins) for ws in pins: if ws.count == 1: raise ValueError('Cannot receive transient reference') turn.send(ref, message) drop_all(pins) - def _handle_sync(self, pins, turn, ref, wire_peer): - peer = self.rewrite_ref_in(turn, wire_peer, pins) - def done(turn): + def _handle_sync(self, pins, ref, wire_peer): + peer = self.rewrite_ref_in(wire_peer, pins) + def done(): turn.send(peer, True) drop_all(pins) turn.sync(ref, done) def _send(self, remote_oid, turn_event): if len(self.pending_turn) == 0: - def flush_pending(turn): + def flush_pending(): packet = protocol.Packet.Turn(protocol.Turn(self.pending_turn)) self.pending_turn = [] self._send_bytes(encode(packet)) - actor.queue_task(lambda: Turn.run(self.facet, flush_pending)) + actor.queue_task(lambda: turn.run(self.facet, flush_pending)) self.pending_turn.append(protocol.TurnEvent(protocol.Oid(remote_oid), turn_event)) def _send_bytes(self, bs): @@ -263,17 +260,16 @@ class TunnelRelay: should_run = await (on_disconnected or _default_on_disconnected)(self, did_connect) @staticmethod - def from_str(turn, conn_str, **kwargs): - return transport.connection_from_str(turn, conn_str, **kwargs) + def from_str(conn_str, **kwargs): + return transport.connection_from_str(conn_str, **kwargs) # decorator -def connect(turn, conn_str, cap, **kwargs): +def connect(conn_str, cap, **kwargs): def prepare_resolution_handler(handler): @During().add_handler - def handle_gatekeeper(turn, gk): - gatekeeper.resolve(turn, gk.embeddedValue, cap)(handler) + def handle_gatekeeper(gk): + gatekeeper.resolve(gk.embeddedValue, cap)(handler) return transport.connection_from_str( - turn, conn_str, gatekeeper_peer = turn.ref(handle_gatekeeper), **kwargs) @@ -290,20 +286,20 @@ class RelayEntity(actor.Entity): def _send(self, e): self.relay._send(self.oid, e) - def on_publish(self, turn, assertion, handle): + def on_publish(self, assertion, handle): self._send(protocol.Event.Assert(protocol.Assert( protocol.Assertion(self.relay.register(self.oid, assertion, handle)), protocol.Handle(handle)))) - def on_retract(self, turn, handle): + def on_retract(self, handle): self.relay.deregister(handle) self._send(protocol.Event.Retract(protocol.Retract(protocol.Handle(handle)))) - def on_message(self, turn, message): + def on_message(self, message): self._send(protocol.Event.Message(protocol.Message( protocol.Assertion(self.relay.register(self.oid, message, None))))) - def on_sync(self, turn, peer): + def on_sync(self, peer): pins = [] self.relay.register_imported_oid(self.oid, pins) entity = SyncPeerEntity(self.relay, peer, pins) @@ -316,7 +312,7 @@ class SyncPeerEntity(actor.Entity): self.peer = peer self.pins = pins - def on_message(self, turn, body): + def on_message(self, body): drop_all(self.pins) turn.send(self.peer, body) @@ -332,8 +328,8 @@ async def _default_on_disconnected(relay, did_connect): return True class _StreamTunnelRelay(TunnelRelay, asyncio.Protocol): - def __init__(self, turn, address, **kwargs): - super().__init__(turn, address, **kwargs) + def __init__(self, address, **kwargs): + super().__init__(address, **kwargs) self.decoder = None self.stop_signal = None self.transport = None @@ -402,8 +398,8 @@ class UnixSocketTunnelRelay(_StreamTunnelRelay): @transport.address(transportAddress.WebSocket) class WebsocketTunnelRelay(TunnelRelay): - def __init__(self, turn, address, **kwargs): - super().__init__(turn, address, **kwargs) + def __init__(self, address, **kwargs): + super().__init__(address, **kwargs) self.loop = None self.ws = None @@ -457,8 +453,8 @@ class WebsocketTunnelRelay(TunnelRelay): @transport.address(transportAddress.Stdio) class PipeTunnelRelay(_StreamTunnelRelay): - def __init__(self, turn, address, input_fileobj = sys.stdin, output_fileobj = sys.stdout, **kwargs): - super().__init__(turn, address, **kwargs) + def __init__(self, address, input_fileobj = sys.stdin, output_fileobj = sys.stdout, **kwargs): + super().__init__(address, **kwargs) self.input_fileobj = input_fileobj self.output_fileobj = output_fileobj self.reader = asyncio.StreamReader() @@ -470,10 +466,10 @@ class PipeTunnelRelay(_StreamTunnelRelay): self.output_fileobj.buffer.write(bs) self.output_fileobj.buffer.flush() -def run_stdio_service(turn, entity): - PipeTunnelRelay(turn, transportAddress.Stdio(), publish_service=turn.ref(entity)) +def run_stdio_service(entity): + PipeTunnelRelay(transportAddress.Stdio(), publish_service=turn.ref(entity)) # decorator def service(**kwargs): return lambda entity: \ - actor.start_actor_system(lambda turn: run_stdio_service(turn, entity), **kwargs) + actor.start_actor_system(lambda: run_stdio_service(entity), **kwargs) diff --git a/syndicate/transport.py b/syndicate/transport.py index 3373ee2..2015fe4 100644 --- a/syndicate/transport.py +++ b/syndicate/transport.py @@ -11,10 +11,10 @@ def address(address_class): return connection_factory_class return k -def connection_from_str(turn, s, **kwargs): +def connection_from_str(s, **kwargs): address = parse(s) for (address_class, factory_class) in constructors.items(): decoded_address = address_class.try_decode(address) if decoded_address is not None: - return factory_class(turn, decoded_address, **kwargs) + return factory_class(decoded_address, **kwargs) raise InvalidTransportAddress('Invalid transport address', address) diff --git a/syndicate/turn.py b/syndicate/turn.py new file mode 100644 index 0000000..96af4bd --- /dev/null +++ b/syndicate/turn.py @@ -0,0 +1,32 @@ +from .actor import Turn + +def __setup(): + from .actor import _active + from types import FunctionType + import sys + + mod = sys.modules[__name__] + + def install_definition(name, definition): + def handler(*args, **kwargs): + return definition(_active.turn, *args, **kwargs) + setattr(mod, name, handler) + + for (name, definition) in Turn.__dict__.items(): + if name[0] == '_': + continue + elif type(definition) == FunctionType: + install_definition(name, definition) + else: + pass + +__setup() + +def run(facet, action): + Turn.run(facet, action) + +def external(facet, action, loop=None): + Turn.external(facet, action, loop=loop) + +def active_facet(): + return Turn.active._facet