diff --git a/chat.py b/chat.py index 525a485..00b40bb 100644 --- a/chat.py +++ b/chat.py @@ -1,7 +1,6 @@ import sys import asyncio import random -import threading import syndicate from syndicate import patterns as P, actor from syndicate.schema import simpleChatProtocol, gatekeeper, sturdy, dataspace @@ -25,29 +24,35 @@ def print(*items): _print(*items) sys.stdout.flush() -def on_presence(turn, who): - print('%s joined' % (who,)) - return lambda turn: print('%s left' % (who,)) - def main_facet(turn, root_facet, ds): print('main_facet', ds) f = turn._facet turn.publish(ds, Present(me)) + + def on_presence(turn, who): + print('%s joined' % (who,)) + return lambda turn: print('%s left' % (who,)) turn.publish(ds, dataspace.Observe(P.rec('Present', P.CAPTURE), During(turn, on_add = on_presence).ref)) - turn.publish(ds, dataspace.Observe(P.rec('Says', P.CAPTURE, P.CAPTURE), During( - turn, - on_msg = lambda turn, who, what: print('%s says %r' % (who, what))).ref)) + + def on_says(turn, who, what): + print('%s says %r' % (who, what)) + turn.publish(ds, dataspace.Observe(P.rec('Says', P.CAPTURE, P.CAPTURE), + During(turn, on_msg = on_says).ref)) loop = asyncio.get_running_loop() - def accept_input(): + async def accept_input(): + reader = asyncio.StreamReader() + print(await loop.connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)) while True: - line = sys.stdin.readline() + line = await reader.readline() + line = line.decode('utf-8') if not line: actor.Turn.external(loop, f, lambda turn: turn.stop(root_facet)) break actor.Turn.external(loop, f, lambda turn: turn.send(ds, Says(me, line.strip()))) - threading.Thread(target=accept_input, daemon=True).start() + input_task = loop.create_task(accept_input()) + turn._facet.on_stop(lambda turn: input_task.cancel()) def main(turn): root_facet = turn._facet