diff --git a/chat.py b/chat.py index e7e4c3a..5c7a14c 100644 --- a/chat.py +++ b/chat.py @@ -10,7 +10,10 @@ Says = S.Record.makeConstructor('Says', 'who what') if len(sys.argv) == 3: conn = S.TcpConnection(sys.argv[1], int(sys.argv[2])) elif len(sys.argv) == 2: - conn = S.WebsocketConnection(sys.argv[1]) + if sys.argv[1].startswith('ws:') or sys.argv[1].startswith('wss:'): + conn = S.WebsocketConnection(sys.argv[1]) + else: + conn = S.UnixSocketConnection(sys.argv[1]) elif len(sys.argv) == 1: conn = S.WebsocketConnection('ws://localhost:8000/broker') else: diff --git a/syndicate/mini/core.py b/syndicate/mini/core.py index d275b29..8281a7c 100644 --- a/syndicate/mini/core.py +++ b/syndicate/mini/core.py @@ -126,11 +126,9 @@ class Connection(object): def _disconnect(self): raise Exception('subclassresponsibility') -class TcpConnection(Connection, asyncio.Protocol): - def __init__(self, host, port): +class _StreamConnection(Connection, asyncio.Protocol): + def __init__(self): super().__init__() - self.host = host - self.port = port self.decoder = None self.stop_signal = None self.transport = None @@ -157,6 +155,9 @@ class TcpConnection(Connection, asyncio.Protocol): if self.stop_signal: self.stop_signal.set_result(True) + async def _create_connection(self, loop): + raise Exception('subclassresponsibility') + async def main(self, loop, on_connected=None): if self.transport is not None: raise Exception('Cannot run connection twice!') @@ -164,12 +165,9 @@ class TcpConnection(Connection, asyncio.Protocol): self.decoder = protocol.Decoder() self.stop_signal = loop.create_future() try: - _transport, _protocol = await loop.create_connection( - lambda: self, - self.host, - self.port) + _transport, _protocol = await self._create_connection(loop) except OSError as e: - log.error('TcpConnection: Could not connect to broker: %s' % (e,)) + log.error('%s: Could not connect to broker: %s' % (self.__class__.__qualname__, e)) return False try: @@ -182,6 +180,23 @@ class TcpConnection(Connection, asyncio.Protocol): self.stop_signal = None self.decoder = None +class TcpConnection(_StreamConnection): + def __init__(self, host, port): + super().__init__() + self.host = host + self.port = port + + async def _create_connection(self, loop): + return await loop.create_connection(lambda: self, self.host, self.port) + +class UnixSocketConnection(_StreamConnection): + def __init__(self, path): + super().__init__() + self.path = path + + async def _create_connection(self, loop): + return await loop.create_unix_connection(lambda: self, self.path) + class WebsocketConnection(Connection): def __init__(self, url): super().__init__() @@ -215,7 +230,7 @@ class WebsocketConnection(Connection): except websockets.exceptions.ConnectionClosed: pass except OSError as e: - log.error('WebsocketConnection: Could not connect to broker: %s' % (e,)) + log.error('%s: Could not connect to broker: %s' % (self.__class__.__qualname__, e)) return False finally: self._on_disconnected()