Put cmsg-compatible metamessages back
This commit is contained in:
parent
daa6cd717c
commit
a7dded3b99
|
@ -14,7 +14,7 @@ let classname = "direct"
|
|||
let unsubscribe info uuid =
|
||||
Util.with_mutex0 info.mtx
|
||||
(fun () ->
|
||||
match Subscription.delete info.subscriptions uuid with
|
||||
match Subscription.delete info.name info.subscriptions uuid with
|
||||
| Some sub ->
|
||||
(match sub.Subscription.filter with
|
||||
| Str binding_key ->
|
||||
|
@ -46,7 +46,8 @@ let route_message info n sexp =
|
|||
Util.with_mutex0 info.mtx
|
||||
(fun () ->
|
||||
let sub =
|
||||
Subscription.create info.subscriptions filter sink name reply_sink reply_name in
|
||||
Subscription.create
|
||||
info.name info.subscriptions filter sink name reply_sink reply_name in
|
||||
let old_set =
|
||||
(try StringMap.find binding_key info.routing_table with Not_found -> UuidSet.empty) in
|
||||
let new_set = UuidSet.add sub.Subscription.uuid old_set in
|
||||
|
|
|
@ -8,9 +8,10 @@ let classes = ref StringMap.empty
|
|||
|
||||
let register_class name factory =
|
||||
if StringMap.mem name !classes
|
||||
then (fprintf stderr "ERROR: Duplicate node class name %s\n%!" name;
|
||||
then (Log.error "Duplicate node class name" [Str name];
|
||||
exit 1)
|
||||
else classes := StringMap.add name factory !classes
|
||||
else (Log.info "Registered node class" [Str name];
|
||||
classes := StringMap.add name factory !classes)
|
||||
|
||||
let lookup_class name =
|
||||
try Some (StringMap.find name !classes)
|
||||
|
|
|
@ -12,7 +12,7 @@ let classname = "fanout"
|
|||
|
||||
let unsubscribe info uuid =
|
||||
Util.with_mutex0 info.mtx
|
||||
(fun () -> ignore (Subscription.delete info.subscriptions uuid))
|
||||
(fun () -> ignore (Subscription.delete info.name info.subscriptions uuid))
|
||||
|
||||
let route_message info n sexp =
|
||||
match Message.message_of_sexp sexp with
|
||||
|
@ -25,7 +25,8 @@ let route_message info n sexp =
|
|||
| Message.Subscribe (Str binding_key as filter, Str sink, name, Str reply_sink, reply_name) ->
|
||||
Util.with_mutex0 info.mtx
|
||||
(fun () ->
|
||||
ignore (Subscription.create info.subscriptions filter sink name reply_sink reply_name))
|
||||
ignore (Subscription.create
|
||||
info.name info.subscriptions filter sink name reply_sink reply_name))
|
||||
| Message.Unsubscribe (Str token) ->
|
||||
unsubscribe info token
|
||||
| m ->
|
||||
|
|
|
@ -11,6 +11,14 @@
|
|||
"selector": "create-failed",
|
||||
"args": ["reason"]
|
||||
},
|
||||
{
|
||||
"selector": "subscribed",
|
||||
"args": ["source", "filter", "sink", "name"]
|
||||
},
|
||||
{
|
||||
"selector": "unsubscribed",
|
||||
"args": ["source", "filter", "sink", "name"]
|
||||
},
|
||||
{
|
||||
"selector": "post",
|
||||
"args": ["name", "body", "token"]
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
open Sexp
|
||||
|
||||
let announce_subscription source filter sink name on_off =
|
||||
Node.post_ignore "meta" (Str source)
|
||||
(if on_off
|
||||
then Message.subscribed (Str source, filter, Str sink, name)
|
||||
else Message.unsubscribed (Str source, filter, Str sink, name))
|
||||
(Str "")
|
||||
|
||||
let init () =
|
||||
Node.send_ignore "factory" (Message.create (Str "direct", Arr [Str "meta"], Str "", Str ""))
|
|
@ -15,6 +15,7 @@ let _ =
|
|||
Queuenode.init ();
|
||||
Fanoutnode.init ();
|
||||
Directnode.init ();
|
||||
Meta.init ();
|
||||
hook_log ();
|
||||
Amqp_relay.init ();
|
||||
(* Speedtest.init (); *)
|
||||
|
|
|
@ -29,7 +29,7 @@ let rec do_burst info n =
|
|||
let (body, new_backlog) = Fqueue.really_pop_front info.backlog in
|
||||
let (sub, new_waiters) = Fqueue.really_pop_front info.waiters in
|
||||
info.waiters <- new_waiters;
|
||||
if Subscription.send_to_subscription info.subscriptions sub body
|
||||
if Subscription.send_to_subscription info.name info.subscriptions sub body
|
||||
then
|
||||
(info.waiters <- Fqueue.push_back info.waiters sub;
|
||||
info.backlog <- new_backlog;
|
||||
|
@ -53,11 +53,12 @@ let shoveller info =
|
|||
loop ()
|
||||
| Message.Subscribe (filter, Str sink, name, Str reply_sink, reply_name) ->
|
||||
let sub =
|
||||
Subscription.create info.subscriptions filter sink name reply_sink reply_name in
|
||||
Subscription.create
|
||||
info.name info.subscriptions filter sink name reply_sink reply_name in
|
||||
info.waiters <- Fqueue.push_back info.waiters sub;
|
||||
loop ()
|
||||
| Message.Unsubscribe (Str token) ->
|
||||
ignore (Subscription.delete info.subscriptions token);
|
||||
ignore (Subscription.delete info.name info.subscriptions token);
|
||||
loop ()
|
||||
| m ->
|
||||
Util.message_not_understood "queue" m;
|
||||
|
|
|
@ -12,7 +12,7 @@ type set_t = t StringMap.t ref
|
|||
|
||||
let new_set () = ref StringMap.empty
|
||||
|
||||
let create subs filter sink name reply_sink reply_name =
|
||||
let create source subs filter sink name reply_sink reply_name =
|
||||
let uuid = Uuid.create () in
|
||||
let sub = {
|
||||
live = true;
|
||||
|
@ -22,14 +22,16 @@ let create subs filter sink name reply_sink reply_name =
|
|||
name = name
|
||||
} in
|
||||
subs := StringMap.add uuid sub !subs;
|
||||
Meta.announce_subscription source filter sink name true;
|
||||
Node.post_ignore reply_sink reply_name (Message.subscribe_ok (Sexp.Str uuid)) (Sexp.Str "");
|
||||
sub
|
||||
|
||||
let delete subs uuid =
|
||||
let delete source subs uuid =
|
||||
try
|
||||
let sub = StringMap.find uuid !subs in
|
||||
sub.live <- false;
|
||||
subs := StringMap.remove uuid !subs;
|
||||
Meta.announce_subscription source sub.filter sub.sink sub.name false;
|
||||
Some sub
|
||||
with Not_found ->
|
||||
None
|
||||
|
@ -46,5 +48,5 @@ let send_to_subscription' sub body delete_action =
|
|||
then true
|
||||
else (delete_action sub.uuid; false)
|
||||
|
||||
let send_to_subscription subs sub body =
|
||||
send_to_subscription' sub body (fun (uuid) -> delete subs uuid)
|
||||
let send_to_subscription source subs sub body =
|
||||
send_to_subscription' sub body (fun (uuid) -> delete source subs uuid)
|
||||
|
|
Loading…
Reference in New Issue