Reimplement following novy design

This commit is contained in:
Tony Garnock-Jones 2021-08-18 13:35:49 -04:00
parent d4387d645a
commit 10f4bc9e34
9 changed files with 407 additions and 655 deletions

View File

@ -1 +1,4 @@
__path__ = __import__('pkgutil').extend_path(__path__, __name__)
# This is 'import *' in order to effectively re-export preserves as part of this module's API.
from preserves import *

346
syndicate/actor.py Normal file
View File

@ -0,0 +1,346 @@
import asyncio
import inspect
import logging
import sys
import traceback
from .idgen import IdGenerator
log = logging.getLogger(__name__)
_next_actor_number = IdGenerator()
_next_handle = IdGenerator()
_next_facet_id = IdGenerator()
def start_actor_system(boot_proc):
loop = asyncio.get_event_loop()
loop.set_debug(True)
queue_task(lambda: Actor(boot_proc), loop = loop)
loop.run_forever()
loop.close()
def adjust_engine_inhabitant_count(delta):
loop = asyncio.get_running_loop()
if not hasattr(loop, '__syndicate_inhabitant_count'):
loop.__syndicate_inhabitant_count = 0
loop.__syndicate_inhabitant_count = loop.__syndicate_inhabitant_count + delta
if loop.__syndicate_inhabitant_count == 0:
log.debug('Inhabitant count reached zero')
loop.stop()
class Actor:
def __init__(self, boot_proc, name = None, initial_assertions = {}, daemon = False):
self.name = name or 'a' + str(next(_next_actor_number))
self._daemon = daemon
if not daemon:
adjust_engine_inhabitant_count(1)
self.root = Facet(self, None)
self.exit_reason = None # None -> running, True -> terminated OK, exn -> error
self.exit_hooks = []
self._log = None
Turn.run(Facet(self, self.root, initial_assertions = initial_assertions),
stop_if_inert_after(boot_proc))
def __repr__(self):
return '<Actor:%s>' % (self.name,)
@property
def daemon(self):
return self._daemon
@daemon.setter
def daemon(self, value):
if self._daemon != value:
self._daemon = value
adjust_engine_inhabitant_count(-1 if value else 1)
@property
def alive(self):
return self.exit_reason is None
@property
def log(self):
if self._log is None:
self._log = logging.getLogger('syndicate.Actor.%s' % (self.name,))
return self._log
def at_exit(self, hook):
self.exit_hooks.append(hook)
def terminate(self, turn, exit_reason):
if self.exit_reason is not None: return
self.exit_reason = exit_reason
if exit_reason != True:
self.log.error('crashed: %s' % (exit_reason,))
for h in self.exit_hooks:
h(turn)
def finish_termination():
Turn.run(self,
lambda turn: self.root._terminate(turn, exit_reason == True),
zombie_turn = True)
if not self._daemon:
adjust_engine_inhabitant_count(-1)
queue_task(finish_termination)
class Facet:
def __init__(self, actor, parent, initial_assertions = {}):
self.id = next(_next_facet_id)
self.actor = actor
self.parent = parent
if parent:
parent.children.add(self)
self.children = set()
self.outbound = initial_assertions
self.shutdown_actions = []
self.alive = True
self.inert_check_preventers = 0
@property
def log(self):
return self.actor.log
def _repr_labels(self):
pieces = []
f = self
while f.parent is not None:
pieces.append(str(f.id))
f = f.parent
pieces.append(self.actor.name)
pieces.reverse()
return ':'.join(pieces)
def __repr__(self):
return '<Facet:%s>' % (self._repr_labels(),)
def on_stop(self, a):
self.shutdown_actions.append(a)
def isinert(self):
return len(self.children) == 0 and len(self.outbound) == 0 and self.inert_check_preventers == 0
def prevent_inert_check(self):
armed = True
self.inert_check_preventers = self.inert_check_preventers + 1
def disarm():
nonlocal armed
if not armed: return
armed = False
self.inert_check_preventers = self.inert_check_preventers - 1
return disarm
def _terminate(self, turn, orderly):
if not self.alive: return
self.alive = False
parent = self.parent
if parent:
parent.children.remove(self)
with ActiveFacet(turn, self):
for child in list(self.children):
child._terminate(turn, orderly)
if orderly:
for h in self.shutdown_actions:
h(turn)
for e in self.outbound.values():
turn._retract(e)
if orderly:
if parent:
if parent.isinert():
Turn.run(parent, lambda turn: parent._terminate(turn, True))
else:
Turn.run(self.actor.root,
lambda turn: self.actor.terminate(turn, True),
zombie_turn = True)
class ActiveFacet:
def __init__(self, turn, facet):
self.turn = turn
self.outer_facet = None
self.inner_facet = facet
def __enter__(self):
self.outer_facet = self.turn.facet
self.turn.facet = self.inner_facet
return None
def __exit__(self, t, v, tb):
self.turn.facet = self.outer_facet
self.outer_facet = None
async def ensure_awaitable(value):
if inspect.isawaitable(value):
return await value
else:
return value
def queue_task(thunk, loop = asyncio):
async def task():
await ensure_awaitable(thunk())
return loop.create_task(task())
class Turn:
@classmethod
def run(cls, facet, action, zombie_turn = False):
if not zombie_turn:
if not facet.actor.alive: return
if not facet.alive: return
turn = cls(facet)
try:
action(turn)
except:
ei = sys.exc_info()
self.log.error('%s', ''.join(traceback.format_exception(*ei)))
Turn.run(facet.actor.root, lambda turn: facet.actor.terminate(turn, ei[1]))
else:
turn._deliver()
def __init__(self, facet):
self.facet = facet
self.queues = {}
@property
def log(self):
return self.facet.actor.log
def ref(self, entity):
return Ref(self.facet, entity)
def facet(self, boot_proc):
new_facet = Facet(self.facet.actor, self.facet)
with ActiveFacet(self, new_facet):
stop_if_inert_after(boot_proc)(self)
return new_facet
def stop(self, facet = None, continuation = None):
if facet is None:
facet = self.facet
def action(turn):
facet._terminate(turn, True)
if continuation is not None:
continuation(turn)
self._enqueue(facet.parent, action)
def spawn(self, boot_proc, name = None, initial_assertions = None, daemon = False):
def action(turn):
new_outbound = {}
if initial_assertions is not None:
for handle in initial_assertions:
new_outbound[handle] = self.facet.outbound[handle]
del self.facet.outbound[handle]
queue_task(lambda: Actor(boot_proc,
name = name,
initial_assertions = new_outbound,
daemon = daemon))
self._enqueue(self.facet, action)
def stop_actor(self):
self._enqueue(self.facet.actor.root, lambda turn: self.facet.actor.terminate(turn, True))
def crash(self, exn):
self._enqueue(self.facet.actor.root, lambda turn: self.facet.actor.terminate(turn, exn))
def publish(self, ref, assertion):
handle = next(_next_handle)
self._publish(ref, assertion, handle)
return handle
def _publish(self, ref, assertion, handle):
# TODO: attenuation
e = OutboundAssertion(handle, ref)
self.facet.outbound[handle] = e
def action(turn):
e.established = True
ref.entity.on_publish(turn, assertion, handle)
self._enqueue(ref.facet, action)
def retract(self, handle):
if handle is not None:
e = self.facet.outbound.get(handle, None)
if e is not None:
self._retract(e)
def replace(self, ref, handle, assertion):
new_handle = None if assertion is None else self.publish(ref, assertion)
self.retract(handle)
return new_handle
def _retract(self, e):
del self.facet.outbound[e.handle]
def action(turn):
if e.established:
e.established = False
e.ref.entity.on_retract(turn, e.handle)
self._enqueue(e.ref.facet, action)
def sync(self, ref, k):
class SyncContinuation(Entity):
def on_message(self, turn, _value):
k(turn)
self._sync(ref, self.ref(SyncContinuation()))
def _sync(self, ref, peer):
self._enqueue(ref.facet, lambda turn: ref.entity.on_sync(turn, peer))
def send(self, ref, message):
# TODO: attenuation
def action(turn):
ref.entity.on_message(turn, message)
self._enqueue(ref.facet, action)
def _enqueue(self, target_facet, action):
if target_facet not in self.queues:
self.queues[target_facet] = []
self.queues[target_facet].append(action)
def _deliver(self):
for (facet, q) in self.queues.items():
# Stupid python scoping bites again
def make_deliver_q(facet, q): # gratuitous
def deliver_q(turn):
for action in q:
action(turn)
return lambda: Turn.run(facet, deliver_q)
queue_task(make_deliver_q(facet, q))
self.queues = {}
def stop_if_inert_after(action):
def wrapped_action(turn):
action(turn)
def check_action(turn):
if (turn.facet.parent is not None and not turn.facet.parent.alive) \
or turn.facet.isinert():
turn.stop()
turn._enqueue(turn.facet, check_action)
return wrapped_action
class Ref:
def __init__(self, facet, entity):
self.facet = facet
self.entity = entity
def __repr__(self):
return '<Ref:%s/%r>' % (self.facet._repr_labels(), self.entity)
class OutboundAssertion:
def __init__(self, handle, ref):
self.handle = handle
self.ref = ref
self.established = False
# Can act as a mixin
class Entity:
def on_publish(self, turn, v, handle):
pass
def on_retract(self, turn, handle):
pass
def on_message(self, turn, v):
pass
def on_sync(self, turn, peer):
turn.send(peer, True)
_inert_entity = Entity()

