2012-01-08 17:41:04 +00:00
|
|
|
open Unix
|
|
|
|
open Printf
|
|
|
|
open Thread
|
|
|
|
open Sexp
|
|
|
|
|
|
|
|
let connection_count = ref 0
|
|
|
|
|
|
|
|
let endpoint_name n =
|
|
|
|
match n with
|
|
|
|
| ADDR_INET (host, port) -> sprintf "%s:%d" (string_of_inet_addr host) port
|
|
|
|
| _ -> "??unknown??"
|
|
|
|
|
|
|
|
let send_error ch message details =
|
|
|
|
let m = Message.error (Str message, details) in
|
|
|
|
print_string "WARNING: Sending error: ";
|
|
|
|
output_sexp Pervasives.stdout m;
|
|
|
|
print_newline ();
|
2012-01-08 18:19:58 +00:00
|
|
|
ch m
|
2012-01-08 17:41:04 +00:00
|
|
|
|
|
|
|
let send_sexp_syntax_error ch explanation =
|
|
|
|
send_error ch explanation (Str "http://people.csail.mit.edu/rivest/Sexp.txt")
|
|
|
|
|
|
|
|
let dispatch_message n ch m =
|
|
|
|
match m with
|
|
|
|
| Message.Post (Str name, body, token) ->
|
|
|
|
Node.send_ignore name body
|
|
|
|
| Message.Subscribe (Str filter, sink, name, Str reply_sink, Str reply_name) ->
|
|
|
|
if Node.bind(filter, n)
|
|
|
|
then Node.post_ignore
|
|
|
|
reply_sink
|
|
|
|
(Str reply_name)
|
|
|
|
(Message.subscribe_ok (Str filter))
|
|
|
|
(Str "")
|
2012-01-08 19:48:07 +00:00
|
|
|
else Log.warn "Bind failed" [Str filter]
|
2012-01-08 17:41:04 +00:00
|
|
|
| Message.Unsubscribe token ->
|
|
|
|
() (* %%% TODO *)
|
|
|
|
| _ ->
|
|
|
|
send_error ch "Message not understood" (Message.sexp_of_message m)
|
|
|
|
|
2012-01-08 18:19:58 +00:00
|
|
|
let flush_output mtx flush_control cout =
|
2012-01-08 19:03:51 +00:00
|
|
|
let rec loop () =
|
|
|
|
match Event.poll (Event.receive flush_control) with
|
|
|
|
| Some () -> ()
|
|
|
|
| None ->
|
2012-01-08 19:52:03 +00:00
|
|
|
let ok = Util.with_mutex0 mtx (fun () -> try flush cout; true with _ -> false) in
|
2012-01-08 19:03:51 +00:00
|
|
|
if ok then (Thread.delay 0.1; loop ()) else ()
|
|
|
|
in loop ()
|
2012-01-08 17:41:04 +00:00
|
|
|
|
2012-01-08 18:19:58 +00:00
|
|
|
let relay_handler write_sexp n m =
|
|
|
|
write_sexp m
|
2012-01-08 17:41:04 +00:00
|
|
|
|
|
|
|
let relay_main peername cin cout =
|
2012-01-08 19:48:07 +00:00
|
|
|
Log.info "Accepted connection" [Str (endpoint_name peername)];
|
2012-01-08 17:41:04 +00:00
|
|
|
output_sexp_and_flush cout (Arr [Str "hop"; Str ""]);
|
|
|
|
output_sexp_and_flush cout
|
|
|
|
(Message.subscribe (Str (Node.local_container_name()),
|
|
|
|
Str "", Str "",
|
|
|
|
Str "", Str ""));
|
|
|
|
let mtx = Mutex.create () in
|
2012-01-08 19:52:03 +00:00
|
|
|
let write_sexp = Util.with_mutex mtx (output_sexp cout) in
|
2012-01-08 18:19:58 +00:00
|
|
|
let flush_control = Event.new_channel () in
|
|
|
|
ignore (Util.create_thread (endpoint_name peername ^ " flush") None
|
|
|
|
(flush_output mtx flush_control) cout);
|
|
|
|
let n = Node.make "relay" (relay_handler write_sexp) in
|
2012-01-08 17:41:04 +00:00
|
|
|
(try
|
|
|
|
while true do
|
2012-01-08 18:19:58 +00:00
|
|
|
dispatch_message n write_sexp (Message.message_of_sexp (Sexp.input_sexp cin))
|
2012-01-08 17:41:04 +00:00
|
|
|
done
|
|
|
|
with
|
|
|
|
| End_of_file ->
|
2012-01-08 19:48:07 +00:00
|
|
|
Log.info "Disconnecting normally" [Str (endpoint_name peername)]
|
2012-01-08 17:41:04 +00:00
|
|
|
| Sexp.Syntax_error explanation ->
|
2012-01-08 19:03:51 +00:00
|
|
|
send_sexp_syntax_error write_sexp explanation
|
|
|
|
| Sys_error message ->
|
2012-01-08 19:48:07 +00:00
|
|
|
Log.warn "Disconnected by Sys_error" [Str (endpoint_name peername); Str message]
|
2012-01-08 19:03:51 +00:00
|
|
|
);
|
2012-01-08 18:19:58 +00:00
|
|
|
Node.unbind_all n;
|
|
|
|
Event.sync (Event.send flush_control ())
|
2012-01-08 17:41:04 +00:00
|
|
|
|
|
|
|
let start_relay' (s, peername) =
|
|
|
|
let cin = in_channel_of_descr s in
|
|
|
|
let cout = out_channel_of_descr s in
|
|
|
|
connection_count := 1 + !connection_count;
|
|
|
|
relay_main peername cin cout;
|
|
|
|
connection_count := 0 + !connection_count;
|
2012-01-08 19:03:51 +00:00
|
|
|
try flush cout with _ -> ();
|
2012-01-08 17:41:04 +00:00
|
|
|
close s
|
|
|
|
|
|
|
|
let start_relay (s, peername) =
|
|
|
|
Util.create_thread (endpoint_name peername ^ " input") None start_relay' (s, peername)
|