Further refinements

This commit is contained in:
Tony Garnock-Jones 2021-08-19 17:04:35 -04:00
parent 21aa08e75b
commit 026d1ec151
6 changed files with 89 additions and 73 deletions

67
chat.py
View File

@ -2,8 +2,8 @@ import sys
import asyncio
import random
import syndicate
from syndicate import patterns as P, actor, dataspace
from syndicate.schema import simpleChatProtocol, gatekeeper, sturdy
from syndicate import patterns as P, actor, dataspace, gatekeeper
from syndicate.schema import simpleChatProtocol, sturdy
from syndicate.during import During
Present = simpleChatProtocol.Present
@ -13,51 +13,30 @@ conn_str = '<ws "ws://localhost:8001/">'
cap_str = '<ref "syndicate" [] #[pkgN9TBmEd3Q04grVG4Zdw==]>'
cap = sturdy.SturdyRef.decode(syndicate.parse(cap_str))
# sys.stderr.write(
# 'Usage: chat.py [ <tcp "HOST" PORT> | <ws "ws://HOST[:PORT]/"> | <unix "PATH"> ]\n')
# sys.exit(1)
me = 'user_' + str(random.randint(10, 1000))
_print = print
def print(*items):
_print(*items)
sys.stdout.flush()
def main_facet(turn, root_facet, ds):
print('main_facet', ds)
f = turn._facet
turn.publish(ds, Present(me))
@dataspace.during(turn, ds, P.rec('Present', P.CAPTURE))
def on_presence(turn, who):
print('%s joined' % (who,))
return lambda turn: print('%s left' % (who,))
@dataspace.on_message(turn, ds, P.rec('Says', P.CAPTURE, P.CAPTURE))
def on_says(turn, who, what):
print('%s says %r' % (who, what))
@turn.linked_task()
async def accept_input():
reader = asyncio.StreamReader()
await actor.find_loop().connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)
while line := (await reader.readline()).decode('utf-8'):
actor.Turn.external(f, lambda turn: turn.send(ds, Says(me, line.strip())))
actor.Turn.external(f, lambda turn: turn.stop(root_facet))
def main(turn):
root_facet = turn._facet
@During().add_handler
def handle_gatekeeper(turn, gk):
@During().add_handler
def handle_ds(turn, ds):
return turn.facet(lambda turn: main_facet(turn, root_facet, ds.embeddedValue))
turn.publish(gk.embeddedValue, gatekeeper.Resolve(cap, turn.ref(handle_ds)))
@syndicate.relay.connect(turn, conn_str, cap)
def on_connected(turn, ds):
me = 'user_' + str(random.randint(10, 1000))
conn = syndicate.relay.TunnelRelay.from_str(turn,
conn_str,
gatekeeper_peer = turn.ref(handle_gatekeeper))
turn.publish(ds, Present(me))
@dataspace.during(turn, ds, P.rec('Present', P.CAPTURE), inert_ok=True)
def on_presence(turn, who):
print('%s joined' % (who,))
turn.on_stop(lambda turn: print('%s left' % (who,)))
@dataspace.on_message(turn, ds, P.rec('Says', P.CAPTURE, P.CAPTURE))
def on_says(turn, who, what):
print('%s says %r' % (who, what))
@turn.linked_task()
async def accept_input(f):
reader = asyncio.StreamReader()
await actor.find_loop().connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)
while line := (await reader.readline()).decode('utf-8'):
actor.Turn.external(f, lambda turn: turn.send(ds, Says(me, line.strip())))
actor.Turn.external(f, lambda turn: turn.stop(root_facet))
actor.start_actor_system(main, name = 'chat', debug = False)

View File

@ -101,14 +101,14 @@ class Actor:
queue_task(finish_termination)
class Facet:
def __init__(self, actor, parent, initial_assertions = {}):
def __init__(self, actor, parent, initial_assertions = None):
self.id = next(_next_facet_id)
self.actor = actor
self.parent = parent
if parent:
parent.children.add(self)
self.children = set()
self.outbound = initial_assertions
self.outbound = initial_assertions or {}
self.shutdown_actions = []
self.linked_tasks = []
self.alive = True
@ -154,7 +154,7 @@ class Facet:
self.inert_check_preventers = self.inert_check_preventers - 1
return disarm
def linked_task(self, coro, loop = None):
def linked_task(self, coro_fn, loop = None):
task = None
def cancel_linked_task(turn):
nonlocal task
@ -166,7 +166,7 @@ class Facet:
self.actor.cancel_at_exit(cancel_linked_task)
async def guarded_task():
try:
await coro
await coro_fn(self)
finally:
Turn.external(self, cancel_linked_task)
task = find_loop(loop).create_task(guarded_task())
@ -267,6 +267,7 @@ class Turn:
def ref(self, entity):
return Ref(self._facet, entity)
# this actually can work as a decorator as well as a normal method!
def facet(self, boot_proc):
new_facet = Facet(self._facet.actor, self._facet)
with ActiveFacet(self, new_facet):
@ -278,7 +279,7 @@ class Turn:
# decorator
def linked_task(self, loop = None):
return lambda thunk: self._facet.linked_task(thunk(), loop = loop)
return lambda coro_fn: self._facet.linked_task(coro_fn, loop = loop)
def stop(self, facet = None, continuation = None):
if facet is None:
@ -289,6 +290,10 @@ class Turn:
continuation(turn)
self._enqueue(facet.parent, action)
# can also be used as a decorator
def on_stop(self, a):
self._facet.on_stop(a)
def spawn(self, boot_proc, name = None, initial_assertions = None, daemon = False):
def action(turn):
new_outbound = {}
@ -404,6 +409,10 @@ class OutboundAssertion:
self.ref = ref
self.established = False
def __repr__(self):
return '<OutboundAssertion handle=%s ref=%r%s>' % \
(self.handle, self.ref, ' established' if self.established else '')
# Can act as a mixin
class Entity:
def on_publish(self, turn, v, handle):

