From 4cc9164c2e2832b7b6890b94f22ef4d34fb0af97 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Sun, 8 Jan 2012 13:17:59 -0500 Subject: [PATCH] Use a shared queue instead of an Event.channel --- queuenode.ml | 10 +++++----- squeue.ml | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 5 deletions(-) create mode 100644 squeue.ml diff --git a/queuenode.ml b/queuenode.ml index e49e907..f1c4cf7 100644 --- a/queuenode.ml +++ b/queuenode.ml @@ -3,7 +3,7 @@ open Sexp type t = { name: string; subscriptions: Subscription.set_t; - ch: Message.t Event.channel; + ch: Message.t Squeue.t; mutable backlog: Sexp.t Fqueue.t; mutable waiters: Subscription.t Fqueue.t; } @@ -36,9 +36,9 @@ let rec do_burst info n = let rec process_and_wait info = if not (do_burst info 1000) - then Event.sync (Event.receive info.ch) + then Squeue.pop info.ch else - match Event.poll (Event.receive info.ch) with + match Squeue.peek info.ch with | Some m -> m | None -> process_and_wait info @@ -69,12 +69,12 @@ let queue_factory arg = let info = { name = name; subscriptions = Subscription.new_set (); - ch = Event.new_channel (); + ch = Squeue.create 1000; backlog = Fqueue.empty; waiters = Fqueue.empty } in ignore (Util.create_thread name None shoveller info); - let queue_handler n sexp = Event.sync (Event.send info.ch (Message.message_of_sexp sexp)) in + let queue_handler n sexp = Squeue.add (Message.message_of_sexp sexp) info.ch in Node.make_named classname name queue_handler | _ -> Some (Str "bad-arg") diff --git a/squeue.ml b/squeue.ml new file mode 100644 index 0000000..4bd4e69 --- /dev/null +++ b/squeue.ml @@ -0,0 +1,50 @@ +(* Shared queue *) + +type 'a t = { + mtx: Mutex.t; + mutable capacity: int; + nonfull: Condition.t; + nonempty: Condition.t; + queue: 'a Queue.t + } + +let create n = { + mtx = Mutex.create (); + capacity = n; + nonfull = Condition.create (); + nonempty = Condition.create (); + queue = Queue.create () +} + +let add v q = + Mutex.lock q.mtx; + while q.capacity < 1 do + Condition.wait q.nonfull q.mtx + done; + q.capacity <- q.capacity - 1; + Queue.add v q.queue; + Condition.signal q.nonempty; + Mutex.unlock q.mtx + +let pop q = + Mutex.lock q.mtx; + while Queue.is_empty q.queue do + Condition.wait q.nonempty q.mtx + done; + let result = Queue.pop q.queue in + q.capacity <- q.capacity + 1; + Condition.signal q.nonfull; + Mutex.unlock q.mtx; + result + +let peek q = + Mutex.lock q.mtx; + let result = + if Queue.is_empty q.queue + then None + else (q.capacity <- q.capacity + 1; + Condition.signal q.nonfull; + Some (Queue.pop q.queue)) + in + Mutex.unlock q.mtx; + result