(* Copyright 2012 Tony Garnock-Jones. *)

(* This file is part of Hop. *)

(* Hop is free software: you can redistribute it and/or modify it *)
(* under the terms of the GNU General Public License as published by the *)
(* Free Software Foundation, either version 3 of the License, or (at your *)
(* option) any later version. *)

(* Hop is distributed in the hope that it will be useful, but *)
(* WITHOUT ANY WARRANTY; without even the implied warranty of *)
(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *)
(* General Public License for more details. *)

(* You should have received a copy of the GNU General Public License *)
(* along with Hop. If not, see <http://www.gnu.org/licenses/>. *)

open Lwt
open Sexp
open Datastructures
open Status

(* Per-node state: the name the node was created under and the set of
   subscriptions attached to it. *)
type t = {
  name: Node.name;
  subscriptions: Subscription.set_t;
}

(* Class name under which this node type is registered with the factory
   and reported to [Util.message_not_understood]. *)
let classname = "fanout"

(* [unsubscribe info uuid] converts [uuid] to bytes and hands it to
   [Subscription.delete] together with the name and subscription set
   held in [info]. Whatever the delete yields is discarded; the
   resulting thread returns unit. *)
let unsubscribe info uuid =
  let key = Bytes.of_string uuid in
  Subscription.delete info.name info.subscriptions key
  >>= fun _ -> return ()

(* [route_message info _n sexp] decodes [sexp] with
   [Message.message_of_sexp] and dispatches on the result:

   - Post: the body is sent, one subscription after another, to every
     binding found in the uuid table at the moment the message is
     handled. The posted name and token are not consulted.
   - Subscribe: a subscription is created in the set held by [info];
     the value returned by [Subscription.create] is discarded.
   - Unsubscribe: the token is converted to a string and passed to
     [unsubscribe].
   - anything else is forwarded to [Util.message_not_understood].

   The second argument is accepted but never used. *)
let route_message info _n sexp =
  (* Wait for a thread and throw its result away. *)
  let discard thread = thread >>= fun _ -> return () in
  match Message.message_of_sexp sexp with
  | Message.Post (Str _name, body, _token) ->
    (* The table is read once, before any delivery starts, so the list
       being iterated is fixed for the duration of this post. *)
    let table = info.subscriptions.Subscription.uuid_table in
    let deliver (_uuid, sub) =
      discard (Subscription.send_to_subscription' sub body (unsubscribe info))
    in
    Lwt_list.iter_s deliver (BytesMap.bindings table)
  | Message.Subscribe ((Str _binding_key as filter), Str sink, name,
                       Str reply_sink, reply_name) ->
    discard
      (Subscription.create
         info.name info.subscriptions filter sink name reply_sink reply_name)
  | Message.Unsubscribe (Str token) ->
    unsubscribe info (Bytes.to_string token)
  | other ->
    Util.message_not_understood classname other

(* Factory entry point. The only accepted argument shape is
   [Arr [Str name_str]]: fresh node state is built for that name, the
   node is created through [Node.make_idempotent_named] with
   [route_message] as its handler, and the outcome is passed to
   [replace_ok] along with [Str name_str]. Any other argument yields
   [Problem (litstr "bad-arg")]. *)
let factory = function
  | Arr [Str name_str] ->
    let info = {
      name = Node.name_of_bytes name_str;
      subscriptions = Subscription.new_set ()
    } in
    let class_bytes = Bytes.of_string classname in
    let registration =
      Node.make_idempotent_named class_bytes info.name return (route_message info)
    in
    replace_ok registration (Str name_str)
  | _ ->
    return (Problem (litstr "bad-arg"))

(* Registers [factory] with [Factory.register_class] under [classname]. *)
let init () =
  let class_bytes = Bytes.of_string classname in
  Factory.register_class class_bytes factory