From 3b34b86bf1c8916afc07f4c7b7a8b18b03071661 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sun, 23 Jun 2019 21:43:52 +0100 Subject: [PATCH] Actors; Facets; logging; error handling --- chat.py | 13 +-- ovlinfo.py | 18 +--- setup.py | 2 +- syndicate/mini/core.py | 240 +++++++++++++++++++++++++++++++++++++---- 4 files changed, 233 insertions(+), 40 deletions(-) diff --git a/chat.py b/chat.py index 3df4253..648c35f 100644 --- a/chat.py +++ b/chat.py @@ -33,12 +33,13 @@ names = ['Daria', 'Kendra', 'Danny', 'Rufus', 'Diana', 'Arnetta', 'Dominick', 'M me = random.choice(names) + '_' + str(random.randint(10, 1000)) with conn.turn() as t: - S.Endpoint(t, Present(me)) - S.Endpoint(t, S.Observe(Present(S.CAPTURE)), - on_add=lambda t, who: print(who, 'joined'), - on_del=lambda t, who: print(who, 'left')) - S.Endpoint(t, S.Observe(Says(S.CAPTURE, S.CAPTURE)), - on_msg=lambda t, who, what: print(who, 'said', repr(what))) + with conn.actor().react(t) as facet: + facet.add(Present(me)) + facet.add(S.Observe(Present(S.CAPTURE)), + on_add=lambda t, who: print(who, 'joined'), + on_del=lambda t, who: print(who, 'left')) + facet.add(S.Observe(Says(S.CAPTURE, S.CAPTURE)), + on_msg=lambda t, who, what: print(who, 'said', repr(what))) async def on_connected(): print('-'*50, 'Connected') diff --git a/ovlinfo.py b/ovlinfo.py index aa9d044..f3e0c8c 100644 --- a/ovlinfo.py +++ b/ovlinfo.py @@ -19,22 +19,14 @@ def summarise_uplinks(): print(repr(uplinks)) with conn.turn() as t: - S.Endpoint(t, S.Observe(OverlayLink(S.CAPTURE, S.CAPTURE)), - on_add=add_uplink, - on_del=del_uplink) - -async def on_connected(): - print('-'*50, 'Connected') -async def on_disconnected(did_connect): - if did_connect: - print('-'*50, 'Disconnected') - else: - await asyncio.sleep(2) - return True + with conn.actor().react(t) as facet: + facet.add(S.Observe(OverlayLink(S.CAPTURE, S.CAPTURE)), + on_add=add_uplink, + on_del=del_uplink) loop = asyncio.get_event_loop() loop.set_debug(True) -loop.run_until_complete(conn.reconnecting_main(loop, on_connected, on_disconnected)) +loop.run_until_complete(conn.reconnecting_main(loop)) loop.stop() loop.run_forever() loop.close() diff --git a/setup.py b/setup.py index 820f052..b3920b7 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ except ImportError: setup( name="mini-syndicate", - version="0.0.4", + version="0.0.5", author="Tony Garnock-Jones", author_email="tonyg@leastfixedpoint.com", license="GNU General Public License v3 or later (GPLv3+)", diff --git a/syndicate/mini/core.py b/syndicate/mini/core.py index 8bc3eb8..cc3bdc7 100644 --- a/syndicate/mini/core.py +++ b/syndicate/mini/core.py @@ -37,23 +37,33 @@ class Turn(object): self.conn = conn self.items = [] + def _ensure(self, what): + if self.items is None: + raise Exception('Attempt to %s a completed Turn' % (what,)) + def _extend(self, item): + self._ensure('extend') self.items.append(item) def _reset(self): + self._ensure('reset') self.items.clear() def _commit(self): - if self.items: - self.conn._send(protocol.Turn(self.items)) - self._reset() + self._ensure('commit') + if self.items: self.conn._send(protocol.Turn(self.items)) + self.items = None def send(self, message): self._extend(protocol.Message(message)) + def merge_into(self, other): + self._ensure('merge from') + other._ensure('merge into') + if self.items: other.items.extend(self.items) + self.items.clear() + def __enter__(self): - if self.items: - raise Exception('Cannot reenter with statement for Turn') return self def __exit__(self, t, v, tb): @@ -63,16 +73,189 @@ class Turn(object): def _fresh_id(assertion): return uuid('sub' if Observe.isClassOf(assertion) else 'pub') +class FacetSetupContext(object): + def __init__(self, turn, facet): + self.turn = turn + self.facet = facet + + def __enter__(self): + return self.facet + + def __exit__(self, t, v, tb): + if t is None: + self.facet.start(self.turn) + else: + self.facet.actor.kill(self.turn, (t, v, tb)) + +_next_actor_number = 0 +def _next_actor_name(): + global _next_actor_number + n = _next_actor_number + _next_actor_number = n + 1 + return str(n) + +class Actor(object): + def __init__(self, conn, name=None): + self.conn = conn + self.name = name or _next_actor_name() + self.children = set() + self.alive = True + self._log = None + + @property + def log(self): + if self._log is None: + self._log = logging.getLogger('syndicate.mini.Actor.%s' % (self.name,)) + return self._log + + def react(self, turn): + return FacetSetupContext(turn, Facet(self.conn, self, self)) + + def _add_child(self, facet): + if not self.alive: + raise Exception('Cannot add facet because actor is termianted') + self.children.add(facet) + + def guard(self, turn): + return ActorGuard(turn, self) + + def is_inert(self): + return len(self.children) == 0 + + def _stop(self, turn, pending, is_abort): + for child in list(self.children): + child._stop(turn, pending, is_abort) + self.children.clear() + + def stop(self, turn): + self.alive = False + pending = [] + self._stop(turn, pending, False) + for t in pending: t(turn) + + def kill(self, turn, exc_info): + log.error('%r%s died', self, repr(self.name) if self.name else '', exc_info=exc_info) + for child in list(self.children): + child._abort(turn) + self.children.clear() + +class ActorGuard(object): + def __init__(self, turn, actor): + self.turn = turn + self.actor = actor + + def __enter__(self): + pass + + def __exit__(self, t, v, tb): + if t is not None: + self.actor.kill(self.turn, (t, v, tb)) + return True ## suppress exception + +class Facet(object): + def __init__(self, conn, actor, parent): + self.conn = conn + self.actor = actor + self.parent = parent + if parent: + parent._add_child(self) + self.children = set() + self.start_callbacks = [] + self.stop_callbacks = [] + self.endpoints = [] + self.state = 0 + + @property + def log(self): + return self.actor.log + + def _ensure_state(self, wanted_state, message): + if self.state != wanted_state: + raise Exception(message) + + def _add_child(self, facet): + self._ensure_state(1, 'Cannot add child facet in this state') + self.children.add(facet) + + def react(self, turn): + self._ensure_state(1, 'Cannot create subfacet in this state') + return FacetSetupContext(turn, Facet(self.conn, self.actor, self)) + + def add(self, *args, **kwargs): + self._ensure_state(0, 'Cannot add endpoint to facet in this state') + endpoint = Endpoint(self, *args, **kwargs) + self.endpoints.append(endpoint) + return endpoint + + def on_start(self, callback): + self._ensure_state(0, 'Cannot add on_start callback to facet in this state') + self.start_callbacks.append(callback) + + def on_stop(self, callback): + self._ensure_state(0, 'Cannot add on_stop callback to facet in this state') + self.stop_callbacks.append(callback) + + def start(self, turn): + self._ensure_state(0, 'Cannot start facet in this state') + self.state = 1 + for e in self.endpoints: e._start(turn) + for t in self.start_callbacks: t(turn) + self.start_callbacks.clear() + + def is_inert(self): + return len(self.children) == 0 and len(self.endpoints) == 0 + + def _stop(self, turn, pending, is_abort): + if turn.conn is not self.conn: + raise Exception('Attempted to stop facet from wrong connection') + if self.state == 0 and not is_abort: + raise Exception('Cannot stop facet before starting it') + if self.state != 1: ## stopping is idempotent + return + self.state = 2 + + if self.parent: + self.parent.children.remove(self) + + for child in list(self.children): + child._stop(turn, pending, is_abort) + self.children.clear() + + if not is_abort: + pending.extend(self.stop_callbacks) + + for endpoint in self.endpoints: + endpoint.clear(turn) + self.endpoints.clear() + + if not is_abort: + if self.parent and self.parent.is_inert(): + self.parent._stop(turn, pending, is_abort) + + def stop(self, turn): + pending = [] + self._stop(turn, pending, False) + for t in pending: t(turn) + + def _abort(self, turn): + self._stop(turn, None, True) + class Endpoint(object): - def __init__(self, turn, assertion, - on_add=None, on_del=None, on_msg=None): - self.assertion = None + def __init__(self, facet, assertion, on_add=None, on_del=None, on_msg=None): + self.facet = facet + self.assertion = assertion self.id = None self.on_add = on_add or _ignore self.on_del = on_del or _ignore self.on_msg = on_msg or _ignore self.cache = set() - self.set(turn, assertion) + + @property + def log(self): + return self.facet.log + + def _start(self, turn): + self.set(turn, self.assertion) def set(self, turn, new_assertion, on_transition=None): if self.id is not None: @@ -98,12 +281,14 @@ class Endpoint(object): self.assertion)) else: self.cache.add(captures) - self.on_add(turn, *captures) + with self.facet.actor.guard(turn): + self.on_add(turn, *captures) def _del(self, turn, captures): if captures in self.cache: self.cache.discard(captures) - self.on_del(turn, *captures) + with self.facet.actor.guard(turn): + self.on_del(turn, *captures) else: log.error('Server error: nonexistent captures %r removed from endpoint %r %r' % ( captures, @@ -111,7 +296,8 @@ class Endpoint(object): self.assertion)) def _msg(self, turn, captures): - self.on_msg(turn, *captures) + with self.facet.actor.guard(turn): + self.on_msg(turn, *captures) class DummyEndpoint(object): def _add(self, turn, captures): pass @@ -125,6 +311,10 @@ class Connection(object): self.scope = scope self.endpoints = {} self.end_callbacks = {} + self._is_connected = False + + def is_connected(self): + return self._is_connected def _each_endpoint(self): return list(self.endpoints.values()) @@ -132,6 +322,9 @@ class Connection(object): def turn(self): return Turn(self) + def actor(self, name=None): + return Actor(self, name=name) + def destroy(self): with self.turn() as t: for ep in self._each_endpoint(): @@ -155,6 +348,7 @@ class Connection(object): turn._extend(protocol.Assert(ep.id, ep.assertion)) def _on_disconnected(self): + self._is_connected = False with self.turn() as t: for ep in self._each_endpoint(): ep._reset(t) @@ -166,6 +360,7 @@ class Connection(object): with self.turn() as t: for ep in self._each_endpoint(): self._map_endpoint(t, ep) + self._is_connected = True def _lookup(self, endpointId): return self.endpoints.get(endpointId, _dummy_endpoint) @@ -209,19 +404,24 @@ class Connection(object): async def reconnecting_main(self, loop, on_connected=None, on_disconnected=None): should_run = True while should_run: - did_connect = await self.main(loop, on_connected=on_connected) - if on_disconnected: - should_run = await on_disconnected(did_connect) - else: - if did_connect: - pass # Reconnect immediately - else: - await asyncio.sleep(2) + did_connect = await self.main(loop, on_connected=(on_connected or _default_on_connected)) + should_run = await (on_disconnected or _default_on_disconnected)(did_connect) @classmethod def from_url(cls, s): return url.connection_from_url(s) +async def _default_on_connected(): + log.info('Connected') + +async def _default_on_disconnected(did_connect): + if did_connect: + # Reconnect immediately + log.info('Disconnected') + else: + await asyncio.sleep(2) + return True + class _StreamConnection(Connection, asyncio.Protocol): def __init__(self, scope): super().__init__(scope)