Detect socket closure during streaming; implement proper relay for HTTP tap

This commit is contained in:
Tony Garnock-Jones 2012-04-29 16:34:52 -04:00
parent 42f0a6e7fe
commit 6c77e30da4
4 changed files with 170 additions and 70 deletions

View File

@ -20,6 +20,7 @@ open Unix
type version = [`HTTP_1_0 | `HTTP_1_1]
type resp_version = [version | `SAME_AS_REQUEST]
type content = Fixed of string | Variable of Stringstream.t
type completion = Completion_normal | Completion_error
type body = {
headers: (string * string) list;
@ -41,7 +42,8 @@ type resp = {
resp_version: resp_version;
status: int;
reason: string;
resp_body: body
resp_body: body;
completion_callbacks: (completion -> unit) list
}
exception HTTPError of (int * string * body)
@ -54,6 +56,9 @@ let content_type_header_name = "Content-Type"
let html_content_type_header = (content_type_header_name, html_content_type)
let text_content_type_header = (content_type_header_name, text_content_type)
let add_completion_callback resp cb =
{resp with completion_callbacks = cb :: resp.completion_callbacks}
let http_error code reason body = raise (HTTPError (code, reason, body))
let http_error_plain code reason =
@ -77,7 +82,8 @@ let resp_generic code reason headers content =
{ resp_version = `SAME_AS_REQUEST;
status = code;
reason = reason;
resp_body = {headers = headers; content = content} }
resp_body = {headers = headers; content = content};
completion_callbacks = [] }
let resp_generic_ok headers content =
resp_generic 200 "OK" headers content
@ -142,13 +148,13 @@ let render_header cout (k, v) =
output_string cout "\r\n"
let render_chunk cout (chunk, should_flush) =
match chunk with
(match chunk with
| "" -> ()
| _ ->
output_string cout (Printf.sprintf "%x\r\n" (String.length chunk));
output_string cout chunk;
output_string cout "\r\n";
if should_flush then flush cout else ()
output_string cout "\r\n");
if should_flush then flush cout else ()
let render_fixed_content cout s headers_only =
render_header cout ("Content-Length", string_of_int (String.length s));
@ -221,7 +227,7 @@ let parse_urlencoded q =
let pieces = Str.split (Str.regexp "&") q in
List.map parse_urlencoded_binding pieces
let find_header name hs =
let find_header' name hs =
let lc_name = String.lowercase name in
let rec search hs =
match hs with
@ -233,8 +239,11 @@ let find_header name hs =
in
search hs
let find_header' name hs =
try Some (find_header name hs) with Not_found -> None
let find_header name hs =
try Some (find_header' name hs) with Not_found -> None
let find_param name params =
try Some (List.assoc name params) with Not_found -> None
let input_crlf cin =
let line = input_line cin in
@ -263,9 +272,9 @@ let parse_chunks cin =
let parse_body cin =
let headers = parse_headers cin in
match find_header' "Transfer-Encoding" headers with
match find_header "Transfer-Encoding" headers with
| None | Some "identity" ->
(match find_header' "Content-Length" headers with
(match find_header "Content-Length" headers with
| None ->
(* http_error_html 411 "Length required" [] *)
{headers = headers; content = empty_content}
@ -301,7 +310,7 @@ let discard_unread_body req =
| Variable s -> Stringstream.iter (fun v -> ()) s (* force chunks to be read *)
let connection_keepalive req =
find_header' "Connection" req.req_body.headers = Some "keep-alive"
find_header "Connection" req.req_body.headers = Some "keep-alive"
let main handle_req (s, peername) =
let cin = in_channel_of_descr s in
@ -310,16 +319,61 @@ let main handle_req (s, peername) =
(try
let rec request_loop () =
let req = parse_req cin 512 in
render_resp cout req.req_version req.verb (handle_req req);
discard_unread_body req;
flush cout;
let resp = handle_req req in
let completion_mutex = Mutex.create () in
let completion = ref None in
let set_completion v =
Util.with_mutex0 completion_mutex (fun () ->
match !completion with
| None ->
completion := Some v;
List.iter (fun cb -> cb v) resp.completion_callbacks
| Some _ -> ())
in
(* Here we spawn a thread that just watches the socket to see
if it either becomes active or closes during rendering of the
response, so that we can make decisions based on this in any
eventual streaming response generators. In particular, if
we're implementing some kind of XHR streaming andthe client
goes away, we want to abandon the streaming as soon as
possible. *)
let input_waiter () =
try
(let (r, w, e) = Unix.select [s] [] [s] (-1.0) in
set_completion (if r <> [] then Completion_normal else Completion_error))
with _ -> set_completion Completion_error
in
ignore (Thread.create input_waiter ());
(try
render_resp cout req.req_version req.verb resp;
discard_unread_body req;
flush cout;
set_completion Completion_normal
with e ->
set_completion Completion_error;
raise e);
if connection_keepalive req then request_loop () else ()
in
request_loop ()
with HTTPError (code, reason, body) ->
render_resp cout `HTTP_1_0
"GET" (* ugh this should probably be done better *)
{ resp_version = `HTTP_1_0; status = code; reason = reason; resp_body = body })
with _ -> ());
with
| End_of_file ->
()
| HTTPError (code, reason, body) ->
render_resp cout `HTTP_1_0
"GET" (* ugh this should probably be done better *)
{ resp_version = `HTTP_1_0;
status = code;
reason = reason;
resp_body = body;
completion_callbacks = [] })
with
| Sys_error message ->
Log.info "Sys_error in httpd handler" [Sexp.Str message]
| exn ->
Log.error "Uncaught exception in httpd handler" [Sexp.Str (Printexc.to_string exn)]);
(try flush cout with _ -> ());
close s

