diff --git a/hop_server.ml b/hop_server.ml
index e21e0cc..41f5e61 100644
--- a/hop_server.ml
+++ b/hop_server.ml
@@ -48,11 +48,11 @@ lwt _ =
lwt () = Meta.init () in
hook_log ();
ignore (Amqp_relay.init ());
- (* Ui_main.init ();
- Ui_relay.init (); *)
+ ignore (Ui_main.init ());
+ ignore (Ui_relay.init ());
ignore (Relay.init ());
lwt () = Server_control.run_until "AMQP ready" in
- (* Server_control.run_until "HTTP ready"; *)
+ lwt () = Server_control.run_until "HTTP ready" in
lwt () = Server_control.run_until "Hop ready" in
if Server_control.is_running ()
then (lwt () = create_ready_file () in
diff --git a/html.ml b/html.ml
index 45f8c48..1494a8b 100644
--- a/html.ml
+++ b/html.ml
@@ -78,18 +78,22 @@ and string_of_html c =
| Text str ->
html_escape str
-let rec stream_of_html_contents cs = Stringstream.map stream_of_html cs
-and stream_of_html c =
- Stringstream.make (fun () ->
+let html_generator c yield =
+ let rec o c =
match c with
- | Tag (label, attrs, [], true) ->
- Some (html_open_tag_string label attrs, false, Stringstream.empty)
- | Tag (label, attrs, contents, _) ->
- Some (html_open_tag_string label attrs, false,
- Stringstream.seq
- (stream_of_html_contents contents) (Stringstream.const (""^label^">")))
- | Text str ->
- Some (str, false, Stringstream.empty))
+ | Tag (label, attrs, [], true) ->
+ yield (html_open_tag_string label attrs)
+ | Tag (label, attrs, contents, _) ->
+ lwt () = yield (html_open_tag_string label attrs) in
+ lwt () = Lwt_list.iter_s o contents in
+ yield (""^label^">")
+ | Text str ->
+ yield str
+ in o c
+
+let stream_of_html c = Streamutil.stream_generator (html_generator c)
let stream_of_html_doc d =
- Stringstream.seq (Stringstream.const "") (stream_of_html (tag_of_document d))
+ Streamutil.stream_generator (fun yield ->
+ lwt () = yield "" in
+ html_generator (tag_of_document d) yield)
diff --git a/httpd.ml b/httpd.ml
index e5189f1..1986e8f 100644
--- a/httpd.ml
+++ b/httpd.ml
@@ -15,39 +15,41 @@
(* You should have received a copy of the GNU General Public License *)
(* along with Hop. If not, see . *)
-open Unix
+open Lwt
+open Hof
type version = [`HTTP_1_0 | `HTTP_1_1]
type resp_version = [version | `SAME_AS_REQUEST]
-type content = Fixed of string | Variable of Stringstream.t
-type completion = Completion_normal | Completion_error
+type content = Fixed of string | Variable of string Lwt_stream.t
type body = {
- headers: (string * string) list;
- content: content
- }
+ headers: (string * string) list;
+ content: content
+}
let empty_content = Fixed ""
let empty_body = {headers = []; content = empty_content}
type req = {
- verb: string;
- path: string;
- query: (string * string option) list;
- req_version: version;
- req_body: body
- }
+ verb: string;
+ path: string;
+ query: (string * string option) list;
+ req_version: version;
+ req_body: body
+}
type resp = {
- resp_version: resp_version;
- status: int;
- reason: string;
- resp_body: body;
- completion_callbacks: (completion -> unit) list
- }
+ resp_version: resp_version;
+ status: int;
+ reason: string;
+ resp_body: body;
+ completion_callbacks: (unit -> unit Lwt.t) list
+}
exception HTTPError of (int * string * body)
+exception HTTPSyntaxError of string
+
let html_content_type = "text/html;charset=utf-8"
let text_content_type = "text/plain;charset=utf-8"
@@ -62,18 +64,20 @@ let disable_cache_headers () =
"Cache-Control", "no-cache, must-revalidate, max-age=0";
"Pragma", "no-cache"]
-let add_headers headers resp =
+let add_headers headers resp_thr =
+ lwt resp = resp_thr in
let b = resp.resp_body in
- {resp with resp_body = {b with headers = b.headers @ headers}}
+ return {resp with resp_body = {b with headers = b.headers @ headers}}
let add_disable_cache_headers resp = add_headers (disable_cache_headers ()) resp
let add_date_header resp = add_headers ["Date", Httpd_date.http_gmtime (Unix.time ())] resp
-let add_completion_callback cb resp =
- {resp with completion_callbacks = cb :: resp.completion_callbacks}
+let add_completion_callback cb resp_thr =
+ lwt resp = resp_thr in
+ return {resp with completion_callbacks = cb :: resp.completion_callbacks}
-let http_error code reason body = raise (HTTPError (code, reason, body))
+let http_error code reason body = raise_lwt (HTTPError (code, reason, body))
let http_error_plain code reason =
http_error code reason
@@ -92,12 +96,18 @@ let html_error_doc code reason extra_body =
let http_error_html code reason extra_body =
http_error_html_doc code reason (html_error_doc code reason extra_body)
+let trap_syntax_errors thunk =
+ try
+ return (thunk ())
+ with HTTPSyntaxError message ->
+ http_error_html 400 message []
+
let resp_generic code reason headers content =
- { resp_version = `SAME_AS_REQUEST;
- status = code;
- reason = reason;
- resp_body = {headers = headers; content = content};
- completion_callbacks = [] }
+ return { resp_version = `SAME_AS_REQUEST;
+ status = code;
+ reason = reason;
+ resp_body = {headers = headers; content = content};
+ completion_callbacks = [] }
let resp_generic_ok headers content =
resp_generic 200 "OK" headers content
@@ -132,9 +142,9 @@ let resp_redirect_permanent new_path =
let escape_url_char c =
match c with
- | '%' -> Some (fun (s, pos) -> ("%25", pos + 1))
- | ' ' -> Some (fun (s, pos) -> ("%20", pos + 1))
- | _ -> None
+ | '%' -> Some (fun (s, pos) -> ("%25", pos + 1))
+ | ' ' -> Some (fun (s, pos) -> ("%20", pos + 1))
+ | _ -> None
let url_escape s = Util.strsub escape_url_char s
let unescape_url_hex_code (s, pos) =
@@ -144,109 +154,117 @@ let unescape_url_hex_code (s, pos) =
let v1 = Util.unhex_char (String.get s (pos + 1)) in
let v2 = Util.unhex_char (String.get s (pos + 2)) in
if v1 = -1 || v2 = -1
- then http_error_html 400 ("Bad percent escaping: '"^String.sub s pos 3^"'") []
+ then raise (HTTPSyntaxError ("Bad percent escaping: '"^String.sub s pos 3^"'"))
else (String.make 1 (Char.chr (v1 * 16 + v2)), pos + 3)
- else http_error_html 400 ("Bad percent escaping: '"^String.sub s pos (len - pos)^"'") []
+ else raise (HTTPSyntaxError ("Bad percent escaping: '"^String.sub s pos (len - pos)^"'"))
let unescape_url_char c =
match c with
- | '%' -> Some unescape_url_hex_code
- | _ -> None
+ | '%' -> Some unescape_url_hex_code
+ | _ -> None
-let url_unescape s = Util.strsub unescape_url_char s
+let url_unescape s =
+ trap_syntax_errors (fun () -> Util.strsub unescape_url_char s)
let render_header cout (k, v) =
- output_string cout k;
- output_string cout ": ";
- output_string cout v;
- output_string cout "\r\n"
+ lwt () = Lwt_io.write cout k in
+ lwt () = Lwt_io.write cout ": " in
+ lwt () = Lwt_io.write cout v in
+ Lwt_io.write cout "\r\n"
-let render_chunk cout (chunk, should_flush) =
- (match chunk with
- | "" -> ()
- | _ ->
- output_string cout (Printf.sprintf "%x\r\n" (String.length chunk));
- output_string cout chunk;
- output_string cout "\r\n");
- if should_flush then flush cout else ()
+let render_chunk cout chunk =
+ match chunk with
+ | "" -> return ()
+ | _ ->
+ lwt () = Lwt_io.write cout (Printf.sprintf "%x\r\n" (String.length chunk)) in
+ lwt () = Lwt_io.write cout chunk in
+ Lwt_io.write cout "\r\n"
let render_fixed_content cout s headers_only =
- render_header cout ("Content-Length", string_of_int (String.length s));
- output_string cout "\r\n";
- if headers_only then () else output_string cout s
+ lwt () = render_header cout ("Content-Length", string_of_int (String.length s)) in
+ lwt () = Lwt_io.write cout "\r\n" in
+ if headers_only then return () else Lwt_io.write cout s
let string_of_content c =
match c with
- | Fixed s -> s
- | Variable s -> Stringstream.to_string s
+ | Fixed s -> return s
+ | Variable s -> Streamutil.stream_to_string s
let render_content cout v c headers_only =
match c with
- | Fixed s ->
+ | Fixed s ->
render_fixed_content cout s headers_only
- | Variable s ->
+ | Variable s ->
match v with
- | `HTTP_1_0 ->
- render_fixed_content cout (Stringstream.to_string s) headers_only
- | `HTTP_1_1 ->
+ | `HTTP_1_0 ->
+ lwt str = Streamutil.stream_to_string s in
+ render_fixed_content cout str headers_only
+ | `HTTP_1_1 ->
if headers_only
- then (output_string cout "\r\n")
- else (render_header cout ("Transfer-Encoding", "chunked");
- output_string cout "\r\n";
- Stringstream.iter (render_chunk cout) s;
- output_string cout "0\r\n\r\n")
+ then (Lwt_io.write cout "\r\n")
+ else (lwt () = render_header cout ("Transfer-Encoding", "chunked") in
+ lwt () = Lwt_io.write cout "\r\n" in
+ lwt () = Lwt_stream.iter_s (render_chunk cout) s in
+ Lwt_io.write cout "0\r\n\r\n")
let render_body cout v b headers_only =
- List.iter (render_header cout) b.headers;
+ lwt () = Lwt_list.iter_s (render_header cout) b.headers in
render_content cout v b.content headers_only
let string_of_version v =
match v with
- | `HTTP_1_0 -> "HTTP/1.0"
- | `HTTP_1_1 -> "HTTP/1.1"
+ | `HTTP_1_0 -> "HTTP/1.0"
+ | `HTTP_1_1 -> "HTTP/1.1"
let version_of_string v =
match v with
- | "HTTP/1.0" -> `HTTP_1_0
- | "HTTP/1.1" -> `HTTP_1_1
- | _ -> http_error_html 400 "Invalid HTTP version" []
+ | "HTTP/1.0" -> `HTTP_1_0
+ | "HTTP/1.1" -> `HTTP_1_1
+ | _ -> raise (HTTPSyntaxError "Invalid HTTP version")
let render_req cout r =
- output_string cout (r.verb^" "^url_escape r.path^" "^string_of_version r.req_version^"\r\n");
+ lwt () = Lwt_io.write cout
+ (r.verb^" "^url_escape r.path^" "^string_of_version r.req_version^"\r\n") in
render_body cout r.req_version r.req_body false
let render_resp cout req_version req_verb r =
let resp_version =
(match r.resp_version with
- | `SAME_AS_REQUEST -> req_version
- | #version as v -> v)
+ | `SAME_AS_REQUEST -> req_version
+ | #version as v -> v)
in
- output_string cout
- (string_of_version resp_version^" "^string_of_int r.status^" "^r.reason^"\r\n");
+ lwt () = Lwt_io.write cout
+ (string_of_version resp_version^" "^string_of_int r.status^" "^r.reason^"\r\n") in
render_body cout resp_version r.resp_body (match req_verb with "HEAD" -> true | _ -> false)
let split_query p =
match Str.bounded_split (Str.regexp "\\?") p 2 with
- | path :: query :: _ -> (path, query)
- | path :: [] -> (path, "")
- | [] -> ("", "")
+ | path :: query :: _ -> (path, query)
+ | path :: [] -> (path, "")
+ | [] -> ("", "")
let parse_urlencoded_binding s =
match Str.bounded_split (Str.regexp "=") s 2 with
- | k :: v :: _ -> (url_unescape k, Some (url_unescape v))
- | k :: [] -> (url_unescape k, None)
- | [] -> ("", None)
+ | k :: v :: _ ->
+ lwt k' = url_unescape k in
+ lwt v' = url_unescape v in
+ return (k', Some v')
+ | k :: [] ->
+ lwt k' = url_unescape k in
+ return (k', None)
+ | [] ->
+ return ("", None)
let parse_urlencoded q =
let pieces = Str.split (Str.regexp "&") q in
- List.map parse_urlencoded_binding pieces
+ Lwt_list.map_s parse_urlencoded_binding pieces
let find_header' name hs =
let lc_name = String.lowercase name in
let rec search hs =
match hs with
- | [] -> raise Not_found
- | (k, v) :: hs' ->
+ | [] -> raise Not_found
+ | (k, v) :: hs' ->
if String.lowercase k = lc_name
then v
else search hs'
@@ -260,134 +278,151 @@ let find_param name params =
try Some (List.assoc name params) with Not_found -> None
let input_crlf cin =
- let line = input_line cin in
+ lwt line = Lwt_io.read_line cin in
let len = String.length line in
if len > 0 && String.get line (len - 1) = '\r'
- then String.sub line 0 (len - 1)
- else line
+ then return (String.sub line 0 (len - 1))
+ else return line
let rec parse_headers cin =
- match Str.bounded_split (Str.regexp ":") (input_crlf cin) 2 with
- | [] ->
- []
- | [k; v] ->
- (k, Util.strip v) :: parse_headers cin
- | k :: _ ->
+ lwt header_line = input_crlf cin in
+ match Str.bounded_split (Str.regexp ":") header_line 2 with
+ | [] ->
+ return []
+ | [k; v] ->
+ lwt headers = parse_headers cin in
+ return ((k, Util.strip v) :: headers)
+ | k :: _ ->
http_error_html 400 ("Bad header: "^k) []
let parse_chunks cin =
fun () ->
- let hexlen_str = input_crlf cin in
+ lwt hexlen_str = input_crlf cin in
let chunk_len = Util.unhex hexlen_str in
let buffer = String.make chunk_len '\000' in
- really_input cin buffer 0 chunk_len;
- (if input_crlf cin <> "" then http_error_html 400 "Invalid chunk boundary" [] else ());
- if chunk_len = 0 then None else Some (buffer, false)
+ lwt () = Lwt_io.read_into_exactly cin buffer 0 chunk_len in
+ lwt chunk_terminator = input_crlf cin in
+ if chunk_terminator <> ""
+ then http_error_html 400 "Invalid chunk boundary" []
+ else
+ if chunk_len = 0
+ then return None
+ else return (Some buffer)
let parse_body cin =
- let headers = parse_headers cin in
+ lwt headers = parse_headers cin in
match find_header "Transfer-Encoding" headers with
- | None | Some "identity" ->
+ | None | Some "identity" ->
(match find_header "Content-Length" headers with
- | None ->
+ | None ->
(* http_error_html 411 "Length required" [] *)
- {headers = headers; content = empty_content}
- | Some length_str ->
+ return {headers = headers; content = empty_content}
+ | Some length_str ->
let length = int_of_string length_str in
let buffer = String.make length '\000' in
- really_input cin buffer 0 length;
- {headers = headers; content = Fixed buffer})
- | Some "chunked" ->
- {headers = headers; content = Variable (Stringstream.from_iter (parse_chunks cin))}
- | Some unsupported ->
+ lwt () = Lwt_io.read_into_exactly cin buffer 0 length in
+ return {headers = headers; content = Fixed buffer})
+ | Some "chunked" ->
+ return {headers = headers; content = Variable (Lwt_stream.from (parse_chunks cin))}
+ | Some unsupported ->
http_error_html 400 ("Unsupported Transfer-Encoding: "^unsupported) []
let rec parse_req cin spurious_newline_credit =
- match Str.bounded_split (Str.regexp " ") (input_crlf cin) 3 with
- | [] ->
+ lwt req_line = input_crlf cin in
+ parse_req' cin spurious_newline_credit req_line
+and parse_req' cin spurious_newline_credit req_line =
+ match Str.bounded_split (Str.regexp " ") req_line 3 with
+ | [] ->
(* HTTP spec requires that we ignore leading CRLFs. We choose to do so, up to a point. *)
if spurious_newline_credit = 0
then http_error_html 400 "Bad request: too many leading CRLFs" []
else parse_req cin (spurious_newline_credit - 1)
- | [verb; path; version_str] ->
+ | [verb; path; version_str] ->
let version = version_of_string version_str in
- let body = parse_body cin in
+ lwt body = parse_body cin in
let (path, query) = split_query path in
- let path = url_unescape path in
- let query = parse_urlencoded query in
- { verb = verb; path = path; query = query; req_version = version; req_body = body }
- | _ -> http_error_html 400 "Bad request line" []
+ lwt path = url_unescape path in
+ lwt query = parse_urlencoded query in
+ return { verb = verb; path = path; query = query; req_version = version; req_body = body }
+ | _ ->
+ http_error_html 400 "Bad request line" []
let discard_unread_body req =
match req.req_body.content with
- | Fixed _ -> ()
- | Variable s -> Stringstream.iter (fun v -> ()) s (* force chunks to be read *)
+ | Fixed _ ->
+ return ()
+ | Variable s ->
+ Lwt_stream.junk_while (fun _ -> true) s (* force chunks to be read *)
let connection_keepalive req =
find_header "Connection" req.req_body.headers = Some "keep-alive"
let main handle_req (s, peername) =
- let cin = in_channel_of_descr s in
- let cout = out_channel_of_descr s in
- (try
- (try
- let rec request_loop () =
- let req = parse_req cin 512 in
- let resp = handle_req req in
+ let cin = Lwt_io.of_fd Lwt_io.input s in
+ let cout = Lwt_io.of_fd Lwt_io.output s in
- let completion_mutex = Mutex.create () in
- let completion = ref None in
- let set_completion v =
- Util.with_mutex0 completion_mutex (fun () ->
- match !completion with
- | None ->
- completion := Some v;
- List.iter (fun cb -> cb v) resp.completion_callbacks
- | Some _ -> ())
- in
+ let pending_completion_callbacks = Queue.create () in
+ let fire_pending_callbacks () =
+ while_lwt not (Queue.is_empty pending_completion_callbacks) do
+ let cbs = Queue.take pending_completion_callbacks in
+ ignore (Lwt_list.iter_s (fun cb -> cb ()) cbs);
+ return ()
+ done
+ in
- (* Here we spawn a thread that just watches the socket to see
- if it either becomes active or closes during rendering of the
- response, so that we can make decisions based on this in any
- eventual streaming response generators. In particular, if
- we're implementing some kind of XHR streaming andthe client
- goes away, we want to abandon the streaming as soon as
- possible. *)
- let input_waiter () =
- try
- (let (r, w, e) = Unix.select [s] [] [s] (-1.0) in
- set_completion (if r <> [] then Completion_normal else Completion_error))
- with _ -> set_completion Completion_error
- in
- ignore (Thread.create input_waiter ());
-
- (try
- render_resp cout req.req_version req.verb resp;
- discard_unread_body req;
- flush cout;
- set_completion Completion_normal
+ let next_request () =
+ (try_lwt
+ (try_lwt
+ lwt req = parse_req cin 512 in
+ lwt () = fire_pending_callbacks () in
+ return (Some req)
with e ->
- set_completion Completion_error;
- raise e);
+ lwt () = fire_pending_callbacks () in
+ raise_lwt e)
+ with End_of_file -> return None)
+ in
- if connection_keepalive req then request_loop () else ()
- in
+ let request_stream = Lwt_stream.from next_request in
+
+ let rec request_loop () =
+ match_lwt Lwt_stream.get request_stream with
+ | None -> return ()
+ | Some req ->
+ lwt resp = handle_req req in
+
+ (* Watch in the background for a new request arriving, and let
+ the currently-streaming (well, the about-to-be-streaming)
+ response know about it so it can decide to terminate if it
+ likes. *)
+ Queue.add resp.completion_callbacks pending_completion_callbacks;
+ ignore (Lwt_stream.peek request_stream);
+
+ lwt () = render_resp cout req.req_version req.verb resp in
+ lwt () = discard_unread_body req in
+ lwt () = Lwt_io.flush cout in
+ lwt () = fire_pending_callbacks () in
+
+ if connection_keepalive req then request_loop () else return ()
+ in
+
+ lwt () =
+ try_lwt
request_loop ()
with
- | End_of_file ->
- ()
- | HTTPError (code, reason, body) ->
+ | HTTPError (code, reason, body) ->
render_resp cout `HTTP_1_0
"GET" (* ugh this should probably be done better *)
{ resp_version = `HTTP_1_0;
status = code;
reason = reason;
resp_body = body;
- completion_callbacks = [] })
- with
- | Sys_error message ->
- Log.info "Sys_error in httpd handler" [Sexp.Str message]
- | exn ->
- Log.error "Uncaught exception in httpd handler" [Sexp.Str (Printexc.to_string exn)]);
- (try flush cout with _ -> ());
- close s
+ completion_callbacks = [] }
+ | Sys_error message ->
+ Log.info "Sys_error in httpd handler" [Sexp.Str message]
+ | exn ->
+ Log.error "Uncaught exception in httpd handler" [Sexp.Str (Printexc.to_string exn)]
+ in
+
+ lwt () = fire_pending_callbacks () in
+ lwt () = (try_lwt Lwt_io.flush cout with _ -> return ()) in
+ Lwt_unix.close s
diff --git a/httpd_file.ml b/httpd_file.ml
index e1d27a4..ccd286c 100644
--- a/httpd_file.ml
+++ b/httpd_file.ml
@@ -15,6 +15,9 @@
(* You should have received a copy of the GNU General Public License *)
(* along with Hop. If not, see . *)
+open Lwt
+open Hof
+
let visible_path_component s =
match s with
| "" -> false
@@ -46,19 +49,14 @@ let analyze_path p =
let p = if p = "" then "." else p in
(p, extension_map ext)
-let read_and_close_file handle =
+let stream_file handle =
let buflen = 4096 in
let buffer = String.make buflen '\000' in
fun () ->
- let count =
- (try
- input handle buffer 0 buflen
- with e -> (close_in handle; raise e))
- in
+ let count = input handle buffer 0 buflen in
if count > 0
- then Some (String.sub buffer 0 count, false)
- else (close_in handle;
- None)
+ then return (Some (String.sub buffer 0 count))
+ else return None
let rec read_dir dirhandle =
try
@@ -80,9 +78,11 @@ let render_directory_listing path =
Html.unclosed_tag "hr" [] []]
let resp_raw_file mime_type path =
+ let handle = open_in_bin path in
Httpd.resp_generic_ok
[Httpd.content_type_header_name, mime_type]
- (Httpd.Variable (Stringstream.from_iter (read_and_close_file (open_in_bin path))))
+ (Httpd.Variable (Lwt_stream.from (stream_file handle)))
+ |> Httpd.add_completion_callback (fun () -> close_in handle; return ())
let resp_file raw_path =
let (path, mime_type) = analyze_path raw_path in
diff --git a/relay.ml b/relay.ml
index 0cdea74..64295c9 100644
--- a/relay.ml
+++ b/relay.ml
@@ -50,12 +50,12 @@ let dispatch_message n ch m =
send_error ch "Message not understood" (Message.sexp_of_message m)
let issue_banner cin cout =
- lwt () = output_sexp_and_flush cout (Arr [Str "hop"; Str ""]) in
- lwt () =
- output_sexp_and_flush cout
- (Message.subscribe (Str (Node.local_container_name()),
- Str "", Str "",
- Str "", Str "")) in
+ lwt () = output_sexp cout (Arr [Str "hop"; Str ""]) in
+ lwt () = Lwt_io.flush cout in
+ lwt () = output_sexp cout (Message.subscribe (Str (Node.local_container_name()),
+ Str "", Str "",
+ Str "", Str "")) in
+ lwt () = Lwt_io.flush cout in
return true
let relay_boot (peername, cin, cout) = return (peername, Lwt_mutex.create (), cin, cout)
diff --git a/sexp.ml b/sexp.ml
index 039b588..05e5782 100644
--- a/sexp.ml
+++ b/sexp.ml
@@ -28,38 +28,26 @@ and t =
| Hint of display_hint_t
| Arr of t list
-let _output_str ch s =
- lwt () = write ch (string_of_int (String.length s)) in
- lwt () = write_char ch ':' in
- write ch s
-
-let rec output_sexp ch x =
- match x with
- | Str s ->
- _output_str ch s
- | Hint {hint = h; body = b} ->
- lwt () = write_char ch '[' in
- lwt () = output_sexp ch h in
- lwt () = write_char ch ']' in
- output_sexp ch b
- | Arr xs ->
- lwt () = write_char ch '(' in
- lwt () = Lwt_list.iter_s (output_sexp ch) xs in
- write_char ch ')'
-
-let rec stream_of_sexp x =
- Stringstream.make (fun () ->
+let generic_output_sexp write x =
+ let rec walk x =
match x with
- | Str s ->
- Some (string_of_int (String.length s) ^ ":", false, Stringstream.const s)
- | Hint {hint = h; body = b} ->
- Some ("[", false,
- Stringstream.seq (stream_of_sexp h)
- (Stringstream.seq (Stringstream.const "]")
- (stream_of_sexp b)))
- | Arr xs ->
- Some ("(", false,
- Stringstream.seq (Stringstream.map stream_of_sexp xs) (Stringstream.const ")")))
+ | Str s ->
+ lwt () = write (string_of_int (String.length s)) in
+ lwt () = write ":" in
+ write s
+ | Hint {hint = h; body = b} ->
+ lwt () = write "[" in
+ lwt () = walk h in
+ lwt () = write "]" in
+ walk b
+ | Arr xs ->
+ lwt () = write "(" in
+ lwt () = Lwt_list.iter_s walk xs in
+ write ")"
+ in walk x
+
+let output_sexp ch x = generic_output_sexp (write ch) x
+let stream_of_sexp x = Streamutil.stream_generator (fun yield -> generic_output_sexp yield x)
let write_char_escaped ch c =
if c = '\"'
@@ -92,10 +80,6 @@ let rec output_sexp_human ch x =
xs') in
write_char ch ')'
-let output_sexp_and_flush ch x =
- lwt () = output_sexp ch x in
- flush ch
-
let char_numeric c = '0' <= c && c <= '9'
let char_whitespace c = c <= ' '
@@ -148,7 +132,7 @@ let parse b =
(fun () -> return (Ibuffer.next_char b))
(fun count -> return (Ibuffer.next_chars b count))
let sexp_of_string s = parse (Ibuffer.of_string s)
-let string_of_sexp x = Stringstream.to_string (stream_of_sexp x)
+let string_of_sexp x = Streamutil.stream_to_string (stream_of_sexp x)
let assoc' key v =
match v with
diff --git a/sexpjson.ml b/sexpjson.ml
index 5f1cf5a..66d06a7 100644
--- a/sexpjson.ml
+++ b/sexpjson.ml
@@ -10,18 +10,21 @@ let rec sexp_of_json j =
| Json.Flg f -> Hint {hint = Str "bool"; body = Str (string_of_bool f)}
| Json.Nil -> Hint {hint = Str "null"; body = Arr []}
-let rec json_of_sexp x =
- match x with
- | Hint {hint = Str "num"; body = Str n} -> Json.Num (float_of_string n)
- | Str s -> Json.Str s
- | Arr xs -> Json.Arr (List.map json_of_sexp xs)
- | Hint {hint = Str "obj"; body = Arr kvs} ->
- Json.Rec (List.map
- (fun kv ->
- (match kv with
- | Arr [Str k; v] -> (k, json_of_sexp v)
- | _ -> syntax_error "Bad JSON-SEXP key-value"))
- kvs)
- | Hint {hint = Str "bool"; body = Str bs} -> Json.Flg (bool_of_string bs)
- | Hint {hint = Str "null"; body = Arr []} -> Json.Nil
- | Hint {hint = h; body = b} -> Json.Rec ["_hint", json_of_sexp h; "_body", json_of_sexp b]
+let json_of_sexp x =
+ let rec walk x =
+ match x with
+ | Hint {hint = Str "num"; body = Str n} -> Json.Num (float_of_string n)
+ | Str s -> Json.Str s
+ | Arr xs -> Json.Arr (List.map walk xs)
+ | Hint {hint = Str "obj"; body = Arr kvs} ->
+ Json.Rec (List.map
+ (fun kv ->
+ (match kv with
+ | Arr [Str k; v] -> (k, walk v)
+ | _ -> raise (Syntax_error "Bad JSON-SEXP key-value")))
+ kvs)
+ | Hint {hint = Str "bool"; body = Str bs} -> Json.Flg (bool_of_string bs)
+ | Hint {hint = Str "null"; body = Arr []} -> Json.Nil
+ | Hint {hint = h; body = b} -> Json.Rec ["_hint", walk h; "_body", walk b]
+ in
+ Lwt.wrap1 walk x
diff --git a/streamutil.ml b/streamutil.ml
new file mode 100644
index 0000000..821beee
--- /dev/null
+++ b/streamutil.ml
@@ -0,0 +1,29 @@
+(* Copyright 2012 Tony Garnock-Jones . *)
+
+(* This file is part of Hop. *)
+
+(* Hop is free software: you can redistribute it and/or modify it *)
+(* under the terms of the GNU General Public License as published by the *)
+(* Free Software Foundation, either version 3 of the License, or (at your *)
+(* option) any later version. *)
+
+(* Hop is distributed in the hope that it will be useful, but *)
+(* WITHOUT ANY WARRANTY; without even the implied warranty of *)
+(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *)
+(* General Public License for more details. *)
+
+(* You should have received a copy of the GNU General Public License *)
+(* along with Hop. If not, see . *)
+
+open Lwt
+
+let stream_to_string s =
+ lwt pieces = Lwt_stream.to_list s in
+ return (String.concat "" pieces)
+
+let stream_generator f =
+ let mbox = Lwt_mvar.create_empty () in
+ let yield v = Lwt_mvar.put mbox (Some v) in
+ ignore (lwt () = f yield in
+ Lwt_mvar.put mbox None);
+ Lwt_stream.from (fun () -> Lwt_mvar.take mbox)
diff --git a/stringstream.ml b/stringstream.ml
deleted file mode 100644
index dd87283..0000000
--- a/stringstream.ml
+++ /dev/null
@@ -1,84 +0,0 @@
-(* Copyright 2012 Tony Garnock-Jones . *)
-
-(* This file is part of Hop. *)
-
-(* Hop is free software: you can redistribute it and/or modify it *)
-(* under the terms of the GNU General Public License as published by the *)
-(* Free Software Foundation, either version 3 of the License, or (at your *)
-(* option) any later version. *)
-
-(* Hop is distributed in the hope that it will be useful, but *)
-(* WITHOUT ANY WARRANTY; without even the implied warranty of *)
-(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *)
-(* General Public License for more details. *)
-
-(* You should have received a copy of the GNU General Public License *)
-(* along with Hop. If not, see . *)
-
-type t = Stream of (unit -> (string * bool * t) option)
-
-let make f = Stream f
-
-let run (Stream f) = f ()
-
-let empty = Stream (fun () -> None)
-let const v = Stream (fun () -> Some (v, false, empty))
-let const_flush v = Stream (fun () -> Some (v, true, empty))
-
-let flush_before s = Stream (fun () -> Some ("", true, s))
-let flush = flush_before empty
-
-let rec seq s1 s2 =
- Stream (fun () ->
- match run s1 with
- | None -> run s2
- | Some (v, f, k) -> Some (v, f, seq k s2))
-
-let rec from_list vs =
- Stream (fun () ->
- match vs with
- | [] -> None
- | v :: vs -> Some (v, false, (from_list vs)))
-
-let rec map f vs =
- Stream (fun () ->
- match vs with
- | [] -> None
- | v :: vs -> run (seq (f v) (map f vs)))
-
-let rec from_iter f =
- let cache = ref None in
- Stream (fun () ->
- match !cache with
- | Some v -> v
- | None ->
- let result =
- (match f () with
- | Some (str, should_flush) -> Some (str, should_flush, from_iter f)
- | None -> None)
- in
- cache := Some result;
- result)
-
-let rec iter f s =
- match run s with
- | None -> ()
- | Some (v, flush, k) -> (f (v, flush); iter f k)
-
-let rec to_list s =
- match run s with
- | None -> []
- | Some (v, _, k) -> v :: to_list k
-
-let rec to_string s =
- String.concat "" (to_list s)
-
-let rec switch_after' on_boundary limit s1 s2 =
- if limit > 0 || not on_boundary
- then Stream (fun () ->
- match run s1 with
- | None -> None
- | Some (v, f, k) -> Some (v, f, switch_after' f (limit - String.length v) k s2))
- else s2
-
-let switch_after limit s1 s2 = switch_after' true limit s1 s2
diff --git a/ui_main.ml b/ui_main.ml
index 5b9d4d5..75e4380 100644
--- a/ui_main.ml
+++ b/ui_main.ml
@@ -15,6 +15,7 @@
(* You should have received a copy of the GNU General Public License *)
(* along with Hop. If not, see . *)
+open Lwt
open Html
open Hof
open Datastructures
@@ -56,7 +57,7 @@ let handle_req id r =
let cleanup_req id exn =
match Node.lookup id with
| Some n -> Node.unbind_all n
- | None -> ()
+ | None -> return ()
let start (s, peername) =
let id = Node.name_of_string ("http-" ^ Uuid.create ()) in
@@ -102,4 +103,4 @@ let init () =
register_dispatcher ("/_/server_stats", api_server_stats);
register_dispatcher ("/_/nodes", api_nodes);
register_dispatcher ("/_/node/", api_node_info);
- ignore (Util.create_daemon_thread "HTTP listener" None (Net.start_net "HTTP" 5678) start)
+ Util.create_daemon_thread "HTTP listener" None (Net.start_net "HTTP" 5678) start
diff --git a/ui_relay.ml b/ui_relay.ml
index 84cbecb..9a13faf 100644
--- a/ui_relay.ml
+++ b/ui_relay.ml
@@ -15,79 +15,84 @@
(* You should have received a copy of the GNU General Public License *)
(* along with Hop. If not, see . *)
+open Lwt
open Hof
-type outbound_message =
- | Data of Sexp.t
- | Heartbeat
- | Shutdown
-
-let rec message_stream ch =
- let deliver_payload payload =
- Some (Printf.sprintf "%d;%s;" (String.length payload) payload,
- true,
- Stringstream.make (message_stream ch))
- in
- let deliver_sexp_chunk sexp = deliver_payload (Json.to_string (Sexpjson.json_of_sexp sexp)) in
- fun () ->
- match Squeue.pop ch with
- | Data sexp -> deliver_sexp_chunk sexp
- | Heartbeat -> deliver_payload ""
- | Shutdown -> None
-
let rec api_tap_source id r =
- let ch = Squeue.create 10 in
- let handle_message n sexp = Squeue.add (Data sexp) ch in
+ let mbox = Lwt_mvar.create
+ (Some (Message.subscribe (Sexp.Str (Node.local_container_name()),
+ Sexp.Str "", Sexp.Str "",
+ Sexp.Str "", Sexp.Str ""))) in
+
+ let handle_message n sexp = Lwt_mvar.put mbox (Some sexp) in
let n = Node.make "http_tap" handle_message in
- if not (Node.bind (id, n))
- then Httpd.http_error_html 500 "Internal ID collision" []
- else
- let id_block_and_padding =
- Stringstream.const_flush (id.Node.label ^ ";" ^ String.make 2048 'h' ^ ";") in
- handle_message n (Message.subscribe (Sexp.Str (Node.local_container_name()),
- Sexp.Str "", Sexp.Str "",
- Sexp.Str "", Sexp.Str ""));
- Httpd.resp_generic 200 "Streaming"
- [Httpd.text_content_type_header;
- "Access-Control-Allow-Origin", "*"]
- (Httpd.Variable
- (Stringstream.switch_after 131072
- (Stringstream.seq id_block_and_padding (Stringstream.make (message_stream ch)))
- Stringstream.empty))
- |> Httpd.add_disable_cache_headers
- |> Httpd.add_date_header
- |> Httpd.add_completion_callback
- (fun _ ->
- Node.unbind_all n;
- Squeue.add Shutdown ch)
+
+ let shutdown () =
+ lwt () = Node.unbind_all n in
+ Lwt_mvar.put mbox None
+ in
+
+ let generator yield =
+ let body_counter = ref 0 in
+ let yield_and_count s =
+ body_counter := String.length s + !body_counter;
+ yield s
+ in
+ lwt () = yield_and_count (id.Node.label ^ ";" ^ String.make 2048 'h' ^ ";") in
+ let rec drain_mbox () =
+ match_lwt Lwt_mvar.take mbox with
+ | None -> return ()
+ | Some sexp ->
+ lwt json = Sexpjson.json_of_sexp sexp in
+ let payload = Json.to_string json in
+ lwt () = yield_and_count (Printf.sprintf "%d;%s;" (String.length payload) payload) in
+ lwt () =
+ if !body_counter >= 131072
+ then shutdown ()
+ else return ()
+ in
+ drain_mbox ()
+ in drain_mbox ()
+ in
+
+ match_lwt Node.bind (id, n) with
+ | false -> Httpd.http_error_html 500 "Internal ID collision" []
+ | true ->
+ Httpd.resp_generic 200 "Streaming"
+ [Httpd.text_content_type_header;
+ "Access-Control-Allow-Origin", "*"]
+ (Httpd.Variable (Streamutil.stream_generator generator))
+ |> Httpd.add_disable_cache_headers
+ |> Httpd.add_date_header
+ |> Httpd.add_completion_callback shutdown
let api_tap_sink irrelevant_id r =
- let params = Httpd.parse_urlencoded (Httpd.string_of_content r.Httpd.req_body.Httpd.content) in
- (* let stream_id = List.assoc "metadata.id" params in *)
+ lwt content = Httpd.string_of_content r.Httpd.req_body.Httpd.content in
+ lwt params = Httpd.parse_urlencoded content in
match Httpd.find_param "metadata.type" params with
- | Some (Some "send") ->
+ | Some (Some "send") ->
(match Httpd.find_param "data" params with
- | Some (Some data_str) ->
- let data =
- (try Sexpjson.sexp_of_json (Json.of_string data_str)
- with _ -> Httpd.http_error_html 406 "Bad data parameter" []) in
+ | Some (Some data_str) ->
+ lwt data =
+ (try return (Sexpjson.sexp_of_json (Json.of_string data_str))
+ with _ -> Httpd.http_error_html 406 "Bad data parameter" []) in
(match Message.message_of_sexp data with
- | Message.Post (Sexp.Str name, body, token) ->
- Node.send_ignore' name body;
+ | Message.Post (Sexp.Str name, body, token) ->
+ lwt () = Node.send_ignore' name body in
Httpd.resp_generic 202 "Accepted" [] (Httpd.empty_content)
- | _ ->
+ | _ ->
Httpd.http_error_html 406 "Message not understood" [])
- | _ -> Httpd.http_error_html 406 "Bad data parameter" [])
- | Some (Some "close") ->
+ | _ -> Httpd.http_error_html 406 "Bad data parameter" [])
+ | Some (Some "close") ->
Httpd.resp_generic_ok [] Httpd.empty_content
- | _ ->
+ | _ ->
Httpd.http_error_html 406 "Unsupported metadata.type" []
let api_tap _ id r =
match r.Httpd.verb with
- | "GET" -> api_tap_source id r
- | "POST" -> api_tap_sink id r
- | _ -> Httpd.http_error_html 400 "Unsupported tap method" []
+ | "GET" -> api_tap_source id r
+ | "POST" -> api_tap_sink id r
+ | _ -> Httpd.http_error_html 400 "Unsupported tap method" []
let init () =
Ui_main.register_dispatcher ("/_/tap", api_tap)
diff --git a/util.ml b/util.ml
index d559b0a..c48d002 100644
--- a/util.ml
+++ b/util.ml
@@ -92,3 +92,14 @@ let unhex s =
else loop (index + 1) (acc * 16 + unhex_char (String.get s index))
in
loop 0 0
+
+let stream_to_string s =
+ lwt pieces = Lwt_stream.to_list s in
+ return (String.concat "" pieces)
+
+let stream_generator f =
+ let mbox = Lwt_mvar.create_empty () in
+ let yield v = Lwt_mvar.put mbox (Some v) in
+ ignore (lwt () = f yield in
+ Lwt_mvar.put mbox None);
+ Lwt_stream.from (fun () -> Lwt_mvar.take mbox)