View File

@ -9,9 +9,9 @@ def observe(turn, ds, pattern):
return publish_observer
# decorator
def on_message(turn, ds, pattern):
return lambda on_msg: observe(turn, ds, pattern)(During().msg_handler(on_msg))
def on_message(turn, ds, pattern, *args, **kwargs):
return lambda on_msg: observe(turn, ds, pattern)(During(*args, **kwargs).msg_handler(on_msg))
# decorator
def during(turn, ds, pattern):
return lambda on_add: observe(turn, ds, pattern)(During().add_handler(on_add))
def during(turn, ds, pattern, *args, **kwargs):
return lambda on_add: observe(turn, ds, pattern)(During(*args, **kwargs).add_handler(on_add))

View File

@ -7,12 +7,13 @@ def _default_sync(turn, peer):
turn.send(peer, True)
class During(actor.Entity):
def __init__(self, on_add=None, on_msg=None, on_sync=None, name=None):
self.retract_handlers = {}
def __init__(self, on_add=None, on_msg=None, on_sync=None, name=None, inert_ok=False):
self.facets = {}
self._on_add = on_add or _ignore
self._on_msg = on_msg or _ignore
self._on_sync = on_sync or _default_sync
self.name = name
self.inert_ok = inert_ok
self.flatten_arg = True
def __repr__(self):
@ -24,20 +25,15 @@ class During(actor.Entity):
return v if self.flatten_arg and isinstance(v, tuple) else (v,)
def on_publish(self, turn, v, handle):
retract_handler = self._on_add(turn, *self._wrap(v))
if retract_handler is not None:
if isinstance(retract_handler, actor.Facet):
self.retract_handlers[handle] = lambda turn: turn.stop(retract_handler)
elif callable(retract_handler):
self.retract_handlers[handle] = retract_handler
else:
raise ValueError('Non-callable retract_handler', {
'retract_handler': retract_handler,
'on_add': self._on_add,
})
facet = turn.facet(lambda turn: self._on_add(turn, *self._wrap(v)))
if self.inert_ok:
facet.prevent_inert_check()
self.facets[handle] = facet
def on_retract(self, turn, handle):
self.retract_handlers.pop(handle, lambda turn: ())(turn)
facet = self.facets.pop(handle, None)
if facet is not None:
turn.stop(facet)
def on_message(self, turn, v):
self._on_msg(turn, *self._wrap(v))

17
syndicate/gatekeeper.py Normal file
View File

@ -0,0 +1,17 @@
from .schema import gatekeeper
from .during import During
# decorator
def resolve(turn, gk, cap, *args, **kwargs):
def configure_handler(handler):
def unwrapping_handler(turn, wrapped_ref):
return handler(turn, wrapped_ref.embeddedValue)
return _resolve(turn, gk, cap)(During(*args, **kwargs).add_handler(unwrapping_handler))
return configure_handler
# decorator
def _resolve(turn, gk, cap):
def publish_resolution_request(entity):
turn.publish(gk, gatekeeper.Resolve(cap, turn.ref(entity)))
return entity
return publish_resolution_request

View File

@ -5,7 +5,8 @@ import logging
from preserves import Embedded, stringify
from preserves.fold import map_embeddeds
from . import actor, encode, transport, Decoder
from . import actor, encode, transport, Decoder, gatekeeper
from .during import During
from .actor import _inert_ref, Turn
from .idgen import IdGenerator
from .schema import externalProtocol as protocol, sturdy, transportAddress
@ -70,9 +71,10 @@ class TunnelRelay:
self.gatekeeper_peer = gatekeeper_peer
self.gatekeeper_oid = gatekeeper_oid
self._reset()
self.facet.linked_task(self._reconnecting_main(asyncio.get_running_loop(),
on_connected = on_connected,
on_disconnected = on_disconnected))
self.facet.linked_task(
lambda facet: self._reconnecting_main(asyncio.get_running_loop(),
on_connected = on_connected,
on_disconnected = on_disconnected))
def _reset(self):
self.inbound_assertions = {} # map remote handle to InboundAssertion
@ -237,8 +239,21 @@ class TunnelRelay:
should_run = await (on_disconnected or _default_on_disconnected)(self, did_connect)
@staticmethod
def from_str(turn, s, **kwargs):
return transport.connection_from_str(turn, s, **kwargs)
def from_str(turn, conn_str, **kwargs):
return transport.connection_from_str(turn, conn_str, **kwargs)
# decorator
def connect(turn, conn_str, cap, **kwargs):
def prepare_resolution_handler(handler):
@During().add_handler
def handle_gatekeeper(turn, gk):
gatekeeper.resolve(turn, gk.embeddedValue, cap)(handler)
return transport.connection_from_str(
turn,
conn_str,
gatekeeper_peer = turn.ref(handle_gatekeeper),
**kwargs)
return prepare_resolution_handler
class RelayEntity(actor.Entity):
def __init__(self, relay, oid):