hop-2012/fanoutnode.ml

68 lines
2.1 KiB
OCaml
Raw Normal View History

2012-03-07 18:23:41 +00:00
(* Copyright 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>. *)
2012-05-01 21:36:38 +00:00
(* This file is part of Hop. *)
2012-03-07 18:23:41 +00:00
2012-05-01 21:36:38 +00:00
(* Hop is free software: you can redistribute it and/or modify it *)
2012-03-07 18:23:41 +00:00
(* under the terms of the GNU General Public License as published by the *)
(* Free Software Foundation, either version 3 of the License, or (at your *)
(* option) any later version. *)
2012-05-01 21:36:38 +00:00
(* Hop is distributed in the hope that it will be useful, but *)
2012-03-07 18:23:41 +00:00
(* WITHOUT ANY WARRANTY; without even the implied warranty of *)
(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *)
(* General Public License for more details. *)
(* You should have received a copy of the GNU General Public License *)
2012-05-01 21:36:38 +00:00
(* along with Hop. If not, see <http://www.gnu.org/licenses/>. *)
2012-03-07 18:23:41 +00:00
open Sexp
open Datastructures
open Status
type t = {
name: Node.name;
subscriptions: Subscription.set_t;
mtx: Mutex.t;
}
let classname = "fanout"
let unsubscribe info uuid =
Util.with_mutex0 info.mtx
2012-03-06 22:05:57 +00:00
(fun () -> ignore (Subscription.delete info.name info.subscriptions uuid))
let route_message info n sexp =
match Message.message_of_sexp sexp with
| Message.Post (Str name, body, token) ->
let snapshot = !(info.subscriptions) in
StringMap.iter
(fun uuid sub ->
ignore (Subscription.send_to_subscription' sub body (unsubscribe info)))
snapshot
| Message.Subscribe (Str binding_key as filter, Str sink, name, Str reply_sink, reply_name) ->
Util.with_mutex0 info.mtx
(fun () ->
2012-03-06 22:05:57 +00:00
ignore (Subscription.create
info.name info.subscriptions filter sink name reply_sink reply_name))
| Message.Unsubscribe (Str token) ->
unsubscribe info token
| m ->
Util.message_not_understood classname m
let factory arg =
match arg with
| (Arr [Str name_str]) ->
let info = {
name = Node.name_of_string name_str;
subscriptions = Subscription.new_set ();
mtx = Mutex.create ();
} in
replace_ok
(Node.make_idempotent_named classname info.name (route_message info))
(Str name_str)
| _ ->
Problem (Str "bad-arg")
let init () =
Factory.register_class classname factory