From b43fe2974cac7bd434b004271a0b7d6929f726fd Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sat, 5 May 2012 09:47:20 -0400 Subject: [PATCH] Switch to mutable backlog/waiter queues. --- queuenode.ml | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/queuenode.ml b/queuenode.ml index f1a4e50..4a597ba 100644 --- a/queuenode.ml +++ b/queuenode.ml @@ -22,8 +22,8 @@ type t = { name: string; subscriptions: Subscription.set_t; ch: Message.t Squeue.t; - mutable backlog: Sexp.t Fqueue.t; - mutable waiters: Subscription.t Fqueue.t; + mutable backlog: Sexp.t Queue.t; + mutable waiters: Subscription.t Queue.t; } let classname = "queue" @@ -31,25 +31,24 @@ let classname = "queue" let report info n = Log.info (Printf.sprintf "do_burst %d capacity, %d backlog, %d waiters, %d ticks left\n%!" (Squeue.approx_capacity info.ch) - (Fqueue.length info.backlog) - (Fqueue.length info.waiters) + (Queue.length info.backlog) + (Queue.length info.waiters) n) [] let rec do_burst info n = (* report info n; *) - if Fqueue.is_empty info.backlog then false + if Queue.is_empty info.backlog then false else - if Fqueue.is_empty info.waiters then false + if Queue.is_empty info.waiters then false else if n = 0 then true (* maybe more work available, but should poll for outside events *) else - let (body, new_backlog) = Fqueue.really_pop_front info.backlog in - let (sub, new_waiters) = Fqueue.really_pop_front info.waiters in - info.waiters <- new_waiters; + let body = Queue.peek info.backlog in + let sub = Queue.pop info.waiters in if Subscription.send_to_subscription info.name info.subscriptions sub body then - (info.waiters <- Fqueue.push_back info.waiters sub; - info.backlog <- new_backlog; + (Queue.push sub info.waiters; + ignore (Queue.pop info.backlog); do_burst info (n - 1)) else do_burst info n @@ -66,13 +65,13 @@ let shoveller info = let rec loop () = match process_and_wait info with | Message.Post (name, body, token) -> - info.backlog <- Fqueue.push_back info.backlog body; + Queue.push body info.backlog; loop () | Message.Subscribe (filter, Str sink, name, Str reply_sink, reply_name) -> let sub = Subscription.create info.name info.subscriptions filter sink name reply_sink reply_name in - info.waiters <- Fqueue.push_back info.waiters sub; + Queue.push sub info.waiters; loop () | Message.Unsubscribe (Str token) -> ignore (Subscription.delete info.name info.subscriptions token); @@ -89,8 +88,8 @@ let queue_factory arg = name = name; subscriptions = Subscription.new_set (); ch = Squeue.create 1000; - backlog = Fqueue.empty; - waiters = Fqueue.empty + backlog = Queue.create (); + waiters = Queue.create () } in ignore (Util.create_thread name None shoveller info); let queue_handler n sexp = Squeue.add (Message.message_of_sexp sexp) info.ch in