Switch to mutable backlog/waiter queues.

This commit is contained in:
Tony Garnock-Jones 2012-05-05 09:47:20 -04:00
parent ee5a079010
commit b43fe2974c
1 changed files with 14 additions and 15 deletions

View File

@ -22,8 +22,8 @@ type t = {
name: string; name: string;
subscriptions: Subscription.set_t; subscriptions: Subscription.set_t;
ch: Message.t Squeue.t; ch: Message.t Squeue.t;
mutable backlog: Sexp.t Fqueue.t; mutable backlog: Sexp.t Queue.t;
mutable waiters: Subscription.t Fqueue.t; mutable waiters: Subscription.t Queue.t;
} }
let classname = "queue" let classname = "queue"
@ -31,25 +31,24 @@ let classname = "queue"
let report info n = let report info n =
Log.info (Printf.sprintf "do_burst %d capacity, %d backlog, %d waiters, %d ticks left\n%!" Log.info (Printf.sprintf "do_burst %d capacity, %d backlog, %d waiters, %d ticks left\n%!"
(Squeue.approx_capacity info.ch) (Squeue.approx_capacity info.ch)
(Fqueue.length info.backlog) (Queue.length info.backlog)
(Fqueue.length info.waiters) (Queue.length info.waiters)
n) [] n) []
let rec do_burst info n = let rec do_burst info n =
(* report info n; *) (* report info n; *)
if Fqueue.is_empty info.backlog then false if Queue.is_empty info.backlog then false
else else
if Fqueue.is_empty info.waiters then false if Queue.is_empty info.waiters then false
else else
if n = 0 then true (* maybe more work available, but should poll for outside events *) if n = 0 then true (* maybe more work available, but should poll for outside events *)
else else
let (body, new_backlog) = Fqueue.really_pop_front info.backlog in let body = Queue.peek info.backlog in
let (sub, new_waiters) = Fqueue.really_pop_front info.waiters in let sub = Queue.pop info.waiters in
info.waiters <- new_waiters;
if Subscription.send_to_subscription info.name info.subscriptions sub body if Subscription.send_to_subscription info.name info.subscriptions sub body
then then
(info.waiters <- Fqueue.push_back info.waiters sub; (Queue.push sub info.waiters;
info.backlog <- new_backlog; ignore (Queue.pop info.backlog);
do_burst info (n - 1)) do_burst info (n - 1))
else else
do_burst info n do_burst info n
@ -66,13 +65,13 @@ let shoveller info =
let rec loop () = let rec loop () =
match process_and_wait info with match process_and_wait info with
| Message.Post (name, body, token) -> | Message.Post (name, body, token) ->
info.backlog <- Fqueue.push_back info.backlog body; Queue.push body info.backlog;
loop () loop ()
| Message.Subscribe (filter, Str sink, name, Str reply_sink, reply_name) -> | Message.Subscribe (filter, Str sink, name, Str reply_sink, reply_name) ->
let sub = let sub =
Subscription.create Subscription.create
info.name info.subscriptions filter sink name reply_sink reply_name in info.name info.subscriptions filter sink name reply_sink reply_name in
info.waiters <- Fqueue.push_back info.waiters sub; Queue.push sub info.waiters;
loop () loop ()
| Message.Unsubscribe (Str token) -> | Message.Unsubscribe (Str token) ->
ignore (Subscription.delete info.name info.subscriptions token); ignore (Subscription.delete info.name info.subscriptions token);
@ -89,8 +88,8 @@ let queue_factory arg =
name = name; name = name;
subscriptions = Subscription.new_set (); subscriptions = Subscription.new_set ();
ch = Squeue.create 1000; ch = Squeue.create 1000;
backlog = Fqueue.empty; backlog = Queue.create ();
waiters = Fqueue.empty waiters = Queue.create ()
} in } in
ignore (Util.create_thread name None shoveller info); ignore (Util.create_thread name None shoveller info);
let queue_handler n sexp = Squeue.add (Message.message_of_sexp sexp) info.ch in let queue_handler n sexp = Squeue.add (Message.message_of_sexp sexp) info.ch in