Buffering; flushing; throughput test

This commit is contained in:
Tony Garnock-Jones 2011-01-07 15:05:56 -05:00
parent 9e5ac54d82
commit 8b3f23e600
6 changed files with 114 additions and 22 deletions

View File

@ -4,14 +4,14 @@
package hop;
import java.io.Flushable;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.util.Hashtable;
import java.util.Map;
import java.util.UUID;
import java.util.*;
/**
*/
public class NodeContainer {
public class NodeContainer implements Flushable {
public String _name;
public Map<String, WeakReference<Node>> _directory;
@ -29,9 +29,12 @@ public class NodeContainer {
}
public synchronized boolean bind(String name, Node n) {
if (_directory.containsKey(name))
WeakReference<Node> ref = _directory.get(name);
if (ref != null && ref.get() != null) {
return false;
_directory.put(name, new WeakReference<Node>(n));
}
ref = new WeakReference<Node>(n);
_directory.put(name, ref);
return true;
}
@ -42,6 +45,34 @@ public class NodeContainer {
return true;
}
public void flush() throws IOException {
ArrayList<Flushable> fs = new ArrayList<Flushable>();
synchronized (this) {
for (Map.Entry<String, WeakReference<Node>> e : _directory.entrySet()) {
Node n = e.getValue().get();
if (n instanceof Flushable) {
fs.add((Flushable) n);
}
}
}
for (Flushable f : fs) {
f.flush();
}
}
public void flush(String name) throws IOException {
Flushable f;
synchronized (this) {
WeakReference<Node> ref = _directory.get(name);
if (ref == null) return;
Node n = ref.get();
if (n == null) return;
if (!(n instanceof Flushable)) return;
f = ((Flushable) n);
}
f.flush();
}
public synchronized void unbindReferencesTo(Node n) {
for (Map.Entry<String, WeakReference<Node>> e : _directory.entrySet()) {
if (e.getValue().get() == n) {

View File

@ -4,13 +4,12 @@
package hop;
import java.io.IOException;
import java.io.OutputStream;
import java.io.*;
import java.net.Socket;
/**
*/
public class Relay implements Runnable, Node {
public class Relay implements Runnable, Node, Flushable {
NodeContainer _container;
String _remoteName;
Socket _sock;
@ -39,8 +38,8 @@ public class Relay implements Runnable, Node {
public void _connect() throws IOException, InterruptedException {
_sock = new Socket(_hostname, _port);
_sock.setTcpNoDelay(true);
_r = new SexpReader(_sock.getInputStream());
_output = _sock.getOutputStream();
_r = new SexpReader(new BufferedInputStream(_sock.getInputStream()));
_output = new BufferedOutputStream(_sock.getOutputStream());
_w = new SexpWriter(_output);
_login();
new Thread(this).start();
@ -75,6 +74,10 @@ public class Relay implements Runnable, Node {
}
}
public void flush() throws IOException {
_output.flush();
}
public void run() {
SexpList m = null;
try {

View File

@ -4,6 +4,7 @@
package hop;
import java.io.IOException;
import java.util.UUID;
/**
@ -36,28 +37,31 @@ public class ServerApi {
_container.post(_serverName, sink, message, null);
}
public synchronized Object subscribe(String source, Object filter, String sink, String name) throws InterruptedException, SexpSyntaxError {
public synchronized Object subscribe(String source, Object filter, String sink, String name) throws InterruptedException, IOException {
send(source, SexpMessage.subscribe(filter, sink, name, _container.getName(), _kName));
_container.flush(_serverName);
SexpList reply = _nextReply();
assert reply.getBytes(0).getDataString().equals(SexpMessage._subscribe_ok);
return reply.get(1);
}
public synchronized Object subscribe(String source, Object filter, String name) throws InterruptedException, SexpSyntaxError {
public synchronized Object subscribe(String source, Object filter, String name) throws InterruptedException, IOException {
return subscribe(source, filter, _container.getName(), name);
}
public Subscription subscribe(String source, Object filter) throws InterruptedException, SexpSyntaxError {
public Subscription subscribe(String source, Object filter) throws InterruptedException, IOException {
return new Subscription(this, source, filter);
}
public void unsubscribe(String source, Object token) {
public void unsubscribe(String source, Object token) throws IOException {
send(source, SexpMessage.unsubscribe(token));
_container.flush(_serverName);
/* TODO: optional synchronous reply? */
}
public synchronized Object create(String nodeClassName, Object arg) throws InterruptedException, SexpSyntaxError {
public synchronized Object create(String nodeClassName, Object arg) throws InterruptedException, IOException {
send("factory", SexpMessage.create(nodeClassName, arg, _container.getName(), _kName));
_container.flush(_serverName);
SexpList reply = _nextReply();
SexpBytes selector = reply.getBytes(0);
if (selector.equals(SexpMessage._create_ok)) return null;
@ -65,15 +69,15 @@ public class ServerApi {
return reply.get(1);
}
public Object createQueue(String name) throws InterruptedException, SexpSyntaxError {
public Object createQueue(String name) throws InterruptedException, IOException {
return create("queue", SexpList.with(name));
}
public Object createFanout(String name) throws InterruptedException, SexpSyntaxError {
public Object createFanout(String name) throws InterruptedException, IOException {
return create("fanout", SexpList.with(name));
}
public Object createDirect(String name) throws InterruptedException, SexpSyntaxError {
public Object createDirect(String name) throws InterruptedException, IOException {
return create("direct", SexpList.with(name));
}
}

View File

@ -4,6 +4,7 @@
package hop;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
@ -17,7 +18,7 @@ public class Subscription {
public HalfQueue _consumer;
public Object _subscriptionToken;
public Subscription(ServerApi api, String source, Object filter) throws InterruptedException, SexpSyntaxError {
public Subscription(ServerApi api, String source, Object filter) throws InterruptedException, IOException {
_api = api;
_source = source;
_filter = filter;
@ -31,7 +32,7 @@ public class Subscription {
return _consumer.getQueue();
}
public void unsubscribe() {
public void unsubscribe() throws IOException {
_api.unsubscribe(_source, _subscriptionToken);
}
}

View File

@ -28,9 +28,19 @@ public class Test1 {
api.createQueue("q1");
Subscription sub = api.subscribe("q1", null);
long startTime = 0;
int count = 0;
while (true) {
Object x = sub.getQueue().take();
System.out.println("Message: " + x);
if (startTime == 0) {
startTime = System.currentTimeMillis();
}
count++;
if ((count % 100000) == 0) {
long now = System.currentTimeMillis();
double delta = (now - startTime) / 1000.0;
System.out.println("Received "+count+" messages in "+delta+" seconds, rate = " + (count / delta) + " Hz");
}
}
}
}

43
java/hop/Test3.java Normal file
View File

@ -0,0 +1,43 @@
/*
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
*/
package hop;
import java.io.IOException;
/**
*/
public class Test3 {
public static void main(String[] args) {
try {
run(args[0]);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void run(String hostname) throws IOException, InterruptedException {
NodeContainer nc = new NodeContainer();
System.out.println("Hostname: " + hostname);
System.out.println("Container: " + nc.getName());
Relay r = new Relay(nc, hostname);
ServerApi api = new ServerApi(nc, r.getRemoteName());
api.createQueue("q1");
long startTime = System.currentTimeMillis();
int count = 0;
for (int i = 0; i < 10000000; i++) {
api.post("q1", null, Integer.toString(i), null);
count++;
if ((count % 100000) == 0) {
r.flush();
long now = System.currentTimeMillis();
double delta = (now - startTime) / 1000.0;
System.out.println("Sent "+count+" messages in "+delta+" seconds, rate = " + (count / delta) + " Hz");
}
}
}
}