Intern node names to permit faster routing.

This commit is contained in:
Tony Garnock-Jones 2012-05-05 11:46:48 -04:00
parent 2f80a54ffe
commit 51cbec0565
12 changed files with 131 additions and 93 deletions

View File

@ -26,12 +26,12 @@ type connection_t = {
mtx: Mutex.t;
cin: in_channel;
cout: out_channel;
name: Uuid.t;
name: Node.name;
mutable input_buf: string;
mutable output_buf: Buffer.t;
mutable frame_max: int;
mutable connection_closed: bool;
mutable recent_queue_name: string option;
mutable recent_queue_name: Node.name option;
mutable delivery_tag: int
}
@ -43,7 +43,7 @@ let amqp_boot (peername, mtx, cin, cout) = {
mtx = mtx;
cin = cin;
cout = cout;
name = Uuid.create ();
name = Node.name_of_string (Uuid.create ());
input_buf = String.create initial_frame_size;
output_buf = Buffer.create initial_frame_size;
frame_max = initial_frame_size;
@ -251,7 +251,7 @@ let get_recent_queue_name conn =
let expand_mrdq conn queue =
match queue with
| "" -> get_recent_queue_name conn
| other -> other
| other -> Node.name_of_string other
let handle_method conn channel m =
if channel > 1 then die channel_error "Unsupported channel number" else ();
@ -269,27 +269,27 @@ let handle_method conn channel m =
| Channel_close_ok ->
()
| Exchange_declare (exchange, type_, passive, durable, no_wait, arguments) ->
Node.send_ignore "factory" (Message.create (Sexp.Str type_,
Sexp.Arr [Sexp.Str exchange],
Sexp.Str conn.name,
Sexp.Str "Exchange_declare_reply"))
Node.send_ignore' "factory" (Message.create (Sexp.Str type_,
Sexp.Arr [Sexp.Str exchange],
Sexp.Str conn.name.Node.label,
Sexp.Str "Exchange_declare_reply"))
| Queue_declare (queue, passive, durable, exclusive, auto_delete, no_wait, arguments) ->
let queue = (if queue = "" then Uuid.create () else queue) in
conn.recent_queue_name <- Some queue;
Node.send_ignore "factory" (Message.create (Sexp.Str "queue",
Sexp.Arr [Sexp.Str queue],
Sexp.Str conn.name,
Sexp.Str "Queue_declare_reply"))
conn.recent_queue_name <- Some (Node.name_of_string queue);
Node.send_ignore' "factory" (Message.create (Sexp.Str "queue",
Sexp.Arr [Sexp.Str queue],
Sexp.Str conn.name.Node.label,
Sexp.Str "Queue_declare_reply"))
| Queue_bind (queue, exchange, routing_key, no_wait, arguments) ->
let queue = expand_mrdq conn queue in
if not (Node.approx_exists queue)
then send_warning conn not_found ("Queue "^queue^" not found")
then send_warning conn not_found ("Queue "^queue.Node.label^" not found")
else
if Node.send exchange (Message.subscribe (Sexp.Str routing_key,
Sexp.Str queue,
Sexp.Str "",
Sexp.Str conn.name,
Sexp.Str "Queue_bind_reply"))
if Node.send' exchange (Message.subscribe (Sexp.Str routing_key,
Sexp.Str queue.Node.label,
Sexp.Str "",
Sexp.Str conn.name.Node.label,
Sexp.Str "Queue_bind_reply"))
then ()
else send_warning conn not_found ("Exchange "^exchange^" not found")
| Basic_consume (queue, consumer_tag, no_local, no_ack, exclusive, no_wait, arguments) ->
@ -297,16 +297,16 @@ let handle_method conn channel m =
let consumer_tag = (if consumer_tag = "" then Uuid.create () else consumer_tag) in
if Node.send queue (Message.subscribe
(Sexp.Str "",
Sexp.Str conn.name,
Sexp.Str conn.name.Node.label,
Sexp.Arr [Sexp.Str "delivery"; Sexp.Str consumer_tag],
Sexp.Str conn.name,
Sexp.Str conn.name.Node.label,
Sexp.Arr [Sexp.Str "Basic_consume_reply"; Sexp.Str consumer_tag]))
then ()
else send_warning conn not_found ("Queue "^queue^" not found")
else send_warning conn not_found ("Queue "^queue.Node.label^" not found")
| Basic_publish (exchange, routing_key, false, false) ->
let (_, (body_size, properties)) = next_header conn in
let body = recv_content_body conn body_size in
if Node.post exchange
if Node.post' exchange
(Sexp.Str routing_key)
(Sexp.Hint {Sexp.hint = Sexp.Str "amqp";
Sexp.body = Sexp.Arr [Sexp.Str exchange;
@ -400,11 +400,11 @@ let start (s, peername) =
amqp_boot amqp_handler amqp_mainloop (s, peername)
let init () =
Node.send_ignore "factory" (Message.create (Sexp.Str "direct",
Sexp.Arr [Sexp.Str "amq.direct"],
Sexp.Str "", Sexp.Str ""));
Node.send_ignore "factory" (Message.create (Sexp.Str "fanout",
Sexp.Arr [Sexp.Str "amq.fanout"],
Sexp.Str "", Sexp.Str ""));
Node.send_ignore' "factory" (Message.create (Sexp.Str "direct",
Sexp.Arr [Sexp.Str "amq.direct"],
Sexp.Str "", Sexp.Str ""));
Node.send_ignore' "factory" (Message.create (Sexp.Str "fanout",
Sexp.Arr [Sexp.Str "amq.fanout"],
Sexp.Str "", Sexp.Str ""));
ignore (Util.create_daemon_thread
"AMQP listener" None (Net.start_net "AMQP" Amqp_spec.port) start)

