Dedup subscriptions
This commit is contained in:
parent
18f09b324b
commit
e094c17f73
|
@ -17,6 +17,7 @@
|
|||
|
||||
module StringSet = Set.Make(String)
|
||||
module StringMap = Map.Make(String)
|
||||
module SexpMap = Map.Make(Sexp)
|
||||
module UuidSet = StringSet
|
||||
|
||||
let string_map_keys m = StringMap.fold (fun k _ acc -> k :: acc) m []
|
||||
|
|
|
@ -60,14 +60,17 @@ let route_message info n sexp =
|
|||
return ())
|
||||
(UuidSet.elements matching)
|
||||
| Message.Subscribe (Str binding_key as filter, Str sink, name, Str reply_sink, reply_name) ->
|
||||
lwt sub =
|
||||
Subscription.create
|
||||
info.name info.subscriptions filter sink name reply_sink reply_name in
|
||||
let old_set =
|
||||
(try StringMap.find binding_key info.routing_table with Not_found -> UuidSet.empty) in
|
||||
let new_set = UuidSet.add sub.Subscription.uuid old_set in
|
||||
info.routing_table <- StringMap.add binding_key new_set info.routing_table;
|
||||
return ()
|
||||
(match_lwt
|
||||
Subscription.create info.name info.subscriptions filter sink name reply_sink reply_name
|
||||
with
|
||||
| Subscription.New sub ->
|
||||
let old_set =
|
||||
(try StringMap.find binding_key info.routing_table with Not_found -> UuidSet.empty) in
|
||||
let new_set = UuidSet.add sub.Subscription.uuid old_set in
|
||||
info.routing_table <- StringMap.add binding_key new_set info.routing_table;
|
||||
return ()
|
||||
| Subscription.Old sub ->
|
||||
return ())
|
||||
| Message.Unsubscribe (Str token) ->
|
||||
unsubscribe info token
|
||||
| m ->
|
||||
|
|
|
@ -33,7 +33,7 @@ let unsubscribe info uuid =
|
|||
let route_message info n sexp =
|
||||
match Message.message_of_sexp sexp with
|
||||
| Message.Post (Str name, body, token) ->
|
||||
let snapshot = !(info.subscriptions) in
|
||||
let snapshot = info.subscriptions.Subscription.uuid_table in
|
||||
Lwt_list.iter_s
|
||||
(fun (uuid, sub) ->
|
||||
lwt _ = Subscription.send_to_subscription' sub body (unsubscribe info) in return ())
|
||||
|
|
15
queuenode.ml
15
queuenode.ml
|
@ -70,12 +70,15 @@ let queue_handler info n sexp =
|
|||
info.backlog_out (Some body);
|
||||
return ()
|
||||
| Message.Subscribe (filter, Str sink, name, Str reply_sink, reply_name) ->
|
||||
lwt sub =
|
||||
Subscription.create
|
||||
info.name info.subscriptions filter sink name reply_sink reply_name in
|
||||
info.waiters <- info.waiters + 1;
|
||||
info.waiters_out (Some sub);
|
||||
return ()
|
||||
(match_lwt
|
||||
Subscription.create info.name info.subscriptions filter sink name reply_sink reply_name
|
||||
with
|
||||
| Subscription.New sub ->
|
||||
info.waiters <- info.waiters + 1;
|
||||
info.waiters_out (Some sub);
|
||||
return ()
|
||||
| Subscription.Old sub ->
|
||||
return ())
|
||||
| Message.Unsubscribe (Str token) ->
|
||||
ignore (Subscription.delete info.name info.subscriptions token);
|
||||
info.waiters <- info.waiters - 1;
|
||||
|
|
2
sexp.ml
2
sexp.ml
|
@ -28,6 +28,8 @@ and t =
|
|||
| Hint of display_hint_t
|
||||
| Arr of t list
|
||||
|
||||
let compare a b = Pervasives.compare a b
|
||||
|
||||
let generic_output_sexp write x =
|
||||
let rec walk x =
|
||||
match x with
|
||||
|
|
|
@ -26,41 +26,64 @@ type t = {
|
|||
name: Sexp.t
|
||||
}
|
||||
|
||||
type set_t = t StringMap.t ref
|
||||
type creation_t =
|
||||
| Old of t
|
||||
| New of t
|
||||
|
||||
let new_set () = ref StringMap.empty
|
||||
type set_t = {
|
||||
mutable subscription_table: Uuid.t SexpMap.t;
|
||||
mutable uuid_table: t StringMap.t
|
||||
}
|
||||
|
||||
let count subs = StringMap.cardinal !subs
|
||||
let new_set () = {
|
||||
subscription_table = SexpMap.empty;
|
||||
uuid_table = StringMap.empty
|
||||
}
|
||||
|
||||
let count subs = SexpMap.cardinal subs.subscription_table
|
||||
|
||||
let key_from sink_str name filter = Sexp.Arr [Sexp.Str sink_str; name; filter]
|
||||
|
||||
let create source subs filter sink_str name reply_sink reply_name =
|
||||
let uuid = Uuid.create () in
|
||||
let sink = Node.name_of_string sink_str in
|
||||
let sub = {
|
||||
live = true;
|
||||
uuid = uuid;
|
||||
filter = filter;
|
||||
sink = sink;
|
||||
name = name
|
||||
} in
|
||||
subs := StringMap.add uuid sub !subs;
|
||||
lwt () = Lwt.join [
|
||||
Meta.announce_subscription source filter sink_str name true;
|
||||
Node.post_ignore' reply_sink reply_name (Message.subscribe_ok (Sexp.Str uuid)) (Sexp.Str "")
|
||||
] in
|
||||
return sub
|
||||
let key = key_from sink_str name filter in
|
||||
try
|
||||
let uuid = SexpMap.find key subs.subscription_table in
|
||||
lwt () =
|
||||
Node.post_ignore' reply_sink reply_name (Message.subscribe_ok (Sexp.Str uuid)) (Sexp.Str "")
|
||||
in
|
||||
return (Old (StringMap.find uuid subs.uuid_table))
|
||||
with Not_found ->
|
||||
let uuid = Uuid.create () in
|
||||
let sink = Node.name_of_string sink_str in
|
||||
let sub = {
|
||||
live = true;
|
||||
uuid = uuid;
|
||||
filter = filter;
|
||||
sink = sink;
|
||||
name = name
|
||||
} in
|
||||
subs.uuid_table <- StringMap.add uuid sub subs.uuid_table;
|
||||
subs.subscription_table <- SexpMap.add key uuid subs.subscription_table;
|
||||
lwt () = Lwt.join [
|
||||
Meta.announce_subscription source filter sink_str name true;
|
||||
Node.post_ignore' reply_sink reply_name (Message.subscribe_ok (Sexp.Str uuid)) (Sexp.Str "")
|
||||
] in
|
||||
return (New sub)
|
||||
|
||||
let delete source subs uuid =
|
||||
try_lwt
|
||||
let sub = StringMap.find uuid !subs in
|
||||
try
|
||||
let sub = StringMap.find uuid subs.uuid_table in
|
||||
sub.live <- false;
|
||||
subs := StringMap.remove uuid !subs;
|
||||
subs.uuid_table <- StringMap.remove uuid subs.uuid_table;
|
||||
let key = key_from sub.sink.Node.label sub.name sub.filter in
|
||||
subs.subscription_table <- SexpMap.remove key subs.subscription_table;
|
||||
lwt () = Meta.announce_subscription source sub.filter sub.sink.Node.label sub.name false in
|
||||
return (Some sub)
|
||||
with Not_found ->
|
||||
return None
|
||||
|
||||
let lookup subs uuid =
|
||||
try Some (StringMap.find uuid !subs)
|
||||
try Some (StringMap.find uuid subs.uuid_table)
|
||||
with Not_found -> None
|
||||
|
||||
let send_to_subscription' sub body delete_action =
|
||||
|
|
Loading…
Reference in New Issue