From 8b3f23e60000fefe697255a9adee1d9c13211eaf Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Fri, 7 Jan 2011 15:05:56 -0500 Subject: [PATCH] Buffering; flushing; throughput test --- java/hop/NodeContainer.java | 43 +++++++++++++++++++++++++++++++------ java/hop/Relay.java | 13 ++++++----- java/hop/ServerApi.java | 20 ++++++++++------- java/hop/Subscription.java | 5 +++-- java/hop/Test1.java | 12 ++++++++++- java/hop/Test3.java | 43 +++++++++++++++++++++++++++++++++++++ 6 files changed, 114 insertions(+), 22 deletions(-) create mode 100644 java/hop/Test3.java diff --git a/java/hop/NodeContainer.java b/java/hop/NodeContainer.java index 3853650..789201f 100644 --- a/java/hop/NodeContainer.java +++ b/java/hop/NodeContainer.java @@ -4,14 +4,14 @@ package hop; +import java.io.Flushable; +import java.io.IOException; import java.lang.ref.WeakReference; -import java.util.Hashtable; -import java.util.Map; -import java.util.UUID; +import java.util.*; /** */ -public class NodeContainer { +public class NodeContainer implements Flushable { public String _name; public Map> _directory; @@ -29,9 +29,12 @@ public class NodeContainer { } public synchronized boolean bind(String name, Node n) { - if (_directory.containsKey(name)) + WeakReference ref = _directory.get(name); + if (ref != null && ref.get() != null) { return false; - _directory.put(name, new WeakReference(n)); + } + ref = new WeakReference(n); + _directory.put(name, ref); return true; } @@ -42,6 +45,34 @@ public class NodeContainer { return true; } + public void flush() throws IOException { + ArrayList fs = new ArrayList(); + synchronized (this) { + for (Map.Entry> e : _directory.entrySet()) { + Node n = e.getValue().get(); + if (n instanceof Flushable) { + fs.add((Flushable) n); + } + } + } + for (Flushable f : fs) { + f.flush(); + } + } + + public void flush(String name) throws IOException { + Flushable f; + synchronized (this) { + WeakReference ref = _directory.get(name); + if (ref == null) return; + Node n = ref.get(); + if (n == null) return; + if (!(n instanceof Flushable)) return; + f = ((Flushable) n); + } + f.flush(); + } + public synchronized void unbindReferencesTo(Node n) { for (Map.Entry> e : _directory.entrySet()) { if (e.getValue().get() == n) { diff --git a/java/hop/Relay.java b/java/hop/Relay.java index e3b8e8d..698be96 100644 --- a/java/hop/Relay.java +++ b/java/hop/Relay.java @@ -4,13 +4,12 @@ package hop; -import java.io.IOException; -import java.io.OutputStream; +import java.io.*; import java.net.Socket; /** */ -public class Relay implements Runnable, Node { +public class Relay implements Runnable, Node, Flushable { NodeContainer _container; String _remoteName; Socket _sock; @@ -39,8 +38,8 @@ public class Relay implements Runnable, Node { public void _connect() throws IOException, InterruptedException { _sock = new Socket(_hostname, _port); _sock.setTcpNoDelay(true); - _r = new SexpReader(_sock.getInputStream()); - _output = _sock.getOutputStream(); + _r = new SexpReader(new BufferedInputStream(_sock.getInputStream())); + _output = new BufferedOutputStream(_sock.getOutputStream()); _w = new SexpWriter(_output); _login(); new Thread(this).start(); @@ -75,6 +74,10 @@ public class Relay implements Runnable, Node { } } + public void flush() throws IOException { + _output.flush(); + } + public void run() { SexpList m = null; try { diff --git a/java/hop/ServerApi.java b/java/hop/ServerApi.java index f0205d4..2631af6 100644 --- a/java/hop/ServerApi.java +++ b/java/hop/ServerApi.java @@ -4,6 +4,7 @@ package hop; +import java.io.IOException; import java.util.UUID; /** @@ -36,28 +37,31 @@ public class ServerApi { _container.post(_serverName, sink, message, null); } - public synchronized Object subscribe(String source, Object filter, String sink, String name) throws InterruptedException, SexpSyntaxError { + public synchronized Object subscribe(String source, Object filter, String sink, String name) throws InterruptedException, IOException { send(source, SexpMessage.subscribe(filter, sink, name, _container.getName(), _kName)); + _container.flush(_serverName); SexpList reply = _nextReply(); assert reply.getBytes(0).getDataString().equals(SexpMessage._subscribe_ok); return reply.get(1); } - public synchronized Object subscribe(String source, Object filter, String name) throws InterruptedException, SexpSyntaxError { + public synchronized Object subscribe(String source, Object filter, String name) throws InterruptedException, IOException { return subscribe(source, filter, _container.getName(), name); } - public Subscription subscribe(String source, Object filter) throws InterruptedException, SexpSyntaxError { + public Subscription subscribe(String source, Object filter) throws InterruptedException, IOException { return new Subscription(this, source, filter); } - public void unsubscribe(String source, Object token) { + public void unsubscribe(String source, Object token) throws IOException { send(source, SexpMessage.unsubscribe(token)); + _container.flush(_serverName); /* TODO: optional synchronous reply? */ } - public synchronized Object create(String nodeClassName, Object arg) throws InterruptedException, SexpSyntaxError { + public synchronized Object create(String nodeClassName, Object arg) throws InterruptedException, IOException { send("factory", SexpMessage.create(nodeClassName, arg, _container.getName(), _kName)); + _container.flush(_serverName); SexpList reply = _nextReply(); SexpBytes selector = reply.getBytes(0); if (selector.equals(SexpMessage._create_ok)) return null; @@ -65,15 +69,15 @@ public class ServerApi { return reply.get(1); } - public Object createQueue(String name) throws InterruptedException, SexpSyntaxError { + public Object createQueue(String name) throws InterruptedException, IOException { return create("queue", SexpList.with(name)); } - public Object createFanout(String name) throws InterruptedException, SexpSyntaxError { + public Object createFanout(String name) throws InterruptedException, IOException { return create("fanout", SexpList.with(name)); } - public Object createDirect(String name) throws InterruptedException, SexpSyntaxError { + public Object createDirect(String name) throws InterruptedException, IOException { return create("direct", SexpList.with(name)); } } diff --git a/java/hop/Subscription.java b/java/hop/Subscription.java index 8f13f5f..0f09010 100644 --- a/java/hop/Subscription.java +++ b/java/hop/Subscription.java @@ -4,6 +4,7 @@ package hop; +import java.io.IOException; import java.util.UUID; import java.util.concurrent.BlockingQueue; @@ -17,7 +18,7 @@ public class Subscription { public HalfQueue _consumer; public Object _subscriptionToken; - public Subscription(ServerApi api, String source, Object filter) throws InterruptedException, SexpSyntaxError { + public Subscription(ServerApi api, String source, Object filter) throws InterruptedException, IOException { _api = api; _source = source; _filter = filter; @@ -31,7 +32,7 @@ public class Subscription { return _consumer.getQueue(); } - public void unsubscribe() { + public void unsubscribe() throws IOException { _api.unsubscribe(_source, _subscriptionToken); } } diff --git a/java/hop/Test1.java b/java/hop/Test1.java index 9bb9f08..45e3332 100644 --- a/java/hop/Test1.java +++ b/java/hop/Test1.java @@ -28,9 +28,19 @@ public class Test1 { api.createQueue("q1"); Subscription sub = api.subscribe("q1", null); + long startTime = 0; + int count = 0; while (true) { Object x = sub.getQueue().take(); - System.out.println("Message: " + x); + if (startTime == 0) { + startTime = System.currentTimeMillis(); + } + count++; + if ((count % 100000) == 0) { + long now = System.currentTimeMillis(); + double delta = (now - startTime) / 1000.0; + System.out.println("Received "+count+" messages in "+delta+" seconds, rate = " + (count / delta) + " Hz"); + } } } } diff --git a/java/hop/Test3.java b/java/hop/Test3.java new file mode 100644 index 0000000..f77749b --- /dev/null +++ b/java/hop/Test3.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2011 Tony Garnock-Jones. All rights reserved. + */ + +package hop; + +import java.io.IOException; + +/** + */ +public class Test3 { + public static void main(String[] args) { + try { + run(args[0]); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void run(String hostname) throws IOException, InterruptedException { + NodeContainer nc = new NodeContainer(); + + System.out.println("Hostname: " + hostname); + System.out.println("Container: " + nc.getName()); + + Relay r = new Relay(nc, hostname); + ServerApi api = new ServerApi(nc, r.getRemoteName()); + + api.createQueue("q1"); + long startTime = System.currentTimeMillis(); + int count = 0; + for (int i = 0; i < 10000000; i++) { + api.post("q1", null, Integer.toString(i), null); + count++; + if ((count % 100000) == 0) { + r.flush(); + long now = System.currentTimeMillis(); + double delta = (now - startTime) / 1000.0; + System.out.println("Sent "+count+" messages in "+delta+" seconds, rate = " + (count / delta) + " Hz"); + } + } + } +}