diff --git a/amqp_relay.ml b/amqp_relay.ml index f057d38..506c486 100644 --- a/amqp_relay.ml +++ b/amqp_relay.ml @@ -406,4 +406,5 @@ let init () = Node.send_ignore "factory" (Message.create (Sexp.Str "fanout", Sexp.Arr [Sexp.Str "amq.fanout"], Sexp.Str "", Sexp.Str "")); - ignore (Util.create_thread "AMQP listener" None (Net.start_net "AMQP" Amqp_spec.port) start) + ignore (Util.create_daemon_thread + "AMQP listener" None (Net.start_net "AMQP" Amqp_spec.port) start) diff --git a/config.ml b/config.ml new file mode 100644 index 0000000..5706a1e --- /dev/null +++ b/config.ml @@ -0,0 +1,47 @@ +(* Copyright 2012 Tony Garnock-Jones . *) + +(* This file is part of Ocamlmsg. *) + +(* Ocamlmsg is free software: you can redistribute it and/or modify it *) +(* under the terms of the GNU General Public License as published by the *) +(* Free Software Foundation, either version 3 of the License, or (at your *) +(* option) any later version. *) + +(* Ocamlmsg is distributed in the hope that it will be useful, but *) +(* WITHOUT ANY WARRANTY; without even the implied warranty of *) +(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *) +(* General Public License for more details. *) + +(* You should have received a copy of the GNU General Public License *) +(* along with Ocamlmsg. If not, see . *) + +open Hof + +let config = ref [] + +let get key = + try Some (List.assoc key !config) with Not_found -> None + +let get' key default_value = + try (List.assoc key !config) with Not_found -> default_value + +let push k v = + config := (k, v) :: !config + +let get_all key = + List.filter (fun (k, v) -> k = key) !config + |> List.rev_map (fun (k, v) -> v) + +let init () = + let argv = Sys.argv in + let argc = Array.length argv in + let rec loop index current_key = + if index >= argc + then () + else + (let opt = argv.(index) in + if Util.starts_with opt "--" + then loop (index + 1) (String.sub opt 2 (String.length opt - 2)) + else (push current_key opt; + loop (index + 1) current_key)) + in loop 1 "" diff --git a/net.ml b/net.ml index 9044cce..5581282 100644 --- a/net.ml +++ b/net.ml @@ -28,5 +28,6 @@ let start_net protocol_name port_number connection_start_fn = setsockopt sock SO_REUSEADDR true; bind sock (ADDR_INET (inet_addr_of_string "0.0.0.0", port_number)); listen sock 5; + Server_control.milestone (protocol_name ^ " ready"); Log.info "Accepting connections" [Sexp.Str protocol_name; Sexp.Str (string_of_int port_number)]; accept_loop sock connection_start_fn diff --git a/ocamlmsg.ml b/ocamlmsg.ml index f940a7b..92d0e85 100644 --- a/ocamlmsg.ml +++ b/ocamlmsg.ml @@ -23,11 +23,20 @@ let hook_log () = in Log.hook := new_hook +let create_ready_file () = + match Config.get "ready-file" with + | Some ready_file_path -> + Log.info "Creating ready file" [Sexp.Str ready_file_path]; + close_out (open_out ready_file_path) + | None -> + () + let _ = Printf.printf "%s %s, %s\n%s\n%!" App_info.product App_info.version App_info.copyright App_info.licence_blurb; Sys.set_signal Sys.sigpipe Sys.Signal_ignore; Uuid.init (); + Config.init (); Factory.init (); Queuenode.init (); Fanoutnode.init (); @@ -38,4 +47,11 @@ let _ = Ui_main.init (); Ui_relay.init (); (* Speedtest.init (); *) - Net.start_net "Hop" 5671 Relay.start + Relay.init (); + Server_control.run_until "AMQP ready"; + Server_control.run_until "HTTP ready"; + Server_control.run_until "Hop ready"; + if Server_control.is_running () + then (create_ready_file (); + Server_control.run_forever ()) + else () diff --git a/relay.ml b/relay.ml index 4a8dae2..1793bae 100644 --- a/relay.ml +++ b/relay.ml @@ -76,3 +76,6 @@ let relay_mainloop (peername, mtx, cin, cout) n = let start (s, peername) = Connections.start_connection "relay" issue_banner relay_boot relay_handler relay_mainloop (s, peername) + +let init () = + ignore (Util.create_daemon_thread "Hop listener" None (Net.start_net "Hop" 5671) start) diff --git a/server_control.ml b/server_control.ml new file mode 100644 index 0000000..c31a47f --- /dev/null +++ b/server_control.ml @@ -0,0 +1,64 @@ +(* Copyright 2012 Tony Garnock-Jones . *) + +(* This file is part of Ocamlmsg. *) + +(* Ocamlmsg is free software: you can redistribute it and/or modify it *) +(* under the terms of the GNU General Public License as published by the *) +(* Free Software Foundation, either version 3 of the License, or (at your *) +(* option) any later version. *) + +(* Ocamlmsg is distributed in the hope that it will be useful, but *) +(* WITHOUT ANY WARRANTY; without even the implied warranty of *) +(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *) +(* General Public License for more details. *) + +(* You should have received a copy of the GNU General Public License *) +(* along with Ocamlmsg. If not, see . *) + +open Datastructures + +let continue_running = ref true +let control_queue = Squeue.create 1 + +let achieved_milestones = ref StringSet.empty + +let milestone name = + Squeue.add (`Milestone name) control_queue + +let shutdown_now details = + Squeue.add (`Shutdown details) control_queue + +let is_milestone_achieved m = + match m with + | Some m' -> + StringSet.mem m' !achieved_milestones + | None -> + false + +let rec run' until_milestone = + match is_milestone_achieved until_milestone with + | true -> + () + | false -> + (match Squeue.pop control_queue with + | `Shutdown details -> + Log.error "Shutting down server" details; + continue_running := false; + () + | `Milestone name -> + Log.info "Achieved milestone" [Sexp.Str name]; + achieved_milestones := StringSet.add name !achieved_milestones; + run' until_milestone) + +let is_running () = !continue_running + +let run_until milestone = + if !continue_running + then (Log.info "Waiting for milestone" [Sexp.Str milestone]; + run' (Some milestone)) + else () + +let run_forever () = + if !continue_running + then run' None + else () diff --git a/ui_main.ml b/ui_main.ml index 61a6539..e34a063 100644 --- a/ui_main.ml +++ b/ui_main.ml @@ -53,7 +53,7 @@ let handle_req id r = | "GET" | "HEAD" -> Httpd_file.resp_file (Filename.concat "./web" r.Httpd.path) | _ -> Httpd.http_error_html 400 ("Unsupported HTTP method "^r.Httpd.verb) [] -let cleanup_req id () = +let cleanup_req id exn = match Node.lookup id with | Some n -> Node.unbind_all n | None -> () @@ -92,4 +92,4 @@ let init () = register_dispatcher ("/_/server_stats", api_server_stats); register_dispatcher ("/_/nodes", api_nodes); register_dispatcher ("/_/node/", api_node_info); - ignore (Util.create_thread "HTTP listener" None (Net.start_net "HTTP" 5678) start) + ignore (Util.create_daemon_thread "HTTP listener" None (Net.start_net "HTTP" 5678) start) diff --git a/util.ml b/util.ml index 088495e..a72458f 100644 --- a/util.ml +++ b/util.ml @@ -28,11 +28,20 @@ let create_thread name cleanup main initarg = with e -> Log.warn "Thread died with exception" [Str name; Str (Printexc.to_string e)]; (match cleanup with - | Some cleaner -> cleaner () + | Some cleaner -> cleaner e | None -> ()) in Thread.create guarded_main initarg +let daemon_thread_died name nested_cleaner e = + (match nested_cleaner with + | Some c -> c e + | None -> ()); + Server_control.shutdown_now [Sexp.Str "Daemon thread exited"; Sexp.Str name] + +let create_daemon_thread name cleanup main initarg = + create_thread name (Some (daemon_thread_died name cleanup)) main initarg + let with_mutex m f arg = Mutex.lock m; try