Today's variant is named LazyIndexArrayQueue and you can get the Java source code here:
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/Java/com/concurrencyfreaks/queues/array/LazyIndexArrayQueue.java
and the C++ source code here (with memory reclamation):
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/CPP/queues/array/LazyIndexArrayQueue.hpp
The LazyIndexArrayQueue is very similar to LinearArrayQueue. The main difference lies in having one enqidx and one deqidx in each node:
struct Node {
std::atomic<int> deqidx;
std::atomic<T*> items[BUFFER_SIZE];
std::atomic<int> enqidx;
std::atomic<Node*> next;
Node(T* item) : deqidx{0}, enqidx{0}, next{nullptr} {
items[0].store(item, std::memory_order_relaxed);
for (long i = 1; i < BUFFER_SIZE; i++) {
items[i].store(nullptr, std::memory_order_relaxed);
}
}
};
The enqidx variable stores the index of the next available index in the array of items to enqueue at. This index may be behind the "real" index of the next available entry in the array, but it will never be ahead of it.
An enqueuer reads the current index and does a linear search starting from this value:
void enqueue(T* item, const int tid) {
if (item == nullptr) throw std::invalid_argument("item can not be nullptr");
while (true) {
Node* ltail = hp.protect(kHpTail, tail, tid);
if (ltail->items[BUFFER_SIZE-1].load() != nullptr) { // This node is full
if (ltail != tail.load()) continue;
Node* lnext = ltail->next.load();
if (lnext == nullptr) {
Node* newNode = new Node(item);
if (ltail->casNext(nullptr, newNode)) {
casTail(ltail, newNode);
hp.clear(tid);
return;
}
delete newNode;
} else {
casTail(ltail, lnext);
}
continue;
}
for (long i = ltail->enqidx.load(); i < BUFFER_SIZE; i++) {
if (ltail->items[i].load() != nullptr) continue;
T* itemnull = nullptr;
if (ltail->items[i].compare_exchange_strong(itemnull, item)) {
ltail->enqidx.store(i+1, std::memory_order_release);
hp.clear(tid);
return;
}
if (ltail != tail.load()) break;
}
}
}
Similarly to the enqueue case, the deqidx variable stores the index of the next non-taken entry in the array of items. This index may be behind the "real" index of the next item in the array, but it will never be ahead of it.
An dequeuer reads the current index and does a linear search starting from this value:
T* dequeue(const int tid) {
while (true) {
Node* lhead = hp.protect(kHpHead, head, tid);
if (lhead->items[BUFFER_SIZE-1].load() == taken) { // This node has been drained, check if there is another one
Node* lnext = lhead->next.load();
if (lnext == nullptr) { // No more nodes in the queue
hp.clear(tid);
return nullptr;
}
if (casHead(lhead, lnext)) hp.retire(lhead, tid);
continue;
}
for (long i = lhead->deqidx.load(); i < BUFFER_SIZE; i++) {
T* item = lhead->items[i].load();
if (item == nullptr) {
hp.clear(tid);
return nullptr; // This node is empty
}
if (item == taken) continue;
if (lhead->items[i].compare_exchange_strong(item, taken)) {
lhead->deqidx.store(i+1, std::memory_order_release);
hp.clear(tid);
return item;
}
if (lhead != head.load()) break;
}
}
}
This is incredibly simple, and yet amazingly fast. Just take a look at the benchmarks to see how effective this approach can be:
Unlike the previous two approaches that go up to 3x the Michael-Scott queue, the LazyIndexArrayQueue can go up to 5x ... not bad.
At first thought, we may be led to think that this lazy index approach will be bad in scenarios with oversubscription, but it's not.
It can happen that a thread goes to sleep after doing its enqueue/dequeue operation but before updating the enqidx/deqidx, which will cause the enqidx/deqidx to go back when it does get updated (i.e. point to an entry in the beginning of the items array), which will incur a higher cost in the next round of threads to find the next available entry. This cost is payed only by the next enqueuer/dequeuer (or batch of enqueuers/dequeues) which will then update the enqidx/deqidx. More probable, by the time the delayed thread wakes up and moves the enqidx/deqidx backwards, that node is full/empty and the tail/head is already pointing to a node further down the list of nodes, and therefore have no impact whatsoever on the node that is currently being enqueued/dequeued on.
Also, the contention on the enqidx and deqidx is not problematic, most likely because we're doing just an atomic store, and anyways the bottleneck is usually on the CAS operations in the items array.
On the next post we will complete the series with the fourth and fastest of these queues, in fact, the lock-free queue on the next post is (currently) the fastest lock-free linearizable MPMC memory-unbounded queue on the planet :)
 
No comments:
Post a Comment