Saturday, December 24, 2016

Sim Queue in Java (part 1/2)

It's Christmas, time for gifts and sharing, so I wanted to share some really cool queues we've been working on, but their papers aren't ready yet, so it will be delayed for some more posts. Instead I'm going to talk about SimQueue, a Multi-Producer-Multi-Consumer Wait-Free queue.

SimQueue was published in 2011 and was invented by Panagiota Fatourou and Nikolaos Kallimanis, and the academic paper is here:
http://thalis.cs.uoi.gr/tech_reports/publications/TR2011-01.pdf
Jump directly to page 16 and skip the universal wait-free construct; Algorithms 4, 5 and 6 are the interesting ones.

Outside of academia, very few people have heard of SimQueue, and it's unfortunate because this is the highest scaling MPMC wait-free queue currently known. Yes, it actually has better scalability than our Turn queue and has better throughput than any of the lock-free single-item-per-node queues.
Ohhhh yes, it's a wait-free queue that is better than the lock-free queues, you heard me right!


If it's so good, how come nobody is using it?

For several reasons:

First, the academic paper is hard to read, even for those fluent in "academese". SimQueue gets mixed up with the universal wait-free construct described in the same paper, which is named PSim, and although it is certainly possible to wrap a single-threaded queue in PSim to obtain a wait-free queue, such an approach would not be very good, certainly not as good as SimQueue.
SimQueue is a wait-free queue that uses some of the same tricks as PSim, but not all.
Don't confuse PSim and SimQueue, they're in the same paper, they share some of the same tricks, but they're different things.

Second, there is an implementation of SimQueue in C99 by the authors available on github but it's not pretty (C99 code rarely is), and it has several bugs. We pointed them out to the authors who have fixed one of them, but not all... the others we fixed ourselves in our implementation which you can get at the end of this post. Feel free to look at their implementation, at least it's available and compiles which is more than what most authors do, but be aware it has bugs:
https://github.com/nkallima/sim-universal-construction/blob/master/sim-synch1.4/simqueue.c

Third, there is no memory reclamation to SimQueue and it's not easy to add it in C/C++... well, at least it's not easy to add and keep the wait-free progress for the dequeue() method.
Without memory reclamation there is no purpose to this queue, except academic, but even there I think most academics shy away from it.
Simplicity is really important when it comes to this stuff, that's why Michael-Scott's queue is so pervasive, because it's simple to understand and simple to explain, it's simple to use in an academic paper/benchmarks. Go and try to understand SimQueue and then come back to see how successful that was.


If it's that complicated, why spend the time to learn how it works?

Well, because it's awesome!
Yeah that's right, it's awesome, and not just for one single reason, no, it has several cool tricks that make it awesome, tricks that may be used in other stuff and that are vital to understanding lock-free and wait-free algorithms.
So let's dive into SimQueue and try to simplify it as much as possible.
Before we start, here are the differences in our implementation in Java from the original code:
  •  We don't use Fetch-And-Add (FAA), but we could if we wanted to. Instead we use an array of integers (could be bools);
  •  There is no pointer_t or pools, we just allocate what we need on the fly and let the GC handle the trash;
  •  There are no HalfEnq or HalfDeq types and corresponding objects;
  •  Our dequeue() uses the CRTurn consensus to be wait-free. The original SimQueue starts from zero which is a bit unfair, but still wait-free.
  •  We don't use a pool of nodes. If the CAS on EnqState fails we just let the GC clean the sub-list and we make new nodes.


Enqueue():
Before we start explaining the enqueue() method we need to define an EnqState object, which is used to have a coherent snapshot of the enqueuers' state:

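// All the members in this class and their contents are immutable (after becoming visible to other threads)
static class EnqState<E> {
    final Node<E> tail;       // The current tail
    final Node<E> nextNode;   // The next node to add to the tail (beginning of sublist)
    final Node<E> nextTail;   // The future tail, once tail.next becomes nextNode (end of sublist)
    final int[] applied;      // Enqueue requests to match enqueuers[]
    // Constructor needed because every field is final
    EnqState(Node<E> tail, Node<E> nextNode, Node<E> nextTail, int[] applied) {
        this.tail = tail;
        this.nextNode = nextNode;
        this.nextTail = nextTail;
        this.applied = applied;
    }
}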

It uses the immutable-after-visible trick that Copy-On-Write based algorithms use. Yes, it can be wasteful, but the GC will take care of it, and, as in the original algorithm, we could re-use instances. We're not going to re-use instances because then we have to deal with ABA problems and the algorithm just gets too complicated (but faster).
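To make the immutable-after-visible idea concrete, here is a minimal sketch of the pattern on its own. It is not taken from our queue: the names Snapshot, SnapshotHolder and tryToggle are hypothetical, and an AtomicReference stands in for the volatile enqState field.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for EnqState: never modified once another thread can see it.
final class Snapshot {
    final int[] applied;
    Snapshot(int[] applied) { this.applied = applied; }
}

final class SnapshotHolder {
    private final AtomicReference<Snapshot> current;

    SnapshotHolder(int maxThreads) {
        current = new AtomicReference<>(new Snapshot(new int[maxThreads]));
    }

    Snapshot read() { return current.get(); }

    // Copy-on-write update: clone, modify the private copy, then publish with one CAS.
    // Returns false if another thread replaced the snapshot in the meantime.
    boolean tryToggle(Snapshot seen, int tid) {
        int[] copy = seen.applied.clone();   // 'seen' itself is never written to
        copy[tid] = (copy[tid] + 1) % 2;
        return current.compareAndSet(seen, new Snapshot(copy));
    }
}
```

A reader that grabbed the old Snapshot keeps seeing a consistent applied[] array no matter how many updates happen afterwards, which is what makes it usable as a coherent snapshot.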
The queue class itself has an array of enqueuers (seq-cst atomics), an array of items to be inserted, and a pointer to the current EnqState (seq-cst atomic).
    // Used by enqueuers
    private final AtomicIntegerArray enqueuers;
    private final E[] items;
    @sun.misc.Contended
    private volatile EnqState<E> enqState;
Here's what the enqueue() looks like:     
public void enqueue(E item, final int tid) {
    if (item == null) throw new NullPointerException();
    // Publish enqueue request
    items[tid] = item;
    final int newrequest = (enqState.applied[tid]+1)%2;
    enqueuers.set(tid, newrequest);       
    for (int iter = 0; iter < 3; iter++) {
        final EnqState<E> lstate = enqState;
        // Advance the tail if needed
        if (lstate.tail.next != lstate.nextNode) lstate.tail.next = lstate.nextNode;
        // Check if my request has been done
        if (lstate.applied[tid] == newrequest) return;
        // Help other requests, starting from zero
        Node<E> first = null, node = null;
        int[] applied = lstate.applied.clone();
        for (int i = 0; i < maxThreads; i++) {
            // Check if it is an open request
            if (enqueuers.get(i) == applied[i]) continue;
            applied[i] = (applied[i]+1) % 2;
            Node<E> prev = node;
            node = new Node<E>(items[i]);
            if (first == null) {
                first = node;
            } else {
                prev.relaxedStoreNext(node);  // We don't want the volatile store here
            }
            if (lstate != enqState) break;
        }
        // Try to apply the new sublist
        if (lstate == enqState) casEnqState(lstate, new EnqState<E>(lstate.nextTail, first, node, applied));
    }
}
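The listing calls two helpers that are not shown in this post: relaxedStoreNext() on the node and casEnqState() on the queue. Below is one possible way to write them using AtomicReferenceFieldUpdater; treat it as a sketch under assumptions, not as the code in the repository. SketchNode and SketchQueue are hypothetical names, the state is typed as Object to keep it short, and lazySet() is only a stand-in: it is a release store, stronger than a truly relaxed one but typically cheaper than a volatile store.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical node; the post's Node class is not shown, so this layout is assumed.
final class SketchNode<E> {
    final E item;
    volatile SketchNode<E> next;

    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<SketchNode, SketchNode> NEXT =
        AtomicReferenceFieldUpdater.newUpdater(SketchNode.class, SketchNode.class, "next");

    SketchNode(E item) { this.item = item; }

    // lazySet() is a release (ordered) store, used here in place of a relaxed store.
    void relaxedStoreNext(SketchNode<E> node) { NEXT.lazySet(this, node); }
}

// Hypothetical owner of a volatile state field, showing one way to write a CAS helper.
final class SketchQueue {
    private volatile Object enqState = "initial";

    private static final AtomicReferenceFieldUpdater<SketchQueue, Object> ENQ_STATE =
        AtomicReferenceFieldUpdater.newUpdater(SketchQueue.class, Object.class, "enqState");

    Object state() { return enqState; }

    // Succeeds only if enqState is still the exact object 'expected' (reference equality).
    boolean casEnqState(Object expected, Object update) {
        return ENQ_STATE.compareAndSet(this, expected, update);
    }
}
```

Because the CAS compares references, it only succeeds when nobody has installed a different state object since we read it, which is why never re-using instances keeps ABA out of the picture.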

We start by publishing the item we want to enqueue and then opening an enqueue request by toggling the bit on the entry of the current thread in enqueuers[] array. This atomic store will indicate to other enqueuers that we are attempting to insert the "item" in the queue.


Next step is to link the last node with the current first of the sublist in case the previous enqueuer did not finish its job... yes SimQueue inserts a sublist at a time instead of a single node, but we'll get to that.

Then, we start helping all the other opened enqueue requests, by checking which threads have an enqueuers[] entry that differs from their applied[] entry. This is the first trick of SimQueue: signaling the open request by toggling a single bit, and then keeping all the currently satisfied requests in the last EnqState instance.
Remember several posts ago when I said that "The Encapsulator" queue was a different way to solve the problem of knowing when a certain enqueue request has been done or not? Well, it's a really hard problem, and SimQueue solves it elegantly in this way. In the original code they used an XOR on the bits, but it's the same thing.
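Here is a small sketch of the toggle-bit trick in isolation. The class RequestBits and its method names are made up for this illustration; the only thing it assumes is that entries hold 0 or 1, as in the listing above.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;

final class RequestBits {
    // A request from thread i is open while its published bit differs from the applied bit.
    static boolean isOpen(AtomicIntegerArray enqueuers, int[] applied, int i) {
        return enqueuers.get(i) != applied[i];
    }

    // For values restricted to 0 and 1, XOR with 1 and (x+1)%2 toggle identically.
    static int toggleXor(int bit) { return bit ^ 1; }
    static int toggleMod(int bit) { return (bit + 1) % 2; }
}
```

Opening a request flips the thread's entry in enqueuers[], and satisfying it flips the matching entry in the new applied[] copy, so the two entries are equal again exactly when the request is done.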


Afterwards, we scan for all open enqueue requests, and for each we create a new node and insert in it the corresponding item, thus creating a sub-list of nodes. When one node has been created and linked for each opened request, we create an EnqState with the start and end of the sublist, the new applied states to know which requests have been satisfied, and try to replace the current EnqState with our own.
If it fails, we try again, starting over from the new EnqState, but we know that we need to do this at most two times, a trick originally used by Maurice Herlihy in his Universal Wait-Free Construct. In our implementation we do it three times because we need an extra iteration to make sure the sub-list is properly linked.



One of the cool things is that the construction of the sub-list is single-threaded code, and only after it has been done do we try to insert the whole sub-list at the tail of the current list, by doing the CAS on the EnqState and not on tail.next.
The disadvantage is that when the CAS fails we need to throw away all the created nodes. On the original code there is a pool of nodes that can be re-used, and the same thing can be done here, but I didn't want to complicate the code too much.
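The build-privately-then-publish idea can be sketched on its own like this. It is a simplified illustration, not the queue itself: SublistSketch, Pending, build and publish are hypothetical names, items are plain Strings, and an AtomicReference plays the role of the enqState field.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

final class SublistSketch {
    static final class Node {
        final String item;
        Node next;                      // plain field: only written before publication
        Node(String item) { this.item = item; }
    }

    // Hypothetical immutable descriptor of a pending sublist (first and last node).
    static final class Pending {
        final Node first, last;
        Pending(Node first, Node last) { this.first = first; this.last = last; }
    }

    // Single-threaded construction: no other thread can see these nodes yet.
    static Pending build(List<String> items) {
        Node first = null, last = null;
        for (String item : items) {
            Node n = new Node(item);
            if (first == null) first = n; else last.next = n;
            last = n;
        }
        return new Pending(first, last);
    }

    // One CAS publishes the whole sublist; on failure the new nodes are simply dropped.
    static boolean publish(AtomicReference<Pending> shared, Pending seen, List<String> items) {
        return shared.compareAndSet(seen, build(items));
    }
}
```

When publish() returns false nothing shared was touched, so the only cost of losing the race is the garbage we just created.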

Source code to an over-simplified SimQueue in Java can be found here:
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/Java/com/concurrencyfreaks/queues/CRSimQueue.java
That's it for now, we'll look at the dequeue() method on the next post.

Merry Christmas!
