hop-2012/experiments/python/hop/relay.py

89 lines
3.1 KiB
Python

## Copyright 2010, 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
##
## This file is part of Hop.
##
## Hop is free software: you can redistribute it and/or modify it
## under the terms of the GNU General Public License as published by
## the Free Software Foundation, either version 3 of the License, or
## (at your option) any later version.
##
## Hop is distributed in the hope that it will be useful, but WITHOUT
## ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
## or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
## License for more details.
##
## You should have received a copy of the GNU General Public License
## along with Hop. If not, see <http://www.gnu.org/licenses/>.
import threading
import socket
import namespace
import sexp
from dispatch import HopRelayMixin
class HopRelay(HopRelayMixin):
    """Relay S-expression messages between a channel pair and a namespace.

    Messages are read from ``in_ch`` on a dedicated thread and handed to
    ``self.inbound_hop`` (not defined in this class; presumably supplied by
    HopRelayMixin, which in turn is expected to call the ``inbound_hop_*``
    handlers below -- TODO confirm against dispatch.py).  Outbound messages
    are serialised onto ``out_ch`` by ``write``.
    """

    def __init__(self, in_ch, out_ch, ns=None, peer_address=None):
        """Store the channels and start the reader thread.

        in_ch        -- file-like object that S-expressions are read from.
        out_ch       -- file-like object that S-expressions are written to.
        ns           -- namespace to route through; when None, the module
                        level ``namespace.default_namespace`` is used.
        peer_address -- opaque address of the remote peer; only stored.
        """
        self.in_ch = in_ch
        self.out_ch = out_ch
        self.peer_address = peer_address
        # Test against None explicitly so that a namespace object which
        # happens to evaluate as false is not silently swapped for the
        # default one.
        if ns is None:
            ns = namespace.default_namespace
        self.namespace = ns
        # Serialises writers: write() may be called from the reader thread
        # as well as from whoever calls handle_hop()/error().
        self.lock = threading.Lock()
        self.thread = threading.Thread(target=self.relay_main)
        # Start the thread last so relay_main sees a fully built object.
        self.thread.start()

    def write(self, x):
        """Write the S-expression ``x`` to ``out_ch`` and flush it.

        The lock is held for the write and the flush so that messages from
        different threads are not interleaved on the channel.
        """
        # "with" releases the lock only if it was actually acquired; the
        # previous acquire-inside-try form could call release() on a lock
        # that was never taken.
        with self.lock:
            sexp.write_sexp(self.out_ch, x)
            self.out_ch.flush()

    def error(self, message, details):
        """Send an ``['error', message, details]`` message to the peer."""
        self.write(['error', message, details])

    def handle_hop(self, msg):
        """Forward ``msg`` unchanged to the peer.

        NOTE(review): presumably called by the namespace when a message is
        delivered to this relay as a bound sink -- confirm against
        namespace.py.
        """
        self.write(msg)

    def inbound_hop_post(self, name, body, token):
        """Pass a posted ``body`` for ``name`` on to the namespace.

        ``token`` is accepted but not used.
        NOTE(review): this calls ``namespace.send`` while
        inbound_hop_subscribe calls ``namespace.post`` -- confirm both
        exist on the namespace API.
        """
        self.namespace.send(name, body)

    def inbound_hop_subscribe(self, filter, sink, name, replysink, replyname):
        """Bind this relay in the namespace under ``filter``.

        The relay itself is what gets bound; ``sink`` and ``name`` are
        accepted but not used.  If the bind reports success, a
        ``['subscribe-ok', filter]`` message is posted to
        ``replysink``/``replyname``.
        """
        if self.namespace.bind(filter, self):
            self.namespace.post(replysink, replyname, ['subscribe-ok', filter], '')

    def inbound_hop_unsubscribe(self, token):
        """Remove the namespace binding identified by ``token``."""
        self.namespace.unbind(token)

    def relay_main(self):
        """Reader-thread body: announce ourselves, then pump inbound messages.

        Sends an initial subscribe message carrying the namespace's
        ``nodename``, then reads S-expressions from ``in_ch`` until EOF.  A
        syntax error is reported to the peer and ends the loop.  Both
        channels are closed on every exit path.
        """
        try:
            # Inside the try so that a failed first write still closes the
            # channels instead of leaking them.
            self.write(['subscribe', self.namespace.nodename, '', '', '', ''])
            while True:
                try:
                    m = sexp.read_sexp(self.in_ch)
                except sexp.SyntaxError:
                    self.error('Syntax error', ["http://people.csail.mit.edu/rivest/Sexp.txt"])
                    return
                self.inbound_hop(m)
        except EOFError:
            # The peer closed the connection; this is the normal way out.
            pass
        finally:
            # Nested so that out_ch is closed even if closing in_ch raises.
            try:
                self.in_ch.close()
            finally:
                self.out_ch.close()
class TcpRelayServer:
    """Accept TCP connections and attach a HopRelay to each one."""

    def __init__(self, host='0.0.0.0', port=5671):
        """Bind a listening socket on ``(host, port)`` and start accepting.

        The socket is created with SO_REUSEADDR set and a listen backlog of
        4.  Accepting happens on a separate thread running ``listen_main``,
        which is started before this constructor returns.
        """
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind((host, port))
        self.server_socket.listen(4)
        self.thread = threading.Thread(target=self.listen_main)
        self.thread.start()

    def listen_main(self):
        """Accept-loop body: greet each client and hand it to a HopRelay.

        For every accepted connection the formatted ``['hop']`` greeting is
        sent, the socket is wrapped in a read and a write file object, and a
        HopRelay is created on them with the peer address.  A socket error
        while greeting one client drops that client only; errors from
        ``accept`` itself still propagate and end the loop.
        """
        while True:
            conn, addr = self.server_socket.accept()
            try:
                # sendall, not send: send() may transmit only part of the
                # buffer and reports that through a return value that was
                # being ignored.
                conn.sendall(sexp.format(['hop']))
                in_ch = conn.makefile(mode='r')
                out_ch = conn.makefile(mode='w')
            except socket.error:
                # A client that disconnects mid-handshake must not take the
                # whole listener thread down with it.
                conn.close()
                continue
            HopRelay(in_ch, out_ch, peer_address=addr)
            # The file objects keep the underlying connection alive.  Drop
            # our own handle so the connection really closes once the relay
            # closes both channels, rather than lingering until the next
            # accept() rebinds ``conn``.
            conn.close()