From 740d67ed0276aa992b25ac398c06e755995c5031 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Mon, 5 Mar 2012 16:56:43 -0500 Subject: [PATCH] Add fanout exchange-type and create amq.fanout exchange at startup --- amqp_relay.ml | 3 +++ fanoutnode.ml | 47 +++++++++++++++++++++++++++++++++++++++++++++++ ocamlmsg.ml | 1 + 3 files changed, 51 insertions(+) create mode 100644 fanoutnode.ml diff --git a/amqp_relay.ml b/amqp_relay.ml index 95ecdfe..f768f8b 100644 --- a/amqp_relay.ml +++ b/amqp_relay.ml @@ -384,4 +384,7 @@ let init () = Node.send_ignore "factory" (Message.create (Sexp.Str "direct", Sexp.Arr [Sexp.Str "amq.direct"], Sexp.Str "", Sexp.Str "")); + Node.send_ignore "factory" (Message.create (Sexp.Str "fanout", + Sexp.Arr [Sexp.Str "amq.fanout"], + Sexp.Str "", Sexp.Str "")); ignore (Util.create_thread "AMQP listener" None (Net.start_net Amqp_spec.port) start) diff --git a/fanoutnode.ml b/fanoutnode.ml new file mode 100644 index 0000000..88bf5c5 --- /dev/null +++ b/fanoutnode.ml @@ -0,0 +1,47 @@ +open Sexp +open Datastructures +open Status + +type t = { + name: string; + subscriptions: Subscription.set_t; + mtx: Mutex.t; + } + +let classname = "fanout" + +let unsubscribe info uuid = + Util.with_mutex0 info.mtx + (fun () -> ignore (Subscription.delete info.subscriptions uuid)) + +let route_message info n sexp = + match Message.message_of_sexp sexp with + | Message.Post (Str name, body, token) -> + let snapshot = !(info.subscriptions) in + StringMap.iter + (fun uuid sub -> + ignore (Subscription.send_to_subscription' sub body (unsubscribe info))) + snapshot + | Message.Subscribe (Str binding_key as filter, Str sink, name, Str reply_sink, reply_name) -> + Util.with_mutex0 info.mtx + (fun () -> + ignore (Subscription.create info.subscriptions filter sink name reply_sink reply_name)) + | Message.Unsubscribe (Str token) -> + unsubscribe info token + | m -> + Util.message_not_understood classname m + +let factory arg = + match arg with + | (Arr [Str name]) -> + let info = { + name = name; + subscriptions = Subscription.new_set (); + mtx = Mutex.create (); + } in + replace_ok (Node.make_idempotent_named classname name (route_message info)) (Str name) + | _ -> + Problem (Str "bad-arg") + +let init () = + Factory.register_class classname factory diff --git a/ocamlmsg.ml b/ocamlmsg.ml index d7deaec..fc6647e 100644 --- a/ocamlmsg.ml +++ b/ocamlmsg.ml @@ -13,6 +13,7 @@ let _ = Uuid.init (); Factory.init (); Queuenode.init (); + Fanoutnode.init (); Directnode.init (); hook_log (); Amqp_relay.init ();