Yield the CPU every 1000 transmissions to a given node, and synchronously deliver.
This commit is contained in:
parent
03a165eae7
commit
536f1a03d2
21
node.ml
21
node.ml
|
@ -22,10 +22,11 @@ open Status
|
||||||
|
|
||||||
type handle_message_t = t -> Sexp.t -> unit Lwt.t
|
type handle_message_t = t -> Sexp.t -> unit Lwt.t
|
||||||
and t = {
|
and t = {
|
||||||
mutable names: StringSet.t;
|
mutable names: StringSet.t;
|
||||||
class_name: string;
|
mutable send_counter: int;
|
||||||
handle_message: handle_message_t
|
class_name: string;
|
||||||
}
|
handle_message: handle_message_t
|
||||||
|
}
|
||||||
and name = {
|
and name = {
|
||||||
label: string;
|
label: string;
|
||||||
mutable binding: t option
|
mutable binding: t option
|
||||||
|
@ -52,6 +53,7 @@ let local_container_name () = "server"
|
||||||
|
|
||||||
let make class_name handler = {
|
let make class_name handler = {
|
||||||
names = StringSet.empty;
|
names = StringSet.empty;
|
||||||
|
send_counter = 0;
|
||||||
class_name = class_name;
|
class_name = class_name;
|
||||||
handle_message = handler
|
handle_message = handler
|
||||||
}
|
}
|
||||||
|
@ -120,12 +122,19 @@ let unbind_all n =
|
||||||
let send name body =
|
let send name body =
|
||||||
match lookup name with
|
match lookup name with
|
||||||
| Some n ->
|
| Some n ->
|
||||||
ignore
|
lwt () =
|
||||||
(try_lwt n.handle_message n body
|
(try_lwt n.handle_message n body
|
||||||
with e ->
|
with e ->
|
||||||
Log.warn "Node message handler raised exception"
|
Log.warn "Node message handler raised exception"
|
||||||
[Sexp.Str name.label;
|
[Sexp.Str name.label;
|
||||||
Sexp.Str (Printexc.to_string e)]);
|
Sexp.Str (Printexc.to_string e)])
|
||||||
|
in
|
||||||
|
n.send_counter <- n.send_counter + 1;
|
||||||
|
lwt () =
|
||||||
|
if n.send_counter >= 1000
|
||||||
|
then (n.send_counter <- 0; Lwt_unix.yield ())
|
||||||
|
else return ()
|
||||||
|
in
|
||||||
return true
|
return true
|
||||||
| None ->
|
| None ->
|
||||||
return false
|
return false
|
||||||
|
|
Loading…
Reference in New Issue