Java chassis and client

This commit is contained in:
Tony Garnock-Jones 2011-01-05 21:29:28 -05:00
parent bfd7e24957
commit a3f5e89db8
16 changed files with 870 additions and 0 deletions

26
java/hop/HalfQueue.java Normal file
View File

@ -0,0 +1,26 @@
/*
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
*/
package hop;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
*/
public class HalfQueue implements Node {
public BlockingQueue<Object> _q;
public HalfQueue() {
_q = new LinkedBlockingQueue<Object>();
}
public void handle(Object message) {
_q.add(message);
}
public BlockingQueue<Object> getQueue() {
return _q;
}
}

View File

@ -0,0 +1,17 @@
/*
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
*/
package hop;
import java.io.IOException;
/**
*/
public class InvalidGreetingException extends IOException {
Object _greeting;
public InvalidGreetingException(Object greeting) {
_greeting = greeting;
}
}

11
java/hop/Node.java Normal file
View File

@ -0,0 +1,11 @@
/*
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
*/
package hop;
/**
*/
public interface Node {
void handle(Object message);
}

View File

@ -0,0 +1,71 @@
/*
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
*/
package hop;
import java.lang.ref.WeakReference;
import java.util.Hashtable;
import java.util.Map;
import java.util.UUID;
/**
*/
public class NodeContainer {
public String _name;
public Map<String, WeakReference<Node>> _directory;
public NodeContainer() {
this(UUID.randomUUID().toString());
}
public NodeContainer(String name) {
_name = name;
_directory = new Hashtable<String, WeakReference<Node>>();
}
public String getName() {
return _name;
}
public synchronized boolean bind(String name, Node n) {
if (_directory.containsKey(name))
return false;
_directory.put(name, new WeakReference<Node>(n));
return true;
}
public synchronized boolean unbind(String name) {
if (!_directory.containsKey(name))
return false;
_directory.remove(name);
return true;
}
public synchronized void unbindReferencesTo(Node n) {
for (Map.Entry<String, WeakReference<Node>> e : _directory.entrySet()) {
if (e.getValue().get() == n) {
_directory.remove(e.getKey());
}
}
}
public synchronized Node lookup(String name) {
WeakReference<Node> r = _directory.get(name);
return (r == null) ? null : r.get();
}
public boolean post(String sink, Object name, Object message, Object token) {
return send(sink, SexpMessage.post(name, message, token));
}
public boolean send(String name, Object message) {
Node n = lookup(name);
if (n == null) {
System.err.println("Warning: sending to nonexistent node " + name + "; message " + message);
return false;
}
n.handle(message);
return true;
}
}

131
java/hop/Relay.java Normal file
View File

