Use thread-local storage instead of an explicit turn argument

This commit is contained in:
Tony Garnock-Jones 2021-12-25 16:59:21 -05:00
parent 2e3376a783
commit 27b6b57661
12 changed files with 253 additions and 200 deletions

View File

@ -3,7 +3,7 @@ import argparse
import asyncio
import random
import syndicate
from syndicate import patterns as P, actor, dataspace, Record, Embedded
from syndicate import patterns as P, actor, dataspace, Record, Embedded, turn
from syndicate.during import Handler
from syndicate.schema import sturdy
@ -11,7 +11,7 @@ parser = argparse.ArgumentParser(description='Test bidirectional object referenc
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--address', metavar='\'<tcp "HOST" PORT>\'',
help='transport address of the server',
default='<ws "ws://localhost:8001/">')
default='<ws "ws://localhost:9001/">')
parser.add_argument('--cap', metavar='\'<ref ...>\'',
help='capability for the dataspace on the server',
default='<ref "syndicate" [] #[pkgN9TBmEd3Q04grVG4Zdw==]>')
@ -41,46 +41,65 @@ args = parser.parse_args()
#
# ----Three()--->
#
# Here's a trace from a live session of this running against syndicate-rs:
#
# B --> server: [[1, <assert <Boot #!⌜141/402:00007f3e50021ef0⌝> 3>]]
#
# A --> server: [[1, <assert <Observe <rec Boot [<bind <_>>]> #!⌜151/422:00007f3e50025090⌝> 3>]]
# A <-- server: [[1, <assert [#!⌜141/402:00007f3e50021ef0⌝] 633>]]
# A --> server: [[2, <assert <One #!⌜151/422:00007f3e5c009b00⌝> 5>]]
#
# B <-- server: [[1, <assert <One #!⌜151/422:00007f3e5c009b00⌝> 643>]]
# B --> server: [[1, <retract 3>], [2, <assert <Two> 5>]]
#
# A <-- server: [[2, <assert <Two> 653>]]
# A <-- server: [[1, <retract 633>]]
# A --> server: [[2, <message <Three>>]]
#
# B <-- server: [[1, <message <Three>>]]
#
Boot = Record.makeConstructor('Boot', 'b')
One = Record.makeConstructor('One', 'a')
Two = Record.makeConstructor('Two', '')
Three = Record.makeConstructor('Three', '')
@actor.run_system(name = 'bidi-gc', debug = False)
def main(turn):
root_facet = turn._facet
def main():
root_facet = turn.active_facet()
@syndicate.relay.connect(turn, args.address, sturdy.SturdyRef.decode(syndicate.parse(args.cap)),
@syndicate.relay.connect(args.address, sturdy.SturdyRef.decode(syndicate.parse(args.cap)),
on_disconnected = lambda _relay, _did_connect: sys.exit(1))
def on_connected(turn, ds):
def on_connected(ds):
if args.start:
# We are "A".
@dataspace.observe(turn, ds, P.rec('Boot', P.CAPTURE))
@dataspace.observe(ds, P.rec('Boot', P.CAPTURE))
@Handler().add_handler
def on_b(turn, b):
def on_b(b):
print('A got B', b)
@Handler().add_handler
def a(turn, two):
def a(two):
print('A got assertion:', two)
turn.send(b.embeddedValue, Three())
def on_two_retracted(turn):
def on_two_retracted():
print('Assertion', two, 'from B went')
turn.retract(one_handle)
return on_two_retracted
one_handle = turn.publish(b.embeddedValue, One(Embedded(turn.ref(a))))
return lambda turn: print('B\'s Boot record went')
return lambda: print('B\'s Boot record went')
else:
# We are "B".
@Handler().add_handler
def b(turn, one):
def b(one):
print('B got assertion:', one)
print('boot_handle =', boot_handle)
turn.retract(boot_handle)
turn.publish(One._a(one).embeddedValue, Two())
return lambda turn: print('B facet stopping')
return lambda: print('B facet stopping')
@b.msg_handler
def b_msg(turn, three):
print('B got message: ', three)
def b_msg(three):
print('B got message:', three)
boot_handle = turn.publish(ds, Boot(Embedded(turn.ref(b))))

24
chat.py
View File

@ -3,7 +3,7 @@ import argparse
import asyncio
import random
import syndicate
from syndicate import patterns as P, actor, dataspace
from syndicate import patterns as P, actor, dataspace, turn
from syndicate.schema import simpleChatProtocol, sturdy
parser = argparse.ArgumentParser(description='Simple dataspace-server-mediated text chat.',
@ -20,22 +20,22 @@ Present = simpleChatProtocol.Present
Says = simpleChatProtocol.Says
@actor.run_system(name = 'chat', debug = False)
def main(turn):
root_facet = turn._facet
def main():
root_facet = turn.active_facet()
@syndicate.relay.connect(turn, args.address, sturdy.SturdyRef.decode(syndicate.parse(args.cap)))
def on_connected(turn, ds):
@syndicate.relay.connect(args.address, sturdy.SturdyRef.decode(syndicate.parse(args.cap)))
def on_connected(ds):
me = 'user_' + str(random.randint(10, 1000))
turn.publish(ds, Present(me))
@dataspace.during(turn, ds, P.rec('Present', P.CAPTURE), inert_ok=True)
def on_presence(turn, who):
@dataspace.during(ds, P.rec('Present', P.CAPTURE), inert_ok=True)
def on_presence(who):
print('%s joined' % (who,))
turn.on_stop(lambda turn: print('%s left' % (who,)))
turn.on_stop(lambda: print('%s left' % (who,)))
@dataspace.on_message(turn, ds, P.rec('Says', P.CAPTURE, P.CAPTURE))
def on_says(turn, who, what):
@dataspace.on_message(ds, P.rec('Says', P.CAPTURE, P.CAPTURE))
def on_says(who, what):
print('%s says %r' % (who, what))
@turn.linked_task()
@ -43,5 +43,5 @@ def main(turn):
reader = asyncio.StreamReader()
await actor.find_loop().connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)
while line := (await reader.readline()).decode('utf-8'):
actor.Turn.external(f, lambda turn: turn.send(ds, Says(me, line.strip())))
actor.Turn.external(f, lambda turn: turn.stop(root_facet))
turn.external(f, lambda: turn.send(ds, Says(me, line.strip())))
turn.external(f, lambda: turn.stop(root_facet))

8
inf.py
View File

@ -1,9 +1,9 @@
from syndicate import relay
from syndicate import relay, Turn
from syndicate.during import During
import logging
@relay.service(name='inf', debug=True)
@During().add_handler
def main(turn, args):
logging.info(f'in main {turn}, {args}')
turn.on_stop(lambda turn: logging.info(f'args retracted {args}'))
def main(args):
logging.info(f'in main {args}')
Turn.active.on_stop(lambda: logging.info(f'args retracted {args}'))

View File

@ -1,32 +0,0 @@
import sys
import asyncio
import random
import threading
import syndicate.mini.core as S
OverlayLink = S.Record.makeConstructor('OverlayLink', 'downNode upNode')
conn = S.Connection.from_url(sys.argv[1])
uplinks = {}
def add_uplink(turn, src, tgt):
uplinks[src] = tgt
summarise_uplinks()
def del_uplink(turn, src, tgt):
del uplinks[src]
summarise_uplinks()
def summarise_uplinks():
print(repr(uplinks))
with conn.turn() as t:
with conn.actor().react(t) as facet:
facet.add(S.Observe(OverlayLink(S.CAPTURE, S.CAPTURE)),
on_add=add_uplink,
on_del=del_uplink)
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(conn.reconnecting_main(loop))
loop.stop()
loop.run_forever()
loop.close()

View File

@ -3,10 +3,12 @@ import inspect
import logging
import sys
import traceback
import threading
from preserves import Embedded, preserve
from .idgen import IdGenerator
from .metapy import staticproperty
from .dataflow import Graph, Field
log = logging.getLogger(__name__)
@ -15,6 +17,9 @@ _next_actor_number = IdGenerator()
_next_handle = IdGenerator()
_next_facet_id = IdGenerator()
_active = threading.local()
_active.turn = None
# decorator
def run_system(**kwargs):
return lambda boot_proc: start_actor_system(boot_proc, **kwargs)
@ -96,19 +101,19 @@ class Actor:
def cancel_at_exit(self, hook):
remove_noerror(self.exit_hooks, hook)
def _repair_dataflow_graph(self, turn):
def _repair_dataflow_graph(self):
if self._dataflow_graph is not None:
self._dataflow_graph.repair_damage(lambda a: a(turn))
self._dataflow_graph.repair_damage(lambda a: a())
def _terminate(self, turn, exit_reason):
def _terminate(self, exit_reason):
if self.exit_reason is not None: return
self.log.debug('Terminating %r with exit_reason %r', self, exit_reason)
self.exit_reason = exit_reason
if exit_reason != True:
self.log.error('crashed: %s' % (exit_reason,))
for h in self.exit_hooks:
h(turn)
self.root._terminate(turn, exit_reason == True)
h()
self.root._terminate(exit_reason == True)
if not self._daemon:
adjust_engine_inhabitant_count(-1)
@ -122,6 +127,10 @@ class Actor:
return e
class Facet:
@staticproperty
def active():
return _active.turn._facet
def __init__(self, actor, parent, initial_handles=None):
self.id = next(_next_facet_id)
self.actor = actor
@ -184,7 +193,7 @@ class Facet:
def linked_task(self, coro_fn, loop = None):
task = None
def cancel_linked_task(turn):
def cancel_linked_task():
nonlocal task
if task is not None:
remove_noerror(self.linked_tasks, task)
@ -202,7 +211,7 @@ class Facet:
self.on_stop(cancel_linked_task)
self.actor.at_exit(cancel_linked_task)
def _terminate(self, turn, orderly):
def _terminate(self, orderly):
if not self.alive: return
self.log.debug('%s terminating %r', 'orderly' if orderly else 'disorderly', self)
self.alive = False
@ -211,13 +220,14 @@ class Facet:
if parent:
parent.children.remove(self)
with ActiveFacet(turn, self):
with ActiveFacet(self):
for child in list(self.children):
child._terminate(turn, orderly)
child._terminate(orderly)
if orderly:
with ActiveFacet(turn, self.parent or self):
with ActiveFacet(self.parent or self):
for h in self.shutdown_actions:
h(turn)
h()
turn = Turn.active
for h in self.handles:
# Optimization: don't clear from source facet, the source facet is us and we're
# about to clear our handles in one fell swoop.
@ -227,13 +237,13 @@ class Facet:
if orderly:
if parent:
if parent.isinert():
parent._terminate(turn, True)
parent._terminate(True)
else:
self.actor._terminate(turn, True)
self.actor._terminate(True)
class ActiveFacet:
def __init__(self, turn, facet):
self.turn = turn
def __init__(self, facet):
self.turn = Turn.active
self.outer_facet = None
self.inner_facet = facet
@ -266,6 +276,10 @@ def queue_task_threadsafe(thunk, loop = None):
return asyncio.run_coroutine_threadsafe(task(), find_loop(loop))
class Turn:
@staticproperty
def active():
return _active.turn
@classmethod
def run(cls, facet, action, zombie_turn = False):
if not zombie_turn:
@ -273,12 +287,17 @@ class Turn:
if not facet.alive: return
turn = cls(facet)
try:
action(turn)
facet.actor._repair_dataflow_graph(turn)
saved = Turn.active
_active.turn = turn
try:
action()
facet.actor._repair_dataflow_graph()
finally:
_active.turn = saved
except:
ei = sys.exc_info()
facet.log.error('%s', ''.join(traceback.format_exception(*ei)))
Turn.run(facet.actor.root, lambda turn: facet.actor._terminate(turn, ei[1]))
Turn.run(facet.actor.root, lambda: facet.actor._terminate(ei[1]))
else:
turn._deliver()
@ -300,8 +319,8 @@ class Turn:
# this actually can work as a decorator as well as a normal method!
def facet(self, boot_proc):
new_facet = Facet(self._facet.actor, self._facet)
with ActiveFacet(self, new_facet):
stop_if_inert_after(boot_proc)(self)
with ActiveFacet(new_facet):
stop_if_inert_after(boot_proc)()
return new_facet
def prevent_inert_check(self):
@ -319,14 +338,14 @@ class Turn:
else:
if continuation is not None:
facet.on_stop(continuation)
facet._terminate(self, True)
facet._terminate(True)
# can also be used as a decorator
def on_stop(self, a):
self._facet.on_stop(a)
def spawn(self, boot_proc, name = None, initial_handles = None, daemon = False):
def action(turn):
def action():
new_outbound = {}
if initial_handles is not None:
for handle in initial_handles:
@ -339,10 +358,10 @@ class Turn:
self._enqueue(self._facet, action)
def stop_actor(self):
self._enqueue(self._facet.actor.root, lambda turn: self._facet.actor._terminate(turn, True))
self._enqueue(self._facet.actor.root, lambda: self._facet.actor._terminate(True))
def crash(self, exn):
self._enqueue(self._facet.actor.root, lambda turn: self._facet.actor._terminate(turn, exn))
self._enqueue(self._facet.actor.root, lambda: self._facet.actor._terminate(exn))
def field(self, initial_value=None, name=None):
return Field(self._facet.actor.dataflow_graph, initial_value, name)
@ -351,16 +370,16 @@ class Turn:
def dataflow(self, a):
f = self._facet
f.prevent_inert_check()
def subject(turn):
def subject():
if not f.alive: return
with ActiveFacet(turn, f):
a(turn)
f.on_stop(lambda turn: f.actor.dataflow_graph.forget_subject(subject))
with ActiveFacet(f):
a()
f.on_stop(lambda: f.actor.dataflow_graph.forget_subject(subject))
f.actor.dataflow_graph.with_subject(subject, lambda: subject(self))
def publish_dataflow(self, assertion_function):
endpoint = DataflowPublication(assertion_function)
self.dataflow(lambda turn: endpoint.update(turn))
self.dataflow(lambda: endpoint.update())
def publish(self, ref, assertion):
handle = next(_next_handle)
@ -374,10 +393,10 @@ class Turn:
e = OutboundAssertion(facet, handle, ref)
facet.actor.outbound[handle] = e
facet.handles.add(handle)
def action(turn):
def action():
e.established = True
self.log.debug('%r <-- publish %r handle %r', ref, assertion, handle)
ref.entity.on_publish(turn, assertion, handle)
ref.entity.on_publish(assertion, handle)
self._enqueue(ref.facet, action)
def retract(self, handle):
@ -397,32 +416,32 @@ class Turn:
def _retract(self, e):
# Assumes e has already been removed from self._facet.actor.outbound and the
# appropriate set of handles
def action(turn):
def action():
if e.established:
e.established = False
self.log.debug('%r <-- retract handle %r', e.ref, e.handle)
e.ref.entity.on_retract(turn, e.handle)
e.ref.entity.on_retract(e.handle)
self._enqueue(e.ref.facet, action)
def sync(self, ref, k):
class SyncContinuation(Entity):
def on_message(self, turn, _value):
k(turn)
def on_message(self, _value):
k()
self._sync(ref, self.ref(SyncContinuation()))
def _sync(self, ref, peer):
peer = preserve(peer)
def action(turn):
def action():
self.log.debug('%r <-- sync peer %r', ref, peer)
ref.entity.on_sync(turn, peer)
ref.entity.on_sync(peer)
self._enqueue(ref.facet, action)
def send(self, ref, message):
# TODO: attenuation
message = preserve(message)
def action(turn):
def action():
self.log.debug('%r <-- message %r', ref, message)
ref.entity.on_message(turn, message)
ref.entity.on_message(message)
self._enqueue(ref.facet, action)
def _enqueue(self, target_facet, action):
@ -435,20 +454,22 @@ class Turn:
for (actor, q) in self.queues.items():
# Stupid python scoping bites again
def make_deliver_q(actor, q): # gratuitous
def deliver_q(turn):
def deliver_q():
turn = Turn.active
saved_facet = turn._facet
for (facet, action) in q:
turn._facet = facet
action(turn)
action()
turn._facet = saved_facet
return lambda: Turn.run(actor.root, deliver_q)
queue_task(make_deliver_q(actor, q))
self.queues = {}
def stop_if_inert_after(action):
def wrapped_action(turn):
action(turn)
def check_action(turn):
def wrapped_action():
turn = Turn.active
action()
def check_action():
if (turn._facet.parent is not None and not turn._facet.parent.alive) \
or turn._facet.isinert():
turn.stop()
@ -462,12 +483,12 @@ class DataflowPublication:
self.target = None
self.assertion = None
def update(self, turn):
(next_target, next_assertion) = self.assertion_function(turn) or (None, None)
def update(self):
(next_target, next_assertion) = self.assertion_function() or (None, None)
if next_target != self.target or next_assertion != self.assertion_function:
self.target = next_target
self.assertion = next_assertion
self.handle = turn.replace(self.target, self.handle, self.assertion)
self.handle = Turn.active.replace(self.target, self.handle, self.assertion)
class Ref:
def __init__(self, facet, entity):
@ -490,27 +511,27 @@ class OutboundAssertion:
# Can act as a mixin
class Entity:
def on_publish(self, turn, v, handle):
def on_publish(self, v, handle):
pass
def on_retract(self, turn, handle):
def on_retract(self, handle):
pass
def on_message(self, turn, v):
def on_message(self, v):
pass
def on_sync(self, turn, peer):
turn.send(peer, True)
def on_sync(self, peer):
Turn.active.send(peer, True)
_inert_actor = None
_inert_facet = None
_inert_ref = None
_inert_entity = Entity()
def __boot_inert(turn):
def __boot_inert():
global _inert_actor, _inert_facet, _inert_ref
_inert_actor = turn._facet.actor
_inert_facet = turn._facet
_inert_ref = turn.ref(_inert_entity)
_inert_actor = Turn.active._facet.actor
_inert_facet = Turn.active._facet
_inert_ref = Turn.active.ref(_inert_entity)
async def __run_inert():
Actor(__boot_inert, name = '_inert_actor')
asyncio.get_event_loop().run_until_complete(__run_inert())

View File

@ -1,17 +1,18 @@
from .schema import dataspace
from .during import During
from . import turn
# decorator
def observe(turn, ds, pattern):
def observe(ds, pattern):
def publish_observer(entity):
turn.publish(ds, dataspace.Observe(pattern, turn.ref(entity)))
return entity
return publish_observer
# decorator
def on_message(turn, ds, pattern, *args, **kwargs):
return lambda on_msg: observe(turn, ds, pattern)(During(*args, **kwargs).msg_handler(on_msg))
def on_message(ds, pattern, *args, **kwargs):
return lambda on_msg: observe(ds, pattern)(During(*args, **kwargs).msg_handler(on_msg))
# decorator
def during(turn, ds, pattern, *args, **kwargs):
return lambda on_add: observe(turn, ds, pattern)(During(*args, **kwargs).add_handler(on_add))
def during(ds, pattern, *args, **kwargs):
return lambda on_add: observe(ds, pattern)(During(*args, **kwargs).add_handler(on_add))

View File

@ -1,9 +1,9 @@
from . import actor
from . import turn, actor
def _ignore(*args, **kwargs):
pass
def _default_sync(turn, peer):
def _default_sync(peer):
turn.send(peer, True)
class Handler(actor.Entity):
@ -27,21 +27,21 @@ class Handler(actor.Entity):
def _wrap_add_handler(self, handler):
return handler
def on_publish(self, turn, v, handle):
retraction_handler = self._on_add(turn, *self._wrap(v))
def on_publish(self, v, handle):
retraction_handler = self._on_add(*self._wrap(v))
if retraction_handler is not None:
self.retraction_handlers[handle] = retraction_handler
def on_retract(self, turn, handle):
def on_retract(self, handle):
retraction_handler = self.retraction_handlers.pop(handle, None)
if retraction_handler is not None:
retraction_handler(turn)
retraction_handler()
def on_message(self, turn, v):
self._on_msg(turn, *self._wrap(v))
def on_message(self, v):
self._on_msg(*self._wrap(v))
def on_sync(self, turn, peer):
self._on_sync(turn, peer)
def on_sync(self, peer):
self._on_sync(peer)
# decorator
def add_handler(self, on_add):
@ -60,13 +60,13 @@ class Handler(actor.Entity):
class During(Handler):
def _wrap_add_handler(self, handler):
def facet_handler(turn, *args):
def facet_handler(*args):
@turn.facet
def facet(turn):
def facet():
if self.inert_ok:
turn.prevent_inert_check()
maybe_stop_action = handler(turn, *args)
maybe_stop_action = handler(*args)
if maybe_stop_action is not None:
turn.on_stop(maybe_stop_action)
return lambda turn: turn.stop(facet)
return lambda: turn.stop(facet)
return facet_handler

View File

@ -1,16 +1,17 @@
from .schema import gatekeeper
from .during import During
from . import turn
# decorator
def resolve(turn, gk, cap, *args, **kwargs):
def resolve(gk, cap, *args, **kwargs):
def configure_handler(handler):
def unwrapping_handler(turn, wrapped_ref):
return handler(turn, wrapped_ref.embeddedValue)
return _resolve(turn, gk, cap)(During(*args, **kwargs).add_handler(unwrapping_handler))
def unwrapping_handler(wrapped_ref):
return handler(wrapped_ref.embeddedValue)
return _resolve(gk, cap)(During(*args, **kwargs).add_handler(unwrapping_handler))
return configure_handler
# decorator
def _resolve(turn, gk, cap):
def _resolve(gk, cap):
def publish_resolution_request(entity):
turn.publish(gk, gatekeeper.Resolve(cap, turn.ref(entity)))
return entity

15
syndicate/metapy.py Normal file
View File

@ -0,0 +1,15 @@
class staticproperty:
"""For use as @staticproperty, like @property, but for static properties of classes.
Read-only for now."""
def __init__(self, getter):
self.getter = getter
def __get__(self, inst, cls=None):
return self.getter()
class classproperty:
"""For use as @classproperty, like @property, but for class-side properties of classes.
Read-only for now."""
def __init__(self, getter):
self.getter = getter
def __get__(self, inst, cls=None):
return self.getter(cls)

View File

@ -6,9 +6,9 @@ import logging
from preserves import Embedded, stringify
from preserves.fold import map_embeddeds
from . import actor, encode, transport, Decoder, gatekeeper
from . import actor, encode, transport, Decoder, gatekeeper, turn
from .during import During
from .actor import _inert_ref, Turn
from .actor import _inert_ref
from .idgen import IdGenerator
from .schema import protocol, sturdy, transportAddress
@ -66,7 +66,6 @@ def drop_all(wss):
# There are other kinds of relay. This one has exactly two participants connected to each other.
class TunnelRelay:
def __init__(self,
turn,
address,
gatekeeper_peer = None,
gatekeeper_oid = 0,
@ -75,7 +74,7 @@ class TunnelRelay:
on_connected = None,
on_disconnected = None,
):
self.facet = turn._facet
self.facet = turn.active_facet()
self.facet.on_stop(self._shutdown)
self.address = address
self.gatekeeper_peer = gatekeeper_peer
@ -108,7 +107,7 @@ class TunnelRelay:
def connected(self):
return self._connected
def _shutdown(self, turn):
def _shutdown(self):
self._disconnect()
def deregister(self, handle):
@ -143,13 +142,13 @@ class TunnelRelay:
self.exported_references))
return sturdy.WireRef.mine(sturdy.Oid(ws.oid))
def rewrite_in(self, turn, assertion, pins):
def rewrite_in(self, assertion, pins):
rewritten = map_embeddeds(
lambda wire_ref: Embedded(self.rewrite_ref_in(turn, wire_ref, pins)),
lambda wire_ref: Embedded(self.rewrite_ref_in(wire_ref, pins)),
assertion)
return rewritten
def rewrite_ref_in(self, turn, wire_ref, pins):
def rewrite_ref_in(self, wire_ref, pins):
if wire_ref.VARIANT.name == 'mine':
oid = wire_ref.oid.value
ws = self.imported_references.get_oid(
@ -166,59 +165,57 @@ class TunnelRelay:
def _on_disconnected(self):
self._connected = False
def retract_inbound(turn):
def retract_inbound():
for ia in self.inbound_assertions.values():
turn.retract(ia.local_handle)
if self.gatekeeper_handle is not None:
turn.retract(self.gatekeeper_handle)
self._reset()
Turn.run(self.facet, retract_inbound)
turn.run(self.facet, retract_inbound)
self._disconnect()
def _on_connected(self):
self._connected = True
if self.gatekeeper_peer is not None:
def connected_action(turn):
gk = self.rewrite_ref_in(turn,
sturdy.WireRef.mine(sturdy.Oid(self.gatekeeper_oid)),
[])
def connected_action():
gk = self.rewrite_ref_in(sturdy.WireRef.mine(sturdy.Oid(self.gatekeeper_oid)), [])
self.gatekeeper_handle = turn.publish(self.gatekeeper_peer, Embedded(gk))
Turn.run(self.facet, connected_action)
turn.run(self.facet, connected_action)
def _on_event(self, v):
Turn.run(self.facet, lambda turn: self._handle_event(turn, v))
turn.run(self.facet, lambda: self._handle_event(v))
def _handle_event(self, turn, v):
def _handle_event(self, v):
packet = protocol.Packet.decode(v)
variant = packet.VARIANT.name
if variant == 'Turn': self._handle_turn_events(turn, packet.value.value)
elif variant == 'Error': self._on_error(turn, packet.value.message, packet.value.detail)
if variant == 'Turn': self._handle_turn_events(packet.value.value)
elif variant == 'Error': self._on_error(packet.value.message, packet.value.detail)
def _on_error(self, turn, message, detail):
def _on_error(self, message, detail):
self.facet.log.error('Error from server: %r (detail: %r)', message, detail)
self._disconnect()
def _handle_turn_events(self, turn, events):
def _handle_turn_events(self, events):
for e in events:
pins = []
ref = self._lookup_exported_oid(e.oid.value, pins)
event = e.event
variant = event.VARIANT.name
if variant == 'Assert':
self._handle_publish(pins, turn, ref, event.value.assertion.value, event.value.handle.value)
self._handle_publish(pins, ref, event.value.assertion.value, event.value.handle.value)
elif variant == 'Retract':
self._handle_retract(pins, turn, ref, event.value.handle.value)
self._handle_retract(pins, ref, event.value.handle.value)
elif variant == 'Message':
self._handle_message(pins, turn, ref, event.value.body.value)
self._handle_message(pins, ref, event.value.body.value)
elif variant == 'Sync':
self._handle_sync(pins, turn, ref, event.value.peer)
self._handle_sync(pins, ref, event.value.peer)
def _handle_publish(self, pins, turn, ref, assertion, remote_handle):
assertion = self.rewrite_in(turn, assertion, pins)
def _handle_publish(self, pins, ref, assertion, remote_handle):
assertion = self.rewrite_in(assertion, pins)
self.inbound_assertions[remote_handle] = \
InboundAssertion(remote_handle, turn.publish(ref, assertion), pins)
def _handle_retract(self, pins, turn, ref, remote_handle):
def _handle_retract(self, pins, ref, remote_handle):
ia = self.inbound_assertions.pop(remote_handle, None)
if ia is None:
raise ValueError('Peer retracted invalid handle %s' % (remote_handle,))
@ -226,28 +223,28 @@ class TunnelRelay:
drop_all(pins)
turn.retract(ia.local_handle)
def _handle_message(self, pins, turn, ref, message):
message = self.rewrite_in(turn, message, pins)
def _handle_message(self, pins, ref, message):
message = self.rewrite_in(message, pins)
for ws in pins:
if ws.count == 1:
raise ValueError('Cannot receive transient reference')
turn.send(ref, message)
drop_all(pins)
def _handle_sync(self, pins, turn, ref, wire_peer):
peer = self.rewrite_ref_in(turn, wire_peer, pins)
def done(turn):
def _handle_sync(self, pins, ref, wire_peer):
peer = self.rewrite_ref_in(wire_peer, pins)
def done():
turn.send(peer, True)
drop_all(pins)
turn.sync(ref, done)
def _send(self, remote_oid, turn_event):
if len(self.pending_turn) == 0:
def flush_pending(turn):
def flush_pending():
packet = protocol.Packet.Turn(protocol.Turn(self.pending_turn))
self.pending_turn = []
self._send_bytes(encode(packet))
actor.queue_task(lambda: Turn.run(self.facet, flush_pending))
actor.queue_task(lambda: turn.run(self.facet, flush_pending))
self.pending_turn.append(protocol.TurnEvent(protocol.Oid(remote_oid), turn_event))
def _send_bytes(self, bs):
@ -263,17 +260,16 @@ class TunnelRelay:
should_run = await (on_disconnected or _default_on_disconnected)(self, did_connect)
@staticmethod
def from_str(turn, conn_str, **kwargs):
return transport.connection_from_str(turn, conn_str, **kwargs)
def from_str(conn_str, **kwargs):
return transport.connection_from_str(conn_str, **kwargs)
# decorator
def connect(turn, conn_str, cap, **kwargs):
def connect(conn_str, cap, **kwargs):
def prepare_resolution_handler(handler):
@During().add_handler
def handle_gatekeeper(turn, gk):
gatekeeper.resolve(turn, gk.embeddedValue, cap)(handler)
def handle_gatekeeper(gk):
gatekeeper.resolve(gk.embeddedValue, cap)(handler)
return transport.connection_from_str(
turn,
conn_str,
gatekeeper_peer = turn.ref(handle_gatekeeper),
**kwargs)
@ -290,20 +286,20 @@ class RelayEntity(actor.Entity):
def _send(self, e):
self.relay._send(self.oid, e)
def on_publish(self, turn, assertion, handle):
def on_publish(self, assertion, handle):
self._send(protocol.Event.Assert(protocol.Assert(
protocol.Assertion(self.relay.register(self.oid, assertion, handle)),
protocol.Handle(handle))))
def on_retract(self, turn, handle):
def on_retract(self, handle):
self.relay.deregister(handle)
self._send(protocol.Event.Retract(protocol.Retract(protocol.Handle(handle))))
def on_message(self, turn, message):
def on_message(self, message):
self._send(protocol.Event.Message(protocol.Message(
protocol.Assertion(self.relay.register(self.oid, message, None)))))
def on_sync(self, turn, peer):
def on_sync(self, peer):
pins = []
self.relay.register_imported_oid(self.oid, pins)
entity = SyncPeerEntity(self.relay, peer, pins)
@ -316,7 +312,7 @@ class SyncPeerEntity(actor.Entity):
self.peer = peer
self.pins = pins
def on_message(self, turn, body):
def on_message(self, body):
drop_all(self.pins)
turn.send(self.peer, body)
@ -332,8 +328,8 @@ async def _default_on_disconnected(relay, did_connect):
return True
class _StreamTunnelRelay(TunnelRelay, asyncio.Protocol):
def __init__(self, turn, address, **kwargs):
super().__init__(turn, address, **kwargs)
def __init__(self, address, **kwargs):
super().__init__(address, **kwargs)
self.decoder = None
self.stop_signal = None
self.transport = None
@ -402,8 +398,8 @@ class UnixSocketTunnelRelay(_StreamTunnelRelay):
@transport.address(transportAddress.WebSocket)
class WebsocketTunnelRelay(TunnelRelay):
def __init__(self, turn, address, **kwargs):
super().__init__(turn, address, **kwargs)
def __init__(self, address, **kwargs):
super().__init__(address, **kwargs)
self.loop = None
self.ws = None
@ -457,8 +453,8 @@ class WebsocketTunnelRelay(TunnelRelay):
@transport.address(transportAddress.Stdio)
class PipeTunnelRelay(_StreamTunnelRelay):
def __init__(self, turn, address, input_fileobj = sys.stdin, output_fileobj = sys.stdout, **kwargs):
super().__init__(turn, address, **kwargs)
def __init__(self, address, input_fileobj = sys.stdin, output_fileobj = sys.stdout, **kwargs):
super().__init__(address, **kwargs)
self.input_fileobj = input_fileobj
self.output_fileobj = output_fileobj
self.reader = asyncio.StreamReader()
@ -470,10 +466,10 @@ class PipeTunnelRelay(_StreamTunnelRelay):
self.output_fileobj.buffer.write(bs)
self.output_fileobj.buffer.flush()
def run_stdio_service(turn, entity):
PipeTunnelRelay(turn, transportAddress.Stdio(), publish_service=turn.ref(entity))
def run_stdio_service(entity):
PipeTunnelRelay(transportAddress.Stdio(), publish_service=turn.ref(entity))
# decorator
def service(**kwargs):
return lambda entity: \
actor.start_actor_system(lambda turn: run_stdio_service(turn, entity), **kwargs)
actor.start_actor_system(lambda: run_stdio_service(entity), **kwargs)

View File

@ -11,10 +11,10 @@ def address(address_class):
return connection_factory_class
return k
def connection_from_str(turn, s, **kwargs):
def connection_from_str(s, **kwargs):
address = parse(s)
for (address_class, factory_class) in constructors.items():
decoded_address = address_class.try_decode(address)
if decoded_address is not None:
return factory_class(turn, decoded_address, **kwargs)
return factory_class(decoded_address, **kwargs)
raise InvalidTransportAddress('Invalid transport address', address)

32
syndicate/turn.py Normal file
View File

@ -0,0 +1,32 @@
from .actor import Turn
def __setup():
from .actor import _active
from types import FunctionType
import sys
mod = sys.modules[__name__]
def install_definition(name, definition):
def handler(*args, **kwargs):
return definition(_active.turn, *args, **kwargs)
setattr(mod, name, handler)
for (name, definition) in Turn.__dict__.items():
if name[0] == '_':
continue
elif type(definition) == FunctionType:
install_definition(name, definition)
else:
pass
__setup()
def run(facet, action):
Turn.run(facet, action)
def external(facet, action, loop=None):
Turn.external(facet, action, loop=loop)
def active_facet():
return Turn.active._facet