diff --git a/httpd.ml b/httpd.ml index 04327eb..cdf6fb0 100644 --- a/httpd.ml +++ b/httpd.ml @@ -20,6 +20,7 @@ open Unix type version = [`HTTP_1_0 | `HTTP_1_1] type resp_version = [version | `SAME_AS_REQUEST] type content = Fixed of string | Variable of Stringstream.t +type completion = Completion_normal | Completion_error type body = { headers: (string * string) list; @@ -41,7 +42,8 @@ type resp = { resp_version: resp_version; status: int; reason: string; - resp_body: body + resp_body: body; + completion_callbacks: (completion -> unit) list } exception HTTPError of (int * string * body) @@ -54,6 +56,9 @@ let content_type_header_name = "Content-Type" let html_content_type_header = (content_type_header_name, html_content_type) let text_content_type_header = (content_type_header_name, text_content_type) +let add_completion_callback resp cb = + {resp with completion_callbacks = cb :: resp.completion_callbacks} + let http_error code reason body = raise (HTTPError (code, reason, body)) let http_error_plain code reason = @@ -77,7 +82,8 @@ let resp_generic code reason headers content = { resp_version = `SAME_AS_REQUEST; status = code; reason = reason; - resp_body = {headers = headers; content = content} } + resp_body = {headers = headers; content = content}; + completion_callbacks = [] } let resp_generic_ok headers content = resp_generic 200 "OK" headers content @@ -142,13 +148,13 @@ let render_header cout (k, v) = output_string cout "\r\n" let render_chunk cout (chunk, should_flush) = - match chunk with + (match chunk with | "" -> () | _ -> output_string cout (Printf.sprintf "%x\r\n" (String.length chunk)); output_string cout chunk; - output_string cout "\r\n"; - if should_flush then flush cout else () + output_string cout "\r\n"); + if should_flush then flush cout else () let render_fixed_content cout s headers_only = render_header cout ("Content-Length", string_of_int (String.length s)); @@ -221,7 +227,7 @@ let parse_urlencoded q = let pieces = Str.split (Str.regexp "&") q in List.map parse_urlencoded_binding pieces -let find_header name hs = +let find_header' name hs = let lc_name = String.lowercase name in let rec search hs = match hs with @@ -233,8 +239,11 @@ let find_header name hs = in search hs -let find_header' name hs = - try Some (find_header name hs) with Not_found -> None +let find_header name hs = + try Some (find_header' name hs) with Not_found -> None + +let find_param name params = + try Some (List.assoc name params) with Not_found -> None let input_crlf cin = let line = input_line cin in @@ -263,9 +272,9 @@ let parse_chunks cin = let parse_body cin = let headers = parse_headers cin in - match find_header' "Transfer-Encoding" headers with + match find_header "Transfer-Encoding" headers with | None | Some "identity" -> - (match find_header' "Content-Length" headers with + (match find_header "Content-Length" headers with | None -> (* http_error_html 411 "Length required" [] *) {headers = headers; content = empty_content} @@ -301,7 +310,7 @@ let discard_unread_body req = | Variable s -> Stringstream.iter (fun v -> ()) s (* force chunks to be read *) let connection_keepalive req = - find_header' "Connection" req.req_body.headers = Some "keep-alive" + find_header "Connection" req.req_body.headers = Some "keep-alive" let main handle_req (s, peername) = let cin = in_channel_of_descr s in @@ -310,16 +319,61 @@ let main handle_req (s, peername) = (try let rec request_loop () = let req = parse_req cin 512 in - render_resp cout req.req_version req.verb (handle_req req); - discard_unread_body req; - flush cout; + let resp = handle_req req in + + let completion_mutex = Mutex.create () in + let completion = ref None in + let set_completion v = + Util.with_mutex0 completion_mutex (fun () -> + match !completion with + | None -> + completion := Some v; + List.iter (fun cb -> cb v) resp.completion_callbacks + | Some _ -> ()) + in + + (* Here we spawn a thread that just watches the socket to see + if it either becomes active or closes during rendering of the + response, so that we can make decisions based on this in any + eventual streaming response generators. In particular, if + we're implementing some kind of XHR streaming andthe client + goes away, we want to abandon the streaming as soon as + possible. *) + let input_waiter () = + try + (let (r, w, e) = Unix.select [s] [] [s] (-1.0) in + set_completion (if r <> [] then Completion_normal else Completion_error)) + with _ -> set_completion Completion_error + in + ignore (Thread.create input_waiter ()); + + (try + render_resp cout req.req_version req.verb resp; + discard_unread_body req; + flush cout; + set_completion Completion_normal + with e -> + set_completion Completion_error; + raise e); + if connection_keepalive req then request_loop () else () in request_loop () - with HTTPError (code, reason, body) -> - render_resp cout `HTTP_1_0 - "GET" (* ugh this should probably be done better *) - { resp_version = `HTTP_1_0; status = code; reason = reason; resp_body = body }) - with _ -> ()); + with + | End_of_file -> + () + | HTTPError (code, reason, body) -> + render_resp cout `HTTP_1_0 + "GET" (* ugh this should probably be done better *) + { resp_version = `HTTP_1_0; + status = code; + reason = reason; + resp_body = body; + completion_callbacks = [] }) + with + | Sys_error message -> + Log.info "Sys_error in httpd handler" [Sexp.Str message] + | exn -> + Log.error "Uncaught exception in httpd handler" [Sexp.Str (Printexc.to_string exn)]); (try flush cout with _ -> ()); close s diff --git a/ocamlmsg.ml b/ocamlmsg.ml index ba59004..f940a7b 100644 --- a/ocamlmsg.ml +++ b/ocamlmsg.ml @@ -36,5 +36,6 @@ let _ = hook_log (); Amqp_relay.init (); Ui_main.init (); + Ui_relay.init (); (* Speedtest.init (); *) Net.start_net "Hop" 5671 Relay.start diff --git a/ui_main.ml b/ui_main.ml index fa7130b..98ad174 100644 --- a/ui_main.ml +++ b/ui_main.ml @@ -25,7 +25,7 @@ let longest_prefix_first (p1, _) (p2, _) = let register_dispatcher (prefix, handler) = dispatch_table := List.sort longest_prefix_first ((prefix, handler) :: !dispatch_table) -let handle_dynamic_req r = +let handle_dynamic_req id r = let rec search_table table = match table with | [] -> @@ -33,75 +33,38 @@ let handle_dynamic_req r = [Html.tag "p" [] [Html.text ("No route for URL path "^r.Httpd.path)]] | (prefix, handler) :: rest -> if Util.starts_with r.Httpd.path prefix - then handler r + then handler id r else search_table rest in search_table !dispatch_table -let handle_req r = +let handle_req id r = if Util.starts_with r.Httpd.path "/_" - then handle_dynamic_req r + then handle_dynamic_req id r else match r.Httpd.verb with | "GET" | "HEAD" -> Httpd_file.resp_file (Filename.concat "./web" r.Httpd.path) | _ -> Httpd.http_error_html 400 ("Unsupported HTTP method "^r.Httpd.verb) [] +let cleanup_req id () = + match Node.lookup id with + | Some n -> Node.unbind_all n + | None -> () + let start (s, peername) = + let id = "http-" ^ Uuid.create () in Util.create_thread (Connections.endpoint_name peername ^ " HTTP service") - None - (Httpd.main handle_req) + (Some (cleanup_req id)) + (Httpd.main (handle_req id)) (s, peername) let boot_time = Unix.time () -let api_server_stats r = +let api_server_stats id r = Json.resp_ok [] (Json.Rec ["connection_count", Json.Num (float_of_int !Connections.connection_count); "boot_time", Json.Num boot_time; "uptime", Json.Num (Unix.time () -. boot_time)]) -let api_tap_source r = - let id = Uuid.create () in - let id_block_and_padding = Stringstream.const_flush (id ^ ";" ^ String.make 2048 'h' ^ ";") in - let rec message_stream () = - Thread.delay 0.1; - let v = Json.to_string (Json.Rec ["now", Json.Num (Unix.time ()); - "id", Json.Str (Uuid.create ())]) in - Some (Printf.sprintf "%d;%s;" (String.length v) v, true, Stringstream.make message_stream) - in - Httpd.resp_generic 200 "Streaming" - [Httpd.text_content_type_header; - "Access-Control-Allow-Origin", "*"] - (Httpd.Variable - (Stringstream.switch_after 131072 - (Stringstream.seq id_block_and_padding (Stringstream.make message_stream)) - Stringstream.empty)) - -let counter = ref 0 -let api_tap_sink r = - let params = Httpd.parse_urlencoded (Httpd.string_of_content r.Httpd.req_body.Httpd.content) in - (* let stream_id = List.assoc "metadata.id" params in *) - match List.assoc "metadata.type" params with - | Some "send" -> - (match List.assoc "data" params with - | Some data_str -> - let data = Json.of_string data_str in - counter := 1 + !counter; - Printf.printf "Data: %d %s\n%!" !counter (Json.to_string data); - Httpd.resp_generic 202 "Accepted" [] (Httpd.empty_content) - | _ -> Httpd.http_error_html 406 "Bad data parameter" []) - | _ -> Httpd.http_error_html 406 "Unsupported metadata.type" [] - -let api_tap r = - match r.Httpd.verb with - | "GET" -> api_tap_source r - | "POST" -> api_tap_sink r - | _ -> Httpd.http_error_html 400 "Unsupported tap method" [] - -let register_api_hooks () = - List.iter register_dispatcher - ["/_/server_stats", api_server_stats; - "/_/tap", api_tap] - let init () = - register_api_hooks (); + register_dispatcher ("/_/server_stats", api_server_stats); ignore (Util.create_thread "HTTP listener" None (Net.start_net "HTTP" 5678) start) diff --git a/ui_relay.ml b/ui_relay.ml new file mode 100644 index 0000000..21faf3a --- /dev/null +++ b/ui_relay.ml @@ -0,0 +1,82 @@ +(* Copyright 2012 Tony Garnock-Jones . *) + +(* This file is part of Ocamlmsg. *) + +(* Ocamlmsg is free software: you can redistribute it and/or modify it *) +(* under the terms of the GNU General Public License as published by the *) +(* Free Software Foundation, either version 3 of the License, or (at your *) +(* option) any later version. *) + +(* Ocamlmsg is distributed in the hope that it will be useful, but *) +(* WITHOUT ANY WARRANTY; without even the implied warranty of *) +(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *) +(* General Public License for more details. *) + +(* You should have received a copy of the GNU General Public License *) +(* along with Ocamlmsg. If not, see . *) + +type outbound_message = + | Data of Sexp.t + | Heartbeat + | Shutdown + +let rec message_stream ch = + let deliver_payload payload = + Some (Printf.sprintf "%d;%s;" (String.length payload) payload, + true, + Stringstream.make (message_stream ch)) + in + let deliver_sexp_chunk sexp = deliver_payload (Json.to_string (Sexpjson.json_of_sexp sexp)) in + fun () -> + match Squeue.pop ch with + | Data sexp -> deliver_sexp_chunk sexp + | Heartbeat -> deliver_payload "" + | Shutdown -> None + +let rec api_tap_source id r = + let ch = Squeue.create 10 in + let handle_message n sexp = Squeue.add (Data sexp) ch in + let n = Node.make "http_tap" handle_message in + if not (Node.bind (id, n)) + then Httpd.http_error_html 500 "Internal ID collision" [] + else + let id_block_and_padding = Stringstream.const_flush (id ^ ";" ^ String.make 2048 'h' ^ ";") in + Httpd.add_completion_callback + (Httpd.resp_generic 200 "Streaming" + [Httpd.text_content_type_header; + "Access-Control-Allow-Origin", "*"] + (Httpd.Variable + (Stringstream.switch_after 131072 + (Stringstream.seq id_block_and_padding (Stringstream.make (message_stream ch))) + Stringstream.empty))) + (fun _ -> + Node.unbind_all n; + Squeue.add Shutdown ch) + +let api_tap_sink irrelevant_id r = + let params = Httpd.parse_urlencoded (Httpd.string_of_content r.Httpd.req_body.Httpd.content) in + (* let stream_id = List.assoc "metadata.id" params in *) + match Httpd.find_param "metadata.type" params with + | Some (Some "send") -> + (match Httpd.find_param "data" params with + | Some (Some data_str) -> + let data = + (try Sexpjson.sexp_of_json (Json.of_string data_str) + with _ -> Httpd.http_error_html 406 "Bad data parameter" []) in + (match Message.message_of_sexp data with + | Message.Post (Sexp.Str name, body, token) -> + Node.send_ignore name body; + Httpd.resp_generic 202 "Accepted" [] (Httpd.empty_content) + | _ -> + Httpd.http_error_html 406 "Message not understood" []) + | _ -> Httpd.http_error_html 406 "Bad data parameter" []) + | _ -> Httpd.http_error_html 406 "Unsupported metadata.type" [] + +let api_tap id r = + match r.Httpd.verb with + | "GET" -> api_tap_source id r + | "POST" -> api_tap_sink id r + | _ -> Httpd.http_error_html 400 "Unsupported tap method" [] + +let init () = + Ui_main.register_dispatcher ("/_/tap", api_tap)