Merge remote branch 'j/master'

This commit is contained in:
Tony Garnock-Jones 2012-05-10 16:55:36 -04:00
commit 3a87aa0d27
22 changed files with 1402 additions and 0 deletions

6
java/.gitignore vendored Normal file
View File

@ -0,0 +1,6 @@
out
build
*.iml
*.ipr
*.iws
.idea

24
java/build.xml Normal file
View File

@ -0,0 +1,24 @@
<?xml version="1.0"?>
<project name="Hop" default="jar">
<path id="javac.classpath">
<fileset dir="lib">
<include name="**/*.jar"/>
</fileset>
</path>
<target name="build">
<mkdir dir="build/classes"/>
<javac destdir="build/classes" classpathref="javac.classpath" debug="true">
<src path="src"/>
</javac>
</target>
<target name="jar" depends="build">
<mkdir dir="build/lib"/>
<jar destfile="build/lib/hop.jar" basedir="build/classes" />
</target>
<target name="clean">
<delete dir="build"/>
</target>
</project>

BIN
java/lib/junit-4.8.2.jar Normal file

Binary file not shown.

View File

@ -0,0 +1,39 @@
// Copyright 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
//
// This file is part of Hop.
//
// Hop is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Hop is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
// License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Hop. If not, see <http://www.gnu.org/licenses/>.
//
package hop;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
*/
public class HalfQueue implements Node {
public BlockingQueue<Object> _q;
public HalfQueue() {
_q = new LinkedBlockingQueue<Object>();
}
public void handle(Object message) {
_q.add(message);
}
public BlockingQueue<Object> getQueue() {
return _q;
}
}

View File

@ -0,0 +1,30 @@
// Copyright 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
//
// This file is part of Hop.
//
// Hop is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Hop is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
// License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Hop. If not, see <http://www.gnu.org/licenses/>.
//
package hop;
import java.io.IOException;
/**
*/
public class InvalidGreetingException extends IOException {
Object _greeting;
public InvalidGreetingException(Object greeting) {
_greeting = greeting;
}
}

24
java/src/hop/Node.java Normal file
View File

@ -0,0 +1,24 @@
// Copyright 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
//
// This file is part of Hop.
//
// Hop is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Hop is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
// License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Hop. If not, see <http://www.gnu.org/licenses/>.
//
package hop;
/**
*/
public interface Node {
void handle(Object message);
}

View File

@ -0,0 +1,115 @@
// Copyright 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
//
// This file is part of Hop.
//
// Hop is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Hop is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
// License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Hop. If not, see <http://www.gnu.org/licenses/>.
//
package hop;
import java.io.Flushable;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.util.*;
/**
*/
public class NodeContainer implements Flushable {
public String _name;
public Map<String, WeakReference<Node>> _directory;
public NodeContainer() {
this(UUID.randomUUID().toString());
}
public NodeContainer(String name) {
_name = name;
_directory = new Hashtable<String, WeakReference<Node>>();
}
public String getName() {
return _name;
}
public synchronized boolean bind(String name, Node n) {
WeakReference<Node> ref = _directory.get(name);
if (ref != null && ref.get() != null) {
return false;
}
ref = new WeakReference<Node>(n);
_directory.put(name, ref);
return true;
}
public synchronized boolean unbind(String name) {
if (!_directory.containsKey(name))
return false;
_directory.remove(name);
return true;
}
public void flush() throws IOException {
ArrayList<Flushable> fs = new ArrayList<Flushable>();
synchronized (this) {
for (Map.Entry<String, WeakReference<Node>> e : _directory.entrySet()) {
Node n = e.getValue().get();
if (n instanceof Flushable) {
fs.add((Flushable) n);
}
}
}
for (Flushable f : fs) {
f.flush();
}
}
public void flush(String name) throws IOException {
Flushable f;
synchronized (this) {
WeakReference<Node> ref = _directory.get(name);
if (ref == null) return;
Node n = ref.get();
if (n == null) return;
if (!(n instanceof Flushable)) return;
f = ((Flushable) n);
}
f.flush();
}
public synchronized void unbindReferencesTo(Node n) {
for (Map.Entry<String, WeakReference<Node>> e : _directory.entrySet()) {
if (e.getValue().get() == n) {
_directory.remove(e.getKey());
}
}
}
public synchronized Node lookup(String name) {
WeakReference<Node> r = _directory.get(name);
return (r == null) ? null : r.get();
}
public boolean post(String sink, Object name, Object message, Object token) {
return send(sink, SexpMessage.post(name, message, token));
}
public boolean send(String name, Object message) {
Node n = lookup(name);
if (n == null) {
System.err.println("Warning: sending to nonexistent node " + name + "; message " + message);
return false;
}
n.handle(message);
return true;
}
}

