diff --git a/amqp_relay.ml b/amqp_relay.ml index a74c295..63f7567 100644 --- a/amqp_relay.ml +++ b/amqp_relay.ml @@ -26,12 +26,12 @@ type connection_t = { mtx: Mutex.t; cin: in_channel; cout: out_channel; - name: Uuid.t; + name: Node.name; mutable input_buf: string; mutable output_buf: Buffer.t; mutable frame_max: int; mutable connection_closed: bool; - mutable recent_queue_name: string option; + mutable recent_queue_name: Node.name option; mutable delivery_tag: int } @@ -43,7 +43,7 @@ let amqp_boot (peername, mtx, cin, cout) = { mtx = mtx; cin = cin; cout = cout; - name = Uuid.create (); + name = Node.name_of_string (Uuid.create ()); input_buf = String.create initial_frame_size; output_buf = Buffer.create initial_frame_size; frame_max = initial_frame_size; @@ -251,7 +251,7 @@ let get_recent_queue_name conn = let expand_mrdq conn queue = match queue with | "" -> get_recent_queue_name conn - | other -> other + | other -> Node.name_of_string other let handle_method conn channel m = if channel > 1 then die channel_error "Unsupported channel number" else (); @@ -269,27 +269,27 @@ let handle_method conn channel m = | Channel_close_ok -> () | Exchange_declare (exchange, type_, passive, durable, no_wait, arguments) -> - Node.send_ignore "factory" (Message.create (Sexp.Str type_, - Sexp.Arr [Sexp.Str exchange], - Sexp.Str conn.name, - Sexp.Str "Exchange_declare_reply")) + Node.send_ignore' "factory" (Message.create (Sexp.Str type_, + Sexp.Arr [Sexp.Str exchange], + Sexp.Str conn.name.Node.label, + Sexp.Str "Exchange_declare_reply")) | Queue_declare (queue, passive, durable, exclusive, auto_delete, no_wait, arguments) -> let queue = (if queue = "" then Uuid.create () else queue) in - conn.recent_queue_name <- Some queue; - Node.send_ignore "factory" (Message.create (Sexp.Str "queue", - Sexp.Arr [Sexp.Str queue], - Sexp.Str conn.name, - Sexp.Str "Queue_declare_reply")) + conn.recent_queue_name <- Some (Node.name_of_string queue); + Node.send_ignore' "factory" (Message.create (Sexp.Str "queue", + Sexp.Arr [Sexp.Str queue], + Sexp.Str conn.name.Node.label, + Sexp.Str "Queue_declare_reply")) | Queue_bind (queue, exchange, routing_key, no_wait, arguments) -> let queue = expand_mrdq conn queue in if not (Node.approx_exists queue) - then send_warning conn not_found ("Queue "^queue^" not found") + then send_warning conn not_found ("Queue "^queue.Node.label^" not found") else - if Node.send exchange (Message.subscribe (Sexp.Str routing_key, - Sexp.Str queue, - Sexp.Str "", - Sexp.Str conn.name, - Sexp.Str "Queue_bind_reply")) + if Node.send' exchange (Message.subscribe (Sexp.Str routing_key, + Sexp.Str queue.Node.label, + Sexp.Str "", + Sexp.Str conn.name.Node.label, + Sexp.Str "Queue_bind_reply")) then () else send_warning conn not_found ("Exchange "^exchange^" not found") | Basic_consume (queue, consumer_tag, no_local, no_ack, exclusive, no_wait, arguments) -> @@ -297,16 +297,16 @@ let handle_method conn channel m = let consumer_tag = (if consumer_tag = "" then Uuid.create () else consumer_tag) in if Node.send queue (Message.subscribe (Sexp.Str "", - Sexp.Str conn.name, + Sexp.Str conn.name.Node.label, Sexp.Arr [Sexp.Str "delivery"; Sexp.Str consumer_tag], - Sexp.Str conn.name, + Sexp.Str conn.name.Node.label, Sexp.Arr [Sexp.Str "Basic_consume_reply"; Sexp.Str consumer_tag])) then () - else send_warning conn not_found ("Queue "^queue^" not found") + else send_warning conn not_found ("Queue "^queue.Node.label^" not found") | Basic_publish (exchange, routing_key, false, false) -> let (_, (body_size, properties)) = next_header conn in let body = recv_content_body conn body_size in - if Node.post exchange + if Node.post' exchange (Sexp.Str routing_key) (Sexp.Hint {Sexp.hint = Sexp.Str "amqp"; Sexp.body = Sexp.Arr [Sexp.Str exchange; @@ -400,11 +400,11 @@ let start (s, peername) = amqp_boot amqp_handler amqp_mainloop (s, peername) let init () = - Node.send_ignore "factory" (Message.create (Sexp.Str "direct", - Sexp.Arr [Sexp.Str "amq.direct"], - Sexp.Str "", Sexp.Str "")); - Node.send_ignore "factory" (Message.create (Sexp.Str "fanout", - Sexp.Arr [Sexp.Str "amq.fanout"], - Sexp.Str "", Sexp.Str "")); + Node.send_ignore' "factory" (Message.create (Sexp.Str "direct", + Sexp.Arr [Sexp.Str "amq.direct"], + Sexp.Str "", Sexp.Str "")); + Node.send_ignore' "factory" (Message.create (Sexp.Str "fanout", + Sexp.Arr [Sexp.Str "amq.fanout"], + Sexp.Str "", Sexp.Str "")); ignore (Util.create_daemon_thread "AMQP listener" None (Net.start_net "AMQP" Amqp_spec.port) start) diff --git a/directnode.ml b/directnode.ml index 9effdba..d34f514 100644 --- a/directnode.ml +++ b/directnode.ml @@ -20,7 +20,7 @@ open Datastructures open Status type t = { - name: string; + name: Node.name; subscriptions: Subscription.set_t; mtx: Mutex.t; mutable routing_table: UuidSet.t StringMap.t; @@ -76,14 +76,16 @@ let route_message info n sexp = let factory arg = match arg with - | (Arr [Str name]) -> + | (Arr [Str name_str]) -> let info = { - name = name; + name = Node.name_of_string name_str; subscriptions = Subscription.new_set (); mtx = Mutex.create (); routing_table = StringMap.empty; } in - replace_ok (Node.make_idempotent_named classname name (route_message info)) (Str name) + replace_ok + (Node.make_idempotent_named classname info.name (route_message info)) + (Str name_str) | _ -> Problem (Str "bad-arg") diff --git a/factory.ml b/factory.ml index 9c551ab..5afbd33 100644 --- a/factory.ml +++ b/factory.ml @@ -59,9 +59,9 @@ let factory_handler n sexp = Log.warn "Node class not found" [Str classname]; Message.create_failed (Arr [Str "factory"; Str "class-not-found"]) in - Node.post_ignore reply_sink (Str reply_name) reply (Str "") + Node.post_ignore' reply_sink (Str reply_name) reply (Str "") | m -> Util.message_not_understood "factory" m let init () = - Node.bind_ignore ("factory", Node.make "factory" factory_handler) + Node.bind_ignore (Node.name_of_string "factory", Node.make "factory" factory_handler) diff --git a/fanoutnode.ml b/fanoutnode.ml index ad5417c..b25abdd 100644 --- a/fanoutnode.ml +++ b/fanoutnode.ml @@ -20,7 +20,7 @@ open Datastructures open Status type t = { - name: string; + name: Node.name; subscriptions: Subscription.set_t; mtx: Mutex.t; } @@ -51,13 +51,15 @@ let route_message info n sexp = let factory arg = match arg with - | (Arr [Str name]) -> + | (Arr [Str name_str]) -> let info = { - name = name; + name = Node.name_of_string name_str; subscriptions = Subscription.new_set (); mtx = Mutex.create (); } in - replace_ok (Node.make_idempotent_named classname name (route_message info)) (Str name) + replace_ok + (Node.make_idempotent_named classname info.name (route_message info)) + (Str name_str) | _ -> Problem (Str "bad-arg") diff --git a/hop_server.ml b/hop_server.ml index 3dc3783..7a50779 100644 --- a/hop_server.ml +++ b/hop_server.ml @@ -15,10 +15,12 @@ (* You should have received a copy of the GNU General Public License *) (* along with Hop. If not, see . *) +let n_system_log = Node.name_of_string "system.log" + let hook_log () = let old_hook = !Log.hook in let new_hook label body = - ignore (Node.post "system.log" (Sexp.Str label) body (Sexp.Str "")); + ignore (Node.post n_system_log (Sexp.Str label) body (Sexp.Str "")); old_hook label body in Log.hook := new_hook diff --git a/meta.ml b/meta.ml index 602ea49..521ea84 100644 --- a/meta.ml +++ b/meta.ml @@ -17,12 +17,14 @@ open Sexp +let n_meta = Node.name_of_string "meta" + let announce_subscription source filter sink name on_off = - Node.post_ignore "meta" (Str source) + Node.post_ignore n_meta (Str source.Node.label) (if on_off - then Message.subscribed (Str source, filter, Str sink, name) - else Message.unsubscribed (Str source, filter, Str sink, name)) + then Message.subscribed (Str source.Node.label, filter, Str sink, name) + else Message.unsubscribed (Str source.Node.label, filter, Str sink, name)) (Str "") let init () = - Node.send_ignore "factory" (Message.create (Str "direct", Arr [Str "meta"], Str "", Str "")) + Node.send_ignore' "factory" (Message.create (Str "direct", Arr [Str "meta"], Str "", Str "")) diff --git a/node.ml b/node.ml index 2a67b81..e62f7b5 100644 --- a/node.ml +++ b/node.ml @@ -25,9 +25,29 @@ and t = { class_name: string; handle_message: handle_message_t } +and name = { + label: string; + mutable binding: t option + } + +module NameTable = Weak.Make(struct + type t = name + let equal a b = (a.label = b.label) + let hash a = Hashtbl.hash a.label +end) +module NameSet = Set.Make(struct + type t = name + let compare a b = String.compare a.label b.label +end) let mutex = Mutex.create () -let directory = ref StringMap.empty +let name_table = NameTable.create 100 +let directory = ref NameSet.empty + +let name_of_string str = + Util.with_mutex0 mutex (fun () -> + let template = {label = str; binding = None} in + NameTable.merge name_table template) let local_container_name () = "server" @@ -37,27 +57,27 @@ let make class_name handler = { handle_message = handler } -let lookup name = - try Some (StringMap.find name !directory) - with Not_found -> None +let lookup name = name.binding -let all_node_names () = string_map_keys !directory +let all_node_names () = NameSet.elements !directory +let all_node_name_strings () = List.map (fun x -> x.label) (all_node_names ()) (* Approximate because it doesn't lock or run in a transaction *) -let approx_exists name = StringMap.mem name !directory +let approx_exists name = + match name.binding with + | Some _ -> true + | None -> false let bind (filter, node) = - if filter = "" + if filter.label = "" then (Log.warn "Binding to empty name forbidden" []; false) else - Util.with_mutex0 mutex - (fun () -> - if StringMap.mem filter !directory - then false - else (directory := StringMap.add filter node !directory; - node.names <- StringSet.add filter node.names; - Log.info "Node bound" [Sexp.Str filter; Sexp.Str node.class_name]; - true)) + Util.with_mutex0 mutex (fun () -> + filter.binding <- Some node; + directory := NameSet.add filter !directory; + node.names <- StringSet.add filter.label node.names; + Log.info "Node bound" [Sexp.Str filter.label; Sexp.Str node.class_name]; + true) (* For use in factory constructor functions, hence the odd return type and values *) let make_named class_name node_name handler = @@ -76,19 +96,19 @@ let make_idempotent_named class_name node_name handler = if bind (node_name, node) then Ok node else Problem (Sexp.Str "bind-failed") let unbind name = - Util.with_mutex0 mutex - (fun () -> - match lookup name with - | Some n -> - Log.info "Node unbound" [Sexp.Str name; Sexp.Str n.class_name]; - n.names <- StringSet.remove name n.names; - directory := StringMap.remove name !directory; - true - | None -> - false) + Util.with_mutex0 mutex (fun () -> + match lookup name with + | Some n -> + Log.info "Node unbound" [Sexp.Str name.label; Sexp.Str n.class_name]; + n.names <- StringSet.remove name.label n.names; + name.binding <- None; + directory := NameSet.remove name !directory; + true + | None -> + false) let unbind_all n = - StringSet.iter (fun name -> ignore (unbind name)) n.names; + StringSet.iter (fun name -> ignore (unbind (name_of_string name))) n.names; n.names <- StringSet.empty let send name body = @@ -97,23 +117,31 @@ let send name body = (try n.handle_message n body with e -> Log.warn "Node message handler raised exception" - [Sexp.Str name; + [Sexp.Str name.label; Sexp.Str (Printexc.to_string e)]); true | None -> false +let send' str body = send (name_of_string str) body + let post name label body token = send name (Message.post (label, body, token)) +let post' str label body token = post (name_of_string str) label body token + let bind_ignore (filter, node) = if bind (filter, node) then () - else Log.warn "Duplicate binding" [Sexp.Str filter] + else Log.warn "Duplicate binding" [Sexp.Str filter.label] let send_ignore name body = - if send name body || name = "" + if send name body || name.label = "" then () - else Log.warn "send to missing node" [Sexp.Str name; body] + else Log.warn "send to missing node" [Sexp.Str name.label; body] + +let send_ignore' str body = send_ignore (name_of_string str) body let post_ignore name label body token = send_ignore name (Message.post (label, body, token)) + +let post_ignore' str label body token = post_ignore (name_of_string str) label body token diff --git a/queuenode.ml b/queuenode.ml index 4a597ba..6206892 100644 --- a/queuenode.ml +++ b/queuenode.ml @@ -19,7 +19,7 @@ open Sexp open Status type t = { - name: string; + name: Node.name; subscriptions: Subscription.set_t; ch: Message.t Squeue.t; mutable backlog: Sexp.t Queue.t; @@ -83,17 +83,17 @@ let shoveller info = let queue_factory arg = match arg with - | (Arr [Str name]) -> + | (Arr [Str name_str]) -> let info = { - name = name; + name = Node.name_of_string name_str; subscriptions = Subscription.new_set (); ch = Squeue.create 1000; backlog = Queue.create (); waiters = Queue.create () } in - ignore (Util.create_thread name None shoveller info); + ignore (Util.create_thread name_str None shoveller info); let queue_handler n sexp = Squeue.add (Message.message_of_sexp sexp) info.ch in - replace_ok (Node.make_idempotent_named classname name queue_handler) (Str name) + replace_ok (Node.make_idempotent_named classname info.name queue_handler) (Str name_str) | _ -> Problem (Str "bad-arg") diff --git a/relay.ml b/relay.ml index 609afe0..4e87d9c 100644 --- a/relay.ml +++ b/relay.ml @@ -31,17 +31,17 @@ let send_sexp_syntax_error ch explanation = let dispatch_message n ch m = match m with | Message.Post (Str name, body, token) -> - Node.send_ignore name body + Node.send_ignore' name body | Message.Subscribe (Str filter, sink, name, Str reply_sink, Str reply_name) -> - if Node.bind(filter, n) - then Node.post_ignore + if Node.bind (Node.name_of_string filter, n) + then Node.post_ignore' reply_sink (Str reply_name) (Message.subscribe_ok (Str filter)) (Str "") else Log.warn "Bind failed" [Str filter] | Message.Unsubscribe (Str token) -> - if Node.unbind token + if Node.unbind (Node.name_of_string token) then () else Log.warn "Unbind failed" [Str token] | _ -> diff --git a/subscription.ml b/subscription.ml index ab95b1c..c590e86 100644 --- a/subscription.ml +++ b/subscription.ml @@ -21,7 +21,7 @@ type t = { mutable live: bool; uuid: Uuid.t; filter: Sexp.t; - sink: string; + sink: Node.name; name: Sexp.t } @@ -31,8 +31,9 @@ let new_set () = ref StringMap.empty let count subs = StringMap.cardinal !subs -let create source subs filter sink name reply_sink reply_name = +let create source subs filter sink_str name reply_sink reply_name = let uuid = Uuid.create () in + let sink = Node.name_of_string sink_str in let sub = { live = true; uuid = uuid; @@ -41,8 +42,8 @@ let create source subs filter sink name reply_sink reply_name = name = name } in subs := StringMap.add uuid sub !subs; - Meta.announce_subscription source filter sink name true; - Node.post_ignore reply_sink reply_name (Message.subscribe_ok (Sexp.Str uuid)) (Sexp.Str ""); + Meta.announce_subscription source filter sink_str name true; + Node.post_ignore' reply_sink reply_name (Message.subscribe_ok (Sexp.Str uuid)) (Sexp.Str ""); sub let delete source subs uuid = @@ -50,7 +51,7 @@ let delete source subs uuid = let sub = StringMap.find uuid !subs in sub.live <- false; subs := StringMap.remove uuid !subs; - Meta.announce_subscription source sub.filter sub.sink sub.name false; + Meta.announce_subscription source sub.filter sub.sink.Node.label sub.name false; Some sub with Not_found -> None diff --git a/ui_main.ml b/ui_main.ml index e2fc772..280c20d 100644 --- a/ui_main.ml +++ b/ui_main.ml @@ -59,7 +59,7 @@ let cleanup_req id exn = | None -> () let start (s, peername) = - let id = "http-" ^ Uuid.create () in + let id = Node.name_of_string ("http-" ^ Uuid.create ()) in Util.create_thread (Connections.endpoint_name peername ^ " HTTP service") (Some (cleanup_req id)) (Httpd.main (handle_req id)) @@ -75,11 +75,11 @@ let api_server_stats _ id r = |> Httpd.add_date_header let api_nodes _ id r = - Json.resp_ok [] (Json.Rec ["nodes", Json.Arr (List.map Json.str (Node.all_node_names ()))]) + Json.resp_ok [] (Json.Rec ["nodes", Json.Arr (List.map Json.str (Node.all_node_name_strings ()))]) |> Httpd.add_date_header let api_node_info suffix id r = - (match Node.lookup suffix with + (match Node.lookup (Node.name_of_string suffix) with | Some n -> Json.resp_ok [] (Json.Rec ["names", Json.Arr (List.map Json.str (StringSet.elements n.Node.names)); diff --git a/ui_relay.ml b/ui_relay.ml index f93875b..84cbecb 100644 --- a/ui_relay.ml +++ b/ui_relay.ml @@ -42,7 +42,8 @@ let rec api_tap_source id r = if not (Node.bind (id, n)) then Httpd.http_error_html 500 "Internal ID collision" [] else - let id_block_and_padding = Stringstream.const_flush (id ^ ";" ^ String.make 2048 'h' ^ ";") in + let id_block_and_padding = + Stringstream.const_flush (id.Node.label ^ ";" ^ String.make 2048 'h' ^ ";") in handle_message n (Message.subscribe (Sexp.Str (Node.local_container_name()), Sexp.Str "", Sexp.Str "", Sexp.Str "", Sexp.Str "")); @@ -72,7 +73,7 @@ let api_tap_sink irrelevant_id r = with _ -> Httpd.http_error_html 406 "Bad data parameter" []) in (match Message.message_of_sexp data with | Message.Post (Sexp.Str name, body, token) -> - Node.send_ignore name body; + Node.send_ignore' name body; Httpd.resp_generic 202 "Accepted" [] (Httpd.empty_content) | _ -> Httpd.http_error_html 406 "Message not understood" [])