Lwtize the UI

This commit is contained in:
Tony Garnock-Jones 2012-05-07 06:31:59 -04:00
parent de9104cdb5
commit b4c0589777
12 changed files with 381 additions and 393 deletions

View File

@ -48,11 +48,11 @@ lwt _ =
lwt () = Meta.init () in
hook_log ();
ignore (Amqp_relay.init ());
(* Ui_main.init ();
Ui_relay.init (); *)
ignore (Ui_main.init ());
ignore (Ui_relay.init ());
ignore (Relay.init ());
lwt () = Server_control.run_until "AMQP ready" in
(* Server_control.run_until "HTTP ready"; *)
lwt () = Server_control.run_until "HTTP ready" in
lwt () = Server_control.run_until "Hop ready" in
if Server_control.is_running ()
then (lwt () = create_ready_file () in

28
html.ml
View File

@ -78,18 +78,22 @@ and string_of_html c =
| Text str ->
html_escape str
let rec stream_of_html_contents cs = Stringstream.map stream_of_html cs
and stream_of_html c =
Stringstream.make (fun () ->
let html_generator c yield =
let rec o c =
match c with
| Tag (label, attrs, [], true) ->
Some (html_open_tag_string label attrs, false, Stringstream.empty)
| Tag (label, attrs, contents, _) ->
Some (html_open_tag_string label attrs, false,
Stringstream.seq
(stream_of_html_contents contents) (Stringstream.const ("</"^label^">")))
| Text str ->
Some (str, false, Stringstream.empty))
| Tag (label, attrs, [], true) ->
yield (html_open_tag_string label attrs)
| Tag (label, attrs, contents, _) ->
lwt () = yield (html_open_tag_string label attrs) in
lwt () = Lwt_list.iter_s o contents in
yield ("</"^label^">")
| Text str ->
yield str
in o c
let stream_of_html c = Streamutil.stream_generator (html_generator c)
let stream_of_html_doc d =
Stringstream.seq (Stringstream.const "<!DOCTYPE html>") (stream_of_html (tag_of_document d))
Streamutil.stream_generator (fun yield ->
lwt () = yield "<!DOCTYPE html>" in
html_generator (tag_of_document d) yield)

371
httpd.ml
View File

@ -15,39 +15,41 @@
(* You should have received a copy of the GNU General Public License *)
(* along with Hop. If not, see <http://www.gnu.org/licenses/>. *)
open Unix
open Lwt
open Hof
type version = [`HTTP_1_0 | `HTTP_1_1]
type resp_version = [version | `SAME_AS_REQUEST]
type content = Fixed of string | Variable of Stringstream.t
type completion = Completion_normal | Completion_error
type content = Fixed of string | Variable of string Lwt_stream.t
type body = {
headers: (string * string) list;
content: content
}
headers: (string * string) list;
content: content
}
let empty_content = Fixed ""
let empty_body = {headers = []; content = empty_content}
type req = {
verb: string;
path: string;
query: (string * string option) list;
req_version: version;
req_body: body
}
verb: string;
path: string;
query: (string * string option) list;
req_version: version;
req_body: body
}
type resp = {
resp_version: resp_version;
status: int;
reason: string;
resp_body: body;
completion_callbacks: (completion -> unit) list
}
resp_version: resp_version;
status: int;
reason: string;
resp_body: body;
completion_callbacks: (unit -> unit Lwt.t) list
}
exception HTTPError of (int * string * body)
exception HTTPSyntaxError of string
let html_content_type = "text/html;charset=utf-8"
let text_content_type = "text/plain;charset=utf-8"
@ -62,18 +64,20 @@ let disable_cache_headers () =
"Cache-Control", "no-cache, must-revalidate, max-age=0";
"Pragma", "no-cache"]
let add_headers headers resp =
let add_headers headers resp_thr =
lwt resp = resp_thr in
let b = resp.resp_body in
{resp with resp_body = {b with headers = b.headers @ headers}}
return {resp with resp_body = {b with headers = b.headers @ headers}}
let add_disable_cache_headers resp = add_headers (disable_cache_headers ()) resp
let add_date_header resp = add_headers ["Date", Httpd_date.http_gmtime (Unix.time ())] resp
let add_completion_callback cb resp =
{resp with completion_callbacks = cb :: resp.completion_callbacks}
let add_completion_callback cb resp_thr =
lwt resp = resp_thr in
return {resp with completion_callbacks = cb :: resp.completion_callbacks}
let http_error code reason body = raise (HTTPError (code, reason, body))
let http_error code reason body = raise_lwt (HTTPError (code, reason, body))
let http_error_plain code reason =
http_error code reason
@ -92,12 +96,18 @@ let html_error_doc code reason extra_body =
let http_error_html code reason extra_body =
http_error_html_doc code reason (html_error_doc code reason extra_body)
let trap_syntax_errors thunk =
try
return (thunk ())
with HTTPSyntaxError message ->
http_error_html 400 message []
let resp_generic code reason headers content =
{ resp_version = `SAME_AS_REQUEST;
status = code;
reason = reason;
resp_body = {headers = headers; content = content};
completion_callbacks = [] }
return { resp_version = `SAME_AS_REQUEST;
status = code;
reason = reason;
resp_body = {headers = headers; content = content};
completion_callbacks = [] }
let resp_generic_ok headers content =
resp_generic 200 "OK" headers content
@ -132,9 +142,9 @@ let resp_redirect_permanent new_path =
let escape_url_char c =
match c with
| '%' -> Some (fun (s, pos) -> ("%25", pos + 1))
| ' ' -> Some (fun (s, pos) -> ("%20", pos + 1))
| _ -> None
| '%' -> Some (fun (s, pos) -> ("%25", pos + 1))
| ' ' -> Some (fun (s, pos) -> ("%20", pos + 1))
| _ -> None
let url_escape s = Util.strsub escape_url_char s
let unescape_url_hex_code (s, pos) =
@ -144,109 +154,117 @@ let unescape_url_hex_code (s, pos) =
let v1 = Util.unhex_char (String.get s (pos + 1)) in
let v2 = Util.unhex_char (String.get s (pos + 2)) in
if v1 = -1 || v2 = -1
then http_error_html 400 ("Bad percent escaping: '"^String.sub s pos 3^"'") []
then raise (HTTPSyntaxError ("Bad percent escaping: '"^String.sub s pos 3^"'"))
else (String.make 1 (Char.chr (v1 * 16 + v2)), pos + 3)
else http_error_html 400 ("Bad percent escaping: '"^String.sub s pos (len - pos)^"'") []
else raise (HTTPSyntaxError ("Bad percent escaping: '"^String.sub s pos (len - pos)^"'"))
let unescape_url_char c =
match c with
| '%' -> Some unescape_url_hex_code
| _ -> None
| '%' -> Some unescape_url_hex_code
| _ -> None
let url_unescape s = Util.strsub unescape_url_char s
let url_unescape s =
trap_syntax_errors (fun () -> Util.strsub unescape_url_char s)
let render_header cout (k, v) =
output_string cout k;
output_string cout ": ";
output_string cout v;
output_string cout "\r\n"
lwt () = Lwt_io.write cout k in
lwt () = Lwt_io.write cout ": " in
lwt () = Lwt_io.write cout v in
Lwt_io.write cout "\r\n"
let render_chunk cout (chunk, should_flush) =
(match chunk with
| "" -> ()
| _ ->
output_string cout (Printf.sprintf "%x\r\n" (String.length chunk));
output_string cout chunk;
output_string cout "\r\n");
if should_flush then flush cout else ()
let render_chunk cout chunk =
match chunk with
| "" -> return ()
| _ ->
lwt () = Lwt_io.write cout (Printf.sprintf "%x\r\n" (String.length chunk)) in
lwt () = Lwt_io.write cout chunk in
Lwt_io.write cout "\r\n"
let render_fixed_content cout s headers_only =
render_header cout ("Content-Length", string_of_int (String.length s));
output_string cout "\r\n";
if headers_only then () else output_string cout s
lwt () = render_header cout ("Content-Length", string_of_int (String.length s)) in
lwt () = Lwt_io.write cout "\r\n" in
if headers_only then return () else Lwt_io.write cout s
let string_of_content c =
match c with
| Fixed s -> s
| Variable s -> Stringstream.to_string s
| Fixed s -> return s
| Variable s -> Streamutil.stream_to_string s
let render_content cout v c headers_only =
match c with
| Fixed s ->
| Fixed s ->
render_fixed_content cout s headers_only
| Variable s ->
| Variable s ->
match v with
| `HTTP_1_0 ->
render_fixed_content cout (Stringstream.to_string s) headers_only
| `HTTP_1_1 ->
| `HTTP_1_0 ->
lwt str = Streamutil.stream_to_string s in
render_fixed_content cout str headers_only
| `HTTP_1_1 ->
if headers_only
then (output_string cout "\r\n")
else (render_header cout ("Transfer-Encoding", "chunked");
output_string cout "\r\n";
Stringstream.iter (render_chunk cout) s;
output_string cout "0\r\n\r\n")
then (Lwt_io.write cout "\r\n")
else (lwt () = render_header cout ("Transfer-Encoding", "chunked") in
lwt () = Lwt_io.write cout "\r\n" in
lwt () = Lwt_stream.iter_s (render_chunk cout) s in
Lwt_io.write cout "0\r\n\r\n")
let render_body cout v b headers_only =
List.iter (render_header cout) b.headers;
lwt () = Lwt_list.iter_s (render_header cout) b.headers in
render_content cout v b.content headers_only
let string_of_version v =
match v with
| `HTTP_1_0 -> "HTTP/1.0"
| `HTTP_1_1 -> "HTTP/1.1"
| `HTTP_1_0 -> "HTTP/1.0"
| `HTTP_1_1 -> "HTTP/1.1"
let version_of_string v =
match v with
| "HTTP/1.0" -> `HTTP_1_0
| "HTTP/1.1" -> `HTTP_1_1
| _ -> http_error_html 400 "Invalid HTTP version" []
| "HTTP/1.0" -> `HTTP_1_0
| "HTTP/1.1" -> `HTTP_1_1
| _ -> raise (HTTPSyntaxError "Invalid HTTP version")
let render_req cout r =
output_string cout (r.verb^" "^url_escape r.path^" "^string_of_version r.req_version^"\r\n");
lwt () = Lwt_io.write cout
(r.verb^" "^url_escape r.path^" "^string_of_version r.req_version^"\r\n") in
render_body cout r.req_version r.req_body false
let render_resp cout req_version req_verb r =
let resp_version =
(match r.resp_version with
| `SAME_AS_REQUEST -> req_version
| #version as v -> v)
| `SAME_AS_REQUEST -> req_version
| #version as v -> v)
in
output_string cout
(string_of_version resp_version^" "^string_of_int r.status^" "^r.reason^"\r\n");
lwt () = Lwt_io.write cout
(string_of_version resp_version^" "^string_of_int r.status^" "^r.reason^"\r\n") in
render_body cout resp_version r.resp_body (match req_verb with "HEAD" -> true | _ -> false)
let split_query p =
match Str.bounded_split (Str.regexp "\\?") p 2 with
| path :: query :: _ -> (path, query)
| path :: [] -> (path, "")
| [] -> ("", "")
| path :: query :: _ -> (path, query)
| path :: [] -> (path, "")
| [] -> ("", "")
let parse_urlencoded_binding s =
match Str.bounded_split (Str.regexp "=") s 2 with
| k :: v :: _ -> (url_unescape k, Some (url_unescape v))
| k :: [] -> (url_unescape k, None)
| [] -> ("", None)
| k :: v :: _ ->
lwt k' = url_unescape k in
lwt v' = url_unescape v in
return (k', Some v')
| k :: [] ->
lwt k' = url_unescape k in
return (k', None)
| [] ->
return ("", None)
let parse_urlencoded q =
let pieces = Str.split (Str.regexp "&") q in
List.map parse_urlencoded_binding pieces
Lwt_list.map_s parse_urlencoded_binding pieces
let find_header' name hs =
let lc_name = String.lowercase name in
let rec search hs =
match hs with
| [] -> raise Not_found
| (k, v) :: hs' ->
| [] -> raise Not_found
| (k, v) :: hs' ->
if String.lowercase k = lc_name
then v
else search hs'
@ -260,134 +278,151 @@ let find_param name params =
try Some (List.assoc name params) with Not_found -> None
let input_crlf cin =
let line = input_line cin in
lwt line = Lwt_io.read_line cin in
let len = String.length line in
if len > 0 && String.get line (len - 1) = '\r'
then String.sub line 0 (len - 1)
else line
then return (String.sub line 0 (len - 1))
else return line
let rec parse_headers cin =
match Str.bounded_split (Str.regexp ":") (input_crlf cin) 2 with
| [] ->
[]
| [k; v] ->
(k, Util.strip v) :: parse_headers cin
| k :: _ ->
lwt header_line = input_crlf cin in
match Str.bounded_split (Str.regexp ":") header_line 2 with
| [] ->
return []
| [k; v] ->
lwt headers = parse_headers cin in
return ((k, Util.strip v) :: headers)
| k :: _ ->
http_error_html 400 ("Bad header: "^k) []
let parse_chunks cin =
fun () ->
let hexlen_str = input_crlf cin in
lwt hexlen_str = input_crlf cin in
let chunk_len = Util.unhex hexlen_str in
let buffer = String.make chunk_len '\000' in
really_input cin buffer 0 chunk_len;
(if input_crlf cin <> "" then http_error_html 400 "Invalid chunk boundary" [] else ());
if chunk_len = 0 then None else Some (buffer, false)
lwt () = Lwt_io.read_into_exactly cin buffer 0 chunk_len in
lwt chunk_terminator = input_crlf cin in
if chunk_terminator <> ""
then http_error_html 400 "Invalid chunk boundary" []
else
if chunk_len = 0
then return None
else return (Some buffer)
let parse_body cin =
let headers = parse_headers cin in
lwt headers = parse_headers cin in
match find_header "Transfer-Encoding" headers with
| None | Some "identity" ->
| None | Some "identity" ->
(match find_header "Content-Length" headers with
| None ->
| None ->
(* http_error_html 411 "Length required" [] *)
{headers = headers; content = empty_content}
| Some length_str ->
return {headers = headers; content = empty_content}
| Some length_str ->
let length = int_of_string length_str in
let buffer = String.make length '\000' in
really_input cin buffer 0 length;
{headers = headers; content = Fixed buffer})
| Some "chunked" ->
{headers = headers; content = Variable (Stringstream.from_iter (parse_chunks cin))}
| Some unsupported ->
lwt () = Lwt_io.read_into_exactly cin buffer 0 length in
return {headers = headers; content = Fixed buffer})
| Some "chunked" ->
return {headers = headers; content = Variable (Lwt_stream.from (parse_chunks cin))}
| Some unsupported ->
http_error_html 400 ("Unsupported Transfer-Encoding: "^unsupported) []
let rec parse_req cin spurious_newline_credit =
match Str.bounded_split (Str.regexp " ") (input_crlf cin) 3 with
| [] ->
lwt req_line = input_crlf cin in
parse_req' cin spurious_newline_credit req_line
and parse_req' cin spurious_newline_credit req_line =
match Str.bounded_split (Str.regexp " ") req_line 3 with
| [] ->
(* HTTP spec requires that we ignore leading CRLFs. We choose to do so, up to a point. *)
if spurious_newline_credit = 0
then http_error_html 400 "Bad request: too many leading CRLFs" []
else parse_req cin (spurious_newline_credit - 1)
| [verb; path; version_str] ->
| [verb; path; version_str] ->
let version = version_of_string version_str in
let body = parse_body cin in
lwt body = parse_body cin in
let (path, query) = split_query path in
let path = url_unescape path in
let query = parse_urlencoded query in
{ verb = verb; path = path; query = query; req_version = version; req_body = body }
| _ -> http_error_html 400 "Bad request line" []
lwt path = url_unescape path in
lwt query = parse_urlencoded query in
return { verb = verb; path = path; query = query; req_version = version; req_body = body }
| _ ->
http_error_html 400 "Bad request line" []
let discard_unread_body req =
match req.req_body.content with
| Fixed _ -> ()
| Variable s -> Stringstream.iter (fun v -> ()) s (* force chunks to be read *)
| Fixed _ ->
return ()
| Variable s ->
Lwt_stream.junk_while (fun _ -> true) s (* force chunks to be read *)
let connection_keepalive req =
find_header "Connection" req.req_body.headers = Some "keep-alive"
let main handle_req (s, peername) =
let cin = in_channel_of_descr s in
let cout = out_channel_of_descr s in
(try
(try
let rec request_loop () =
let req = parse_req cin 512 in
let resp = handle_req req in
let cin = Lwt_io.of_fd Lwt_io.input s in
let cout = Lwt_io.of_fd Lwt_io.output s in
let completion_mutex = Mutex.create () in
let completion = ref None in
let set_completion v =
Util.with_mutex0 completion_mutex (fun () ->
match !completion with
| None ->
completion := Some v;
List.iter (fun cb -> cb v) resp.completion_callbacks
| Some _ -> ())
in
let pending_completion_callbacks = Queue.create () in
let fire_pending_callbacks () =
while_lwt not (Queue.is_empty pending_completion_callbacks) do
let cbs = Queue.take pending_completion_callbacks in
ignore (Lwt_list.iter_s (fun cb -> cb ()) cbs);
return ()
done
in
(* Here we spawn a thread that just watches the socket to see
if it either becomes active or closes during rendering of the
response, so that we can make decisions based on this in any
eventual streaming response generators. In particular, if
we're implementing some kind of XHR streaming andthe client
goes away, we want to abandon the streaming as soon as
possible. *)
let input_waiter () =
try
(let (r, w, e) = Unix.select [s] [] [s] (-1.0) in
set_completion (if r <> [] then Completion_normal else Completion_error))
with _ -> set_completion Completion_error
in
ignore (Thread.create input_waiter ());
(try
render_resp cout req.req_version req.verb resp;
discard_unread_body req;
flush cout;
set_completion Completion_normal
let next_request () =
(try_lwt
(try_lwt
lwt req = parse_req cin 512 in
lwt () = fire_pending_callbacks () in
return (Some req)
with e ->
set_completion Completion_error;
raise e);
lwt () = fire_pending_callbacks () in
raise_lwt e)
with End_of_file -> return None)
in
if connection_keepalive req then request_loop () else ()
in
let request_stream = Lwt_stream.from next_request in
let rec request_loop () =
match_lwt Lwt_stream.get request_stream with
| None -> return ()
| Some req ->
lwt resp = handle_req req in
(* Watch in the background for a new request arriving, and let
the currently-streaming (well, the about-to-be-streaming)
response know about it so it can decide to terminate if it
likes. *)
Queue.add resp.completion_callbacks pending_completion_callbacks;
ignore (Lwt_stream.peek request_stream);
lwt () = render_resp cout req.req_version req.verb resp in
lwt () = discard_unread_body req in
lwt () = Lwt_io.flush cout in
lwt () = fire_pending_callbacks () in
if connection_keepalive req then request_loop () else return ()
in
lwt () =
try_lwt
request_loop ()
with
| End_of_file ->
()
| HTTPError (code, reason, body) ->
| HTTPError (code, reason, body) ->
render_resp cout `HTTP_1_0
"GET" (* ugh this should probably be done better *)
{ resp_version = `HTTP_1_0;
status = code;
reason = reason;
resp_body = body;
completion_callbacks = [] })
with
| Sys_error message ->
Log.info "Sys_error in httpd handler" [Sexp.Str message]
| exn ->
Log.error "Uncaught exception in httpd handler" [Sexp.Str (Printexc.to_string exn)]);
(try flush cout with _ -> ());
close s
completion_callbacks = [] }
| Sys_error message ->
Log.info "Sys_error in httpd handler" [Sexp.Str message]
| exn ->
Log.error "Uncaught exception in httpd handler" [Sexp.Str (Printexc.to_string exn)]
in
lwt () = fire_pending_callbacks () in
lwt () = (try_lwt Lwt_io.flush cout with _ -> return ()) in
Lwt_unix.close s

View File

@ -15,6 +15,9 @@
(* You should have received a copy of the GNU General Public License *)
(* along with Hop. If not, see <http://www.gnu.org/licenses/>. *)
open Lwt
open Hof
let visible_path_component s =
match s with
| "" -> false
@ -46,19 +49,14 @@ let analyze_path p =
let p = if p = "" then "." else p in
(p, extension_map ext)
let read_and_close_file handle =
let stream_file handle =
let buflen = 4096 in
let buffer = String.make buflen '\000' in
fun () ->
let count =
(try
input handle buffer 0 buflen
with e -> (close_in handle; raise e))
in
let count = input handle buffer 0 buflen in
if count > 0
then Some (String.sub buffer 0 count, false)
else (close_in handle;
None)
then return (Some (String.sub buffer 0 count))
else return None
let rec read_dir dirhandle =
try
@ -80,9 +78,11 @@ let render_directory_listing path =
Html.unclosed_tag "hr" [] []]
let resp_raw_file mime_type path =
let handle = open_in_bin path in
Httpd.resp_generic_ok
[Httpd.content_type_header_name, mime_type]
(Httpd.Variable (Stringstream.from_iter (read_and_close_file (open_in_bin path))))
(Httpd.Variable (Lwt_stream.from (stream_file handle)))
|> Httpd.add_completion_callback (fun () -> close_in handle; return ())
let resp_file raw_path =
let (path, mime_type) = analyze_path raw_path in

View File

@ -50,12 +50,12 @@ let dispatch_message n ch m =
send_error ch "Message not understood" (Message.sexp_of_message m)
let issue_banner cin cout =
lwt () = output_sexp_and_flush cout (Arr [Str "hop"; Str ""]) in
lwt () =
output_sexp_and_flush cout
(Message.subscribe (Str (Node.local_container_name()),
Str "", Str "",
Str "", Str "")) in
lwt () = output_sexp cout (Arr [Str "hop"; Str ""]) in
lwt () = Lwt_io.flush cout in
lwt () = output_sexp cout (Message.subscribe (Str (Node.local_container_name()),
Str "", Str "",
Str "", Str "")) in
lwt () = Lwt_io.flush cout in
return true
let relay_boot (peername, cin, cout) = return (peername, Lwt_mutex.create (), cin, cout)

56
sexp.ml
View File

@ -28,38 +28,26 @@ and t =
| Hint of display_hint_t
| Arr of t list
let _output_str ch s =
lwt () = write ch (string_of_int (String.length s)) in
lwt () = write_char ch ':' in
write ch s
let rec output_sexp ch x =
match x with
| Str s ->
_output_str ch s
| Hint {hint = h; body = b} ->
lwt () = write_char ch '[' in
lwt () = output_sexp ch h in
lwt () = write_char ch ']' in
output_sexp ch b
| Arr xs ->
lwt () = write_char ch '(' in
lwt () = Lwt_list.iter_s (output_sexp ch) xs in
write_char ch ')'
let rec stream_of_sexp x =
Stringstream.make (fun () ->
let generic_output_sexp write x =
let rec walk x =
match x with
| Str s ->
Some (string_of_int (String.length s) ^ ":", false, Stringstream.const s)
| Hint {hint = h; body = b} ->
Some ("[", false,
Stringstream.seq (stream_of_sexp h)
(Stringstream.seq (Stringstream.const "]")
(stream_of_sexp b)))
| Arr xs ->
Some ("(", false,
Stringstream.seq (Stringstream.map stream_of_sexp xs) (Stringstream.const ")")))
| Str s ->
lwt () = write (string_of_int (String.length s)) in
lwt () = write ":" in
write s
| Hint {hint = h; body = b} ->
lwt () = write "[" in
lwt () = walk h in
lwt () = write "]" in
walk b
| Arr xs ->
lwt () = write "(" in
lwt () = Lwt_list.iter_s walk xs in
write ")"
in walk x
let output_sexp ch x = generic_output_sexp (write ch) x
let stream_of_sexp x = Streamutil.stream_generator (fun yield -> generic_output_sexp yield x)
let write_char_escaped ch c =
if c = '\"'
@ -92,10 +80,6 @@ let rec output_sexp_human ch x =
xs') in
write_char ch ')'
let output_sexp_and_flush ch x =
lwt () = output_sexp ch x in
flush ch
let char_numeric c = '0' <= c && c <= '9'
let char_whitespace c = c <= ' '
@ -148,7 +132,7 @@ let parse b =
(fun () -> return (Ibuffer.next_char b))
(fun count -> return (Ibuffer.next_chars b count))
let sexp_of_string s = parse (Ibuffer.of_string s)
let string_of_sexp x = Stringstream.to_string (stream_of_sexp x)
let string_of_sexp x = Streamutil.stream_to_string (stream_of_sexp x)
let assoc' key v =
match v with

View File

@ -10,18 +10,21 @@ let rec sexp_of_json j =
| Json.Flg f -> Hint {hint = Str "bool"; body = Str (string_of_bool f)}
| Json.Nil -> Hint {hint = Str "null"; body = Arr []}
let rec json_of_sexp x =
match x with
| Hint {hint = Str "num"; body = Str n} -> Json.Num (float_of_string n)
| Str s -> Json.Str s
| Arr xs -> Json.Arr (List.map json_of_sexp xs)
| Hint {hint = Str "obj"; body = Arr kvs} ->
Json.Rec (List.map
(fun kv ->
(match kv with
| Arr [Str k; v] -> (k, json_of_sexp v)
| _ -> syntax_error "Bad JSON-SEXP key-value"))
kvs)
| Hint {hint = Str "bool"; body = Str bs} -> Json.Flg (bool_of_string bs)
| Hint {hint = Str "null"; body = Arr []} -> Json.Nil
| Hint {hint = h; body = b} -> Json.Rec ["_hint", json_of_sexp h; "_body", json_of_sexp b]
let json_of_sexp x =
let rec walk x =
match x with
| Hint {hint = Str "num"; body = Str n} -> Json.Num (float_of_string n)
| Str s -> Json.Str s
| Arr xs -> Json.Arr (List.map walk xs)
| Hint {hint = Str "obj"; body = Arr kvs} ->
Json.Rec (List.map
(fun kv ->
(match kv with
| Arr [Str k; v] -> (k, walk v)
| _ -> raise (Syntax_error "Bad JSON-SEXP key-value")))
kvs)
| Hint {hint = Str "bool"; body = Str bs} -> Json.Flg (bool_of_string bs)
| Hint {hint = Str "null"; body = Arr []} -> Json.Nil
| Hint {hint = h; body = b} -> Json.Rec ["_hint", walk h; "_body", walk b]
in
Lwt.wrap1 walk x

29
streamutil.ml Normal file
View File

@ -0,0 +1,29 @@
(* Copyright 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>. *)
(* This file is part of Hop. *)
(* Hop is free software: you can redistribute it and/or modify it *)
(* under the terms of the GNU General Public License as published by the *)
(* Free Software Foundation, either version 3 of the License, or (at your *)
(* option) any later version. *)
(* Hop is distributed in the hope that it will be useful, but *)
(* WITHOUT ANY WARRANTY; without even the implied warranty of *)
(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *)
(* General Public License for more details. *)
(* You should have received a copy of the GNU General Public License *)
(* along with Hop. If not, see <http://www.gnu.org/licenses/>. *)
open Lwt
let stream_to_string s =
lwt pieces = Lwt_stream.to_list s in
return (String.concat "" pieces)
let stream_generator f =
let mbox = Lwt_mvar.create_empty () in
let yield v = Lwt_mvar.put mbox (Some v) in
ignore (lwt () = f yield in
Lwt_mvar.put mbox None);
Lwt_stream.from (fun () -> Lwt_mvar.take mbox)

View File

@ -1,84 +0,0 @@
(* Copyright 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>. *)
(* This file is part of Hop. *)
(* Hop is free software: you can redistribute it and/or modify it *)
(* under the terms of the GNU General Public License as published by the *)
(* Free Software Foundation, either version 3 of the License, or (at your *)
(* option) any later version. *)
(* Hop is distributed in the hope that it will be useful, but *)
(* WITHOUT ANY WARRANTY; without even the implied warranty of *)
(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *)
(* General Public License for more details. *)
(* You should have received a copy of the GNU General Public License *)
(* along with Hop. If not, see <http://www.gnu.org/licenses/>. *)
type t = Stream of (unit -> (string * bool * t) option)
let make f = Stream f
let run (Stream f) = f ()
let empty = Stream (fun () -> None)
let const v = Stream (fun () -> Some (v, false, empty))
let const_flush v = Stream (fun () -> Some (v, true, empty))
let flush_before s = Stream (fun () -> Some ("", true, s))
let flush = flush_before empty
let rec seq s1 s2 =
Stream (fun () ->
match run s1 with
| None -> run s2
| Some (v, f, k) -> Some (v, f, seq k s2))
let rec from_list vs =
Stream (fun () ->
match vs with
| [] -> None
| v :: vs -> Some (v, false, (from_list vs)))
let rec map f vs =
Stream (fun () ->
match vs with
| [] -> None
| v :: vs -> run (seq (f v) (map f vs)))
let rec from_iter f =
let cache = ref None in
Stream (fun () ->
match !cache with
| Some v -> v
| None ->
let result =
(match f () with
| Some (str, should_flush) -> Some (str, should_flush, from_iter f)
| None -> None)
in
cache := Some result;
result)
let rec iter f s =
match run s with
| None -> ()
| Some (v, flush, k) -> (f (v, flush); iter f k)
let rec to_list s =
match run s with
| None -> []
| Some (v, _, k) -> v :: to_list k
let rec to_string s =
String.concat "" (to_list s)
let rec switch_after' on_boundary limit s1 s2 =
if limit > 0 || not on_boundary
then Stream (fun () ->
match run s1 with
| None -> None
| Some (v, f, k) -> Some (v, f, switch_after' f (limit - String.length v) k s2))
else s2
let switch_after limit s1 s2 = switch_after' true limit s1 s2

View File

@ -15,6 +15,7 @@
(* You should have received a copy of the GNU General Public License *)
(* along with Hop. If not, see <http://www.gnu.org/licenses/>. *)
open Lwt
open Html
open Hof
open Datastructures
@ -56,7 +57,7 @@ let handle_req id r =
let cleanup_req id exn =
match Node.lookup id with
| Some n -> Node.unbind_all n
| None -> ()
| None -> return ()
let start (s, peername) =
let id = Node.name_of_string ("http-" ^ Uuid.create ()) in
@ -102,4 +103,4 @@ let init () =
register_dispatcher ("/_/server_stats", api_server_stats);
register_dispatcher ("/_/nodes", api_nodes);
register_dispatcher ("/_/node/", api_node_info);
ignore (Util.create_daemon_thread "HTTP listener" None (Net.start_net "HTTP" 5678) start)
Util.create_daemon_thread "HTTP listener" None (Net.start_net "HTTP" 5678) start

View File

@ -15,79 +15,84 @@
(* You should have received a copy of the GNU General Public License *)
(* along with Hop. If not, see <http://www.gnu.org/licenses/>. *)
open Lwt
open Hof
type outbound_message =
| Data of Sexp.t
| Heartbeat
| Shutdown
let rec message_stream ch =
let deliver_payload payload =
Some (Printf.sprintf "%d;%s;" (String.length payload) payload,
true,
Stringstream.make (message_stream ch))
in
let deliver_sexp_chunk sexp = deliver_payload (Json.to_string (Sexpjson.json_of_sexp sexp)) in
fun () ->
match Squeue.pop ch with
| Data sexp -> deliver_sexp_chunk sexp
| Heartbeat -> deliver_payload ""
| Shutdown -> None
let rec api_tap_source id r =
let ch = Squeue.create 10 in
let handle_message n sexp = Squeue.add (Data sexp) ch in
let mbox = Lwt_mvar.create
(Some (Message.subscribe (Sexp.Str (Node.local_container_name()),
Sexp.Str "", Sexp.Str "",
Sexp.Str "", Sexp.Str ""))) in
let handle_message n sexp = Lwt_mvar.put mbox (Some sexp) in
let n = Node.make "http_tap" handle_message in
if not (Node.bind (id, n))
then Httpd.http_error_html 500 "Internal ID collision" []
else
let id_block_and_padding =
Stringstream.const_flush (id.Node.label ^ ";" ^ String.make 2048 'h' ^ ";") in
handle_message n (Message.subscribe (Sexp.Str (Node.local_container_name()),
Sexp.Str "", Sexp.Str "",
Sexp.Str "", Sexp.Str ""));
Httpd.resp_generic 200 "Streaming"
[Httpd.text_content_type_header;
"Access-Control-Allow-Origin", "*"]
(Httpd.Variable
(Stringstream.switch_after 131072
(Stringstream.seq id_block_and_padding (Stringstream.make (message_stream ch)))
Stringstream.empty))
|> Httpd.add_disable_cache_headers
|> Httpd.add_date_header
|> Httpd.add_completion_callback
(fun _ ->
Node.unbind_all n;
Squeue.add Shutdown ch)
let shutdown () =
lwt () = Node.unbind_all n in
Lwt_mvar.put mbox None
in
let generator yield =
let body_counter = ref 0 in
let yield_and_count s =
body_counter := String.length s + !body_counter;
yield s
in
lwt () = yield_and_count (id.Node.label ^ ";" ^ String.make 2048 'h' ^ ";") in
let rec drain_mbox () =
match_lwt Lwt_mvar.take mbox with
| None -> return ()
| Some sexp ->
lwt json = Sexpjson.json_of_sexp sexp in
let payload = Json.to_string json in
lwt () = yield_and_count (Printf.sprintf "%d;%s;" (String.length payload) payload) in
lwt () =
if !body_counter >= 131072
then shutdown ()
else return ()
in
drain_mbox ()
in drain_mbox ()
in
match_lwt Node.bind (id, n) with
| false -> Httpd.http_error_html 500 "Internal ID collision" []
| true ->
Httpd.resp_generic 200 "Streaming"
[Httpd.text_content_type_header;
"Access-Control-Allow-Origin", "*"]
(Httpd.Variable (Streamutil.stream_generator generator))
|> Httpd.add_disable_cache_headers
|> Httpd.add_date_header
|> Httpd.add_completion_callback shutdown
let api_tap_sink irrelevant_id r =
let params = Httpd.parse_urlencoded (Httpd.string_of_content r.Httpd.req_body.Httpd.content) in
(* let stream_id = List.assoc "metadata.id" params in *)
lwt content = Httpd.string_of_content r.Httpd.req_body.Httpd.content in
lwt params = Httpd.parse_urlencoded content in
match Httpd.find_param "metadata.type" params with
| Some (Some "send") ->
| Some (Some "send") ->
(match Httpd.find_param "data" params with
| Some (Some data_str) ->
let data =
(try Sexpjson.sexp_of_json (Json.of_string data_str)
with _ -> Httpd.http_error_html 406 "Bad data parameter" []) in
| Some (Some data_str) ->
lwt data =
(try return (Sexpjson.sexp_of_json (Json.of_string data_str))
with _ -> Httpd.http_error_html 406 "Bad data parameter" []) in
(match Message.message_of_sexp data with
| Message.Post (Sexp.Str name, body, token) ->
Node.send_ignore' name body;
| Message.Post (Sexp.Str name, body, token) ->
lwt () = Node.send_ignore' name body in
Httpd.resp_generic 202 "Accepted" [] (Httpd.empty_content)
| _ ->
| _ ->
Httpd.http_error_html 406 "Message not understood" [])
| _ -> Httpd.http_error_html 406 "Bad data parameter" [])
| Some (Some "close") ->
| _ -> Httpd.http_error_html 406 "Bad data parameter" [])
| Some (Some "close") ->
Httpd.resp_generic_ok [] Httpd.empty_content
| _ ->
| _ ->
Httpd.http_error_html 406 "Unsupported metadata.type" []
let api_tap _ id r =
match r.Httpd.verb with
| "GET" -> api_tap_source id r
| "POST" -> api_tap_sink id r
| _ -> Httpd.http_error_html 400 "Unsupported tap method" []
| "GET" -> api_tap_source id r
| "POST" -> api_tap_sink id r
| _ -> Httpd.http_error_html 400 "Unsupported tap method" []
let init () =
Ui_main.register_dispatcher ("/_/tap", api_tap)

11
util.ml
View File

@ -92,3 +92,14 @@ let unhex s =
else loop (index + 1) (acc * 16 + unhex_char (String.get s index))
in
loop 0 0
let stream_to_string s =
lwt pieces = Lwt_stream.to_list s in
return (String.concat "" pieces)
let stream_generator f =
let mbox = Lwt_mvar.create_empty () in
let yield v = Lwt_mvar.put mbox (Some v) in
ignore (lwt () = f yield in
Lwt_mvar.put mbox None);
Lwt_stream.from (fun () -> Lwt_mvar.take mbox)