41
syndicate/during.py Normal file
View File

@ -0,0 +1,41 @@
from . import actor
def _ignore(*args, **kwargs):
pass
def _default_sync(turn, peer):
turn.send(peer, True)
class During(actor.Entity):
def __init__(self, turn, on_add=None, on_msg=None, on_sync=None, name=None):
self.ref = turn.ref(self)
self.retract_handlers = {}
self._on_add = on_add or _ignore
self._on_msg = on_msg or _ignore
self._on_sync = on_sync or _default_sync
self.name = name
self.flatten_arg = True
def __repr__(self):
if self.name is None:
return super().__repr__()
return self.name
def _wrap(self, v):
return v if self.flatten_arg and isinstance(v, tuple) else (v,)
def on_publish(self, turn, v, handle):
retract_handler = self._on_add(turn, *self._wrap(v))
if retract_handler is not None:
self.retract_handlers[handle] = retract_handler
def on_retract(self, turn, handle):
if handle in self.retract_handlers:
self.retract_handlers[handle](turn)
del self.retract_handlers[handle]
def on_message(self, turn, v):
self._on_msg(turn, *self._wrap(v))
def on_sync(self, turn, peer):
self._on_sync(turn, peer)

11
syndicate/idgen.py Normal file
View File

@ -0,0 +1,11 @@
class IdGenerator:
def __init__(self, initial_value = 0):
self.next = initial_value
def __iter__(self):
return self
def __next__(self):
n = self.next
self.next = n + 1
return n

