Compare commits
4 Commits
fffcd025ad
...
4821f32e8b
Author | SHA1 | Date |
---|---|---|
Tony Garnock-Jones | 4821f32e8b | |
Tony Garnock-Jones | d6dc75e41d | |
Tony Garnock-Jones | 03522c32ce | |
Tony Garnock-Jones | f71b8b0790 |
|
@ -0,0 +1,83 @@
|
||||||
|
import sys
|
||||||
|
import argparse
|
||||||
|
import asyncio
|
||||||
|
import random
|
||||||
|
import syndicate
|
||||||
|
from syndicate import patterns as P, actor, dataspace, Record, Embedded
|
||||||
|
from syndicate.during import Handler
|
||||||
|
from syndicate.schema import sturdy
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(description='Test bidirectional object reference GC.',
|
||||||
|
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||||
|
parser.add_argument('--address', metavar='\'<tcp "HOST" PORT>\'',
|
||||||
|
help='transport address of the server',
|
||||||
|
default='<ws "ws://localhost:8001/">')
|
||||||
|
parser.add_argument('--cap', metavar='\'<ref ...>\'',
|
||||||
|
help='capability for the dataspace on the server',
|
||||||
|
default='<ref "syndicate" [] #[pkgN9TBmEd3Q04grVG4Zdw==]>')
|
||||||
|
parser.add_argument('--start',
|
||||||
|
help='make this instance kick off the procedure',
|
||||||
|
action='store_true')
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
# A B DS
|
||||||
|
# ----- ----- ------
|
||||||
|
#
|
||||||
|
# ---1:Boot(b)---o
|
||||||
|
#
|
||||||
|
# ------2:Observe(Boot($))-----o
|
||||||
|
# o----------3:[b]--------------
|
||||||
|
#
|
||||||
|
# ---4:One(a)---o
|
||||||
|
#
|
||||||
|
# -------1-------x
|
||||||
|
# x------------3----------------
|
||||||
|
#
|
||||||
|
# (At this point, B has no outgoing
|
||||||
|
# assertions, but has one incoming
|
||||||
|
# assertion.)
|
||||||
|
#
|
||||||
|
# o---5:Two()----
|
||||||
|
#
|
||||||
|
# ----Three()--->
|
||||||
|
|
||||||
|
Boot = Record.makeConstructor('Boot', 'b')
|
||||||
|
One = Record.makeConstructor('One', 'a')
|
||||||
|
Two = Record.makeConstructor('Two', '')
|
||||||
|
Three = Record.makeConstructor('Three', '')
|
||||||
|
|
||||||
|
@actor.run_system(name = 'bidi-gc', debug = False)
|
||||||
|
def main(turn):
|
||||||
|
root_facet = turn._facet
|
||||||
|
|
||||||
|
@syndicate.relay.connect(turn, args.address, sturdy.SturdyRef.decode(syndicate.parse(args.cap)),
|
||||||
|
on_disconnected = lambda _relay, _did_connect: sys.exit(1))
|
||||||
|
def on_connected(turn, ds):
|
||||||
|
if args.start:
|
||||||
|
# We are "A".
|
||||||
|
|
||||||
|
@dataspace.observe(turn, ds, P.rec('Boot', P.CAPTURE))
|
||||||
|
@Handler().add_handler
|
||||||
|
def on_b(turn, b):
|
||||||
|
print('A got B', b)
|
||||||
|
@Handler().add_handler
|
||||||
|
def a(turn, two):
|
||||||
|
print('A got assertion:', two)
|
||||||
|
turn.send(b.embeddedValue, Three())
|
||||||
|
return lambda turn: print('Assertion', two, 'from B went')
|
||||||
|
turn.publish(b.embeddedValue, One(Embedded(turn.ref(a))))
|
||||||
|
return lambda turn: print('B\'s Boot record went')
|
||||||
|
else:
|
||||||
|
# We are "B".
|
||||||
|
|
||||||
|
@Handler().add_handler
|
||||||
|
def b(turn, one):
|
||||||
|
print('B got assertion:', one)
|
||||||
|
print('boot_handle =', boot_handle)
|
||||||
|
turn.retract(boot_handle)
|
||||||
|
turn.publish(One._a(one).embeddedValue, Two())
|
||||||
|
return lambda turn: print('B facet stopping')
|
||||||
|
@b.msg_handler
|
||||||
|
def b_msg(turn, three):
|
||||||
|
print('B got message: ', three)
|
||||||
|
boot_handle = turn.publish(ds, Boot(Embedded(turn.ref(b))))
|
|
@ -53,11 +53,11 @@ class Actor:
|
||||||
if not daemon:
|
if not daemon:
|
||||||
adjust_engine_inhabitant_count(1)
|
adjust_engine_inhabitant_count(1)
|
||||||
self.root = Facet(self, None)
|
self.root = Facet(self, None)
|
||||||
|
self.outbound = initial_assertions or {}
|
||||||
self.exit_reason = None # None -> running, True -> terminated OK, exn -> error
|
self.exit_reason = None # None -> running, True -> terminated OK, exn -> error
|
||||||
self.exit_hooks = []
|
self.exit_hooks = []
|
||||||
self._log = None
|
self._log = None
|
||||||
Turn.run(Facet(self, self.root, initial_assertions = initial_assertions),
|
Turn.run(Facet(self, self.root, set(self.outbound.keys())), stop_if_inert_after(boot_proc))
|
||||||
stop_if_inert_after(boot_proc))
|
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return '<Actor:%s>' % (self.name,)
|
return '<Actor:%s>' % (self.name,)
|
||||||
|
@ -104,15 +104,24 @@ class Actor:
|
||||||
adjust_engine_inhabitant_count(-1)
|
adjust_engine_inhabitant_count(-1)
|
||||||
queue_task(finish_termination)
|
queue_task(finish_termination)
|
||||||
|
|
||||||
|
def _pop_outbound(self, handle, clear_from_source_facet):
|
||||||
|
e = self.outbound.pop(handle)
|
||||||
|
if e and clear_from_source_facet:
|
||||||
|
try:
|
||||||
|
e.source_facet.handles.remove(handle)
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
return e
|
||||||
|
|
||||||
class Facet:
|
class Facet:
|
||||||
def __init__(self, actor, parent, initial_assertions = None):
|
def __init__(self, actor, parent, initial_handles=None):
|
||||||
self.id = next(_next_facet_id)
|
self.id = next(_next_facet_id)
|
||||||
self.actor = actor
|
self.actor = actor
|
||||||
self.parent = parent
|
self.parent = parent
|
||||||
if parent:
|
if parent:
|
||||||
parent.children.add(self)
|
parent.children.add(self)
|
||||||
self.children = set()
|
self.children = set()
|
||||||
self.outbound = initial_assertions or {}
|
self.handles = initial_handles or set()
|
||||||
self.shutdown_actions = []
|
self.shutdown_actions = []
|
||||||
self.linked_tasks = []
|
self.linked_tasks = []
|
||||||
self.alive = True
|
self.alive = True
|
||||||
|
@ -144,7 +153,7 @@ class Facet:
|
||||||
def isinert(self):
|
def isinert(self):
|
||||||
return \
|
return \
|
||||||
len(self.children) == 0 and \
|
len(self.children) == 0 and \
|
||||||
len(self.outbound) == 0 and \
|
len(self.handles) == 0 and \
|
||||||
len(self.linked_tasks) == 0 and \
|
len(self.linked_tasks) == 0 and \
|
||||||
self.inert_check_preventers == 0
|
self.inert_check_preventers == 0
|
||||||
|
|
||||||
|
@ -193,9 +202,11 @@ class Facet:
|
||||||
if orderly:
|
if orderly:
|
||||||
for h in self.shutdown_actions:
|
for h in self.shutdown_actions:
|
||||||
h(turn)
|
h(turn)
|
||||||
for e in self.outbound.values():
|
for h in self.handles:
|
||||||
turn._retract(e)
|
# Optimization: don't clear from source facet, the source facet is us and we're
|
||||||
self.outbound.clear()
|
# about to clear our handles in one fell swoop.
|
||||||
|
turn._retract(self.actor._pop_outbound(h, clear_from_source_facet=False))
|
||||||
|
self.handles.clear()
|
||||||
|
|
||||||
if orderly:
|
if orderly:
|
||||||
if parent:
|
if parent:
|
||||||
|
@ -298,12 +309,13 @@ class Turn:
|
||||||
def on_stop(self, a):
|
def on_stop(self, a):
|
||||||
self._facet.on_stop(a)
|
self._facet.on_stop(a)
|
||||||
|
|
||||||
def spawn(self, boot_proc, name = None, initial_assertions = None, daemon = False):
|
def spawn(self, boot_proc, name = None, initial_handles = None, daemon = False):
|
||||||
def action(turn):
|
def action(turn):
|
||||||
new_outbound = {}
|
new_outbound = {}
|
||||||
if initial_assertions is not None:
|
if initial_handles is not None:
|
||||||
for handle in initial_assertions:
|
for handle in initial_handles:
|
||||||
new_outbound[handle] = self._facet.outbound.pop(handle)
|
new_outbound[handle] = \
|
||||||
|
self._facet.actor._pop_outbound(handle, clear_from_source_facet=True)
|
||||||
queue_task(lambda: Actor(boot_proc,
|
queue_task(lambda: Actor(boot_proc,
|
||||||
name = name,
|
name = name,
|
||||||
initial_assertions = new_outbound,
|
initial_assertions = new_outbound,
|
||||||
|
@ -324,8 +336,10 @@ class Turn:
|
||||||
def _publish(self, ref, assertion, handle):
|
def _publish(self, ref, assertion, handle):
|
||||||
# TODO: attenuation
|
# TODO: attenuation
|
||||||
assertion = preserve(assertion)
|
assertion = preserve(assertion)
|
||||||
e = OutboundAssertion(handle, ref)
|
facet = self._facet
|
||||||
self._facet.outbound[handle] = e
|
e = OutboundAssertion(facet, handle, ref)
|
||||||
|
facet.actor.outbound[handle] = e
|
||||||
|
facet.handles.add(handle)
|
||||||
def action(turn):
|
def action(turn):
|
||||||
e.established = True
|
e.established = True
|
||||||
self.log.debug('%r <-- publish %r handle %r', ref, assertion, handle)
|
self.log.debug('%r <-- publish %r handle %r', ref, assertion, handle)
|
||||||
|
@ -334,7 +348,7 @@ class Turn:
|
||||||
|
|
||||||
def retract(self, handle):
|
def retract(self, handle):
|
||||||
if handle is not None:
|
if handle is not None:
|
||||||
e = self._facet.outbound.pop(handle, None)
|
e = self._facet.actor._pop_outbound(handle, clear_from_source_facet=True)
|
||||||
if e is not None:
|
if e is not None:
|
||||||
self._retract(e)
|
self._retract(e)
|
||||||
|
|
||||||
|
@ -344,7 +358,8 @@ class Turn:
|
||||||
return new_handle
|
return new_handle
|
||||||
|
|
||||||
def _retract(self, e):
|
def _retract(self, e):
|
||||||
# Assumes e has already been removed from self._facet.outbound
|
# Assumes e has already been removed from self._facet.actor.outbound and the
|
||||||
|
# appropriate set of handles
|
||||||
def action(turn):
|
def action(turn):
|
||||||
if e.established:
|
if e.established:
|
||||||
e.established = False
|
e.established = False
|
||||||
|
@ -408,14 +423,15 @@ class Ref:
|
||||||
return '<Ref:%s/%r>' % (self.facet._repr_labels(), self.entity)
|
return '<Ref:%s/%r>' % (self.facet._repr_labels(), self.entity)
|
||||||
|
|
||||||
class OutboundAssertion:
|
class OutboundAssertion:
|
||||||
def __init__(self, handle, ref):
|
def __init__(self, source_facet, handle, ref):
|
||||||
|
self.source_facet = source_facet
|
||||||
self.handle = handle
|
self.handle = handle
|
||||||
self.ref = ref
|
self.ref = ref
|
||||||
self.established = False
|
self.established = False
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return '<OutboundAssertion handle=%s ref=%r%s>' % \
|
return '<OutboundAssertion src=%r handle=%s ref=%r%s>' % \
|
||||||
(self.handle, self.ref, ' established' if self.established else '')
|
(self.source_facet, self.handle, self.ref, ' established' if self.established else '')
|
||||||
|
|
||||||
# Can act as a mixin
|
# Can act as a mixin
|
||||||
class Entity:
|
class Entity:
|
||||||
|
|
|
@ -6,10 +6,10 @@ def _ignore(*args, **kwargs):
|
||||||
def _default_sync(turn, peer):
|
def _default_sync(turn, peer):
|
||||||
turn.send(peer, True)
|
turn.send(peer, True)
|
||||||
|
|
||||||
class During(actor.Entity):
|
class Handler(actor.Entity):
|
||||||
def __init__(self, on_add=None, on_msg=None, on_sync=None, name=None, inert_ok=False):
|
def __init__(self, on_add=None, on_msg=None, on_sync=None, name=None, inert_ok=True):
|
||||||
self.facets = {}
|
self.retraction_handlers = {}
|
||||||
self._on_add = on_add or _ignore
|
self._on_add = self._wrap_add_handler(on_add) or _ignore
|
||||||
self._on_msg = on_msg or _ignore
|
self._on_msg = on_msg or _ignore
|
||||||
self._on_sync = on_sync or _default_sync
|
self._on_sync = on_sync or _default_sync
|
||||||
self.name = name
|
self.name = name
|
||||||
|
@ -24,16 +24,18 @@ class During(actor.Entity):
|
||||||
def _wrap(self, v):
|
def _wrap(self, v):
|
||||||
return v if self.flatten_arg and isinstance(v, tuple) else (v,)
|
return v if self.flatten_arg and isinstance(v, tuple) else (v,)
|
||||||
|
|
||||||
|
def _wrap_add_handler(self, handler):
|
||||||
|
return handler
|
||||||
|
|
||||||
def on_publish(self, turn, v, handle):
|
def on_publish(self, turn, v, handle):
|
||||||
facet = turn.facet(lambda turn: self._on_add(turn, *self._wrap(v)))
|
retraction_handler = self._on_add(turn, *self._wrap(v))
|
||||||
if self.inert_ok:
|
if retraction_handler is not None:
|
||||||
facet.prevent_inert_check()
|
self.retraction_handlers[handle] = retraction_handler
|
||||||
self.facets[handle] = facet
|
|
||||||
|
|
||||||
def on_retract(self, turn, handle):
|
def on_retract(self, turn, handle):
|
||||||
facet = self.facets.pop(handle, None)
|
retraction_handler = self.retraction_handlers.pop(handle, None)
|
||||||
if facet is not None:
|
if retraction_handler is not None:
|
||||||
turn.stop(facet)
|
retraction_handler(turn)
|
||||||
|
|
||||||
def on_message(self, turn, v):
|
def on_message(self, turn, v):
|
||||||
self._on_msg(turn, *self._wrap(v))
|
self._on_msg(turn, *self._wrap(v))
|
||||||
|
@ -43,7 +45,7 @@ class During(actor.Entity):
|
||||||
|
|
||||||
# decorator
|
# decorator
|
||||||
def add_handler(self, on_add):
|
def add_handler(self, on_add):
|
||||||
self._on_add = on_add
|
self._on_add = self._wrap_add_handler(on_add)
|
||||||
return self
|
return self
|
||||||
|
|
||||||
# decorator
|
# decorator
|
||||||
|
@ -55,3 +57,14 @@ class During(actor.Entity):
|
||||||
def sync_handler(self, on_sync):
|
def sync_handler(self, on_sync):
|
||||||
self._on_sync = on_sync
|
self._on_sync = on_sync
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
class During(Handler):
|
||||||
|
def _wrap_add_handler(self, handler):
|
||||||
|
def facet_handler(turn, *args):
|
||||||
|
@turn.facet
|
||||||
|
def facet(turn):
|
||||||
|
if self.inert_ok:
|
||||||
|
turn.prevent_inert_check()
|
||||||
|
handler(turn, *args)
|
||||||
|
return lambda turn: turn.stop(facet)
|
||||||
|
return facet_handler
|
||||||
|
|
|
@ -13,48 +13,57 @@ from .idgen import IdGenerator
|
||||||
from .schema import externalProtocol as protocol, sturdy, transportAddress
|
from .schema import externalProtocol as protocol, sturdy, transportAddress
|
||||||
|
|
||||||
class InboundAssertion:
|
class InboundAssertion:
|
||||||
def __init__(self, remote_handle, local_handle, wire_symbols):
|
def __init__(self, remote_handle, local_handle, pins):
|
||||||
self.remote_handle = remote_handle
|
self.remote_handle = remote_handle
|
||||||
self.local_handle = local_handle
|
self.local_handle = local_handle
|
||||||
self.wire_symbols = wire_symbols
|
self.pins = pins
|
||||||
|
|
||||||
_next_local_oid = IdGenerator()
|
_next_local_oid = IdGenerator()
|
||||||
|
|
||||||
class WireSymbol:
|
class WireSymbol:
|
||||||
def __init__(self, oid, ref):
|
def __init__(self, oid, ref, membrane):
|
||||||
self.oid = oid
|
self.oid = oid
|
||||||
self.ref = ref
|
self.ref = ref
|
||||||
|
self.membrane = membrane
|
||||||
self.count = 0
|
self.count = 0
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return '<ws:%d/%d:%r>' % (self.oid, self.count, self.ref)
|
return '<ws:%d/%d:%r>' % (self.oid, self.count, self.ref)
|
||||||
|
|
||||||
|
def grab(self, pins):
|
||||||
|
self.count = self.count + 1
|
||||||
|
pins.append(self)
|
||||||
|
|
||||||
|
def drop(self):
|
||||||
|
self.count = self.count - 1
|
||||||
|
if self.count == 0:
|
||||||
|
del self.membrane.oid_map[self.oid]
|
||||||
|
del self.membrane.ref_map[self.ref]
|
||||||
|
|
||||||
class Membrane:
|
class Membrane:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.oid_map = {}
|
self.oid_map = {}
|
||||||
self.ref_map = {}
|
self.ref_map = {}
|
||||||
|
|
||||||
def _get(self, map, key, is_transient, ws_maker):
|
def _get(self, pins, map, key, is_transient, ws_maker):
|
||||||
ws = map.get(key, None)
|
ws = map.get(key, None)
|
||||||
if ws is None:
|
if ws is None and ws_maker is not None:
|
||||||
ws = ws_maker()
|
ws = ws_maker()
|
||||||
self.oid_map[ws.oid] = ws
|
self.oid_map[ws.oid] = ws
|
||||||
self.ref_map[ws.ref] = ws
|
self.ref_map[ws.ref] = ws
|
||||||
if not is_transient:
|
if not is_transient and ws is not None:
|
||||||
ws.count = ws.count + 1
|
ws.grab(pins)
|
||||||
return ws
|
return ws
|
||||||
|
|
||||||
def get_ref(self, local_ref, is_transient, ws_maker):
|
def get_ref(self, pins, local_ref, is_transient, ws_maker):
|
||||||
return self._get(self.ref_map, local_ref, is_transient, ws_maker)
|
return self._get(pins, self.ref_map, local_ref, is_transient, ws_maker)
|
||||||
|
|
||||||
def get_oid(self, remote_oid, ws_maker):
|
def get_oid(self, pins, remote_oid, ws_maker):
|
||||||
return self._get(self.oid_map, remote_oid, False, ws_maker)
|
return self._get(pins, self.oid_map, remote_oid, False, ws_maker)
|
||||||
|
|
||||||
def drop(self, ws):
|
def drop_all(wss):
|
||||||
ws.count = ws.count - 1
|
for ws in wss:
|
||||||
if ws.count == 0:
|
ws.drop()
|
||||||
del self.oid_map[ws.oid]
|
|
||||||
del self.ref_map[ws.ref]
|
|
||||||
|
|
||||||
# There are other kinds of relay. This one has exactly two participants connected to each other.
|
# There are other kinds of relay. This one has exactly two participants connected to each other.
|
||||||
class TunnelRelay:
|
class TunnelRelay:
|
||||||
|
@ -79,7 +88,7 @@ class TunnelRelay:
|
||||||
|
|
||||||
def _reset(self):
|
def _reset(self):
|
||||||
self.inbound_assertions = {} # map remote handle to InboundAssertion
|
self.inbound_assertions = {} # map remote handle to InboundAssertion
|
||||||
self.outbound_assertions = {} # map local handle to wire_symbols
|
self.outbound_assertions = {} # map local handle to `WireSymbol`s
|
||||||
self.exported_references = Membrane()
|
self.exported_references = Membrane()
|
||||||
self.imported_references = Membrane()
|
self.imported_references = Membrane()
|
||||||
self.pending_turn = []
|
self.pending_turn = []
|
||||||
|
@ -94,49 +103,53 @@ class TunnelRelay:
|
||||||
self._disconnect()
|
self._disconnect()
|
||||||
|
|
||||||
def deregister(self, handle):
|
def deregister(self, handle):
|
||||||
for ws in self.outbound_assertions.pop(handle, ()):
|
drop_all(self.outbound_assertions.pop(handle, ()))
|
||||||
self.exported_references.drop(ws)
|
|
||||||
|
|
||||||
def _lookup(self, local_oid):
|
def _lookup_exported_oid(self, local_oid, pins):
|
||||||
ws = self.exported_references.oid_map.get(local_oid, None)
|
ws = self.exported_references.get_oid(pins, local_oid, None)
|
||||||
return _inert_ref if ws is None else ws.ref
|
if ws is None:
|
||||||
|
return _inert_ref
|
||||||
|
return ws.ref
|
||||||
|
|
||||||
def register(self, assertion, maybe_handle):
|
def register_imported_oid(self, remote_oid, pins):
|
||||||
exported = []
|
self.imported_references.get_oid(pins, remote_oid, None)
|
||||||
|
|
||||||
|
def register(self, target_oid, assertion, maybe_handle):
|
||||||
|
pins = []
|
||||||
|
self.register_imported_oid(target_oid, pins)
|
||||||
rewritten = map_embeddeds(
|
rewritten = map_embeddeds(
|
||||||
lambda r: Embedded(self.rewrite_ref_out(r, maybe_handle is None, exported)),
|
lambda r: Embedded(self.rewrite_ref_out(r, maybe_handle is None, pins)),
|
||||||
assertion)
|
assertion)
|
||||||
if maybe_handle is not None:
|
if maybe_handle is not None:
|
||||||
self.outbound_assertions[maybe_handle] = exported
|
self.outbound_assertions[maybe_handle] = pins
|
||||||
return rewritten
|
return rewritten
|
||||||
|
|
||||||
def rewrite_ref_out(self, r, is_transient, exported):
|
def rewrite_ref_out(self, r, is_transient, pins):
|
||||||
if isinstance(r.entity, RelayEntity) and r.entity.relay == self:
|
if isinstance(r.entity, RelayEntity) and r.entity.relay == self:
|
||||||
# TODO attenuation
|
# TODO attenuation
|
||||||
return sturdy.WireRef.yours(sturdy.Oid(r.entity.oid), ())
|
return sturdy.WireRef.yours(sturdy.Oid(r.entity.oid), ())
|
||||||
else:
|
else:
|
||||||
ws = self.exported_references.get_ref(
|
ws = self.exported_references.get_ref(
|
||||||
r, is_transient, lambda: WireSymbol(next(_next_local_oid), r))
|
pins, r, is_transient, lambda: WireSymbol(next(_next_local_oid), r,
|
||||||
exported.append(ws)
|
self.exported_references))
|
||||||
return sturdy.WireRef.mine(sturdy.Oid(ws.oid))
|
return sturdy.WireRef.mine(sturdy.Oid(ws.oid))
|
||||||
|
|
||||||
def rewrite_in(self, turn, assertion):
|
def rewrite_in(self, turn, assertion, pins):
|
||||||
imported = []
|
|
||||||
rewritten = map_embeddeds(
|
rewritten = map_embeddeds(
|
||||||
lambda wire_ref: Embedded(self.rewrite_ref_in(turn, wire_ref, imported)),
|
lambda wire_ref: Embedded(self.rewrite_ref_in(turn, wire_ref, pins)),
|
||||||
assertion)
|
assertion)
|
||||||
return (rewritten, imported)
|
return rewritten
|
||||||
|
|
||||||
def rewrite_ref_in(self, turn, wire_ref, imported):
|
def rewrite_ref_in(self, turn, wire_ref, pins):
|
||||||
if wire_ref.VARIANT.name == 'mine':
|
if wire_ref.VARIANT.name == 'mine':
|
||||||
oid = wire_ref.oid.value
|
oid = wire_ref.oid.value
|
||||||
ws = self.imported_references.get_oid(
|
ws = self.imported_references.get_oid(
|
||||||
oid, lambda: WireSymbol(oid, turn.ref(RelayEntity(self, oid))))
|
pins, oid, lambda: WireSymbol(oid, turn.ref(RelayEntity(self, oid)),
|
||||||
imported.append(ws)
|
self.imported_references))
|
||||||
return ws.ref
|
return ws.ref
|
||||||
else:
|
else:
|
||||||
oid = wire_ref.oid.value
|
oid = wire_ref.oid.value
|
||||||
local_ref = self._lookup(oid)
|
local_ref = self._lookup_exported_oid(oid, pins)
|
||||||
attenuation = wire_ref.attenuation
|
attenuation = wire_ref.attenuation
|
||||||
if len(attenuation) > 0:
|
if len(attenuation) > 0:
|
||||||
raise NotImplementedError('Non-empty attenuations not yet implemented') # TODO
|
raise NotImplementedError('Non-empty attenuations not yet implemented') # TODO
|
||||||
|
@ -178,44 +191,45 @@ class TunnelRelay:
|
||||||
|
|
||||||
def _handle_turn_events(self, turn, events):
|
def _handle_turn_events(self, turn, events):
|
||||||
for e in events:
|
for e in events:
|
||||||
ref = self._lookup(e.oid.value)
|
pins = []
|
||||||
|
ref = self._lookup_exported_oid(e.oid.value, pins)
|
||||||
event = e.event
|
event = e.event
|
||||||
variant = event.VARIANT.name
|
variant = event.VARIANT.name
|
||||||
if variant == 'Assert':
|
if variant == 'Assert':
|
||||||
self._handle_publish(turn, ref, event.value.assertion.value, event.value.handle.value)
|
self._handle_publish(pins, turn, ref, event.value.assertion.value, event.value.handle.value)
|
||||||
elif variant == 'Retract':
|
elif variant == 'Retract':
|
||||||
self._handle_retract(turn, ref, event.value.handle.value)
|
self._handle_retract(pins, turn, ref, event.value.handle.value)
|
||||||
elif variant == 'Message':
|
elif variant == 'Message':
|
||||||
self._handle_message(turn, ref, event.value.body.value)
|
self._handle_message(pins, turn, ref, event.value.body.value)
|
||||||
elif variant == 'Sync':
|
elif variant == 'Sync':
|
||||||
self._handle_sync(turn, ref, event.value.peer)
|
self._handle_sync(pins, turn, ref, event.value.peer)
|
||||||
|
|
||||||
def _handle_publish(self, turn, ref, assertion, remote_handle):
|
def _handle_publish(self, pins, turn, ref, assertion, remote_handle):
|
||||||
(assertion, imported) = self.rewrite_in(turn, assertion)
|
assertion = self.rewrite_in(turn, assertion, pins)
|
||||||
self.inbound_assertions[remote_handle] = \
|
self.inbound_assertions[remote_handle] = \
|
||||||
InboundAssertion(remote_handle, turn.publish(ref, assertion), imported)
|
InboundAssertion(remote_handle, turn.publish(ref, assertion), pins)
|
||||||
|
|
||||||
def _handle_retract(self, turn, ref, remote_handle):
|
def _handle_retract(self, pins, turn, ref, remote_handle):
|
||||||
ia = self.inbound_assertions.pop(remote_handle, None)
|
ia = self.inbound_assertions.pop(remote_handle, None)
|
||||||
if ia is None:
|
if ia is None:
|
||||||
raise ValueError('Peer retracted invalid handle %s' % (remote_handle,))
|
raise ValueError('Peer retracted invalid handle %s' % (remote_handle,))
|
||||||
for ws in ia.wire_symbols:
|
drop_all(ia.pins)
|
||||||
self.imported_references.drop(ws)
|
drop_all(pins)
|
||||||
turn.retract(ia.local_handle)
|
turn.retract(ia.local_handle)
|
||||||
|
|
||||||
def _handle_message(self, turn, ref, message):
|
def _handle_message(self, pins, turn, ref, message):
|
||||||
(message, imported) = self.rewrite_in(turn, message)
|
message = self.rewrite_in(turn, message, pins)
|
||||||
if len(imported) > 0:
|
for ws in pins:
|
||||||
raise ValueError('Cannot receive transient reference')
|
if ws.count == 1:
|
||||||
|
raise ValueError('Cannot receive transient reference')
|
||||||
turn.send(ref, message)
|
turn.send(ref, message)
|
||||||
|
drop_all(pins)
|
||||||
|
|
||||||
def _handle_sync(self, turn, ref, wire_peer):
|
def _handle_sync(self, pins, turn, ref, wire_peer):
|
||||||
imported = []
|
peer = self.rewrite_ref_in(turn, wire_peer, pins)
|
||||||
peer = self.rewrite_ref_in(turn, wire_peer, imported)
|
|
||||||
def done(turn):
|
def done(turn):
|
||||||
turn.send(peer, True)
|
turn.send(peer, True)
|
||||||
for ws in imported:
|
drop_all(pins)
|
||||||
self.imported_references.drop(ws)
|
|
||||||
turn.sync(ref, done)
|
turn.sync(ref, done)
|
||||||
|
|
||||||
def _send(self, remote_oid, turn_event):
|
def _send(self, remote_oid, turn_event):
|
||||||
|
@ -269,7 +283,7 @@ class RelayEntity(actor.Entity):
|
||||||
|
|
||||||
def on_publish(self, turn, assertion, handle):
|
def on_publish(self, turn, assertion, handle):
|
||||||
self._send(protocol.Event.Assert(protocol.Assert(
|
self._send(protocol.Event.Assert(protocol.Assert(
|
||||||
protocol.Assertion(self.relay.register(assertion, handle)),
|
protocol.Assertion(self.relay.register(self.oid, assertion, handle)),
|
||||||
protocol.Handle(handle))))
|
protocol.Handle(handle))))
|
||||||
|
|
||||||
def on_retract(self, turn, handle):
|
def on_retract(self, turn, handle):
|
||||||
|
@ -278,22 +292,23 @@ class RelayEntity(actor.Entity):
|
||||||
|
|
||||||
def on_message(self, turn, message):
|
def on_message(self, turn, message):
|
||||||
self._send(protocol.Event.Message(protocol.Message(
|
self._send(protocol.Event.Message(protocol.Message(
|
||||||
protocol.Assertion(self.relay.register(message, None)))))
|
protocol.Assertion(self.relay.register(self.oid, message, None)))))
|
||||||
|
|
||||||
def on_sync(self, turn, peer):
|
def on_sync(self, turn, peer):
|
||||||
exported = []
|
pins = []
|
||||||
entity = SyncPeerEntity(self.relay, peer, exported)
|
self.relay.register_imported_oid(self.oid, pins)
|
||||||
rewritten = Embedded(self.relay.rewrite_ref_out(turn.ref(entity), False, exported))
|
entity = SyncPeerEntity(self.relay, peer, pins)
|
||||||
|
rewritten = Embedded(self.relay.rewrite_ref_out(turn.ref(entity), False, pins))
|
||||||
self._send(protocol.Event.Sync(protocol.Sync(rewritten)))
|
self._send(protocol.Event.Sync(protocol.Sync(rewritten)))
|
||||||
|
|
||||||
class SyncPeerEntity(actor.Entity):
|
class SyncPeerEntity(actor.Entity):
|
||||||
def __init__(self, relay, peer, exported):
|
def __init__(self, relay, peer, pins):
|
||||||
self.relay = relay
|
self.relay = relay
|
||||||
self.peer = peer
|
self.peer = peer
|
||||||
self.exported = exported
|
self.pins = pins
|
||||||
|
|
||||||
def on_message(self, turn, body):
|
def on_message(self, turn, body):
|
||||||
self.relay.exported_references.drop(self.exported[0])
|
drop_all(self.pins)
|
||||||
turn.send(self.peer, body)
|
turn.send(self.peer, body)
|
||||||
|
|
||||||
async def _default_on_connected(relay):
|
async def _default_on_connected(relay):
|
||||||
|
|
Loading…
Reference in New Issue