@ -0,0 +1,131 @@
/*
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
*/
package hop;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
/**
*/
public class Relay implements Runnable, Node {
NodeContainer _container;
String _remoteName;
Socket _sock;
String _hostname;
int _port;
SexpReader _r;
OutputStream _output;
SexpWriter _w;
public Relay(NodeContainer container, String hostname) throws IOException, InterruptedException {
this(container, hostname, 5671);
}
public Relay(NodeContainer container, String hostname, int port) throws IOException, InterruptedException {
_container = container;
_remoteName = null;
_hostname = hostname;
_port = port;
_connect();
}
public String getRemoteName() {
return _remoteName;
}
public void _connect() throws IOException, InterruptedException {
_sock = new Socket(_hostname, _port);
_r = new SexpReader(_sock.getInputStream());
_output = _sock.getOutputStream();
_w = new SexpWriter(_output);
_login();
new Thread(this).start();
synchronized (this) {
while (_remoteName == null) {
this.wait();
}
}
}
public void _login() throws IOException {
SexpList greeting = _r.readList();
if (!greeting.getBytes(0).getDataString().equals("hop")) {
throw new InvalidGreetingException(greeting);
}
_w.write(SexpMessage.subscribe(_container.getName(), null, null, null, null));
}
public void handle(Object message) {
try {
_w.write(message);
} catch (IOException ioe) {
ioe.printStackTrace();
System.err.print("Message to be written was: ");
try {
SexpWriter.write(System.err, message);
} catch (IOException ioe2) {
ioe2.printStackTrace();
}
System.err.println();
}
}
public void run() {
SexpList m = null;
try {
while (true) {
m = _r.readList();
if (m == null) {
break;
}
System.err.println("Received: " + m);
String selector = m.getBytes(0).getDataString();
if (selector.equals("post") && m.size() == 4) {
_container.send(m.getBytes(1).getDataString(), m.get(2));
} else if (selector.equals("subscribe") && m.size() == 6) {
if (_remoteName != null) {
System.err.println("Double bind attempted");
} else {
_remoteName = m.getBytes(1).getDataString();
synchronized (this) {
this.notifyAll();
}
if (_container.bind(_remoteName, this)) {
_container.post(m.getBytes(4).getDataString(), m.get(5), SexpMessage.subscribe_ok(_remoteName), null);
} else {
System.err.println("Bind failed: " + _remoteName);
}
}
} else if (selector.equals("unsubscribe") && m.size() == 2) {
if (!m.getBytes(1).getDataString().equals(_remoteName)) {
System.err.println("Unknown unbind attempted");
} else {
if (!_container.unbind(m.getBytes(1).getDataString())) {
System.err.println("Unbind failed: " + m.get(1));
}
}
} else {
System.err.print("Unknown message: ");
SexpWriter.write(System.err, m);
System.err.println();
}
}
} catch (IOException ioe) {
ioe.printStackTrace();
System.err.print("Most recent received message: ");
try {
SexpWriter.write(System.err, m);
} catch (IOException ioe2) {
ioe2.printStackTrace();
}
System.err.println();
}
}
}

79
java/hop/ServerApi.java Normal file
View File

@ -0,0 +1,79 @@
/*
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
*/
package hop;
import java.util.UUID;
/**
*/
public class ServerApi {
public NodeContainer _container;
public String _serverName;
public String _kName;
public HalfQueue _k;
public ServerApi(NodeContainer container, String serverName) {
_container = container;
_serverName = serverName;
_kName = UUID.randomUUID().toString();
_k = new HalfQueue();
_container.bind(_kName, _k);
}
public SexpList _nextReply() throws InterruptedException, SexpSyntaxError {
Object x = _k.getQueue().take();
if (x instanceof SexpList) return (SexpList) x;
throw new SexpSyntaxError("Unexpected non-list");
}
public void post(String sink, Object name, Object message, Object token) {
_container.post(_serverName, sink, SexpMessage.post(name, message, token), null);
}
public void send(String sink, Object message) {
_container.post(_serverName, sink, message, null);
}
public synchronized Object subscribe(String source, Object filter, String sink, String name) throws InterruptedException, SexpSyntaxError {
send(source, SexpMessage.subscribe(filter, sink, name, _container.getName(), _kName));
SexpList reply = _nextReply();
assert reply.getBytes(0).getDataString().equals(SexpMessage._subscribe_ok);
return reply.get(1);
}
public synchronized Object subscribe(String source, Object filter, String name) throws InterruptedException, SexpSyntaxError {
return subscribe(source, filter, _container.getName(), name);
}
public Subscription subscribe(String source, Object filter) throws InterruptedException, SexpSyntaxError {
return new Subscription(this, source, filter);
}
public void unsubscribe(String source, Object token) {
send(source, SexpMessage.unsubscribe(token));
/* TODO: optional synchronous reply? */
}
public synchronized Object create(String nodeClassName, Object arg) throws InterruptedException, SexpSyntaxError {
send("factory", SexpMessage.create(nodeClassName, arg, _container.getName(), _kName));
SexpList reply = _nextReply();
String selector = reply.getBytes(0).getDataString();
if (selector.equals(SexpMessage._create_ok)) return null;
assert selector.equals(SexpMessage._create_failed);
return reply.get(1);
}
public Object createQueue(String name) throws InterruptedException, SexpSyntaxError {
return create("queue", SexpList.with(name));
}
public Object createFanout(String name) throws InterruptedException, SexpSyntaxError {
return create("fanout", SexpList.with(name));
}
public Object createDirect(String name) throws InterruptedException, SexpSyntaxError {
return create("direct", SexpList.with(name));
}
}

34
java/hop/SexpBytes.java Normal file
View File

