hop-2012/server/relay.ml

89 lines
3.2 KiB
OCaml

(* Copyright 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>. *)
(* This file is part of Hop. *)
(* Hop is free software: you can redistribute it and/or modify it *)
(* under the terms of the GNU General Public License as published by the *)
(* Free Software Foundation, either version 3 of the License, or (at your *)
(* option) any later version. *)
(* Hop is distributed in the hope that it will be useful, but *)
(* WITHOUT ANY WARRANTY; without even the implied warranty of *)
(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *)
(* General Public License for more details. *)
(* You should have received a copy of the GNU General Public License *)
(* along with Hop. If not, see <http://www.gnu.org/licenses/>. *)
open Lwt
open Unix
open Printf
open Sexp
(** [send_error ch message details] builds an error message with
    [Message.error] from [message] (wrapped in [Str]) and [details], logs it
    as a warning, and then delivers it by applying [ch] to it.

    [ch] is expected to return an Lwt thread, as every sender passed in this
    file does; the result of [send_error] is the thread returned by [ch].
    The warning is awaited before [ch] is applied, so a failure while
    logging is reported to the caller rather than being lost. *)
let send_error ch message details =
  let m = Message.error (Str message, details) in
  (* [Log.warn] yields an Lwt thread (it is used as one elsewhere in this
     file), so sequence it instead of discarding it with [ignore]. *)
  lwt () = Log.warn "Sending error" [m] in
  ch m
(** [send_sexp_syntax_error ch explanation] reports a syntax error through
    [send_error], using [explanation] as the error text and a literal string
    holding the URL of the Sexp format description as the details. *)
let send_sexp_syntax_error ch explanation =
  let spec_reference = litstr "http://people.csail.mit.edu/rivest/Sexp.txt" in
  send_error ch explanation spec_reference
(** [dispatch_message n ch] returns a handler for decoded messages.

    [n] is paired with the filter name when calling [Node.bind]; [ch] is the
    sender handed to [send_error] for messages that are not handled here.
    One name lookup function is obtained from [Node.caching_name_of_bytes]
    per call of [dispatch_message n ch] and reused by the returned handler.

    Handled messages:
    - [Post]: the body is passed to [Node.send_ignore] for the looked-up
      name; the token is not used.
    - [Subscribe]: [Node.bind] is attempted; on success a [subscribe_ok]
      message is posted via [Node.post_ignore'] to the reply sink, otherwise
      a warning is logged.  The [sink] and [name] fields are not used.
    - [Unsubscribe]: [Node.unbind] is attempted; a warning is logged when it
      reports failure.
    - Anything else (including the messages above when their fields do not
      match the [Str] patterns): an error is sent through [ch]. *)
let dispatch_message n ch =
  let resolve = Node.caching_name_of_bytes () in
  fun incoming ->
    match incoming with
    | Message.Post (Str name, body, _token) ->
      Node.send_ignore (resolve name) body
    | Message.Subscribe (Str filter, _sink, _name, Str reply_sink, Str reply_name) ->
      (lwt bound = Node.bind (Node.name_of_bytes filter, n) in
       if bound then
         Node.post_ignore'
           reply_sink
           (Str reply_name)
           (Message.subscribe_ok (Str filter))
           emptystr
       else
         Log.warn "Bind failed" [Str filter])
    | Message.Unsubscribe (Str token) ->
      (lwt unbound = Node.unbind (Node.name_of_bytes token) in
       if unbound then return ()
       else Log.warn "Unbind failed" [Str token])
    | unrecognised ->
      send_error ch
        (Bytes.of_string "Message not understood")
        (Message.sexp_of_message unrecognised)
(** [issue_banner cin cout] writes the opening exchange to [cout] and then
    returns [true].

    Two S-expressions are written, each followed by a flush: first the
    array [["hop"; ""]], then a subscribe message whose first field is the
    local container name and whose remaining fields are empty strings.
    [cin] is not read here. *)
let issue_banner cin cout =
  (* Write one S-expression and flush it before continuing. *)
  let emit sexp =
    lwt () = output_sexp cout sexp in
    Lwt_io.flush cout
  in
  lwt () = emit (Arr [litstr "hop"; emptystr]) in
  lwt () =
    emit (Message.subscribe (Str (Node.local_container_name ()),
                             emptystr, emptystr,
                             emptystr, emptystr))
  in
  return true
(** [relay_boot (peername, cin, cout)] produces the per-connection state: the
    same three values plus a freshly created [Lwt_mutex], which the other
    relay functions use to serialise writes to [cout]. *)
let relay_boot (peername, cin, cout) =
  let write_lock = Lwt_mutex.create () in
  return (peername, write_lock, cin, cout)
(** [relay_handler state _ m] writes the S-expression [m] to the connection's
    output channel while holding the connection's mutex.  The second argument
    is ignored. *)
let relay_handler (_, mtx, _, cout) _ m =
  let write_message () = output_sexp cout m in
  Lwt_mutex.with_lock mtx write_message
(** [relay_mainloop (peername, mtx, cin, cout) n] reads S-expressions from
    [cin] forever, decoding each with [Message.message_of_sexp] and handing
    it to the dispatcher built by [dispatch_message n].

    Replies produced by the dispatcher are written to [cout] under [mtx].
    When [Sexp.Syntax_error] is raised, a syntax error is sent to the peer,
    the disconnection is logged, and the loop ends.  Any other exception is
    not handled here and propagates to the caller. *)
let relay_mainloop (peername, mtx, cin, cout) n =
  (* Every write to [cout] goes through the mutex. *)
  let write_sexp sexp =
    Lwt_mutex.with_lock mtx (fun () -> output_sexp cout sexp)
  in
  (try_lwt
     let dispatcher = dispatch_message n write_sexp in
     (* Read one message, dispatch it, and repeat without end. *)
     let rec pump () =
       lwt message_sexp = Sexp.input_sexp cin in
       lwt () = dispatcher (Message.message_of_sexp message_sexp) in
       pump ()
     in
     pump ()
   with
   | Sexp.Syntax_error explanation ->
     lwt () =
       send_sexp_syntax_error write_sexp (Bytes.of_string explanation)
     in
     Log.info "Disconnected relay for syntax error"
       [str (Connections.endpoint_name peername);
        str explanation])
(** [start (s, peername)] hands the pair to [Connections.start_connection]
    under the label ["relay"], supplying this module's banner, boot, handler
    and main-loop functions. *)
let start ((_s, _peername) as connection) =
  Connections.start_connection
    "relay"
    issue_banner
    relay_boot
    relay_handler
    relay_mainloop
    connection
(** [init ()] reads the integer setting ["hop.port"] via [Config.get_int]
    (passing 5671 as its second argument, presumably the default; confirm
    against [Config]) and creates a daemon thread named ["Hop listener"] from
    [Net.start_net "Hop" port] and [start]. *)
let init () =
  let listener = Net.start_net "Hop" (Config.get_int "hop.port" 5671) in
  Util.create_daemon_thread (Bytes.of_string "Hop listener") None listener start