hop-2012/server/node.ml

183 lines
5.2 KiB
OCaml

(* Copyright 2012 Tony Garnock-Jones <tonygarnockjones@gmail.com>. *)
(* This file is part of Hop. *)
(* Hop is free software: you can redistribute it and/or modify it *)
(* under the terms of the GNU General Public License as published by the *)
(* Free Software Foundation, either version 3 of the License, or (at your *)
(* option) any later version. *)
(* Hop is distributed in the hope that it will be useful, but *)
(* WITHOUT ANY WARRANTY; without even the implied warranty of *)
(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *)
(* General Public License for more details. *)
(* You should have received a copy of the GNU General Public License *)
(* along with Hop. If not, see <http://www.gnu.org/licenses/>. *)
open Lwt
open Printf
open Datastructures
open Status
(* Core data model for routing nodes.

   - [handle_message_t] is the callback type of a node: it is applied to the
     node itself and to the message body, and produces a [unit Lwt.t].
   - [t] is a node.
   - [name] pairs a label with the node (if any) currently bound to it. *)
type handle_message_t = t -> Sexp.t -> unit Lwt.t

and t = {
  (* Labels this node is currently bound under; kept up to date by [bind]
     and [unbind]. *)
  mutable names : BytesSet.t;
  (* Deliveries counted by [send] since it last yielded. *)
  mutable send_counter : int;
  class_name : bytes;
  handle_message : handle_message_t
}

and name = {
  label : bytes;
  (* [None] while no node is bound to this name. *)
  mutable binding : t option
}
(* Weak hash set of [name] records. Two records count as the same entry when
   their labels are structurally equal; the hash is derived from the label
   alone, so the mutable [binding] field never affects lookup. *)
module NameTable = Weak.Make (struct
  type t = name

  let equal {label = left; _} {label = right; _} = (left = right)

  let hash {label = key; _} = Hashtbl.hash key
end)
(* Ordered set of [name] records, ordered by label using [Bytes.compare]. *)
module NameSet = Set.Make (struct
  type t = name

  let compare {label = left; _} {label = right; _} = Bytes.compare left right
end)
(* Global table in which [name_of_bytes] interns names. The argument 100 is
   only the initial size given to [NameTable.create]. *)
let name_table : NameTable.t = NameTable.create 100
(* The set of names that currently have a node bound to them; [bind] adds to
   it and [unbind] removes from it. *)
let directory : NameSet.t ref = ref NameSet.empty
(* Intern the label [str]: return the [name] record already present in
   [name_table] for an equal label, or register and return a fresh unbound
   record when there is none. *)
let name_of_bytes str =
  NameTable.merge name_table {label = str; binding = None}
(* Build a label-to-name lookup function with its own one-entry cache.

   When the returned function is applied to a label equal to the one it
   resolved last, it returns the remembered [name] without going through
   [name_of_bytes]; otherwise it interns the label and remembers the result.
   Every call to [caching_name_of_bytes ()] creates an independent cache. *)
let caching_name_of_bytes () =
  let last_seen = ref None in
  let remember interned = (last_seen := Some interned; interned) in
  fun str ->
    match !last_seen with
    | Some hit when hit.label = str -> hit
    | Some _ | None -> remember (name_of_bytes str)
(* Label of the local container. A new byte sequence is allocated on every
   call, so callers never share a mutable buffer. *)
let local_container_name () =
  let container = "server" in
  Bytes.of_string container
(* Create a node of class [class_name] whose messages are processed by
   [handler]. The new node has no names yet and a send counter of zero; it is
   not bound to anything until [bind] is called. *)
let make class_name handler =
  let no_names = BytesSet.empty in
  {class_name = class_name;
   handle_message = handler;
   names = no_names;
   send_counter = 0}
(* The node currently bound to the given name, or [None]. *)
let lookup {binding = bound; _} = bound
(* Snapshot of every name in [directory], as a list in increasing label
   order (the order of [NameSet.elements]). *)
let all_node_names () =
  let bound_names = !directory in
  NameSet.elements bound_names
(* Labels of every name in [directory], in the order given by
   [all_node_names]. *)
let all_node_name_bytes () =
  all_node_names () |> List.map (fun entry -> entry.label)
(* Whether a node is bound to [name] at the moment of the call.
   The answer is only approximate: the check takes no lock and does not run
   in a transaction. *)
let approx_exists name =
  match lookup name with
  | None -> false
  | Some _ -> true
(* Try to bind [node] to the name [filter].

   Resolves to [true] when the binding is installed: the name then points at
   the node, is added to [directory], and its label is added to the node's
   own [names]. Resolves to [false], leaving everything unchanged, when the
   label is empty (a warning is logged) or when the name already has a
   binding.

   The values returned by the [Log] calls are dropped with [ignore] rather
   than waited for. *)
let bind (filter, node) =
  let label = filter.label in
  match filter.binding with
  | _ when label = Bytes.empty ->
    ignore (Log.warn "Binding to empty name forbidden" []);
    return false
  | Some _ ->
    return false
  | None ->
    filter.binding <- Some node;
    directory := NameSet.add filter !directory;
    node.names <- BytesSet.add label node.names;
    ignore (Log.info "Node bound" [Sexp.Str label; Sexp.Str node.class_name]);
    return true
(* Create a node of class [class_name] and bind it to [node_name].

   Meant for factory constructor functions, which explains the result shape:
   resolves to [Ok node] when the binding succeeds and to a [Problem]
   carrying the literal bind-failed when [bind] refuses it. *)
let make_named class_name node_name handler =
  let node = make class_name handler in
  lwt bound = bind (node_name, node) in
  return (if bound
          then Ok node
          else Problem (Sexp.litstr "bind-failed"))
(* Like [make_named], but tolerant of the name already being bound.

   - If [node_name] is bound to a node of the same [class_name], that
     existing node is returned as [Ok] and nothing is created.
   - If it is bound to a node of another class, the result is a [Problem]
     carrying the literal class-mismatch.
   - If it is unbound, a new node is created and bound; on success
     [if_new_node ()] is run to completion before [Ok node] is returned, and
     if [bind] refuses the result is a [Problem] carrying bind-failed.

   Meant for factory constructor functions, which explains the result
   shape. *)
let make_idempotent_named class_name node_name if_new_node handler =
  match lookup node_name with
  | Some existing when existing.class_name = class_name ->
    return (Ok existing)
  | Some _ ->
    return (Problem (Sexp.litstr "class-mismatch"))
  | None ->
    let node = make class_name handler in
    lwt bound = bind (node_name, node) in
    if bound
    then (lwt () = if_new_node () in return (Ok node))
    else return (Problem (Sexp.litstr "bind-failed"))
(* Remove the binding of [name], if it has one.

   Resolves to [false] when the name was not bound. Otherwise logs the
   event, removes the label from the bound node's [names], clears the
   binding, drops the name from [directory] and resolves to [true]. The
   value returned by [Log.info] is dropped rather than waited for. *)
let unbind name =
  match name.binding with
  | None ->
    return false
  | Some node ->
    ignore (Log.info "Node unbound" [Sexp.Str name.label; Sexp.Str node.class_name]);
    node.names <- BytesSet.remove name.label node.names;
    name.binding <- None;
    directory := NameSet.remove name !directory;
    return true
(* Unbind every label recorded in [n.names], one after another, then reset
   [n.names] to the empty set. The individual results of [unbind] are not
   inspected. *)
let unbind_all n =
  let release label =
    lwt _was_bound = unbind (name_of_bytes label) in
    return ()
  in
  (* The label list is taken before iterating, so [unbind] shrinking
     [n.names] along the way does not disturb the traversal. *)
  let labels = BytesSet.elements n.names in
  lwt () = Lwt_list.iter_s release labels in
  n.names <- BytesSet.empty;
  return ()
(* Deliver [body] to the node bound to [name].

   Resolves to [false], doing nothing, when no node is bound. Otherwise the
   handler of the bound node is run with the node and [body]; an exception
   coming out of the handler is logged as a warning and does not make the
   send fail. The send counter of the node is then incremented, and once it
   reaches 1000 it is reset to zero and the thread yields through
   [Lwt_unix.yield]. Resolves to [true] in every case where a node was
   bound. *)
let send name body =
  match name.binding with
  | None ->
    return false
  | Some target ->
    let deliver () =
      (try_lwt target.handle_message target body
       with exn ->
         Log.warn "Node message handler raised exception"
           [Sexp.Str name.label;
            Sexp.str (Printexc.to_string exn)])
    in
    lwt () = deliver () in
    target.send_counter <- target.send_counter + 1;
    let pause =
      if target.send_counter < 1000
      then return ()
      else (target.send_counter <- 0; Lwt_unix.yield ())
    in
    lwt () = pause in
    return true
(* Variant of [send] that takes a raw label and interns it with
   [name_of_bytes] first. *)
let send' str body =
  let target = name_of_bytes str in
  send target body
(* Send to [name] the message built by [Message.post] from
   [(label, body, token)]; the result is that of [send]. *)
let post name label body token =
  let message = Message.post (label, body, token) in
  send name message
(* Variant of [post] that takes the destination as a raw label and interns
   it with [name_of_bytes] first. *)
let post' str label body token =
  let target = name_of_bytes str in
  post target label body token
(* Like [bind], but with a [unit] result: a refused binding is reported
   through a logged warning instead of being returned to the caller. *)
let bind_ignore (filter, node) =
  lwt bound = bind (filter, node) in
  if bound
  then return ()
  else Log.warn "Duplicate binding" [Sexp.Str filter.label]
(* Like [send], but with a [unit] result. When no node is bound to [name] a
   warning containing the label and the body is logged, except for the empty
   label, for which the message is dropped silently. *)
let send_ignore name body =
  lwt delivered = send name body in
  if delivered || name.label = Bytes.empty
  then return ()
  else Log.warn "send to missing node" [Sexp.Str name.label; body]
(* Variant of [send_ignore] that takes a raw label and interns it with
   [name_of_bytes] first. *)
let send_ignore' str body =
  let target = name_of_bytes str in
  send_ignore target body
(* Send to [name], through [send_ignore], the message built by
   [Message.post] from [(label, body, token)]. *)
let post_ignore name label body token =
  let message = Message.post (label, body, token) in
  send_ignore name message
(* Variant of [post_ignore] that takes the destination as a raw label and
   interns it with [name_of_bytes] first. *)
let post_ignore' str label body token =
  let target = name_of_bytes str in
  post_ignore target label body token