Sunday, November 27, 2016

FAAArrayQueue - MPMC lock-free queue (part 4 of 4)

In this post we're going to show what is currently the fastest lock-free queue on the planet, or to be more precise, the fastest portable, linearizable, memory-unbounded MPMC lock-free queue. This is the fourth and last of the four linked-list-of-arrays-based queues with lock-free enqueues and dequeues.

If you want to jump directly to the code, you can get it on github in C++ (with memory reclamation):
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/CPP/queues/array/FAAArrayQueue.hpp
or in Java:
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/Java/com/concurrencyfreaks/queues/array/FAAArrayQueue.java


Benchmarks

Saying that FAAArrayQueue is the fastest lock-free queue on the planet is a bold claim, but from what everybody tells us, the fastest lock-free queue is the LCRQ, made by Adam Morrison and Yehuda Afek in 2013: http://www.cs.tau.ac.il/~mad/publications/ppopp2013-x86queues.pdf
As it so happens, FAAArrayQueue is sometimes slightly slower and sometimes slightly faster than LCRQ, as you can see in the benchmarks below, all in C++, and all with arrays of 1024 entries per node (except Michael-Scott, which has a single item per node):

[Benchmark plots: FAAArrayQueue vs. LCRQ vs. Michael-Scott on the Enq-Deq and burst benchmarks]
The reason why FAAArrayQueue is a bit below LCRQ in the Enq-Deq tests is cache locality: in a single-enqueue-single-dequeue benchmark, LCRQ keeps re-using the same array, which means better cache locality, no new nodes created and, therefore, better throughput.
We're using an array of 1024 entries per node; with a larger array this difference fades away, but I won't do that because I think it's silly to have a linked-list-based queue where each node has a huge array (who uses that in practice?).
On the burst benchmark the bursts are much larger than 1024 items, which forces LCRQ to create new nodes, so it can no longer re-use the array; in that scenario FAAArrayQueue is equally good for dequeues and even better for enqueues.

LCRQ is non-portable because it requires a double-word compare-and-swap (CAS2), which means it can only be used on x86 and only from native code (no Java, sorry). FAAArrayQueue gets close to the performance of LCRQ and it is portable because it needs just a regular CAS, i.e. you can implement it in Java, or in C++ for any architecture by using std::atomic<>.


How does FAAArrayQueue work?

It has some similarities with LazyIndexArrayQueue. For example, there is an enqidx and a deqidx in each node, but here they serve a different purpose: to hand out a unique entry in the items array through the use of a fetch-and-add (FAA) instruction (example code in C++).
    struct Node {
        std::atomic<int>   deqidx;             // Next entry to hand out to dequeuers
        std::atomic<T*>    items[BUFFER_SIZE];
        std::atomic<int>   enqidx;             // Next entry to hand out to enqueuers
        std::atomic<Node*> next;

        // Start with the first entry pre-filled and enqidx at 1
        Node(T* item) : deqidx{0}, enqidx{1}, next{nullptr} {
            items[0].store(item, std::memory_order_relaxed);
            for (long i = 1; i < BUFFER_SIZE; i++) {
                items[i].store(nullptr, std::memory_order_relaxed);
            }
        }
    };


This idea has been used before, for example by LCRQ itself, or by Yang and Mellor-Crummey (YMC queue) in "A Wait-Free Queue as Fast as Fetch-And-Add": http://chaoran.me/assets/pdf/wfq-ppopp16.pdf
YMC uses this FAA approach to achieve high throughput and wait-free (unbounded) progress, and the basic idea is even described in Listing 1 of their paper, but that one is an obstruction-free queue, while FAAArrayQueue is lock-free.

In the LazyIndexArrayQueue, the enqueue starts by looking for an empty entry and, after a successful CAS, updates enqidx. In FAAArrayQueue the logic is inverted: first we do FAA on enqidx to obtain an entry in the array, and then we do CAS on that particular entry:
    void enqueue(T* item, const int tid) {
        if (item == nullptr) throw std::invalid_argument("item can not be nullptr");
        while (true) {
            Node* ltail = hp.protect(kHpTail, tail, tid);
            const int idx = ltail->enqidx.fetch_add(1);
            if (idx > BUFFER_SIZE-1) { // This node is full
                if (ltail != tail.load()) continue;
                Node* lnext = ltail->next.load();
                if (lnext == nullptr) {
                    // New node starts with this item already in entry 0
                    Node* newNode = new Node(item);
                    if (ltail->casNext(nullptr, newNode)) {
                        casTail(ltail, newNode);
                        hp.clear(tid);
                        return;
                    }
                    delete newNode;  // Another thread linked its node first
                } else {
                    casTail(ltail, lnext);  // Help advance the tail
                }
                continue;
            }
            // We own entry idx; the CAS is needed because a dequeuer may have
            // already marked this entry as "taken"
            T* itemnull = nullptr;
            if (ltail->items[idx].compare_exchange_strong(itemnull, item)) {
                hp.clear(tid);
                return;
            }
        }
    }


The logic for inserting a new node and advancing tail and head is still the classical Michael-Scott algorithm, just like in the other three array-based queues we presented, and just like in LCRQ or YMC. Nothing new here.
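
The casTail(), casHead() and casNext() helpers used in these snippets are just thin wrappers around compare_exchange_strong(). A minimal sketch of what they can look like (the actual code on github may differ in detail):
    // Sketch of the CAS helpers assumed by the snippets in this post
    bool casTail(Node* cmp, Node* val) {
        return tail.compare_exchange_strong(cmp, val);
    }
    bool casHead(Node* cmp, Node* val) {
        return head.compare_exchange_strong(cmp, val);
    }
    // ... and inside Node:
    bool casNext(Node* cmp, Node* val) {
        return next.compare_exchange_strong(cmp, val);
    }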

For the dequeue, the logic is again inverted relative to LazyIndexArrayQueue: first we do the FAA to get the index of the entry to dequeue, and then we do a CAS to obtain the item, or, in this particular implementation, an atomic exchange, because it is supposed to be slightly faster on x86:
    T* dequeue(const int tid) {
        while (true) {
            Node* lhead = hp.protect(kHpHead, head, tid);
            // If this node looks empty and there is no next node, the queue is empty
            if (lhead->deqidx.load() >= lhead->enqidx.load() &&
                lhead->next.load() == nullptr) break;
            const int idx = lhead->deqidx.fetch_add(1);
            if (idx > BUFFER_SIZE-1) { // Node has been drained
                Node* lnext = lhead->next.load();
                if (lnext == nullptr) break;  // No more nodes in the queue
                if (casHead(lhead, lnext)) hp.retire(lhead, tid);
                continue;
            }
            // Swap in the "taken" sentinel; a null means the enqueuer has not
            // yet filled this entry, so move on to the next one
            T* item = lhead->items[idx].exchange(taken);
            if (item == nullptr) continue;
            hp.clear(tid);
            return item;
        }
        hp.clear(tid);
        return nullptr;
    }

   
The uncontended case does 1 FAA, plus 1 CAS, plus 1 hazard pointer (a seq-cst store) for the enqueue, and a similar number of operations for the dequeue.
On x86, the FAA is a fast operation even under contention (as explained in the LCRQ paper), and both the CAS and the hazard pointer store are done without contention, just like in LCRQ, hence the high throughput.
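
As a quick usage sketch, this is how the queue can be driven; I'm assuming here that the constructor takes the maximum number of threads and that each thread passes its own tid, so check FAAArrayQueue.hpp on github for the exact signatures:
    #include <thread>
    #include <cstdio>
    #include "FAAArrayQueue.hpp"   // from the ConcurrencyFreaks repo

    int main() {
        FAAArrayQueue<int> queue(2);   // assumed: constructor takes maxThreads
        int x = 42;
        std::thread enq([&] { queue.enqueue(&x, 0); });        // this thread uses tid 0
        std::thread deq([&] {
            int* item;
            while ((item = queue.dequeue(1)) == nullptr) {}    // this thread uses tid 1
            std::printf("dequeued %d\n", *item);
        });
        enq.join();
        deq.join();
        return 0;
    }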

   
   
The comparison between FAAArrayQueue and LCRQ is not completely fair because they are slightly different beasts:
1. LCRQ is capable of re-using the entries in the items array and will not allocate a new node (with a new items array) unless the current one is already full. FAAArrayQueue is incapable of re-using entries in the items array. This is a clear advantage for LCRQ because it allows re-use of the same array (good cache locality) so long as the queue is not too full. However, if memory usage is a concern for you, keep in mind that the items array in LCRQ uses 128 bytes per entry, while in FAAArrayQueue it uses just 8 bytes per entry (on a 64 bit machine); with 1024 entries per node, that's 128 KB versus 8 KB per node.

2. LCRQ requires a double-word compare-and-swap (CAS2) instruction, which only exists on x86 and is not part of any language's memory model or atomics API (that I'm aware of), as the snippet below illustrates. Any LCRQ implementation is therefore always x86 specific and must be done in a native language (i.e. C/C++/D are ok, Java is a no-no). FAAArrayQueue uses a regular CAS, which means it can be implemented on any CPU architecture (x86, PowerPC, ARM, etc.) and in any language with a memory model and atomics (C11, C++1x, D, Java, etc.). Here the advantage is clearly on FAAArrayQueue's side.
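
To make point 2 concrete, consider what happens when you ask the standard C++ atomics for a double-word CAS; whether you actually get one depends entirely on the target, which is exactly why LCRQ can't be written portably (a small illustrative snippet, not taken from either queue):
    #include <atomic>

    struct TwoWords { void* a; void* b; };   // 16 bytes on a 64 bit machine

    std::atomic<TwoWords> pair;    // pair.is_lock_free() is true only if the target
                                   // has a double-word CAS (e.g. cmpxchg16b on
                                   // x86-64); otherwise the compiler silently falls
                                   // back to a lock, which defeats the purpose
    std::atomic<void*>    single;  // a single-word CAS, in contrast, is lock-free
                                   // on every mainstream CPU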


If you want a high throughput, memory unbounded, MPMC, linearizable, portable lock-free queue, you'll have a hard time finding something better than FAAArrayQueue.

12 comments:

  1. If enqIdx.getAndIncrement is within the bounds, why do you CAS the value instead of lazy setting it? I'd guess there is no longer a race on that array entry since the index is unique to the current thread.

    Although it doesn't implement the Queue interface, there could be holes in the array and dequeue may return null indicating an empty queue even though the queue is not empty. (It's a similar case to the weak polls in the JCTools queues.)

    I'm not sure about this but let's consider the case where there are 1 enqueue and 1 dequeue threads for simplicity. The enqueue thread picks up slot 0 but stalls before the value CAS. The dequeue thread picks slot 0, swaps in the "taken" token, it finds a null value, loops back and due to the deqIdx >= enqIdx without next, it returns null. Now the enqueue thread resumes and fails to CAS in the value (due to finding "taken" instead of null). This could lead to enqueue chasing dequeue without actually delivering the value over to the other side.

    Replies
    1. 1) FAA is faster than CAS, that's why it's preferable to use it. Go and take a look at the LCRQ paper:
      http://www.cs.tau.ac.il/~mad/publications/ppopp2013-x86queues.pdf
      or this recent ACM Queue article (easier to read) also from Adam Morrison:
      http://queue.acm.org/detail.cfm?id=2991130

      2) Agreed! It is possible and it does not implement the queue interface.

      3) Dude, again you're one post ahead of me ;)
      I have a post dedicated to explaining the "bump out" effect you described, but the summary is that yes, it is possible,
      and it happens somewhat when there are more/faster dequeuers than enqueuers. Even then, this algorithm is still lock-free because, with an array of size 1024, after 1024 failed attempts to enqueue, the next node will have the first entry of the array pre-filled with the item to enqueue.
      This is the same reason why LCRQ is lock-free.

  2. Please provide link to benchmark sources. I want to reproduce it.

    Replies
    1. Hello,
      If I remember correctly, we used the same benchmarks as on the Turn Queue, which are available here
      https://github.com/pramalhe/ConcurrencyFreaks/tree/master/CPP/papers/crturnqueue

      The burst benchmarks are these:
      https://github.com/pramalhe/ConcurrencyFreaks/blob/master/CPP/papers/crturnqueue/include/BenchmarkQ.hpp#L177
      and they are described in the Turn queue paper:
      https://github.com/pramalhe/ConcurrencyFreaks/blob/master/papers/crturnqueue-2016.pdf

  3. Thanks for a wonderful post! I just wonder why you think the YMC queue is just obstruction-free. YMC claim in the title of the paper that their queue is wait-free.

    Replies
    1. Hi Jeehoon,
      Thanks :)

      In the YMC paper they have two queues: one queue is obstruction-free, the other queue is wait-free unbounded.
      The FAAArrayQueue has some similarities to the obstruction-free queue shown in the YMC paper.

      As it so happens, there is no known way (and I believe there never will be) to provide wait-free bounded or unbounded memory reclamation for the particular algorithm used in the wait-free unbounded queue of YMC. Therefore, it is not possible to use it in production with wait-free properties, because in production we always need wait-free memory reclamation (otherwise we could go for simply lock-free or blocking).

  4. Pedro, thanks for the article and the queue code. My benchmarks (incomplete) show that it has great performance.
    I did find a race condition in my rendering of the code (another pusher thread sneaked in during the creation of the new node and the item added was out of order), and was able to fix it.

    I would replace lines 155-160:
    Node* newNode = new Node(item);
    if (ltail->casNext(nullptr, newNode)) {
        casTail(ltail, newNode);
        hp.clear(tid);
        return;
    }

    with:
    Node* newNode = new Node();
    if (ltail->casNext(nullptr, newNode)) {
        continue;
    }

  5. I've just created a wait-free mpmc in 100+ lines of C++11 code: https://github.com/MengRao/WFMPMC

    Can you help check?

    Thanks,
    Meng

    Replies
    1. Hi Meng,
      This queue is indeed very fast but it is not wait-free. Even though the method tryEmplace() is wait-free, this method may not do anything, and the definition of wait-free requires that work gets done in a finite number of steps. "Doing nothing" is not wait-free, even if the "nothing" is done in a finite number of steps.

      One suggestion: for the write_idx and read_idx you may want to try alignas(128) instead of alignas(64), because on x86 the prefetcher fetches two consecutive cache lines, which means you may get (some minor) false sharing even when writing to the adjacent cache line. https://github.com/MengRao/WFMPMC/blob/master/WFMPMC.h#L200
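
      For reference, the suggestion amounts to something along these lines (the member names are from WFMPMC.h, the types here are just an assumption):
          // Hypothetical sketch: pad the indexes to two cache lines so the x86
          // adjacent-line prefetcher can't cause false sharing between them
          alignas(128) std::atomic<int64_t> write_idx;
          alignas(128) std::atomic<int64_t> read_idx;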
