commit e6cdf8127f4ebe7c9258124f7bd46a69ff3f126b Author: Tony Garnock-Jones Date: Tue Nov 20 19:45:27 2018 +0000 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cea2e7b --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +*.pyc +.coverage +htmlcov/ +build/ +dist/ +*.egg-info/ +pyenv/ +/preserves \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..07657f0 --- /dev/null +++ b/Makefile @@ -0,0 +1,16 @@ +test: + @echo no tests yet, lol + +clean: + rm -rf htmlcov + find . -iname __pycache__ -o -iname '*.pyc' | xargs rm -rf + rm -f .coverage + rm -rf preserves.egg-info build dist + +# sudo apt install python3-wheel twine +publish: clean + python3 setup.py sdist bdist_wheel + twine upload dist/* + +veryclean: clean + rm -rf pyenv diff --git a/chat.py b/chat.py new file mode 100644 index 0000000..052569f --- /dev/null +++ b/chat.py @@ -0,0 +1,60 @@ +import sys +import asyncio +import random +import threading +import syndicate.mini.core as S + +Present = S.Record.makeConstructor('Present', 'who') +Says = S.Record.makeConstructor('Says', 'who what') + +if len(sys.argv) == 3: + conn = S.TcpConnection(sys.argv[1], int(sys.argv[2])) +elif len(sys.argv) == 2: + conn = S.WebsocketConnection(sys.argv[1]) +elif len(sys.argv) == 1: + conn = S.WebsocketConnection('ws://localhost:8000/broker') +else: + sys.stderr.write(b'Usage: chat.py [ HOST PORT | WEBSOCKETURL ]\n') + sys.exit(1) + +## Courtesy of http://listofrandomnames.com/ :-) +names = ['Daria', 'Kendra', 'Danny', 'Rufus', 'Diana', 'Arnetta', 'Dominick', 'Melonie', 'Regan', + 'Glenda', 'Janet', 'Luci', 'Ronnie', 'Vita', 'Amie', 'Stefani', 'Catherine', 'Grady', + 'Terrance', 'Rey', 'Fay', 'Shantae', 'Carlota', 'Judi', 'Crissy', 'Tasha', 'Jordan', + 'Rolande', 'Buster', 'Diamond', 'Dallas', 'Lissa', 'Yang', 'Charlena', 'Brooke', 'Haydee', + 'Griselda', 'Kasie', 'Clara', 'Claudie', 'Darell', 'Emery', 'Barbera', 'Chong', 'Karin', + 'Veronica', 'Karly', 'Shaunda', 'Nigel', 'Cleo'] + +me = random.choice(names) + '_' + str(random.randint(10, 1000)) + +S.Endpoint(conn, Present(me)) + +S.Endpoint(conn, S.Observe(Present(S.CAPTURE)), + on_add=lambda who: print(who, 'joined'), + on_del=lambda who: print(who, 'left')) + +S.Endpoint(conn, S.Observe(Says(S.CAPTURE, S.CAPTURE)), + on_msg=lambda who, what: print(who, 'said', repr(what))) + +async def reconnect(loop): + while True: + await conn.main(loop) + if not conn: break + print('-'*50, 'Disconnected') + await asyncio.sleep(2) + if not conn: break + +def accept_input(loop): + global conn + while True: + line = sys.stdin.readline() + if not line: + conn.destroy() + conn = None + break + conn.send(Says(me, line.strip())) + +loop = asyncio.get_event_loop() +threading.Thread(target=lambda: accept_input(loop)).start() +loop.run_until_complete(reconnect(loop)) +loop.close() diff --git a/environment b/environment new file mode 100644 index 0000000..b38208a --- /dev/null +++ b/environment @@ -0,0 +1,3 @@ +[ -d pyenv ] || virtualenv -p python3 pyenv +. pyenv/bin/activate +pip install -r requirements.txt diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..14774b4 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +websockets diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..84f5571 --- /dev/null +++ b/setup.py @@ -0,0 +1,24 @@ +try: + from setuptools import setup +except ImportError: + from distutils.core import setup + +setup( + name="mini-syndicate", + version="0.0.0", + author="Tony Garnock-Jones", + author_email="tonyg@leastfixedpoint.com", + license="GNU General Public License v3 or later (GPLv3+)", + classifiers=[ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "Topic :: Software Development :: Libraries :: Python Modules", + "License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)", + "Programming Language :: Python :: 3", + ], + packages=["syndicate"], + url="https://github.com/syndicate-lang/mini-syndicate-py", + description="Syndicate-like library for integrating Python with the Syndicate ecosystem", + install_requires=['websockets', 'preserves'], + python_requires=">=3.6, <4", +) diff --git a/syndicate/__init__.py b/syndicate/__init__.py new file mode 100644 index 0000000..69e3be5 --- /dev/null +++ b/syndicate/__init__.py @@ -0,0 +1 @@ +__path__ = __import__('pkgutil').extend_path(__path__, __name__) diff --git a/syndicate/mini/__init__.py b/syndicate/mini/__init__.py new file mode 100644 index 0000000..69e3be5 --- /dev/null +++ b/syndicate/mini/__init__.py @@ -0,0 +1 @@ +__path__ = __import__('pkgutil').extend_path(__path__, __name__) diff --git a/syndicate/mini/core.py b/syndicate/mini/core.py new file mode 100644 index 0000000..0bbc743 --- /dev/null +++ b/syndicate/mini/core.py @@ -0,0 +1,225 @@ +import asyncio +import secrets +import logging +import websockets + +log = logging.getLogger(__name__) + +import syndicate.mini.protocol as protocol + +from syndicate.mini.protocol import Capture, Discard, Observe +CAPTURE = Capture(Discard()) + +from preserves import * + +_instance_id = secrets.token_urlsafe(8) +_uuid_counter = 0 + +def uuid(prefix='__@syndicate'): + global _uuid_counter + c = _uuid_counter + _uuid_counter = c + 1 + return prefix + '_' + _instance_id + '_' + str(c) + +def _ignore(*args, **kwargs): + pass + +class Endpoint(object): + def __init__(self, conn, assertion, id=None, on_add=None, on_del=None, on_msg=None): + self.conn = conn + self.assertion = assertion + self.id = id or uuid('sub' if Observe.isClassOf(assertion) else 'pub') + self.on_add = on_add or _ignore + self.on_del = on_del or _ignore + self.on_msg = on_msg or _ignore + self.cache = set() + self.conn._update_endpoint(self) + + def set(self, new_assertion): + self.assertion = new_assertion + if self.conn: + self.conn._update_endpoint(self) + + def send(self, message): + '''Shortcut to Connection.send.''' + if self.conn: + self.conn.send(message) + + def destroy(self): + if self.conn: + self.conn._clear_endpoint(self) + self.conn = None + + def _reset(self): + for captures in set(self.cache): + self._del(captures) + + def _add(self, captures): + if captures not in self.cache: + self.cache.add(captures) + self.on_add(*captures) + + def _del(self, captures): + if captures in self.cache: + self.cache.discard(captures) + self.on_del(*captures) + + def _msg(self, captures): + self.on_msg(*captures) + +class DummyEndpoint(object): + def _add(self, captures): pass + def _del(self, captures): pass + def _msg(self, captures): pass + +_dummy_endpoint = DummyEndpoint() + +class Connection(object): + def __init__(self): + self.endpoints = {} + + def _each_endpoint(self): + return list(self.endpoints.values()) + + def destroy(self): + for ep in self._each_endpoint(): + ep.destroy() + self._disconnect() + + def _encode(self, event): + e = protocol.Encoder() + e.append(event) + return e.contents() + + def _update_endpoint(self, ep): + self.endpoints[ep.id] = ep + self._send(self._encode(protocol.Assert(ep.id, ep.assertion))) + + def _clear_endpoint(self, ep): + if ep.id in self.endpoints: + del self.endpoints[ep.id] + self._send(self._encode(protocol.Clear(ep.id))) + + def send(self, message): + self._send(self._encode(protocol.Message(message))) + + def _on_disconnected(self): + for ep in self._each_endpoint(): + ep._reset() + self._disconnect() + + def _on_connected(self): + for ep in self._each_endpoint(): + self._update_endpoint(ep) + + def _lookup(self, endpointId): + return self.endpoints.get(endpointId, _dummy_endpoint) + + def _on_event(self, v): + if protocol.Add.isClassOf(v): return self._lookup(v[0])._add(v[1]) + if protocol.Del.isClassOf(v): return self._lookup(v[0])._del(v[1]) + if protocol.Msg.isClassOf(v): return self._lookup(v[0])._msg(v[1]) + + def _send(self, bs): + raise Exception('subclassresponsibility') + + def _disconnect(self): + raise Exception('subclassresponsibility') + +class TcpConnection(Connection, asyncio.Protocol): + def __init__(self, host, port): + super().__init__() + self.host = host + self.port = port + self.decoder = None + self.stop_signal = None + self.transport = None + + def connection_lost(self, exc): + self._on_disconnected() + + def connection_made(self, transport): + self.transport = transport + self._on_connected() + + def data_received(self, chunk): + self.decoder.extend(chunk) + while True: + v = self.decoder.try_next() + if v is None: break + self._on_event(v) + + def _send(self, bs): + if self.transport: + self.transport.write(bs) + + def _disconnect(self): + if self.stop_signal: + self.stop_signal.set_result(True) + + async def main(self, loop): + if self.transport is not None: + raise Exception('Cannot run connection twice!') + + self.decoder = protocol.Decoder() + self.stop_signal = loop.create_future() + try: + _transport, _protocol = await loop.create_connection( + lambda: self, + self.host, + self.port) + except OSError: + log.error('Could not connect to broker', exc_info=True) + return False + + try: + await self.stop_signal + return True + finally: + self.transport.close() + self.transport = None + self.stop_signal = None + self.decoder = None + +class WebsocketConnection(Connection): + def __init__(self, url): + super().__init__() + self.url = url + self.loop = None + self.ws = None + + def _send(self, bs): + if self.ws: + self.loop.call_soon_threadsafe(lambda: self.loop.create_task(self.ws.send(bs))) + + def _disconnect(self): + if self.ws: + self.loop.call_soon_threadsafe(lambda: self.loop.create_task(self.ws.close())) + + async def main(self, loop): + if self.ws is not None: + raise Exception('Cannot run connection twice!') + + self.loop = loop + + try: + async with websockets.connect(self.url) as ws: + self.ws = ws + self._on_connected() + try: + while True: + chunk = await loop.create_task(ws.recv()) + self._on_event(protocol.Decoder(chunk).next()) + except websockets.exceptions.ConnectionClosed: + pass + except OSError: + log.error('Could not connect to broker', exc_info=True) + return False + finally: + self._on_disconnected() + + if self.ws: + await self.ws.close() + self.loop = None + self.ws = None + return True diff --git a/syndicate/mini/protocol.py b/syndicate/mini/protocol.py new file mode 100644 index 0000000..456d537 --- /dev/null +++ b/syndicate/mini/protocol.py @@ -0,0 +1,32 @@ +import preserves +from preserves import Record, Symbol + +## Client -> Broker +Assert = Record.makeConstructor('Assert', 'endpointName assertion') +Clear = Record.makeConstructor('Clear', 'endpointName') +Message = Record.makeConstructor('Message', 'body') + +## Broker -> Client +Add = Record.makeConstructor('Add', 'endpointName captures') +Del = Record.makeConstructor('Del', 'endpointName captures') +Msg = Record.makeConstructor('Msg', 'endpointName captures') + +## Standard Syndicate constructors +Observe = Record.makeConstructor('observe', 'specification') +Capture = Record.makeConstructor('capture', 'specification') +Discard = Record.makeConstructor('discard', '') + +class Decoder(preserves.Decoder): + def __init__(self, *args, **kwargs): + super(Decoder, self).__init__(*args, **kwargs) + _init_shortforms(self) + +class Encoder(preserves.Encoder): + def __init__(self, *args, **kwargs): + super(Encoder, self).__init__(*args, **kwargs) + _init_shortforms(self) + +def _init_shortforms(c): + c.set_shortform(0, Discard.constructorInfo.key) + c.set_shortform(1, Capture.constructorInfo.key) + c.set_shortform(2, Observe.constructorInfo.key)