2018-11-20 19:45:27 +00:00
|
|
|
import asyncio
|
|
|
|
import secrets
|
|
|
|
import logging
|
|
|
|
import websockets
|
|
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
import syndicate.mini.protocol as protocol
|
|
|
|
|
|
|
|
from syndicate.mini.protocol import Capture, Discard, Observe
|
|
|
|
CAPTURE = Capture(Discard())
|
|
|
|
|
|
|
|
from preserves import *
|
|
|
|
|
|
|
|
_instance_id = secrets.token_urlsafe(8)
|
|
|
|
_uuid_counter = 0
|
|
|
|
|
|
|
|
def uuid(prefix='__@syndicate'):
|
|
|
|
global _uuid_counter
|
|
|
|
c = _uuid_counter
|
|
|
|
_uuid_counter = c + 1
|
|
|
|
return prefix + '_' + _instance_id + '_' + str(c)
|
|
|
|
|
|
|
|
def _ignore(*args, **kwargs):
|
|
|
|
pass
|
|
|
|
|
|
|
|
class Endpoint(object):
|
2019-06-07 14:06:18 +00:00
|
|
|
def __init__(self, conn, assertion, id=None,
|
|
|
|
on_add=None, on_del=None, on_msg=None, on_end=None):
|
2018-11-20 19:45:27 +00:00
|
|
|
self.conn = conn
|
|
|
|
self.assertion = assertion
|
|
|
|
self.id = id or uuid('sub' if Observe.isClassOf(assertion) else 'pub')
|
|
|
|
self.on_add = on_add or _ignore
|
|
|
|
self.on_del = on_del or _ignore
|
|
|
|
self.on_msg = on_msg or _ignore
|
2019-06-07 14:06:18 +00:00
|
|
|
self.on_end = on_end or _ignore
|
2018-11-20 19:45:27 +00:00
|
|
|
self.cache = set()
|
|
|
|
self.conn._update_endpoint(self)
|
|
|
|
|
|
|
|
def set(self, new_assertion):
|
|
|
|
self.assertion = new_assertion
|
|
|
|
if self.conn:
|
|
|
|
self.conn._update_endpoint(self)
|
|
|
|
|
|
|
|
def send(self, message):
|
|
|
|
'''Shortcut to Connection.send.'''
|
|
|
|
if self.conn:
|
|
|
|
self.conn.send(message)
|
|
|
|
|
|
|
|
def destroy(self):
|
|
|
|
if self.conn:
|
|
|
|
self.conn._clear_endpoint(self)
|
|
|
|
self.conn = None
|
|
|
|
|
|
|
|
def _reset(self):
|
|
|
|
for captures in set(self.cache):
|
|
|
|
self._del(captures)
|
|
|
|
|
|
|
|
def _add(self, captures):
|
2019-03-22 14:21:10 +00:00
|
|
|
if captures in self.cache:
|
2019-05-16 11:46:22 +00:00
|
|
|
log.error('Server error: duplicate captures %r added for endpoint %r %r' % (
|
2019-03-22 14:21:10 +00:00
|
|
|
captures,
|
|
|
|
self.id,
|
|
|
|
self.assertion))
|
|
|
|
else:
|
2018-11-20 19:45:27 +00:00
|
|
|
self.cache.add(captures)
|
|
|
|
self.on_add(*captures)
|
|
|
|
|
|
|
|
def _del(self, captures):
|
|
|
|
if captures in self.cache:
|
|
|
|
self.cache.discard(captures)
|
|
|
|
self.on_del(*captures)
|
2019-03-22 14:21:10 +00:00
|
|
|
else:
|
2019-05-16 11:46:22 +00:00
|
|
|
log.error('Server error: nonexistent captures %r removed from endpoint %r %r' % (
|
2019-03-22 14:21:10 +00:00
|
|
|
captures,
|
|
|
|
self.id,
|
|
|
|
self.assertion))
|
2018-11-20 19:45:27 +00:00
|
|
|
|
|
|
|
def _msg(self, captures):
|
|
|
|
self.on_msg(*captures)
|
|
|
|
|
2019-06-07 14:06:18 +00:00
|
|
|
def _end(self):
|
|
|
|
self.on_end()
|
|
|
|
|
2018-11-20 19:45:27 +00:00
|
|
|
class DummyEndpoint(object):
|
|
|
|
def _add(self, captures): pass
|
|
|
|
def _del(self, captures): pass
|
|
|
|
def _msg(self, captures): pass
|
2019-06-07 14:06:18 +00:00
|
|
|
def _end(self): pass
|
2018-11-20 19:45:27 +00:00
|
|
|
|
|
|
|
_dummy_endpoint = DummyEndpoint()
|
|
|
|
|
|
|
|
class Connection(object):
|
2019-05-16 11:46:22 +00:00
|
|
|
def __init__(self, scope):
|
2018-11-20 19:45:27 +00:00
|
|
|
self.endpoints = {}
|
2019-05-16 11:46:22 +00:00
|
|
|
self.scope = scope
|
2019-05-30 21:35:56 +00:00
|
|
|
self.commitNeeded = False
|
|
|
|
self.worklist = []
|
2018-11-20 19:45:27 +00:00
|
|
|
|
|
|
|
def _each_endpoint(self):
|
|
|
|
return list(self.endpoints.values())
|
|
|
|
|
|
|
|
def destroy(self):
|
|
|
|
for ep in self._each_endpoint():
|
|
|
|
ep.destroy()
|
|
|
|
self._disconnect()
|
|
|
|
|
|
|
|
def _encode(self, event):
|
|
|
|
e = protocol.Encoder()
|
|
|
|
e.append(event)
|
|
|
|
return e.contents()
|
|
|
|
|
|
|
|
def _update_endpoint(self, ep):
|
|
|
|
self.endpoints[ep.id] = ep
|
2019-05-30 21:35:56 +00:00
|
|
|
self._send(self._encode(protocol.Assert(ep.id, ep.assertion)), commitNeeded = True)
|
2018-11-20 19:45:27 +00:00
|
|
|
|
|
|
|
def _clear_endpoint(self, ep):
|
|
|
|
if ep.id in self.endpoints:
|
|
|
|
del self.endpoints[ep.id]
|
2019-05-30 21:35:56 +00:00
|
|
|
self._send(self._encode(protocol.Clear(ep.id)), commitNeeded = True)
|
2018-11-20 19:45:27 +00:00
|
|
|
|
|
|
|
def send(self, message):
|
2019-05-30 21:35:56 +00:00
|
|
|
self._send(self._encode(protocol.Message(message)), commitNeeded = True)
|
|
|
|
self._commit_if_needed()
|
2018-11-20 19:45:27 +00:00
|
|
|
|
|
|
|
def _on_disconnected(self):
|
|
|
|
for ep in self._each_endpoint():
|
|
|
|
ep._reset()
|
|
|
|
self._disconnect()
|
|
|
|
|
|
|
|
def _on_connected(self):
|
2019-05-16 11:46:22 +00:00
|
|
|
self._send(self._encode(protocol.Connect(self.scope)))
|
2018-11-20 19:45:27 +00:00
|
|
|
for ep in self._each_endpoint():
|
|
|
|
self._update_endpoint(ep)
|
2019-05-30 21:35:56 +00:00
|
|
|
self._commit_work()
|
2018-11-20 19:45:27 +00:00
|
|
|
|
|
|
|
def _lookup(self, endpointId):
|
|
|
|
return self.endpoints.get(endpointId, _dummy_endpoint)
|
|
|
|
|
2019-05-30 21:35:56 +00:00
|
|
|
def _push_work(self, thunk):
|
|
|
|
self.worklist.append(thunk)
|
|
|
|
|
|
|
|
def _commit_work(self):
|
|
|
|
for thunk in self.worklist:
|
|
|
|
thunk()
|
|
|
|
self.worklist.clear()
|
|
|
|
self._commit_if_needed()
|
|
|
|
|
|
|
|
def _commit_if_needed(self):
|
|
|
|
if self.commitNeeded:
|
|
|
|
self._send(self._encode(protocol.Commit()))
|
|
|
|
self.commitNeeded = False
|
|
|
|
|
2018-11-20 19:45:27 +00:00
|
|
|
def _on_event(self, v):
|
2019-05-30 21:35:56 +00:00
|
|
|
if protocol.Add.isClassOf(v): return self._push_work(lambda: self._lookup(v[0])._add(v[1]))
|
|
|
|
if protocol.Del.isClassOf(v): return self._push_work(lambda: self._lookup(v[0])._del(v[1]))
|
|
|
|
if protocol.Msg.isClassOf(v): return self._push_work(lambda: self._lookup(v[0])._msg(v[1]))
|
2019-06-07 14:06:18 +00:00
|
|
|
if protocol.End.isClassOf(v): return self._push_work(lambda: self._lookup(v[0])._end())
|
2019-05-30 21:35:56 +00:00
|
|
|
if protocol.Commit.isClassOf(v): return self._commit_work()
|
2019-05-16 13:59:23 +00:00
|
|
|
if protocol.Err.isClassOf(v): return self._on_error(v[0])
|
2019-03-22 12:52:25 +00:00
|
|
|
if protocol.Ping.isClassOf(v): self._send(self._encode(protocol.Pong()))
|
2018-11-20 19:45:27 +00:00
|
|
|
|
2019-05-16 13:59:23 +00:00
|
|
|
def _on_error(self, detail):
|
|
|
|
log.error('%s: error from server: %r' % (self.__class__.__qualname__, detail))
|
|
|
|
self._disconnect()
|
|
|
|
|
2019-05-30 21:35:56 +00:00
|
|
|
def _send(self, bs, commitNeeded = False):
|
2018-11-20 19:45:27 +00:00
|
|
|
raise Exception('subclassresponsibility')
|
|
|
|
|
|
|
|
def _disconnect(self):
|
|
|
|
raise Exception('subclassresponsibility')
|
|
|
|
|
2018-11-22 12:15:27 +00:00
|
|
|
class _StreamConnection(Connection, asyncio.Protocol):
|
2019-05-16 11:46:22 +00:00
|
|
|
def __init__(self, scope):
|
|
|
|
super().__init__(scope)
|
2018-11-20 19:45:27 +00:00
|
|
|
self.decoder = None
|
|
|
|
self.stop_signal = None
|
|
|
|
self.transport = None
|
|
|
|
|
|
|
|
def connection_lost(self, exc):
|
|
|
|
self._on_disconnected()
|
|
|
|
|
|
|
|
def connection_made(self, transport):
|
|
|
|
self.transport = transport
|
|
|
|
self._on_connected()
|
|
|
|
|
|
|
|
def data_received(self, chunk):
|
|
|
|
self.decoder.extend(chunk)
|
|
|
|
while True:
|
|
|
|
v = self.decoder.try_next()
|
|
|
|
if v is None: break
|
|
|
|
self._on_event(v)
|
|
|
|
|
2019-05-30 21:35:56 +00:00
|
|
|
def _send(self, bs, commitNeeded = False):
|
2018-11-20 19:45:27 +00:00
|
|
|
if self.transport:
|
|
|
|
self.transport.write(bs)
|
2019-05-30 21:35:56 +00:00
|
|
|
if commitNeeded:
|
|
|
|
self.commitNeeded = True
|
2018-11-20 19:45:27 +00:00
|
|
|
|
|
|
|
def _disconnect(self):
|
|
|
|
if self.stop_signal:
|
2019-03-22 14:21:21 +00:00
|
|
|
self.stop_signal.get_loop().call_soon_threadsafe(
|
|
|
|
lambda: self.stop_signal.set_result(True))
|
2018-11-20 19:45:27 +00:00
|
|
|
|
2018-11-22 12:15:27 +00:00
|
|
|
async def _create_connection(self, loop):
|
|
|
|
raise Exception('subclassresponsibility')
|
|
|
|
|
2018-11-21 13:22:00 +00:00
|
|
|
async def main(self, loop, on_connected=None):
|
2018-11-20 19:45:27 +00:00
|
|
|
if self.transport is not None:
|
|
|
|
raise Exception('Cannot run connection twice!')
|
|
|
|
|
|
|
|
self.decoder = protocol.Decoder()
|
|
|
|
self.stop_signal = loop.create_future()
|
|
|
|
try:
|
2018-11-22 12:15:27 +00:00
|
|
|
_transport, _protocol = await self._create_connection(loop)
|
2018-11-21 11:10:45 +00:00
|
|
|
except OSError as e:
|
2019-05-16 11:46:22 +00:00
|
|
|
log.error('%s: Could not connect to server: %s' % (self.__class__.__qualname__, e))
|
2018-11-20 19:45:27 +00:00
|
|
|
return False
|
|
|
|
|
|
|
|
try:
|
2018-11-21 13:22:00 +00:00
|
|
|
if on_connected: on_connected()
|
2018-11-20 19:45:27 +00:00
|
|
|
await self.stop_signal
|
|
|
|
return True
|
|
|
|
finally:
|
|
|
|
self.transport.close()
|
|
|
|
self.transport = None
|
|
|
|
self.stop_signal = None
|
|
|
|
self.decoder = None
|
|
|
|
|
2018-11-22 12:15:27 +00:00
|
|
|
class TcpConnection(_StreamConnection):
|
2019-05-16 11:46:22 +00:00
|
|
|
def __init__(self, host, port, scope):
|
|
|
|
super().__init__(scope)
|
2018-11-22 12:15:27 +00:00
|
|
|
self.host = host
|
|
|
|
self.port = port
|
|
|
|
|
|
|
|
async def _create_connection(self, loop):
|
|
|
|
return await loop.create_connection(lambda: self, self.host, self.port)
|
|
|
|
|
|
|
|
class UnixSocketConnection(_StreamConnection):
|
2019-05-16 11:46:22 +00:00
|
|
|
def __init__(self, path, scope):
|
|
|
|
super().__init__(scope)
|
2018-11-22 12:15:27 +00:00
|
|
|
self.path = path
|
|
|
|
|
|
|
|
async def _create_connection(self, loop):
|
|
|
|
return await loop.create_unix_connection(lambda: self, self.path)
|
|
|
|
|
2018-11-20 19:45:27 +00:00
|
|
|
class WebsocketConnection(Connection):
|
2019-05-16 11:46:22 +00:00
|
|
|
def __init__(self, url, scope):
|
|
|
|
super().__init__(scope)
|
2018-11-20 19:45:27 +00:00
|
|
|
self.url = url
|
|
|
|
self.loop = None
|
|
|
|
self.ws = None
|
|
|
|
|
2019-05-30 21:35:56 +00:00
|
|
|
def _send(self, bs, commitNeeded = False):
|
2019-05-16 14:00:00 +00:00
|
|
|
if self.loop:
|
|
|
|
def _do_send():
|
|
|
|
if self.ws:
|
|
|
|
self.loop.create_task(self.ws.send(bs))
|
|
|
|
self.loop.call_soon_threadsafe(_do_send)
|
2019-05-30 21:35:56 +00:00
|
|
|
if commitNeeded:
|
|
|
|
self.commitNeeded = True
|
2018-11-20 19:45:27 +00:00
|
|
|
|
|
|
|
def _disconnect(self):
|
2019-05-16 14:00:00 +00:00
|
|
|
if self.loop:
|
|
|
|
def _do_disconnect():
|
|
|
|
if self.ws:
|
|
|
|
self.loop.create_task(self.ws.close())
|
|
|
|
self.loop.call_soon_threadsafe(_do_disconnect)
|
2018-11-20 19:45:27 +00:00
|
|
|
|
2018-11-21 13:22:00 +00:00
|
|
|
async def main(self, loop, on_connected=None):
|
2018-11-20 19:45:27 +00:00
|
|
|
if self.ws is not None:
|
|
|
|
raise Exception('Cannot run connection twice!')
|
|
|
|
|
|
|
|
self.loop = loop
|
|
|
|
|
|
|
|
try:
|
|
|
|
async with websockets.connect(self.url) as ws:
|
2018-11-21 13:22:00 +00:00
|
|
|
if on_connected: on_connected()
|
2018-11-20 19:45:27 +00:00
|
|
|
self.ws = ws
|
|
|
|
self._on_connected()
|
|
|
|
try:
|
|
|
|
while True:
|
2019-05-16 14:00:12 +00:00
|
|
|
chunk = await ws.recv()
|
2018-11-20 19:45:27 +00:00
|
|
|
self._on_event(protocol.Decoder(chunk).next())
|
|
|
|
except websockets.exceptions.ConnectionClosed:
|
|
|
|
pass
|
2018-11-21 11:10:45 +00:00
|
|
|
except OSError as e:
|
2019-05-16 11:46:22 +00:00
|
|
|
log.error('%s: Could not connect to server: %s' % (self.__class__.__qualname__, e))
|
2018-11-20 19:45:27 +00:00
|
|
|
return False
|
|
|
|
finally:
|
|
|
|
self._on_disconnected()
|
|
|
|
|
|
|
|
if self.ws:
|
|
|
|
await self.ws.close()
|
|
|
|
self.loop = None
|
|
|
|
self.ws = None
|
|
|
|
return True
|