From a7dded3b99f499677f95b6943c5655a329a40401 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Tue, 6 Mar 2012 17:05:57 -0500 Subject: [PATCH] Put cmsg-compatible metamessages back --- directnode.ml | 5 +++-- factory.ml | 5 +++-- fanoutnode.ml | 5 +++-- messages.json | 8 ++++++++ meta.ml | 11 +++++++++++ ocamlmsg.ml | 1 + queuenode.ml | 7 ++++--- subscription.ml | 10 ++++++---- 8 files changed, 39 insertions(+), 13 deletions(-) create mode 100644 meta.ml diff --git a/directnode.ml b/directnode.ml index f54f11d..ca43e38 100644 --- a/directnode.ml +++ b/directnode.ml @@ -14,7 +14,7 @@ let classname = "direct" let unsubscribe info uuid = Util.with_mutex0 info.mtx (fun () -> - match Subscription.delete info.subscriptions uuid with + match Subscription.delete info.name info.subscriptions uuid with | Some sub -> (match sub.Subscription.filter with | Str binding_key -> @@ -46,7 +46,8 @@ let route_message info n sexp = Util.with_mutex0 info.mtx (fun () -> let sub = - Subscription.create info.subscriptions filter sink name reply_sink reply_name in + Subscription.create + info.name info.subscriptions filter sink name reply_sink reply_name in let old_set = (try StringMap.find binding_key info.routing_table with Not_found -> UuidSet.empty) in let new_set = UuidSet.add sub.Subscription.uuid old_set in diff --git a/factory.ml b/factory.ml index cbd5b65..8d3b02d 100644 --- a/factory.ml +++ b/factory.ml @@ -8,9 +8,10 @@ let classes = ref StringMap.empty let register_class name factory = if StringMap.mem name !classes - then (fprintf stderr "ERROR: Duplicate node class name %s\n%!" name; + then (Log.error "Duplicate node class name" [Str name]; exit 1) - else classes := StringMap.add name factory !classes + else (Log.info "Registered node class" [Str name]; + classes := StringMap.add name factory !classes) let lookup_class name = try Some (StringMap.find name !classes) diff --git a/fanoutnode.ml b/fanoutnode.ml index 88bf5c5..ad55f98 100644 --- a/fanoutnode.ml +++ b/fanoutnode.ml @@ -12,7 +12,7 @@ let classname = "fanout" let unsubscribe info uuid = Util.with_mutex0 info.mtx - (fun () -> ignore (Subscription.delete info.subscriptions uuid)) + (fun () -> ignore (Subscription.delete info.name info.subscriptions uuid)) let route_message info n sexp = match Message.message_of_sexp sexp with @@ -25,7 +25,8 @@ let route_message info n sexp = | Message.Subscribe (Str binding_key as filter, Str sink, name, Str reply_sink, reply_name) -> Util.with_mutex0 info.mtx (fun () -> - ignore (Subscription.create info.subscriptions filter sink name reply_sink reply_name)) + ignore (Subscription.create + info.name info.subscriptions filter sink name reply_sink reply_name)) | Message.Unsubscribe (Str token) -> unsubscribe info token | m -> diff --git a/messages.json b/messages.json index e9ba48b..0397e1d 100644 --- a/messages.json +++ b/messages.json @@ -11,6 +11,14 @@ "selector": "create-failed", "args": ["reason"] }, + { + "selector": "subscribed", + "args": ["source", "filter", "sink", "name"] + }, + { + "selector": "unsubscribed", + "args": ["source", "filter", "sink", "name"] + }, { "selector": "post", "args": ["name", "body", "token"] diff --git a/meta.ml b/meta.ml new file mode 100644 index 0000000..7c6f671 --- /dev/null +++ b/meta.ml @@ -0,0 +1,11 @@ +open Sexp + +let announce_subscription source filter sink name on_off = + Node.post_ignore "meta" (Str source) + (if on_off + then Message.subscribed (Str source, filter, Str sink, name) + else Message.unsubscribed (Str source, filter, Str sink, name)) + (Str "") + +let init () = + Node.send_ignore "factory" (Message.create (Str "direct", Arr [Str "meta"], Str "", Str "")) diff --git a/ocamlmsg.ml b/ocamlmsg.ml index 5db9cc2..8113234 100644 --- a/ocamlmsg.ml +++ b/ocamlmsg.ml @@ -15,6 +15,7 @@ let _ = Queuenode.init (); Fanoutnode.init (); Directnode.init (); + Meta.init (); hook_log (); Amqp_relay.init (); (* Speedtest.init (); *) diff --git a/queuenode.ml b/queuenode.ml index 2cab05e..6982ca3 100644 --- a/queuenode.ml +++ b/queuenode.ml @@ -29,7 +29,7 @@ let rec do_burst info n = let (body, new_backlog) = Fqueue.really_pop_front info.backlog in let (sub, new_waiters) = Fqueue.really_pop_front info.waiters in info.waiters <- new_waiters; - if Subscription.send_to_subscription info.subscriptions sub body + if Subscription.send_to_subscription info.name info.subscriptions sub body then (info.waiters <- Fqueue.push_back info.waiters sub; info.backlog <- new_backlog; @@ -53,11 +53,12 @@ let shoveller info = loop () | Message.Subscribe (filter, Str sink, name, Str reply_sink, reply_name) -> let sub = - Subscription.create info.subscriptions filter sink name reply_sink reply_name in + Subscription.create + info.name info.subscriptions filter sink name reply_sink reply_name in info.waiters <- Fqueue.push_back info.waiters sub; loop () | Message.Unsubscribe (Str token) -> - ignore (Subscription.delete info.subscriptions token); + ignore (Subscription.delete info.name info.subscriptions token); loop () | m -> Util.message_not_understood "queue" m; diff --git a/subscription.ml b/subscription.ml index 2c66bde..72dc341 100644 --- a/subscription.ml +++ b/subscription.ml @@ -12,7 +12,7 @@ type set_t = t StringMap.t ref let new_set () = ref StringMap.empty -let create subs filter sink name reply_sink reply_name = +let create source subs filter sink name reply_sink reply_name = let uuid = Uuid.create () in let sub = { live = true; @@ -22,14 +22,16 @@ let create subs filter sink name reply_sink reply_name = name = name } in subs := StringMap.add uuid sub !subs; + Meta.announce_subscription source filter sink name true; Node.post_ignore reply_sink reply_name (Message.subscribe_ok (Sexp.Str uuid)) (Sexp.Str ""); sub -let delete subs uuid = +let delete source subs uuid = try let sub = StringMap.find uuid !subs in sub.live <- false; subs := StringMap.remove uuid !subs; + Meta.announce_subscription source sub.filter sub.sink sub.name false; Some sub with Not_found -> None @@ -46,5 +48,5 @@ let send_to_subscription' sub body delete_action = then true else (delete_action sub.uuid; false) -let send_to_subscription subs sub body = - send_to_subscription' sub body (fun (uuid) -> delete subs uuid) +let send_to_subscription source subs sub body = + send_to_subscription' sub body (fun (uuid) -> delete source subs uuid)