commit 5da0fcc3a2e482e567bfa8ddadeadbc77a5407ed Author: Tony Garnock-Jones Date: Tue Mar 6 18:08:18 2012 -0500 Liberate hop from cmsg at rev 17af172e3d072b71282d8a4340611e3776d0ac46 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f6fba88 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +out +build +*.iml +*.ipr +*.iws +.idea diff --git a/build.xml b/build.xml new file mode 100644 index 0000000..3b8a7e3 --- /dev/null +++ b/build.xml @@ -0,0 +1,24 @@ + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/hop/HalfQueue.java b/hop/HalfQueue.java new file mode 100644 index 0000000..03a5893 --- /dev/null +++ b/hop/HalfQueue.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2011 Tony Garnock-Jones. All rights reserved. + */ + +package hop; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + */ +public class HalfQueue implements Node { + public BlockingQueue _q; + + public HalfQueue() { + _q = new LinkedBlockingQueue(); + } + + public void handle(Object message) { + _q.add(message); + } + + public BlockingQueue getQueue() { + return _q; + } +} diff --git a/hop/InvalidGreetingException.java b/hop/InvalidGreetingException.java new file mode 100644 index 0000000..420e46a --- /dev/null +++ b/hop/InvalidGreetingException.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2011 Tony Garnock-Jones. All rights reserved. + */ + +package hop; + +import java.io.IOException; + +/** + */ +public class InvalidGreetingException extends IOException { + Object _greeting; + + public InvalidGreetingException(Object greeting) { + _greeting = greeting; + } +} diff --git a/hop/Node.java b/hop/Node.java new file mode 100644 index 0000000..e4e9ac0 --- /dev/null +++ b/hop/Node.java @@ -0,0 +1,11 @@ +/* + * Copyright (c) 2011 Tony Garnock-Jones. All rights reserved. + */ + +package hop; + +/** + */ +public interface Node { + void handle(Object message); +} diff --git a/hop/NodeContainer.java b/hop/NodeContainer.java new file mode 100644 index 0000000..789201f --- /dev/null +++ b/hop/NodeContainer.java @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2011 Tony Garnock-Jones. All rights reserved. + */ + +package hop; + +import java.io.Flushable; +import java.io.IOException; +import java.lang.ref.WeakReference; +import java.util.*; + +/** + */ +public class NodeContainer implements Flushable { + public String _name; + public Map> _directory; + + public NodeContainer() { + this(UUID.randomUUID().toString()); + } + + public NodeContainer(String name) { + _name = name; + _directory = new Hashtable>(); + } + + public String getName() { + return _name; + } + + public synchronized boolean bind(String name, Node n) { + WeakReference ref = _directory.get(name); + if (ref != null && ref.get() != null) { + return false; + } + ref = new WeakReference(n); + _directory.put(name, ref); + return true; + } + + public synchronized boolean unbind(String name) { + if (!_directory.containsKey(name)) + return false; + _directory.remove(name); + return true; + } + + public void flush() throws IOException { + ArrayList fs = new ArrayList(); + synchronized (this) { + for (Map.Entry> e : _directory.entrySet()) { + Node n = e.getValue().get(); + if (n instanceof Flushable) { + fs.add((Flushable) n); + } + } + } + for (Flushable f : fs) { + f.flush(); + } + } + + public void flush(String name) throws IOException { + Flushable f; + synchronized (this) { + WeakReference ref = _directory.get(name); + if (ref == null) return; + Node n = ref.get(); + if (n == null) return; + if (!(n instanceof Flushable)) return; + f = ((Flushable) n); + } + f.flush(); + } + + public synchronized void unbindReferencesTo(Node n) { + for (Map.Entry> e : _directory.entrySet()) { + if (e.getValue().get() == n) { + _directory.remove(e.getKey()); + } + } + } + + public synchronized Node lookup(String name) { + WeakReference r = _directory.get(name); + return (r == null) ? null : r.get(); + } + + public boolean post(String sink, Object name, Object message, Object token) { + return send(sink, SexpMessage.post(name, message, token)); + } + + public boolean send(String name, Object message) { + Node n = lookup(name); + if (n == null) { + System.err.println("Warning: sending to nonexistent node " + name + "; message " + message); + return false; + } + n.handle(message); + return true; + } +} diff --git a/hop/Relay.java b/hop/Relay.java new file mode 100644 index 0000000..698be96 --- /dev/null +++ b/hop/Relay.java @@ -0,0 +1,135 @@ +/* + * Copyright (c) 2011 Tony Garnock-Jones. All rights reserved. + */ + +package hop; + +import java.io.*; +import java.net.Socket; + +/** + */ +public class Relay implements Runnable, Node, Flushable { + NodeContainer _container; + String _remoteName; + Socket _sock; + String _hostname; + int _port; + SexpReader _r; + OutputStream _output; + SexpWriter _w; + + public Relay(NodeContainer container, String hostname) throws IOException, InterruptedException { + this(container, hostname, 5671); + } + + public Relay(NodeContainer container, String hostname, int port) throws IOException, InterruptedException { + _container = container; + _remoteName = null; + _hostname = hostname; + _port = port; + _connect(); + } + + public String getRemoteName() { + return _remoteName; + } + + public void _connect() throws IOException, InterruptedException { + _sock = new Socket(_hostname, _port); + _sock.setTcpNoDelay(true); + _r = new SexpReader(new BufferedInputStream(_sock.getInputStream())); + _output = new BufferedOutputStream(_sock.getOutputStream()); + _w = new SexpWriter(_output); + _login(); + new Thread(this).start(); + synchronized (this) { + while (_remoteName == null) { + this.wait(); + } + } + } + + public void _login() throws IOException { + SexpList greeting = _r.readList(); + if (!greeting.getBytes(0).getDataString().equals("hop")) { + throw new InvalidGreetingException(greeting); + } + + _w.write(SexpMessage.subscribe(_container.getName(), null, null, null, null)); + } + + public void handle(Object message) { + try { + _w.write(message); + } catch (IOException ioe) { + ioe.printStackTrace(); + System.err.print("Message to be written was: "); + try { + SexpWriter.write(System.err, message); + } catch (IOException ioe2) { + ioe2.printStackTrace(); + } + System.err.println(); + } + } + + public void flush() throws IOException { + _output.flush(); + } + + public void run() { + SexpList m = null; + try { + while (true) { + m = _r.readList(); + if (m == null) { + break; + } + //System.err.println("Received: " + m); + String selector = m.getBytes(0).getDataString(); + if (selector.equals("post") && m.size() == 4) { + _container.send(m.getBytes(1).getDataString(), m.get(2)); + } else if (selector.equals("subscribe") && m.size() == 6) { + if (_remoteName != null) { + System.err.println("Double bind attempted"); + } else { + _remoteName = m.getBytes(1).getDataString(); + synchronized (this) { + this.notifyAll(); + } + if (_container.bind(_remoteName, this)) { + String replySink = m.getBytes(4).getDataString(); + if (replySink.length() > 0) { + _container.post(replySink, m.get(5), SexpMessage.subscribe_ok(_remoteName), null); + } + } else { + System.err.println("Bind failed: " + _remoteName); + } + } + } else if (selector.equals("unsubscribe") && m.size() == 2) { + if (!m.getBytes(1).getDataString().equals(_remoteName)) { + System.err.println("Unknown unbind attempted"); + } else { + if (!_container.unbind(m.getBytes(1).getDataString())) { + System.err.println("Unbind failed: " + m.get(1)); + } + } + } else { + System.err.print("Unknown message: "); + SexpWriter.write(System.err, m); + System.err.println(); + } + } + } catch (IOException ioe) { + ioe.printStackTrace(); + System.err.print("Most recent received message: "); + try { + SexpWriter.write(System.err, m); + } catch (IOException ioe2) { + ioe2.printStackTrace(); + } + System.err.println(); + } + } +} diff --git a/hop/ServerApi.java b/hop/ServerApi.java new file mode 100644 index 0000000..bf72c6a --- /dev/null +++ b/hop/ServerApi.java @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2011 Tony Garnock-Jones. All rights reserved. + */ + +package hop; + +import java.io.Flushable; +import java.io.IOException; +import java.util.UUID; + +/** + */ +public class ServerApi implements Flushable { + public NodeContainer _container; + public String _serverName; + public String _kName; + public HalfQueue _k; + + public ServerApi(NodeContainer container, String serverName) { + _container = container; + _serverName = serverName; + _kName = UUID.randomUUID().toString(); + _k = new HalfQueue(); + _container.bind(_kName, _k); + } + + public SexpList _nextReply() throws InterruptedException, SexpSyntaxError { + Object x = _k.getQueue().take(); + if (x instanceof SexpList) return (SexpList) x; + throw new SexpSyntaxError("Unexpected non-list"); + } + + public void post(String sink, Object name, Object message, Object token) { + _container.post(_serverName, sink, SexpMessage.post(name, message, token), null); + } + + public void send(String sink, Object message) { + _container.post(_serverName, sink, message, null); + } + + public void flush() throws IOException { + _container.flush(_serverName); + } + + public synchronized Object subscribe(String source, Object filter, String sink, String name) throws InterruptedException, IOException { + send(source, SexpMessage.subscribe(filter, sink, name, _container.getName(), _kName)); + flush(); + SexpList reply = _nextReply(); + assert reply.getBytes(0).getDataString().equals(SexpMessage._subscribe_ok); + return reply.get(1); + } + + public synchronized Object subscribe(String source, Object filter, String name) throws InterruptedException, IOException { + return subscribe(source, filter, _container.getName(), name); + } + + public Subscription subscribe(String source, Object filter) throws InterruptedException, IOException { + return new Subscription(this, source, filter); + } + + public void unsubscribe(String source, Object token) throws IOException { + send(source, SexpMessage.unsubscribe(token)); + flush(); + /* TODO: optional synchronous reply? */ + } + + public synchronized Object create(String nodeClassName, Object arg) throws InterruptedException, IOException { + send("factory", SexpMessage.create(nodeClassName, arg, _container.getName(), _kName)); + flush(); + SexpList reply = _nextReply(); + SexpBytes selector = reply.getBytes(0); + if (selector.equals(SexpMessage._create_ok)) return null; + assert selector.equals(SexpMessage._create_failed); + return reply.get(1); + } + + public Object createQueue(String name) throws InterruptedException, IOException { + return create("queue", SexpList.with(name)); + } + + public Object createFanout(String name) throws InterruptedException, IOException { + return create("fanout", SexpList.with(name)); + } + + public Object createDirect(String name) throws InterruptedException, IOException { + return create("direct", SexpList.with(name)); + } +} diff --git a/hop/SexpBytes.java b/hop/SexpBytes.java new file mode 100644 index 0000000..1385e56 --- /dev/null +++ b/hop/SexpBytes.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2011 Tony Garnock-Jones. All rights reserved. + */ + +package hop; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; + +/** + */ +public class SexpBytes { + public byte[] _bytes; + + public SexpBytes(byte[] bytes) { + _bytes = bytes; + } + + public byte[] getData() { + return _bytes; + } + + public String getDataString() { + return new String(getData()); + } + + public void writeTo(OutputStream stream) throws IOException { + SexpWriter.writeSimpleString(stream, _bytes); + } + + public String toString() { + return SexpWriter.writeString(this); + } + + public boolean equals(Object other) { + return (other instanceof SexpBytes) && + Arrays.equals(_bytes, ((SexpBytes) other).getData()); + } + + public int hashCode() { + return Arrays.hashCode(_bytes); + } +} diff --git a/hop/SexpDisplayHint.java b/hop/SexpDisplayHint.java new file mode 100644 index 0000000..0ce41b1 --- /dev/null +++ b/hop/SexpDisplayHint.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2011 Tony Garnock-Jones. All rights reserved. + */ + +package hop; + +import java.io.IOException; +import java.io.OutputStream; + +/** + */ +public class SexpDisplayHint extends SexpBytes { + public byte[] _hint; + + public SexpDisplayHint(byte[] hint, byte[] body) { + super(body); + _hint = hint; + } + + public byte[] getHint() { + return _hint; + } + + public void writeTo(OutputStream stream) throws IOException { + stream.write('['); + SexpWriter.writeSimpleString(stream, _hint); + stream.write(']'); + super.writeTo(stream); + } +} diff --git a/hop/SexpList.java b/hop/SexpList.java new file mode 100644 index 0000000..11d11c5 --- /dev/null +++ b/hop/SexpList.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2011 Tony Garnock-Jones. All rights reserved. + */ + +package hop; + +import java.util.ArrayList; + +/** + */ +public class SexpList extends ArrayList { + public SexpBytes getBytes(int index) throws SexpSyntaxError { + Object x = get(index); + if (x != null && !(x instanceof SexpBytes)) { + throw new SexpSyntaxError("Unexpected non-bytes"); + } + return (SexpBytes) get(index); + } + + public SexpList getList(int index) throws SexpSyntaxError { + Object x = get(index); + if (x != null && !(x instanceof SexpList)) { + throw new SexpSyntaxError("Unexpected non-list"); + } + return (SexpList) get(index); + } + + public static SexpList empty() { + return new SexpList(); + } + + public static SexpList with(Object x) { + return empty().and(x); + } + + public SexpList and(Object x) { + this.add(x); + return this; + } +} diff --git a/hop/SexpMessage.java b/hop/SexpMessage.java new file mode 100644 index 0000000..32c3a42 --- /dev/null +++ b/hop/SexpMessage.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2011 Tony Garnock-Jones. All rights reserved. + */ + +package hop; + +/** + */ +public class SexpMessage { + public static SexpBytes _post = new SexpBytes("post".getBytes()); + public static SexpBytes _subscribe = new SexpBytes("subscribe".getBytes()); + public static SexpBytes _unsubscribe = new SexpBytes("unsubscribe".getBytes()); + public static SexpBytes _subscribe_ok = new SexpBytes("subscribe-ok".getBytes()); + public static SexpBytes _create = new SexpBytes("create".getBytes()); + public static SexpBytes _create_ok = new SexpBytes("create-ok".getBytes()); + public static SexpBytes _create_failed = new SexpBytes("create-failed".getBytes()); + + public static SexpList post(Object name, Object message, Object token) { + SexpList m = new SexpList(); + m.add(_post); + m.add(name); + m.add(message); + m.add(token); + return m; + } + + public static SexpList subscribe(Object filter, String sink, Object name, String replySink, Object replyName) { + SexpList m = new SexpList(); + m.add(_subscribe); + m.add(filter); + m.add(sink); + m.add(name); + m.add(replySink); + m.add(replyName); + return m; + } + + public static SexpList subscribe_ok(Object token) { + SexpList m = new SexpList(); + m.add(_subscribe_ok); + m.add(token); + return m; + } + + public static SexpList unsubscribe(Object token) { + SexpList m = new SexpList(); + m.add(_unsubscribe); + m.add(token); + return m; + } + + public static SexpList create(String nodeClassName, Object arg, String replySink, Object replyName) { + SexpList m = new SexpList(); + m.add(_create); + m.add(nodeClassName); + m.add(arg); + m.add(replySink); + m.add(replyName); + return m; + } +} diff --git a/hop/SexpReader.java b/hop/SexpReader.java new file mode 100644 index 0000000..0177fe7 --- /dev/null +++ b/hop/SexpReader.java @@ -0,0 +1,137 @@ +/* + * Copyright (c) 2011 Tony Garnock-Jones. All rights reserved. + */ + +package hop; + +import java.io.IOException; +import java.io.InputStream; + +public class SexpReader { + public InputStream _input; + + public SexpReader(InputStream input) { + _input = input; + } + + /** + * Reads a sexp length-prefix from _input. + * @return The read length, or -1 if the end of stream is reached + * @throws IOException + * @throws SexpSyntaxError + */ + public int _readLength(int lengthSoFar) throws IOException { + int length = lengthSoFar; + + while (true) { + int c = _input.read(); + if (c == -1) return -1; + if (c == ':') { + return length; + } + if (!Character.isDigit(c)) { + throw new SexpSyntaxError("Invalid length prefix"); + } + length = length * 10 + (c - '0'); + } + } + + /** + * Reads a simple length-prefixed string from _input, given either zero or the value + * of the first digit of the length-prefix being read. + * @param lengthSoFar either zero or the first digit of the length prefix to use + * @return the read string + * @throws IOException + * @throws SexpSyntaxError + */ + public byte[] _readSimpleString(int lengthSoFar) throws IOException { + int length = _readLength(lengthSoFar); + if (length == -1) return null; + byte[] buf = new byte[length]; + int offset = 0; + while (length > 0) { + int count = _input.read(buf, offset, length); + if (count == -1) { + throw new SexpSyntaxError("End-of-stream in the middle of a simple string"); + } + offset += count; + length -= count; + } + return buf; + } + + public byte[] readSimpleString() throws IOException { + return _readSimpleString(0); + } + + public SexpList _readList() throws IOException { + SexpList list = new SexpList(); + while (true) { + int c = _input.read(); + switch (c) { + case -1: + throw new SexpSyntaxError("Unclosed list"); + case ')': + return list; + default: + list.add(_read(c)); + break; + } + } + } + + public Object _read(int c) throws IOException { + while (true) { + switch (c) { + case -1: + return null; + case '(': + return _readList(); + case ')': + throw new SexpSyntaxError("Unexpected close-paren"); + case '[': + byte[] hint = readSimpleString(); + switch (_input.read()) { + case -1: + throw new SexpSyntaxError("End-of-stream between display hint and body"); + case ']': + break; + default: + throw new SexpSyntaxError("Unexpected character after display hint"); + } + byte[] body = readSimpleString(); + return new SexpDisplayHint(hint, body); + default: + if (Character.isDigit(c)) { + return new SexpBytes(_readSimpleString(c - '0')); + } else if (Character.isWhitespace(c)) { + // Skip harmless (?) whitespace + c = _input.read(); + continue; + } else { + throw new SexpSyntaxError("Unexpected character: " + c); + } + } + } + } + + public Object read() throws IOException { + return _read(_input.read()); + } + + public SexpList readList() throws IOException { + Object x = read(); + if (x != null && !(x instanceof SexpList)) { + throw new SexpSyntaxError("Unexpected non-list"); + } + return (SexpList) x; + } + + public SexpBytes readBytes() throws IOException { + Object x = read(); + if (x != null && !(x instanceof SexpBytes)) { + throw new SexpSyntaxError("Unexpected non-bytes"); + } + return (SexpBytes) x; + } +} \ No newline at end of file diff --git a/hop/SexpSyntaxError.java b/hop/SexpSyntaxError.java new file mode 100644 index 0000000..14c335c --- /dev/null +++ b/hop/SexpSyntaxError.java @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2011 Tony Garnock-Jones. All rights reserved. + */ + +package hop; + +import java.io.IOException; + +/** + * Reports on a syntax problem reading an S-expression. + */ +public class SexpSyntaxError extends IOException { + public SexpSyntaxError(String s) { + super(s); + } +} diff --git a/hop/SexpWriter.java b/hop/SexpWriter.java new file mode 100644 index 0000000..91e1db0 --- /dev/null +++ b/hop/SexpWriter.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2011 Tony Garnock-Jones. All rights reserved. + */ + +package hop; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; + +/** + */ +public class SexpWriter { + public OutputStream _output; + + public SexpWriter(OutputStream output) { + _output = output; + } + + public static void writeSimpleString(OutputStream stream, byte[] buf) throws IOException { + stream.write(Integer.toString(buf.length).getBytes()); + stream.write(':'); + stream.write(buf); + } + + public void write(Object x) throws IOException { + if (x instanceof String) { + writeSimpleString(_output, ((String) x).getBytes()); + return; + } + if (x instanceof byte[]) { + writeSimpleString(_output, ((byte[]) x)); + return; + } + if (x instanceof SexpBytes) { + ((SexpBytes) x).writeTo(_output); + return; + } + if (x instanceof List) { + _output.write('('); + for (Object v : ((List) x)) { + write(v); + } + _output.write(')'); + return; + } + if (x == null) { + _output.write("0:".getBytes()); + return; + } + throw new SexpSyntaxError("Unsupported sexp object type"); + } + + public static void write(OutputStream output, Object x) throws IOException { + new SexpWriter(output).write(x); + } + + public static String writeString(Object x) { + try { + ByteArrayOutputStream o = new ByteArrayOutputStream(); + write(o, x); + return new String(o.toByteArray()); + } catch (IOException ioe) { + return x.toString(); + } + } +} diff --git a/hop/Subscription.java b/hop/Subscription.java new file mode 100644 index 0000000..0f09010 --- /dev/null +++ b/hop/Subscription.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2011 Tony Garnock-Jones. All rights reserved. + */ + +package hop; + +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; + +/** + */ +public class Subscription { + public ServerApi _api; + public String _source; + public Object _filter; + public String _consumerName; + public HalfQueue _consumer; + public Object _subscriptionToken; + + public Subscription(ServerApi api, String source, Object filter) throws InterruptedException, IOException { + _api = api; + _source = source; + _filter = filter; + _consumerName = UUID.randomUUID().toString(); + _consumer = new HalfQueue(); + _api._container.bind(_consumerName, _consumer); + _subscriptionToken = _api.subscribe(source, filter, _consumerName); + } + + public BlockingQueue getQueue() { + return _consumer.getQueue(); + } + + public void unsubscribe() throws IOException { + _api.unsubscribe(_source, _subscriptionToken); + } +} diff --git a/hop/Test1.java b/hop/Test1.java new file mode 100644 index 0000000..45e3332 --- /dev/null +++ b/hop/Test1.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2011 Tony Garnock-Jones. All rights reserved. + */ + +package hop; + +import java.io.IOException; + +/** + */ +public class Test1 { + public static void main(String[] args) { + try { + run(args[0]); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void run(String hostname) throws IOException, InterruptedException { + NodeContainer nc = new NodeContainer(); + + System.out.println("Hostname: " + hostname); + System.out.println("Container: " + nc.getName()); + + Relay r = new Relay(nc, hostname); + ServerApi api = new ServerApi(nc, r.getRemoteName()); + + api.createQueue("q1"); + Subscription sub = api.subscribe("q1", null); + long startTime = 0; + int count = 0; + while (true) { + Object x = sub.getQueue().take(); + if (startTime == 0) { + startTime = System.currentTimeMillis(); + } + count++; + if ((count % 100000) == 0) { + long now = System.currentTimeMillis(); + double delta = (now - startTime) / 1000.0; + System.out.println("Received "+count+" messages in "+delta+" seconds, rate = " + (count / delta) + " Hz"); + } + } + } +} diff --git a/hop/Test3.java b/hop/Test3.java new file mode 100644 index 0000000..63512ef --- /dev/null +++ b/hop/Test3.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2011 Tony Garnock-Jones. All rights reserved. + */ + +package hop; + +import java.io.IOException; + +/** + */ +public class Test3 { + public static void main(String[] args) { + try { + run(args[0]); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void run(String hostname) throws IOException, InterruptedException { + NodeContainer nc = new NodeContainer(); + + System.out.println("Hostname: " + hostname); + + System.out.println("Container: " + nc.getName()); + + Relay r = new Relay(nc, hostname); + ServerApi api = new ServerApi(nc, r.getRemoteName()); + + api.createQueue("q1"); + long startTime = System.currentTimeMillis(); + int count = 0; + for (int i = 0; i < 10000000; i++) { + api.post("q1", null, Integer.toString(i), null); + count++; + if ((count % 100000) == 0) { + api.flush(); + long now = System.currentTimeMillis(); + double delta = (now - startTime) / 1000.0; + System.out.println("Sent "+count+" messages in "+delta+" seconds, rate = " + (count / delta) + " Hz"); + } + } + } +} diff --git a/hop/TestPingPong.java b/hop/TestPingPong.java new file mode 100644 index 0000000..d8de0b6 --- /dev/null +++ b/hop/TestPingPong.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2011 Tony Garnock-Jones. All rights reserved. + */ + +package hop; + +import java.io.IOException; + +/** + */ +public class TestPingPong { + public static void main(final String[] args) { + try { + new Thread(new Runnable() { public void run() { + try { + run1(args[0]); + } catch (Exception e) { + e.printStackTrace(); + } + } }).start(); + run2(args[0]); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void run1(String hostname) throws IOException, InterruptedException { + NodeContainer nc = new NodeContainer(); + + System.out.println("Hostname: " + hostname); + System.out.println("Container: " + nc.getName()); + + Relay r = new Relay(nc, hostname); + ServerApi api = new ServerApi(nc, r.getRemoteName()); + + api.createQueue("req"); + Subscription sub = api.subscribe("req", null); + while (true) { + Object x = sub.getQueue().take(); + //System.out.println("Message: " + x); + api.post("rep", "reply", SexpList.with("ok").and(x), null); + api.flush(); + } + } + + public static void run2(String hostname) throws IOException, InterruptedException { + NodeContainer nc = new NodeContainer(); + + System.out.println("Hostname: " + hostname); + System.out.println("Container: " + nc.getName()); + + Relay r = new Relay(nc, hostname); + ServerApi api = new ServerApi(nc, r.getRemoteName()); + + api.createQueue("req"); + api.createQueue("rep"); + Subscription sub = api.subscribe("rep", null); + long startTime = System.currentTimeMillis(); + for (int i = 0; i < 100000; i++) { + api.post("req", "request", Integer.toString(i), null); + api.flush(); + sub.getQueue().take(); + int j = i + 1; + if ((j % 100) == 0) { + long now = System.currentTimeMillis(); + double delta = (now - startTime) / 1000.0; + System.out.println("Message " + j + ": " + (j / delta) + " Hz"); + } + } + } +} diff --git a/hop/TestScale.java b/hop/TestScale.java new file mode 100644 index 0000000..b921299 --- /dev/null +++ b/hop/TestScale.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2011 Tony Garnock-Jones. All rights reserved. + */ + +package hop; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +/** + */ +public class TestScale { + static AtomicLong counter = new AtomicLong(); + + public static void main(final String[] args) { + try { + final String hostname = args[0]; + int count = Integer.parseInt(args[1]); + System.out.println("Hostname: " + hostname); + for (int i = 0; i < count; i++) { + new Thread(new Runnable() { public void run() { + try { + runConnection(hostname); + } catch (Exception e) { + e.printStackTrace(); + } + } }).start(); + Thread.sleep(100); + } + while (true) { + long startTime = System.currentTimeMillis(); + long startCount = counter.longValue(); + Thread.sleep(1000); + long now = System.currentTimeMillis(); + long countNow = counter.longValue(); + report(startTime, startCount, now, countNow); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void report(long t0, long c0, long t1, long c1) { + double dc = c1 - c0; + double dt = (t1 - t0) / 1000.0; + double rate = dc / dt; + System.out.println(dc + " messages in " + dt + "s = " + rate + " Hz"); + } + + public static void runConnection(String hostname) throws IOException, InterruptedException { + NodeContainer nc = new NodeContainer(); + String qName = nc.getName() + "q"; + System.out.println("Queue: " + qName); + + Relay r = new Relay(nc, hostname); + ServerApi api = new ServerApi(nc, r.getRemoteName()); + + api.createQueue(qName); + Subscription sub = api.subscribe(qName, null); + while (true) { + Object in = "a"; + api.post(qName, "", in, null); + api.flush(); + Object out = sub.getQueue().take(); + assert in.equals(out); + counter.incrementAndGet(); + } + } +} diff --git a/hop/TestSexpIO.java b/hop/TestSexpIO.java new file mode 100644 index 0000000..3acd4a6 --- /dev/null +++ b/hop/TestSexpIO.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2011 Tony Garnock-Jones. All rights reserved. + */ + +package hop; + +import junit.framework.TestCase; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + */ +public class TestSexpIO extends TestCase { + public Object read(String s) throws IOException { + return new SexpReader(new ByteArrayInputStream(s.getBytes())).read(); + } + + public byte[] write(Object x) throws IOException { + ByteArrayOutputStream o = new ByteArrayOutputStream(); + new SexpWriter(o).write(x); + return o.toByteArray(); + } + + public void assertBytesEqual(String expected, byte[] actual) { + assertBytesEqual(expected.getBytes(), actual); + } + + public void assertBytesEqual(byte[] expected, byte[] actual) { + assertEquals(expected.length, actual.length); + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], actual[i]); + } + } + + public void testEndOfStream() throws IOException { + assertNull(read("")); + } + + public void testEmptyList() throws IOException { + assertEquals(new ArrayList(), read("()")); + } + + public void testSimpleString() throws IOException { + assertBytesEqual("hello", ((SexpBytes) read("5:hello")).getData()); + } + + public void testDisplayHint() throws IOException { + SexpDisplayHint v = (SexpDisplayHint) read("[1:h]1:b"); + assertBytesEqual("b", v.getData()); + assertBytesEqual("h", v.getHint()); + assertBytesEqual("[1:h]1:b", write(v)); + } + + public void testSimpleList() throws IOException { + List l = (List) read("(1:a1:b1:c)"); + assertEquals(3, l.size()); + assertBytesEqual("a", ((SexpBytes) l.get(0)).getData()); + assertBytesEqual("b", ((SexpBytes) l.get(1)).getData()); + assertBytesEqual("c", ((SexpBytes) l.get(2)).getData()); + } + + public void testNestedList() throws IOException { + List l = (List) read("(1:a(1:b1:c)())"); + assertEquals(3, l.size()); + assertBytesEqual("a", ((SexpBytes) l.get(0)).getData()); + List k = (List) l.get(1); + assertEquals(2, k.size()); + assertBytesEqual("b", ((SexpBytes) k.get(0)).getData()); + assertBytesEqual("c", ((SexpBytes) k.get(1)).getData()); + assertEquals(new ArrayList(), l.get(2)); + assertBytesEqual("(1:b1:c)", write(k)); + assertBytesEqual("(1:a(1:b1:c)())", write(l)); + } + + public void testNullWrite() throws IOException { + assertBytesEqual("0:", write(null)); + } +} diff --git a/lib/junit-4.8.2.jar b/lib/junit-4.8.2.jar new file mode 100644 index 0000000..5b4bb84 Binary files /dev/null and b/lib/junit-4.8.2.jar differ