View File

@ -20,7 +20,7 @@ open Datastructures
open Status
type t = {
name: string;
name: Node.name;
subscriptions: Subscription.set_t;
mtx: Mutex.t;
mutable routing_table: UuidSet.t StringMap.t;
@ -76,14 +76,16 @@ let route_message info n sexp =
let factory arg =
match arg with
| (Arr [Str name]) ->
| (Arr [Str name_str]) ->
let info = {
name = name;
name = Node.name_of_string name_str;
subscriptions = Subscription.new_set ();
mtx = Mutex.create ();
routing_table = StringMap.empty;
} in
replace_ok (Node.make_idempotent_named classname name (route_message info)) (Str name)
replace_ok
(Node.make_idempotent_named classname info.name (route_message info))
(Str name_str)
| _ ->
Problem (Str "bad-arg")

View File

@ -59,9 +59,9 @@ let factory_handler n sexp =
Log.warn "Node class not found" [Str classname];
Message.create_failed (Arr [Str "factory"; Str "class-not-found"])
in
Node.post_ignore reply_sink (Str reply_name) reply (Str "")
Node.post_ignore' reply_sink (Str reply_name) reply (Str "")
| m ->
Util.message_not_understood "factory" m
let init () =
Node.bind_ignore ("factory", Node.make "factory" factory_handler)
Node.bind_ignore (Node.name_of_string "factory", Node.make "factory" factory_handler)

View File

@ -20,7 +20,7 @@ open Datastructures
open Status
type t = {
name: string;
name: Node.name;
subscriptions: Subscription.set_t;
mtx: Mutex.t;
}
@ -51,13 +51,15 @@ let route_message info n sexp =
let factory arg =
match arg with
| (Arr [Str name]) ->
| (Arr [Str name_str]) ->
let info = {
name = name;
name = Node.name_of_string name_str;
subscriptions = Subscription.new_set ();
mtx = Mutex.create ();
} in
replace_ok (Node.make_idempotent_named classname name (route_message info)) (Str name)
replace_ok
(Node.make_idempotent_named classname info.name (route_message info))
(Str name_str)
| _ ->
Problem (Str "bad-arg")

View File

@ -15,10 +15,12 @@
(* You should have received a copy of the GNU General Public License *)
(* along with Hop. If not, see <http://www.gnu.org/licenses/>. *)
let n_system_log = Node.name_of_string "system.log"
let hook_log () =
let old_hook = !Log.hook in
let new_hook label body =
ignore (Node.post "system.log" (Sexp.Str label) body (Sexp.Str ""));
ignore (Node.post n_system_log (Sexp.Str label) body (Sexp.Str ""));
old_hook label body
in
Log.hook := new_hook

10
meta.ml
View File

@ -17,12 +17,14 @@
open Sexp
let n_meta = Node.name_of_string "meta"
let announce_subscription source filter sink name on_off =
Node.post_ignore "meta" (Str source)
Node.post_ignore n_meta (Str source.Node.label)
(if on_off
then Message.subscribed (Str source, filter, Str sink, name)
else Message.unsubscribed (Str source, filter, Str sink, name))
then Message.subscribed (Str source.Node.label, filter, Str sink, name)
else Message.unsubscribed (Str source.Node.label, filter, Str sink, name))
(Str "")
let init () =
Node.send_ignore "factory" (Message.create (Str "direct", Arr [Str "meta"], Str "", Str ""))
Node.send_ignore' "factory" (Message.create (Str "direct", Arr [Str "meta"], Str "", Str ""))

88
node.ml
View File

@ -25,9 +25,29 @@ and t = {
class_name: string;
handle_message: handle_message_t
}
and name = {
label: string;
mutable binding: t option
}
module NameTable = Weak.Make(struct
type t = name
let equal a b = (a.label = b.label)
let hash a = Hashtbl.hash a.label
end)
module NameSet = Set.Make(struct
type t = name
let compare a b = String.compare a.label b.label
end)
let mutex = Mutex.create ()
let directory = ref StringMap.empty
let name_table = NameTable.create 100
let directory = ref NameSet.empty
let name_of_string str =
Util.with_mutex0 mutex (fun () ->
let template = {label = str; binding = None} in
NameTable.merge name_table template)
let local_container_name () = "server"
@ -37,27 +57,27 @@ let make class_name handler = {
handle_message = handler
}
let lookup name =
try Some (StringMap.find name !directory)
with Not_found -> None
let lookup name = name.binding
let all_node_names () = string_map_keys !directory
let all_node_names () = NameSet.elements !directory
let all_node_name_strings () = List.map (fun x -> x.label) (all_node_names ())
(* Approximate because it doesn't lock or run in a transaction *)
let approx_exists name = StringMap.mem name !directory
let approx_exists name =
match name.binding with
| Some _ -> true
| None -> false
let bind (filter, node) =
if filter = ""
if filter.label = ""
then (Log.warn "Binding to empty name forbidden" []; false)
else
Util.with_mutex0 mutex
(fun () ->
if StringMap.mem filter !directory
then false
else (directory := StringMap.add filter node !directory;
node.names <- StringSet.add filter node.names;
Log.info "Node bound" [Sexp.Str filter; Sexp.Str node.class_name];
true))
Util.with_mutex0 mutex (fun () ->
filter.binding <- Some node;
directory := NameSet.add filter !directory;
node.names <- StringSet.add filter.label node.names;
Log.info "Node bound" [Sexp.Str filter.label; Sexp.Str node.class_name];
true)
(* For use in factory constructor functions, hence the odd return type and values *)
let make_named class_name node_name handler =
@ -76,19 +96,19 @@ let make_idempotent_named class_name node_name handler =
if bind (node_name, node) then Ok node else Problem (Sexp.Str "bind-failed")
let unbind name =
Util.with_mutex0 mutex
(fun () ->
match lookup name with
| Some n ->
Log.info "Node unbound" [Sexp.Str name; Sexp.Str n.class_name];
n.names <- StringSet.remove name n.names;
directory := StringMap.remove name !directory;
true
| None ->
false)
Util.with_mutex0 mutex (fun () ->
match lookup name with
| Some n ->
Log.info "Node unbound" [Sexp.Str name.label; Sexp.Str n.class_name];
n.names <- StringSet.remove name.label n.names;
name.binding <- None;
directory := NameSet.remove name !directory;
true
| None ->
false)
let unbind_all n =
StringSet.iter (fun name -> ignore (unbind name)) n.names;
StringSet.iter (fun name -> ignore (unbind (name_of_string name))) n.names;
n.names <- StringSet.empty
let send name body =
@ -97,23 +117,31 @@ let send name body =
(try n.handle_message n body
with e ->
Log.warn "Node message handler raised exception"
[Sexp.Str name;
[Sexp.Str name.label;
Sexp.Str (Printexc.to_string e)]);
true
| None -> false
let send' str body = send (name_of_string str) body
let post name label body token =
send name (Message.post (label, body, token))
let post' str label body token = post (name_of_string str) label body token
let bind_ignore (filter, node) =
if bind (filter, node)
then ()
else Log.warn "Duplicate binding" [Sexp.Str filter]
else Log.warn "Duplicate binding" [Sexp.Str filter.label]
let send_ignore name body =
if send name body || name = ""
if send name body || name.label = ""
then ()
else Log.warn "send to missing node" [Sexp.Str name; body]
else Log.warn "send to missing node" [Sexp.Str name.label; body]
let send_ignore' str body = send_ignore (name_of_string str) body
let post_ignore name label body token =
send_ignore name (Message.post (label, body, token))
let post_ignore' str label body token = post_ignore (name_of_string str) label body token

View File

@ -19,7 +19,7 @@ open Sexp
open Status
type t = {
name: string;
name: Node.name;
subscriptions: Subscription.set_t;
ch: Message.t Squeue.t;
mutable backlog: Sexp.t Queue.t;
@ -83,17 +83,17 @@ let shoveller info =
let queue_factory arg =
match arg with
| (Arr [Str name]) ->
| (Arr [Str name_str]) ->
let info = {
name = name;
name = Node.name_of_string name_str;
subscriptions = Subscription.new_set ();
ch = Squeue.create 1000;
backlog = Queue.create ();
waiters = Queue.create ()
} in
ignore (Util.create_thread name None shoveller info);
ignore (Util.create_thread name_str None shoveller info);
let queue_handler n sexp = Squeue.add (Message.message_of_sexp sexp) info.ch in
replace_ok (Node.make_idempotent_named classname name queue_handler) (Str name)
replace_ok (Node.make_idempotent_named classname info.name queue_handler) (Str name_str)
| _ ->
Problem (Str "bad-arg")

View File

@ -31,17 +31,17 @@ let send_sexp_syntax_error ch explanation =
let dispatch_message n ch m =
match m with
| Message.Post (Str name, body, token) ->
Node.send_ignore name body
Node.send_ignore' name body
| Message.Subscribe (Str filter, sink, name, Str reply_sink, Str reply_name) ->
if Node.bind(filter, n)
then Node.post_ignore
if Node.bind (Node.name_of_string filter, n)
then Node.post_ignore'
reply_sink
(Str reply_name)
(Message.subscribe_ok (Str filter))
(Str "")
else Log.warn "Bind failed" [Str filter]
| Message.Unsubscribe (Str token) ->
if Node.unbind token
if Node.unbind (Node.name_of_string token)
then ()
else Log.warn "Unbind failed" [Str token]
| _ ->

View File

@ -21,7 +21,7 @@ type t = {
mutable live: bool;
uuid: Uuid.t;
filter: Sexp.t;
sink: string;
sink: Node.name;
name: Sexp.t
}
@ -31,8 +31,9 @@ let new_set () = ref StringMap.empty
let count subs = StringMap.cardinal !subs
let create source subs filter sink name reply_sink reply_name =
let create source subs filter sink_str name reply_sink reply_name =
let uuid = Uuid.create () in
let sink = Node.name_of_string sink_str in
let sub = {
live = true;
uuid = uuid;
@ -41,8 +42,8 @@ let create source subs filter sink name reply_sink reply_name =
name = name
} in
subs := StringMap.add uuid sub !subs;
Meta.announce_subscription source filter sink name true;
Node.post_ignore reply_sink reply_name (Message.subscribe_ok (Sexp.Str uuid)) (Sexp.Str "");
Meta.announce_subscription source filter sink_str name true;
Node.post_ignore' reply_sink reply_name (Message.subscribe_ok (Sexp.Str uuid)) (Sexp.Str "");
sub
let delete source subs uuid =
@ -50,7 +51,7 @@ let delete source subs uuid =
let sub = StringMap.find uuid !subs in
sub.live <- false;
subs := StringMap.remove uuid !subs;
Meta.announce_subscription source sub.filter sub.sink sub.name false;
Meta.announce_subscription source sub.filter sub.sink.Node.label sub.name false;
Some sub
with Not_found ->
None

View File

@ -59,7 +59,7 @@ let cleanup_req id exn =
| None -> ()
let start (s, peername) =
let id = "http-" ^ Uuid.create () in
let id = Node.name_of_string ("http-" ^ Uuid.create ()) in
Util.create_thread (Connections.endpoint_name peername ^ " HTTP service")
(Some (cleanup_req id))
(Httpd.main (handle_req id))
@ -75,11 +75,11 @@ let api_server_stats _ id r =
|> Httpd.add_date_header
let api_nodes _ id r =
Json.resp_ok [] (Json.Rec ["nodes", Json.Arr (List.map Json.str (Node.all_node_names ()))])
Json.resp_ok [] (Json.Rec ["nodes", Json.Arr (List.map Json.str (Node.all_node_name_strings ()))])
|> Httpd.add_date_header
let api_node_info suffix id r =
(match Node.lookup suffix with
(match Node.lookup (Node.name_of_string suffix) with
| Some n ->
Json.resp_ok [] (Json.Rec
["names", Json.Arr (List.map Json.str (StringSet.elements n.Node.names));

View File

@ -42,7 +42,8 @@ let rec api_tap_source id r =
if not (Node.bind (id, n))
then Httpd.http_error_html 500 "Internal ID collision" []
else
let id_block_and_padding = Stringstream.const_flush (id ^ ";" ^ String.make 2048 'h' ^ ";") in
let id_block_and_padding =
Stringstream.const_flush (id.Node.label ^ ";" ^ String.make 2048 'h' ^ ";") in
handle_message n (Message.subscribe (Sexp.Str (Node.local_container_name()),
Sexp.Str "", Sexp.Str "",
Sexp.Str "", Sexp.Str ""));
@ -72,7 +73,7 @@ let api_tap_sink irrelevant_id r =
with _ -> Httpd.http_error_html 406 "Bad data parameter" []) in
(match Message.message_of_sexp data with
| Message.Post (Sexp.Str name, body, token) ->
Node.send_ignore name body;
Node.send_ignore' name body;
Httpd.resp_generic 202 "Accepted" [] (Httpd.empty_content)
| _ ->
Httpd.http_error_html 406 "Message not understood" [])