Add fanout exchange-type and create amq.fanout exchange at startup

This commit is contained in:
Tony Garnock-Jones 2012-03-05 16:56:43 -05:00
parent 3868235d92
commit 740d67ed02
3 changed files with 51 additions and 0 deletions

View File

@ -384,4 +384,7 @@ let init () =
Node.send_ignore "factory" (Message.create (Sexp.Str "direct",
Sexp.Arr [Sexp.Str "amq.direct"],
Sexp.Str "", Sexp.Str ""));
Node.send_ignore "factory" (Message.create (Sexp.Str "fanout",
Sexp.Arr [Sexp.Str "amq.fanout"],
Sexp.Str "", Sexp.Str ""));
ignore (Util.create_thread "AMQP listener" None (Net.start_net Amqp_spec.port) start)

47
fanoutnode.ml Normal file
View File

@ -0,0 +1,47 @@
open Sexp
open Datastructures
open Status
type t = {
name: string;
subscriptions: Subscription.set_t;
mtx: Mutex.t;
}
let classname = "fanout"
let unsubscribe info uuid =
Util.with_mutex0 info.mtx
(fun () -> ignore (Subscription.delete info.subscriptions uuid))
let route_message info n sexp =
match Message.message_of_sexp sexp with
| Message.Post (Str name, body, token) ->
let snapshot = !(info.subscriptions) in
StringMap.iter
(fun uuid sub ->
ignore (Subscription.send_to_subscription' sub body (unsubscribe info)))
snapshot
| Message.Subscribe (Str binding_key as filter, Str sink, name, Str reply_sink, reply_name) ->
Util.with_mutex0 info.mtx
(fun () ->
ignore (Subscription.create info.subscriptions filter sink name reply_sink reply_name))
| Message.Unsubscribe (Str token) ->
unsubscribe info token
| m ->
Util.message_not_understood classname m
let factory arg =
match arg with
| (Arr [Str name]) ->
let info = {
name = name;
subscriptions = Subscription.new_set ();
mtx = Mutex.create ();
} in
replace_ok (Node.make_idempotent_named classname name (route_message info)) (Str name)
| _ ->
Problem (Str "bad-arg")
let init () =
Factory.register_class classname factory

View File

@ -13,6 +13,7 @@ let _ =
Uuid.init ();
Factory.init ();
Queuenode.init ();
Fanoutnode.init ();
Directnode.init ();
hook_log ();
Amqp_relay.init ();