Wednesday, November 16, 2016

LinearArrayQueue - MPMC lock-free queue (part 1 of 4)

As a by-product of our research we discovered what we believe to be a new queue based on a linked list of arrays.
We have four different lock-free queues based on this idea, each with a slightly different twist and slightly different properties, but all four are linearizable, memory-unbounded, Multi-Producer-Multi-Consumer queues with lock-free enqueue and dequeue. If you've seen this before in a paper/post/code, please put a link in the comments section so we can give due credit to the original authors (in case it's not us).

The first of the four is named LinearArrayQueue and you can get the source code in Java here:
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/Java/com/concurrencyfreaks/queues/array/LinearArrayQueue.java
and the source code in C++ here (with hazard pointers for memory reclamation):
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/CPP/queues/array/LinearArrayQueue.hpp

The idea is simple: start from the Michael-Scott queue, but instead of having one item per node, have an array of fixed size (BUFFER_SIZE) in each node (example code in C++):
    struct Node {
        std::atomic<T*>    items[BUFFER_SIZE];
        std::atomic<Node*> next;

        Node(T* item) : next{nullptr} {
            items[0].store(item, std::memory_order_relaxed);
            for (long i = 1; i < BUFFER_SIZE; i++) {
                items[i].store(nullptr, std::memory_order_relaxed);
            }
        }
    };

   
Each entry in the array may contain one of three possible values:
- A valid item that has been enqueued;
- nullptr, which means no item has yet been enqueued in that position;
- taken, a special value that means there was an item here but it has been dequeued:

    T* taken = (T*)new int();   // a unique address that no valid item can ever have
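As a tiny illustration of these three states, here is a hedged sketch (the names `slot` and `entry_lifecycle` are made up for this example, they are not from the queue's code) that walks a single entry through nullptr -> item -> taken with plain atomics:

```cpp
#include <atomic>
#include <cassert>

// Any unique address works as the "taken" sentinel, since no valid item
// can ever live at that address.
static int sentinel_storage;
static int* const taken = &sentinel_storage;

// Walks one entry through its three states and returns the dequeued item.
int* entry_lifecycle(int* item_to_enqueue) {
    std::atomic<int*> slot{nullptr};                 // state 1: empty
    int* expected = nullptr;
    bool enqueued = slot.compare_exchange_strong(expected, item_to_enqueue);
    assert(enqueued);                                // state 2: holds a valid item
    int* item = slot.load();
    assert(item != nullptr && item != taken);
    bool dequeued = slot.compare_exchange_strong(item, taken);
    assert(dequeued);                                // state 3: taken (dequeued)
    return item;
}
```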

An enqueuer starts from entry zero of the array, looking for the first entry containing nullptr in which to place its item. Enqueuers compete with each other using a CAS to see who gets a particular entry, and on CAS failure move on to the next available entry.
    void enqueue(T* item, const int tid) {
        if (item == nullptr) throw std::invalid_argument("item can not be nullptr");
        while (true) {
            Node* ltail = hp.protect(kHpTail, tail, tid);
            if (ltail->items[BUFFER_SIZE-1].load() != nullptr) { // This node is full
                if (ltail != tail.load()) continue;
                Node* lnext = ltail->next.load();
                if (lnext == nullptr) {
                    Node* newNode = new Node(item);
                    if (ltail->casNext(nullptr, newNode)) {
                        casTail(ltail, newNode);
                        hp.clear(tid);
                        return;
                    }
                    delete newNode;
                } else {
                    casTail(ltail, lnext);
                }
                continue;
            }
            // Find the first null entry in items[] and try to CAS from null to item
            for (long i = 0; i < BUFFER_SIZE; i++) {
                if (ltail->items[i].load() != nullptr) continue;
                T* itemnull = nullptr;
                if (ltail->items[i].compare_exchange_strong(itemnull, item)) {
                    hp.clear(tid);
                    return;
                }
                if (ltail != tail.load()) break;
            }
        }
    }

   
If the last entry of the array is non-null (there is already an item there) then the node is "full": a new node must be created and linked into the list, and the tail advanced. This is done with the Michael-Scott algorithm for enqueueing, and corresponds to the "node is full" branch at the top of enqueue() above:
    if (ltail->items[BUFFER_SIZE-1].load() != nullptr) { // This node is full
        if (ltail != tail.load()) continue;
        Node* lnext = ltail->next.load();
        if (lnext == nullptr) {
            Node* newNode = new Node(item);
            if (ltail->casNext(nullptr, newNode)) {
                casTail(ltail, newNode);
                hp.clear(tid);
                return;
            }
            delete newNode;
        } else {
            casTail(ltail, lnext);
        }
        continue;
    }


   
For dequeueing, we start from entry zero and search for the first non-taken entry in the array, then try to CAS it from the current item to the special "taken" token. If the CAS succeeds, the item has been dequeued; otherwise, we try the next entry in the array that is neither nullptr nor taken.
    T* dequeue(const int tid) {
        while (true) {
            Node* lhead = hp.protect(kHpHead, head, tid);
            if (lhead->items[BUFFER_SIZE-1].load() == taken) { // This node has been drained, check if there is another one
                Node* lnext = lhead->next.load();
                if (lnext == nullptr) { // No more nodes in the queue
                    hp.clear(tid);
                    return nullptr;
                }
                if (casHead(lhead, lnext)) hp.retire(lhead, tid);
                continue;
            }
            // Find the first non taken entry in items[] and try to CAS from item to taken
            for (long i = 0; i < BUFFER_SIZE; i++) {
                T* item = lhead->items[i].load();
                if (item == nullptr) {
                    hp.clear(tid);
                    return nullptr;            // This node is empty
                }
                if (item == taken) continue;
                if (lhead->items[i].compare_exchange_strong(item, taken)) {
                    hp.clear(tid);
                    return item;
                }
                if (lhead != head.load()) break;
            }
        }
    }

   
If a nullptr entry is found, no item has ever been enqueued at that position, which means there are no more items and the queue is empty. This is the "This node is empty" check inside the for loop of dequeue() above:
    T* item = lhead->items[i].load();
    if (item == nullptr) {
        hp.clear(tid);
        return nullptr;            // This node is empty
    }


When the last entry of the array is seen to be "taken", all the other entries must also be "taken", which means the node has been drained of all its items. The dequeuer must then move on to the next node (if one exists) and advance the head accordingly, which is the first branch of dequeue() above:
    if (lhead->items[BUFFER_SIZE-1].load() == taken) { // This node has been drained, check if there is another one
        Node* lnext = lhead->next.load();
        if (lnext == nullptr) { // No more nodes in the queue
            hp.clear(tid);
            return nullptr;
        }
        if (casHead(lhead, lnext)) hp.retire(lhead, tid);
        continue;
    }

   
That's it, nothing special about it.
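To make the algorithm easy to play with, here is a simplified sketch that follows the snippets above but strips out the hazard pointers (drained nodes are simply leaked to keep it short). The name MiniLinearArrayQueue is made up for this example; the real implementation is in the linked repository. This is a sketch for experimentation, not the authors' exact code:

```cpp
#include <atomic>
#include <cassert>

template<typename T, int BUFFER_SIZE = 4>
class MiniLinearArrayQueue {
    struct Node {
        std::atomic<T*>    items[BUFFER_SIZE];
        std::atomic<Node*> next;
        explicit Node(T* item) : next{nullptr} {
            items[0].store(item, std::memory_order_relaxed);
            for (int i = 1; i < BUFFER_SIZE; i++)
                items[i].store(nullptr, std::memory_order_relaxed);
        }
    };
    std::atomic<Node*> head;
    std::atomic<Node*> tail;
    T* const taken = reinterpret_cast<T*>(new int());  // unique sentinel address

public:
    MiniLinearArrayQueue() {
        Node* sentinel = new Node(nullptr);  // start with one empty node
        head.store(sentinel);
        tail.store(sentinel);
    }

    void enqueue(T* item) {
        assert(item != nullptr);
        while (true) {
            Node* ltail = tail.load();
            if (ltail->items[BUFFER_SIZE-1].load() != nullptr) {  // node is full
                if (ltail != tail.load()) continue;
                Node* lnext = ltail->next.load();
                if (lnext == nullptr) {
                    Node* newNode = new Node(item);
                    Node* expNext = nullptr;
                    if (ltail->next.compare_exchange_strong(expNext, newNode)) {
                        tail.compare_exchange_strong(ltail, newNode);
                        return;
                    }
                    delete newNode;  // lost the race to link a new node
                } else {
                    tail.compare_exchange_strong(ltail, lnext);  // help advance tail
                }
                continue;
            }
            // Find the first null entry and try to CAS it from nullptr to item
            for (int i = 0; i < BUFFER_SIZE; i++) {
                if (ltail->items[i].load() != nullptr) continue;
                T* expItem = nullptr;
                if (ltail->items[i].compare_exchange_strong(expItem, item)) return;
                if (ltail != tail.load()) break;
            }
        }
    }

    T* dequeue() {
        while (true) {
            Node* lhead = head.load();
            if (lhead->items[BUFFER_SIZE-1].load() == taken) {  // node drained
                Node* lnext = lhead->next.load();
                if (lnext == nullptr) return nullptr;  // no more nodes: empty
                head.compare_exchange_strong(lhead, lnext);  // (old node leaked)
                continue;
            }
            // Find the first non-taken entry and try to CAS it from item to taken
            for (int i = 0; i < BUFFER_SIZE; i++) {
                T* item = lhead->items[i].load();
                if (item == nullptr) return nullptr;  // node (and queue) is empty
                if (item == taken) continue;
                if (lhead->items[i].compare_exchange_strong(item, taken)) return item;
                if (lhead != head.load()) break;
            }
        }
    }
};
```

With BUFFER_SIZE of 4, ten enqueues span three nodes, and ten dequeues return the items in FIFO order followed by nullptr on the empty queue.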

The main disadvantage of this technique is that each enqueuer/dequeuer has to scan the array from the beginning, even when the array is almost full. Traversing an array is fast, but not if the array is huge, so the smaller the array, the smaller the penalty for starting at entry zero.
On the other hand, the larger the array, the lower the overhead of creating a new node (with a new array), doing one CAS on Node.next and another to advance the tail, and later one more to advance the head.
The sweet spot depends on the contention and on the number of cores in your system, but an array with a BUFFER_SIZE of 32 seems to reap most of the benefits without too high a scanning cost and without wasting too much memory when the queue is empty.

When uncontended, an enqueuer needs just one CAS to enqueue an item, and a dequeuer needs one CAS to dequeue it. If a new node has to be created, the enqueue performs two extra CASes, but this happens with probability 1/BUFFER_SIZE.
The Michael-Scott queue also does one CAS per dequeue, but with higher contention because that CAS is always on the head variable. The advantage of the LinearArrayQueue comes from needing a single CAS per enqueue (on average), where Michael-Scott needs two CASes.
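The amortized counts can be checked with back-of-the-envelope arithmetic (assuming an uncontended execution, where a new node is needed once every BUFFER_SIZE enqueues and the head advances once every BUFFER_SIZE dequeues; the helper names are made up for this sketch):

```cpp
#include <cassert>

// Average CASes per operation for the LinearArrayQueue, uncontended case:
// enqueue: 1 CAS on an items[] entry, plus 2 extra CASes (Node.next and
// tail) once every bufferSize enqueues when a new node is created.
double avgEnqueueCas(int bufferSize) { return 1.0 + 2.0 / bufferSize; }

// dequeue: 1 CAS on an items[] entry, plus 1 CAS to advance head once
// every bufferSize dequeues when a node is drained.
double avgDequeueCas(int bufferSize) { return 1.0 + 1.0 / bufferSize; }
```

For BUFFER_SIZE = 32 this gives about 1.06 CASes per enqueue and 1.03 per dequeue, versus a flat 2 CASes per enqueue (next plus tail) for Michael-Scott.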

Here are some microbenchmarks in C++, comparing the Michael-Scott lock-free queue with the LinearArrayQueue using different array sizes:

[Benchmark plots: throughput of the Michael-Scott queue vs. the LinearArrayQueue for several values of BUFFER_SIZE]

If you want an explanation of each plot, just check out our paper on the CRTurnQueue or the previous post about the Encapsulator queue:
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/papers/crturnqueue-2016.pdf
http://concurrencyfreaks.blogspot.com/2016/11/the-encapsulator-queue-part-3.html


In the next post we're going to see a slightly better way of doing this when the array size is large.

4 comments:

  1. In the Java version, how about extending AtomicReferenceArray of Object in Node and use +1 slot for the next pointer? It certainly needs more casting but maybe that gets optimized away.

    Might be worth pulling "node.items" out into a local variable in dequeue() to avoid re-reading "items" due to the atomics.

    Self-linking is commented out, but wouldn't that disrupt dequeuers that are in the process of walking the nodes (they'd run in circles)?

    ReplyDelete
    Replies
    1. Hi David,

      1) Dude!!! You're jumping ahead by two posts :)
      Yes we can have the next pointer in the node, but we have to be careful so there is no contention on it. The best we could come up with will be shown in two posts, but here is a preview:
      https://github.com/pramalhe/ConcurrencyFreaks/blob/master/Java/com/concurrencyfreaks/queues/array/LazyIndexArrayQueue.java#L75
      ... not sure this was what you were talking about.

      2) node.items is final, so I was hoping the compiler/JIT would do it for me. I'll have to try to see if it makes a difference.

      3) The trick with self-linking is the same as in Michael-Scott (ConcurrentLinkedQueue), only the nodes previous to "head" will be self-linked (by definition) and when a dequeuer (or enqueuer) finds a self-linked node, it must re-read head (or tail respectively).

      Thanks for the comments!

      Delete
    2. 1) If the whole Node is an AtomicReferenceArray (or in fact an Object[] + Unsafe) you can pad "next" as far as you wish from the normal elements. Of course, this doesn't work if you need non-object fields such as the enq/deq in the LazyIndexArrayQueue you linked.

      2) I usually don't take the risk and read such fields out as early as possible into local variables. I think only a few people really know what's happening under the hood otherwise (Shipilev).

      3) Sure, I didn't see any "if (node.next == node) { node = head; continue; }" in the code.

      Delete
    3. 1) Ahhhhh cool trick of having the next in one of the entries of the AtomicReferenceArray, I'll have to try that.
      Thanks!

      Delete