148
java/src/hop/Relay.java Normal file
View File

@ -0,0 +1,148 @@
// Copyright 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
//
// This file is part of Hop.
//
// Hop is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Hop is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
// License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Hop. If not, see <http://www.gnu.org/licenses/>.
//
package hop;
import java.io.*;
import java.net.Socket;
/**
*/
public class Relay implements Runnable, Node, Flushable {
NodeContainer _container;
String _remoteName;
Socket _sock;
String _hostname;
int _port;
SexpReader _r;
OutputStream _output;
SexpWriter _w;
public Relay(NodeContainer container, String hostname) throws IOException, InterruptedException {
this(container, hostname, 5671);
}
public Relay(NodeContainer container, String hostname, int port) throws IOException, InterruptedException {
_container = container;
_remoteName = null;
_hostname = hostname;
_port = port;
_connect();
}
public String getRemoteName() {
return _remoteName;
}
public void _connect() throws IOException, InterruptedException {
_sock = new Socket(_hostname, _port);
_sock.setTcpNoDelay(true);
_r = new SexpReader(new BufferedInputStream(_sock.getInputStream()));
_output = new BufferedOutputStream(_sock.getOutputStream());
_w = new SexpWriter(_output);
_login();
new Thread(this).start();
synchronized (this) {
while (_remoteName == null) {
this.wait();
}
}
}
public void _login() throws IOException {
SexpList greeting = _r.readList();
if (!greeting.getBytes(0).getDataString().equals("hop")) {
throw new InvalidGreetingException(greeting);
}
_w.write(SexpMessage.subscribe(_container.getName(), null, null, null, null));
}
public void handle(Object message) {
try {
_w.write(message);
} catch (IOException ioe) {
ioe.printStackTrace();
System.err.print("Message to be written was: ");
try {
SexpWriter.write(System.err, message);
} catch (IOException ioe2) {
ioe2.printStackTrace();
}
System.err.println();
}
}
public void flush() throws IOException {
_output.flush();
}
public void run() {
SexpList m = null;
try {
while (true) {
m = _r.readList();
if (m == null) {
break;
}
//System.err.println("Received: " + m);
String selector = m.getBytes(0).getDataString();
if (selector.equals("post") && m.size() == 4) {
_container.send(m.getBytes(1).getDataString(), m.get(2));
} else if (selector.equals("subscribe") && m.size() == 6) {
if (_remoteName != null) {
System.err.println("Double bind attempted");
} else {
_remoteName = m.getBytes(1).getDataString();
synchronized (this) {
this.notifyAll();
}
if (_container.bind(_remoteName, this)) {
String replySink = m.getBytes(4).getDataString();
if (replySink.length() > 0) {
_container.post(replySink, m.get(5), SexpMessage.subscribe_ok(_remoteName), null);
}
} else {
System.err.println("Bind failed: " + _remoteName);
}
}
} else if (selector.equals("unsubscribe") && m.size() == 2) {
if (!m.getBytes(1).getDataString().equals(_remoteName)) {
System.err.println("Unknown unbind attempted");
} else {
if (!_container.unbind(m.getBytes(1).getDataString())) {
System.err.println("Unbind failed: " + m.get(1));
}
}
} else {
System.err.print("Unknown message: ");
SexpWriter.write(System.err, m);
System.err.println();
}
}
} catch (IOException ioe) {
ioe.printStackTrace();
System.err.print("Most recent received message: ");
try {
SexpWriter.write(System.err, m);
} catch (IOException ioe2) {
ioe2.printStackTrace();
}
System.err.println();
}
}
}

101
java/src/hop/ServerApi.java Normal file
View File

