102 lines
3.7 KiB
Java
102 lines
3.7 KiB
Java
// Copyright 2011, 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>.
|
|
//
|
|
// This file is part of Hop.
|
|
//
|
|
// Hop is free software: you can redistribute it and/or modify it
|
|
// under the terms of the GNU General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// Hop is distributed in the hope that it will be useful, but WITHOUT
|
|
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
|
|
// or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
|
|
// License for more details.
|
|
//
|
|
// You should have received a copy of the GNU General Public License
|
|
// along with Hop. If not, see <http://www.gnu.org/licenses/>.
|
|
//
|
|
package hop;
|
|
|
|
import java.io.Flushable;
|
|
import java.io.IOException;
|
|
import java.util.UUID;
|
|
|
|
/**
|
|
*/
|
|
public class ServerApi implements Flushable {
|
|
public NodeContainer _container;
|
|
public String _serverName;
|
|
public String _kName;
|
|
public HalfQueue _k;
|
|
|
|
public ServerApi(NodeContainer container, String serverName) {
|
|
_container = container;
|
|
_serverName = serverName;
|
|
_kName = UUID.randomUUID().toString();
|
|
_k = new HalfQueue();
|
|
_container.bind(_kName, _k);
|
|
}
|
|
|
|
public SexpList _nextReply() throws InterruptedException, SexpSyntaxError {
|
|
Object x = _k.getQueue().take();
|
|
if (x instanceof SexpList) return (SexpList) x;
|
|
throw new SexpSyntaxError("Unexpected non-list");
|
|
}
|
|
|
|
public void post(String sink, Object name, Object message, Object token) {
|
|
_container.post(_serverName, sink, SexpMessage.post(name, message, token), null);
|
|
}
|
|
|
|
public void send(String sink, Object message) {
|
|
_container.post(_serverName, sink, message, null);
|
|
}
|
|
|
|
public void flush() throws IOException {
|
|
_container.flush(_serverName);
|
|
}
|
|
|
|
public synchronized Object subscribe(String source, Object filter, String sink, String name) throws InterruptedException, IOException {
|
|
send(source, SexpMessage.subscribe(filter, sink, name, _container.getName(), _kName));
|
|
flush();
|
|
SexpList reply = _nextReply();
|
|
assert reply.getBytes(0).getDataString().equals(SexpMessage._subscribe_ok);
|
|
return reply.get(1);
|
|
}
|
|
|
|
public synchronized Object subscribe(String source, Object filter, String name) throws InterruptedException, IOException {
|
|
return subscribe(source, filter, _container.getName(), name);
|
|
}
|
|
|
|
public Subscription subscribe(String source, Object filter) throws InterruptedException, IOException {
|
|
return new Subscription(this, source, filter);
|
|
}
|
|
|
|
public void unsubscribe(String source, Object token) throws IOException {
|
|
send(source, SexpMessage.unsubscribe(token));
|
|
flush();
|
|
/* TODO: optional synchronous reply? */
|
|
}
|
|
|
|
public synchronized Object create(String nodeClassName, Object arg) throws InterruptedException, IOException {
|
|
send("factory", SexpMessage.create(nodeClassName, arg, _container.getName(), _kName));
|
|
flush();
|
|
SexpList reply = _nextReply();
|
|
SexpBytes selector = reply.getBytes(0);
|
|
if (selector.equals(SexpMessage._create_ok)) return null;
|
|
assert selector.equals(SexpMessage._create_failed);
|
|
return reply.get(1);
|
|
}
|
|
|
|
public Object createQueue(String name) throws InterruptedException, IOException {
|
|
return create("queue", SexpList.with(name));
|
|
}
|
|
|
|
public Object createFanout(String name) throws InterruptedException, IOException {
|
|
return create("fanout", SexpList.with(name));
|
|
}
|
|
|
|
public Object createDirect(String name) throws InterruptedException, IOException {
|
|
return create("direct", SexpList.with(name));
|
|
}
|
|
}
|