(* Copyright 2012 Tony Garnock-Jones . *) (* This file is part of Hop. *) (* Hop is free software: you can redistribute it and/or modify it *) (* under the terms of the GNU General Public License as published by the *) (* Free Software Foundation, either version 3 of the License, or (at your *) (* option) any later version. *) (* Hop is distributed in the hope that it will be useful, but *) (* WITHOUT ANY WARRANTY; without even the implied warranty of *) (* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *) (* General Public License for more details. *) (* You should have received a copy of the GNU General Public License *) (* along with Hop. If not, see . *) open Lwt open Hof open Datastructures let all_sources = ref BytesMap.empty let rec api_tap_source id r = let mbox = Lwt_mvar.create (Some (Message.subscribe (Sexp.Str (Node.local_container_name()), Sexp.emptystr, Sexp.emptystr, Sexp.emptystr, Sexp.emptystr))) in let handle_message n sexp = Lwt_mvar.put mbox (Some sexp) in let n = Node.make (Bytes.of_string "http_tap") handle_message in all_sources := BytesMap.add id n !all_sources; let shutdown () = all_sources := BytesMap.remove id !all_sources; lwt () = Node.unbind_all n in Lwt_mvar.put mbox None in let generator yield = let body_counter = ref 0 in let yield_and_count s = let bs = Bytes.of_string s in body_counter := Bytes.length bs + !body_counter; yield bs in lwt () = yield_and_count ((Bytes.to_string id) ^ ";" ^ String.make 2048 'h' ^ ";") in let rec drain_mbox () = match_lwt Lwt_mvar.take mbox with | None -> return () | Some sexp -> lwt json = Sexpjson.json_of_sexp sexp in let payload = Json.to_string json in lwt () = yield_and_count (Printf.sprintf "%d;%s;" (String.length payload) payload) in lwt () = if !body_counter >= 131072 then shutdown () else return () in drain_mbox () in drain_mbox () in Httpd.resp_generic 200 "Streaming" [Httpd.text_content_type_header; "Access-Control-Allow-Origin", "*"] (Httpd.Variable (Streamutil.stream_generator generator)) |> Httpd.add_disable_cache_headers |> Httpd.add_date_header |> Httpd.add_completion_callback shutdown let dispatch_message n m = match m with | Message.Post (Sexp.Str name, body, token) -> lwt () = Node.send_ignore' name body in Httpd.resp_generic 202 "Accepted" [] (Httpd.empty_content) | Message.Subscribe (Sexp.Str filter, sink, name, Sexp.Str reply_sink, Sexp.Str reply_name) -> (match_lwt Node.bind (Node.name_of_bytes filter, n) with | true -> lwt () = Node.post_ignore' reply_sink (Sexp.Str reply_name) (Message.subscribe_ok (Sexp.Str filter)) Sexp.emptystr in Httpd.resp_generic 204 "Bound" [] (Httpd.empty_content) | false -> lwt () = Log.warn "Bind failed" [Sexp.Str filter] in Httpd.http_error_html 409 "Bind failed" []) | Message.Unsubscribe (Sexp.Str token) -> (match_lwt Node.unbind (Node.name_of_bytes token) with | true -> Httpd.resp_generic 204 "Unbound" [] (Httpd.empty_content) | false -> lwt () = Log.warn "Unbind failed" [Sexp.Str token] in Httpd.http_error_html 409 "Unbind failed" []) | _ -> Httpd.http_error_html 406 "Message not understood" [] let api_tap_sink irrelevant_id r = lwt content = Httpd.bytes_of_content r.Httpd.req_body.Httpd.content in lwt params = Httpd.parse_urlencoded (Bytes.to_string content) in match Httpd.find_param "metadata.type" params with | Some (Some "send") -> (match Httpd.find_param "metadata.id" params with | Some (Some id) -> (match (try Some (BytesMap.find (Bytes.of_string id) !all_sources) with Not_found -> None) with | Some n -> (match Httpd.find_param "data" params with | Some (Some data_str) -> lwt data = (try return (Sexpjson.sexp_of_json (Json.of_string data_str)) with _ -> Httpd.http_error_html 406 "Bad data parameter" []) in dispatch_message n (Message.message_of_sexp data) | _ -> Httpd.http_error_html 406 "Bad data parameter" []) | None -> Httpd.http_error_html 406 "Tap ID not found" []) | _ -> Httpd.http_error_html 406 "Missing metadata.id" []) | Some (Some "close") -> Httpd.resp_generic_ok [] Httpd.empty_content | _ -> Httpd.http_error_html 406 "Unsupported metadata.type" [] let api_tap _ id r = match r.Httpd.verb with | "GET" -> api_tap_source (Bytes.of_string id) r | "POST" -> api_tap_sink (Bytes.of_string id) r | _ -> Httpd.http_error_html 400 "Unsupported tap method" [] let init () = Ui_main.register_dispatcher ("/_/tap", api_tap)