diff --git a/connections.ml b/connections.ml new file mode 100644 index 0000000..de0c731 --- /dev/null +++ b/connections.ml @@ -0,0 +1,61 @@ +open Unix +open Printf +open Thread +open Sexp + +let connection_count = ref 0 + +let endpoint_name n = + match n with + | ADDR_INET (host, port) -> sprintf "%s:%d" (string_of_inet_addr host) port + | _ -> "??unknown??" + +let flush_output mtx flush_control cout = + let rec loop () = + match Event.poll (Event.receive flush_control) with + | Some () -> () + | None -> + let ok = Util.with_mutex0 mtx (fun () -> try flush cout; true with _ -> false) in + if ok then (Thread.delay 0.1; loop ()) else () + in loop () + +let connection_main class_name peername cin cout issue_banner node_fn mainloop = + Log.info ("Accepted "^class_name) [Str (endpoint_name peername)]; + if issue_banner cin cout + then + let mtx = Mutex.create () in + let flush_control = Event.new_channel () in + ignore (Util.create_thread (endpoint_name peername ^ " flush") None + (flush_output mtx flush_control) cout); + let n = Node.make class_name (node_fn mtx cin cout) in + (try + mainloop peername mtx cin cout n + with + | End_of_file -> + Log.info ("Disconnecting "^class_name^" normally") [Str (endpoint_name peername)] + | Sys_error message -> + Log.warn ("Disconnected "^class_name^" by Sys_error") + [Str (endpoint_name peername); Str message] + | exn -> + Log.error ("Uncaught exception in "^class_name) [Str (Printexc.to_string exn)] + ); + Node.unbind_all n; + Event.sync (Event.send flush_control ()) + else + Log.error ("Disconnected "^class_name^" by failed initial handshake") [] + +let start_connection' class_name issue_banner node_fn mainloop (s, peername) = + let cin = in_channel_of_descr s in + let cout = out_channel_of_descr s in + connection_count := !connection_count + 1; + connection_main class_name peername cin cout issue_banner node_fn mainloop; + connection_count := !connection_count - 1; + (try flush cout with _ -> ()); + close s + +let start_connection class_name issue_banner node_fn mainloop (s, peername) = + Util.create_thread + (endpoint_name peername ^ " input") + None + (start_connection' class_name issue_banner node_fn mainloop) + (s, peername) diff --git a/relay.ml b/relay.ml index 034e69b..6e30a5d 100644 --- a/relay.ml +++ b/relay.ml @@ -3,18 +3,9 @@ open Printf open Thread open Sexp -let connection_count = ref 0 - -let endpoint_name n = - match n with - | ADDR_INET (host, port) -> sprintf "%s:%d" (string_of_inet_addr host) port - | _ -> "??unknown??" - let send_error ch message details = let m = Message.error (Str message, details) in - print_string "WARNING: Sending error: "; - output_sexp Pervasives.stdout m; - print_newline (); + Log.warn "Sending error" [m]; ch m let send_sexp_syntax_error ch explanation = @@ -39,54 +30,29 @@ let dispatch_message n ch m = | _ -> send_error ch "Message not understood" (Message.sexp_of_message m) -let flush_output mtx flush_control cout = - let rec loop () = - match Event.poll (Event.receive flush_control) with - | Some () -> () - | None -> - let ok = Util.with_mutex0 mtx (fun () -> try flush cout; true with _ -> false) in - if ok then (Thread.delay 0.1; loop ()) else () - in loop () - -let relay_handler write_sexp n m = - write_sexp m - -let relay_main peername cin cout = - Log.info "Accepted connection" [Str (endpoint_name peername)]; +let issue_banner cin cout = output_sexp_and_flush cout (Arr [Str "hop"; Str ""]); output_sexp_and_flush cout (Message.subscribe (Str (Node.local_container_name()), Str "", Str "", Str "", Str "")); - let mtx = Mutex.create () in + true + +let relay_handler mtx cin cout n m = + Util.with_mutex mtx (output_sexp cout) m + +let relay_mainloop peername mtx cin cout n = let write_sexp = Util.with_mutex mtx (output_sexp cout) in - let flush_control = Event.new_channel () in - ignore (Util.create_thread (endpoint_name peername ^ " flush") None - (flush_output mtx flush_control) cout); - let n = Node.make "relay" (relay_handler write_sexp) in (try while true do dispatch_message n write_sexp (Message.message_of_sexp (Sexp.input_sexp cin)) done with - | End_of_file -> - Log.info "Disconnecting normally" [Str (endpoint_name peername)] | Sexp.Syntax_error explanation -> - send_sexp_syntax_error write_sexp explanation - | Sys_error message -> - Log.warn "Disconnected by Sys_error" [Str (endpoint_name peername); Str message] - ); - Node.unbind_all n; - Event.sync (Event.send flush_control ()) + (send_sexp_syntax_error write_sexp explanation; + Log.info "Disconnected relay for syntax error" + [Str (Connections.endpoint_name peername); Str explanation]) + ) -let start_relay' (s, peername) = - let cin = in_channel_of_descr s in - let cout = out_channel_of_descr s in - connection_count := 1 + !connection_count; - relay_main peername cin cout; - connection_count := 0 + !connection_count; - try flush cout with _ -> (); - close s - -let start_relay (s, peername) = - Util.create_thread (endpoint_name peername ^ " input") None start_relay' (s, peername) +let start (s, peername) = + Connections.start_connection "relay" issue_banner relay_handler relay_mainloop (s, peername)