Avoid a few gratuitous off-fast-path races
This commit is contained in:
parent
c9441e50da
commit
b69a89b574
|
@ -265,7 +265,7 @@ let handle_method conn channel m =
|
|||
Sexp.Str "Queue_declare_reply"))
|
||||
| Queue_bind (queue, exchange, routing_key, no_wait, arguments) ->
|
||||
let queue = expand_mrdq conn queue in
|
||||
if not (Node.exists queue)
|
||||
if not (Node.approx_exists queue)
|
||||
then send_warning conn not_found ("Queue "^queue^" not found")
|
||||
else
|
||||
if Node.send exchange (Message.subscribe (Sexp.Str routing_key,
|
||||
|
|
|
@ -3,6 +3,7 @@ open Printf
|
|||
open Thread
|
||||
open Sexp
|
||||
|
||||
let connection_mtx = Mutex.create ()
|
||||
let connection_count = ref 0
|
||||
|
||||
let endpoint_name n =
|
||||
|
@ -48,9 +49,9 @@ let connection_main class_name peername cin cout issue_banner boot_fn node_fn ma
|
|||
let start_connection' class_name issue_banner boot_fn node_fn mainloop (s, peername) =
|
||||
let cin = in_channel_of_descr s in
|
||||
let cout = out_channel_of_descr s in
|
||||
connection_count := !connection_count + 1;
|
||||
Util.with_mutex0 connection_mtx (fun () -> connection_count := !connection_count + 1);
|
||||
connection_main class_name peername cin cout issue_banner boot_fn node_fn mainloop;
|
||||
connection_count := !connection_count - 1;
|
||||
Util.with_mutex0 connection_mtx (fun () -> connection_count := !connection_count - 1);
|
||||
(try flush cout with _ -> ());
|
||||
close s
|
||||
|
||||
|
|
13
factory.ml
13
factory.ml
|
@ -4,14 +4,17 @@ open Datastructures
|
|||
|
||||
type factory_t = Sexp.t -> (Sexp.t, Sexp.t) Status.t
|
||||
|
||||
let mutex = Mutex.create ()
|
||||
let classes = ref StringMap.empty
|
||||
|
||||
let register_class name factory =
|
||||
if StringMap.mem name !classes
|
||||
then (Log.error "Duplicate node class name" [Str name];
|
||||
exit 1)
|
||||
else (Log.info "Registered node class" [Str name];
|
||||
classes := StringMap.add name factory !classes)
|
||||
Util.with_mutex0 mutex
|
||||
(fun () ->
|
||||
if StringMap.mem name !classes
|
||||
then (Log.error "Duplicate node class name" [Str name];
|
||||
exit 1)
|
||||
else (Log.info "Registered node class" [Str name];
|
||||
classes := StringMap.add name factory !classes))
|
||||
|
||||
let lookup_class name =
|
||||
try Some (StringMap.find name !classes)
|
||||
|
|
36
node.ml
36
node.ml
|
@ -9,6 +9,7 @@ and t = {
|
|||
handle_message: handle_message_t
|
||||
}
|
||||
|
||||
let mutex = Mutex.create ()
|
||||
let directory = ref StringMap.empty
|
||||
|
||||
let local_container_name () = "server"
|
||||
|
@ -23,18 +24,21 @@ let lookup name =
|
|||
try Some (StringMap.find name !directory)
|
||||
with Not_found -> None
|
||||
|
||||
let exists name = StringMap.mem name !directory
|
||||
(* Approximate because it doesn't lock or run in a transaction *)
|
||||
let approx_exists name = StringMap.mem name !directory
|
||||
|
||||
let bind (filter, node) =
|
||||
if filter = ""
|
||||
then (Log.warn "Binding to empty name forbidden" []; false)
|
||||
else
|
||||
if StringMap.mem filter !directory
|
||||
then false
|
||||
else (directory := StringMap.add filter node !directory;
|
||||
node.names <- StringSet.add filter node.names;
|
||||
Log.info "Node bound" [Sexp.Str filter; Sexp.Str node.class_name];
|
||||
true)
|
||||
Util.with_mutex0 mutex
|
||||
(fun () ->
|
||||
if StringMap.mem filter !directory
|
||||
then false
|
||||
else (directory := StringMap.add filter node !directory;
|
||||
node.names <- StringSet.add filter node.names;
|
||||
Log.info "Node bound" [Sexp.Str filter; Sexp.Str node.class_name];
|
||||
true))
|
||||
|
||||
(* For use in factory constructor functions, hence the odd return type and values *)
|
||||
let make_named class_name node_name handler =
|
||||
|
@ -53,14 +57,16 @@ let make_idempotent_named class_name node_name handler =
|
|||
if bind (node_name, node) then Ok node else Problem (Sexp.Str "bind-failed")
|
||||
|
||||
let unbind name =
|
||||
match lookup name with
|
||||
| Some n ->
|
||||
Log.info "Node unbound" [Sexp.Str name; Sexp.Str n.class_name];
|
||||
n.names <- StringSet.remove name n.names;
|
||||
directory := StringMap.remove name !directory;
|
||||
true
|
||||
| None ->
|
||||
false
|
||||
Util.with_mutex0 mutex
|
||||
(fun () ->
|
||||
match lookup name with
|
||||
| Some n ->
|
||||
Log.info "Node unbound" [Sexp.Str name; Sexp.Str n.class_name];
|
||||
n.names <- StringSet.remove name n.names;
|
||||
directory := StringMap.remove name !directory;
|
||||
true
|
||||
| None ->
|
||||
false)
|
||||
|
||||
let unbind_all n =
|
||||
StringSet.iter (fun name -> ignore (unbind name)) n.names;
|
||||
|
|
Loading…
Reference in New Issue