New connection setup protocol

This commit is contained in:
Tony Garnock-Jones 2019-05-16 12:46:22 +01:00
parent 905cba6697
commit 3d4b0d547c
3 changed files with 28 additions and 22 deletions

14
chat.py
View File

@ -7,17 +7,17 @@ import syndicate.mini.core as S
Present = S.Record.makeConstructor('Present', 'who') Present = S.Record.makeConstructor('Present', 'who')
Says = S.Record.makeConstructor('Says', 'who what') Says = S.Record.makeConstructor('Says', 'who what')
if len(sys.argv) == 3: if len(sys.argv) == 4:
conn = S.TcpConnection(sys.argv[1], int(sys.argv[2])) conn = S.TcpConnection(sys.argv[1], int(sys.argv[2]), sys.argv[3])
elif len(sys.argv) == 2: elif len(sys.argv) == 3:
if sys.argv[1].startswith('ws:') or sys.argv[1].startswith('wss:'): if sys.argv[1].startswith('ws:') or sys.argv[1].startswith('wss:'):
conn = S.WebsocketConnection(sys.argv[1]) conn = S.WebsocketConnection(sys.argv[1], sys.argv[2])
else: else:
conn = S.UnixSocketConnection(sys.argv[1]) conn = S.UnixSocketConnection(sys.argv[1], sys.argv[2])
elif len(sys.argv) == 1: elif len(sys.argv) == 1:
conn = S.WebsocketConnection('ws://localhost:8000/broker') conn = S.WebsocketConnection('ws://localhost:8000/', 'broker')
else: else:
sys.stderr.write(b'Usage: chat.py [ HOST PORT | WEBSOCKETURL ]\n') sys.stderr.write(b'Usage: chat.py [ HOST PORT SCOPE | WEBSOCKETURL SCOPE ]\n')
sys.exit(1) sys.exit(1)
## Courtesy of http://listofrandomnames.com/ :-) ## Courtesy of http://listofrandomnames.com/ :-)

View File

@ -56,7 +56,7 @@ class Endpoint(object):
def _add(self, captures): def _add(self, captures):
if captures in self.cache: if captures in self.cache:
log.error('Broker error: duplicate captures %r added for endpoint %r %r' % ( log.error('Server error: duplicate captures %r added for endpoint %r %r' % (
captures, captures,
self.id, self.id,
self.assertion)) self.assertion))
@ -69,7 +69,7 @@ class Endpoint(object):
self.cache.discard(captures) self.cache.discard(captures)
self.on_del(*captures) self.on_del(*captures)
else: else:
log.error('Broker error: nonexistent captures %r removed from endpoint %r %r' % ( log.error('Server error: nonexistent captures %r removed from endpoint %r %r' % (
captures, captures,
self.id, self.id,
self.assertion)) self.assertion))
@ -85,8 +85,9 @@ class DummyEndpoint(object):
_dummy_endpoint = DummyEndpoint() _dummy_endpoint = DummyEndpoint()
class Connection(object): class Connection(object):
def __init__(self): def __init__(self, scope):
self.endpoints = {} self.endpoints = {}
self.scope = scope
def _each_endpoint(self): def _each_endpoint(self):
return list(self.endpoints.values()) return list(self.endpoints.values())
@ -119,6 +120,7 @@ class Connection(object):
self._disconnect() self._disconnect()
def _on_connected(self): def _on_connected(self):
self._send(self._encode(protocol.Connect(self.scope)))
for ep in self._each_endpoint(): for ep in self._each_endpoint():
self._update_endpoint(ep) self._update_endpoint(ep)
@ -138,8 +140,8 @@ class Connection(object):
raise Exception('subclassresponsibility') raise Exception('subclassresponsibility')
class _StreamConnection(Connection, asyncio.Protocol): class _StreamConnection(Connection, asyncio.Protocol):
def __init__(self): def __init__(self, scope):
super().__init__() super().__init__(scope)
self.decoder = None self.decoder = None
self.stop_signal = None self.stop_signal = None
self.transport = None self.transport = None
@ -179,7 +181,7 @@ class _StreamConnection(Connection, asyncio.Protocol):
try: try:
_transport, _protocol = await self._create_connection(loop) _transport, _protocol = await self._create_connection(loop)
except OSError as e: except OSError as e:
log.error('%s: Could not connect to broker: %s' % (self.__class__.__qualname__, e)) log.error('%s: Could not connect to server: %s' % (self.__class__.__qualname__, e))
return False return False
try: try:
@ -193,8 +195,8 @@ class _StreamConnection(Connection, asyncio.Protocol):
self.decoder = None self.decoder = None
class TcpConnection(_StreamConnection): class TcpConnection(_StreamConnection):
def __init__(self, host, port): def __init__(self, host, port, scope):
super().__init__() super().__init__(scope)
self.host = host self.host = host
self.port = port self.port = port
@ -202,16 +204,16 @@ class TcpConnection(_StreamConnection):
return await loop.create_connection(lambda: self, self.host, self.port) return await loop.create_connection(lambda: self, self.host, self.port)
class UnixSocketConnection(_StreamConnection): class UnixSocketConnection(_StreamConnection):
def __init__(self, path): def __init__(self, path, scope):
super().__init__() super().__init__(scope)
self.path = path self.path = path
async def _create_connection(self, loop): async def _create_connection(self, loop):
return await loop.create_unix_connection(lambda: self, self.path) return await loop.create_unix_connection(lambda: self, self.path)
class WebsocketConnection(Connection): class WebsocketConnection(Connection):
def __init__(self, url): def __init__(self, url, scope):
super().__init__() super().__init__(scope)
self.url = url self.url = url
self.loop = None self.loop = None
self.ws = None self.ws = None
@ -242,7 +244,7 @@ class WebsocketConnection(Connection):
except websockets.exceptions.ConnectionClosed: except websockets.exceptions.ConnectionClosed:
pass pass
except OSError as e: except OSError as e:
log.error('%s: Could not connect to broker: %s' % (self.__class__.__qualname__, e)) log.error('%s: Could not connect to server: %s' % (self.__class__.__qualname__, e))
return False return False
finally: finally:
self._on_disconnected() self._on_disconnected()

View File

@ -1,12 +1,16 @@
import preserves import preserves
from preserves import Record, Symbol from preserves import Record, Symbol
## Client -> Broker ## Enrolment
Connect = Record.makeConstructor('Connect', 'scope')
Peer = Record.makeConstructor('Peer', 'scope')
## Client -> Server
Assert = Record.makeConstructor('Assert', 'endpointName assertion') Assert = Record.makeConstructor('Assert', 'endpointName assertion')
Clear = Record.makeConstructor('Clear', 'endpointName') Clear = Record.makeConstructor('Clear', 'endpointName')
Message = Record.makeConstructor('Message', 'body') Message = Record.makeConstructor('Message', 'body')
## Broker -> Client ## Server -> Client
Add = Record.makeConstructor('Add', 'endpointName captures') Add = Record.makeConstructor('Add', 'endpointName captures')
Del = Record.makeConstructor('Del', 'endpointName captures') Del = Record.makeConstructor('Del', 'endpointName captures')
Msg = Record.makeConstructor('Msg', 'endpointName captures') Msg = Record.makeConstructor('Msg', 'endpointName captures')