diff --git a/java/hop/ServerApi.java b/java/hop/ServerApi.java index 2631af6..bf72c6a 100644 --- a/java/hop/ServerApi.java +++ b/java/hop/ServerApi.java @@ -4,12 +4,13 @@ package hop; +import java.io.Flushable; import java.io.IOException; import java.util.UUID; /** */ -public class ServerApi { +public class ServerApi implements Flushable { public NodeContainer _container; public String _serverName; public String _kName; @@ -37,9 +38,13 @@ public class ServerApi { _container.post(_serverName, sink, message, null); } + public void flush() throws IOException { + _container.flush(_serverName); + } + public synchronized Object subscribe(String source, Object filter, String sink, String name) throws InterruptedException, IOException { send(source, SexpMessage.subscribe(filter, sink, name, _container.getName(), _kName)); - _container.flush(_serverName); + flush(); SexpList reply = _nextReply(); assert reply.getBytes(0).getDataString().equals(SexpMessage._subscribe_ok); return reply.get(1); @@ -55,13 +60,13 @@ public class ServerApi { public void unsubscribe(String source, Object token) throws IOException { send(source, SexpMessage.unsubscribe(token)); - _container.flush(_serverName); + flush(); /* TODO: optional synchronous reply? */ } public synchronized Object create(String nodeClassName, Object arg) throws InterruptedException, IOException { send("factory", SexpMessage.create(nodeClassName, arg, _container.getName(), _kName)); - _container.flush(_serverName); + flush(); SexpList reply = _nextReply(); SexpBytes selector = reply.getBytes(0); if (selector.equals(SexpMessage._create_ok)) return null; diff --git a/java/hop/Test3.java b/java/hop/Test3.java index f77749b..63512ef 100644 --- a/java/hop/Test3.java +++ b/java/hop/Test3.java @@ -21,6 +21,7 @@ public class Test3 { NodeContainer nc = new NodeContainer(); System.out.println("Hostname: " + hostname); + System.out.println("Container: " + nc.getName()); Relay r = new Relay(nc, hostname); @@ -33,7 +34,7 @@ public class Test3 { api.post("q1", null, Integer.toString(i), null); count++; if ((count % 100000) == 0) { - r.flush(); + api.flush(); long now = System.currentTimeMillis(); double delta = (now - startTime) / 1000.0; System.out.println("Sent "+count+" messages in "+delta+" seconds, rate = " + (count / delta) + " Hz"); diff --git a/java/hop/TestPingPong.java b/java/hop/TestPingPong.java index e37dfae..d8de0b6 100644 --- a/java/hop/TestPingPong.java +++ b/java/hop/TestPingPong.java @@ -39,6 +39,7 @@ public class TestPingPong { Object x = sub.getQueue().take(); //System.out.println("Message: " + x); api.post("rep", "reply", SexpList.with("ok").and(x), null); + api.flush(); } } @@ -57,7 +58,8 @@ public class TestPingPong { long startTime = System.currentTimeMillis(); for (int i = 0; i < 100000; i++) { api.post("req", "request", Integer.toString(i), null); - sub.getQueue().take(); + api.flush(); + sub.getQueue().take(); int j = i + 1; if ((j % 100) == 0) { long now = System.currentTimeMillis();