hop-2012/server/directnode.ml

95 lines
3.3 KiB
OCaml

(* Copyright 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>. *)
(* This file is part of Hop. *)
(* Hop is free software: you can redistribute it and/or modify it *)
(* under the terms of the GNU General Public License as published by the *)
(* Free Software Foundation, either version 3 of the License, or (at your *)
(* option) any later version. *)
(* Hop is distributed in the hope that it will be useful, but *)
(* WITHOUT ANY WARRANTY; without even the implied warranty of *)
(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *)
(* General Public License for more details. *)
(* You should have received a copy of the GNU General Public License *)
(* along with Hop. If not, see <http://www.gnu.org/licenses/>. *)
open Lwt
open Sexp
open Datastructures
open Status
(* State of one "direct" routing node. *)
type t = {
(* Name of this node; passed to Subscription.create / Subscription.delete *)
(* and to Node.make_idempotent_named when the node is created. *)
name: Node.name;
(* Subscription set queried and updated through Subscription.lookup, *)
(* Subscription.create and Subscription.delete. *)
subscriptions: Subscription.set_t;
(* Maps a binding key to the set of subscription uuids bound to that key. *)
(* The field is mutable because subscribe/unsubscribe replace the map with *)
(* an updated one rather than modifying it in place. *)
mutable routing_table: UuidSet.t BytesMap.t;
}
(* Class name of this node type: used when registering the factory, when *)
(* creating nodes, and when reporting messages that are not understood. *)
let classname = "direct"
(** [unsubscribe info uuid] deletes the subscription identified by the string
    [uuid] from [info.subscriptions] via [Subscription.delete].

    When the deletion yields [Some sub] and [sub]'s filter is a [Str] binding
    key, the subscription's uuid is also removed from that key's entry in
    [info.routing_table]; the key itself is dropped once its set is empty.
    In every other case the routing table is left untouched.

    Returns an Lwt thread that yields [()]. *)
let unsubscribe info uuid =
  (* Remove [sub_uuid] from the set bound to [binding_key]. A key that is
     absent from the table raises [Not_found], which is deliberately
     treated as "nothing to do". *)
  let forget_binding binding_key sub_uuid =
    try
      let remaining =
        UuidSet.remove sub_uuid (BytesMap.find binding_key info.routing_table) in
      info.routing_table <-
        (if UuidSet.is_empty remaining
         then BytesMap.remove binding_key info.routing_table
         else BytesMap.add binding_key remaining info.routing_table)
    with Not_found -> ()
  in
  lwt removed =
    Subscription.delete info.name info.subscriptions (Bytes.of_string uuid) in
  let () =
    match removed with
    | None -> ()
    | Some sub ->
      (match sub.Subscription.filter with
       | Str binding_key ->
         forget_binding binding_key (Bytes.of_string sub.Subscription.uuid)
       | _ -> ())
  in
  return ()
(** [route_message info _n sexp] decodes [sexp] with [Message.message_of_sexp]
    and handles it on behalf of the node described by [info]:

    - [Post (Str name, body, _)]: sends [body], one subscriber after another,
      to every subscription whose uuid is bound to [name] in the routing
      table. [unsubscribe info] is handed to
      [Subscription.send_to_subscription'] as its callback.
    - [Subscribe (Str key, Str sink, name, Str reply_sink, reply_name)]:
      calls [Subscription.create]; for a [New] subscription its uuid is added
      to [key]'s set in the routing table, for an [Old] one nothing changes.
    - [Unsubscribe (Str token)]: delegates to [unsubscribe].
    - anything else: reported through [Util.message_not_understood].

    The second argument is not used. *)
let route_message info _n sexp =
  (* Set of uuids bound to [key] in [table]; the empty set when [key] has no
     binding. *)
  let subscribers_of key table =
    try BytesMap.find key table with Not_found -> UuidSet.empty in
  (* Deliver [body] to the subscription named by [sub_uuid]. The lookup is
     done at delivery time, so a uuid that [Subscription.lookup] does not
     resolve is silently skipped. *)
  let deliver body sub_uuid =
    match Subscription.lookup info.subscriptions sub_uuid with
    | None ->
      return ()
    | Some sub ->
      lwt _ = Subscription.send_to_subscription' sub body (unsubscribe info) in
      return ()
  in
  match Message.message_of_sexp sexp with
  | Message.Unsubscribe (Str token) ->
    unsubscribe info (Bytes.to_string token)
  | Message.Post (Str name, body, _token) ->
    (* The recipient list is fixed from the table as it stands now, before
       any delivery starts. *)
    let recipients = UuidSet.elements (subscribers_of name info.routing_table) in
    Lwt_list.iter_s (deliver body) recipients
  | Message.Subscribe ((Str binding_key as filter), Str sink, name, Str reply_sink, reply_name) ->
    lwt outcome =
      Subscription.create info.name info.subscriptions filter sink name reply_sink reply_name in
    (match outcome with
     | Subscription.Old _ ->
       return ()
     | Subscription.New sub ->
       let widened =
         UuidSet.add
           (Bytes.of_string sub.Subscription.uuid)
           (subscribers_of binding_key info.routing_table) in
       info.routing_table <- BytesMap.add binding_key widened info.routing_table;
       return ())
  | m ->
    Util.message_not_understood classname m
(** Factory for "direct" nodes.

    Given [Arr [Str name_str]], builds a fresh node state (new subscription
    set, empty routing table), creates the node with
    [Node.make_idempotent_named] using [route_message] as its message
    handler, and passes the result through [replace_ok] together with
    [Str name_str].

    Any other argument yields [Problem (litstr "bad-arg")]. *)
let factory = function
  | Arr [Str name_str] ->
    let info =
      { name = Node.name_of_bytes name_str;
        subscriptions = Subscription.new_set ();
        routing_table = BytesMap.empty } in
    let handler = route_message info in
    let creation =
      Node.make_idempotent_named (Bytes.of_string classname) info.name return handler in
    replace_ok creation (Str name_str)
  | _ ->
    return (Problem (litstr "bad-arg"))
(** Registers [factory] with [Factory.register_class] under the class name
    [classname]. *)
let init () =
  let class_id = Bytes.of_string classname in
  Factory.register_class class_id factory