@ -0,0 +1,101 @@
// Copyright 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
//
// This file is part of Hop.
//
// Hop is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Hop is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
// License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Hop. If not, see <http://www.gnu.org/licenses/>.
//
package hop;
import java.io.Flushable;
import java.io.IOException;
import java.util.UUID;
/**
*/
public class ServerApi implements Flushable {
public NodeContainer _container;
public String _serverName;
public String _kName;
public HalfQueue _k;
public ServerApi(NodeContainer container, String serverName) {
_container = container;
_serverName = serverName;
_kName = UUID.randomUUID().toString();
_k = new HalfQueue();
_container.bind(_kName, _k);
}
public SexpList _nextReply() throws InterruptedException, SexpSyntaxError {
Object x = _k.getQueue().take();
if (x instanceof SexpList) return (SexpList) x;
throw new SexpSyntaxError("Unexpected non-list");
}
public void post(String sink, Object name, Object message, Object token) {
_container.post(_serverName, sink, SexpMessage.post(name, message, token), null);
}
public void send(String sink, Object message) {
_container.post(_serverName, sink, message, null);
}
public void flush() throws IOException {
_container.flush(_serverName);
}
public synchronized Object subscribe(String source, Object filter, String sink, String name) throws InterruptedException, IOException {
send(source, SexpMessage.subscribe(filter, sink, name, _container.getName(), _kName));
flush();
SexpList reply = _nextReply();
assert reply.getBytes(0).getDataString().equals(SexpMessage._subscribe_ok);
return reply.get(1);
}
public synchronized Object subscribe(String source, Object filter, String name) throws InterruptedException, IOException {
return subscribe(source, filter, _container.getName(), name);
}
public Subscription subscribe(String source, Object filter) throws InterruptedException, IOException {
return new Subscription(this, source, filter);
}
public void unsubscribe(String source, Object token) throws IOException {
send(source, SexpMessage.unsubscribe(token));
flush();
/* TODO: optional synchronous reply? */
}
public synchronized Object create(String nodeClassName, Object arg) throws InterruptedException, IOException {
send("factory", SexpMessage.create(nodeClassName, arg, _container.getName(), _kName));
flush();
SexpList reply = _nextReply();
SexpBytes selector = reply.getBytes(0);
if (selector.equals(SexpMessage._create_ok)) return null;
assert selector.equals(SexpMessage._create_failed);
return reply.get(1);
}
public Object createQueue(String name) throws InterruptedException, IOException {
return create("queue", SexpList.with(name));
}
public Object createFanout(String name) throws InterruptedException, IOException {
return create("fanout", SexpList.with(name));
}
public Object createDirect(String name) throws InterruptedException, IOException {
return create("direct", SexpList.with(name));
}
}

View File

@ -0,0 +1,57 @@
// Copyright 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
//
// This file is part of Hop.
//
// Hop is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Hop is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
// License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Hop. If not, see <http://www.gnu.org/licenses/>.
//
package hop;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
/**
*/
public class SexpBytes {
public byte[] _bytes;
public SexpBytes(byte[] bytes) {
_bytes = bytes;
}
public byte[] getData() {
return _bytes;
}
public String getDataString() {
return new String(getData());
}
public void writeTo(OutputStream stream) throws IOException {
SexpWriter.writeSimpleString(stream, _bytes);
}
public String toString() {
return SexpWriter.writeString(this);
}
public boolean equals(Object other) {
return (other instanceof SexpBytes) &&
Arrays.equals(_bytes, ((SexpBytes) other).getData());
}
public int hashCode() {
return Arrays.hashCode(_bytes);
}
}

View File

@ -0,0 +1,43 @@
// Copyright 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
//
// This file is part of Hop.
//
// Hop is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Hop is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
// License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Hop. If not, see <http://www.gnu.org/licenses/>.
//
package hop;
import java.io.IOException;
import java.io.OutputStream;
/**
*/
public class SexpDisplayHint extends SexpBytes {
public byte[] _hint;
public SexpDisplayHint(byte[] hint, byte[] body) {
super(body);
_hint = hint;
}
public byte[] getHint() {
return _hint;
}
public void writeTo(OutputStream stream) throws IOException {
stream.write('[');
SexpWriter.writeSimpleString(stream, _hint);
stream.write(']');
super.writeTo(stream);
}
}

View File