View File

@ -36,5 +36,6 @@ let _ =
hook_log ();
Amqp_relay.init ();
Ui_main.init ();
Ui_relay.init ();
(* Speedtest.init (); *)
Net.start_net "Hop" 5671 Relay.start

View File

@ -25,7 +25,7 @@ let longest_prefix_first (p1, _) (p2, _) =
let register_dispatcher (prefix, handler) =
dispatch_table := List.sort longest_prefix_first ((prefix, handler) :: !dispatch_table)
let handle_dynamic_req r =
let handle_dynamic_req id r =
let rec search_table table =
match table with
| [] ->
@ -33,75 +33,38 @@ let handle_dynamic_req r =
[Html.tag "p" [] [Html.text ("No route for URL path "^r.Httpd.path)]]
| (prefix, handler) :: rest ->
if Util.starts_with r.Httpd.path prefix
then handler r
then handler id r
else search_table rest
in
search_table !dispatch_table
let handle_req r =
let handle_req id r =
if Util.starts_with r.Httpd.path "/_"
then handle_dynamic_req r
then handle_dynamic_req id r
else
match r.Httpd.verb with
| "GET" | "HEAD" -> Httpd_file.resp_file (Filename.concat "./web" r.Httpd.path)
| _ -> Httpd.http_error_html 400 ("Unsupported HTTP method "^r.Httpd.verb) []
let cleanup_req id () =
match Node.lookup id with
| Some n -> Node.unbind_all n
| None -> ()
let start (s, peername) =
let id = "http-" ^ Uuid.create () in
Util.create_thread (Connections.endpoint_name peername ^ " HTTP service")
None
(Httpd.main handle_req)
(Some (cleanup_req id))
(Httpd.main (handle_req id))
(s, peername)
let boot_time = Unix.time ()
let api_server_stats r =
let api_server_stats id r =
Json.resp_ok [] (Json.Rec
["connection_count", Json.Num (float_of_int !Connections.connection_count);
"boot_time", Json.Num boot_time;
"uptime", Json.Num (Unix.time () -. boot_time)])
let api_tap_source r =
let id = Uuid.create () in
let id_block_and_padding = Stringstream.const_flush (id ^ ";" ^ String.make 2048 'h' ^ ";") in
let rec message_stream () =
Thread.delay 0.1;
let v = Json.to_string (Json.Rec ["now", Json.Num (Unix.time ());
"id", Json.Str (Uuid.create ())]) in
Some (Printf.sprintf "%d;%s;" (String.length v) v, true, Stringstream.make message_stream)
in
Httpd.resp_generic 200 "Streaming"
[Httpd.text_content_type_header;
"Access-Control-Allow-Origin", "*"]
(Httpd.Variable
(Stringstream.switch_after 131072
(Stringstream.seq id_block_and_padding (Stringstream.make message_stream))
Stringstream.empty))
let counter = ref 0
let api_tap_sink r =
let params = Httpd.parse_urlencoded (Httpd.string_of_content r.Httpd.req_body.Httpd.content) in
(* let stream_id = List.assoc "metadata.id" params in *)
match List.assoc "metadata.type" params with
| Some "send" ->
(match List.assoc "data" params with
| Some data_str ->
let data = Json.of_string data_str in
counter := 1 + !counter;
Printf.printf "Data: %d %s\n%!" !counter (Json.to_string data);
Httpd.resp_generic 202 "Accepted" [] (Httpd.empty_content)
| _ -> Httpd.http_error_html 406 "Bad data parameter" [])
| _ -> Httpd.http_error_html 406 "Unsupported metadata.type" []
let api_tap r =
match r.Httpd.verb with
| "GET" -> api_tap_source r
| "POST" -> api_tap_sink r
| _ -> Httpd.http_error_html 400 "Unsupported tap method" []
let register_api_hooks () =
List.iter register_dispatcher
["/_/server_stats", api_server_stats;
"/_/tap", api_tap]
let init () =
register_api_hooks ();
register_dispatcher ("/_/server_stats", api_server_stats);
ignore (Util.create_thread "HTTP listener" None (Net.start_net "HTTP" 5678) start)

