diff --git a/directnode.ml b/directnode.ml index 1485233..f54f11d 100644 --- a/directnode.ml +++ b/directnode.ml @@ -14,7 +14,7 @@ let classname = "direct" let unsubscribe info uuid = Util.with_mutex0 info.mtx (fun () -> - match Subscription.delete info.name info.subscriptions uuid with + match Subscription.delete info.subscriptions uuid with | Some sub -> (match sub.Subscription.filter with | Str binding_key -> @@ -38,17 +38,15 @@ let route_message info n sexp = (fun (uuid) -> match Subscription.lookup info.subscriptions uuid with | Some sub -> - ignore (Subscription.send_to_subscription' info.name info.subscriptions sub body - (unsubscribe info)) - | None -> ()) + ignore (Subscription.send_to_subscription' sub body (unsubscribe info)) + | None -> + ()) matching | Message.Subscribe (Str binding_key as filter, Str sink, name, Str reply_sink, reply_name) -> Util.with_mutex0 info.mtx (fun () -> let sub = - Subscription.create info.name info.subscriptions - filter sink name reply_sink reply_name - in + Subscription.create info.subscriptions filter sink name reply_sink reply_name in let old_set = (try StringMap.find binding_key info.routing_table with Not_found -> UuidSet.empty) in let new_set = UuidSet.add sub.Subscription.uuid old_set in diff --git a/queuenode.ml b/queuenode.ml index bbcde71..2cab05e 100644 --- a/queuenode.ml +++ b/queuenode.ml @@ -29,7 +29,7 @@ let rec do_burst info n = let (body, new_backlog) = Fqueue.really_pop_front info.backlog in let (sub, new_waiters) = Fqueue.really_pop_front info.waiters in info.waiters <- new_waiters; - if Subscription.send_to_subscription info.name info.subscriptions sub body + if Subscription.send_to_subscription info.subscriptions sub body then (info.waiters <- Fqueue.push_back info.waiters sub; info.backlog <- new_backlog; @@ -53,13 +53,11 @@ let shoveller info = loop () | Message.Subscribe (filter, Str sink, name, Str reply_sink, reply_name) -> let sub = - Subscription.create info.name info.subscriptions - filter sink name reply_sink reply_name - in + Subscription.create info.subscriptions filter sink name reply_sink reply_name in info.waiters <- Fqueue.push_back info.waiters sub; loop () | Message.Unsubscribe (Str token) -> - ignore (Subscription.delete info.name info.subscriptions token); + ignore (Subscription.delete info.subscriptions token); loop () | m -> Util.message_not_understood "queue" m; diff --git a/subscription.ml b/subscription.ml index d7d6e95..2c66bde 100644 --- a/subscription.ml +++ b/subscription.ml @@ -12,7 +12,7 @@ type set_t = t StringMap.t ref let new_set () = ref StringMap.empty -let create source subs filter sink name reply_sink reply_name = +let create subs filter sink name reply_sink reply_name = let uuid = Uuid.create () in let sub = { live = true; @@ -25,7 +25,7 @@ let create source subs filter sink name reply_sink reply_name = Node.post_ignore reply_sink reply_name (Message.subscribe_ok (Sexp.Str uuid)) (Sexp.Str ""); sub -let delete source subs uuid = +let delete subs uuid = try let sub = StringMap.find uuid !subs in sub.live <- false; @@ -38,7 +38,7 @@ let lookup subs uuid = try Some (StringMap.find uuid !subs) with Not_found -> None -let send_to_subscription' source subs sub body delete_action = +let send_to_subscription' sub body delete_action = if not sub.live then false else @@ -46,5 +46,5 @@ let send_to_subscription' source subs sub body delete_action = then true else (delete_action sub.uuid; false) -let send_to_subscription source subs sub body = - send_to_subscription' source subs sub body (fun (uuid) -> delete source subs uuid) +let send_to_subscription subs sub body = + send_to_subscription' sub body (fun (uuid) -> delete subs uuid)