Liberate hop from cmsg
This commit is contained in:
parent
17af172e3d
commit
c539cfd526
|
@ -8,7 +8,3 @@ server/test3
|
|||
server/test1_latency
|
||||
server/test3_latency
|
||||
depend.mk
|
||||
java/out
|
||||
java/build
|
||||
*.iml
|
||||
.idea
|
||||
|
|
|
@ -1,24 +0,0 @@
|
|||
<?xml version="1.0"?>
|
||||
<project name="Hop" default="jar">
|
||||
<path id="javac.classpath">
|
||||
<fileset dir="lib">
|
||||
<include name="**/*.jar"/>
|
||||
</fileset>
|
||||
</path>
|
||||
|
||||
<target name="build">
|
||||
<mkdir dir="build/classes"/>
|
||||
<javac destdir="build/classes" classpathref="javac.classpath" debug="true">
|
||||
<src path="hop"/>
|
||||
</javac>
|
||||
</target>
|
||||
|
||||
<target name="jar" depends="build">
|
||||
<mkdir dir="build/lib"/>
|
||||
<jar destfile="build/lib/hop.jar" basedir="build/classes" />
|
||||
</target>
|
||||
|
||||
<target name="clean">
|
||||
<delete dir="build"/>
|
||||
</target>
|
||||
</project>
|
|
@ -1,26 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
|
||||
*/
|
||||
|
||||
package hop;
|
||||
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class HalfQueue implements Node {
|
||||
public BlockingQueue<Object> _q;
|
||||
|
||||
public HalfQueue() {
|
||||
_q = new LinkedBlockingQueue<Object>();
|
||||
}
|
||||
|
||||
public void handle(Object message) {
|
||||
_q.add(message);
|
||||
}
|
||||
|
||||
public BlockingQueue<Object> getQueue() {
|
||||
return _q;
|
||||
}
|
||||
}
|
|
@ -1,17 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
|
||||
*/
|
||||
|
||||
package hop;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class InvalidGreetingException extends IOException {
|
||||
Object _greeting;
|
||||
|
||||
public InvalidGreetingException(Object greeting) {
|
||||
_greeting = greeting;
|
||||
}
|
||||
}
|
|
@ -1,11 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
|
||||
*/
|
||||
|
||||
package hop;
|
||||
|
||||
/**
|
||||
*/
|
||||
public interface Node {
|
||||
void handle(Object message);
|
||||
}
|
|
@ -1,102 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
|
||||
*/
|
||||
|
||||
package hop;
|
||||
|
||||
import java.io.Flushable;
|
||||
import java.io.IOException;
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class NodeContainer implements Flushable {
|
||||
public String _name;
|
||||
public Map<String, WeakReference<Node>> _directory;
|
||||
|
||||
public NodeContainer() {
|
||||
this(UUID.randomUUID().toString());
|
||||
}
|
||||
|
||||
public NodeContainer(String name) {
|
||||
_name = name;
|
||||
_directory = new Hashtable<String, WeakReference<Node>>();
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return _name;
|
||||
}
|
||||
|
||||
public synchronized boolean bind(String name, Node n) {
|
||||
WeakReference<Node> ref = _directory.get(name);
|
||||
if (ref != null && ref.get() != null) {
|
||||
return false;
|
||||
}
|
||||
ref = new WeakReference<Node>(n);
|
||||
_directory.put(name, ref);
|
||||
return true;
|
||||
}
|
||||
|
||||
public synchronized boolean unbind(String name) {
|
||||
if (!_directory.containsKey(name))
|
||||
return false;
|
||||
_directory.remove(name);
|
||||
return true;
|
||||
}
|
||||
|
||||
public void flush() throws IOException {
|
||||
ArrayList<Flushable> fs = new ArrayList<Flushable>();
|
||||
synchronized (this) {
|
||||
for (Map.Entry<String, WeakReference<Node>> e : _directory.entrySet()) {
|
||||
Node n = e.getValue().get();
|
||||
if (n instanceof Flushable) {
|
||||
fs.add((Flushable) n);
|
||||
}
|
||||
}
|
||||
}
|
||||
for (Flushable f : fs) {
|
||||
f.flush();
|
||||
}
|
||||
}
|
||||
|
||||
public void flush(String name) throws IOException {
|
||||
Flushable f;
|
||||
synchronized (this) {
|
||||
WeakReference<Node> ref = _directory.get(name);
|
||||
if (ref == null) return;
|
||||
Node n = ref.get();
|
||||
if (n == null) return;
|
||||
if (!(n instanceof Flushable)) return;
|
||||
f = ((Flushable) n);
|
||||
}
|
||||
f.flush();
|
||||
}
|
||||
|
||||
public synchronized void unbindReferencesTo(Node n) {
|
||||
for (Map.Entry<String, WeakReference<Node>> e : _directory.entrySet()) {
|
||||
if (e.getValue().get() == n) {
|
||||
_directory.remove(e.getKey());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized Node lookup(String name) {
|
||||
WeakReference<Node> r = _directory.get(name);
|
||||
return (r == null) ? null : r.get();
|
||||
}
|
||||
|
||||
public boolean post(String sink, Object name, Object message, Object token) {
|
||||
return send(sink, SexpMessage.post(name, message, token));
|
||||
}
|
||||
|
||||
public boolean send(String name, Object message) {
|
||||
Node n = lookup(name);
|
||||
if (n == null) {
|
||||
System.err.println("Warning: sending to nonexistent node " + name + "; message " + message);
|
||||
return false;
|
||||
}
|
||||
n.handle(message);
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -1,135 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
|
||||
*/
|
||||
|
||||
package hop;
|
||||
|
||||
import java.io.*;
|
||||
import java.net.Socket;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class Relay implements Runnable, Node, Flushable {
|
||||
NodeContainer _container;
|
||||
String _remoteName;
|
||||
Socket _sock;
|
||||
String _hostname;
|
||||
int _port;
|
||||
SexpReader _r;
|
||||
OutputStream _output;
|
||||
SexpWriter _w;
|
||||
|
||||
public Relay(NodeContainer container, String hostname) throws IOException, InterruptedException {
|
||||
this(container, hostname, 5671);
|
||||
}
|
||||
|
||||
public Relay(NodeContainer container, String hostname, int port) throws IOException, InterruptedException {
|
||||
_container = container;
|
||||
_remoteName = null;
|
||||
_hostname = hostname;
|
||||
_port = port;
|
||||
_connect();
|
||||
}
|
||||
|
||||
public String getRemoteName() {
|
||||
return _remoteName;
|
||||
}
|
||||
|
||||
public void _connect() throws IOException, InterruptedException {
|
||||
_sock = new Socket(_hostname, _port);
|
||||
_sock.setTcpNoDelay(true);
|
||||
_r = new SexpReader(new BufferedInputStream(_sock.getInputStream()));
|
||||
_output = new BufferedOutputStream(_sock.getOutputStream());
|
||||
_w = new SexpWriter(_output);
|
||||
_login();
|
||||
new Thread(this).start();
|
||||
synchronized (this) {
|
||||
while (_remoteName == null) {
|
||||
this.wait();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void _login() throws IOException {
|
||||
SexpList greeting = _r.readList();
|
||||
if (!greeting.getBytes(0).getDataString().equals("hop")) {
|
||||
throw new InvalidGreetingException(greeting);
|
||||
}
|
||||
|
||||
_w.write(SexpMessage.subscribe(_container.getName(), null, null, null, null));
|
||||
}
|
||||
|
||||
public void handle(Object message) {
|
||||
try {
|
||||
_w.write(message);
|
||||
} catch (IOException ioe) {
|
||||
ioe.printStackTrace();
|
||||
System.err.print("Message to be written was: ");
|
||||
try {
|
||||
SexpWriter.write(System.err, message);
|
||||
} catch (IOException ioe2) {
|
||||
ioe2.printStackTrace();
|
||||
}
|
||||
System.err.println();
|
||||
}
|
||||
}
|
||||
|
||||
public void flush() throws IOException {
|
||||
_output.flush();
|
||||
}
|
||||
|
||||
public void run() {
|
||||
SexpList m = null;
|
||||
try {
|
||||
while (true) {
|
||||
m = _r.readList();
|
||||
if (m == null) {
|
||||
break;
|
||||
}
|
||||
//System.err.println("Received: " + m);
|
||||
String selector = m.getBytes(0).getDataString();
|
||||
if (selector.equals("post") && m.size() == 4) {
|
||||
_container.send(m.getBytes(1).getDataString(), m.get(2));
|
||||
} else if (selector.equals("subscribe") && m.size() == 6) {
|
||||
if (_remoteName != null) {
|
||||
System.err.println("Double bind attempted");
|
||||
} else {
|
||||
_remoteName = m.getBytes(1).getDataString();
|
||||
synchronized (this) {
|
||||
this.notifyAll();
|
||||
}
|
||||
if (_container.bind(_remoteName, this)) {
|
||||
String replySink = m.getBytes(4).getDataString();
|
||||
if (replySink.length() > 0) {
|
||||
_container.post(replySink, m.get(5), SexpMessage.subscribe_ok(_remoteName), null);
|
||||
}
|
||||
} else {
|
||||
System.err.println("Bind failed: " + _remoteName);
|
||||
}
|
||||
}
|
||||
} else if (selector.equals("unsubscribe") && m.size() == 2) {
|
||||
if (!m.getBytes(1).getDataString().equals(_remoteName)) {
|
||||
System.err.println("Unknown unbind attempted");
|
||||
} else {
|
||||
if (!_container.unbind(m.getBytes(1).getDataString())) {
|
||||
System.err.println("Unbind failed: " + m.get(1));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
System.err.print("Unknown message: ");
|
||||
SexpWriter.write(System.err, m);
|
||||
System.err.println();
|
||||
}
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
ioe.printStackTrace();
|
||||
System.err.print("Most recent received message: ");
|
||||
try {
|
||||
SexpWriter.write(System.err, m);
|
||||
} catch (IOException ioe2) {
|
||||
ioe2.printStackTrace();
|
||||
}
|
||||
System.err.println();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,88 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
|
||||
*/
|
||||
|
||||
package hop;
|
||||
|
||||
import java.io.Flushable;
|
||||
import java.io.IOException;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ServerApi implements Flushable {
|
||||
public NodeContainer _container;
|
||||
public String _serverName;
|
||||
public String _kName;
|
||||
public HalfQueue _k;
|
||||
|
||||
public ServerApi(NodeContainer container, String serverName) {
|
||||
_container = container;
|
||||
_serverName = serverName;
|
||||
_kName = UUID.randomUUID().toString();
|
||||
_k = new HalfQueue();
|
||||
_container.bind(_kName, _k);
|
||||
}
|
||||
|
||||
public SexpList _nextReply() throws InterruptedException, SexpSyntaxError {
|
||||
Object x = _k.getQueue().take();
|
||||
if (x instanceof SexpList) return (SexpList) x;
|
||||
throw new SexpSyntaxError("Unexpected non-list");
|
||||
}
|
||||
|
||||
public void post(String sink, Object name, Object message, Object token) {
|
||||
_container.post(_serverName, sink, SexpMessage.post(name, message, token), null);
|
||||
}
|
||||
|
||||
public void send(String sink, Object message) {
|
||||
_container.post(_serverName, sink, message, null);
|
||||
}
|
||||
|
||||
public void flush() throws IOException {
|
||||
_container.flush(_serverName);
|
||||
}
|
||||
|
||||
public synchronized Object subscribe(String source, Object filter, String sink, String name) throws InterruptedException, IOException {
|
||||
send(source, SexpMessage.subscribe(filter, sink, name, _container.getName(), _kName));
|
||||
flush();
|
||||
SexpList reply = _nextReply();
|
||||
assert reply.getBytes(0).getDataString().equals(SexpMessage._subscribe_ok);
|
||||
return reply.get(1);
|
||||
}
|
||||
|
||||
public synchronized Object subscribe(String source, Object filter, String name) throws InterruptedException, IOException {
|
||||
return subscribe(source, filter, _container.getName(), name);
|
||||
}
|
||||
|
||||
public Subscription subscribe(String source, Object filter) throws InterruptedException, IOException {
|
||||
return new Subscription(this, source, filter);
|
||||
}
|
||||
|
||||
public void unsubscribe(String source, Object token) throws IOException {
|
||||
send(source, SexpMessage.unsubscribe(token));
|
||||
flush();
|
||||
/* TODO: optional synchronous reply? */
|
||||
}
|
||||
|
||||
public synchronized Object create(String nodeClassName, Object arg) throws InterruptedException, IOException {
|
||||
send("factory", SexpMessage.create(nodeClassName, arg, _container.getName(), _kName));
|
||||
flush();
|
||||
SexpList reply = _nextReply();
|
||||
SexpBytes selector = reply.getBytes(0);
|
||||
if (selector.equals(SexpMessage._create_ok)) return null;
|
||||
assert selector.equals(SexpMessage._create_failed);
|
||||
return reply.get(1);
|
||||
}
|
||||
|
||||
public Object createQueue(String name) throws InterruptedException, IOException {
|
||||
return create("queue", SexpList.with(name));
|
||||
}
|
||||
|
||||
public Object createFanout(String name) throws InterruptedException, IOException {
|
||||
return create("fanout", SexpList.with(name));
|
||||
}
|
||||
|
||||
public Object createDirect(String name) throws InterruptedException, IOException {
|
||||
return create("direct", SexpList.with(name));
|
||||
}
|
||||
}
|
|
@ -1,44 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
|
||||
*/
|
||||
|
||||
package hop;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Arrays;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class SexpBytes {
|
||||
public byte[] _bytes;
|
||||
|
||||
public SexpBytes(byte[] bytes) {
|
||||
_bytes = bytes;
|
||||
}
|
||||
|
||||
public byte[] getData() {
|
||||
return _bytes;
|
||||
}
|
||||
|
||||
public String getDataString() {
|
||||
return new String(getData());
|
||||
}
|
||||
|
||||
public void writeTo(OutputStream stream) throws IOException {
|
||||
SexpWriter.writeSimpleString(stream, _bytes);
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return SexpWriter.writeString(this);
|
||||
}
|
||||
|
||||
public boolean equals(Object other) {
|
||||
return (other instanceof SexpBytes) &&
|
||||
Arrays.equals(_bytes, ((SexpBytes) other).getData());
|
||||
}
|
||||
|
||||
public int hashCode() {
|
||||
return Arrays.hashCode(_bytes);
|
||||
}
|
||||
}
|
|
@ -1,30 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
|
||||
*/
|
||||
|
||||
package hop;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class SexpDisplayHint extends SexpBytes {
|
||||
public byte[] _hint;
|
||||
|
||||
public SexpDisplayHint(byte[] hint, byte[] body) {
|
||||
super(body);
|
||||
_hint = hint;
|
||||
}
|
||||
|
||||
public byte[] getHint() {
|
||||
return _hint;
|
||||
}
|
||||
|
||||
public void writeTo(OutputStream stream) throws IOException {
|
||||
stream.write('[');
|
||||
SexpWriter.writeSimpleString(stream, _hint);
|
||||
stream.write(']');
|
||||
super.writeTo(stream);
|
||||
}
|
||||
}
|
|
@ -1,40 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
|
||||
*/
|
||||
|
||||
package hop;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class SexpList extends ArrayList<Object> {
|
||||
public SexpBytes getBytes(int index) throws SexpSyntaxError {
|
||||
Object x = get(index);
|
||||
if (x != null && !(x instanceof SexpBytes)) {
|
||||
throw new SexpSyntaxError("Unexpected non-bytes");
|
||||
}
|
||||
return (SexpBytes) get(index);
|
||||
}
|
||||
|
||||
public SexpList getList(int index) throws SexpSyntaxError {
|
||||
Object x = get(index);
|
||||
if (x != null && !(x instanceof SexpList)) {
|
||||
throw new SexpSyntaxError("Unexpected non-list");
|
||||
}
|
||||
return (SexpList) get(index);
|
||||
}
|
||||
|
||||
public static SexpList empty() {
|
||||
return new SexpList();
|
||||
}
|
||||
|
||||
public static SexpList with(Object x) {
|
||||
return empty().and(x);
|
||||
}
|
||||
|
||||
public SexpList and(Object x) {
|
||||
this.add(x);
|
||||
return this;
|
||||
}
|
||||
}
|
|
@ -1,61 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
|
||||
*/
|
||||
|
||||
package hop;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class SexpMessage {
|
||||
public static SexpBytes _post = new SexpBytes("post".getBytes());
|
||||
public static SexpBytes _subscribe = new SexpBytes("subscribe".getBytes());
|
||||
public static SexpBytes _unsubscribe = new SexpBytes("unsubscribe".getBytes());
|
||||
public static SexpBytes _subscribe_ok = new SexpBytes("subscribe-ok".getBytes());
|
||||
public static SexpBytes _create = new SexpBytes("create".getBytes());
|
||||
public static SexpBytes _create_ok = new SexpBytes("create-ok".getBytes());
|
||||
public static SexpBytes _create_failed = new SexpBytes("create-failed".getBytes());
|
||||
|
||||
public static SexpList post(Object name, Object message, Object token) {
|
||||
SexpList m = new SexpList();
|
||||
m.add(_post);
|
||||
m.add(name);
|
||||
m.add(message);
|
||||
m.add(token);
|
||||
return m;
|
||||
}
|
||||
|
||||
public static SexpList subscribe(Object filter, String sink, Object name, String replySink, Object replyName) {
|
||||
SexpList m = new SexpList();
|
||||
m.add(_subscribe);
|
||||
m.add(filter);
|
||||
m.add(sink);
|
||||
m.add(name);
|
||||
m.add(replySink);
|
||||
m.add(replyName);
|
||||
return m;
|
||||
}
|
||||
|
||||
public static SexpList subscribe_ok(Object token) {
|
||||
SexpList m = new SexpList();
|
||||
m.add(_subscribe_ok);
|
||||
m.add(token);
|
||||
return m;
|
||||
}
|
||||
|
||||
public static SexpList unsubscribe(Object token) {
|
||||
SexpList m = new SexpList();
|
||||
m.add(_unsubscribe);
|
||||
m.add(token);
|
||||
return m;
|
||||
}
|
||||
|
||||
public static SexpList create(String nodeClassName, Object arg, String replySink, Object replyName) {
|
||||
SexpList m = new SexpList();
|
||||
m.add(_create);
|
||||
m.add(nodeClassName);
|
||||
m.add(arg);
|
||||
m.add(replySink);
|
||||
m.add(replyName);
|
||||
return m;
|
||||
}
|
||||
}
|
|
@ -1,137 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
|
||||
*/
|
||||
|
||||
package hop;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
public class SexpReader {
|
||||
public InputStream _input;
|
||||
|
||||
public SexpReader(InputStream input) {
|
||||
_input = input;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads a sexp length-prefix from _input.
|
||||
* @return The read length, or -1 if the end of stream is reached
|
||||
* @throws IOException
|
||||
* @throws SexpSyntaxError
|
||||
*/
|
||||
public int _readLength(int lengthSoFar) throws IOException {
|
||||
int length = lengthSoFar;
|
||||
|
||||
while (true) {
|
||||
int c = _input.read();
|
||||
if (c == -1) return -1;
|
||||
if (c == ':') {
|
||||
return length;
|
||||
}
|
||||
if (!Character.isDigit(c)) {
|
||||
throw new SexpSyntaxError("Invalid length prefix");
|
||||
}
|
||||
length = length * 10 + (c - '0');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads a simple length-prefixed string from _input, given either zero or the value
|
||||
* of the first digit of the length-prefix being read.
|
||||
* @param lengthSoFar either zero or the first digit of the length prefix to use
|
||||
* @return the read string
|
||||
* @throws IOException
|
||||
* @throws SexpSyntaxError
|
||||
*/
|
||||
public byte[] _readSimpleString(int lengthSoFar) throws IOException {
|
||||
int length = _readLength(lengthSoFar);
|
||||
if (length == -1) return null;
|
||||
byte[] buf = new byte[length];
|
||||
int offset = 0;
|
||||
while (length > 0) {
|
||||
int count = _input.read(buf, offset, length);
|
||||
if (count == -1) {
|
||||
throw new SexpSyntaxError("End-of-stream in the middle of a simple string");
|
||||
}
|
||||
offset += count;
|
||||
length -= count;
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
public byte[] readSimpleString() throws IOException {
|
||||
return _readSimpleString(0);
|
||||
}
|
||||
|
||||
public SexpList _readList() throws IOException {
|
||||
SexpList list = new SexpList();
|
||||
while (true) {
|
||||
int c = _input.read();
|
||||
switch (c) {
|
||||
case -1:
|
||||
throw new SexpSyntaxError("Unclosed list");
|
||||
case ')':
|
||||
return list;
|
||||
default:
|
||||
list.add(_read(c));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public Object _read(int c) throws IOException {
|
||||
while (true) {
|
||||
switch (c) {
|
||||
case -1:
|
||||
return null;
|
||||
case '(':
|
||||
return _readList();
|
||||
case ')':
|
||||
throw new SexpSyntaxError("Unexpected close-paren");
|
||||
case '[':
|
||||
byte[] hint = readSimpleString();
|
||||
switch (_input.read()) {
|
||||
case -1:
|
||||
throw new SexpSyntaxError("End-of-stream between display hint and body");
|
||||
case ']':
|
||||
break;
|
||||
default:
|
||||
throw new SexpSyntaxError("Unexpected character after display hint");
|
||||
}
|
||||
byte[] body = readSimpleString();
|
||||
return new SexpDisplayHint(hint, body);
|
||||
default:
|
||||
if (Character.isDigit(c)) {
|
||||
return new SexpBytes(_readSimpleString(c - '0'));
|
||||
} else if (Character.isWhitespace(c)) {
|
||||
// Skip harmless (?) whitespace
|
||||
c = _input.read();
|
||||
continue;
|
||||
} else {
|
||||
throw new SexpSyntaxError("Unexpected character: " + c);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public Object read() throws IOException {
|
||||
return _read(_input.read());
|
||||
}
|
||||
|
||||
public SexpList readList() throws IOException {
|
||||
Object x = read();
|
||||
if (x != null && !(x instanceof SexpList)) {
|
||||
throw new SexpSyntaxError("Unexpected non-list");
|
||||
}
|
||||
return (SexpList) x;
|
||||
}
|
||||
|
||||
public SexpBytes readBytes() throws IOException {
|
||||
Object x = read();
|
||||
if (x != null && !(x instanceof SexpBytes)) {
|
||||
throw new SexpSyntaxError("Unexpected non-bytes");
|
||||
}
|
||||
return (SexpBytes) x;
|
||||
}
|
||||
}
|
|
@ -1,16 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
|
||||
*/
|
||||
|
||||
package hop;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Reports on a syntax problem reading an S-expression.
|
||||
*/
|
||||
public class SexpSyntaxError extends IOException {
|
||||
public SexpSyntaxError(String s) {
|
||||
super(s);
|
||||
}
|
||||
}
|
|
@ -1,68 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
|
||||
*/
|
||||
|
||||
package hop;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class SexpWriter {
|
||||
public OutputStream _output;
|
||||
|
||||
public SexpWriter(OutputStream output) {
|
||||
_output = output;
|
||||
}
|
||||
|
||||
public static void writeSimpleString(OutputStream stream, byte[] buf) throws IOException {
|
||||
stream.write(Integer.toString(buf.length).getBytes());
|
||||
stream.write(':');
|
||||
stream.write(buf);
|
||||
}
|
||||
|
||||
public void write(Object x) throws IOException {
|
||||
if (x instanceof String) {
|
||||
writeSimpleString(_output, ((String) x).getBytes());
|
||||
return;
|
||||
}
|
||||
if (x instanceof byte[]) {
|
||||
writeSimpleString(_output, ((byte[]) x));
|
||||
return;
|
||||
}
|
||||
if (x instanceof SexpBytes) {
|
||||
((SexpBytes) x).writeTo(_output);
|
||||
return;
|
||||
}
|
||||
if (x instanceof List) {
|
||||
_output.write('(');
|
||||
for (Object v : ((List<Object>) x)) {
|
||||
write(v);
|
||||
}
|
||||
_output.write(')');
|
||||
return;
|
||||
}
|
||||
if (x == null) {
|
||||
_output.write("0:".getBytes());
|
||||
return;
|
||||
}
|
||||
throw new SexpSyntaxError("Unsupported sexp object type");
|
||||
}
|
||||
|
||||
public static void write(OutputStream output, Object x) throws IOException {
|
||||
new SexpWriter(output).write(x);
|
||||
}
|
||||
|
||||
public static String writeString(Object x) {
|
||||
try {
|
||||
ByteArrayOutputStream o = new ByteArrayOutputStream();
|
||||
write(o, x);
|
||||
return new String(o.toByteArray());
|
||||
} catch (IOException ioe) {
|
||||
return x.toString();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,38 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
|
||||
*/
|
||||
|
||||
package hop;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class Subscription {
|
||||
public ServerApi _api;
|
||||
public String _source;
|
||||
public Object _filter;
|
||||
public String _consumerName;
|
||||
public HalfQueue _consumer;
|
||||
public Object _subscriptionToken;
|
||||
|
||||
public Subscription(ServerApi api, String source, Object filter) throws InterruptedException, IOException {
|
||||
_api = api;
|
||||
_source = source;
|
||||
_filter = filter;
|
||||
_consumerName = UUID.randomUUID().toString();
|
||||
_consumer = new HalfQueue();
|
||||
_api._container.bind(_consumerName, _consumer);
|
||||
_subscriptionToken = _api.subscribe(source, filter, _consumerName);
|
||||
}
|
||||
|
||||
public BlockingQueue<Object> getQueue() {
|
||||
return _consumer.getQueue();
|
||||
}
|
||||
|
||||
public void unsubscribe() throws IOException {
|
||||
_api.unsubscribe(_source, _subscriptionToken);
|
||||
}
|
||||
}
|
|
@ -1,46 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
|
||||
*/
|
||||
|
||||
package hop;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class Test1 {
|
||||
public static void main(String[] args) {
|
||||
try {
|
||||
run(args[0]);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public static void run(String hostname) throws IOException, InterruptedException {
|
||||
NodeContainer nc = new NodeContainer();
|
||||
|
||||
System.out.println("Hostname: " + hostname);
|
||||
System.out.println("Container: " + nc.getName());
|
||||
|
||||
Relay r = new Relay(nc, hostname);
|
||||
ServerApi api = new ServerApi(nc, r.getRemoteName());
|
||||
|
||||
api.createQueue("q1");
|
||||
Subscription sub = api.subscribe("q1", null);
|
||||
long startTime = 0;
|
||||
int count = 0;
|
||||
while (true) {
|
||||
Object x = sub.getQueue().take();
|
||||
if (startTime == 0) {
|
||||
startTime = System.currentTimeMillis();
|
||||
}
|
||||
count++;
|
||||
if ((count % 100000) == 0) {
|
||||
long now = System.currentTimeMillis();
|
||||
double delta = (now - startTime) / 1000.0;
|
||||
System.out.println("Received "+count+" messages in "+delta+" seconds, rate = " + (count / delta) + " Hz");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,44 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
|
||||
*/
|
||||
|
||||
package hop;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class Test3 {
|
||||
public static void main(String[] args) {
|
||||
try {
|
||||
run(args[0]);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public static void run(String hostname) throws IOException, InterruptedException {
|
||||
NodeContainer nc = new NodeContainer();
|
||||
|
||||
System.out.println("Hostname: " + hostname);
|
||||
|
||||
System.out.println("Container: " + nc.getName());
|
||||
|
||||
Relay r = new Relay(nc, hostname);
|
||||
ServerApi api = new ServerApi(nc, r.getRemoteName());
|
||||
|
||||
api.createQueue("q1");
|
||||
long startTime = System.currentTimeMillis();
|
||||
int count = 0;
|
||||
for (int i = 0; i < 10000000; i++) {
|
||||
api.post("q1", null, Integer.toString(i), null);
|
||||
count++;
|
||||
if ((count % 100000) == 0) {
|
||||
api.flush();
|
||||
long now = System.currentTimeMillis();
|
||||
double delta = (now - startTime) / 1000.0;
|
||||
System.out.println("Sent "+count+" messages in "+delta+" seconds, rate = " + (count / delta) + " Hz");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,71 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
|
||||
*/
|
||||
|
||||
package hop;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class TestPingPong {
|
||||
public static void main(final String[] args) {
|
||||
try {
|
||||
new Thread(new Runnable() { public void run() {
|
||||
try {
|
||||
run1(args[0]);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
} }).start();
|
||||
run2(args[0]);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public static void run1(String hostname) throws IOException, InterruptedException {
|
||||
NodeContainer nc = new NodeContainer();
|
||||
|
||||
System.out.println("Hostname: " + hostname);
|
||||
System.out.println("Container: " + nc.getName());
|
||||
|
||||
Relay r = new Relay(nc, hostname);
|
||||
ServerApi api = new ServerApi(nc, r.getRemoteName());
|
||||
|
||||
api.createQueue("req");
|
||||
Subscription sub = api.subscribe("req", null);
|
||||
while (true) {
|
||||
Object x = sub.getQueue().take();
|
||||
//System.out.println("Message: " + x);
|
||||
api.post("rep", "reply", SexpList.with("ok").and(x), null);
|
||||
api.flush();
|
||||
}
|
||||
}
|
||||
|
||||
public static void run2(String hostname) throws IOException, InterruptedException {
|
||||
NodeContainer nc = new NodeContainer();
|
||||
|
||||
System.out.println("Hostname: " + hostname);
|
||||
System.out.println("Container: " + nc.getName());
|
||||
|
||||
Relay r = new Relay(nc, hostname);
|
||||
ServerApi api = new ServerApi(nc, r.getRemoteName());
|
||||
|
||||
api.createQueue("req");
|
||||
api.createQueue("rep");
|
||||
Subscription sub = api.subscribe("rep", null);
|
||||
long startTime = System.currentTimeMillis();
|
||||
for (int i = 0; i < 100000; i++) {
|
||||
api.post("req", "request", Integer.toString(i), null);
|
||||
api.flush();
|
||||
sub.getQueue().take();
|
||||
int j = i + 1;
|
||||
if ((j % 100) == 0) {
|
||||
long now = System.currentTimeMillis();
|
||||
double delta = (now - startTime) / 1000.0;
|
||||
System.out.println("Message " + j + ": " + (j / delta) + " Hz");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,69 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
|
||||
*/
|
||||
|
||||
package hop;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class TestScale {
|
||||
static AtomicLong counter = new AtomicLong();
|
||||
|
||||
public static void main(final String[] args) {
|
||||
try {
|
||||
final String hostname = args[0];
|
||||
int count = Integer.parseInt(args[1]);
|
||||
System.out.println("Hostname: " + hostname);
|
||||
for (int i = 0; i < count; i++) {
|
||||
new Thread(new Runnable() { public void run() {
|
||||
try {
|
||||
runConnection(hostname);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
} }).start();
|
||||
Thread.sleep(100);
|
||||
}
|
||||
while (true) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
long startCount = counter.longValue();
|
||||
Thread.sleep(1000);
|
||||
long now = System.currentTimeMillis();
|
||||
long countNow = counter.longValue();
|
||||
report(startTime, startCount, now, countNow);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public static void report(long t0, long c0, long t1, long c1) {
|
||||
double dc = c1 - c0;
|
||||
double dt = (t1 - t0) / 1000.0;
|
||||
double rate = dc / dt;
|
||||
System.out.println(dc + " messages in " + dt + "s = " + rate + " Hz");
|
||||
}
|
||||
|
||||
public static void runConnection(String hostname) throws IOException, InterruptedException {
|
||||
NodeContainer nc = new NodeContainer();
|
||||
String qName = nc.getName() + "q";
|
||||
System.out.println("Queue: " + qName);
|
||||
|
||||
Relay r = new Relay(nc, hostname);
|
||||
ServerApi api = new ServerApi(nc, r.getRemoteName());
|
||||
|
||||
api.createQueue(qName);
|
||||
Subscription sub = api.subscribe(qName, null);
|
||||
while (true) {
|
||||
Object in = "a";
|
||||
api.post(qName, "", in, null);
|
||||
api.flush();
|
||||
Object out = sub.getQueue().take();
|
||||
assert in.equals(out);
|
||||
counter.incrementAndGet();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,82 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
|
||||
*/
|
||||
|
||||
package hop;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class TestSexpIO extends TestCase {
|
||||
public Object read(String s) throws IOException {
|
||||
return new SexpReader(new ByteArrayInputStream(s.getBytes())).read();
|
||||
}
|
||||
|
||||
public byte[] write(Object x) throws IOException {
|
||||
ByteArrayOutputStream o = new ByteArrayOutputStream();
|
||||
new SexpWriter(o).write(x);
|
||||
return o.toByteArray();
|
||||
}
|
||||
|
||||
public void assertBytesEqual(String expected, byte[] actual) {
|
||||
assertBytesEqual(expected.getBytes(), actual);
|
||||
}
|
||||
|
||||
public void assertBytesEqual(byte[] expected, byte[] actual) {
|
||||
assertEquals(expected.length, actual.length);
|
||||
for (int i = 0; i < expected.length; i++) {
|
||||
assertEquals(expected[i], actual[i]);
|
||||
}
|
||||
}
|
||||
|
||||
public void testEndOfStream() throws IOException {
|
||||
assertNull(read(""));
|
||||
}
|
||||
|
||||
public void testEmptyList() throws IOException {
|
||||
assertEquals(new ArrayList<Object>(), read("()"));
|
||||
}
|
||||
|
||||
public void testSimpleString() throws IOException {
|
||||
assertBytesEqual("hello", ((SexpBytes) read("5:hello")).getData());
|
||||
}
|
||||
|
||||
public void testDisplayHint() throws IOException {
|
||||
SexpDisplayHint v = (SexpDisplayHint) read("[1:h]1:b");
|
||||
assertBytesEqual("b", v.getData());
|
||||
assertBytesEqual("h", v.getHint());
|
||||
assertBytesEqual("[1:h]1:b", write(v));
|
||||
}
|
||||
|
||||
public void testSimpleList() throws IOException {
|
||||
List<Object> l = (List<Object>) read("(1:a1:b1:c)");
|
||||
assertEquals(3, l.size());
|
||||
assertBytesEqual("a", ((SexpBytes) l.get(0)).getData());
|
||||
assertBytesEqual("b", ((SexpBytes) l.get(1)).getData());
|
||||
assertBytesEqual("c", ((SexpBytes) l.get(2)).getData());
|
||||
}
|
||||
|
||||
public void testNestedList() throws IOException {
|
||||
List<Object> l = (List<Object>) read("(1:a(1:b1:c)())");
|
||||
assertEquals(3, l.size());
|
||||
assertBytesEqual("a", ((SexpBytes) l.get(0)).getData());
|
||||
List<Object> k = (List<Object>) l.get(1);
|
||||
assertEquals(2, k.size());
|
||||
assertBytesEqual("b", ((SexpBytes) k.get(0)).getData());
|
||||
assertBytesEqual("c", ((SexpBytes) k.get(1)).getData());
|
||||
assertEquals(new ArrayList<Object>(), l.get(2));
|
||||
assertBytesEqual("(1:b1:c)", write(k));
|
||||
assertBytesEqual("(1:a(1:b1:c)())", write(l));
|
||||
}
|
||||
|
||||
public void testNullWrite() throws IOException {
|
||||
assertBytesEqual("0:", write(null));
|
||||
}
|
||||
}
|
Binary file not shown.
Loading…
Reference in New Issue