diff --git a/directnode.ml b/directnode.ml index 6594f0c..cc503c3 100644 --- a/directnode.ml +++ b/directnode.ml @@ -82,7 +82,7 @@ let factory arg = routing_table = StringMap.empty; } in replace_ok - (Node.make_idempotent_named classname info.name (route_message info)) + (Node.make_idempotent_named classname info.name return (route_message info)) (Str name_str) | _ -> return (Problem (Str "bad-arg")) diff --git a/fanoutnode.ml b/fanoutnode.ml index f788b3e..b726a29 100644 --- a/fanoutnode.ml +++ b/fanoutnode.ml @@ -55,7 +55,7 @@ let factory arg = subscriptions = Subscription.new_set () } in replace_ok - (Node.make_idempotent_named classname info.name (route_message info)) + (Node.make_idempotent_named classname info.name return (route_message info)) (Str name_str) | _ -> return (Problem (Str "bad-arg")) diff --git a/node.ml b/node.ml index d1f0de9..0074192 100644 --- a/node.ml +++ b/node.ml @@ -85,7 +85,7 @@ let make_named class_name node_name handler = | false -> return (Problem (Sexp.Str "bind-failed")) (* For use in factory constructor functions, hence the odd return type and values *) -let make_idempotent_named class_name node_name handler = +let make_idempotent_named class_name node_name if_new_node handler = match lookup node_name with | Some n -> return (if n.class_name = class_name @@ -94,7 +94,7 @@ let make_idempotent_named class_name node_name handler = | None -> let node = make class_name handler in match_lwt bind (node_name, node) with - | true -> return (Ok node) + | true -> lwt () = if_new_node () in return (Ok node) | false -> return (Problem (Sexp.Str "bind-failed")) let unbind name = diff --git a/queuenode.ml b/queuenode.ml index 4ea7f14..1bc8721 100644 --- a/queuenode.ml +++ b/queuenode.ml @@ -34,11 +34,14 @@ type t = { let classname = "queue" -let report info n = - Log.info (Printf.sprintf "do_burst %d backlog, %d waiters, %d ticks left\n%!" - info.backlog - info.waiters - n) [] +let report info = + while_lwt true do + lwt () = Log.info (Printf.sprintf "%s: %d backlog, %d waiters" + info.name.Node.label + info.backlog + info.waiters) [] in + Lwt_unix.sleep 1.0 + done let shoveller info = let rec message_loop () = @@ -88,11 +91,15 @@ let queue_factory arg = waiters_in = win; waiters_out = wout; backlog = 0; - waiters = 0 + waiters = 0; } in - ignore (Util.create_thread name_str None shoveller info); replace_ok - (Node.make_idempotent_named classname info.name (queue_handler info)) + (Node.make_idempotent_named classname info.name + (fun () -> + ignore (Util.create_thread name_str None shoveller info); + ignore (report info); + return ()) + (queue_handler info)) (Str name_str) | _ -> return (Problem (Str "bad-arg"))