@ -0,0 +1,53 @@
// Copyright 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
//
// This file is part of Hop.
//
// Hop is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Hop is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
// License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Hop. If not, see <http://www.gnu.org/licenses/>.
//
package hop;
import java.util.ArrayList;
/**
*/
public class SexpList extends ArrayList<Object> {
public SexpBytes getBytes(int index) throws SexpSyntaxError {
Object x = get(index);
if (x != null && !(x instanceof SexpBytes)) {
throw new SexpSyntaxError("Unexpected non-bytes");
}
return (SexpBytes) get(index);
}
public SexpList getList(int index) throws SexpSyntaxError {
Object x = get(index);
if (x != null && !(x instanceof SexpList)) {
throw new SexpSyntaxError("Unexpected non-list");
}
return (SexpList) get(index);
}
public static SexpList empty() {
return new SexpList();
}
public static SexpList with(Object x) {
return empty().and(x);
}
public SexpList and(Object x) {
this.add(x);
return this;
}
}

View File

@ -0,0 +1,74 @@
// Copyright 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
//
// This file is part of Hop.
//
// Hop is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Hop is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
// License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Hop. If not, see <http://www.gnu.org/licenses/>.
//
package hop;
/**
*/
public class SexpMessage {
public static SexpBytes _post = new SexpBytes("post".getBytes());
public static SexpBytes _subscribe = new SexpBytes("subscribe".getBytes());
public static SexpBytes _unsubscribe = new SexpBytes("unsubscribe".getBytes());
public static SexpBytes _subscribe_ok = new SexpBytes("subscribe-ok".getBytes());
public static SexpBytes _create = new SexpBytes("create".getBytes());
public static SexpBytes _create_ok = new SexpBytes("create-ok".getBytes());
public static SexpBytes _create_failed = new SexpBytes("create-failed".getBytes());
public static SexpList post(Object name, Object message, Object token) {
SexpList m = new SexpList();
m.add(_post);
m.add(name);
m.add(message);
m.add(token);
return m;
}
public static SexpList subscribe(Object filter, String sink, Object name, String replySink, Object replyName) {
SexpList m = new SexpList();
m.add(_subscribe);
m.add(filter);
m.add(sink);
m.add(name);
m.add(replySink);
m.add(replyName);
return m;
}
public static SexpList subscribe_ok(Object token) {
SexpList m = new SexpList();
m.add(_subscribe_ok);
m.add(token);
return m;
}
public static SexpList unsubscribe(Object token) {
SexpList m = new SexpList();
m.add(_unsubscribe);
m.add(token);
return m;
}
public static SexpList create(String nodeClassName, Object arg, String replySink, Object replyName) {
SexpList m = new SexpList();
m.add(_create);
m.add(nodeClassName);
m.add(arg);
m.add(replySink);
m.add(replyName);
return m;
}
}

View File

@ -0,0 +1,150 @@
// Copyright 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
//
// This file is part of Hop.
//
// Hop is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Hop is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
// License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Hop. If not, see <http://www.gnu.org/licenses/>.
//
package hop;
import java.io.IOException;
import java.io.InputStream;
public class SexpReader {
public InputStream _input;
public SexpReader(InputStream input) {
_input = input;
}
/**
* Reads a sexp length-prefix from _input.
* @return The read length, or -1 if the end of stream is reached
* @throws IOException
* @throws SexpSyntaxError
*/
public int _readLength(int lengthSoFar) throws IOException {
int length = lengthSoFar;
while (true) {
int c = _input.read();
if (c == -1) return -1;
if (c == ':') {
return length;
}
if (!Character.isDigit(c)) {
throw new SexpSyntaxError("Invalid length prefix");
}
length = length * 10 + (c - '0');
}
}
/**
* Reads a simple length-prefixed string from _input, given either zero or the value
* of the first digit of the length-prefix being read.
* @param lengthSoFar either zero or the first digit of the length prefix to use
* @return the read string
* @throws IOException
* @throws SexpSyntaxError
*/
public byte[] _readSimpleString(int lengthSoFar) throws IOException {
int length = _readLength(lengthSoFar);
if (length == -1) return null;
byte[] buf = new byte[length];
int offset = 0;
while (length > 0) {
int count = _input.read(buf, offset, length);
if (count == -1) {
throw new SexpSyntaxError("End-of-stream in the middle of a simple string");
}
offset += count;
length -= count;
}
return buf;
}
public byte[] readSimpleString() throws IOException {
return _readSimpleString(0);
}
public SexpList _readList() throws IOException {
SexpList list = new SexpList();
while (true) {
int c = _input.read();
switch (c) {
case -1:
throw new SexpSyntaxError("Unclosed list");
case ')':
return list;
default:
list.add(_read(c));
break;
}
}
}
public Object _read(int c) throws IOException {
while (true) {
switch (c) {
case -1:
return null;
case '(':
return _readList();
case ')':
throw new SexpSyntaxError("Unexpected close-paren");
case '[':
byte[] hint = readSimpleString();
switch (_input.read()) {
case -1:
throw new SexpSyntaxError("End-of-stream between display hint and body");
case ']':
break;
default:
throw new SexpSyntaxError("Unexpected character after display hint");
}
byte[] body = readSimpleString();
return new SexpDisplayHint(hint, body);
default:
if (Character.isDigit(c)) {
return new SexpBytes(_readSimpleString(c - '0'));
} else if (Character.isWhitespace(c)) {
// Skip harmless (?) whitespace
c = _input.read();
continue;
} else {
throw new SexpSyntaxError("Unexpected character: " + c);
}
}
}
}
public Object read() throws IOException {
return _read(_input.read());
}
public SexpList readList() throws IOException {
Object x = read();
if (x != null && !(x instanceof SexpList)) {
throw new SexpSyntaxError("Unexpected non-list");
}
return (SexpList) x;
}
public SexpBytes readBytes() throws IOException {
Object x = read();
if (x != null && !(x instanceof SexpBytes)) {
throw new SexpSyntaxError("Unexpected non-bytes");
}
return (SexpBytes) x;
}
}