@ -0,0 +1,34 @@
/*
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
*/
package hop;
import java.io.IOException;
import java.io.OutputStream;
/**
*/
public class SexpBytes {
public byte[] _bytes;
public SexpBytes(byte[] bytes) {
_bytes = bytes;
}
public byte[] getData() {
return _bytes;
}
public String getDataString() {
return new String(getData());
}
public void writeTo(OutputStream stream) throws IOException {
SexpWriter.writeSimpleString(stream, _bytes);
}
public String toString() {
return SexpWriter.writeString(this);
}
}

View File

@ -0,0 +1,30 @@
/*
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
*/
package hop;
import java.io.IOException;
import java.io.OutputStream;
/**
*/
public class SexpDisplayHint extends SexpBytes {
public byte[] _hint;
public SexpDisplayHint(byte[] hint, byte[] body) {
super(body);
_hint = hint;
}
public byte[] getHint() {
return _hint;
}
public void writeTo(OutputStream stream) throws IOException {
stream.write('[');
SexpWriter.writeSimpleString(stream, _hint);
stream.write(']');
super.writeTo(stream);
}
}

40
java/hop/SexpList.java Normal file
View File

@ -0,0 +1,40 @@
/*
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
*/
package hop;
import java.util.ArrayList;
/**
*/
public class SexpList extends ArrayList<Object> {
public SexpBytes getBytes(int index) throws SexpSyntaxError {
Object x = get(index);
if (x != null && !(x instanceof SexpBytes)) {
throw new SexpSyntaxError("Unexpected non-bytes");
}
return (SexpBytes) get(index);
}
public SexpList getList(int index) throws SexpSyntaxError {
Object x = get(index);
if (x != null && !(x instanceof SexpList)) {
throw new SexpSyntaxError("Unexpected non-list");
}
return (SexpList) get(index);
}
public static SexpList empty() {
return new SexpList();
}
public static SexpList with(Object x) {
return empty().and(x);
}
public SexpList and(Object x) {
this.add(x);
return this;
}
}

61
java/hop/SexpMessage.java Normal file
View File

@ -0,0 +1,61 @@
/*
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
*/
package hop;
/**
*/
public class SexpMessage {
public static SexpBytes _post = new SexpBytes("post".getBytes());
public static SexpBytes _subscribe = new SexpBytes("subscribe".getBytes());
public static SexpBytes _unsubscribe = new SexpBytes("unsubscribe".getBytes());
public static SexpBytes _subscribe_ok = new SexpBytes("subscribe-ok".getBytes());
public static SexpBytes _create = new SexpBytes("create".getBytes());
public static SexpBytes _create_ok = new SexpBytes("create-ok".getBytes());
public static SexpBytes _create_failed = new SexpBytes("create-failed".getBytes());
public static SexpList post(Object name, Object message, Object token) {
SexpList m = new SexpList();
m.add(_post);
m.add(name);
m.add(message);
m.add(token);
return m;
}
public static SexpList subscribe(Object filter, String sink, Object name, String replySink, Object replyName) {
SexpList m = new SexpList();
m.add(_subscribe);
m.add(filter);
m.add(sink);
m.add(name);
m.add(replySink);
m.add(replyName);
return m;
}
public static SexpList subscribe_ok(Object token) {
SexpList m = new SexpList();
m.add(_subscribe_ok);
m.add(token);
return m;
}
public static SexpList unsubscribe(Object token) {
SexpList m = new SexpList();
m.add(_unsubscribe);
m.add(token);
return m;
}
public static SexpList create(String nodeClassName, Object arg, String replySink, Object replyName) {
SexpList m = new SexpList();
m.add(_create);
m.add(nodeClassName);
m.add(arg);
m.add(replySink);
m.add(replyName);
return m;
}
}

131
java/hop/SexpReader.java Normal file
View File

