Compare commits

...

4 Commits

2 changed files with 33 additions and 14 deletions

View File

@ -29,6 +29,8 @@ def main():
@syndicate.relay.connect(args.address, sturdy.SturdyRef.decode(syndicate.parse(args.cap)))
def on_connected(ds):
turn.on_stop(lambda: turn.stop(root_facet))
me = 'user_' + str(random.randint(10, 1000))
turn.publish(ds, Present(me))
@ -48,4 +50,3 @@ def main():
await f.loop.connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)
while line := (await reader.readline()).decode('utf-8'):
turn.external(f, lambda: turn.send(ds, Says(me, line.strip())))
turn.external(f, lambda: turn.stop(root_facet))

View File

@ -28,36 +28,47 @@ class System:
def __init__(self, loop = None):
self.loop = loop or asyncio.get_event_loop()
self.inhabitant_count = 0
self.exit_signal = asyncio.Queue()
def run(self, boot_proc, debug = False, name = None, configure_logging = True):
def run(self, boot_proc, debug = None, name = None, configure_logging = True):
if configure_logging:
logging.basicConfig(level = logging.DEBUG if debug else logging.INFO)
if debug:
self.loop.set_debug(True)
self.queue_task(lambda: Actor(boot_proc, system = self, name = name))
# From Python 3.12, we may be able to use:
# asyncio.run(self._run, debug=debug, loop_factory=lambda: self.loop)
# but until then:
with asyncio.Runner(debug=debug, loop_factory=lambda: self.loop) as r:
return r.run(self._run())
async def _run(self):
try:
self.loop.run_forever()
except:
traceback.print_exc()
while asyncio.all_tasks(self.loop):
self.loop.stop()
self.loop.run_forever()
self.loop.close()
await self.exit_signal.get()
finally:
log.debug('System._run main loop exit')
def adjust_engine_inhabitant_count(self, delta):
self.inhabitant_count = self.inhabitant_count + delta
if self.inhabitant_count == 0:
log.debug('Inhabitant count reached zero')
self.loop.stop()
self.exit_signal.put_nowait(())
def queue_task(self, thunk):
async def task():
await ensure_awaitable(thunk())
try:
await ensure_awaitable(thunk())
except asyncio.CancelledError:
pass
return self.loop.create_task(task())
def queue_task_threadsafe(self, thunk):
async def task():
await ensure_awaitable(thunk())
try:
await ensure_awaitable(thunk())
except asyncio.CancelledError:
pass
return self.loop.call_soon_threadsafe(lambda: asyncio.run_coroutine_threadsafe(task(), self.loop))
async def ensure_awaitable(value):
@ -240,13 +251,20 @@ class Facet:
task.cancel()
task = None
async def guarded_task():
should_terminate_facet = True
try:
await coro_fn(self)
if await coro_fn(self) is True:
should_terminate_facet = False
except asyncio.CancelledError:
pass
except:
import traceback
traceback.print_exc()
finally:
Turn.external(self, cancel_linked_task)
if should_terminate_facet:
Turn.external(self, lambda: Turn.active.stop())
else:
Turn.external(self, cancel_linked_task)
task = self.loop.create_task(guarded_task())
self.linked_tasks.append(task)