Start on linked_task

This commit is contained in:
Tony Garnock-Jones 2021-08-19 14:04:38 -04:00
parent f942b6aefb
commit 1f7832a358
2 changed files with 47 additions and 15 deletions

15
chat.py
View File

@ -40,19 +40,18 @@ def main_facet(turn, root_facet, ds):
turn.publish(ds, dataspace.Observe(P.rec('Says', P.CAPTURE, P.CAPTURE), turn.publish(ds, dataspace.Observe(P.rec('Says', P.CAPTURE, P.CAPTURE),
During(turn, on_msg = on_says).ref)) During(turn, on_msg = on_says).ref))
loop = asyncio.get_running_loop()
async def accept_input(): async def accept_input():
reader = asyncio.StreamReader() reader = asyncio.StreamReader()
print(await loop.connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)) await actor.find_loop().connect_read_pipe(
lambda: asyncio.StreamReaderProtocol(reader),
sys.stdin)
while True: while True:
line = await reader.readline() line = (await reader.readline()).decode('utf-8')
line = line.decode('utf-8')
if not line: if not line:
actor.Turn.external(loop, f, lambda turn: turn.stop(root_facet)) actor.Turn.external(f, lambda turn: turn.stop(root_facet))
break break
actor.Turn.external(loop, f, lambda turn: turn.send(ds, Says(me, line.strip()))) actor.Turn.external(f, lambda turn: turn.send(ds, Says(me, line.strip())))
input_task = loop.create_task(accept_input()) turn.linked_task(accept_input())
turn._facet.on_stop(lambda turn: input_task.cancel())
def main(turn): def main(turn):
root_facet = turn._facet root_facet = turn._facet

View File

@ -36,6 +36,12 @@ def adjust_engine_inhabitant_count(delta):
log.debug('Inhabitant count reached zero') log.debug('Inhabitant count reached zero')
loop.stop() loop.stop()
def remove_noerror(collection, item):
try:
collection.remove(item)
except ValueError:
pass
class Actor: class Actor:
def __init__(self, boot_proc, name = None, initial_assertions = {}, daemon = False): def __init__(self, boot_proc, name = None, initial_assertions = {}, daemon = False):
self.name = name or 'a' + str(next(_next_actor_number)) self.name = name or 'a' + str(next(_next_actor_number))
@ -75,6 +81,9 @@ class Actor:
def at_exit(self, hook): def at_exit(self, hook):
self.exit_hooks.append(hook) self.exit_hooks.append(hook)
def cancel_at_exit(self, hook):
remove_noerror(self.exit_hooks, hook)
def terminate(self, turn, exit_reason): def terminate(self, turn, exit_reason):
if self.exit_reason is not None: return if self.exit_reason is not None: return
self.log.debug('Terminating %r with exit_reason %r', self, exit_reason) self.log.debug('Terminating %r with exit_reason %r', self, exit_reason)
@ -124,6 +133,9 @@ class Facet:
def on_stop(self, a): def on_stop(self, a):
self.shutdown_actions.append(a) self.shutdown_actions.append(a)
def cancel_on_stop(self, a):
remove_noerror(self.shutdown_actions, a)
def isinert(self): def isinert(self):
return len(self.children) == 0 and len(self.outbound) == 0 and self.inert_check_preventers == 0 return len(self.children) == 0 and len(self.outbound) == 0 and self.inert_check_preventers == 0
@ -186,15 +198,18 @@ async def ensure_awaitable(value):
else: else:
return value return value
def queue_task(thunk, loop = asyncio): def find_loop(loop = None):
async def task(): return asyncio.get_running_loop() if loop is None else loop
await ensure_awaitable(thunk())
return loop.create_task(task())
def queue_task_threadsafe(thunk, loop): def queue_task(thunk, loop = None):
async def task(): async def task():
await ensure_awaitable(thunk()) await ensure_awaitable(thunk())
return asyncio.run_coroutine_threadsafe(task(), loop) return find_loop(loop).create_task(task())
def queue_task_threadsafe(thunk, loop = None):
async def task():
await ensure_awaitable(thunk())
return asyncio.run_coroutine_threadsafe(task(), find_loop(loop))
class Turn: class Turn:
@classmethod @classmethod
@ -213,7 +228,7 @@ class Turn:
turn._deliver() turn._deliver()
@classmethod @classmethod
def external(cls, loop, facet, action): def external(cls, facet, action, loop = None):
return queue_task_threadsafe(lambda: cls.run(facet, action), loop) return queue_task_threadsafe(lambda: cls.run(facet, action), loop)
def __init__(self, facet): def __init__(self, facet):
@ -236,6 +251,24 @@ class Turn:
def prevent_inert_check(self): def prevent_inert_check(self):
return self._facet.prevent_inert_check() return self._facet.prevent_inert_check()
def linked_task(self, coro, loop = None):
task = None
def cancel_linked_task(turn):
nonlocal task
if task is not None:
task.cancel()
task = None
self._facet.cancel_on_stop(cancel_linked_task)
self._facet.actor.cancel_at_exit(cancel_linked_task)
async def guarded_task():
try:
await coro
finally:
Turn.external(self._facet, cancel_linked_task)
task = find_loop(loop).create_task(guarded_task())
self._facet.on_stop(cancel_linked_task)
self._facet.actor.at_exit(cancel_linked_task)
def stop(self, facet = None, continuation = None): def stop(self, facet = None, continuation = None):
if facet is None: if facet is None:
facet = self._facet facet = self._facet