View File

@ -0,0 +1,29 @@
// Copyright 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
//
// This file is part of Hop.
//
// Hop is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Hop is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
// License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Hop. If not, see <http://www.gnu.org/licenses/>.
//
package hop;
import java.io.IOException;
/**
* Reports on a syntax problem reading an S-expression.
*/
public class SexpSyntaxError extends IOException {
public SexpSyntaxError(String s) {
super(s);
}
}

View File

@ -0,0 +1,81 @@
// Copyright 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
//
// This file is part of Hop.
//
// Hop is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Hop is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
// License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Hop. If not, see <http://www.gnu.org/licenses/>.
//
package hop;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
/**
*/
public class SexpWriter {
public OutputStream _output;
public SexpWriter(OutputStream output) {
_output = output;
}
public static void writeSimpleString(OutputStream stream, byte[] buf) throws IOException {
stream.write(Integer.toString(buf.length).getBytes());
stream.write(':');
stream.write(buf);
}
public void write(Object x) throws IOException {
if (x instanceof String) {
writeSimpleString(_output, ((String) x).getBytes());
return;
}
if (x instanceof byte[]) {
writeSimpleString(_output, ((byte[]) x));
return;
}
if (x instanceof SexpBytes) {
((SexpBytes) x).writeTo(_output);
return;
}
if (x instanceof List) {
_output.write('(');
for (Object v : ((List<Object>) x)) {
write(v);
}
_output.write(')');
return;
}
if (x == null) {
_output.write("0:".getBytes());
return;
}
throw new SexpSyntaxError("Unsupported sexp object type");
}
public static void write(OutputStream output, Object x) throws IOException {
new SexpWriter(output).write(x);
}
public static String writeString(Object x) {
try {
ByteArrayOutputStream o = new ByteArrayOutputStream();
write(o, x);
return new String(o.toByteArray());
} catch (IOException ioe) {
return x.toString();
}
}
}

View File

@ -0,0 +1,51 @@
// Copyright 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
//
// This file is part of Hop.
//
// Hop is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Hop is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
// License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Hop. If not, see <http://www.gnu.org/licenses/>.
//
package hop;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
/**
*/
public class Subscription {
public ServerApi _api;
public String _source;
public Object _filter;
public String _consumerName;
public HalfQueue _consumer;
public Object _subscriptionToken;
public Subscription(ServerApi api, String source, Object filter) throws InterruptedException, IOException {
_api = api;
_source = source;
_filter = filter;
_consumerName = UUID.randomUUID().toString();
_consumer = new HalfQueue();
_api._container.bind(_consumerName, _consumer);
_subscriptionToken = _api.subscribe(source, filter, _consumerName);
}
public BlockingQueue<Object> getQueue() {
return _consumer.getQueue();
}
public void unsubscribe() throws IOException {
_api.unsubscribe(_source, _subscriptionToken);
}
}

