diff --git a/relay.ml b/relay.ml index 8601e8b..40368f7 100644 --- a/relay.ml +++ b/relay.ml @@ -15,7 +15,7 @@ let send_error ch message details = print_string "WARNING: Sending error: "; output_sexp Pervasives.stdout m; print_newline (); - output_sexp_and_flush ch m + ch m let send_sexp_syntax_error ch explanation = send_error ch explanation (Str "http://people.csail.mit.edu/rivest/Sexp.txt") @@ -37,23 +37,16 @@ let dispatch_message n ch m = | _ -> send_error ch "Message not understood" (Message.sexp_of_message m) -let output_thread ch cout = - let rec loop v = - match v with - | Some (Some sexp) -> - output_sexp cout sexp; - loop (Event.poll (Event.receive ch)) - | Some None -> - () - | None -> - (* flush cout; *) - loop (Some (Event.sync (Event.receive ch))) - in loop None +let flush_output mtx flush_control cout = + while Event.poll (Event.receive flush_control) = None do + Mutex.lock mtx; + flush cout; + Mutex.unlock mtx; + Thread.delay 0.1 + done -let relay_handler mtx cout n m = - Mutex.lock mtx; - output_sexp cout m; - Mutex.unlock mtx +let relay_handler write_sexp n m = + write_sexp m let relay_main peername cin cout = printf "INFO: Accepted connection from %s\n%!" (endpoint_name peername); @@ -63,17 +56,26 @@ let relay_main peername cin cout = Str "", Str "", Str "", Str "")); let mtx = Mutex.create () in - let n = Node.make "relay" (relay_handler mtx cout) in + let write_sexp s = + Mutex.lock mtx; + output_sexp cout s; + Mutex.unlock mtx + in + let flush_control = Event.new_channel () in + ignore (Util.create_thread (endpoint_name peername ^ " flush") None + (flush_output mtx flush_control) cout); + let n = Node.make "relay" (relay_handler write_sexp) in (try while true do - dispatch_message n cout (Message.message_of_sexp (Sexp.input_sexp cin)) + dispatch_message n write_sexp (Message.message_of_sexp (Sexp.input_sexp cin)) done with | End_of_file -> printf "INFO: Disconnecting %s normally.\n%!" (endpoint_name peername) | Sexp.Syntax_error explanation -> - send_sexp_syntax_error cout explanation); - Node.unbind_all n + send_sexp_syntax_error write_sexp explanation); + Node.unbind_all n; + Event.sync (Event.send flush_control ()) let start_relay' (s, peername) = let cin = in_channel_of_descr s in