Compare commits
4 Commits
42cb62c094
...
3035b43941
Author | SHA1 | Date |
---|---|---|
Tony Garnock-Jones | 3035b43941 | |
Tony Garnock-Jones | 08e49fd14e | |
Tony Garnock-Jones | a4c0bf3e6f | |
Tony Garnock-Jones | e5b65ad0ed |
3
chat.py
3
chat.py
|
@ -29,6 +29,8 @@ def main():
|
|||
|
||||
@syndicate.relay.connect(args.address, sturdy.SturdyRef.decode(syndicate.parse(args.cap)))
|
||||
def on_connected(ds):
|
||||
turn.on_stop(lambda: turn.stop(root_facet))
|
||||
|
||||
me = 'user_' + str(random.randint(10, 1000))
|
||||
|
||||
turn.publish(ds, Present(me))
|
||||
|
@ -48,4 +50,3 @@ def main():
|
|||
await f.loop.connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)
|
||||
while line := (await reader.readline()).decode('utf-8'):
|
||||
turn.external(f, lambda: turn.send(ds, Says(me, line.strip())))
|
||||
turn.external(f, lambda: turn.stop(root_facet))
|
||||
|
|
|
@ -28,36 +28,47 @@ class System:
|
|||
def __init__(self, loop = None):
|
||||
self.loop = loop or asyncio.get_event_loop()
|
||||
self.inhabitant_count = 0
|
||||
self.exit_signal = asyncio.Queue()
|
||||
|
||||
def run(self, boot_proc, debug = False, name = None, configure_logging = True):
|
||||
def run(self, boot_proc, debug = None, name = None, configure_logging = True):
|
||||
if configure_logging:
|
||||
logging.basicConfig(level = logging.DEBUG if debug else logging.INFO)
|
||||
if debug:
|
||||
self.loop.set_debug(True)
|
||||
self.queue_task(lambda: Actor(boot_proc, system = self, name = name))
|
||||
|
||||
# From Python 3.12, we may be able to use:
|
||||
# asyncio.run(self._run, debug=debug, loop_factory=lambda: self.loop)
|
||||
# but until then:
|
||||
with asyncio.Runner(debug=debug, loop_factory=lambda: self.loop) as r:
|
||||
return r.run(self._run())
|
||||
|
||||
async def _run(self):
|
||||
try:
|
||||
self.loop.run_forever()
|
||||
except:
|
||||
traceback.print_exc()
|
||||
while asyncio.all_tasks(self.loop):
|
||||
self.loop.stop()
|
||||
self.loop.run_forever()
|
||||
self.loop.close()
|
||||
await self.exit_signal.get()
|
||||
finally:
|
||||
log.debug('System._run main loop exit')
|
||||
|
||||
def adjust_engine_inhabitant_count(self, delta):
|
||||
self.inhabitant_count = self.inhabitant_count + delta
|
||||
if self.inhabitant_count == 0:
|
||||
log.debug('Inhabitant count reached zero')
|
||||
self.loop.stop()
|
||||
self.exit_signal.put_nowait(())
|
||||
|
||||
def queue_task(self, thunk):
|
||||
async def task():
|
||||
await ensure_awaitable(thunk())
|
||||
try:
|
||||
await ensure_awaitable(thunk())
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
return self.loop.create_task(task())
|
||||
|
||||
def queue_task_threadsafe(self, thunk):
|
||||
async def task():
|
||||
await ensure_awaitable(thunk())
|
||||
try:
|
||||
await ensure_awaitable(thunk())
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
return self.loop.call_soon_threadsafe(lambda: asyncio.run_coroutine_threadsafe(task(), self.loop))
|
||||
|
||||
async def ensure_awaitable(value):
|
||||
|
@ -240,13 +251,20 @@ class Facet:
|
|||
task.cancel()
|
||||
task = None
|
||||
async def guarded_task():
|
||||
should_terminate_facet = True
|
||||
try:
|
||||
await coro_fn(self)
|
||||
if await coro_fn(self) is True:
|
||||
should_terminate_facet = False
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except:
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
finally:
|
||||
Turn.external(self, cancel_linked_task)
|
||||
if should_terminate_facet:
|
||||
Turn.external(self, lambda: Turn.active.stop())
|
||||
else:
|
||||
Turn.external(self, cancel_linked_task)
|
||||
task = self.loop.create_task(guarded_task())
|
||||
self.linked_tasks.append(task)
|
||||
|
||||
|
|
Loading…
Reference in New Issue