// Copyright 2011, 2012 Tony Garnock-Jones . // // This file is part of Hop. // // Hop is free software: you can redistribute it and/or modify it // under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // Hop is distributed in the hope that it will be useful, but WITHOUT // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public // License for more details. // // You should have received a copy of the GNU General Public License // along with Hop. If not, see . // package hop; import java.io.Flushable; import java.io.IOException; import java.util.UUID; /** */ public class ServerApi implements Flushable { public NodeContainer _container; public String _serverName; public String _kName; public HalfQueue _k; public ServerApi(NodeContainer container, String serverName) { _container = container; _serverName = serverName; _kName = UUID.randomUUID().toString(); _k = new HalfQueue(); _container.bind(_kName, _k); } public SexpList _nextReply() throws InterruptedException, SexpSyntaxError { Object x = _k.getQueue().take(); if (x instanceof SexpList) return (SexpList) x; throw new SexpSyntaxError("Unexpected non-list"); } public void post(String sink, Object name, Object message, Object token) { _container.post(_serverName, sink, SexpMessage.post(name, message, token), null); } public void send(String sink, Object message) { _container.post(_serverName, sink, message, null); } public void flush() throws IOException { _container.flush(_serverName); } public synchronized Object subscribe(String source, Object filter, String sink, String name) throws InterruptedException, IOException { send(source, SexpMessage.subscribe(filter, sink, name, _container.getName(), _kName)); flush(); SexpList reply = _nextReply(); assert reply.getBytes(0).getDataString().equals(SexpMessage._subscribe_ok); return reply.get(1); } public synchronized Object subscribe(String source, Object filter, String name) throws InterruptedException, IOException { return subscribe(source, filter, _container.getName(), name); } public Subscription subscribe(String source, Object filter) throws InterruptedException, IOException { return new Subscription(this, source, filter); } public void unsubscribe(String source, Object token) throws IOException { send(source, SexpMessage.unsubscribe(token)); flush(); /* TODO: optional synchronous reply? */ } public synchronized Object create(String nodeClassName, Object arg) throws InterruptedException, IOException { send("factory", SexpMessage.create(nodeClassName, arg, _container.getName(), _kName)); flush(); SexpList reply = _nextReply(); SexpBytes selector = reply.getBytes(0); if (selector.equals(SexpMessage._create_ok)) return null; assert selector.equals(SexpMessage._create_failed); return reply.get(1); } public Object createQueue(String name) throws InterruptedException, IOException { return create("queue", SexpList.with(name)); } public Object createFanout(String name) throws InterruptedException, IOException { return create("fanout", SexpList.with(name)); } public Object createDirect(String name) throws InterruptedException, IOException { return create("direct", SexpList.with(name)); } }