hop-2012/java/hop/ServerApi.java

89 lines
3.1 KiB
Java

/*
* Copyright (c) 2011 Tony Garnock-Jones. All rights reserved.
*/
package hop;
import java.io.Flushable;
import java.io.IOException;
import java.util.UUID;
/**
*/
public class ServerApi implements Flushable {
public NodeContainer _container;
public String _serverName;
public String _kName;
public HalfQueue _k;
public ServerApi(NodeContainer container, String serverName) {
_container = container;
_serverName = serverName;
_kName = UUID.randomUUID().toString();
_k = new HalfQueue();
_container.bind(_kName, _k);
}
public SexpList _nextReply() throws InterruptedException, SexpSyntaxError {
Object x = _k.getQueue().take();
if (x instanceof SexpList) return (SexpList) x;
throw new SexpSyntaxError("Unexpected non-list");
}
public void post(String sink, Object name, Object message, Object token) {
_container.post(_serverName, sink, SexpMessage.post(name, message, token), null);
}
public void send(String sink, Object message) {
_container.post(_serverName, sink, message, null);
}
public void flush() throws IOException {
_container.flush(_serverName);
}
public synchronized Object subscribe(String source, Object filter, String sink, String name) throws InterruptedException, IOException {
send(source, SexpMessage.subscribe(filter, sink, name, _container.getName(), _kName));
flush();
SexpList reply = _nextReply();
assert reply.getBytes(0).getDataString().equals(SexpMessage._subscribe_ok);
return reply.get(1);
}
public synchronized Object subscribe(String source, Object filter, String name) throws InterruptedException, IOException {
return subscribe(source, filter, _container.getName(), name);
}
public Subscription subscribe(String source, Object filter) throws InterruptedException, IOException {
return new Subscription(this, source, filter);
}
public void unsubscribe(String source, Object token) throws IOException {
send(source, SexpMessage.unsubscribe(token));
flush();
/* TODO: optional synchronous reply? */
}
public synchronized Object create(String nodeClassName, Object arg) throws InterruptedException, IOException {
send("factory", SexpMessage.create(nodeClassName, arg, _container.getName(), _kName));
flush();
SexpList reply = _nextReply();
SexpBytes selector = reply.getBytes(0);
if (selector.equals(SexpMessage._create_ok)) return null;
assert selector.equals(SexpMessage._create_failed);
return reply.get(1);
}
public Object createQueue(String name) throws InterruptedException, IOException {
return create("queue", SexpList.with(name));
}
public Object createFanout(String name) throws InterruptedException, IOException {
return create("fanout", SexpList.with(name));
}
public Object createDirect(String name) throws InterruptedException, IOException {
return create("direct", SexpList.with(name));
}
}