diff --git a/chat.py b/chat.py index e3a6f42..99afd80 100644 --- a/chat.py +++ b/chat.py @@ -7,17 +7,17 @@ import syndicate.mini.core as S Present = S.Record.makeConstructor('Present', 'who') Says = S.Record.makeConstructor('Says', 'who what') -if len(sys.argv) == 3: - conn = S.TcpConnection(sys.argv[1], int(sys.argv[2])) -elif len(sys.argv) == 2: +if len(sys.argv) == 4: + conn = S.TcpConnection(sys.argv[1], int(sys.argv[2]), sys.argv[3]) +elif len(sys.argv) == 3: if sys.argv[1].startswith('ws:') or sys.argv[1].startswith('wss:'): - conn = S.WebsocketConnection(sys.argv[1]) + conn = S.WebsocketConnection(sys.argv[1], sys.argv[2]) else: - conn = S.UnixSocketConnection(sys.argv[1]) + conn = S.UnixSocketConnection(sys.argv[1], sys.argv[2]) elif len(sys.argv) == 1: - conn = S.WebsocketConnection('ws://localhost:8000/broker') + conn = S.WebsocketConnection('ws://localhost:8000/', 'broker') else: - sys.stderr.write(b'Usage: chat.py [ HOST PORT | WEBSOCKETURL ]\n') + sys.stderr.write(b'Usage: chat.py [ HOST PORT SCOPE | WEBSOCKETURL SCOPE ]\n') sys.exit(1) ## Courtesy of http://listofrandomnames.com/ :-) diff --git a/syndicate/mini/core.py b/syndicate/mini/core.py index 18049a5..23dbbfd 100644 --- a/syndicate/mini/core.py +++ b/syndicate/mini/core.py @@ -56,7 +56,7 @@ class Endpoint(object): def _add(self, captures): if captures in self.cache: - log.error('Broker error: duplicate captures %r added for endpoint %r %r' % ( + log.error('Server error: duplicate captures %r added for endpoint %r %r' % ( captures, self.id, self.assertion)) @@ -69,7 +69,7 @@ class Endpoint(object): self.cache.discard(captures) self.on_del(*captures) else: - log.error('Broker error: nonexistent captures %r removed from endpoint %r %r' % ( + log.error('Server error: nonexistent captures %r removed from endpoint %r %r' % ( captures, self.id, self.assertion)) @@ -85,8 +85,9 @@ class DummyEndpoint(object): _dummy_endpoint = DummyEndpoint() class Connection(object): - def __init__(self): + def __init__(self, scope): self.endpoints = {} + self.scope = scope def _each_endpoint(self): return list(self.endpoints.values()) @@ -119,6 +120,7 @@ class Connection(object): self._disconnect() def _on_connected(self): + self._send(self._encode(protocol.Connect(self.scope))) for ep in self._each_endpoint(): self._update_endpoint(ep) @@ -138,8 +140,8 @@ class Connection(object): raise Exception('subclassresponsibility') class _StreamConnection(Connection, asyncio.Protocol): - def __init__(self): - super().__init__() + def __init__(self, scope): + super().__init__(scope) self.decoder = None self.stop_signal = None self.transport = None @@ -179,7 +181,7 @@ class _StreamConnection(Connection, asyncio.Protocol): try: _transport, _protocol = await self._create_connection(loop) except OSError as e: - log.error('%s: Could not connect to broker: %s' % (self.__class__.__qualname__, e)) + log.error('%s: Could not connect to server: %s' % (self.__class__.__qualname__, e)) return False try: @@ -193,8 +195,8 @@ class _StreamConnection(Connection, asyncio.Protocol): self.decoder = None class TcpConnection(_StreamConnection): - def __init__(self, host, port): - super().__init__() + def __init__(self, host, port, scope): + super().__init__(scope) self.host = host self.port = port @@ -202,16 +204,16 @@ class TcpConnection(_StreamConnection): return await loop.create_connection(lambda: self, self.host, self.port) class UnixSocketConnection(_StreamConnection): - def __init__(self, path): - super().__init__() + def __init__(self, path, scope): + super().__init__(scope) self.path = path async def _create_connection(self, loop): return await loop.create_unix_connection(lambda: self, self.path) class WebsocketConnection(Connection): - def __init__(self, url): - super().__init__() + def __init__(self, url, scope): + super().__init__(scope) self.url = url self.loop = None self.ws = None @@ -242,7 +244,7 @@ class WebsocketConnection(Connection): except websockets.exceptions.ConnectionClosed: pass except OSError as e: - log.error('%s: Could not connect to broker: %s' % (self.__class__.__qualname__, e)) + log.error('%s: Could not connect to server: %s' % (self.__class__.__qualname__, e)) return False finally: self._on_disconnected() diff --git a/syndicate/mini/protocol.py b/syndicate/mini/protocol.py index ce7a92a..50d4be8 100644 --- a/syndicate/mini/protocol.py +++ b/syndicate/mini/protocol.py @@ -1,12 +1,16 @@ import preserves from preserves import Record, Symbol -## Client -> Broker +## Enrolment +Connect = Record.makeConstructor('Connect', 'scope') +Peer = Record.makeConstructor('Peer', 'scope') + +## Client -> Server Assert = Record.makeConstructor('Assert', 'endpointName assertion') Clear = Record.makeConstructor('Clear', 'endpointName') Message = Record.makeConstructor('Message', 'body') -## Broker -> Client +## Server -> Client Add = Record.makeConstructor('Add', 'endpointName captures') Del = Record.makeConstructor('Del', 'endpointName captures') Msg = Record.makeConstructor('Msg', 'endpointName captures')