59
java/src/hop/Test1.java Normal file
View File

@ -0,0 +1,59 @@
// Copyright 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
//
// This file is part of Hop.
//
// Hop is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Hop is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
// License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Hop. If not, see <http://www.gnu.org/licenses/>.
//
package hop;
import java.io.IOException;
/**
*/
public class Test1 {
public static void main(String[] args) {
try {
run(args[0]);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void run(String hostname) throws IOException, InterruptedException {
NodeContainer nc = new NodeContainer();
System.out.println("Hostname: " + hostname);
System.out.println("Container: " + nc.getName());
Relay r = new Relay(nc, hostname);
ServerApi api = new ServerApi(nc, r.getRemoteName());
api.createQueue("q1");
Subscription sub = api.subscribe("q1", null);
long startTime = 0;
int count = 0;
while (true) {
Object x = sub.getQueue().take();
if (startTime == 0) {
startTime = System.currentTimeMillis();
}
count++;
if ((count % 100000) == 0) {
long now = System.currentTimeMillis();
double delta = (now - startTime) / 1000.0;
System.out.println("Received "+count+" messages in "+delta+" seconds, rate = " + (count / delta) + " Hz");
}
}
}
}

57
java/src/hop/Test3.java Normal file
View File

@ -0,0 +1,57 @@
// Copyright 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
//
// This file is part of Hop.
//
// Hop is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Hop is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
// License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Hop. If not, see <http://www.gnu.org/licenses/>.
//
package hop;
import java.io.IOException;
/**
*/
public class Test3 {
public static void main(String[] args) {
try {
run(args[0]);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void run(String hostname) throws IOException, InterruptedException {
NodeContainer nc = new NodeContainer();
System.out.println("Hostname: " + hostname);
System.out.println("Container: " + nc.getName());
Relay r = new Relay(nc, hostname);
ServerApi api = new ServerApi(nc, r.getRemoteName());
api.createQueue("q1");
long startTime = System.currentTimeMillis();
int count = 0;
for (int i = 0; i < 10000000; i++) {
api.post("q1", null, Integer.toString(i), null);
count++;
if ((count % 100000) == 0) {
api.flush();
long now = System.currentTimeMillis();
double delta = (now - startTime) / 1000.0;
System.out.println("Sent "+count+" messages in "+delta+" seconds, rate = " + (count / delta) + " Hz");
}
}
}
}

View File

@ -0,0 +1,84 @@
// Copyright 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
//
// This file is part of Hop.
//
// Hop is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Hop is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
// License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Hop. If not, see <http://www.gnu.org/licenses/>.
//
package hop;
import java.io.IOException;
/**
*/
public class TestPingPong {
public static void main(final String[] args) {
try {
new Thread(new Runnable() { public void run() {
try {
run1(args[0]);
} catch (Exception e) {
e.printStackTrace();
}
} }).start();
run2(args[0]);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void run1(String hostname) throws IOException, InterruptedException {
NodeContainer nc = new NodeContainer();
System.out.println("Hostname: " + hostname);
System.out.println("Container: " + nc.getName());
Relay r = new Relay(nc, hostname);
ServerApi api = new ServerApi(nc, r.getRemoteName());
api.createQueue("req");
Subscription sub = api.subscribe("req", null);
while (true) {
Object x = sub.getQueue().take();
//System.out.println("Message: " + x);
api.post("rep", "reply", SexpList.with("ok").and(x), null);
api.flush();
}
}
public static void run2(String hostname) throws IOException, InterruptedException {
NodeContainer nc = new NodeContainer();
System.out.println("Hostname: " + hostname);
System.out.println("Container: " + nc.getName());
Relay r = new Relay(nc, hostname);
ServerApi api = new ServerApi(nc, r.getRemoteName());
api.createQueue("req");
api.createQueue("rep");
Subscription sub = api.subscribe("rep", null);
long startTime = System.currentTimeMillis();
for (int i = 0; i < 100000; i++) {
api.post("req", "request", Integer.toString(i), null);
api.flush();
sub.getQueue().take();
int j = i + 1;
if ((j % 100) == 0) {
long now = System.currentTimeMillis();
double delta = (now - startTime) / 1000.0;
System.out.println("Message " + j + ": " + (j / delta) + " Hz");
}
}
}
}

View File

@ -0,0 +1,82 @@
// Copyright 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
//
// This file is part of Hop.
//
// Hop is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Hop is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
// License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Hop. If not, see <http://www.gnu.org/licenses/>.
//
package hop;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
public class TestScale {
static AtomicLong counter = new AtomicLong();
public static void main(final String[] args) {
try {
final String hostname = args[0];
int count = Integer.parseInt(args[1]);
System.out.println("Hostname: " + hostname);
for (int i = 0; i < count; i++) {
new Thread(new Runnable() { public void run() {
try {
runConnection(hostname);
} catch (Exception e) {
e.printStackTrace();
}
} }).start();
Thread.sleep(100);
}
while (true) {
long startTime = System.currentTimeMillis();
long startCount = counter.longValue();
Thread.sleep(1000);
long now = System.currentTimeMillis();
long countNow = counter.longValue();
report(startTime, startCount, now, countNow);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void report(long t0, long c0, long t1, long c1) {
double dc = c1 - c0;
double dt = (t1 - t0) / 1000.0;
double rate = dc / dt;
System.out.println(dc + " messages in " + dt + "s = " + rate + " Hz");
}
public static void runConnection(String hostname) throws IOException, InterruptedException {
NodeContainer nc = new NodeContainer();
String qName = nc.getName() + "q";
System.out.println("Queue: " + qName);
Relay r = new Relay(nc, hostname);
ServerApi api = new ServerApi(nc, r.getRemoteName());
api.createQueue(qName);
Subscription sub = api.subscribe(qName, null);
while (true) {
Object in = "a";
api.post(qName, "", in, null);
api.flush();
Object out = sub.getQueue().take();
assert in.equals(out);
counter.incrementAndGet();
}
}
}

View File

@ -0,0 +1,95 @@
// Copyright 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
//
// This file is part of Hop.
//
// Hop is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Hop is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
// License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Hop. If not, see <http://www.gnu.org/licenses/>.
//
package hop;
import junit.framework.TestCase;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
*/
public class TestSexpIO extends TestCase {
public Object read(String s) throws IOException {
return new SexpReader(new ByteArrayInputStream(s.getBytes())).read();
}
public byte[] write(Object x) throws IOException {
ByteArrayOutputStream o = new ByteArrayOutputStream();
new SexpWriter(o).write(x);
return o.toByteArray();
}
public void assertBytesEqual(String expected, byte[] actual) {
assertBytesEqual(expected.getBytes(), actual);
}
public void assertBytesEqual(byte[] expected, byte[] actual) {
assertEquals(expected.length, actual.length);
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i], actual[i]);
}
}
public void testEndOfStream() throws IOException {
assertNull(read(""));
}
public void testEmptyList() throws IOException {
assertEquals(new ArrayList<Object>(), read("()"));
}
public void testSimpleString() throws IOException {
assertBytesEqual("hello", ((SexpBytes) read("5:hello")).getData());
}
public void testDisplayHint() throws IOException {
SexpDisplayHint v = (SexpDisplayHint) read("[1:h]1:b");
assertBytesEqual("b", v.getData());
assertBytesEqual("h", v.getHint());
assertBytesEqual("[1:h]1:b", write(v));
}
public void testSimpleList() throws IOException {
List<Object> l = (List<Object>) read("(1:a1:b1:c)");
assertEquals(3, l.size());
assertBytesEqual("a", ((SexpBytes) l.get(0)).getData());
assertBytesEqual("b", ((SexpBytes) l.get(1)).getData());
assertBytesEqual("c", ((SexpBytes) l.get(2)).getData());
}
public void testNestedList() throws IOException {
List<Object> l = (List<Object>) read("(1:a(1:b1:c)())");
assertEquals(3, l.size());
assertBytesEqual("a", ((SexpBytes) l.get(0)).getData());
List<Object> k = (List<Object>) l.get(1);
assertEquals(2, k.size());
assertBytesEqual("b", ((SexpBytes) k.get(0)).getData());
assertBytesEqual("c", ((SexpBytes) k.get(1)).getData());
assertEquals(new ArrayList<Object>(), l.get(2));
assertBytesEqual("(1:b1:c)", write(k));
assertBytesEqual("(1:a(1:b1:c)())", write(l));
}
public void testNullWrite() throws IOException {
assertBytesEqual("0:", write(null));
}
}