Only spawn queuenode threads if we are really creating a node
This commit is contained in:
parent
23c5ea314e
commit
03a165eae7
|
@ -82,7 +82,7 @@ let factory arg =
|
||||||
routing_table = StringMap.empty;
|
routing_table = StringMap.empty;
|
||||||
} in
|
} in
|
||||||
replace_ok
|
replace_ok
|
||||||
(Node.make_idempotent_named classname info.name (route_message info))
|
(Node.make_idempotent_named classname info.name return (route_message info))
|
||||||
(Str name_str)
|
(Str name_str)
|
||||||
| _ ->
|
| _ ->
|
||||||
return (Problem (Str "bad-arg"))
|
return (Problem (Str "bad-arg"))
|
||||||
|
|
|
@ -55,7 +55,7 @@ let factory arg =
|
||||||
subscriptions = Subscription.new_set ()
|
subscriptions = Subscription.new_set ()
|
||||||
} in
|
} in
|
||||||
replace_ok
|
replace_ok
|
||||||
(Node.make_idempotent_named classname info.name (route_message info))
|
(Node.make_idempotent_named classname info.name return (route_message info))
|
||||||
(Str name_str)
|
(Str name_str)
|
||||||
| _ ->
|
| _ ->
|
||||||
return (Problem (Str "bad-arg"))
|
return (Problem (Str "bad-arg"))
|
||||||
|
|
4
node.ml
4
node.ml
|
@ -85,7 +85,7 @@ let make_named class_name node_name handler =
|
||||||
| false -> return (Problem (Sexp.Str "bind-failed"))
|
| false -> return (Problem (Sexp.Str "bind-failed"))
|
||||||
|
|
||||||
(* For use in factory constructor functions, hence the odd return type and values *)
|
(* For use in factory constructor functions, hence the odd return type and values *)
|
||||||
let make_idempotent_named class_name node_name handler =
|
let make_idempotent_named class_name node_name if_new_node handler =
|
||||||
match lookup node_name with
|
match lookup node_name with
|
||||||
| Some n ->
|
| Some n ->
|
||||||
return (if n.class_name = class_name
|
return (if n.class_name = class_name
|
||||||
|
@ -94,7 +94,7 @@ let make_idempotent_named class_name node_name handler =
|
||||||
| None ->
|
| None ->
|
||||||
let node = make class_name handler in
|
let node = make class_name handler in
|
||||||
match_lwt bind (node_name, node) with
|
match_lwt bind (node_name, node) with
|
||||||
| true -> return (Ok node)
|
| true -> lwt () = if_new_node () in return (Ok node)
|
||||||
| false -> return (Problem (Sexp.Str "bind-failed"))
|
| false -> return (Problem (Sexp.Str "bind-failed"))
|
||||||
|
|
||||||
let unbind name =
|
let unbind name =
|
||||||
|
|
23
queuenode.ml
23
queuenode.ml
|
@ -34,11 +34,14 @@ type t = {
|
||||||
|
|
||||||
let classname = "queue"
|
let classname = "queue"
|
||||||
|
|
||||||
let report info n =
|
let report info =
|
||||||
Log.info (Printf.sprintf "do_burst %d backlog, %d waiters, %d ticks left\n%!"
|
while_lwt true do
|
||||||
info.backlog
|
lwt () = Log.info (Printf.sprintf "%s: %d backlog, %d waiters"
|
||||||
info.waiters
|
info.name.Node.label
|
||||||
n) []
|
info.backlog
|
||||||
|
info.waiters) [] in
|
||||||
|
Lwt_unix.sleep 1.0
|
||||||
|
done
|
||||||
|
|
||||||
let shoveller info =
|
let shoveller info =
|
||||||
let rec message_loop () =
|
let rec message_loop () =
|
||||||
|
@ -88,11 +91,15 @@ let queue_factory arg =
|
||||||
waiters_in = win;
|
waiters_in = win;
|
||||||
waiters_out = wout;
|
waiters_out = wout;
|
||||||
backlog = 0;
|
backlog = 0;
|
||||||
waiters = 0
|
waiters = 0;
|
||||||
} in
|
} in
|
||||||
ignore (Util.create_thread name_str None shoveller info);
|
|
||||||
replace_ok
|
replace_ok
|
||||||
(Node.make_idempotent_named classname info.name (queue_handler info))
|
(Node.make_idempotent_named classname info.name
|
||||||
|
(fun () ->
|
||||||
|
ignore (Util.create_thread name_str None shoveller info);
|
||||||
|
ignore (report info);
|
||||||
|
return ())
|
||||||
|
(queue_handler info))
|
||||||
(Str name_str)
|
(Str name_str)
|
||||||
| _ ->
|
| _ ->
|
||||||
return (Problem (Str "bad-arg"))
|
return (Problem (Str "bad-arg"))
|
||||||
|
|
Loading…
Reference in New Issue