(* Copyright 2012 Tony Garnock-Jones. *)

(* This file is part of Hop. *)

(* Hop is free software: you can redistribute it and/or modify it *)
(* under the terms of the GNU General Public License as published by the *)
(* Free Software Foundation, either version 3 of the License, or (at your *)
(* option) any later version. *)

(* Hop is distributed in the hope that it will be useful, but *)
(* WITHOUT ANY WARRANTY; without even the implied warranty of *)
(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *)
(* General Public License for more details. *)

(* You should have received a copy of the GNU General Public License *)
(* along with Hop. If not, see <http://www.gnu.org/licenses/>. *)

open Lwt
open Sexp
open Datastructures
open Status

(** State of one "direct" node: its name, its subscriptions, and a routing
    table mapping each binding key to the set of subscription uuids that
    were registered under that key. *)
type t = {
  name: Node.name;
  subscriptions: Subscription.set_t;
  mutable routing_table: UuidSet.t BytesMap.t;
}

(** Class name under which this node type is registered with the factory. *)
let classname = "direct"

(** [bound_uuids binding_key table] is the uuid set stored under
    [binding_key] in [table], or the empty set when the key is absent. *)
let bound_uuids binding_key table =
  try BytesMap.find binding_key table
  with Not_found -> UuidSet.empty

(** [bind_uuid info binding_key uuid] adds [uuid] to the set stored under
    [binding_key] in [info]'s routing table, creating the entry if needed. *)
let bind_uuid info binding_key uuid =
  let subscribers = UuidSet.add uuid (bound_uuids binding_key info.routing_table) in
  info.routing_table <- BytesMap.add binding_key subscribers info.routing_table

(** [unbind_uuid info binding_key uuid] removes [uuid] from the set stored
    under [binding_key]. The entry is dropped entirely once its set becomes
    empty. Does nothing when [binding_key] has no entry. *)
let unbind_uuid info binding_key uuid =
  try
    let remaining = UuidSet.remove uuid (BytesMap.find binding_key info.routing_table) in
    info.routing_table <-
      (if UuidSet.is_empty remaining
       then BytesMap.remove binding_key info.routing_table
       else BytesMap.add binding_key remaining info.routing_table)
  with Not_found -> ()

(** [unsubscribe info uuid] deletes the subscription identified by [uuid]
    through [Subscription.delete]. When a subscription is returned and its
    filter is a [Str] binding key, the subscription's uuid is also removed
    from the routing table. Always resolves to [()]. *)
let unsubscribe info uuid =
  match_lwt Subscription.delete info.name info.subscriptions (Bytes.of_string uuid) with
  | None -> return ()
  | Some sub ->
    (match sub.Subscription.filter with
     | Str binding_key ->
       unbind_uuid info binding_key (Bytes.of_string sub.Subscription.uuid)
     | _ -> ());
    return ()

(** [route_message info _n sexp] decodes [sexp] as a message and handles it:

    - [Post] with a [Str] name: sends the body, one subscription after
      another, to every uuid bound to that name which [Subscription.lookup]
      still resolves. The recipient list is computed before the first send,
      and the result of each send is ignored.
    - [Subscribe] with [Str] filter, sink and reply sink: creates the
      subscription and, only when it is reported as [New], records its uuid
      under the binding key.
    - [Unsubscribe] with a [Str] token: delegates to {!unsubscribe}.
    - anything else: passed to [Util.message_not_understood].

    The second argument is not used. *)
let route_message info _n sexp =
  match Message.message_of_sexp sexp with
  | Message.Post (Str name, body, _token) ->
    let deliver uuid =
      match Subscription.lookup info.subscriptions uuid with
      | None -> return ()
      | Some sub ->
        let sending = Subscription.send_to_subscription' sub body (unsubscribe info) in
        Lwt.bind sending (fun _ -> return ())
    in
    let recipients = UuidSet.elements (bound_uuids name info.routing_table) in
    Lwt_list.iter_s deliver recipients
  | Message.Subscribe (Str binding_key as filter, Str sink, name, Str reply_sink, reply_name) ->
    (match_lwt
       Subscription.create info.name info.subscriptions filter sink name reply_sink reply_name
     with
     | Subscription.Old _ -> return ()
     | Subscription.New sub ->
       bind_uuid info binding_key (Bytes.of_string sub.Subscription.uuid);
       return ())
  | Message.Unsubscribe (Str token) ->
    unsubscribe info (Bytes.to_string token)
  | m ->
    Util.message_not_understood classname m

(** [factory arg] builds a "direct" node. [arg] must be [Arr [Str name]];
    a fresh, empty node state is created for that name and handed to
    [Node.make_idempotent_named] with {!route_message} as its handler.
    Any other argument shape yields [Problem "bad-arg"]. *)
let factory = function
  | Arr [Str name_str] ->
    let info = {
      name = Node.name_of_bytes name_str;
      subscriptions = Subscription.new_set ();
      routing_table = BytesMap.empty;
    } in
    let node =
      Node.make_idempotent_named
        (Bytes.of_string classname)
        info.name
        return
        (route_message info)
    in
    replace_ok node (Str name_str)
  | _ ->
    return (Problem (litstr "bad-arg"))

(** [init ()] registers {!factory} with [Factory] under {!classname}. *)
let init () =
  let class_id = Bytes.of_string classname in
  Factory.register_class class_id factory