Asynchronously flush output buffers.

This commit is contained in:
Tony Garnock-Jones 2012-01-08 13:19:58 -05:00
parent f7fdb70daf
commit 7dff60b8a3
1 changed files with 23 additions and 21 deletions

View File

@ -15,7 +15,7 @@ let send_error ch message details =
print_string "WARNING: Sending error: "; print_string "WARNING: Sending error: ";
output_sexp Pervasives.stdout m; output_sexp Pervasives.stdout m;
print_newline (); print_newline ();
output_sexp_and_flush ch m ch m
let send_sexp_syntax_error ch explanation = let send_sexp_syntax_error ch explanation =
send_error ch explanation (Str "http://people.csail.mit.edu/rivest/Sexp.txt") send_error ch explanation (Str "http://people.csail.mit.edu/rivest/Sexp.txt")
@ -37,23 +37,16 @@ let dispatch_message n ch m =
| _ -> | _ ->
send_error ch "Message not understood" (Message.sexp_of_message m) send_error ch "Message not understood" (Message.sexp_of_message m)
let output_thread ch cout = let flush_output mtx flush_control cout =
let rec loop v = while Event.poll (Event.receive flush_control) = None do
match v with Mutex.lock mtx;
| Some (Some sexp) -> flush cout;
output_sexp cout sexp; Mutex.unlock mtx;
loop (Event.poll (Event.receive ch)) Thread.delay 0.1
| Some None -> done
()
| None ->
(* flush cout; *)
loop (Some (Event.sync (Event.receive ch)))
in loop None
let relay_handler mtx cout n m = let relay_handler write_sexp n m =
Mutex.lock mtx; write_sexp m
output_sexp cout m;
Mutex.unlock mtx
let relay_main peername cin cout = let relay_main peername cin cout =
printf "INFO: Accepted connection from %s\n%!" (endpoint_name peername); printf "INFO: Accepted connection from %s\n%!" (endpoint_name peername);
@ -63,17 +56,26 @@ let relay_main peername cin cout =
Str "", Str "", Str "", Str "",
Str "", Str "")); Str "", Str ""));
let mtx = Mutex.create () in let mtx = Mutex.create () in
let n = Node.make "relay" (relay_handler mtx cout) in let write_sexp s =
Mutex.lock mtx;
output_sexp cout s;
Mutex.unlock mtx
in
let flush_control = Event.new_channel () in
ignore (Util.create_thread (endpoint_name peername ^ " flush") None
(flush_output mtx flush_control) cout);
let n = Node.make "relay" (relay_handler write_sexp) in
(try (try
while true do while true do
dispatch_message n cout (Message.message_of_sexp (Sexp.input_sexp cin)) dispatch_message n write_sexp (Message.message_of_sexp (Sexp.input_sexp cin))
done done
with with
| End_of_file -> | End_of_file ->
printf "INFO: Disconnecting %s normally.\n%!" (endpoint_name peername) printf "INFO: Disconnecting %s normally.\n%!" (endpoint_name peername)
| Sexp.Syntax_error explanation -> | Sexp.Syntax_error explanation ->
send_sexp_syntax_error cout explanation); send_sexp_syntax_error write_sexp explanation);
Node.unbind_all n Node.unbind_all n;
Event.sync (Event.send flush_control ())
let start_relay' (s, peername) = let start_relay' (s, peername) =
let cin = in_channel_of_descr s in let cin = in_channel_of_descr s in