@ -0,0 +1,131 @@
/*
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
*/
package hop;
import java.io.IOException;
import java.io.InputStream;
public class SexpReader {
public InputStream _input;
public SexpReader(InputStream input) {
_input = input;
}
/**
* Reads a sexp length-prefix from _input.
* @return The read length, or -1 if the end of stream is reached
* @throws IOException
* @throws SexpSyntaxError
*/
public int _readLength(int lengthSoFar) throws IOException {
int length = lengthSoFar;
while (true) {
int c = _input.read();
if (c == -1) return -1;
if (c == ':') {
return length;
}
if (!Character.isDigit(c)) {
throw new SexpSyntaxError("Invalid length prefix");
}
length = length * 10 + (c - '0');
}
}
/**
* Reads a simple length-prefixed string from _input, given either zero or the value
* of the first digit of the length-prefix being read.
* @param lengthSoFar either zero or the first digit of the length prefix to use
* @return the read string
* @throws IOException
* @throws SexpSyntaxError
*/
public byte[] _readSimpleString(int lengthSoFar) throws IOException {
int length = _readLength(lengthSoFar);
if (length == -1) return null;
byte[] buf = new byte[length];
int offset = 0;
while (length > 0) {
int count = _input.read(buf, offset, length);
if (count == -1) {
throw new SexpSyntaxError("End-of-stream in the middle of a simple string");
}
offset += count;
length -= count;
}
return buf;
}
public byte[] readSimpleString() throws IOException {
return _readSimpleString(0);
}
public SexpList _readList() throws IOException {
SexpList list = new SexpList();
while (true) {
int c = _input.read();
switch (c) {
case -1:
throw new SexpSyntaxError("Unclosed list");
case ')':
return list;
default:
list.add(_read(c));
break;
}
}
}
public Object _read(int c) throws IOException {
switch (c) {
case -1:
return null;
case '(':
return _readList();
case ')':
throw new SexpSyntaxError("Unexpected close-paren");
case '[':
byte[] hint = readSimpleString();
switch (_input.read()) {
case -1:
throw new SexpSyntaxError("End-of-stream between display hint and body");
case ']':
break;
default:
throw new SexpSyntaxError("Unexpected character after display hint");
}
byte[] body = readSimpleString();
return new SexpDisplayHint(hint, body);
default:
if (Character.isDigit(c)) {
return new SexpBytes(_readSimpleString(c - '0'));
} else {
throw new SexpSyntaxError("Unexpected character");
}
}
}
public Object read() throws IOException {
return _read(_input.read());
}
public SexpList readList() throws IOException {
Object x = read();
if (x != null && !(x instanceof SexpList)) {
throw new SexpSyntaxError("Unexpected non-list");
}
return (SexpList) x;
}
public SexpBytes readBytes() throws IOException {
Object x = read();
if (x != null && !(x instanceof SexpBytes)) {
throw new SexpSyntaxError("Unexpected non-bytes");
}
return (SexpBytes) x;
}
}

View File

@ -0,0 +1,16 @@
/*
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
*/
package hop;
import java.io.IOException;
/**
* Reports on a syntax problem reading an S-expression.
*/
public class SexpSyntaxError extends IOException {
public SexpSyntaxError(String s) {
super(s);
}
}

68
java/hop/SexpWriter.java Normal file
View File

@ -0,0 +1,68 @@
/*
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
*/
package hop;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
/**
*/
public class SexpWriter {
public OutputStream _output;
public SexpWriter(OutputStream output) {
_output = output;
}
public static void writeSimpleString(OutputStream stream, byte[] buf) throws IOException {
stream.write(Integer.toString(buf.length).getBytes());
stream.write(':');
stream.write(buf);
}
public void write(Object x) throws IOException {
if (x instanceof String) {
writeSimpleString(_output, ((String) x).getBytes());
return;
}
if (x instanceof byte[]) {
writeSimpleString(_output, ((byte[]) x));
return;
}
if (x instanceof SexpBytes) {
((SexpBytes) x).writeTo(_output);
return;
}
if (x instanceof List) {
_output.write('(');
for (Object v : ((List<Object>) x)) {
write(v);
}
_output.write(')');
return;
}
if (x == null) {
_output.write("0:".getBytes());
return;
}
throw new SexpSyntaxError("Unsupported sexp object type");
}
public static void write(OutputStream output, Object x) throws IOException {
new SexpWriter(output).write(x);
}
public static String writeString(Object x) {
try {
ByteArrayOutputStream o = new ByteArrayOutputStream();
write(o, x);
return new String(o.toByteArray());
} catch (IOException ioe) {
return x.toString();
}
}
}