View File

@ -1 +0,0 @@
__path__ = __import__('pkgutil').extend_path(__path__, __name__)

View File

@ -1,589 +0,0 @@
import asyncio
import secrets
import logging
import websockets
import re
from urllib.parse import urlparse, urlunparse
log = logging.getLogger(__name__)
import syndicate.mini.protocol as protocol
import syndicate.mini.url as url
from syndicate.mini.protocol import Capture, Discard, Observe
CAPTURE = Capture(Discard())
# This is 'import *' in order to effectively re-export preserves as part of this module's API.
from preserves import *
def _encode(event):
e = protocol.Encoder()
e.append(event)
return e.contents()
_instance_id = secrets.token_urlsafe(8)
_uuid_counter = 0
def uuid(prefix='__@syndicate'):
global _uuid_counter
c = _uuid_counter
_uuid_counter = c + 1
return prefix + '_' + _instance_id + '_' + str(c)
def _ignore(*args, **kwargs):
pass
class Turn(object):
def __init__(self, conn):
self.conn = conn
self.items = []
def _ensure(self, what):
if self.items is None:
raise Exception('Attempt to %s a completed Turn' % (what,))
def _extend(self, item):
self._ensure('extend')
self.items.append(item)
def _reset(self):
self._ensure('reset')
self.items.clear()
def _commit(self):
self._ensure('commit')
if self.items: self.conn._send(protocol.Turn(self.items))
self.items = None
def send(self, message):
self._extend(protocol.Message(message))
def merge_into(self, other):
self._ensure('merge from')
other._ensure('merge into')
if self.items: other.items.extend(self.items)
self.items.clear()
def __enter__(self):
return self
def __exit__(self, t, v, tb):
if t is None:
self._commit()
def _fresh_id(assertion):
return uuid('sub' if Observe.isClassOf(assertion) else 'pub')
class FacetSetupContext(object):
def __init__(self, turn, facet):
self.turn = turn
self.facet = facet
def __enter__(self):
return self.facet
def __exit__(self, t, v, tb):
if t is None:
self.facet.start(self.turn)
else:
self.facet.actor.kill(self.turn, (t, v, tb))
_next_actor_number = 0
def _next_actor_name():
global _next_actor_number
n = _next_actor_number
_next_actor_number = n + 1
return str(n)
class Actor(object):
def __init__(self, conn, name=None):
self.conn = conn
self.name = name or _next_actor_name()
self.children = set()
self.alive = True
self._log = None
@property
def log(self):
if self._log is None:
self._log = logging.getLogger('syndicate.mini.Actor.%s' % (self.name,))
return self._log
def is_alive(self):
return self.alive
def react(self, turn):
return FacetSetupContext(turn, Facet(self.conn, self, self))
def _add_child(self, facet):
if not self.alive:
raise Exception('Cannot add facet because actor is termianted')
self.children.add(facet)
def guard(self, turn):
return ActorGuard(turn, self)
def is_inert(self):
return len(self.children) == 0
def _stop(self, turn, pending, is_abort):
for child in list(self.children):
child._stop(turn, pending, is_abort)
self.children.clear()
def stop(self, turn):
self.alive = False
pending = []
self._stop(turn, pending, False)
for t in pending: t(turn)
def kill(self, turn, exc_info):
log.error('%r%s died', self, repr(self.name) if self.name else '', exc_info=exc_info)
for child in list(self.children):
child._abort(turn)
self.children.clear()
class ActorGuard(object):
def __init__(self, turn, actor):
self.turn = turn
self.actor = actor
def __enter__(self):
pass
def __exit__(self, t, v, tb):
if t is not None:
self.actor.kill(self.turn, (t, v, tb))
return True ## suppress exception
class Facet(object):
def __init__(self, conn, actor, parent):
self.conn = conn
self.actor = actor
self.parent = parent
if parent:
parent._add_child(self)
self.children = set()
self.start_callbacks = []
self.stop_callbacks = []
self.endpoints = []
self.state = 0
@property
def log(self):
return self.actor.log
def is_alive(self):
return self.state == 1
def _ensure_state(self, wanted_state, message):
if self.state != wanted_state:
raise Exception(message, self.state)
def _add_child(self, facet):
self._ensure_state(1, 'Cannot add child facet in this state')
self.children.add(facet)
def react(self, turn):
self._ensure_state(1, 'Cannot create subfacet in this state')
return FacetSetupContext(turn, Facet(self.conn, self.actor, self))
def add(self, *args, **kwargs):
self._ensure_state(0, 'Cannot add endpoint to facet in this state')
endpoint = Endpoint(self, *args, **kwargs)
self.endpoints.append(endpoint)
return endpoint
def on_start(self, callback):
self._ensure_state(0, 'Cannot add on_start callback to facet in this state')
self.start_callbacks.append(callback)
def on_stop(self, callback):
self._ensure_state(0, 'Cannot add on_stop callback to facet in this state')
self.stop_callbacks.append(callback)
def start(self, turn):
self._ensure_state(0, 'Cannot start facet in this state')
self.state = 1
for e in self.endpoints: e._start(turn)
for t in self.start_callbacks: t(turn)
self.start_callbacks.clear()
if not self.parent.is_alive() or self.is_inert():
self.stop(turn)
def is_inert(self):
return len(self.children) == 0 and len(self.endpoints) == 0
def _stop(self, turn, pending, is_abort):
if turn.conn is not self.conn:
raise Exception('Attempted to stop facet from wrong connection')
if self.state == 0 and not is_abort:
raise Exception('Cannot stop facet before starting it')
if self.state != 1: ## stopping is idempotent
return
self.state = 2
if self.parent:
self.parent.children.remove(self)
for child in list(self.children):
child._stop(turn, pending, is_abort)
self.children.clear()
if not is_abort:
pending.extend(self.stop_callbacks)
for endpoint in self.endpoints:
endpoint.clear(turn)
self.endpoints.clear()
if not is_abort:
if self.parent and self.parent.is_inert():
self.parent._stop(turn, pending, is_abort)
def stop(self, turn):
pending = []
self._stop(turn, pending, False)
for t in pending: t(turn)
def _abort(self, turn):
self._stop(turn, None, True)
class Endpoint(object):
def __init__(self, facet, assertion, on_add=None, on_del=None, on_msg=None):
self.facet = facet
self.assertion = assertion
self.id = None
self.on_add = on_add or _ignore
self.on_del = on_del or _ignore
self.on_msg = on_msg or _ignore
self.cache = set()
@property
def log(self):
return self.facet.log
def _start(self, turn):
self.set(turn, self.assertion)
def set(self, turn, new_assertion, on_transition=None):
if self.id is not None:
turn.conn._unmap_endpoint(turn, self, on_end=on_transition)
self.id = None
self.assertion = new_assertion
if self.assertion is not None:
self.id = _fresh_id(self.assertion)
turn.conn._map_endpoint(turn, self)
def clear(self, turn, on_cleared=None):
self.set(turn, None, on_transition=on_cleared)
def _reset(self, turn):
for captures in set(self.cache):
self._del(turn, captures)
def _add(self, turn, captures):
if captures in self.cache:
log.error('Server error: duplicate captures %r added for endpoint %r %r' % (
captures,
self.id,
self.assertion))
else:
self.cache.add(captures)
with self.facet.actor.guard(turn):
self.on_add(turn, *captures)
def _del(self, turn, captures):
if captures in self.cache:
self.cache.discard(captures)
with self.facet.actor.guard(turn):
self.on_del(turn, *captures)
else:
log.error('Server error: nonexistent captures %r removed from endpoint %r %r' % (
captures,
self.id,
self.assertion))
def _msg(self, turn, captures):
with self.facet.actor.guard(turn):
self.on_msg(turn, *captures)
class DummyEndpoint(object):
def _add(self, turn, captures): pass
def _del(self, turn, captures): pass
def _msg(self, turn, captures): pass
_dummy_endpoint = DummyEndpoint()
class Connection(object):
def __init__(self, scope):
self.scope = scope
self.endpoints = {}
self.end_callbacks = {}
self._is_connected = False
def is_connected(self):
return self._is_connected
def _each_endpoint(self):
return list(self.endpoints.values())
def turn(self):
return Turn(self)
def actor(self, name=None):
return Actor(self, name=name)
def destroy(self):
with self.turn() as t:
for ep in self._each_endpoint():
ep.clear(t)
t._reset() ## don't actually Clear the endpoints, we are about to disconnect
self._disconnect()
def _unmap_endpoint(self, turn, ep, on_end=None):
del self.endpoints[ep.id]
if on_end:
self.end_callbacks[ep.id] = on_end
turn._extend(protocol.Clear(ep.id))
def _on_end(self, turn, id):
if id in self.end_callbacks:
self.end_callbacks[id](turn)
del self.end_callbacks[id]
def _map_endpoint(self, turn, ep):
self.endpoints[ep.id] = ep
turn._extend(protocol.Assert(ep.id, ep.assertion))
def _on_disconnected(self):
self._is_connected = False
with self.turn() as t:
for ep in self._each_endpoint():
ep._reset(t)
t._reset() ## we have been disconnected, no point in keeping the actions
self._disconnect()
def _on_connected(self):
self._send(protocol.Connect(self.scope))
with self.turn() as t:
for ep in self._each_endpoint():
self._map_endpoint(t, ep)
self._is_connected = True
def _lookup(self, endpointId):
return self.endpoints.get(endpointId, _dummy_endpoint)
def _on_event(self, v):
with self.turn() as t:
self._handle_event(t, v)
def _handle_event(self, turn, v):
if protocol.Turn.isClassOf(v):
for item in protocol.Turn._items(v):
if protocol.Add.isClassOf(item): self._lookup(item[0])._add(turn, item[1])
elif protocol.Del.isClassOf(item): self._lookup(item[0])._del(turn, item[1])
elif protocol.Msg.isClassOf(item): self._lookup(item[0])._msg(turn, item[1])
elif protocol.End.isClassOf(item): self._on_end(turn, item[0])
else: log.error('Unhandled server Turn item: %r' % (item,))
return
elif protocol.Err.isClassOf(v):
self._on_error(v[0], v[1])
return
elif protocol.Ping.isClassOf(v):
self._send_bytes(_encode(protocol.Pong()))
return
else:
log.error('Unhandled server message: %r' % (v,))
def _on_error(self, detail, context):
log.error('%s: error from server: %r (context: %r)' % (
self.__class__.__qualname__, detail, context))
self._disconnect()
def _send(self, m):
return self._send_bytes(_encode(m))
def _send_bytes(self, bs, commitNeeded = False):
raise Exception('subclassresponsibility')
def _disconnect(self):
raise Exception('subclassresponsibility')
async def reconnecting_main(self, loop, on_connected=None, on_disconnected=None):
should_run = True
while should_run:
did_connect = await self.main(loop, on_connected=(on_connected or _default_on_connected))
should_run = await (on_disconnected or _default_on_disconnected)(did_connect)
@classmethod
def from_url(cls, s):
return url.connection_from_url(s)
async def _default_on_connected():
log.info('Connected')
async def _default_on_disconnected(did_connect):
if did_connect:
# Reconnect immediately
log.info('Disconnected')
else:
await asyncio.sleep(2)
return True
class _StreamConnection(Connection, asyncio.Protocol):
def __init__(self, scope):
super().__init__(scope)
self.decoder = None
self.stop_signal = None
self.transport = None
def connection_lost(self, exc):
self._on_disconnected()
def connection_made(self, transport):
self.transport = transport
self._on_connected()
def data_received(self, chunk):
self.decoder.extend(chunk)
while True:
v = self.decoder.try_next()
if v is None: break
self._on_event(v)
def _send_bytes(self, bs, commitNeeded = False):
if self.transport:
self.transport.write(bs)
if commitNeeded:
self.commitNeeded = True
def _disconnect(self):
if self.stop_signal:
self.stop_signal.get_loop().call_soon_threadsafe(
lambda: self.stop_signal.set_result(True))
async def _create_connection(self, loop):
raise Exception('subclassresponsibility')
async def main(self, loop, on_connected=None):
if self.transport is not None:
raise Exception('Cannot run connection twice!')
self.decoder = protocol.Decoder()
self.stop_signal = loop.create_future()
try:
_transport, _protocol = await self._create_connection(loop)
except OSError as e:
log.error('%s: Could not connect to server: %s' % (self.__class__.__qualname__, e))
return False
try:
if on_connected: await on_connected()
await self.stop_signal
return True
finally:
self.transport.close()
self.transport = None
self.stop_signal = None
self.decoder = None
@url.schema('tcp')
class TcpConnection(_StreamConnection):
def __init__(self, host, port, scope):
super().__init__(scope)
self.host = host
self.port = port
async def _create_connection(self, loop):
return await loop.create_connection(lambda: self, self.host, self.port)
@classmethod
def default_port(cls):
return 21369
@classmethod
def from_url(cls, s):
u = urlparse(s)
host, port = url._hostport(u.netloc, cls.default_port())
if not host: return
scope = u.fragment
return cls(host, port, scope)
@url.schema('unix')
class UnixSocketConnection(_StreamConnection):
def __init__(self, path, scope):
super().__init__(scope)
self.path = path
async def _create_connection(self, loop):
return await loop.create_unix_connection(lambda: self, self.path)
@classmethod
def from_url(cls, s):
u = urlparse(s)
return cls(u.path, u.fragment)
@url.schema('ws')
@url.schema('wss')
class WebsocketConnection(Connection):
def __init__(self, url, scope):
super().__init__(scope)
self.url = url
self.loop = None
self.ws = None
def _send_bytes(self, bs, commitNeeded = False):
if self.loop:
def _do_send():
if self.ws:
self.loop.create_task(self.ws.send(bs))
self.loop.call_soon_threadsafe(_do_send)
if commitNeeded:
self.commitNeeded = True
def _disconnect(self):
if self.loop:
def _do_disconnect():
if self.ws:
self.loop.create_task(self.ws.close())
self.loop.call_soon_threadsafe(_do_disconnect)
def __connection_error(self, e):
log.error('%s: Could not connect to server: %s' % (self.__class__.__qualname__, e))
return False
async def main(self, loop, on_connected=None):
if self.ws is not None:
raise Exception('Cannot run connection twice!')
self.loop = loop
try:
self.ws = await websockets.connect(self.url)
except OSError as e:
return self.__connection_error(e)
except websockets.exceptions.InvalidHandshake as e:
return self.__connection_error(e)
try:
if on_connected: await on_connected()
self._on_connected()
while True:
chunk = await self.ws.recv()
self._on_event(protocol.Decoder(chunk).next())
except websockets.exceptions.WebSocketException:
pass
finally:
self._on_disconnected()
if self.ws:
await self.ws.close()
self.loop = None
self.ws = None
return True
@classmethod
def from_url(cls, s):
u = urlparse(s)
return cls(urlunparse(u._replace(fragment='')), u.fragment)

