From 5380080758fc425f2900a9b5e3555067adb388b3 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sun, 4 Mar 2012 13:03:38 -0500 Subject: [PATCH] Split out net.ml; create amq.direct exchange --- amqp_relay.ml | 6 ++++++ net.ml | 15 +++++++++++++++ ocamlmsg.ml | 25 ++++--------------------- 3 files changed, 25 insertions(+), 21 deletions(-) create mode 100644 net.ml diff --git a/amqp_relay.ml b/amqp_relay.ml index 0b2cfde..8f7f6e8 100644 --- a/amqp_relay.ml +++ b/amqp_relay.ml @@ -365,3 +365,9 @@ let amqp_mainloop conn n = let start (s, peername) = Connections.start_connection "amqp" issue_banner amqp_boot amqp_handler amqp_mainloop (s, peername) + +let init () = + Node.send_ignore "factory" (Message.create (Sexp.Str "direct", + Sexp.Arr [Sexp.Str "amq.direct"], + Sexp.Str "", Sexp.Str "")); + ignore (Util.create_thread "AMQP listener" None (Net.start_net Amqp_spec.port) start) diff --git a/net.ml b/net.ml new file mode 100644 index 0000000..0b01229 --- /dev/null +++ b/net.ml @@ -0,0 +1,15 @@ +open Unix + +let rec accept_loop sock connection_start_fn = + let (s, peername) = accept sock in + setsockopt s TCP_NODELAY true; + ignore (connection_start_fn (s, peername)); + accept_loop sock connection_start_fn + +let start_net port_number connection_start_fn = + let sock = socket PF_INET SOCK_STREAM 0 in + setsockopt sock SO_REUSEADDR true; + bind sock (ADDR_INET (inet_addr_of_string "0.0.0.0", port_number)); + listen sock 5; + Log.info "Accepting connections" [Sexp.Str (string_of_int port_number)]; + accept_loop sock connection_start_fn diff --git a/ocamlmsg.ml b/ocamlmsg.ml index d5ba88b..d7deaec 100644 --- a/ocamlmsg.ml +++ b/ocamlmsg.ml @@ -1,21 +1,3 @@ -open Unix -open Printf -open Thread - -let rec accept_loop sock connection_start_fn = - let (s, peername) = accept sock in - setsockopt s TCP_NODELAY true; - ignore (connection_start_fn (s, peername)); - accept_loop sock connection_start_fn - -let start_net port_number connection_start_fn = - let sock = socket PF_INET SOCK_STREAM 0 in - setsockopt sock SO_REUSEADDR true; - bind sock (ADDR_INET (inet_addr_of_string "0.0.0.0", port_number)); - listen sock 5; - Log.info "Accepting connections" [Sexp.Str (string_of_int port_number)]; - accept_loop sock connection_start_fn - let hook_log () = let old_hook = !Log.hook in let new_hook label body = @@ -25,12 +7,13 @@ let hook_log () = Log.hook := new_hook let _ = - printf "%s %s, %s %s\n%!" App_info.product App_info.version App_info.copyright App_info.licence; + Printf.printf "%s %s, %s %s\n%!" + App_info.product App_info.version App_info.copyright App_info.licence; Sys.set_signal Sys.sigpipe Sys.Signal_ignore; Uuid.init (); Factory.init (); Queuenode.init (); Directnode.init (); hook_log (); - ignore (Util.create_thread "AMQP listener" None (start_net Amqp_spec.port) Amqp_relay.start); - start_net 5671 Relay.start + Amqp_relay.init (); + Net.start_net 5671 Relay.start