Better decorators; treat linked tasks as non-inert

This commit is contained in:
Tony Garnock-Jones 2021-08-19 16:08:53 -04:00
parent 82e5a2c07b
commit 38f6f500c3
5 changed files with 42 additions and 37 deletions

View File

@ -29,13 +29,12 @@ def main_facet(turn, root_facet, ds):
f = turn._facet f = turn._facet
turn.publish(ds, Present(me)) turn.publish(ds, Present(me))
@During().observe(turn, ds, P.rec('Present', P.CAPTURE)) @dataspace.during(turn, ds, P.rec('Present', P.CAPTURE))
def on_presence(turn, who): def on_presence(turn, who):
print('%s joined' % (who,)) print('%s joined' % (who,))
return lambda turn: print('%s left' % (who,)) return lambda turn: print('%s left' % (who,))
@dataspace.observe(turn, ds, P.rec('Says', P.CAPTURE, P.CAPTURE)) @dataspace.on_message(turn, ds, P.rec('Says', P.CAPTURE, P.CAPTURE))
@During().msg_handler
def on_says(turn, who, what): def on_says(turn, who, what):
print('%s says %r' % (who, what)) print('%s says %r' % (who, what))
@ -57,9 +56,7 @@ def main(turn):
return turn.facet(lambda turn: main_facet(turn, root_facet, ds.embeddedValue)) return turn.facet(lambda turn: main_facet(turn, root_facet, ds.embeddedValue))
turn.publish(gk.embeddedValue, gatekeeper.Resolve(cap, turn.ref(handle_ds))) turn.publish(gk.embeddedValue, gatekeeper.Resolve(cap, turn.ref(handle_ds)))
disarm = turn.prevent_inert_check()
async def on_connected(tr): async def on_connected(tr):
disarm()
print('-'*50, 'Connected') print('-'*50, 'Connected')
async def on_disconnected(tr, did_connect): async def on_disconnected(tr, did_connect):
if did_connect: if did_connect:

View File

@ -110,6 +110,7 @@ class Facet:
self.children = set() self.children = set()
self.outbound = initial_assertions self.outbound = initial_assertions
self.shutdown_actions = [] self.shutdown_actions = []
self.linked_tasks = []
self.alive = True self.alive = True
self.inert_check_preventers = 0 self.inert_check_preventers = 0
@ -137,7 +138,11 @@ class Facet:
remove_noerror(self.shutdown_actions, a) remove_noerror(self.shutdown_actions, a)
def isinert(self): def isinert(self):
return len(self.children) == 0 and len(self.outbound) == 0 and self.inert_check_preventers == 0 return \
len(self.children) == 0 and \
len(self.outbound) == 0 and \
len(self.linked_tasks) == 0 and \
self.inert_check_preventers == 0
def prevent_inert_check(self): def prevent_inert_check(self):
armed = True armed = True
@ -149,6 +154,26 @@ class Facet:
self.inert_check_preventers = self.inert_check_preventers - 1 self.inert_check_preventers = self.inert_check_preventers - 1
return disarm return disarm
def linked_task(self, coro, loop = None):
task = None
def cancel_linked_task(turn):
nonlocal task
if task is not None:
remove_noerror(self.linked_tasks, task)
task.cancel()
task = None
self.cancel_on_stop(cancel_linked_task)
self.actor.cancel_at_exit(cancel_linked_task)
async def guarded_task():
try:
await coro
finally:
Turn.external(self, cancel_linked_task)
task = find_loop(loop).create_task(guarded_task())
self.linked_tasks.append(task)
self.on_stop(cancel_linked_task)
self.actor.at_exit(cancel_linked_task)
def _terminate(self, turn, orderly): def _terminate(self, turn, orderly):
if not self.alive: return if not self.alive: return
self.log.debug('%s terminating %r', 'orderly' if orderly else 'disorderly', self) self.log.debug('%s terminating %r', 'orderly' if orderly else 'disorderly', self)
@ -253,25 +278,7 @@ class Turn:
# decorator # decorator
def linked_task(self, loop = None): def linked_task(self, loop = None):
return lambda thunk: self._linked_task(thunk(), loop = loop) return lambda thunk: self._facet.linked_task(thunk(), loop = loop)
def _linked_task(self, coro, loop = None):
task = None
def cancel_linked_task(turn):
nonlocal task
if task is not None:
task.cancel()
task = None
self._facet.cancel_on_stop(cancel_linked_task)
self._facet.actor.cancel_at_exit(cancel_linked_task)
async def guarded_task():
try:
await coro
finally:
Turn.external(self._facet, cancel_linked_task)
task = find_loop(loop).create_task(guarded_task())
self._facet.on_stop(cancel_linked_task)
self._facet.actor.at_exit(cancel_linked_task)
def stop(self, facet = None, continuation = None): def stop(self, facet = None, continuation = None):
if facet is None: if facet is None:

