From 03a8d91ed83694a4ff220b85d5e305b78f4ca0d9 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 31 May 2012 23:31:51 +0100 Subject: [PATCH] More python hops --- experiments/python/Makefile | 4 ++ experiments/python/hop/__init__.py | 9 +++ experiments/python/hop/dispatch.py | 50 +++++++++++++++ experiments/python/hop/factory.py | 43 +++++++++++++ experiments/python/hop/namespace.py | 54 ++++++++++++++++ experiments/python/hop/queue.py | 58 +++++++++++++++++ experiments/python/hop/relay.py | 88 ++++++++++++++++++++++++++ experiments/python/hop/sexp.py | 18 +++++- experiments/python/hop/subscription.py | 31 +++++++++ 9 files changed, 352 insertions(+), 3 deletions(-) create mode 100644 experiments/python/Makefile create mode 100644 experiments/python/hop/dispatch.py create mode 100644 experiments/python/hop/factory.py create mode 100644 experiments/python/hop/namespace.py create mode 100644 experiments/python/hop/queue.py create mode 100644 experiments/python/hop/relay.py create mode 100644 experiments/python/hop/subscription.py diff --git a/experiments/python/Makefile b/experiments/python/Makefile new file mode 100644 index 0000000..ce41bd9 --- /dev/null +++ b/experiments/python/Makefile @@ -0,0 +1,4 @@ +all: + +clean: + rm -f hop/*.pyc diff --git a/experiments/python/hop/__init__.py b/experiments/python/hop/__init__.py index e69de29..2569695 100644 --- a/experiments/python/hop/__init__.py +++ b/experiments/python/hop/__init__.py @@ -0,0 +1,9 @@ +from dispatch import * +from namespace import * +from relay import * +import factory +import queue + +import logging +logging.basicConfig(level = logging.DEBUG) +TcpRelayServer() diff --git a/experiments/python/hop/dispatch.py b/experiments/python/hop/dispatch.py new file mode 100644 index 0000000..7b9fde4 --- /dev/null +++ b/experiments/python/hop/dispatch.py @@ -0,0 +1,50 @@ +## Copyright 2010, 2011, 2012 Tony Garnock-Jones . +## +## This file is part of Hop. +## +## Hop is free software: you can redistribute it and/or modify it +## under the terms of the GNU General Public License as published by +## the Free Software Foundation, either version 3 of the License, or +## (at your option) any later version. +## +## Hop is distributed in the hope that it will be useful, but WITHOUT +## ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +## or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public +## License for more details. +## +## You should have received a copy of the GNU General Public License +## along with Hop. If not, see . + +import logging + +class HopNode: + def handle_hop(self, msg): + raw_dispatch(self, 'hop_', msg) + + def error(self, message, details): + logging.error('%r: %s: %r' % (self, message, details)) + +class HopRelayMixin: + def inbound_hop(self, msg): + raw_dispatch(self, 'inbound_hop_', msg) + +def raw_dispatch(self, prefix, msg): + if type(msg) is not list or len(msg) < 1: + self.error('Invalid message', []) + return + + raw_selector = msg[0] + selector = prefix + raw_selector + args = msg[1:] + handler = getattr(self, selector, None) + + if not handler: + self.error('Unsupported message or arity', [raw_selector]) + return + + try: + handler(*args) + except Exception: + import traceback + traceback.print_exc() + self.error('Exception handling message', [raw_selector]) diff --git a/experiments/python/hop/factory.py b/experiments/python/hop/factory.py new file mode 100644 index 0000000..8d60d3f --- /dev/null +++ b/experiments/python/hop/factory.py @@ -0,0 +1,43 @@ +## Copyright 2010, 2011, 2012 Tony Garnock-Jones . +## +## This file is part of Hop. +## +## Hop is free software: you can redistribute it and/or modify it +## under the terms of the GNU General Public License as published by +## the Free Software Foundation, either version 3 of the License, or +## (at your option) any later version. +## +## Hop is distributed in the hope that it will be useful, but WITHOUT +## ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +## or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public +## License for more details. +## +## You should have received a copy of the GNU General Public License +## along with Hop. If not, see . + +import dispatch +import namespace + +class Factory(dispatch.HopNode): + def __init__(self): + self.classes = {} + + def register(self, classname, classval): + self.classes[classname] = classval + + def hop_create(self, classname, arg, replysink, replyname): + if classname not in self.classes: + namespace.post(replysink, replyname, + ['create-failed', ['factory', 'class-not-found']], '') + return + + try: + node = self.classes[classname](arg) + namespace.post(replysink, replyname, ['create-ok', node.node_info()], '') + except Exception, e: + import traceback + traceback.print_exc() + namespace.post(replysink, replyname, ['create-failed', ['constructor', str(e)]], '') + +default_factory = Factory() +namespace.bind('factory', default_factory) diff --git a/experiments/python/hop/namespace.py b/experiments/python/hop/namespace.py new file mode 100644 index 0000000..2c6ea56 --- /dev/null +++ b/experiments/python/hop/namespace.py @@ -0,0 +1,54 @@ +## Copyright 2010, 2011, 2012 Tony Garnock-Jones . +## +## This file is part of Hop. +## +## Hop is free software: you can redistribute it and/or modify it +## under the terms of the GNU General Public License as published by +## the Free Software Foundation, either version 3 of the License, or +## (at your option) any later version. +## +## Hop is distributed in the hope that it will be useful, but WITHOUT +## ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +## or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public +## License for more details. +## +## You should have received a copy of the GNU General Public License +## along with Hop. If not, see . + +import uuid +import logging + +class Namespace: + def __init__(self): + self.nodename = str(uuid.uuid4()) + self.bindings = {} + + def bind(self, name, node): + if name in self.bindings: + return False + self.bindings[name] = node + return True + + def unbind(self, name): + del self.bindings[name] + + def send(self, name, msg): + ## logging.debug('SENDING TO %s: %r' % (name, msg)) + if name: + if name in self.bindings: + self.bindings[name].handle_hop(msg) + return True + else: + logging.warning("Send to unbound name: %s" % (name,)) + return False + else: + return False + + def post(self, sink, name, body, token): + return self.send(sink, ['post', name, body, token]) + +default_namespace = Namespace() +bind = default_namespace.bind +unbind = default_namespace.unbind +send = default_namespace.send +post = default_namespace.post diff --git a/experiments/python/hop/queue.py b/experiments/python/hop/queue.py new file mode 100644 index 0000000..5519a62 --- /dev/null +++ b/experiments/python/hop/queue.py @@ -0,0 +1,58 @@ +## Copyright 2010, 2011, 2012 Tony Garnock-Jones . +## +## This file is part of Hop. +## +## Hop is free software: you can redistribute it and/or modify it +## under the terms of the GNU General Public License as published by +## the Free Software Foundation, either version 3 of the License, or +## (at your option) any later version. +## +## Hop is distributed in the hope that it will be useful, but WITHOUT +## ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +## or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public +## License for more details. +## +## You should have received a copy of the GNU General Public License +## along with Hop. If not, see . + +import threading +import Queue as builtin_Queue + +import namespace +import factory +import dispatch +import subscription + +Q = builtin_Queue.Queue + +class Queue(dispatch.HopNode): + def __init__(self, arg): + self.backlog = Q() + self.waiters = Q() + self.thread = threading.Thread(target = self.queue_main) + self.thread.start() + if not namespace.bind(arg[0], self): + raise Exception("duplicate name") + + def node_info(self): + return [] + + def hop_subscribe(self, filter, sink, name, replysink, replyname): + sub = subscription.Subscription(filter, sink, name) + self.waiters.put(sub) + namespace.post(replysink, replyname, ['subscribe-ok', sub.uuid], '') + + def hop_post(self, name, body, token): + self.backlog.put(body) + + def queue_main(self): + while True: + body = self.backlog.get() + while True: + waiter = self.waiters.get() + if not waiter.deliver(body): + continue + self.waiters.put(waiter) + break + +factory.default_factory.register('queue', Queue) diff --git a/experiments/python/hop/relay.py b/experiments/python/hop/relay.py new file mode 100644 index 0000000..6108afa --- /dev/null +++ b/experiments/python/hop/relay.py @@ -0,0 +1,88 @@ +## Copyright 2010, 2011, 2012 Tony Garnock-Jones . +## +## This file is part of Hop. +## +## Hop is free software: you can redistribute it and/or modify it +## under the terms of the GNU General Public License as published by +## the Free Software Foundation, either version 3 of the License, or +## (at your option) any later version. +## +## Hop is distributed in the hope that it will be useful, but WITHOUT +## ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +## or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public +## License for more details. +## +## You should have received a copy of the GNU General Public License +## along with Hop. If not, see . + +import threading +import socket + +import namespace +import sexp +from dispatch import HopRelayMixin + +class HopRelay(HopRelayMixin): + def __init__(self, in_ch, out_ch, ns = None, peer_address = None): + self.in_ch = in_ch + self.out_ch = out_ch + self.peer_address = peer_address + self.thread = threading.Thread(target = self.relay_main) + self.namespace = ns if ns else namespace.default_namespace + self.lock = threading.Lock() + self.thread.start() + + def write(self, x): + try: + self.lock.acquire() + sexp.write_sexp(self.out_ch, x) + self.out_ch.flush() + finally: + self.lock.release() + + def error(self, message, details): + self.write(['error', message, details]) + + def handle_hop(self, msg): + self.write(msg) + + def inbound_hop_post(self, name, body, token): + self.namespace.send(name, body) + + def inbound_hop_subscribe(self, filter, sink, name, replysink, replyname): + if self.namespace.bind(filter, self): + self.namespace.post(replysink, replyname, ['subscribe-ok', filter], '') + + def inbound_hop_unsubscribe(self, token): + self.namespace.unbind(token) + + def relay_main(self): + self.write(['subscribe', self.namespace.nodename, '', '', '', '']) + try: + while True: + try: + m = sexp.read_sexp(self.in_ch) + except sexp.SyntaxError, e: + self.error('Syntax error', ["http://people.csail.mit.edu/rivest/Sexp.txt"]) + return + self.inbound_hop(m) + except EOFError: + pass + finally: + self.in_ch.close() + self.out_ch.close() + +class TcpRelayServer: + def __init__(self, host = '0.0.0.0', port = 5671): + self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.server_socket.bind((host, port)) + self.server_socket.listen(4) + self.thread = threading.Thread(target = self.listen_main) + self.thread.start() + + def listen_main(self): + while True: + conn, addr = self.server_socket.accept() + conn.send(sexp.format(['hop'])) + HopRelay(conn.makefile(mode = 'r'), conn.makefile(mode = 'w'), peer_address = addr) diff --git a/experiments/python/hop/sexp.py b/experiments/python/hop/sexp.py index bed979f..c617ab4 100644 --- a/experiments/python/hop/sexp.py +++ b/experiments/python/hop/sexp.py @@ -42,9 +42,14 @@ def write_sexp(f, sexp): write_sexp_str(f, sexp[1]) return +def next(f, n): + chunk = f.read(n) + if len(chunk) < n: raise EOFError("reading sexp") + return chunk + def skipws(f): while True: - c = f.read(1) + c = next(f, 1) if not c.isspace(): return c def read_sexp(f): @@ -64,8 +69,8 @@ def read_sexp(f): if c.isdigit(): size = ord(c) - 48 while True: - c = f.read(1) - if c == ':': return f.read(size) + c = next(f, 1) + if c == ':': return next(f, size) if not c.isdigit(): raise SyntaxError("Illegal character in byte vector length") size = size * 10 + ord(c) - 48 if c == ')': @@ -74,3 +79,10 @@ def read_sexp(f): def parse(s): return read_sexp(StringIO.StringIO(s)) + +def format(x): + f = StringIO.StringIO() + write_sexp(f, x) + v = f.getvalue() + f.close() + return v diff --git a/experiments/python/hop/subscription.py b/experiments/python/hop/subscription.py new file mode 100644 index 0000000..3695ae1 --- /dev/null +++ b/experiments/python/hop/subscription.py @@ -0,0 +1,31 @@ +## Copyright 2010, 2011, 2012 Tony Garnock-Jones . +## +## This file is part of Hop. +## +## Hop is free software: you can redistribute it and/or modify it +## under the terms of the GNU General Public License as published by +## the Free Software Foundation, either version 3 of the License, or +## (at your option) any later version. +## +## Hop is distributed in the hope that it will be useful, but WITHOUT +## ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +## or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public +## License for more details. +## +## You should have received a copy of the GNU General Public License +## along with Hop. If not, see . + +import uuid + +import namespace + +class Subscription: + def __init__(self, filter, sink, name): + self.live = True + self.filter = filter + self.sink = sink + self.name = name + self.uuid = str(uuid.uuid4()) + + def deliver(self, body): + return self.live and namespace.post(self.sink, self.name, body, self.uuid)