Config parsing; server control; milestones; ready-file.

This commit is contained in:
Tony Garnock-Jones 2012-05-01 15:37:39 -04:00
parent 829ab7b906
commit dd969a4b31
8 changed files with 146 additions and 5 deletions

View File

@ -406,4 +406,5 @@ let init () =
Node.send_ignore "factory" (Message.create (Sexp.Str "fanout",
Sexp.Arr [Sexp.Str "amq.fanout"],
Sexp.Str "", Sexp.Str ""));
ignore (Util.create_thread "AMQP listener" None (Net.start_net "AMQP" Amqp_spec.port) start)
ignore (Util.create_daemon_thread
"AMQP listener" None (Net.start_net "AMQP" Amqp_spec.port) start)

47
config.ml Normal file
View File

@ -0,0 +1,47 @@
(* Copyright 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>. *)
(* This file is part of Ocamlmsg. *)
(* Ocamlmsg is free software: you can redistribute it and/or modify it *)
(* under the terms of the GNU General Public License as published by the *)
(* Free Software Foundation, either version 3 of the License, or (at your *)
(* option) any later version. *)
(* Ocamlmsg is distributed in the hope that it will be useful, but *)
(* WITHOUT ANY WARRANTY; without even the implied warranty of *)
(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *)
(* General Public License for more details. *)
(* You should have received a copy of the GNU General Public License *)
(* along with Ocamlmsg. If not, see <http://www.gnu.org/licenses/>. *)
open Hof
let config = ref []
let get key =
try Some (List.assoc key !config) with Not_found -> None
let get' key default_value =
try (List.assoc key !config) with Not_found -> default_value
let push k v =
config := (k, v) :: !config
let get_all key =
List.filter (fun (k, v) -> k = key) !config
|> List.rev_map (fun (k, v) -> v)
let init () =
let argv = Sys.argv in
let argc = Array.length argv in
let rec loop index current_key =
if index >= argc
then ()
else
(let opt = argv.(index) in
if Util.starts_with opt "--"
then loop (index + 1) (String.sub opt 2 (String.length opt - 2))
else (push current_key opt;
loop (index + 1) current_key))
in loop 1 ""

1
net.ml
View File

@ -28,5 +28,6 @@ let start_net protocol_name port_number connection_start_fn =
setsockopt sock SO_REUSEADDR true;
bind sock (ADDR_INET (inet_addr_of_string "0.0.0.0", port_number));
listen sock 5;
Server_control.milestone (protocol_name ^ " ready");
Log.info "Accepting connections" [Sexp.Str protocol_name; Sexp.Str (string_of_int port_number)];
accept_loop sock connection_start_fn

View File

@ -23,11 +23,20 @@ let hook_log () =
in
Log.hook := new_hook
let create_ready_file () =
match Config.get "ready-file" with
| Some ready_file_path ->
Log.info "Creating ready file" [Sexp.Str ready_file_path];
close_out (open_out ready_file_path)
| None ->
()
let _ =
Printf.printf "%s %s, %s\n%s\n%!"
App_info.product App_info.version App_info.copyright App_info.licence_blurb;
Sys.set_signal Sys.sigpipe Sys.Signal_ignore;
Uuid.init ();
Config.init ();
Factory.init ();
Queuenode.init ();
Fanoutnode.init ();
@ -38,4 +47,11 @@ let _ =
Ui_main.init ();
Ui_relay.init ();
(* Speedtest.init (); *)
Net.start_net "Hop" 5671 Relay.start
Relay.init ();
Server_control.run_until "AMQP ready";
Server_control.run_until "HTTP ready";
Server_control.run_until "Hop ready";
if Server_control.is_running ()
then (create_ready_file ();
Server_control.run_forever ())
else ()

View File

@ -76,3 +76,6 @@ let relay_mainloop (peername, mtx, cin, cout) n =
let start (s, peername) =
Connections.start_connection "relay" issue_banner
relay_boot relay_handler relay_mainloop (s, peername)
let init () =
ignore (Util.create_daemon_thread "Hop listener" None (Net.start_net "Hop" 5671) start)

64
server_control.ml Normal file
View File

@ -0,0 +1,64 @@
(* Copyright 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>. *)
(* This file is part of Ocamlmsg. *)
(* Ocamlmsg is free software: you can redistribute it and/or modify it *)
(* under the terms of the GNU General Public License as published by the *)
(* Free Software Foundation, either version 3 of the License, or (at your *)
(* option) any later version. *)
(* Ocamlmsg is distributed in the hope that it will be useful, but *)
(* WITHOUT ANY WARRANTY; without even the implied warranty of *)
(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *)
(* General Public License for more details. *)
(* You should have received a copy of the GNU General Public License *)
(* along with Ocamlmsg. If not, see <http://www.gnu.org/licenses/>. *)
open Datastructures
let continue_running = ref true
let control_queue = Squeue.create 1
let achieved_milestones = ref StringSet.empty
let milestone name =
Squeue.add (`Milestone name) control_queue
let shutdown_now details =
Squeue.add (`Shutdown details) control_queue
let is_milestone_achieved m =
match m with
| Some m' ->
StringSet.mem m' !achieved_milestones
| None ->
false
let rec run' until_milestone =
match is_milestone_achieved until_milestone with
| true ->
()
| false ->
(match Squeue.pop control_queue with
| `Shutdown details ->
Log.error "Shutting down server" details;
continue_running := false;
()
| `Milestone name ->
Log.info "Achieved milestone" [Sexp.Str name];
achieved_milestones := StringSet.add name !achieved_milestones;
run' until_milestone)
let is_running () = !continue_running
let run_until milestone =
if !continue_running
then (Log.info "Waiting for milestone" [Sexp.Str milestone];
run' (Some milestone))
else ()
let run_forever () =
if !continue_running
then run' None
else ()

View File

@ -53,7 +53,7 @@ let handle_req id r =
| "GET" | "HEAD" -> Httpd_file.resp_file (Filename.concat "./web" r.Httpd.path)
| _ -> Httpd.http_error_html 400 ("Unsupported HTTP method "^r.Httpd.verb) []
let cleanup_req id () =
let cleanup_req id exn =
match Node.lookup id with
| Some n -> Node.unbind_all n
| None -> ()
@ -92,4 +92,4 @@ let init () =
register_dispatcher ("/_/server_stats", api_server_stats);
register_dispatcher ("/_/nodes", api_nodes);
register_dispatcher ("/_/node/", api_node_info);
ignore (Util.create_thread "HTTP listener" None (Net.start_net "HTTP" 5678) start)
ignore (Util.create_daemon_thread "HTTP listener" None (Net.start_net "HTTP" 5678) start)

11
util.ml
View File

@ -28,11 +28,20 @@ let create_thread name cleanup main initarg =
with e ->
Log.warn "Thread died with exception" [Str name; Str (Printexc.to_string e)];
(match cleanup with
| Some cleaner -> cleaner ()
| Some cleaner -> cleaner e
| None -> ())
in
Thread.create guarded_main initarg
let daemon_thread_died name nested_cleaner e =
(match nested_cleaner with
| Some c -> c e
| None -> ());
Server_control.shutdown_now [Sexp.Str "Daemon thread exited"; Sexp.Str name]
let create_daemon_thread name cleanup main initarg =
create_thread name (Some (daemon_thread_died name cleanup)) main initarg
let with_mutex m f arg =
Mutex.lock m;
try