// Copyright 2011, 2012 Tony Garnock-Jones . // // This file is part of Hop. // // Hop is free software: you can redistribute it and/or modify it // under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // Hop is distributed in the hope that it will be useful, but WITHOUT // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public // License for more details. // // You should have received a copy of the GNU General Public License // along with Hop. If not, see . // package hop; import java.io.*; import java.net.Socket; /** */ public class Relay implements Runnable, Node, Flushable { NodeContainer _container; String _remoteName; Socket _sock; String _hostname; int _port; SexpReader _r; OutputStream _output; SexpWriter _w; public Relay(NodeContainer container, String hostname) throws IOException, InterruptedException { this(container, hostname, 5671); } public Relay(NodeContainer container, String hostname, int port) throws IOException, InterruptedException { _container = container; _remoteName = null; _hostname = hostname; _port = port; _connect(); } public String getRemoteName() { return _remoteName; } public void _connect() throws IOException, InterruptedException { _sock = new Socket(_hostname, _port); _sock.setTcpNoDelay(true); _r = new SexpReader(new BufferedInputStream(_sock.getInputStream())); _output = new BufferedOutputStream(_sock.getOutputStream()); _w = new SexpWriter(_output); _login(); new Thread(this).start(); synchronized (this) { while (_remoteName == null) { this.wait(); } } } public void _login() throws IOException { SexpList greeting = _r.readList(); if (!greeting.getBytes(0).getDataString().equals("hop")) { throw new InvalidGreetingException(greeting); } _w.write(SexpMessage.subscribe(_container.getName(), null, null, null, null)); } public void handle(Object message) { try { _w.write(message); } catch (IOException ioe) { ioe.printStackTrace(); System.err.print("Message to be written was: "); try { SexpWriter.write(System.err, message); } catch (IOException ioe2) { ioe2.printStackTrace(); } System.err.println(); } } public void flush() throws IOException { _output.flush(); } public void run() { SexpList m = null; try { while (true) { m = _r.readList(); if (m == null) { break; } //System.err.println("Received: " + m); String selector = m.getBytes(0).getDataString(); if (selector.equals("post") && m.size() == 4) { _container.send(m.getBytes(1).getDataString(), m.get(2)); } else if (selector.equals("subscribe") && m.size() == 6) { if (_remoteName != null) { System.err.println("Double bind attempted"); } else { _remoteName = m.getBytes(1).getDataString(); if (_container.bind(_remoteName, this)) { String replySink = m.getBytes(4).getDataString(); if (replySink.length() > 0) { _container.post(replySink, m.get(5), SexpMessage.subscribe_ok(_remoteName), null); } } else { System.err.println("Bind failed: " + _remoteName); } synchronized (this) { this.notifyAll(); } } } else if (selector.equals("unsubscribe") && m.size() == 2) { if (!m.getBytes(1).getDataString().equals(_remoteName)) { System.err.println("Unknown unbind attempted"); } else { if (!_container.unbind(m.getBytes(1).getDataString())) { System.err.println("Unbind failed: " + m.get(1)); } } } else { System.err.print("Unknown message: "); SexpWriter.write(System.err, m); System.err.println(); } } } catch (IOException ioe) { ioe.printStackTrace(); System.err.print("Most recent received message: "); try { SexpWriter.write(System.err, m); } catch (IOException ioe2) { ioe2.printStackTrace(); } System.err.println(); } } }