From bd71008e13ba59f75f97c390c99460146013076d Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 6 Mar 2023 23:24:36 +0100 Subject: [PATCH] Executors; repair relay.service --- chat.py | 2 +- syndicate/actor.py | 26 +++++++++++++++++++++----- syndicate/relay.py | 3 +-- 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/chat.py b/chat.py index a9a0544..f00031f 100644 --- a/chat.py +++ b/chat.py @@ -45,7 +45,7 @@ def main(): @turn.linked_task() async def accept_input(f): reader = asyncio.StreamReader() - await actor.find_loop().connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), sys.stdin) + await f.loop.connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), sys.stdin) while line := (await reader.readline()).decode('utf-8'): turn.external(f, lambda: turn.send(ds, Says(me, line.strip()))) turn.external(f, lambda: turn.stop(root_facet)) diff --git a/syndicate/actor.py b/syndicate/actor.py index 291cb37..602e79c 100644 --- a/syndicate/actor.py +++ b/syndicate/actor.py @@ -36,7 +36,10 @@ class System: if debug: self.loop.set_debug(True) self.queue_task(lambda: Actor(boot_proc, system = self, name = name)) - self.loop.run_forever() + try: + self.loop.run_forever() + except: + traceback.print_exc() while asyncio.all_tasks(self.loop): self.loop.stop() self.loop.run_forever() @@ -224,8 +227,21 @@ class Facet: self.inert_check_preventers = self.inert_check_preventers - 1 return disarm - def linked_task(self, coro_fn, loop = None): + @property + def loop(self): + return self.actor._system.loop + + def linked_task(self, coro_fn, run_in_executor=False): task = None + if run_in_executor: + inner_coro_fn = coro_fn + async def outer_coro_fn(facet): + try: + await self.loop.run_in_executor(None, lambda: inner_coro_fn(facet)) + except: + import traceback + traceback.print_exc() + coro_fn = outer_coro_fn @self.on_stop_or_crash def cancel_linked_task(): nonlocal task @@ -238,7 +254,7 @@ class Facet: await coro_fn(self) finally: Turn.external(self, cancel_linked_task) - task = self.actor._system.loop.create_task(guarded_task()) + task = self.loop.create_task(guarded_task()) self.linked_tasks.append(task) def _terminate(self, orderly): @@ -345,8 +361,8 @@ class Turn: return self._facet.prevent_inert_check() # decorator - def linked_task(self, loop = None): - return lambda coro_fn: self._facet.linked_task(coro_fn, loop = loop) + def linked_task(self, **kwargs): + return lambda coro_fn: self._facet.linked_task(coro_fn, **kwargs) def stop(self, facet = None, continuation = None): if facet is None: diff --git a/syndicate/relay.py b/syndicate/relay.py index e52772b..25d508c 100644 --- a/syndicate/relay.py +++ b/syndicate/relay.py @@ -473,5 +473,4 @@ def run_stdio_service(entity): # decorator def service(**kwargs): - return lambda entity: \ - actor.start_actor_system(lambda: run_stdio_service(entity), **kwargs) + return lambda entity: actor.run_system(**kwargs)(lambda: run_stdio_service(entity))