(* Copyright 2012 Tony Garnock-Jones. *)

(* This file is part of Hop. *)

(* Hop is free software: you can redistribute it and/or modify it *)
(* under the terms of the GNU General Public License as published by the *)
(* Free Software Foundation, either version 3 of the License, or (at your *)
(* option) any later version. *)

(* Hop is distributed in the hope that it will be useful, but *)
(* WITHOUT ANY WARRANTY; without even the implied warranty of *)
(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU *)
(* General Public License for more details. *)

(* You should have received a copy of the GNU General Public License *)
(* along with Hop.  If not, see <http://www.gnu.org/licenses/>. *)

open Lwt
open Printf
open Datastructures
open Status

(* A node is a message handler plus bookkeeping; a name is an interned
   label that may currently be bound to one node. *)
type handle_message_t = t -> Sexp.t -> unit Lwt.t
and t = {
  (* Labels under which this node is currently bound
     (maintained by bind and unbind). *)
  mutable names: BytesSet.t;
  (* Messages handled since the counter was last reset by send. *)
  mutable send_counter: int;
  (* Class tag; reported in log entries and compared by
     make_idempotent_named. *)
  class_name: bytes;
  (* Invoked by send with the node itself and the message body. *)
  handle_message: handle_message_t
}
and name = {
  label: bytes;
  (* The node bound to this label, if any. *)
  mutable binding: t option
}

(* Weak hash set of names; two names are the same entry when their
   labels are equal. *)
module NameTable = Weak.Make(struct
  type t = name
  let equal left right = (left.label = right.label)
  let hash entry = Hashtbl.hash entry.label
end)

(* Set of names ordered by label. *)
module NameSet = Set.Make(struct
  type t = name
  let compare left right = Bytes.compare left.label right.label
end)

(* Intern table consulted by name_of_bytes. *)
let name_table = NameTable.create 100

(* Names that currently have a node bound (bind adds, unbind removes). *)
let directory = ref NameSet.empty

(* Return the interned name carrying label [str]; if the table has no
   such entry yet, a fresh unbound name is added and returned. *)
let name_of_bytes str =
  NameTable.merge name_table { label = str; binding = None }

(* Build a lookup function that remembers the most recently returned
   name and answers from that single-entry cache when the same label is
   requested again; otherwise it defers to name_of_bytes. *)
let caching_name_of_bytes () =
  let last = ref None in
  let refresh str =
    let fresh = name_of_bytes str in
    last := Some fresh;
    fresh
  in
  fun str ->
    match !last with
    | Some hit -> if hit.label = str then hit else refresh str
    | None -> refresh str

(* Label of the local container: the fixed bytes value server. *)
let local_container_name () = Bytes.of_string "server"

(* Create an unbound node of class [class_name] whose messages are
   handled by [handler]. *)
let make class_name handler =
  {
    class_name = class_name;
    handle_message = handler;
    names = BytesSet.empty;
    send_counter = 0
  }

(* The node currently bound to [name], if any. *)
let lookup name = name.binding

(* Every name currently in the directory, in NameSet order. *)
let all_node_names () = NameSet.elements !directory

(* Labels of every name currently in the directory. *)
let all_node_name_bytes () =
  List.map (fun entry -> entry.label) (all_node_names ())

(* Approximate because it does not lock or run in a transaction *)
let approx_exists name =
  match lookup name with
  | None -> false
  | Some _ -> true

(* Bind [node] to the name [filter].  Yields false, changing nothing,
   when the label is empty (a warning is logged) or when the name is
   already bound; otherwise records the binding on the name, in the
   directory and on the node, logs it, and yields true.  The log calls
   are started but not waited for. *)
let bind (filter, node) =
  let label = filter.label in
  if label = Bytes.empty then begin
    ignore (Log.warn "Binding to empty name forbidden" []);
    return false
  end else if approx_exists filter then
    return false
  else begin
    filter.binding <- Some node;
    directory := NameSet.add filter !directory;
    node.names <- BytesSet.add label node.names;
    ignore (Log.info "Node bound" [Sexp.Str label; Sexp.Str node.class_name]);
    return true
  end

(* For use in factory constructor functions, hence the odd return type
   and values: yields Ok node when the fresh node could be bound to
   [node_name], and a bind-failed Problem otherwise.  (Ok and Problem
   are not defined in this file; presumably they come from Status.) *)
let make_named class_name node_name handler =
  let node = make class_name handler in
  lwt bound = bind (node_name, node) in
  if bound
  then return (Ok node)
  else return (Problem (Sexp.litstr "bind-failed"))

(* For use in factory constructor functions, hence the odd return type
   and values.  When [node_name] is already bound, the existing node is
   reused if its class matches and a class-mismatch Problem is yielded
   otherwise.  When it is unbound, a new node is created and bound, and
   [if_new_node] is run before Ok is yielded; a failed bind yields a
   bind-failed Problem without running [if_new_node]. *)
let make_idempotent_named class_name node_name if_new_node handler =
  match lookup node_name with
  | Some existing ->
    if existing.class_name = class_name
    then return (Ok existing)
    else return (Problem (Sexp.litstr "class-mismatch"))
  | None ->
    let node = make class_name handler in
    lwt bound = bind (node_name, node) in
    if bound then begin
      lwt () = if_new_node () in
      return (Ok node)
    end else
      return (Problem (Sexp.litstr "bind-failed"))

(* Remove whatever binding [name] has.  Yields true when a node was
   bound (the removal is logged and the node, the name and the directory
   are all updated) and false when the name was already unbound. *)
let unbind name =
  match lookup name with
  | None -> return false
  | Some node ->
    let label = name.label in
    ignore (Log.info "Node unbound" [Sexp.Str label; Sexp.Str node.class_name]);
    node.names <- BytesSet.remove label node.names;
    name.binding <- None;
    directory := NameSet.remove name !directory;
    return true

(* Unbind, one after another, every label recorded on node [n], then
   clear its label set. *)
let unbind_all n =
  let labels = BytesSet.elements n.names in
  let release label =
    lwt _ = unbind (name_of_bytes label) in
    return ()
  in
  lwt () = Lwt_list.iter_s release labels in
  n.names <- BytesSet.empty;
  return ()

(* Deliver [body] to the node bound to [name].  Yields false when no
   node is bound.  Otherwise the handler is run; an exception it raises
   is logged as a warning rather than propagated, and true is yielded
   either way.  Once a node has handled 1000 messages its counter is
   reset and Lwt_unix.yield is called before returning. *)
let send name body =
  match lookup name with
  | None -> return false
  | Some node ->
    let report exn =
      Log.warn "Node message handler raised exception"
        [Sexp.Str name.label; Sexp.str (Printexc.to_string exn)]
    in
    lwt () = (try_lwt node.handle_message node body with exn -> report exn) in
    node.send_counter <- node.send_counter + 1;
    lwt () =
      if node.send_counter < 1000 then
        return ()
      else begin
        node.send_counter <- 0;
        Lwt_unix.yield ()
      end
    in
    return true

(* As send, addressing the target by label. *)
let send' str body =
  let target = name_of_bytes str in
  send target body

(* Wrap the arguments with Message.post and send the result to [name]. *)
let post name label body token =
  let message = Message.post (label, body, token) in
  send name message

(* As post, addressing the target by label. *)
let post' str label body token =
  let target = name_of_bytes str in
  post target label body token

(* As bind, but a failed bind is only logged as a warning and the
   result is unit. *)
let bind_ignore (filter, node) =
  lwt bound = bind (filter, node) in
  if bound
  then return ()
  else Log.warn "Duplicate binding" [Sexp.Str filter.label]

(* As send, but the result is unit.  A failed delivery is logged as a
   warning unless the target label is empty, in which case it is
   dropped silently. *)
let send_ignore name body =
  lwt delivered = send name body in
  if delivered || name.label = Bytes.empty
  then return ()
  else Log.warn "send to missing node" [Sexp.Str name.label; body]

(* As send_ignore, addressing the target by label. *)
let send_ignore' str body =
  let target = name_of_bytes str in
  send_ignore target body

(* As post, with the delivery handling of send_ignore. *)
let post_ignore name label body token =
  let message = Message.post (label, body, token) in
  send_ignore name message

(* As post_ignore, addressing the target by label. *)
let post_ignore' str label body token =
  let target = name_of_bytes str in
  post_ignore target label body token