Array-based shared-queues give a small performance increase on Linux, based on very rough testing
This commit is contained in:
parent
ca3bc78501
commit
7aba3d5659
30
squeue.ml
30
squeue.ml
|
@ -22,7 +22,9 @@ type 'a t = {
|
|||
mutable capacity: int;
|
||||
nonfull: Condition.t;
|
||||
nonempty: Condition.t;
|
||||
queue: 'a Queue.t
|
||||
queue: 'a array;
|
||||
mutable read_pointer: int;
|
||||
mutable write_pointer: int;
|
||||
}
|
||||
|
||||
let create n = {
|
||||
|
@ -30,7 +32,9 @@ let create n = {
|
|||
capacity = n;
|
||||
nonfull = Condition.create ();
|
||||
nonempty = Condition.create ();
|
||||
queue = Queue.create ()
|
||||
queue = Array.make n (Obj.magic None);
|
||||
read_pointer = 0;
|
||||
write_pointer = 0
|
||||
}
|
||||
|
||||
let approx_capacity q = q.capacity
|
||||
|
@ -41,16 +45,21 @@ let add v q =
|
|||
Condition.wait q.nonfull q.mtx
|
||||
done;
|
||||
q.capacity <- q.capacity - 1;
|
||||
Queue.add v q.queue;
|
||||
Array.set q.queue q.write_pointer v;
|
||||
q.write_pointer <- (q.write_pointer + 1) mod (Array.length q.queue);
|
||||
Condition.signal q.nonempty;
|
||||
Mutex.unlock q.mtx
|
||||
|
||||
let _locked_empty q =
|
||||
q.capacity = (Array.length q.queue)
|
||||
|
||||
let pop q =
|
||||
Mutex.lock q.mtx;
|
||||
while Queue.is_empty q.queue do
|
||||
while _locked_empty q do
|
||||
Condition.wait q.nonempty q.mtx
|
||||
done;
|
||||
let result = Queue.pop q.queue in
|
||||
let result = Array.get q.queue q.read_pointer in
|
||||
q.read_pointer <- (q.read_pointer + 1) mod (Array.length q.queue);
|
||||
q.capacity <- q.capacity + 1;
|
||||
Condition.signal q.nonfull;
|
||||
Mutex.unlock q.mtx;
|
||||
|
@ -59,11 +68,14 @@ let pop q =
|
|||
let peek q =
|
||||
Mutex.lock q.mtx;
|
||||
let result =
|
||||
if Queue.is_empty q.queue
|
||||
if _locked_empty q
|
||||
then None
|
||||
else (q.capacity <- q.capacity + 1;
|
||||
Condition.signal q.nonfull;
|
||||
Some (Queue.pop q.queue))
|
||||
else
|
||||
(let result = Array.get q.queue q.read_pointer in
|
||||
q.read_pointer <- (q.read_pointer + 1) mod (Array.length q.queue);
|
||||
q.capacity <- q.capacity + 1;
|
||||
Condition.signal q.nonfull;
|
||||
Some result)
|
||||
in
|
||||
Mutex.unlock q.mtx;
|
||||
result
|
||||
|
|
|
@ -22,9 +22,7 @@ type 'a t = {
|
|||
mutable capacity: int;
|
||||
nonfull: Condition.t;
|
||||
nonempty: Condition.t;
|
||||
queue: 'a array;
|
||||
mutable read_pointer: int;
|
||||
mutable write_pointer: int;
|
||||
queue: 'a Queue.t
|
||||
}
|
||||
|
||||
let create n = {
|
||||
|
@ -32,9 +30,7 @@ let create n = {
|
|||
capacity = n;
|
||||
nonfull = Condition.create ();
|
||||
nonempty = Condition.create ();
|
||||
queue = Array.make n (Obj.magic None);
|
||||
read_pointer = 0;
|
||||
write_pointer = 0
|
||||
queue = Queue.create ()
|
||||
}
|
||||
|
||||
let approx_capacity q = q.capacity
|
||||
|
@ -45,21 +41,16 @@ let add v q =
|
|||
Condition.wait q.nonfull q.mtx
|
||||
done;
|
||||
q.capacity <- q.capacity - 1;
|
||||
Array.set q.queue q.write_pointer v;
|
||||
q.write_pointer <- (q.write_pointer + 1) mod (Array.length q.queue);
|
||||
Queue.add v q.queue;
|
||||
Condition.signal q.nonempty;
|
||||
Mutex.unlock q.mtx
|
||||
|
||||
let _locked_empty q =
|
||||
q.capacity = (Array.length q.queue)
|
||||
|
||||
let pop q =
|
||||
Mutex.lock q.mtx;
|
||||
while _locked_empty q do
|
||||
while Queue.is_empty q.queue do
|
||||
Condition.wait q.nonempty q.mtx
|
||||
done;
|
||||
let result = Array.get q.queue q.read_pointer in
|
||||
q.read_pointer <- (q.read_pointer + 1) mod (Array.length q.queue);
|
||||
let result = Queue.pop q.queue in
|
||||
q.capacity <- q.capacity + 1;
|
||||
Condition.signal q.nonfull;
|
||||
Mutex.unlock q.mtx;
|
||||
|
@ -68,14 +59,11 @@ let pop q =
|
|||
let peek q =
|
||||
Mutex.lock q.mtx;
|
||||
let result =
|
||||
if _locked_empty q
|
||||
if Queue.is_empty q.queue
|
||||
then None
|
||||
else
|
||||
(let result = Array.get q.queue q.read_pointer in
|
||||
q.read_pointer <- (q.read_pointer + 1) mod (Array.length q.queue);
|
||||
q.capacity <- q.capacity + 1;
|
||||
Condition.signal q.nonfull;
|
||||
Some result)
|
||||
else (q.capacity <- q.capacity + 1;
|
||||
Condition.signal q.nonfull;
|
||||
Some (Queue.pop q.queue))
|
||||
in
|
||||
Mutex.unlock q.mtx;
|
||||
result
|
Loading…
Reference in New Issue