diff --git a/amqp_relay.ml b/amqp_relay.ml index 97c864a..c03c2e8 100644 --- a/amqp_relay.ml +++ b/amqp_relay.ml @@ -265,7 +265,7 @@ let handle_method conn channel m = Sexp.Str "Queue_declare_reply")) | Queue_bind (queue, exchange, routing_key, no_wait, arguments) -> let queue = expand_mrdq conn queue in - if not (Node.exists queue) + if not (Node.approx_exists queue) then send_warning conn not_found ("Queue "^queue^" not found") else if Node.send exchange (Message.subscribe (Sexp.Str routing_key, diff --git a/connections.ml b/connections.ml index 5d71af2..d98eeae 100644 --- a/connections.ml +++ b/connections.ml @@ -3,6 +3,7 @@ open Printf open Thread open Sexp +let connection_mtx = Mutex.create () let connection_count = ref 0 let endpoint_name n = @@ -48,9 +49,9 @@ let connection_main class_name peername cin cout issue_banner boot_fn node_fn ma let start_connection' class_name issue_banner boot_fn node_fn mainloop (s, peername) = let cin = in_channel_of_descr s in let cout = out_channel_of_descr s in - connection_count := !connection_count + 1; + Util.with_mutex0 connection_mtx (fun () -> connection_count := !connection_count + 1); connection_main class_name peername cin cout issue_banner boot_fn node_fn mainloop; - connection_count := !connection_count - 1; + Util.with_mutex0 connection_mtx (fun () -> connection_count := !connection_count - 1); (try flush cout with _ -> ()); close s diff --git a/factory.ml b/factory.ml index 8d3b02d..78f31e6 100644 --- a/factory.ml +++ b/factory.ml @@ -4,14 +4,17 @@ open Datastructures type factory_t = Sexp.t -> (Sexp.t, Sexp.t) Status.t +let mutex = Mutex.create () let classes = ref StringMap.empty let register_class name factory = - if StringMap.mem name !classes - then (Log.error "Duplicate node class name" [Str name]; - exit 1) - else (Log.info "Registered node class" [Str name]; - classes := StringMap.add name factory !classes) + Util.with_mutex0 mutex + (fun () -> + if StringMap.mem name !classes + then (Log.error "Duplicate node class name" [Str name]; + exit 1) + else (Log.info "Registered node class" [Str name]; + classes := StringMap.add name factory !classes)) let lookup_class name = try Some (StringMap.find name !classes) diff --git a/node.ml b/node.ml index 294f0c2..1721a43 100644 --- a/node.ml +++ b/node.ml @@ -9,6 +9,7 @@ and t = { handle_message: handle_message_t } +let mutex = Mutex.create () let directory = ref StringMap.empty let local_container_name () = "server" @@ -23,18 +24,21 @@ let lookup name = try Some (StringMap.find name !directory) with Not_found -> None -let exists name = StringMap.mem name !directory +(* Approximate because it doesn't lock or run in a transaction *) +let approx_exists name = StringMap.mem name !directory let bind (filter, node) = if filter = "" then (Log.warn "Binding to empty name forbidden" []; false) else - if StringMap.mem filter !directory - then false - else (directory := StringMap.add filter node !directory; - node.names <- StringSet.add filter node.names; - Log.info "Node bound" [Sexp.Str filter; Sexp.Str node.class_name]; - true) + Util.with_mutex0 mutex + (fun () -> + if StringMap.mem filter !directory + then false + else (directory := StringMap.add filter node !directory; + node.names <- StringSet.add filter node.names; + Log.info "Node bound" [Sexp.Str filter; Sexp.Str node.class_name]; + true)) (* For use in factory constructor functions, hence the odd return type and values *) let make_named class_name node_name handler = @@ -53,14 +57,16 @@ let make_idempotent_named class_name node_name handler = if bind (node_name, node) then Ok node else Problem (Sexp.Str "bind-failed") let unbind name = - match lookup name with - | Some n -> - Log.info "Node unbound" [Sexp.Str name; Sexp.Str n.class_name]; - n.names <- StringSet.remove name n.names; - directory := StringMap.remove name !directory; - true - | None -> - false + Util.with_mutex0 mutex + (fun () -> + match lookup name with + | Some n -> + Log.info "Node unbound" [Sexp.Str name; Sexp.Str n.class_name]; + n.names <- StringSet.remove name n.names; + directory := StringMap.remove name !directory; + true + | None -> + false) let unbind_all n = StringSet.iter (fun name -> ignore (unbind name)) n.names;