diff --git a/syndicate/mini/core.py b/syndicate/mini/core.py index 23cd98d..44e9f84 100644 --- a/syndicate/mini/core.py +++ b/syndicate/mini/core.py @@ -25,13 +25,15 @@ def _ignore(*args, **kwargs): pass class Endpoint(object): - def __init__(self, conn, assertion, id=None, on_add=None, on_del=None, on_msg=None): + def __init__(self, conn, assertion, id=None, + on_add=None, on_del=None, on_msg=None, on_end=None): self.conn = conn self.assertion = assertion self.id = id or uuid('sub' if Observe.isClassOf(assertion) else 'pub') self.on_add = on_add or _ignore self.on_del = on_del or _ignore self.on_msg = on_msg or _ignore + self.on_end = on_end or _ignore self.cache = set() self.conn._update_endpoint(self) @@ -77,10 +79,14 @@ class Endpoint(object): def _msg(self, captures): self.on_msg(*captures) + def _end(self): + self.on_end() + class DummyEndpoint(object): def _add(self, captures): pass def _del(self, captures): pass def _msg(self, captures): pass + def _end(self): pass _dummy_endpoint = DummyEndpoint() @@ -149,6 +155,7 @@ class Connection(object): if protocol.Add.isClassOf(v): return self._push_work(lambda: self._lookup(v[0])._add(v[1])) if protocol.Del.isClassOf(v): return self._push_work(lambda: self._lookup(v[0])._del(v[1])) if protocol.Msg.isClassOf(v): return self._push_work(lambda: self._lookup(v[0])._msg(v[1])) + if protocol.End.isClassOf(v): return self._push_work(lambda: self._lookup(v[0])._end()) if protocol.Commit.isClassOf(v): return self._commit_work() if protocol.Err.isClassOf(v): return self._on_error(v[0]) if protocol.Ping.isClassOf(v): self._send(self._encode(protocol.Pong())) diff --git a/syndicate/mini/protocol.py b/syndicate/mini/protocol.py index 2a2cc9a..fc8a4c7 100644 --- a/syndicate/mini/protocol.py +++ b/syndicate/mini/protocol.py @@ -17,6 +17,7 @@ Message = Record.makeConstructor('Message', 'body') Add = Record.makeConstructor('Add', 'endpointName captures') Del = Record.makeConstructor('Del', 'endpointName captures') Msg = Record.makeConstructor('Msg', 'endpointName captures') +End = Record.makeConstructor('End', 'endpointName') Err = Record.makeConstructor('Err', 'detail') ## Bidirectional