UnixSocketConnection

This commit is contained in:
Tony Garnock-Jones 2018-11-22 12:15:27 +00:00
parent 8eb1d320ae
commit 563ad1503a
2 changed files with 29 additions and 11 deletions

View File

@ -10,7 +10,10 @@ Says = S.Record.makeConstructor('Says', 'who what')
if len(sys.argv) == 3:
conn = S.TcpConnection(sys.argv[1], int(sys.argv[2]))
elif len(sys.argv) == 2:
conn = S.WebsocketConnection(sys.argv[1])
if sys.argv[1].startswith('ws:') or sys.argv[1].startswith('wss:'):
conn = S.WebsocketConnection(sys.argv[1])
else:
conn = S.UnixSocketConnection(sys.argv[1])
elif len(sys.argv) == 1:
conn = S.WebsocketConnection('ws://localhost:8000/broker')
else:

View File

@ -126,11 +126,9 @@ class Connection(object):
def _disconnect(self):
raise Exception('subclassresponsibility')
class TcpConnection(Connection, asyncio.Protocol):
def __init__(self, host, port):
class _StreamConnection(Connection, asyncio.Protocol):
def __init__(self):
super().__init__()
self.host = host
self.port = port
self.decoder = None
self.stop_signal = None
self.transport = None
@ -157,6 +155,9 @@ class TcpConnection(Connection, asyncio.Protocol):
if self.stop_signal:
self.stop_signal.set_result(True)
async def _create_connection(self, loop):
raise Exception('subclassresponsibility')
async def main(self, loop, on_connected=None):
if self.transport is not None:
raise Exception('Cannot run connection twice!')
@ -164,12 +165,9 @@ class TcpConnection(Connection, asyncio.Protocol):
self.decoder = protocol.Decoder()
self.stop_signal = loop.create_future()
try:
_transport, _protocol = await loop.create_connection(
lambda: self,
self.host,
self.port)
_transport, _protocol = await self._create_connection(loop)
except OSError as e:
log.error('TcpConnection: Could not connect to broker: %s' % (e,))
log.error('%s: Could not connect to broker: %s' % (self.__class__.__qualname__, e))
return False
try:
@ -182,6 +180,23 @@ class TcpConnection(Connection, asyncio.Protocol):
self.stop_signal = None
self.decoder = None
class TcpConnection(_StreamConnection):
def __init__(self, host, port):
super().__init__()
self.host = host
self.port = port
async def _create_connection(self, loop):
return await loop.create_connection(lambda: self, self.host, self.port)
class UnixSocketConnection(_StreamConnection):
def __init__(self, path):
super().__init__()
self.path = path
async def _create_connection(self, loop):
return await loop.create_unix_connection(lambda: self, self.path)
class WebsocketConnection(Connection):
def __init__(self, url):
super().__init__()
@ -215,7 +230,7 @@ class WebsocketConnection(Connection):
except websockets.exceptions.ConnectionClosed:
pass
except OSError as e:
log.error('WebsocketConnection: Could not connect to broker: %s' % (e,))
log.error('%s: Could not connect to broker: %s' % (self.__class__.__qualname__, e))
return False
finally:
self._on_disconnected()