From b4c0589777808d1130d14fa281e1181787bdac35 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 7 May 2012 06:31:59 -0400 Subject: [PATCH] Lwtize the UI --- hop_server.ml | 6 +- html.ml | 28 ++-- httpd.ml | 371 ++++++++++++++++++++++++++---------------------- httpd_file.ml | 20 +-- relay.ml | 12 +- sexp.ml | 56 +++----- sexpjson.ml | 33 +++-- streamutil.ml | 29 ++++ stringstream.ml | 84 ----------- ui_main.ml | 5 +- ui_relay.ml | 119 ++++++++-------- util.ml | 11 ++ 12 files changed, 381 insertions(+), 393 deletions(-) create mode 100644 streamutil.ml delete mode 100644 stringstream.ml diff --git a/hop_server.ml b/hop_server.ml index e21e0cc..41f5e61 100644 --- a/hop_server.ml +++ b/hop_server.ml @@ -48,11 +48,11 @@ lwt _ = lwt () = Meta.init () in hook_log (); ignore (Amqp_relay.init ()); - (* Ui_main.init (); - Ui_relay.init (); *) + ignore (Ui_main.init ()); + ignore (Ui_relay.init ()); ignore (Relay.init ()); lwt () = Server_control.run_until "AMQP ready" in - (* Server_control.run_until "HTTP ready"; *) + lwt () = Server_control.run_until "HTTP ready" in lwt () = Server_control.run_until "Hop ready" in if Server_control.is_running () then (lwt () = create_ready_file () in diff --git a/html.ml b/html.ml index 45f8c48..1494a8b 100644 --- a/html.ml +++ b/html.ml @@ -78,18 +78,22 @@ and string_of_html c = | Text str -> html_escape str -let rec stream_of_html_contents cs = Stringstream.map stream_of_html cs -and stream_of_html c = - Stringstream.make (fun () -> +let html_generator c yield = + let rec o c = match c with - | Tag (label, attrs, [], true) -> - Some (html_open_tag_string label attrs, false, Stringstream.empty) - | Tag (label, attrs, contents, _) -> - Some (html_open_tag_string label attrs, false, - Stringstream.seq - (stream_of_html_contents contents) (Stringstream.const (""))) - | Text str -> - Some (str, false, Stringstream.empty)) + | Tag (label, attrs, [], true) -> + yield (html_open_tag_string label attrs) + | Tag (label, attrs, contents, _) -> + lwt () = yield (html_open_tag_string label attrs) in + lwt () = Lwt_list.iter_s o contents in + yield ("") + | Text str -> + yield str + in o c + +let stream_of_html c = Streamutil.stream_generator (html_generator c) let stream_of_html_doc d = - Stringstream.seq (Stringstream.const "") (stream_of_html (tag_of_document d)) + Streamutil.stream_generator (fun yield -> + lwt () = yield "" in + html_generator (tag_of_document d) yield) diff --git a/httpd.ml b/httpd.ml index e5189f1..1986e8f 100644 --- a/httpd.ml +++ b/httpd.ml @@ -15,39 +15,41 @@ (* You should have received a copy of the GNU General Public License *) (* along with Hop. If not, see . *) -open Unix +open Lwt +open Hof type version = [`HTTP_1_0 | `HTTP_1_1] type resp_version = [version | `SAME_AS_REQUEST] -type content = Fixed of string | Variable of Stringstream.t -type completion = Completion_normal | Completion_error +type content = Fixed of string | Variable of string Lwt_stream.t type body = { - headers: (string * string) list; - content: content - } + headers: (string * string) list; + content: content +} let empty_content = Fixed "" let empty_body = {headers = []; content = empty_content} type req = { - verb: string; - path: string; - query: (string * string option) list; - req_version: version; - req_body: body - } + verb: string; + path: string; + query: (string * string option) list; + req_version: version; + req_body: body +} type resp = { - resp_version: resp_version; - status: int; - reason: string; - resp_body: body; - completion_callbacks: (completion -> unit) list - } + resp_version: resp_version; + status: int; + reason: string; + resp_body: body; + completion_callbacks: (unit -> unit Lwt.t) list +} exception HTTPError of (int * string * body) +exception HTTPSyntaxError of string + let html_content_type = "text/html;charset=utf-8" let text_content_type = "text/plain;charset=utf-8" @@ -62,18 +64,20 @@ let disable_cache_headers () = "Cache-Control", "no-cache, must-revalidate, max-age=0"; "Pragma", "no-cache"] -let add_headers headers resp = +let add_headers headers resp_thr = + lwt resp = resp_thr in let b = resp.resp_body in - {resp with resp_body = {b with headers = b.headers @ headers}} + return {resp with resp_body = {b with headers = b.headers @ headers}} let add_disable_cache_headers resp = add_headers (disable_cache_headers ()) resp let add_date_header resp = add_headers ["Date", Httpd_date.http_gmtime (Unix.time ())] resp -let add_completion_callback cb resp = - {resp with completion_callbacks = cb :: resp.completion_callbacks} +let add_completion_callback cb resp_thr = + lwt resp = resp_thr in + return {resp with completion_callbacks = cb :: resp.completion_callbacks} -let http_error code reason body = raise (HTTPError (code, reason, body)) +let http_error code reason body = raise_lwt (HTTPError (code, reason, body)) let http_error_plain code reason = http_error code reason @@ -92,12 +96,18 @@ let html_error_doc code reason extra_body = let http_error_html code reason extra_body = http_error_html_doc code reason (html_error_doc code reason extra_body) +let trap_syntax_errors thunk = + try + return (thunk ()) + with HTTPSyntaxError message -> + http_error_html 400 message [] + let resp_generic code reason headers content = - { resp_version = `SAME_AS_REQUEST; - status = code; - reason = reason; - resp_body = {headers = headers; content = content}; - completion_callbacks = [] } + return { resp_version = `SAME_AS_REQUEST; + status = code; + reason = reason; + resp_body = {headers = headers; content = content}; + completion_callbacks = [] } let resp_generic_ok headers content = resp_generic 200 "OK" headers content @@ -132,9 +142,9 @@ let resp_redirect_permanent new_path = let escape_url_char c = match c with - | '%' -> Some (fun (s, pos) -> ("%25", pos + 1)) - | ' ' -> Some (fun (s, pos) -> ("%20", pos + 1)) - | _ -> None + | '%' -> Some (fun (s, pos) -> ("%25", pos + 1)) + | ' ' -> Some (fun (s, pos) -> ("%20", pos + 1)) + | _ -> None let url_escape s = Util.strsub escape_url_char s let unescape_url_hex_code (s, pos) = @@ -144,109 +154,117 @@ let unescape_url_hex_code (s, pos) = let v1 = Util.unhex_char (String.get s (pos + 1)) in let v2 = Util.unhex_char (String.get s (pos + 2)) in if v1 = -1 || v2 = -1 - then http_error_html 400 ("Bad percent escaping: '"^String.sub s pos 3^"'") [] + then raise (HTTPSyntaxError ("Bad percent escaping: '"^String.sub s pos 3^"'")) else (String.make 1 (Char.chr (v1 * 16 + v2)), pos + 3) - else http_error_html 400 ("Bad percent escaping: '"^String.sub s pos (len - pos)^"'") [] + else raise (HTTPSyntaxError ("Bad percent escaping: '"^String.sub s pos (len - pos)^"'")) let unescape_url_char c = match c with - | '%' -> Some unescape_url_hex_code - | _ -> None + | '%' -> Some unescape_url_hex_code + | _ -> None -let url_unescape s = Util.strsub unescape_url_char s +let url_unescape s = + trap_syntax_errors (fun () -> Util.strsub unescape_url_char s) let render_header cout (k, v) = - output_string cout k; - output_string cout ": "; - output_string cout v; - output_string cout "\r\n" + lwt () = Lwt_io.write cout k in + lwt () = Lwt_io.write cout ": " in + lwt () = Lwt_io.write cout v in + Lwt_io.write cout "\r\n" -let render_chunk cout (chunk, should_flush) = - (match chunk with - | "" -> () - | _ -> - output_string cout (Printf.sprintf "%x\r\n" (String.length chunk)); - output_string cout chunk; - output_string cout "\r\n"); - if should_flush then flush cout else () +let render_chunk cout chunk = + match chunk with + | "" -> return () + | _ -> + lwt () = Lwt_io.write cout (Printf.sprintf "%x\r\n" (String.length chunk)) in + lwt () = Lwt_io.write cout chunk in + Lwt_io.write cout "\r\n" let render_fixed_content cout s headers_only = - render_header cout ("Content-Length", string_of_int (String.length s)); - output_string cout "\r\n"; - if headers_only then () else output_string cout s + lwt () = render_header cout ("Content-Length", string_of_int (String.length s)) in + lwt () = Lwt_io.write cout "\r\n" in + if headers_only then return () else Lwt_io.write cout s let string_of_content c = match c with - | Fixed s -> s - | Variable s -> Stringstream.to_string s + | Fixed s -> return s + | Variable s -> Streamutil.stream_to_string s let render_content cout v c headers_only = match c with - | Fixed s -> + | Fixed s -> render_fixed_content cout s headers_only - | Variable s -> + | Variable s -> match v with - | `HTTP_1_0 -> - render_fixed_content cout (Stringstream.to_string s) headers_only - | `HTTP_1_1 -> + | `HTTP_1_0 -> + lwt str = Streamutil.stream_to_string s in + render_fixed_content cout str headers_only + | `HTTP_1_1 -> if headers_only - then (output_string cout "\r\n") - else (render_header cout ("Transfer-Encoding", "chunked"); - output_string cout "\r\n"; - Stringstream.iter (render_chunk cout) s; - output_string cout "0\r\n\r\n") + then (Lwt_io.write cout "\r\n") + else (lwt () = render_header cout ("Transfer-Encoding", "chunked") in + lwt () = Lwt_io.write cout "\r\n" in + lwt () = Lwt_stream.iter_s (render_chunk cout) s in + Lwt_io.write cout "0\r\n\r\n") let render_body cout v b headers_only = - List.iter (render_header cout) b.headers; + lwt () = Lwt_list.iter_s (render_header cout) b.headers in render_content cout v b.content headers_only let string_of_version v = match v with - | `HTTP_1_0 -> "HTTP/1.0" - | `HTTP_1_1 -> "HTTP/1.1" + | `HTTP_1_0 -> "HTTP/1.0" + | `HTTP_1_1 -> "HTTP/1.1" let version_of_string v = match v with - | "HTTP/1.0" -> `HTTP_1_0 - | "HTTP/1.1" -> `HTTP_1_1 - | _ -> http_error_html 400 "Invalid HTTP version" [] + | "HTTP/1.0" -> `HTTP_1_0 + | "HTTP/1.1" -> `HTTP_1_1 + | _ -> raise (HTTPSyntaxError "Invalid HTTP version") let render_req cout r = - output_string cout (r.verb^" "^url_escape r.path^" "^string_of_version r.req_version^"\r\n"); + lwt () = Lwt_io.write cout + (r.verb^" "^url_escape r.path^" "^string_of_version r.req_version^"\r\n") in render_body cout r.req_version r.req_body false let render_resp cout req_version req_verb r = let resp_version = (match r.resp_version with - | `SAME_AS_REQUEST -> req_version - | #version as v -> v) + | `SAME_AS_REQUEST -> req_version + | #version as v -> v) in - output_string cout - (string_of_version resp_version^" "^string_of_int r.status^" "^r.reason^"\r\n"); + lwt () = Lwt_io.write cout + (string_of_version resp_version^" "^string_of_int r.status^" "^r.reason^"\r\n") in render_body cout resp_version r.resp_body (match req_verb with "HEAD" -> true | _ -> false) let split_query p = match Str.bounded_split (Str.regexp "\\?") p 2 with - | path :: query :: _ -> (path, query) - | path :: [] -> (path, "") - | [] -> ("", "") + | path :: query :: _ -> (path, query) + | path :: [] -> (path, "") + | [] -> ("", "") let parse_urlencoded_binding s = match Str.bounded_split (Str.regexp "=") s 2 with - | k :: v :: _ -> (url_unescape k, Some (url_unescape v)) - | k :: [] -> (url_unescape k, None) - | [] -> ("", None) + | k :: v :: _ -> + lwt k' = url_unescape k in + lwt v' = url_unescape v in + return (k', Some v') + | k :: [] -> + lwt k' = url_unescape k in + return (k', None) + | [] -> + return ("", None) let parse_urlencoded q = let pieces = Str.split (Str.regexp "&") q in - List.map parse_urlencoded_binding pieces + Lwt_list.map_s parse_urlencoded_binding pieces let find_header' name hs = let lc_name = String.lowercase name in let rec search hs = match hs with - | [] -> raise Not_found - | (k, v) :: hs' -> + | [] -> raise Not_found + | (k, v) :: hs' -> if String.lowercase k = lc_name then v else search hs' @@ -260,134 +278,151 @@ let find_param name params = try Some (List.assoc name params) with Not_found -> None let input_crlf cin = - let line = input_line cin in + lwt line = Lwt_io.read_line cin in let len = String.length line in if len > 0 && String.get line (len - 1) = '\r' - then String.sub line 0 (len - 1) - else line + then return (String.sub line 0 (len - 1)) + else return line let rec parse_headers cin = - match Str.bounded_split (Str.regexp ":") (input_crlf cin) 2 with - | [] -> - [] - | [k; v] -> - (k, Util.strip v) :: parse_headers cin - | k :: _ -> + lwt header_line = input_crlf cin in + match Str.bounded_split (Str.regexp ":") header_line 2 with + | [] -> + return [] + | [k; v] -> + lwt headers = parse_headers cin in + return ((k, Util.strip v) :: headers) + | k :: _ -> http_error_html 400 ("Bad header: "^k) [] let parse_chunks cin = fun () -> - let hexlen_str = input_crlf cin in + lwt hexlen_str = input_crlf cin in let chunk_len = Util.unhex hexlen_str in let buffer = String.make chunk_len '\000' in - really_input cin buffer 0 chunk_len; - (if input_crlf cin <> "" then http_error_html 400 "Invalid chunk boundary" [] else ()); - if chunk_len = 0 then None else Some (buffer, false) + lwt () = Lwt_io.read_into_exactly cin buffer 0 chunk_len in + lwt chunk_terminator = input_crlf cin in + if chunk_terminator <> "" + then http_error_html 400 "Invalid chunk boundary" [] + else + if chunk_len = 0 + then return None + else return (Some buffer) let parse_body cin = - let headers = parse_headers cin in + lwt headers = parse_headers cin in match find_header "Transfer-Encoding" headers with - | None | Some "identity" -> + | None | Some "identity" -> (match find_header "Content-Length" headers with - | None -> + | None -> (* http_error_html 411 "Length required" [] *) - {headers = headers; content = empty_content} - | Some length_str -> + return {headers = headers; content = empty_content} + | Some length_str -> let length = int_of_string length_str in let buffer = String.make length '\000' in - really_input cin buffer 0 length; - {headers = headers; content = Fixed buffer}) - | Some "chunked" -> - {headers = headers; content = Variable (Stringstream.from_iter (parse_chunks cin))} - | Some unsupported -> + lwt () = Lwt_io.read_into_exactly cin buffer 0 length in + return {headers = headers; content = Fixed buffer}) + | Some "chunked" -> + return {headers = headers; content = Variable (Lwt_stream.from (parse_chunks cin))} + | Some unsupported -> http_error_html 400 ("Unsupported Transfer-Encoding: "^unsupported) [] let rec parse_req cin spurious_newline_credit = - match Str.bounded_split (Str.regexp " ") (input_crlf cin) 3 with - | [] -> + lwt req_line = input_crlf cin in + parse_req' cin spurious_newline_credit req_line +and parse_req' cin spurious_newline_credit req_line = + match Str.bounded_split (Str.regexp " ") req_line 3 with + | [] -> (* HTTP spec requires that we ignore leading CRLFs. We choose to do so, up to a point. *) if spurious_newline_credit = 0 then http_error_html 400 "Bad request: too many leading CRLFs" [] else parse_req cin (spurious_newline_credit - 1) - | [verb; path; version_str] -> + | [verb; path; version_str] -> let version = version_of_string version_str in - let body = parse_body cin in + lwt body = parse_body cin in let (path, query) = split_query path in - let path = url_unescape path in - let query = parse_urlencoded query in - { verb = verb; path = path; query = query; req_version = version; req_body = body } - | _ -> http_error_html 400 "Bad request line" [] + lwt path = url_unescape path in + lwt query = parse_urlencoded query in + return { verb = verb; path = path; query = query; req_version = version; req_body = body } + | _ -> + http_error_html 400 "Bad request line" [] let discard_unread_body req = match req.req_body.content with - | Fixed _ -> () - | Variable s -> Stringstream.iter (fun v -> ()) s (* force chunks to be read *) + | Fixed _ -> + return () + | Variable s -> + Lwt_stream.junk_while (fun _ -> true) s (* force chunks to be read *) let connection_keepalive req = find_header "Connection" req.req_body.headers = Some "keep-alive" let main handle_req (s, peername) = - let cin = in_channel_of_descr s in - let cout = out_channel_of_descr s in - (try - (try - let rec request_loop () = - let req = parse_req cin 512 in - let resp = handle_req req in + let cin = Lwt_io.of_fd Lwt_io.input s in + let cout = Lwt_io.of_fd Lwt_io.output s in - let completion_mutex = Mutex.create () in - let completion = ref None in - let set_completion v = - Util.with_mutex0 completion_mutex (fun () -> - match !completion with - | None -> - completion := Some v; - List.iter (fun cb -> cb v) resp.completion_callbacks - | Some _ -> ()) - in + let pending_completion_callbacks = Queue.create () in + let fire_pending_callbacks () = + while_lwt not (Queue.is_empty pending_completion_callbacks) do + let cbs = Queue.take pending_completion_callbacks in + ignore (Lwt_list.iter_s (fun cb -> cb ()) cbs); + return () + done + in - (* Here we spawn a thread that just watches the socket to see - if it either becomes active or closes during rendering of the - response, so that we can make decisions based on this in any - eventual streaming response generators. In particular, if - we're implementing some kind of XHR streaming andthe client - goes away, we want to abandon the streaming as soon as - possible. *) - let input_waiter () = - try - (let (r, w, e) = Unix.select [s] [] [s] (-1.0) in - set_completion (if r <> [] then Completion_normal else Completion_error)) - with _ -> set_completion Completion_error - in - ignore (Thread.create input_waiter ()); - - (try - render_resp cout req.req_version req.verb resp; - discard_unread_body req; - flush cout; - set_completion Completion_normal + let next_request () = + (try_lwt + (try_lwt + lwt req = parse_req cin 512 in + lwt () = fire_pending_callbacks () in + return (Some req) with e -> - set_completion Completion_error; - raise e); + lwt () = fire_pending_callbacks () in + raise_lwt e) + with End_of_file -> return None) + in - if connection_keepalive req then request_loop () else () - in + let request_stream = Lwt_stream.from next_request in + + let rec request_loop () = + match_lwt Lwt_stream.get request_stream with + | None -> return () + | Some req -> + lwt resp = handle_req req in + + (* Watch in the background for a new request arriving, and let + the currently-streaming (well, the about-to-be-streaming) + response know about it so it can decide to terminate if it + likes. *) + Queue.add resp.completion_callbacks pending_completion_callbacks; + ignore (Lwt_stream.peek request_stream); + + lwt () = render_resp cout req.req_version req.verb resp in + lwt () = discard_unread_body req in + lwt () = Lwt_io.flush cout in + lwt () = fire_pending_callbacks () in + + if connection_keepalive req then request_loop () else return () + in + + lwt () = + try_lwt request_loop () with - | End_of_file -> - () - | HTTPError (code, reason, body) -> + | HTTPError (code, reason, body) -> render_resp cout `HTTP_1_0 "GET" (* ugh this should probably be done better *) { resp_version = `HTTP_1_0; status = code; reason = reason; resp_body = body; - completion_callbacks = [] }) - with - | Sys_error message -> - Log.info "Sys_error in httpd handler" [Sexp.Str message] - | exn -> - Log.error "Uncaught exception in httpd handler" [Sexp.Str (Printexc.to_string exn)]); - (try flush cout with _ -> ()); - close s + completion_callbacks = [] } + | Sys_error message -> + Log.info "Sys_error in httpd handler" [Sexp.Str message] + | exn -> + Log.error "Uncaught exception in httpd handler" [Sexp.Str (Printexc.to_string exn)] + in + + lwt () = fire_pending_callbacks () in + lwt () = (try_lwt Lwt_io.flush cout with _ -> return ()) in + Lwt_unix.close s diff --git a/httpd_file.ml b/httpd_file.ml index e1d27a4..ccd286c 100644 --- a/httpd_file.ml +++ b/httpd_file.ml @@ -15,6 +15,9 @@ (* You should have received a copy of the GNU General Public License *) (* along with Hop. If not, see . *) +open Lwt +open Hof + let visible_path_component s = match s with | "" -> false @@ -46,19 +49,14 @@ let analyze_path p = let p = if p = "" then "." else p in (p, extension_map ext) -let read_and_close_file handle = +let stream_file handle = let buflen = 4096 in let buffer = String.make buflen '\000' in fun () -> - let count = - (try - input handle buffer 0 buflen - with e -> (close_in handle; raise e)) - in + let count = input handle buffer 0 buflen in if count > 0 - then Some (String.sub buffer 0 count, false) - else (close_in handle; - None) + then return (Some (String.sub buffer 0 count)) + else return None let rec read_dir dirhandle = try @@ -80,9 +78,11 @@ let render_directory_listing path = Html.unclosed_tag "hr" [] []] let resp_raw_file mime_type path = + let handle = open_in_bin path in Httpd.resp_generic_ok [Httpd.content_type_header_name, mime_type] - (Httpd.Variable (Stringstream.from_iter (read_and_close_file (open_in_bin path)))) + (Httpd.Variable (Lwt_stream.from (stream_file handle))) + |> Httpd.add_completion_callback (fun () -> close_in handle; return ()) let resp_file raw_path = let (path, mime_type) = analyze_path raw_path in diff --git a/relay.ml b/relay.ml index 0cdea74..64295c9 100644 --- a/relay.ml +++ b/relay.ml @@ -50,12 +50,12 @@ let dispatch_message n ch m = send_error ch "Message not understood" (Message.sexp_of_message m) let issue_banner cin cout = - lwt () = output_sexp_and_flush cout (Arr [Str "hop"; Str ""]) in - lwt () = - output_sexp_and_flush cout - (Message.subscribe (Str (Node.local_container_name()), - Str "", Str "", - Str "", Str "")) in + lwt () = output_sexp cout (Arr [Str "hop"; Str ""]) in + lwt () = Lwt_io.flush cout in + lwt () = output_sexp cout (Message.subscribe (Str (Node.local_container_name()), + Str "", Str "", + Str "", Str "")) in + lwt () = Lwt_io.flush cout in return true let relay_boot (peername, cin, cout) = return (peername, Lwt_mutex.create (), cin, cout) diff --git a/sexp.ml b/sexp.ml index 039b588..05e5782 100644 --- a/sexp.ml +++ b/sexp.ml @@ -28,38 +28,26 @@ and t = | Hint of display_hint_t | Arr of t list -let _output_str ch s = - lwt () = write ch (string_of_int (String.length s)) in - lwt () = write_char ch ':' in - write ch s - -let rec output_sexp ch x = - match x with - | Str s -> - _output_str ch s - | Hint {hint = h; body = b} -> - lwt () = write_char ch '[' in - lwt () = output_sexp ch h in - lwt () = write_char ch ']' in - output_sexp ch b - | Arr xs -> - lwt () = write_char ch '(' in - lwt () = Lwt_list.iter_s (output_sexp ch) xs in - write_char ch ')' - -let rec stream_of_sexp x = - Stringstream.make (fun () -> +let generic_output_sexp write x = + let rec walk x = match x with - | Str s -> - Some (string_of_int (String.length s) ^ ":", false, Stringstream.const s) - | Hint {hint = h; body = b} -> - Some ("[", false, - Stringstream.seq (stream_of_sexp h) - (Stringstream.seq (Stringstream.const "]") - (stream_of_sexp b))) - | Arr xs -> - Some ("(", false, - Stringstream.seq (Stringstream.map stream_of_sexp xs) (Stringstream.const ")"))) + | Str s -> + lwt () = write (string_of_int (String.length s)) in + lwt () = write ":" in + write s + | Hint {hint = h; body = b} -> + lwt () = write "[" in + lwt () = walk h in + lwt () = write "]" in + walk b + | Arr xs -> + lwt () = write "(" in + lwt () = Lwt_list.iter_s walk xs in + write ")" + in walk x + +let output_sexp ch x = generic_output_sexp (write ch) x +let stream_of_sexp x = Streamutil.stream_generator (fun yield -> generic_output_sexp yield x) let write_char_escaped ch c = if c = '\"' @@ -92,10 +80,6 @@ let rec output_sexp_human ch x = xs') in write_char ch ')' -let output_sexp_and_flush ch x = - lwt () = output_sexp ch x in - flush ch - let char_numeric c = '0' <= c && c <= '9' let char_whitespace c = c <= ' ' @@ -148,7 +132,7 @@ let parse b = (fun () -> return (Ibuffer.next_char b)) (fun count -> return (Ibuffer.next_chars b count)) let sexp_of_string s = parse (Ibuffer.of_string s) -let string_of_sexp x = Stringstream.to_string (stream_of_sexp x) +let string_of_sexp x = Streamutil.stream_to_string (stream_of_sexp x) let assoc' key v = match v with diff --git a/sexpjson.ml b/sexpjson.ml index 5f1cf5a..66d06a7 100644 --- a/sexpjson.ml +++ b/sexpjson.ml @@ -10,18 +10,21 @@ let rec sexp_of_json j = | Json.Flg f -> Hint {hint = Str "bool"; body = Str (string_of_bool f)} | Json.Nil -> Hint {hint = Str "null"; body = Arr []} -let rec json_of_sexp x = - match x with - | Hint {hint = Str "num"; body = Str n} -> Json.Num (float_of_string n) - | Str s -> Json.Str s - | Arr xs -> Json.Arr (List.map json_of_sexp xs) - | Hint {hint = Str "obj"; body = Arr kvs} -> - Json.Rec (List.map - (fun kv -> - (match kv with - | Arr [Str k; v] -> (k, json_of_sexp v) - | _ -> syntax_error "Bad JSON-SEXP key-value")) - kvs) - | Hint {hint = Str "bool"; body = Str bs} -> Json.Flg (bool_of_string bs) - | Hint {hint = Str "null"; body = Arr []} -> Json.Nil - | Hint {hint = h; body = b} -> Json.Rec ["_hint", json_of_sexp h; "_body", json_of_sexp b] +let json_of_sexp x = + let rec walk x = + match x with + | Hint {hint = Str "num"; body = Str n} -> Json.Num (float_of_string n) + | Str s -> Json.Str s + | Arr xs -> Json.Arr (List.map walk xs) + | Hint {hint = Str "obj"; body = Arr kvs} -> + Json.Rec (List.map + (fun kv -> + (match kv with + | Arr [Str k; v] -> (k, walk v) + | _ -> raise (Syntax_error "Bad JSON-SEXP key-value"))) + kvs) + | Hint {hint = Str "bool"; body = Str bs} -> Json.Flg (bool_of_string bs) + | Hint {hint = Str "null"; body = Arr []} -> Json.Nil + | Hint {hint = h; body = b} -> Json.Rec ["_hint", walk h; "_body", walk b] + in + Lwt.wrap1 walk x diff --git a/streamutil.ml b/streamutil.ml new file mode 100644 index 0000000..821beee --- /dev/null +++ b/streamutil.ml @@ -0,0 +1,29 @@ +(* Copyright 2012 Tony Garnock-Jones . *) + +(* This file is part of Hop. *) + +(* Hop is free software: you can redistribute it and/or modify it *) +(* under the terms of the GNU General Public License as published by the *) +(* Free Software Foundation, either version 3 of the License, or (at your *) +(* option) any later version. *) + +(* Hop is distributed in the hope that it will be useful, but *) +(* WITHOUT ANY WARRANTY; without even the implied warranty of *) +(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *) +(* General Public License for more details. *) + +(* You should have received a copy of the GNU General Public License *) +(* along with Hop. If not, see . *) + +open Lwt + +let stream_to_string s = + lwt pieces = Lwt_stream.to_list s in + return (String.concat "" pieces) + +let stream_generator f = + let mbox = Lwt_mvar.create_empty () in + let yield v = Lwt_mvar.put mbox (Some v) in + ignore (lwt () = f yield in + Lwt_mvar.put mbox None); + Lwt_stream.from (fun () -> Lwt_mvar.take mbox) diff --git a/stringstream.ml b/stringstream.ml deleted file mode 100644 index dd87283..0000000 --- a/stringstream.ml +++ /dev/null @@ -1,84 +0,0 @@ -(* Copyright 2012 Tony Garnock-Jones . *) - -(* This file is part of Hop. *) - -(* Hop is free software: you can redistribute it and/or modify it *) -(* under the terms of the GNU General Public License as published by the *) -(* Free Software Foundation, either version 3 of the License, or (at your *) -(* option) any later version. *) - -(* Hop is distributed in the hope that it will be useful, but *) -(* WITHOUT ANY WARRANTY; without even the implied warranty of *) -(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *) -(* General Public License for more details. *) - -(* You should have received a copy of the GNU General Public License *) -(* along with Hop. If not, see . *) - -type t = Stream of (unit -> (string * bool * t) option) - -let make f = Stream f - -let run (Stream f) = f () - -let empty = Stream (fun () -> None) -let const v = Stream (fun () -> Some (v, false, empty)) -let const_flush v = Stream (fun () -> Some (v, true, empty)) - -let flush_before s = Stream (fun () -> Some ("", true, s)) -let flush = flush_before empty - -let rec seq s1 s2 = - Stream (fun () -> - match run s1 with - | None -> run s2 - | Some (v, f, k) -> Some (v, f, seq k s2)) - -let rec from_list vs = - Stream (fun () -> - match vs with - | [] -> None - | v :: vs -> Some (v, false, (from_list vs))) - -let rec map f vs = - Stream (fun () -> - match vs with - | [] -> None - | v :: vs -> run (seq (f v) (map f vs))) - -let rec from_iter f = - let cache = ref None in - Stream (fun () -> - match !cache with - | Some v -> v - | None -> - let result = - (match f () with - | Some (str, should_flush) -> Some (str, should_flush, from_iter f) - | None -> None) - in - cache := Some result; - result) - -let rec iter f s = - match run s with - | None -> () - | Some (v, flush, k) -> (f (v, flush); iter f k) - -let rec to_list s = - match run s with - | None -> [] - | Some (v, _, k) -> v :: to_list k - -let rec to_string s = - String.concat "" (to_list s) - -let rec switch_after' on_boundary limit s1 s2 = - if limit > 0 || not on_boundary - then Stream (fun () -> - match run s1 with - | None -> None - | Some (v, f, k) -> Some (v, f, switch_after' f (limit - String.length v) k s2)) - else s2 - -let switch_after limit s1 s2 = switch_after' true limit s1 s2 diff --git a/ui_main.ml b/ui_main.ml index 5b9d4d5..75e4380 100644 --- a/ui_main.ml +++ b/ui_main.ml @@ -15,6 +15,7 @@ (* You should have received a copy of the GNU General Public License *) (* along with Hop. If not, see . *) +open Lwt open Html open Hof open Datastructures @@ -56,7 +57,7 @@ let handle_req id r = let cleanup_req id exn = match Node.lookup id with | Some n -> Node.unbind_all n - | None -> () + | None -> return () let start (s, peername) = let id = Node.name_of_string ("http-" ^ Uuid.create ()) in @@ -102,4 +103,4 @@ let init () = register_dispatcher ("/_/server_stats", api_server_stats); register_dispatcher ("/_/nodes", api_nodes); register_dispatcher ("/_/node/", api_node_info); - ignore (Util.create_daemon_thread "HTTP listener" None (Net.start_net "HTTP" 5678) start) + Util.create_daemon_thread "HTTP listener" None (Net.start_net "HTTP" 5678) start diff --git a/ui_relay.ml b/ui_relay.ml index 84cbecb..9a13faf 100644 --- a/ui_relay.ml +++ b/ui_relay.ml @@ -15,79 +15,84 @@ (* You should have received a copy of the GNU General Public License *) (* along with Hop. If not, see . *) +open Lwt open Hof -type outbound_message = - | Data of Sexp.t - | Heartbeat - | Shutdown - -let rec message_stream ch = - let deliver_payload payload = - Some (Printf.sprintf "%d;%s;" (String.length payload) payload, - true, - Stringstream.make (message_stream ch)) - in - let deliver_sexp_chunk sexp = deliver_payload (Json.to_string (Sexpjson.json_of_sexp sexp)) in - fun () -> - match Squeue.pop ch with - | Data sexp -> deliver_sexp_chunk sexp - | Heartbeat -> deliver_payload "" - | Shutdown -> None - let rec api_tap_source id r = - let ch = Squeue.create 10 in - let handle_message n sexp = Squeue.add (Data sexp) ch in + let mbox = Lwt_mvar.create + (Some (Message.subscribe (Sexp.Str (Node.local_container_name()), + Sexp.Str "", Sexp.Str "", + Sexp.Str "", Sexp.Str ""))) in + + let handle_message n sexp = Lwt_mvar.put mbox (Some sexp) in let n = Node.make "http_tap" handle_message in - if not (Node.bind (id, n)) - then Httpd.http_error_html 500 "Internal ID collision" [] - else - let id_block_and_padding = - Stringstream.const_flush (id.Node.label ^ ";" ^ String.make 2048 'h' ^ ";") in - handle_message n (Message.subscribe (Sexp.Str (Node.local_container_name()), - Sexp.Str "", Sexp.Str "", - Sexp.Str "", Sexp.Str "")); - Httpd.resp_generic 200 "Streaming" - [Httpd.text_content_type_header; - "Access-Control-Allow-Origin", "*"] - (Httpd.Variable - (Stringstream.switch_after 131072 - (Stringstream.seq id_block_and_padding (Stringstream.make (message_stream ch))) - Stringstream.empty)) - |> Httpd.add_disable_cache_headers - |> Httpd.add_date_header - |> Httpd.add_completion_callback - (fun _ -> - Node.unbind_all n; - Squeue.add Shutdown ch) + + let shutdown () = + lwt () = Node.unbind_all n in + Lwt_mvar.put mbox None + in + + let generator yield = + let body_counter = ref 0 in + let yield_and_count s = + body_counter := String.length s + !body_counter; + yield s + in + lwt () = yield_and_count (id.Node.label ^ ";" ^ String.make 2048 'h' ^ ";") in + let rec drain_mbox () = + match_lwt Lwt_mvar.take mbox with + | None -> return () + | Some sexp -> + lwt json = Sexpjson.json_of_sexp sexp in + let payload = Json.to_string json in + lwt () = yield_and_count (Printf.sprintf "%d;%s;" (String.length payload) payload) in + lwt () = + if !body_counter >= 131072 + then shutdown () + else return () + in + drain_mbox () + in drain_mbox () + in + + match_lwt Node.bind (id, n) with + | false -> Httpd.http_error_html 500 "Internal ID collision" [] + | true -> + Httpd.resp_generic 200 "Streaming" + [Httpd.text_content_type_header; + "Access-Control-Allow-Origin", "*"] + (Httpd.Variable (Streamutil.stream_generator generator)) + |> Httpd.add_disable_cache_headers + |> Httpd.add_date_header + |> Httpd.add_completion_callback shutdown let api_tap_sink irrelevant_id r = - let params = Httpd.parse_urlencoded (Httpd.string_of_content r.Httpd.req_body.Httpd.content) in - (* let stream_id = List.assoc "metadata.id" params in *) + lwt content = Httpd.string_of_content r.Httpd.req_body.Httpd.content in + lwt params = Httpd.parse_urlencoded content in match Httpd.find_param "metadata.type" params with - | Some (Some "send") -> + | Some (Some "send") -> (match Httpd.find_param "data" params with - | Some (Some data_str) -> - let data = - (try Sexpjson.sexp_of_json (Json.of_string data_str) - with _ -> Httpd.http_error_html 406 "Bad data parameter" []) in + | Some (Some data_str) -> + lwt data = + (try return (Sexpjson.sexp_of_json (Json.of_string data_str)) + with _ -> Httpd.http_error_html 406 "Bad data parameter" []) in (match Message.message_of_sexp data with - | Message.Post (Sexp.Str name, body, token) -> - Node.send_ignore' name body; + | Message.Post (Sexp.Str name, body, token) -> + lwt () = Node.send_ignore' name body in Httpd.resp_generic 202 "Accepted" [] (Httpd.empty_content) - | _ -> + | _ -> Httpd.http_error_html 406 "Message not understood" []) - | _ -> Httpd.http_error_html 406 "Bad data parameter" []) - | Some (Some "close") -> + | _ -> Httpd.http_error_html 406 "Bad data parameter" []) + | Some (Some "close") -> Httpd.resp_generic_ok [] Httpd.empty_content - | _ -> + | _ -> Httpd.http_error_html 406 "Unsupported metadata.type" [] let api_tap _ id r = match r.Httpd.verb with - | "GET" -> api_tap_source id r - | "POST" -> api_tap_sink id r - | _ -> Httpd.http_error_html 400 "Unsupported tap method" [] + | "GET" -> api_tap_source id r + | "POST" -> api_tap_sink id r + | _ -> Httpd.http_error_html 400 "Unsupported tap method" [] let init () = Ui_main.register_dispatcher ("/_/tap", api_tap) diff --git a/util.ml b/util.ml index d559b0a..c48d002 100644 --- a/util.ml +++ b/util.ml @@ -92,3 +92,14 @@ let unhex s = else loop (index + 1) (acc * 16 + unhex_char (String.get s index)) in loop 0 0 + +let stream_to_string s = + lwt pieces = Lwt_stream.to_list s in + return (String.concat "" pieces) + +let stream_generator f = + let mbox = Lwt_mvar.create_empty () in + let yield v = Lwt_mvar.put mbox (Some v) in + ignore (lwt () = f yield in + Lwt_mvar.put mbox None); + Lwt_stream.from (fun () -> Lwt_mvar.take mbox)