View File

@ -0,0 +1,37 @@
/*
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
*/
package hop;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
/**
*/
public class Subscription {
public ServerApi _api;
public String _source;
public Object _filter;
public String _consumerName;
public HalfQueue _consumer;
public Object _subscriptionToken;
public Subscription(ServerApi api, String source, Object filter) throws InterruptedException, SexpSyntaxError {
_api = api;
_source = source;
_filter = filter;
_consumerName = UUID.randomUUID().toString();
_consumer = new HalfQueue();
_api._container.bind(_consumerName, _consumer);
_subscriptionToken = _api.subscribe(source, filter, _consumerName);
}
public BlockingQueue<Object> getQueue() {
return _consumer.getQueue();
}
public void unsubscribe() {
_api.unsubscribe(_source, _subscriptionToken);
}
}

36
java/hop/Test1.java Normal file
View File

@ -0,0 +1,36 @@
/*
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
*/
package hop;
import java.io.IOException;
/**
*/
public class Test1 {
public static void main(String[] args) {
try {
run(args[0]);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void run(String hostname) throws IOException, InterruptedException {
NodeContainer nc = new NodeContainer();
System.out.println("Hostname: " + hostname);
System.out.println("Container: " + nc.getName());
Relay r = new Relay(nc, hostname);
ServerApi api = new ServerApi(nc, r.getRemoteName());
api.createQueue("q1");
Subscription sub = api.subscribe("q1", null);
while (true) {
Object x = sub.getQueue().take();
System.out.println("Message: " + x);
}
}
}

82
java/hop/TestSexpIO.java Normal file
View File

@ -0,0 +1,82 @@
/*
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
*/
package hop;
import junit.framework.TestCase;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
*/
public class TestSexpIO extends TestCase {
public Object read(String s) throws IOException {
return new SexpReader(new ByteArrayInputStream(s.getBytes())).read();
}
public byte[] write(Object x) throws IOException {
ByteArrayOutputStream o = new ByteArrayOutputStream();
new SexpWriter(o).write(x);
return o.toByteArray();
}
public void assertBytesEqual(String expected, byte[] actual) {
assertBytesEqual(expected.getBytes(), actual);
}
public void assertBytesEqual(byte[] expected, byte[] actual) {
assertEquals(expected.length, actual.length);
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i], actual[i]);
}
}
public void testEndOfStream() throws IOException {
assertNull(read(""));
}
public void testEmptyList() throws IOException {
assertEquals(new ArrayList<Object>(), read("()"));
}
public void testSimpleString() throws IOException {
assertBytesEqual("hello", ((SexpBytes) read("5:hello")).getData());
}
public void testDisplayHint() throws IOException {
SexpDisplayHint v = (SexpDisplayHint) read("[1:h]1:b");
assertBytesEqual("b", v.getData());
assertBytesEqual("h", v.getHint());
assertBytesEqual("[1:h]1:b", write(v));
}
public void testSimpleList() throws IOException {
List<Object> l = (List<Object>) read("(1:a1:b1:c)");
assertEquals(3, l.size());
assertBytesEqual("a", ((SexpBytes) l.get(0)).getData());
assertBytesEqual("b", ((SexpBytes) l.get(1)).getData());
assertBytesEqual("c", ((SexpBytes) l.get(2)).getData());
}
public void testNestedList() throws IOException {
List<Object> l = (List<Object>) read("(1:a(1:b1:c)())");
assertEquals(3, l.size());
assertBytesEqual("a", ((SexpBytes) l.get(0)).getData());
List<Object> k = (List<Object>) l.get(1);
assertEquals(2, k.size());
assertBytesEqual("b", ((SexpBytes) k.get(0)).getData());
assertBytesEqual("c", ((SexpBytes) k.get(1)).getData());
assertEquals(new ArrayList<Object>(), l.get(2));
assertBytesEqual("(1:b1:c)", write(k));
assertBytesEqual("(1:a(1:b1:c)())", write(l));
}
public void testNullWrite() throws IOException {
assertBytesEqual("0:", write(null));
}
}