View File

@ -1,4 +1,5 @@
from .schema import dataspace from .schema import dataspace
from .during import During
# decorator # decorator
def observe(turn, ds, pattern): def observe(turn, ds, pattern):
@ -6,3 +7,11 @@ def observe(turn, ds, pattern):
turn.publish(ds, dataspace.Observe(pattern, turn.ref(entity))) turn.publish(ds, dataspace.Observe(pattern, turn.ref(entity)))
return entity return entity
return publish_observer return publish_observer
# decorator
def on_message(turn, ds, pattern):
return lambda on_msg: observe(turn, ds, pattern)(During().msg_handler(on_msg))
# decorator
def during(turn, ds, pattern):
return lambda on_add: observe(turn, ds, pattern)(During().add_handler(on_add))

View File

@ -1,5 +1,4 @@
from . import actor from . import actor
from . import dataspace
def _ignore(*args, **kwargs): def _ignore(*args, **kwargs):
pass pass
@ -60,7 +59,3 @@ class During(actor.Entity):
def sync_handler(self, on_sync): def sync_handler(self, on_sync):
self._on_sync = on_sync self._on_sync = on_sync
return self return self
# decorator
def observe(self, turn, ds, pattern):
return lambda on_add: dataspace.observe(turn, ds, pattern)(self.add_handler(on_add))

View File

@ -6,7 +6,7 @@ from preserves import Embedded, stringify
from preserves.fold import map_embeddeds from preserves.fold import map_embeddeds
from . import actor, encode, transport, Decoder from . import actor, encode, transport, Decoder
from .actor import _inert_ref, Turn, adjust_engine_inhabitant_count from .actor import _inert_ref, Turn
from .idgen import IdGenerator from .idgen import IdGenerator
from .schema import externalProtocol as protocol, sturdy, transportAddress from .schema import externalProtocol as protocol, sturdy, transportAddress
@ -64,16 +64,15 @@ class TunnelRelay:
on_connected = None, on_connected = None,
on_disconnected = None, on_disconnected = None,
): ):
self.ref = turn.ref(self)
self.facet = turn._facet self.facet = turn._facet
self.facet.on_stop(self._shutdown) self.facet.on_stop(self._shutdown)
self.address = address self.address = address
self.gatekeeper_peer = gatekeeper_peer self.gatekeeper_peer = gatekeeper_peer
self.gatekeeper_oid = gatekeeper_oid self.gatekeeper_oid = gatekeeper_oid
self._reset() self._reset()
actor.queue_task(lambda: self._reconnecting_main(asyncio.get_running_loop(), self.facet.linked_task(self._reconnecting_main(asyncio.get_running_loop(),
on_connected = on_connected, on_connected = on_connected,
on_disconnected = on_disconnected)) on_disconnected = on_disconnected))
def _reset(self): def _reset(self):
self.inbound_assertions = {} # map remote handle to InboundAssertion self.inbound_assertions = {} # map remote handle to InboundAssertion
@ -232,12 +231,10 @@ class TunnelRelay:
raise Exception('subclassresponsibility') raise Exception('subclassresponsibility')
async def _reconnecting_main(self, loop, on_connected=None, on_disconnected=None): async def _reconnecting_main(self, loop, on_connected=None, on_disconnected=None):
adjust_engine_inhabitant_count(1)
should_run = True should_run = True
while should_run and self.facet.alive: while should_run and self.facet.alive:
did_connect = await self.main(loop, on_connected=(on_connected or _default_on_connected)) did_connect = await self.main(loop, on_connected=(on_connected or _default_on_connected))
should_run = await (on_disconnected or _default_on_disconnected)(self, did_connect) should_run = await (on_disconnected or _default_on_disconnected)(self, did_connect)
adjust_engine_inhabitant_count(-1)
@staticmethod @staticmethod
def from_str(turn, s, **kwargs): def from_str(turn, s, **kwargs):