Wednesday, November 2, 2016

The Encapsulator Queue - A wait-free/lock-free queue for GC languages

Today we're going to talk about a new concurrent queue we call The Encapsulator.
Oooohhhhh you thought we had done just the CRTurn queue and that was it? Nope, we have several different queues and CRTurn is just one of them, and judging by the number of queues we came up with, we'll probably spend the next few months talking about concurrent queues  ;)

So what is The Encapsulator?
It's a Multi-Producer-Multi-Consumer queue (memory unbounded) with wait-free enqueue() and lock-free dequeue().
It was designed for languages with a GC (Java, Scala, D, etc) but it can be modified to have memory reclamation (hazard pointers) and thus be implemented in C11/C++1x.

Java source code for this queue can be obtained here:
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/Java/com/concurrencyfreaks/queues/EncapsulatorQueue.java


One of the ideas behind The Encapsulator is that there is an Encap object that encapsulates the item that is put in the queue.
A node doesn't contain the pointer to the item directly; instead, it contains a pointer to an Encap instance, which itself has a pointer to the item.
Code examples in Java:
static class Node<E> {
    Encap<E>[] encaps = null;
    volatile Node<E> next = null;

    // Used for the sentinel node, which carries no Encaps (encaps stays null)
    public Node() { }

    public Node(Encap<E>[] reqs, int used) {
        encaps = new Encap[used];

        // Copy the array so we don't have to carry "length" around
        System.arraycopy(reqs, 0, encaps, 0, used);
    }
}

static class Encap<E> {
    volatile E item;
    public Encap() { }
    public Encap(E item) { this.item = item; }
}

When an item is to be dequeued, the dequeuer will attempt to do a CAS on the Encap.item from its current value to null, and if it succeeds, the item has been "dequeued". We could use an extra variable in the Encap object to mark the object as logically dequeued, but to reduce memory usage, we combine this logical dequeueing with the pointer to the item.
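
The excerpts in this post call encap.casItem() without showing its body. Here is a minimal sketch of how that logical dequeue could look, assuming the CAS is backed by an AtomicReferenceFieldUpdater (the original source may use a different mechanism). The class name EncapCasSketch and the tryTake() helper are hypothetical, used only for illustration:

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical stand-in for the post's Encap class, with casItem() filled in
class EncapCasSketch<E> {
    volatile E item;

    // Assumption: a field updater provides the CAS on the volatile "item" field
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<EncapCasSketch, Object> ITEM =
        AtomicReferenceFieldUpdater.newUpdater(EncapCasSketch.class, Object.class, "item");

    EncapCasSketch(E item) { this.item = item; }

    boolean casItem(E expected, E update) {
        return ITEM.compareAndSet(this, expected, update);
    }

    // Hypothetical helper: returns the item if this caller claimed it, otherwise null
    E tryTake() {
        final E cur = item;
        if (cur == null) return null;            // already dequeued by someone else
        return casItem(cur, null) ? cur : null;  // null pointer doubles as the "dequeued" mark
    }

    public static void main(String[] args) {
        EncapCasSketch<String> encap = new EncapCasSketch<>("hello");
        String first = encap.tryTake();
        String second = encap.tryTake();
        System.out.println(first);   // hello
        System.out.println(second);  // null
    }
}
```

However many threads race on the same Encap, only one CAS from the item to null can succeed, so the item is handed out exactly once.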

Another trick is that each node can have multiple pointers to Encap objects (up to MAX_THREADS). Some or all of these Encap objects may contain items that have been dequeued, but that's ok.
On each node, the dequeuer will scan the array of Encaps at node.encaps, starting from the first entry, looking for the first Encap with a non-null item:
public E dequeue(final int tid) {
    Node<E> lhead = head;
    Node<E> node = lhead;
    while (node != null) {
        // The sentinel node has no encaps, so the null check skips it
        for (int i = 0; node.encaps != null && i < node.encaps.length; i++) {
            final Encap<E> encap = node.encaps[i];
            final E item = encap.item;
            if (item == null) continue;   // Already dequeued
            if (encap.casItem(item, null)) {
                // We own the item; advance head if we moved past it
                if (node != lhead) casHead(lhead, node);
                return item;
            }
        }
        node = node.next;
    }
    return null;   // Queue is empty
}
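
To make the scan concrete, here is a small standalone sketch of what happens inside a single node. It assumes each Encap is modeled as an AtomicReference (a simplification, not the post's Encap class), and the names NodeScanSketch and takeFirst are hypothetical. The same reference appears twice in the list to mimic an Encap that was placed in the queue more than once:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of scanning one node's Encaps for the first available item
class NodeScanSketch {
    // Returns the first item this caller manages to claim, or null if every slot is empty
    static <E> E takeFirst(List<AtomicReference<E>> encaps) {
        for (AtomicReference<E> encap : encaps) {
            final E item = encap.get();
            if (item == null) continue;                        // already dequeued
            if (encap.compareAndSet(item, null)) return item;  // we won this slot
            // CAS failed: another dequeuer took it, move on to the next slot
        }
        return null;
    }

    public static void main(String[] args) {
        AtomicReference<String> a = new AtomicReference<>("a");
        AtomicReference<String> b = new AtomicReference<>("b");
        // "a" is listed twice, as if its Encap had been inserted more than once
        List<AtomicReference<String>> encaps = Arrays.asList(a, b, a);
        String first = takeFirst(encaps);
        String second = takeFirst(encaps);
        String third = takeFirst(encaps);
        System.out.println(first);   // a
        System.out.println(second);  // b
        System.out.println(third);   // null
    }
}
```

The duplicate entry for "a" is skipped on later scans because its item pointer is already null.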


On the enqueuer side, we use the familiar approach of having each enqueuer publish its enqueue request in an array called enqueuers[], which is allocated in the constructor:
public EncapsulatorQueue(int maxThreads) {
    this.maxThreads = maxThreads;
    Node<E> sentinelNode = new Node<E>();
    head = sentinelNode;
    tail = sentinelNode;
    enqueuers = new AtomicReferenceArray<Encap<E>>(maxThreads);

}
Unlike CRTurn, which publishes the nodes to be inserted in the queue, The Encapsulator publishes the items themselves. Notice that the items need not be unique:
public void enqueue(E item, final int tid) {
    if (item == null) throw new NullPointerException();
    final Encap<E> myEncap = new Encap<E>(item);
    enqueuers.set(tid, myEncap);  // Open request
    final Encap<E>[] lreqs = new Encap[maxThreads];
    for (int iter = 0; iter < 2; iter++) {
        Node<E> ltail = tail;
        if (ltail.next != null) { // Advance tail if needed
            casTail(ltail, ltail.next);
            ltail = tail;
            if (ltail.next != null) continue;
        }
        int numreqs = 0;
        for (int i = 0; i < maxThreads; i++) {
            final Encap<E> encap = enqueuers.get(i);
            if (encap == null) continue;
            lreqs[numreqs++] = encap;
        }
        if (ltail != tail || ltail.next != null) continue;
        if (ltail.casNext(null, new Node<E>(lreqs, numreqs))) {
            casTail(ltail, ltail.next);
            break;
        }
    }
    enqueuers.lazySet(tid, null);
}


When an enqueuer wants to enqueue a new item, it first publishes the item in its dedicated entry, enqueuers[tid], and then checks whether a pending tail advance is needed, advancing the tail if so. The loop runs at most two times, because if the tail changes twice, the enqueuer is certain that its item has been enqueued and there is no more work to be done except setting the entry back to null.
This trick of publishing in an array and then looping at most two times was first shown by Maurice Herlihy in his wait-free universal construction, and it has since been used by others, such as Panagiota Fatourou and Nikolaos Kallimanis in their wait-free Sim queue (based on P-Sim):
http://www.hpl.hp.com/techreports/Compaq-DEC/CRL-91-10.pdf
http://thalis.cs.uoi.gr/tech_reports/publications/TR2011-01.pdf
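
The excerpts in this post leave out the helper methods (casItem, casNext, casHead, casTail). The following is a condensed, self-contained sketch that puts the pieces together so they can be compiled and tried out. It is not the original source: it assumes AtomicReference fields in place of those CAS helpers, uses Arrays.copyOf instead of the Node constructor's array copy, drops the unused tid parameter from dequeue(), and the class name EncapsulatorSketch is hypothetical:

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical condensed restatement of the post's excerpts, not the original source
class EncapsulatorSketch<E> {
    static final class Encap<E> {
        final AtomicReference<E> item;
        Encap(E item) { this.item = new AtomicReference<>(item); }
    }

    static final class Node<E> {
        final Encap<E>[] encaps;   // null for the sentinel node
        final AtomicReference<Node<E>> next = new AtomicReference<>(null);
        Node(Encap<E>[] encaps) { this.encaps = encaps; }
    }

    private final int maxThreads;
    private final AtomicReference<Node<E>> head;
    private final AtomicReference<Node<E>> tail;
    private final AtomicReferenceArray<Encap<E>> enqueuers;

    EncapsulatorSketch(int maxThreads) {
        this.maxThreads = maxThreads;
        Node<E> sentinel = new Node<>(null);
        head = new AtomicReference<>(sentinel);
        tail = new AtomicReference<>(sentinel);
        enqueuers = new AtomicReferenceArray<>(maxThreads);
    }

    @SuppressWarnings("unchecked")
    void enqueue(E item, int tid) {
        if (item == null) throw new NullPointerException();
        enqueuers.set(tid, new Encap<>(item));              // open request
        Encap<E>[] lreqs = (Encap<E>[]) new Encap[maxThreads];
        for (int iter = 0; iter < 2; iter++) {
            Node<E> ltail = tail.get();
            Node<E> lnext = ltail.next.get();
            if (lnext != null) {                            // advance tail if needed
                tail.compareAndSet(ltail, lnext);
                ltail = tail.get();
                if (ltail.next.get() != null) continue;
            }
            int numreqs = 0;
            for (int i = 0; i < maxThreads; i++) {          // gather all open requests
                Encap<E> encap = enqueuers.get(i);
                if (encap != null) lreqs[numreqs++] = encap;
            }
            if (ltail != tail.get() || ltail.next.get() != null) continue;
            Node<E> node = new Node<>(Arrays.copyOf(lreqs, numreqs));
            if (ltail.next.compareAndSet(null, node)) {
                tail.compareAndSet(ltail, node);
                break;
            }
        }
        enqueuers.lazySet(tid, null);                       // close request
    }

    E dequeue() {
        Node<E> lhead = head.get();
        for (Node<E> node = lhead; node != null; node = node.next.get()) {
            if (node.encaps == null) continue;              // sentinel has no items
            for (Encap<E> encap : node.encaps) {
                E item = encap.item.get();
                if (item != null && encap.item.compareAndSet(item, null)) {
                    if (node != lhead) head.compareAndSet(lhead, node);
                    return item;
                }
            }
        }
        return null;                                        // queue is empty
    }

    public static void main(String[] args) {
        EncapsulatorSketch<String> q = new EncapsulatorSketch<>(2);
        q.enqueue("x", 0);
        q.enqueue("y", 1);
        System.out.println(q.dequeue());  // x
        System.out.println(q.dequeue());  // y
    }
}
```

Each thread must call enqueue() with its own tid in the range 0 to maxThreads-1, since the tid selects that thread's private slot in enqueuers[].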

When there are multiple enqueuers attempting to enqueue, we can aggregate all their Encap objects in one node, with a pointer to each Encap in an entry of the Node.encaps[] array. This means we use just one node to store multiple items.
// Inside enqueue(): gather every open request into lreqs[]
int numreqs = 0;
for (int i = 0; i < maxThreads; i++) {
    final Encap<E> encap = enqueuers.get(i);
    if (encap == null) continue;
    lreqs[numreqs++] = encap;
}

This idea isn't completely new, with at least two other queues having multiple items per node (one by Adam Morrison and Yehuda Afek, and the other by Chaoran Yang and John Mellor-Crummey):
http://www.cs.tau.ac.il/~mad/publications/ppopp2013-x86queues.pdf
http://chaoran.me/assets/pdf/wfq-ppopp16.pdf
The difference is that those two queues use a fixed-size array with thousands or even millions of entries, most of them unused, while The Encapsulator uses a variable-size array (of Encap pointers) that holds at most MAX_THREADS entries, all of them in use.
With our approach there is very little memory waste. The worst case is that MAX_THREADS threads each enqueued an item and all of those items have since been dequeued, but we have to keep the node because it is the last node in the queue, so we need it as a "sentinel" to be able to enqueue new nodes after it. This situation is unlikely because it requires a burst of MAX_THREADS enqueuers followed by no further enqueues.
Even so, an array of size MAX_THREADS is typically small compared to the thousands or millions of entries required by the other two queues mentioned above.
Another difference is that both of those queues rely on Fetch-And-Add (FAA), or getAndAdd() as it's called in the Java world, while The Encapsulator uses just CAS: no FAA is needed for its enqueue() to be wait-free.
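
As a rough illustration of why the per-node array has no unused entries, here is a sketch of the snapshot step on its own: it collects only the open requests and trims the result to exactly that count. The class name RequestSnapshotSketch and the method openRequests are hypothetical, and Object[] is used instead of Encap<E>[] to keep the example free of generic-array casts:

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical illustration of building a right-sized array from the open requests
class RequestSnapshotSketch {
    // Collects every non-null entry of enqueuers[] into an array of exactly that size
    static Object[] openRequests(AtomicReferenceArray<?> enqueuers) {
        final Object[] lreqs = new Object[enqueuers.length()];
        int numreqs = 0;
        for (int i = 0; i < enqueuers.length(); i++) {
            final Object req = enqueuers.get(i);
            if (req != null) lreqs[numreqs++] = req;   // skip threads with no open request
        }
        return Arrays.copyOf(lreqs, numreqs);          // trim: no unused slots remain
    }

    public static void main(String[] args) {
        // Four threads are registered, but only two have a request open
        AtomicReferenceArray<String> enqueuers = new AtomicReferenceArray<>(4);
        enqueuers.set(1, "b");
        enqueuers.set(3, "d");
        System.out.println(openRequests(enqueuers).length);  // 2
    }
}
```

The scratch array is sized for the worst case (one entry per thread), but the array stored in the node is only as long as the number of requests actually found.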

The next step is to create the node with an array of the needed size, and insert it in the tail, advancing afterwards:
// Inside enqueue(): link the new node after the tail, then advance the tail
if (ltail != tail || ltail.next != null) continue;
if (ltail.casNext(null, new Node<E>(lreqs, numreqs))) {
    casTail(ltail, ltail.next);
    break;
}

This approach is based on the lock-free queue by Maged Michael and Michael Scott, so nothing new here either:
https://www.research.ibm.com/people/m/michael/podc-1996.pdf


In the next post we'll talk about how to limit the number of times the same Encap object can be inserted in the queue, which is not needed for correctness but can come in handy in situations with over-subscription.

3 comments:

  1. Sorry for asking a question whose answer may be obvious, but still.

    My question is about the enqueue method.
    Let's suppose that the first thread has processed its enqueue request and added a new node to the queue, so it is now at the enqueuers.lazySet(tid, null) call. The second thread then executes its own request to enqueue an item. Since the first thread's request is still open, doesn't that mean the item from the first thread will be enqueued twice?

    1. Hi Andrey,
      The Encap object can be put in the list twice (or more times), but the "item" can be dequeued just once. This is the main trick of the Encapsulator: to allow an Encap object to be "enqueued" multiple times, but the item is not repeated by the dequeuer, thus providing linearizability.

      I'll explain in more detail, with schematics, in the next post, so stay tuned.
      Thanks for your question!

  2. Hi Pedro,
    Got it. The rest is clear to me. Thank you for the explanation; I'm looking forward to your next posts.
