diff --git a/syndicate/__init__.py b/syndicate/__init__.py index 69e3be5..9afe956 100644 --- a/syndicate/__init__.py +++ b/syndicate/__init__.py @@ -1 +1,4 @@ __path__ = __import__('pkgutil').extend_path(__path__, __name__) + +# This is 'import *' in order to effectively re-export preserves as part of this module's API. +from preserves import * diff --git a/syndicate/actor.py b/syndicate/actor.py new file mode 100644 index 0000000..a64bdcb --- /dev/null +++ b/syndicate/actor.py @@ -0,0 +1,346 @@ +import asyncio +import inspect +import logging +import sys +import traceback + +from .idgen import IdGenerator + +log = logging.getLogger(__name__) + +_next_actor_number = IdGenerator() +_next_handle = IdGenerator() +_next_facet_id = IdGenerator() + +def start_actor_system(boot_proc): + loop = asyncio.get_event_loop() + loop.set_debug(True) + queue_task(lambda: Actor(boot_proc), loop = loop) + loop.run_forever() + loop.close() + +def adjust_engine_inhabitant_count(delta): + loop = asyncio.get_running_loop() + if not hasattr(loop, '__syndicate_inhabitant_count'): + loop.__syndicate_inhabitant_count = 0 + loop.__syndicate_inhabitant_count = loop.__syndicate_inhabitant_count + delta + if loop.__syndicate_inhabitant_count == 0: + log.debug('Inhabitant count reached zero') + loop.stop() + +class Actor: + def __init__(self, boot_proc, name = None, initial_assertions = {}, daemon = False): + self.name = name or 'a' + str(next(_next_actor_number)) + self._daemon = daemon + if not daemon: + adjust_engine_inhabitant_count(1) + self.root = Facet(self, None) + self.exit_reason = None # None -> running, True -> terminated OK, exn -> error + self.exit_hooks = [] + self._log = None + Turn.run(Facet(self, self.root, initial_assertions = initial_assertions), + stop_if_inert_after(boot_proc)) + + def __repr__(self): + return '' % (self.name,) + + @property + def daemon(self): + return self._daemon + + @daemon.setter + def daemon(self, value): + if self._daemon != value: + self._daemon = value + adjust_engine_inhabitant_count(-1 if value else 1) + + @property + def alive(self): + return self.exit_reason is None + + @property + def log(self): + if self._log is None: + self._log = logging.getLogger('syndicate.Actor.%s' % (self.name,)) + return self._log + + def at_exit(self, hook): + self.exit_hooks.append(hook) + + def terminate(self, turn, exit_reason): + if self.exit_reason is not None: return + self.exit_reason = exit_reason + if exit_reason != True: + self.log.error('crashed: %s' % (exit_reason,)) + for h in self.exit_hooks: + h(turn) + def finish_termination(): + Turn.run(self, + lambda turn: self.root._terminate(turn, exit_reason == True), + zombie_turn = True) + if not self._daemon: + adjust_engine_inhabitant_count(-1) + queue_task(finish_termination) + +class Facet: + def __init__(self, actor, parent, initial_assertions = {}): + self.id = next(_next_facet_id) + self.actor = actor + self.parent = parent + if parent: + parent.children.add(self) + self.children = set() + self.outbound = initial_assertions + self.shutdown_actions = [] + self.alive = True + self.inert_check_preventers = 0 + + @property + def log(self): + return self.actor.log + + def _repr_labels(self): + pieces = [] + f = self + while f.parent is not None: + pieces.append(str(f.id)) + f = f.parent + pieces.append(self.actor.name) + pieces.reverse() + return ':'.join(pieces) + + def __repr__(self): + return '' % (self._repr_labels(),) + + def on_stop(self, a): + self.shutdown_actions.append(a) + + def isinert(self): + return len(self.children) == 0 and len(self.outbound) == 0 and self.inert_check_preventers == 0 + + def prevent_inert_check(self): + armed = True + self.inert_check_preventers = self.inert_check_preventers + 1 + def disarm(): + nonlocal armed + if not armed: return + armed = False + self.inert_check_preventers = self.inert_check_preventers - 1 + return disarm + + def _terminate(self, turn, orderly): + if not self.alive: return + self.alive = False + + parent = self.parent + if parent: + parent.children.remove(self) + + with ActiveFacet(turn, self): + for child in list(self.children): + child._terminate(turn, orderly) + if orderly: + for h in self.shutdown_actions: + h(turn) + for e in self.outbound.values(): + turn._retract(e) + + if orderly: + if parent: + if parent.isinert(): + Turn.run(parent, lambda turn: parent._terminate(turn, True)) + else: + Turn.run(self.actor.root, + lambda turn: self.actor.terminate(turn, True), + zombie_turn = True) + +class ActiveFacet: + def __init__(self, turn, facet): + self.turn = turn + self.outer_facet = None + self.inner_facet = facet + + def __enter__(self): + self.outer_facet = self.turn.facet + self.turn.facet = self.inner_facet + return None + + def __exit__(self, t, v, tb): + self.turn.facet = self.outer_facet + self.outer_facet = None + +async def ensure_awaitable(value): + if inspect.isawaitable(value): + return await value + else: + return value + +def queue_task(thunk, loop = asyncio): + async def task(): + await ensure_awaitable(thunk()) + return loop.create_task(task()) + +class Turn: + @classmethod + def run(cls, facet, action, zombie_turn = False): + if not zombie_turn: + if not facet.actor.alive: return + if not facet.alive: return + turn = cls(facet) + try: + action(turn) + except: + ei = sys.exc_info() + self.log.error('%s', ''.join(traceback.format_exception(*ei))) + Turn.run(facet.actor.root, lambda turn: facet.actor.terminate(turn, ei[1])) + else: + turn._deliver() + + def __init__(self, facet): + self.facet = facet + self.queues = {} + + @property + def log(self): + return self.facet.actor.log + + def ref(self, entity): + return Ref(self.facet, entity) + + def facet(self, boot_proc): + new_facet = Facet(self.facet.actor, self.facet) + with ActiveFacet(self, new_facet): + stop_if_inert_after(boot_proc)(self) + return new_facet + + def stop(self, facet = None, continuation = None): + if facet is None: + facet = self.facet + def action(turn): + facet._terminate(turn, True) + if continuation is not None: + continuation(turn) + self._enqueue(facet.parent, action) + + def spawn(self, boot_proc, name = None, initial_assertions = None, daemon = False): + def action(turn): + new_outbound = {} + if initial_assertions is not None: + for handle in initial_assertions: + new_outbound[handle] = self.facet.outbound[handle] + del self.facet.outbound[handle] + queue_task(lambda: Actor(boot_proc, + name = name, + initial_assertions = new_outbound, + daemon = daemon)) + self._enqueue(self.facet, action) + + def stop_actor(self): + self._enqueue(self.facet.actor.root, lambda turn: self.facet.actor.terminate(turn, True)) + + def crash(self, exn): + self._enqueue(self.facet.actor.root, lambda turn: self.facet.actor.terminate(turn, exn)) + + def publish(self, ref, assertion): + handle = next(_next_handle) + self._publish(ref, assertion, handle) + return handle + + def _publish(self, ref, assertion, handle): + # TODO: attenuation + e = OutboundAssertion(handle, ref) + self.facet.outbound[handle] = e + def action(turn): + e.established = True + ref.entity.on_publish(turn, assertion, handle) + self._enqueue(ref.facet, action) + + def retract(self, handle): + if handle is not None: + e = self.facet.outbound.get(handle, None) + if e is not None: + self._retract(e) + + def replace(self, ref, handle, assertion): + new_handle = None if assertion is None else self.publish(ref, assertion) + self.retract(handle) + return new_handle + + def _retract(self, e): + del self.facet.outbound[e.handle] + def action(turn): + if e.established: + e.established = False + e.ref.entity.on_retract(turn, e.handle) + self._enqueue(e.ref.facet, action) + + def sync(self, ref, k): + class SyncContinuation(Entity): + def on_message(self, turn, _value): + k(turn) + self._sync(ref, self.ref(SyncContinuation())) + + def _sync(self, ref, peer): + self._enqueue(ref.facet, lambda turn: ref.entity.on_sync(turn, peer)) + + def send(self, ref, message): + # TODO: attenuation + def action(turn): + ref.entity.on_message(turn, message) + self._enqueue(ref.facet, action) + + def _enqueue(self, target_facet, action): + if target_facet not in self.queues: + self.queues[target_facet] = [] + self.queues[target_facet].append(action) + + def _deliver(self): + for (facet, q) in self.queues.items(): + # Stupid python scoping bites again + def make_deliver_q(facet, q): # gratuitous + def deliver_q(turn): + for action in q: + action(turn) + return lambda: Turn.run(facet, deliver_q) + queue_task(make_deliver_q(facet, q)) + self.queues = {} + +def stop_if_inert_after(action): + def wrapped_action(turn): + action(turn) + def check_action(turn): + if (turn.facet.parent is not None and not turn.facet.parent.alive) \ + or turn.facet.isinert(): + turn.stop() + turn._enqueue(turn.facet, check_action) + return wrapped_action + +class Ref: + def __init__(self, facet, entity): + self.facet = facet + self.entity = entity + + def __repr__(self): + return '' % (self.facet._repr_labels(), self.entity) + +class OutboundAssertion: + def __init__(self, handle, ref): + self.handle = handle + self.ref = ref + self.established = False + +# Can act as a mixin +class Entity: + def on_publish(self, turn, v, handle): + pass + + def on_retract(self, turn, handle): + pass + + def on_message(self, turn, v): + pass + + def on_sync(self, turn, peer): + turn.send(peer, True) + +_inert_entity = Entity() diff --git a/syndicate/during.py b/syndicate/during.py new file mode 100644 index 0000000..215f880 --- /dev/null +++ b/syndicate/during.py @@ -0,0 +1,41 @@ +from . import actor + +def _ignore(*args, **kwargs): + pass + +def _default_sync(turn, peer): + turn.send(peer, True) + +class During(actor.Entity): + def __init__(self, turn, on_add=None, on_msg=None, on_sync=None, name=None): + self.ref = turn.ref(self) + self.retract_handlers = {} + self._on_add = on_add or _ignore + self._on_msg = on_msg or _ignore + self._on_sync = on_sync or _default_sync + self.name = name + self.flatten_arg = True + + def __repr__(self): + if self.name is None: + return super().__repr__() + return self.name + + def _wrap(self, v): + return v if self.flatten_arg and isinstance(v, tuple) else (v,) + + def on_publish(self, turn, v, handle): + retract_handler = self._on_add(turn, *self._wrap(v)) + if retract_handler is not None: + self.retract_handlers[handle] = retract_handler + + def on_retract(self, turn, handle): + if handle in self.retract_handlers: + self.retract_handlers[handle](turn) + del self.retract_handlers[handle] + + def on_message(self, turn, v): + self._on_msg(turn, *self._wrap(v)) + + def on_sync(self, turn, peer): + self._on_sync(turn, peer) diff --git a/syndicate/idgen.py b/syndicate/idgen.py new file mode 100644 index 0000000..92d4921 --- /dev/null +++ b/syndicate/idgen.py @@ -0,0 +1,11 @@ +class IdGenerator: + def __init__(self, initial_value = 0): + self.next = initial_value + + def __iter__(self): + return self + + def __next__(self): + n = self.next + self.next = n + 1 + return n diff --git a/syndicate/mini/__init__.py b/syndicate/mini/__init__.py deleted file mode 100644 index 69e3be5..0000000 --- a/syndicate/mini/__init__.py +++ /dev/null @@ -1 +0,0 @@ -__path__ = __import__('pkgutil').extend_path(__path__, __name__) diff --git a/syndicate/mini/core.py b/syndicate/mini/core.py deleted file mode 100644 index c897b78..0000000 --- a/syndicate/mini/core.py +++ /dev/null @@ -1,589 +0,0 @@ -import asyncio -import secrets -import logging -import websockets -import re -from urllib.parse import urlparse, urlunparse - -log = logging.getLogger(__name__) - -import syndicate.mini.protocol as protocol -import syndicate.mini.url as url - -from syndicate.mini.protocol import Capture, Discard, Observe -CAPTURE = Capture(Discard()) - -# This is 'import *' in order to effectively re-export preserves as part of this module's API. -from preserves import * - -def _encode(event): - e = protocol.Encoder() - e.append(event) - return e.contents() - -_instance_id = secrets.token_urlsafe(8) -_uuid_counter = 0 - -def uuid(prefix='__@syndicate'): - global _uuid_counter - c = _uuid_counter - _uuid_counter = c + 1 - return prefix + '_' + _instance_id + '_' + str(c) - -def _ignore(*args, **kwargs): - pass - -class Turn(object): - def __init__(self, conn): - self.conn = conn - self.items = [] - - def _ensure(self, what): - if self.items is None: - raise Exception('Attempt to %s a completed Turn' % (what,)) - - def _extend(self, item): - self._ensure('extend') - self.items.append(item) - - def _reset(self): - self._ensure('reset') - self.items.clear() - - def _commit(self): - self._ensure('commit') - if self.items: self.conn._send(protocol.Turn(self.items)) - self.items = None - - def send(self, message): - self._extend(protocol.Message(message)) - - def merge_into(self, other): - self._ensure('merge from') - other._ensure('merge into') - if self.items: other.items.extend(self.items) - self.items.clear() - - def __enter__(self): - return self - - def __exit__(self, t, v, tb): - if t is None: - self._commit() - -def _fresh_id(assertion): - return uuid('sub' if Observe.isClassOf(assertion) else 'pub') - -class FacetSetupContext(object): - def __init__(self, turn, facet): - self.turn = turn - self.facet = facet - - def __enter__(self): - return self.facet - - def __exit__(self, t, v, tb): - if t is None: - self.facet.start(self.turn) - else: - self.facet.actor.kill(self.turn, (t, v, tb)) - -_next_actor_number = 0 -def _next_actor_name(): - global _next_actor_number - n = _next_actor_number - _next_actor_number = n + 1 - return str(n) - -class Actor(object): - def __init__(self, conn, name=None): - self.conn = conn - self.name = name or _next_actor_name() - self.children = set() - self.alive = True - self._log = None - - @property - def log(self): - if self._log is None: - self._log = logging.getLogger('syndicate.mini.Actor.%s' % (self.name,)) - return self._log - - def is_alive(self): - return self.alive - - def react(self, turn): - return FacetSetupContext(turn, Facet(self.conn, self, self)) - - def _add_child(self, facet): - if not self.alive: - raise Exception('Cannot add facet because actor is termianted') - self.children.add(facet) - - def guard(self, turn): - return ActorGuard(turn, self) - - def is_inert(self): - return len(self.children) == 0 - - def _stop(self, turn, pending, is_abort): - for child in list(self.children): - child._stop(turn, pending, is_abort) - self.children.clear() - - def stop(self, turn): - self.alive = False - pending = [] - self._stop(turn, pending, False) - for t in pending: t(turn) - - def kill(self, turn, exc_info): - log.error('%r%s died', self, repr(self.name) if self.name else '', exc_info=exc_info) - for child in list(self.children): - child._abort(turn) - self.children.clear() - -class ActorGuard(object): - def __init__(self, turn, actor): - self.turn = turn - self.actor = actor - - def __enter__(self): - pass - - def __exit__(self, t, v, tb): - if t is not None: - self.actor.kill(self.turn, (t, v, tb)) - return True ## suppress exception - -class Facet(object): - def __init__(self, conn, actor, parent): - self.conn = conn - self.actor = actor - self.parent = parent - if parent: - parent._add_child(self) - self.children = set() - self.start_callbacks = [] - self.stop_callbacks = [] - self.endpoints = [] - self.state = 0 - - @property - def log(self): - return self.actor.log - - def is_alive(self): - return self.state == 1 - - def _ensure_state(self, wanted_state, message): - if self.state != wanted_state: - raise Exception(message, self.state) - - def _add_child(self, facet): - self._ensure_state(1, 'Cannot add child facet in this state') - self.children.add(facet) - - def react(self, turn): - self._ensure_state(1, 'Cannot create subfacet in this state') - return FacetSetupContext(turn, Facet(self.conn, self.actor, self)) - - def add(self, *args, **kwargs): - self._ensure_state(0, 'Cannot add endpoint to facet in this state') - endpoint = Endpoint(self, *args, **kwargs) - self.endpoints.append(endpoint) - return endpoint - - def on_start(self, callback): - self._ensure_state(0, 'Cannot add on_start callback to facet in this state') - self.start_callbacks.append(callback) - - def on_stop(self, callback): - self._ensure_state(0, 'Cannot add on_stop callback to facet in this state') - self.stop_callbacks.append(callback) - - def start(self, turn): - self._ensure_state(0, 'Cannot start facet in this state') - self.state = 1 - for e in self.endpoints: e._start(turn) - for t in self.start_callbacks: t(turn) - self.start_callbacks.clear() - if not self.parent.is_alive() or self.is_inert(): - self.stop(turn) - - def is_inert(self): - return len(self.children) == 0 and len(self.endpoints) == 0 - - def _stop(self, turn, pending, is_abort): - if turn.conn is not self.conn: - raise Exception('Attempted to stop facet from wrong connection') - if self.state == 0 and not is_abort: - raise Exception('Cannot stop facet before starting it') - if self.state != 1: ## stopping is idempotent - return - self.state = 2 - - if self.parent: - self.parent.children.remove(self) - - for child in list(self.children): - child._stop(turn, pending, is_abort) - self.children.clear() - - if not is_abort: - pending.extend(self.stop_callbacks) - - for endpoint in self.endpoints: - endpoint.clear(turn) - self.endpoints.clear() - - if not is_abort: - if self.parent and self.parent.is_inert(): - self.parent._stop(turn, pending, is_abort) - - def stop(self, turn): - pending = [] - self._stop(turn, pending, False) - for t in pending: t(turn) - - def _abort(self, turn): - self._stop(turn, None, True) - -class Endpoint(object): - def __init__(self, facet, assertion, on_add=None, on_del=None, on_msg=None): - self.facet = facet - self.assertion = assertion - self.id = None - self.on_add = on_add or _ignore - self.on_del = on_del or _ignore - self.on_msg = on_msg or _ignore - self.cache = set() - - @property - def log(self): - return self.facet.log - - def _start(self, turn): - self.set(turn, self.assertion) - - def set(self, turn, new_assertion, on_transition=None): - if self.id is not None: - turn.conn._unmap_endpoint(turn, self, on_end=on_transition) - self.id = None - self.assertion = new_assertion - if self.assertion is not None: - self.id = _fresh_id(self.assertion) - turn.conn._map_endpoint(turn, self) - - def clear(self, turn, on_cleared=None): - self.set(turn, None, on_transition=on_cleared) - - def _reset(self, turn): - for captures in set(self.cache): - self._del(turn, captures) - - def _add(self, turn, captures): - if captures in self.cache: - log.error('Server error: duplicate captures %r added for endpoint %r %r' % ( - captures, - self.id, - self.assertion)) - else: - self.cache.add(captures) - with self.facet.actor.guard(turn): - self.on_add(turn, *captures) - - def _del(self, turn, captures): - if captures in self.cache: - self.cache.discard(captures) - with self.facet.actor.guard(turn): - self.on_del(turn, *captures) - else: - log.error('Server error: nonexistent captures %r removed from endpoint %r %r' % ( - captures, - self.id, - self.assertion)) - - def _msg(self, turn, captures): - with self.facet.actor.guard(turn): - self.on_msg(turn, *captures) - -class DummyEndpoint(object): - def _add(self, turn, captures): pass - def _del(self, turn, captures): pass - def _msg(self, turn, captures): pass - -_dummy_endpoint = DummyEndpoint() - -class Connection(object): - def __init__(self, scope): - self.scope = scope - self.endpoints = {} - self.end_callbacks = {} - self._is_connected = False - - def is_connected(self): - return self._is_connected - - def _each_endpoint(self): - return list(self.endpoints.values()) - - def turn(self): - return Turn(self) - - def actor(self, name=None): - return Actor(self, name=name) - - def destroy(self): - with self.turn() as t: - for ep in self._each_endpoint(): - ep.clear(t) - t._reset() ## don't actually Clear the endpoints, we are about to disconnect - self._disconnect() - - def _unmap_endpoint(self, turn, ep, on_end=None): - del self.endpoints[ep.id] - if on_end: - self.end_callbacks[ep.id] = on_end - turn._extend(protocol.Clear(ep.id)) - - def _on_end(self, turn, id): - if id in self.end_callbacks: - self.end_callbacks[id](turn) - del self.end_callbacks[id] - - def _map_endpoint(self, turn, ep): - self.endpoints[ep.id] = ep - turn._extend(protocol.Assert(ep.id, ep.assertion)) - - def _on_disconnected(self): - self._is_connected = False - with self.turn() as t: - for ep in self._each_endpoint(): - ep._reset(t) - t._reset() ## we have been disconnected, no point in keeping the actions - self._disconnect() - - def _on_connected(self): - self._send(protocol.Connect(self.scope)) - with self.turn() as t: - for ep in self._each_endpoint(): - self._map_endpoint(t, ep) - self._is_connected = True - - def _lookup(self, endpointId): - return self.endpoints.get(endpointId, _dummy_endpoint) - - def _on_event(self, v): - with self.turn() as t: - self._handle_event(t, v) - - def _handle_event(self, turn, v): - if protocol.Turn.isClassOf(v): - for item in protocol.Turn._items(v): - if protocol.Add.isClassOf(item): self._lookup(item[0])._add(turn, item[1]) - elif protocol.Del.isClassOf(item): self._lookup(item[0])._del(turn, item[1]) - elif protocol.Msg.isClassOf(item): self._lookup(item[0])._msg(turn, item[1]) - elif protocol.End.isClassOf(item): self._on_end(turn, item[0]) - else: log.error('Unhandled server Turn item: %r' % (item,)) - return - elif protocol.Err.isClassOf(v): - self._on_error(v[0], v[1]) - return - elif protocol.Ping.isClassOf(v): - self._send_bytes(_encode(protocol.Pong())) - return - else: - log.error('Unhandled server message: %r' % (v,)) - - def _on_error(self, detail, context): - log.error('%s: error from server: %r (context: %r)' % ( - self.__class__.__qualname__, detail, context)) - self._disconnect() - - def _send(self, m): - return self._send_bytes(_encode(m)) - - def _send_bytes(self, bs, commitNeeded = False): - raise Exception('subclassresponsibility') - - def _disconnect(self): - raise Exception('subclassresponsibility') - - async def reconnecting_main(self, loop, on_connected=None, on_disconnected=None): - should_run = True - while should_run: - did_connect = await self.main(loop, on_connected=(on_connected or _default_on_connected)) - should_run = await (on_disconnected or _default_on_disconnected)(did_connect) - - @classmethod - def from_url(cls, s): - return url.connection_from_url(s) - -async def _default_on_connected(): - log.info('Connected') - -async def _default_on_disconnected(did_connect): - if did_connect: - # Reconnect immediately - log.info('Disconnected') - else: - await asyncio.sleep(2) - return True - -class _StreamConnection(Connection, asyncio.Protocol): - def __init__(self, scope): - super().__init__(scope) - self.decoder = None - self.stop_signal = None - self.transport = None - - def connection_lost(self, exc): - self._on_disconnected() - - def connection_made(self, transport): - self.transport = transport - self._on_connected() - - def data_received(self, chunk): - self.decoder.extend(chunk) - while True: - v = self.decoder.try_next() - if v is None: break - self._on_event(v) - - def _send_bytes(self, bs, commitNeeded = False): - if self.transport: - self.transport.write(bs) - if commitNeeded: - self.commitNeeded = True - - def _disconnect(self): - if self.stop_signal: - self.stop_signal.get_loop().call_soon_threadsafe( - lambda: self.stop_signal.set_result(True)) - - async def _create_connection(self, loop): - raise Exception('subclassresponsibility') - - async def main(self, loop, on_connected=None): - if self.transport is not None: - raise Exception('Cannot run connection twice!') - - self.decoder = protocol.Decoder() - self.stop_signal = loop.create_future() - try: - _transport, _protocol = await self._create_connection(loop) - except OSError as e: - log.error('%s: Could not connect to server: %s' % (self.__class__.__qualname__, e)) - return False - - try: - if on_connected: await on_connected() - await self.stop_signal - return True - finally: - self.transport.close() - self.transport = None - self.stop_signal = None - self.decoder = None - -@url.schema('tcp') -class TcpConnection(_StreamConnection): - def __init__(self, host, port, scope): - super().__init__(scope) - self.host = host - self.port = port - - async def _create_connection(self, loop): - return await loop.create_connection(lambda: self, self.host, self.port) - - @classmethod - def default_port(cls): - return 21369 - - @classmethod - def from_url(cls, s): - u = urlparse(s) - host, port = url._hostport(u.netloc, cls.default_port()) - if not host: return - scope = u.fragment - return cls(host, port, scope) - -@url.schema('unix') -class UnixSocketConnection(_StreamConnection): - def __init__(self, path, scope): - super().__init__(scope) - self.path = path - - async def _create_connection(self, loop): - return await loop.create_unix_connection(lambda: self, self.path) - - @classmethod - def from_url(cls, s): - u = urlparse(s) - return cls(u.path, u.fragment) - -@url.schema('ws') -@url.schema('wss') -class WebsocketConnection(Connection): - def __init__(self, url, scope): - super().__init__(scope) - self.url = url - self.loop = None - self.ws = None - - def _send_bytes(self, bs, commitNeeded = False): - if self.loop: - def _do_send(): - if self.ws: - self.loop.create_task(self.ws.send(bs)) - self.loop.call_soon_threadsafe(_do_send) - if commitNeeded: - self.commitNeeded = True - - def _disconnect(self): - if self.loop: - def _do_disconnect(): - if self.ws: - self.loop.create_task(self.ws.close()) - self.loop.call_soon_threadsafe(_do_disconnect) - - def __connection_error(self, e): - log.error('%s: Could not connect to server: %s' % (self.__class__.__qualname__, e)) - return False - - async def main(self, loop, on_connected=None): - if self.ws is not None: - raise Exception('Cannot run connection twice!') - - self.loop = loop - - try: - self.ws = await websockets.connect(self.url) - except OSError as e: - return self.__connection_error(e) - except websockets.exceptions.InvalidHandshake as e: - return self.__connection_error(e) - - try: - if on_connected: await on_connected() - self._on_connected() - while True: - chunk = await self.ws.recv() - self._on_event(protocol.Decoder(chunk).next()) - except websockets.exceptions.WebSocketException: - pass - finally: - self._on_disconnected() - - if self.ws: - await self.ws.close() - self.loop = None - self.ws = None - return True - - @classmethod - def from_url(cls, s): - u = urlparse(s) - return cls(urlunparse(u._replace(fragment='')), u.fragment) diff --git a/syndicate/mini/protocol.py b/syndicate/mini/protocol.py deleted file mode 100644 index b7d8e59..0000000 --- a/syndicate/mini/protocol.py +++ /dev/null @@ -1,32 +0,0 @@ -import preserves -from preserves import Record - -## Enrolment -Connect = Record.makeConstructor('Connect', 'scope') - -## Bidirectional -Turn = Record.makeConstructor('Turn', 'items') - -## Client -> Server -Assert = Record.makeConstructor('Assert', 'endpointName assertion') -Clear = Record.makeConstructor('Clear', 'endpointName') -Message = Record.makeConstructor('Message', 'body') - -## Server -> Client -Add = Record.makeConstructor('Add', 'endpointName captures') -Del = Record.makeConstructor('Del', 'endpointName captures') -Msg = Record.makeConstructor('Msg', 'endpointName captures') -End = Record.makeConstructor('End', 'endpointName') -Err = Record.makeConstructor('Err', 'detail context') - -## Bidirectional -Ping = Record.makeConstructor('Ping', '') -Pong = Record.makeConstructor('Pong', '') - -## Standard Syndicate constructors -Observe = Record.makeConstructor('observe', 'specification') -Capture = Record.makeConstructor('capture', 'specification') -Discard = Record.makeConstructor('discard', '') - -Decoder = preserves.Decoder -Encoder = preserves.Encoder diff --git a/syndicate/mini/url.py b/syndicate/mini/url.py deleted file mode 100644 index a5aa2fe..0000000 --- a/syndicate/mini/url.py +++ /dev/null @@ -1,33 +0,0 @@ -# URLs denoting Syndicate servers. - -class InvalidSyndicateUrl(ValueError): pass - -schemas = {} - -def schema(schema_name): - def k(factory_class): - schemas[schema_name] = factory_class - return factory_class - return k - -def _bad_url(u): - raise InvalidSyndicateUrl('Invalid Syndicate server URL', u) - -def connection_from_url(u): - pieces = u.split(':', 1) - if len(pieces) != 2: _bad_url(u) - schema_name, _rest = pieces - if schema_name not in schemas: _bad_url(u) - conn = schemas[schema_name].from_url(u) - if not conn: _bad_url(u) - return conn - -def _hostport(s, default_port): - try: - i = s.rindex(':') - except ValueError: - i = None - if i is not None: - return (s[:i], int(s[i+1:])) - else: - return (s, default_port) diff --git a/syndicate/schema.py b/syndicate/schema.py new file mode 100644 index 0000000..d636d34 --- /dev/null +++ b/syndicate/schema.py @@ -0,0 +1,6 @@ +from preserves.schema import load_schema_file +import pathlib + +for (n, ns) in load_schema_file(pathlib.Path(__file__).parent / + '../../syndicate-protocols/schema-bundle.bin')._items().items(): + globals()[n] = ns