From e094c17f737d81f121a94e77cbe1d5e5a0ad6c05 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 10 May 2012 15:22:58 -0400 Subject: [PATCH] Dedup subscriptions --- datastructures.ml | 1 + directnode.ml | 19 ++++++++------ fanoutnode.ml | 2 +- queuenode.ml | 15 ++++++----- sexp.ml | 2 ++ subscription.ml | 67 +++++++++++++++++++++++++++++++---------------- 6 files changed, 69 insertions(+), 37 deletions(-) diff --git a/datastructures.ml b/datastructures.ml index 1b2d987..093c136 100644 --- a/datastructures.ml +++ b/datastructures.ml @@ -17,6 +17,7 @@ module StringSet = Set.Make(String) module StringMap = Map.Make(String) +module SexpMap = Map.Make(Sexp) module UuidSet = StringSet let string_map_keys m = StringMap.fold (fun k _ acc -> k :: acc) m [] diff --git a/directnode.ml b/directnode.ml index cc503c3..a9e7993 100644 --- a/directnode.ml +++ b/directnode.ml @@ -60,14 +60,17 @@ let route_message info n sexp = return ()) (UuidSet.elements matching) | Message.Subscribe (Str binding_key as filter, Str sink, name, Str reply_sink, reply_name) -> - lwt sub = - Subscription.create - info.name info.subscriptions filter sink name reply_sink reply_name in - let old_set = - (try StringMap.find binding_key info.routing_table with Not_found -> UuidSet.empty) in - let new_set = UuidSet.add sub.Subscription.uuid old_set in - info.routing_table <- StringMap.add binding_key new_set info.routing_table; - return () + (match_lwt + Subscription.create info.name info.subscriptions filter sink name reply_sink reply_name + with + | Subscription.New sub -> + let old_set = + (try StringMap.find binding_key info.routing_table with Not_found -> UuidSet.empty) in + let new_set = UuidSet.add sub.Subscription.uuid old_set in + info.routing_table <- StringMap.add binding_key new_set info.routing_table; + return () + | Subscription.Old sub -> + return ()) | Message.Unsubscribe (Str token) -> unsubscribe info token | m -> diff --git a/fanoutnode.ml b/fanoutnode.ml index b726a29..dcfdf7a 100644 --- a/fanoutnode.ml +++ b/fanoutnode.ml @@ -33,7 +33,7 @@ let unsubscribe info uuid = let route_message info n sexp = match Message.message_of_sexp sexp with | Message.Post (Str name, body, token) -> - let snapshot = !(info.subscriptions) in + let snapshot = info.subscriptions.Subscription.uuid_table in Lwt_list.iter_s (fun (uuid, sub) -> lwt _ = Subscription.send_to_subscription' sub body (unsubscribe info) in return ()) diff --git a/queuenode.ml b/queuenode.ml index b32980b..00b57dc 100644 --- a/queuenode.ml +++ b/queuenode.ml @@ -70,12 +70,15 @@ let queue_handler info n sexp = info.backlog_out (Some body); return () | Message.Subscribe (filter, Str sink, name, Str reply_sink, reply_name) -> - lwt sub = - Subscription.create - info.name info.subscriptions filter sink name reply_sink reply_name in - info.waiters <- info.waiters + 1; - info.waiters_out (Some sub); - return () + (match_lwt + Subscription.create info.name info.subscriptions filter sink name reply_sink reply_name + with + | Subscription.New sub -> + info.waiters <- info.waiters + 1; + info.waiters_out (Some sub); + return () + | Subscription.Old sub -> + return ()) | Message.Unsubscribe (Str token) -> ignore (Subscription.delete info.name info.subscriptions token); info.waiters <- info.waiters - 1; diff --git a/sexp.ml b/sexp.ml index 05e5782..37332fe 100644 --- a/sexp.ml +++ b/sexp.ml @@ -28,6 +28,8 @@ and t = | Hint of display_hint_t | Arr of t list +let compare a b = Pervasives.compare a b + let generic_output_sexp write x = let rec walk x = match x with diff --git a/subscription.ml b/subscription.ml index ed88f01..c462b23 100644 --- a/subscription.ml +++ b/subscription.ml @@ -26,41 +26,64 @@ type t = { name: Sexp.t } -type set_t = t StringMap.t ref +type creation_t = + | Old of t + | New of t -let new_set () = ref StringMap.empty +type set_t = { + mutable subscription_table: Uuid.t SexpMap.t; + mutable uuid_table: t StringMap.t +} -let count subs = StringMap.cardinal !subs +let new_set () = { + subscription_table = SexpMap.empty; + uuid_table = StringMap.empty +} + +let count subs = SexpMap.cardinal subs.subscription_table + +let key_from sink_str name filter = Sexp.Arr [Sexp.Str sink_str; name; filter] let create source subs filter sink_str name reply_sink reply_name = - let uuid = Uuid.create () in - let sink = Node.name_of_string sink_str in - let sub = { - live = true; - uuid = uuid; - filter = filter; - sink = sink; - name = name - } in - subs := StringMap.add uuid sub !subs; - lwt () = Lwt.join [ - Meta.announce_subscription source filter sink_str name true; - Node.post_ignore' reply_sink reply_name (Message.subscribe_ok (Sexp.Str uuid)) (Sexp.Str "") - ] in - return sub + let key = key_from sink_str name filter in + try + let uuid = SexpMap.find key subs.subscription_table in + lwt () = + Node.post_ignore' reply_sink reply_name (Message.subscribe_ok (Sexp.Str uuid)) (Sexp.Str "") + in + return (Old (StringMap.find uuid subs.uuid_table)) + with Not_found -> + let uuid = Uuid.create () in + let sink = Node.name_of_string sink_str in + let sub = { + live = true; + uuid = uuid; + filter = filter; + sink = sink; + name = name + } in + subs.uuid_table <- StringMap.add uuid sub subs.uuid_table; + subs.subscription_table <- SexpMap.add key uuid subs.subscription_table; + lwt () = Lwt.join [ + Meta.announce_subscription source filter sink_str name true; + Node.post_ignore' reply_sink reply_name (Message.subscribe_ok (Sexp.Str uuid)) (Sexp.Str "") + ] in + return (New sub) let delete source subs uuid = - try_lwt - let sub = StringMap.find uuid !subs in + try + let sub = StringMap.find uuid subs.uuid_table in sub.live <- false; - subs := StringMap.remove uuid !subs; + subs.uuid_table <- StringMap.remove uuid subs.uuid_table; + let key = key_from sub.sink.Node.label sub.name sub.filter in + subs.subscription_table <- SexpMap.remove key subs.subscription_table; lwt () = Meta.announce_subscription source sub.filter sub.sink.Node.label sub.name false in return (Some sub) with Not_found -> return None let lookup subs uuid = - try Some (StringMap.find uuid !subs) + try Some (StringMap.find uuid subs.uuid_table) with Not_found -> None let send_to_subscription' sub body delete_action =