(* Copyright 2012 Tony Garnock-Jones . *) (* This file is part of Hop. *) (* Hop is free software: you can redistribute it and/or modify it *) (* under the terms of the GNU General Public License as published by the *) (* Free Software Foundation, either version 3 of the License, or (at your *) (* option) any later version. *) (* Hop is distributed in the hope that it will be useful, but *) (* WITHOUT ANY WARRANTY; without even the implied warranty of *) (* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *) (* General Public License for more details. *) (* You should have received a copy of the GNU General Public License *) (* along with Hop. If not, see . *) open Lwt open Unix open Printf open Sexp let connection_count = ref 0 let endpoint_name n = match n with | ADDR_INET (host, port) -> sprintf "%s:%d" (string_of_inet_addr host) port | _ -> "??unknown??" let flush_output flush_control cout = let keep_running = ref true in Lwt.pick [ Lwt_stream.next flush_control; while_lwt !keep_running do try_lwt lwt () = Lwt_io.flush cout in Lwt_unix.sleep 0.1 with _ -> keep_running := false; return () done ] let connection_main class_name peername cin cout issue_banner boot_fn node_fn mainloop = ignore (Log.info ("Accepted "^class_name) [Str (endpoint_name peername)]); match_lwt issue_banner cin cout with | true -> let (flush_control, flush_stop) = Lwt_stream.create () in ignore (flush_output flush_control cout); lwt shared_state = boot_fn (peername, cin, cout) in let n = Node.make class_name (node_fn shared_state) in lwt () = (try_lwt mainloop shared_state n with | End_of_file -> Log.info ("Disconnecting "^class_name^" normally") [Str (endpoint_name peername)] | Sys_error message -> Log.warn ("Disconnected "^class_name^" by Sys_error") [Str (endpoint_name peername); Str message] | exn -> Log.error ("Uncaught exception in "^class_name) [Str (Printexc.to_string exn)]) in flush_stop None; Node.unbind_all n | false -> Log.error ("Disconnected "^class_name^" by failed initial handshake") [] let start_connection' class_name issue_banner boot_fn node_fn mainloop (s, peername) = let cin = Lwt_io.of_fd Lwt_io.input s in let cout = Lwt_io.of_fd Lwt_io.output s in connection_count := !connection_count + 1; lwt () = connection_main class_name peername cin cout issue_banner boot_fn node_fn mainloop in connection_count := !connection_count - 1; lwt () = (try_lwt Lwt_io.flush cout with _ -> return ()) in Lwt_unix.close s let start_connection class_name issue_banner boot_fn node_fn mainloop (s, peername) = Util.create_thread (endpoint_name peername ^ " input") None (start_connection' class_name issue_banner boot_fn node_fn mainloop) (s, peername)