Initial commit

This commit is contained in:
Tony Garnock-Jones 2018-11-20 19:45:27 +00:00
commit e6cdf8127f
10 changed files with 371 additions and 0 deletions

8
.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
*.pyc
.coverage
htmlcov/
build/
dist/
*.egg-info/
pyenv/
/preserves

16
Makefile Normal file
View File

@ -0,0 +1,16 @@
test:
@echo no tests yet, lol
clean:
rm -rf htmlcov
find . -iname __pycache__ -o -iname '*.pyc' | xargs rm -rf
rm -f .coverage
rm -rf preserves.egg-info build dist
# sudo apt install python3-wheel twine
publish: clean
python3 setup.py sdist bdist_wheel
twine upload dist/*
veryclean: clean
rm -rf pyenv

60
chat.py Normal file
View File

@ -0,0 +1,60 @@
import sys
import asyncio
import random
import threading
import syndicate.mini.core as S
Present = S.Record.makeConstructor('Present', 'who')
Says = S.Record.makeConstructor('Says', 'who what')
if len(sys.argv) == 3:
conn = S.TcpConnection(sys.argv[1], int(sys.argv[2]))
elif len(sys.argv) == 2:
conn = S.WebsocketConnection(sys.argv[1])
elif len(sys.argv) == 1:
conn = S.WebsocketConnection('ws://localhost:8000/broker')
else:
sys.stderr.write(b'Usage: chat.py [ HOST PORT | WEBSOCKETURL ]\n')
sys.exit(1)
## Courtesy of http://listofrandomnames.com/ :-)
names = ['Daria', 'Kendra', 'Danny', 'Rufus', 'Diana', 'Arnetta', 'Dominick', 'Melonie', 'Regan',
'Glenda', 'Janet', 'Luci', 'Ronnie', 'Vita', 'Amie', 'Stefani', 'Catherine', 'Grady',
'Terrance', 'Rey', 'Fay', 'Shantae', 'Carlota', 'Judi', 'Crissy', 'Tasha', 'Jordan',
'Rolande', 'Buster', 'Diamond', 'Dallas', 'Lissa', 'Yang', 'Charlena', 'Brooke', 'Haydee',
'Griselda', 'Kasie', 'Clara', 'Claudie', 'Darell', 'Emery', 'Barbera', 'Chong', 'Karin',
'Veronica', 'Karly', 'Shaunda', 'Nigel', 'Cleo']
me = random.choice(names) + '_' + str(random.randint(10, 1000))
S.Endpoint(conn, Present(me))
S.Endpoint(conn, S.Observe(Present(S.CAPTURE)),
on_add=lambda who: print(who, 'joined'),
on_del=lambda who: print(who, 'left'))
S.Endpoint(conn, S.Observe(Says(S.CAPTURE, S.CAPTURE)),
on_msg=lambda who, what: print(who, 'said', repr(what)))
async def reconnect(loop):
while True:
await conn.main(loop)
if not conn: break
print('-'*50, 'Disconnected')
await asyncio.sleep(2)
if not conn: break
def accept_input(loop):
global conn
while True:
line = sys.stdin.readline()
if not line:
conn.destroy()
conn = None
break
conn.send(Says(me, line.strip()))
loop = asyncio.get_event_loop()
threading.Thread(target=lambda: accept_input(loop)).start()
loop.run_until_complete(reconnect(loop))
loop.close()

3
environment Normal file
View File

@ -0,0 +1,3 @@
[ -d pyenv ] || virtualenv -p python3 pyenv
. pyenv/bin/activate
pip install -r requirements.txt

1
requirements.txt Normal file
View File

@ -0,0 +1 @@
websockets

24
setup.py Normal file
View File

@ -0,0 +1,24 @@
try:
from setuptools import setup
except ImportError:
from distutils.core import setup
setup(
name="mini-syndicate",
version="0.0.0",
author="Tony Garnock-Jones",
author_email="tonyg@leastfixedpoint.com",
license="GNU General Public License v3 or later (GPLv3+)",
classifiers=[
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"Topic :: Software Development :: Libraries :: Python Modules",
"License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)",
"Programming Language :: Python :: 3",
],
packages=["syndicate"],
url="https://github.com/syndicate-lang/mini-syndicate-py",
description="Syndicate-like library for integrating Python with the Syndicate ecosystem",
install_requires=['websockets', 'preserves'],
python_requires=">=3.6, <4",
)

1
syndicate/__init__.py Normal file
View File

@ -0,0 +1 @@
__path__ = __import__('pkgutil').extend_path(__path__, __name__)

View File

@ -0,0 +1 @@
__path__ = __import__('pkgutil').extend_path(__path__, __name__)

225
syndicate/mini/core.py Normal file
View File

@ -0,0 +1,225 @@
import asyncio
import secrets
import logging
import websockets
log = logging.getLogger(__name__)
import syndicate.mini.protocol as protocol
from syndicate.mini.protocol import Capture, Discard, Observe
CAPTURE = Capture(Discard())
from preserves import *
_instance_id = secrets.token_urlsafe(8)
_uuid_counter = 0
def uuid(prefix='__@syndicate'):
global _uuid_counter
c = _uuid_counter
_uuid_counter = c + 1
return prefix + '_' + _instance_id + '_' + str(c)
def _ignore(*args, **kwargs):
pass
class Endpoint(object):
def __init__(self, conn, assertion, id=None, on_add=None, on_del=None, on_msg=None):
self.conn = conn
self.assertion = assertion
self.id = id or uuid('sub' if Observe.isClassOf(assertion) else 'pub')
self.on_add = on_add or _ignore
self.on_del = on_del or _ignore
self.on_msg = on_msg or _ignore
self.cache = set()
self.conn._update_endpoint(self)
def set(self, new_assertion):
self.assertion = new_assertion
if self.conn:
self.conn._update_endpoint(self)
def send(self, message):
'''Shortcut to Connection.send.'''
if self.conn:
self.conn.send(message)
def destroy(self):
if self.conn:
self.conn._clear_endpoint(self)
self.conn = None
def _reset(self):
for captures in set(self.cache):
self._del(captures)
def _add(self, captures):
if captures not in self.cache:
self.cache.add(captures)
self.on_add(*captures)
def _del(self, captures):
if captures in self.cache:
self.cache.discard(captures)
self.on_del(*captures)
def _msg(self, captures):
self.on_msg(*captures)
class DummyEndpoint(object):
def _add(self, captures): pass
def _del(self, captures): pass
def _msg(self, captures): pass
_dummy_endpoint = DummyEndpoint()
class Connection(object):
def __init__(self):
self.endpoints = {}
def _each_endpoint(self):
return list(self.endpoints.values())
def destroy(self):
for ep in self._each_endpoint():
ep.destroy()
self._disconnect()
def _encode(self, event):
e = protocol.Encoder()
e.append(event)
return e.contents()
def _update_endpoint(self, ep):
self.endpoints[ep.id] = ep
self._send(self._encode(protocol.Assert(ep.id, ep.assertion)))
def _clear_endpoint(self, ep):
if ep.id in self.endpoints:
del self.endpoints[ep.id]
self._send(self._encode(protocol.Clear(ep.id)))
def send(self, message):
self._send(self._encode(protocol.Message(message)))
def _on_disconnected(self):
for ep in self._each_endpoint():
ep._reset()
self._disconnect()
def _on_connected(self):
for ep in self._each_endpoint():
self._update_endpoint(ep)
def _lookup(self, endpointId):
return self.endpoints.get(endpointId, _dummy_endpoint)
def _on_event(self, v):
if protocol.Add.isClassOf(v): return self._lookup(v[0])._add(v[1])
if protocol.Del.isClassOf(v): return self._lookup(v[0])._del(v[1])
if protocol.Msg.isClassOf(v): return self._lookup(v[0])._msg(v[1])
def _send(self, bs):
raise Exception('subclassresponsibility')
def _disconnect(self):
raise Exception('subclassresponsibility')
class TcpConnection(Connection, asyncio.Protocol):
def __init__(self, host, port):
super().__init__()
self.host = host
self.port = port
self.decoder = None
self.stop_signal = None
self.transport = None
def connection_lost(self, exc):
self._on_disconnected()
def connection_made(self, transport):
self.transport = transport
self._on_connected()
def data_received(self, chunk):
self.decoder.extend(chunk)
while True:
v = self.decoder.try_next()
if v is None: break
self._on_event(v)
def _send(self, bs):
if self.transport:
self.transport.write(bs)
def _disconnect(self):
if self.stop_signal:
self.stop_signal.set_result(True)
async def main(self, loop):
if self.transport is not None:
raise Exception('Cannot run connection twice!')
self.decoder = protocol.Decoder()
self.stop_signal = loop.create_future()
try:
_transport, _protocol = await loop.create_connection(
lambda: self,
self.host,
self.port)
except OSError:
log.error('Could not connect to broker', exc_info=True)
return False
try:
await self.stop_signal
return True
finally:
self.transport.close()
self.transport = None
self.stop_signal = None
self.decoder = None
class WebsocketConnection(Connection):
def __init__(self, url):
super().__init__()
self.url = url
self.loop = None
self.ws = None
def _send(self, bs):
if self.ws:
self.loop.call_soon_threadsafe(lambda: self.loop.create_task(self.ws.send(bs)))
def _disconnect(self):
if self.ws:
self.loop.call_soon_threadsafe(lambda: self.loop.create_task(self.ws.close()))
async def main(self, loop):
if self.ws is not None:
raise Exception('Cannot run connection twice!')
self.loop = loop
try:
async with websockets.connect(self.url) as ws:
self.ws = ws
self._on_connected()
try:
while True:
chunk = await loop.create_task(ws.recv())
self._on_event(protocol.Decoder(chunk).next())
except websockets.exceptions.ConnectionClosed:
pass
except OSError:
log.error('Could not connect to broker', exc_info=True)
return False
finally:
self._on_disconnected()
if self.ws:
await self.ws.close()
self.loop = None
self.ws = None
return True

View File

@ -0,0 +1,32 @@
import preserves
from preserves import Record, Symbol
## Client -> Broker
Assert = Record.makeConstructor('Assert', 'endpointName assertion')
Clear = Record.makeConstructor('Clear', 'endpointName')
Message = Record.makeConstructor('Message', 'body')
## Broker -> Client
Add = Record.makeConstructor('Add', 'endpointName captures')
Del = Record.makeConstructor('Del', 'endpointName captures')
Msg = Record.makeConstructor('Msg', 'endpointName captures')
## Standard Syndicate constructors
Observe = Record.makeConstructor('observe', 'specification')
Capture = Record.makeConstructor('capture', 'specification')
Discard = Record.makeConstructor('discard', '')
class Decoder(preserves.Decoder):
def __init__(self, *args, **kwargs):
super(Decoder, self).__init__(*args, **kwargs)
_init_shortforms(self)
class Encoder(preserves.Encoder):
def __init__(self, *args, **kwargs):
super(Encoder, self).__init__(*args, **kwargs)
_init_shortforms(self)
def _init_shortforms(c):
c.set_shortform(0, Discard.constructorInfo.key)
c.set_shortform(1, Capture.constructorInfo.key)
c.set_shortform(2, Observe.constructorInfo.key)