Clean up Subscription API a little
This commit is contained in:
parent
dbe0124314
commit
3868235d92
|
@ -14,7 +14,7 @@ let classname = "direct"
|
|||
let unsubscribe info uuid =
|
||||
Util.with_mutex0 info.mtx
|
||||
(fun () ->
|
||||
match Subscription.delete info.name info.subscriptions uuid with
|
||||
match Subscription.delete info.subscriptions uuid with
|
||||
| Some sub ->
|
||||
(match sub.Subscription.filter with
|
||||
| Str binding_key ->
|
||||
|
@ -38,17 +38,15 @@ let route_message info n sexp =
|
|||
(fun (uuid) ->
|
||||
match Subscription.lookup info.subscriptions uuid with
|
||||
| Some sub ->
|
||||
ignore (Subscription.send_to_subscription' info.name info.subscriptions sub body
|
||||
(unsubscribe info))
|
||||
| None -> ())
|
||||
ignore (Subscription.send_to_subscription' sub body (unsubscribe info))
|
||||
| None ->
|
||||
())
|
||||
matching
|
||||
| Message.Subscribe (Str binding_key as filter, Str sink, name, Str reply_sink, reply_name) ->
|
||||
Util.with_mutex0 info.mtx
|
||||
(fun () ->
|
||||
let sub =
|
||||
Subscription.create info.name info.subscriptions
|
||||
filter sink name reply_sink reply_name
|
||||
in
|
||||
Subscription.create info.subscriptions filter sink name reply_sink reply_name in
|
||||
let old_set =
|
||||
(try StringMap.find binding_key info.routing_table with Not_found -> UuidSet.empty) in
|
||||
let new_set = UuidSet.add sub.Subscription.uuid old_set in
|
||||
|
|
|
@ -29,7 +29,7 @@ let rec do_burst info n =
|
|||
let (body, new_backlog) = Fqueue.really_pop_front info.backlog in
|
||||
let (sub, new_waiters) = Fqueue.really_pop_front info.waiters in
|
||||
info.waiters <- new_waiters;
|
||||
if Subscription.send_to_subscription info.name info.subscriptions sub body
|
||||
if Subscription.send_to_subscription info.subscriptions sub body
|
||||
then
|
||||
(info.waiters <- Fqueue.push_back info.waiters sub;
|
||||
info.backlog <- new_backlog;
|
||||
|
@ -53,13 +53,11 @@ let shoveller info =
|
|||
loop ()
|
||||
| Message.Subscribe (filter, Str sink, name, Str reply_sink, reply_name) ->
|
||||
let sub =
|
||||
Subscription.create info.name info.subscriptions
|
||||
filter sink name reply_sink reply_name
|
||||
in
|
||||
Subscription.create info.subscriptions filter sink name reply_sink reply_name in
|
||||
info.waiters <- Fqueue.push_back info.waiters sub;
|
||||
loop ()
|
||||
| Message.Unsubscribe (Str token) ->
|
||||
ignore (Subscription.delete info.name info.subscriptions token);
|
||||
ignore (Subscription.delete info.subscriptions token);
|
||||
loop ()
|
||||
| m ->
|
||||
Util.message_not_understood "queue" m;
|
||||
|
|
|
@ -12,7 +12,7 @@ type set_t = t StringMap.t ref
|
|||
|
||||
let new_set () = ref StringMap.empty
|
||||
|
||||
let create source subs filter sink name reply_sink reply_name =
|
||||
let create subs filter sink name reply_sink reply_name =
|
||||
let uuid = Uuid.create () in
|
||||
let sub = {
|
||||
live = true;
|
||||
|
@ -25,7 +25,7 @@ let create source subs filter sink name reply_sink reply_name =
|
|||
Node.post_ignore reply_sink reply_name (Message.subscribe_ok (Sexp.Str uuid)) (Sexp.Str "");
|
||||
sub
|
||||
|
||||
let delete source subs uuid =
|
||||
let delete subs uuid =
|
||||
try
|
||||
let sub = StringMap.find uuid !subs in
|
||||
sub.live <- false;
|
||||
|
@ -38,7 +38,7 @@ let lookup subs uuid =
|
|||
try Some (StringMap.find uuid !subs)
|
||||
with Not_found -> None
|
||||
|
||||
let send_to_subscription' source subs sub body delete_action =
|
||||
let send_to_subscription' sub body delete_action =
|
||||
if not sub.live
|
||||
then false
|
||||
else
|
||||
|
@ -46,5 +46,5 @@ let send_to_subscription' source subs sub body delete_action =
|
|||
then true
|
||||
else (delete_action sub.uuid; false)
|
||||
|
||||
let send_to_subscription source subs sub body =
|
||||
send_to_subscription' source subs sub body (fun (uuid) -> delete source subs uuid)
|
||||
let send_to_subscription subs sub body =
|
||||
send_to_subscription' sub body (fun (uuid) -> delete subs uuid)
|
||||
|
|
Loading…
Reference in New Issue