82
ui_relay.ml Normal file
View File

@ -0,0 +1,82 @@
(* Copyright 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>. *)
(* This file is part of Ocamlmsg. *)
(* Ocamlmsg is free software: you can redistribute it and/or modify it *)
(* under the terms of the GNU General Public License as published by the *)
(* Free Software Foundation, either version 3 of the License, or (at your *)
(* option) any later version. *)
(* Ocamlmsg is distributed in the hope that it will be useful, but *)
(* WITHOUT ANY WARRANTY; without even the implied warranty of *)
(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *)
(* General Public License for more details. *)
(* You should have received a copy of the GNU General Public License *)
(* along with Ocamlmsg. If not, see <http://www.gnu.org/licenses/>. *)
type outbound_message =
| Data of Sexp.t
| Heartbeat
| Shutdown
let rec message_stream ch =
let deliver_payload payload =
Some (Printf.sprintf "%d;%s;" (String.length payload) payload,
true,
Stringstream.make (message_stream ch))
in
let deliver_sexp_chunk sexp = deliver_payload (Json.to_string (Sexpjson.json_of_sexp sexp)) in
fun () ->
match Squeue.pop ch with
| Data sexp -> deliver_sexp_chunk sexp
| Heartbeat -> deliver_payload ""
| Shutdown -> None
let rec api_tap_source id r =
let ch = Squeue.create 10 in
let handle_message n sexp = Squeue.add (Data sexp) ch in
let n = Node.make "http_tap" handle_message in
if not (Node.bind (id, n))
then Httpd.http_error_html 500 "Internal ID collision" []
else
let id_block_and_padding = Stringstream.const_flush (id ^ ";" ^ String.make 2048 'h' ^ ";") in
Httpd.add_completion_callback
(Httpd.resp_generic 200 "Streaming"
[Httpd.text_content_type_header;
"Access-Control-Allow-Origin", "*"]
(Httpd.Variable
(Stringstream.switch_after 131072
(Stringstream.seq id_block_and_padding (Stringstream.make (message_stream ch)))
Stringstream.empty)))
(fun _ ->
Node.unbind_all n;
Squeue.add Shutdown ch)
let api_tap_sink irrelevant_id r =
let params = Httpd.parse_urlencoded (Httpd.string_of_content r.Httpd.req_body.Httpd.content) in
(* let stream_id = List.assoc "metadata.id" params in *)
match Httpd.find_param "metadata.type" params with
| Some (Some "send") ->
(match Httpd.find_param "data" params with
| Some (Some data_str) ->
let data =
(try Sexpjson.sexp_of_json (Json.of_string data_str)
with _ -> Httpd.http_error_html 406 "Bad data parameter" []) in
(match Message.message_of_sexp data with
| Message.Post (Sexp.Str name, body, token) ->
Node.send_ignore name body;
Httpd.resp_generic 202 "Accepted" [] (Httpd.empty_content)
| _ ->
Httpd.http_error_html 406 "Message not understood" [])
| _ -> Httpd.http_error_html 406 "Bad data parameter" [])
| _ -> Httpd.http_error_html 406 "Unsupported metadata.type" []
let api_tap id r =
match r.Httpd.verb with
| "GET" -> api_tap_source id r
| "POST" -> api_tap_sink id r
| _ -> Httpd.http_error_html 400 "Unsupported tap method" []
let init () =
Ui_main.register_dispatcher ("/_/tap", api_tap)