View File

@ -1,32 +0,0 @@
import preserves
from preserves import Record
## Enrolment
Connect = Record.makeConstructor('Connect', 'scope')
## Bidirectional
Turn = Record.makeConstructor('Turn', 'items')
## Client -> Server
Assert = Record.makeConstructor('Assert', 'endpointName assertion')
Clear = Record.makeConstructor('Clear', 'endpointName')
Message = Record.makeConstructor('Message', 'body')
## Server -> Client
Add = Record.makeConstructor('Add', 'endpointName captures')
Del = Record.makeConstructor('Del', 'endpointName captures')
Msg = Record.makeConstructor('Msg', 'endpointName captures')
End = Record.makeConstructor('End', 'endpointName')
Err = Record.makeConstructor('Err', 'detail context')
## Bidirectional
Ping = Record.makeConstructor('Ping', '')
Pong = Record.makeConstructor('Pong', '')
## Standard Syndicate constructors
Observe = Record.makeConstructor('observe', 'specification')
Capture = Record.makeConstructor('capture', 'specification')
Discard = Record.makeConstructor('discard', '')
Decoder = preserves.Decoder
Encoder = preserves.Encoder

View File

@ -1,33 +0,0 @@
# URLs denoting Syndicate servers.
class InvalidSyndicateUrl(ValueError): pass
schemas = {}
def schema(schema_name):
def k(factory_class):
schemas[schema_name] = factory_class
return factory_class
return k
def _bad_url(u):
raise InvalidSyndicateUrl('Invalid Syndicate server URL', u)
def connection_from_url(u):
pieces = u.split(':', 1)
if len(pieces) != 2: _bad_url(u)
schema_name, _rest = pieces
if schema_name not in schemas: _bad_url(u)
conn = schemas[schema_name].from_url(u)
if not conn: _bad_url(u)
return conn
def _hostport(s, default_port):
try:
i = s.rindex(':')
except ValueError:
i = None
if i is not None:
return (s[:i], int(s[i+1:]))
else:
return (s, default_port)

6
syndicate/schema.py Normal file
View File

@ -0,0 +1,6 @@
from preserves.schema import load_schema_file
import pathlib
for (n, ns) in load_schema_file(pathlib.Path(__file__).parent /
'../../syndicate-protocols/schema-bundle.bin')._items().items():
globals()[n] = ns