Use a shared queue instead of an Event.channel
This commit is contained in:
parent
a11fdc0500
commit
4cc9164c2e
10
queuenode.ml
10
queuenode.ml
|
@ -3,7 +3,7 @@ open Sexp
|
||||||
type t = {
|
type t = {
|
||||||
name: string;
|
name: string;
|
||||||
subscriptions: Subscription.set_t;
|
subscriptions: Subscription.set_t;
|
||||||
ch: Message.t Event.channel;
|
ch: Message.t Squeue.t;
|
||||||
mutable backlog: Sexp.t Fqueue.t;
|
mutable backlog: Sexp.t Fqueue.t;
|
||||||
mutable waiters: Subscription.t Fqueue.t;
|
mutable waiters: Subscription.t Fqueue.t;
|
||||||
}
|
}
|
||||||
|
@ -36,9 +36,9 @@ let rec do_burst info n =
|
||||||
|
|
||||||
let rec process_and_wait info =
|
let rec process_and_wait info =
|
||||||
if not (do_burst info 1000)
|
if not (do_burst info 1000)
|
||||||
then Event.sync (Event.receive info.ch)
|
then Squeue.pop info.ch
|
||||||
else
|
else
|
||||||
match Event.poll (Event.receive info.ch) with
|
match Squeue.peek info.ch with
|
||||||
| Some m -> m
|
| Some m -> m
|
||||||
| None -> process_and_wait info
|
| None -> process_and_wait info
|
||||||
|
|
||||||
|
@ -69,12 +69,12 @@ let queue_factory arg =
|
||||||
let info = {
|
let info = {
|
||||||
name = name;
|
name = name;
|
||||||
subscriptions = Subscription.new_set ();
|
subscriptions = Subscription.new_set ();
|
||||||
ch = Event.new_channel ();
|
ch = Squeue.create 1000;
|
||||||
backlog = Fqueue.empty;
|
backlog = Fqueue.empty;
|
||||||
waiters = Fqueue.empty
|
waiters = Fqueue.empty
|
||||||
} in
|
} in
|
||||||
ignore (Util.create_thread name None shoveller info);
|
ignore (Util.create_thread name None shoveller info);
|
||||||
let queue_handler n sexp = Event.sync (Event.send info.ch (Message.message_of_sexp sexp)) in
|
let queue_handler n sexp = Squeue.add (Message.message_of_sexp sexp) info.ch in
|
||||||
Node.make_named classname name queue_handler
|
Node.make_named classname name queue_handler
|
||||||
| _ ->
|
| _ ->
|
||||||
Some (Str "bad-arg")
|
Some (Str "bad-arg")
|
||||||
|
|
|
@ -0,0 +1,50 @@
|
||||||
|
(* Shared queue *)
|
||||||
|
|
||||||
|
type 'a t = {
|
||||||
|
mtx: Mutex.t;
|
||||||
|
mutable capacity: int;
|
||||||
|
nonfull: Condition.t;
|
||||||
|
nonempty: Condition.t;
|
||||||
|
queue: 'a Queue.t
|
||||||
|
}
|
||||||
|
|
||||||
|
let create n = {
|
||||||
|
mtx = Mutex.create ();
|
||||||
|
capacity = n;
|
||||||
|
nonfull = Condition.create ();
|
||||||
|
nonempty = Condition.create ();
|
||||||
|
queue = Queue.create ()
|
||||||
|
}
|
||||||
|
|
||||||
|
let add v q =
|
||||||
|
Mutex.lock q.mtx;
|
||||||
|
while q.capacity < 1 do
|
||||||
|
Condition.wait q.nonfull q.mtx
|
||||||
|
done;
|
||||||
|
q.capacity <- q.capacity - 1;
|
||||||
|
Queue.add v q.queue;
|
||||||
|
Condition.signal q.nonempty;
|
||||||
|
Mutex.unlock q.mtx
|
||||||
|
|
||||||
|
let pop q =
|
||||||
|
Mutex.lock q.mtx;
|
||||||
|
while Queue.is_empty q.queue do
|
||||||
|
Condition.wait q.nonempty q.mtx
|
||||||
|
done;
|
||||||
|
let result = Queue.pop q.queue in
|
||||||
|
q.capacity <- q.capacity + 1;
|
||||||
|
Condition.signal q.nonfull;
|
||||||
|
Mutex.unlock q.mtx;
|
||||||
|
result
|
||||||
|
|
||||||
|
let peek q =
|
||||||
|
Mutex.lock q.mtx;
|
||||||
|
let result =
|
||||||
|
if Queue.is_empty q.queue
|
||||||
|
then None
|
||||||
|
else (q.capacity <- q.capacity + 1;
|
||||||
|
Condition.signal q.nonfull;
|
||||||
|
Some (Queue.pop q.queue))
|
||||||
|
in
|
||||||
|
Mutex.unlock q.mtx;
|
||||||
|
result
|
Loading…
Reference in New Issue