Saturday, December 17, 2016

A C++ implementation of Kogan-Petrank wait-free queue with memory reclamation

The Kogan-Petrank (KP) queue is one of the best known MPMC wait-free queues. Its design is simple for a wait-free queue, though not as simple as our Turn queue. You can see the original paper here http://www.cs.technion.ac.il/~erez/Papers/wfquque-ppopp.pdf
(not to be confused with http://www.cs.technion.ac.il/~erez/Papers/wf-methodology-ppopp12.pdf),
our Java implementation (without self-linking) here:
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/Java/com/concurrencyfreaks/queues/KoganPetrankNoSLQueue.java
and the version with self-linking here:
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/Java/com/concurrencyfreaks/queues/KoganPetrankQueue.java

The major drawback of this queue is that it was designed for Java, with a Garbage Collector. To compare it against our Turn queue we had to implement it in C++, which meant adding Hazard Pointers to it. This turned out to be an incredibly difficult task, but we managed.

You can get the C++ code with Hazard Pointers here:
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/CPP/queues/KoganPetrankQueueCHP.hpp


How hard can it be to add hazard pointers and still keep the queue wait-free?

Just to get an idea of the difficulty of the task, here is a comparison of the dequeue() method in the Java implementation (which relies on the GC) with the C++ implementation that uses Hazard Pointers:

Java:

public E dequeue() {
    // We better have consecutive thread ids, otherwise this will blow up
    long phase = maxPhase() + 1;
    state.set(TID, new OpDesc<E>(phase, true, false, null));
    help(phase);
    help_finish_deq();
    final Node<E> node = state.get(TID).node;
    if (node == null) return null; // We return null instead of throwing an exception
    final E value = node.next.value;
    node.next = node;              // Self-link to help the GC
    return value;
}

C++:

T* dequeue(const int TID) {
    // We better have consecutive thread ids, otherwise this will blow up
    long long phase = maxPhase(TID) + 1;
    state[TID].store(new OpDesc(phase, true, false, nullptr));
    help(phase, TID);
    help_finish_deq(TID);
    OpDesc* curDesc = hpOpDesc.protect(kHpODCurr, state[TID], TID);
    Node* node = curDesc->node;
    if (node == nullptr) {
        hpOpDesc.clear(TID);
        hpNode.clear(TID);
        OpDesc* desc = state[TID].load();
        for (int i = 0; i < MAX_THREADS; i++) {
            if (state[TID].compare_exchange_strong(desc, OPDESC_END)) break;
            desc = state[TID].load();
            if (desc == OPDESC_END) break;
        }
        hpOpDesc.retire(desc, TID);
        return nullptr; // We return null instead of throwing an exception
    }
    Node* next = node->next;
    T* value = next->item.load();
    next->item.store(nullptr); // "next" can be deleted now
    hpOpDesc.clear(TID);
    hpNode.clear(TID);
    hpNode.retire(node, TID); // "node" will be deleted only when node.item == nullptr
    OpDesc* desc = state[TID].load();
    for (int i = 0; i < maxThreads*2; i++) { // Is maxThreads+1 enough?
        if (desc == OPDESC_END) break;
        if (state[TID].compare_exchange_strong(desc, OPDESC_END)) break;
        desc = state[TID].load();
    }
    hpOpDesc.retire(desc, TID);
    return value;
}

See the difference between the two?
The Java version has 9 lines of code while the C++ version has 30, and it's not about the language: it's because in C++ we need to use Hazard Pointers (HP), and they have to be applied in a wait-free way.

Keep in mind that there are some very subtle details about this algorithm that we might have missed, so we can't give our usual guarantee that this implementation is correct. We didn't design this, we just slapped (wait-free) HP on it, so use it at your own risk. I think it's right, and Andreia thinks it's right, and we never saw an issue in our stress tests, but that is as good as it gets for code that is several pages long and mostly wasn't written by us  ;)


Let's look at some of the modifications we had to make to get this wait-free queue working in C++ with Hazard Pointers:

help():
- Line 38 requires a hazard pointer to read the other thread's state. The problem is that the validation of the hazard pointer may fail an unbounded number of times, because the instance in state[] may keep changing. However, there is a trick that allows us to stop after a finite number of steps: if we re-check the hazard pointer at most NUM_THRDS*2 times and it still doesn't validate, we have the certainty that that particular thread has completed at least one enqueue/dequeue request, and it is ok to "skip it" and try to help the next thread in the array of states (a sketch of this bounded-retry protection is shown below). Why this is so is non-trivial and left as an exercise to the reader.
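
Just to illustrate the pattern, here is a minimal sketch of such a bounded-retry protection. This is not the code in KoganPetrankQueueCHP.hpp: the hp[] array, the function name and the opaque OpDesc are simplified stand-ins for the real members.

#include <atomic>

struct OpDesc;  // opaque here; the real OpDesc carries phase, pending, enqueue and node

// Sketch: try to protect state[otherTid] with a hazard pointer, giving up after
// maxThreads*2 failed validations. Returning nullptr means "skip this thread",
// because it must have completed at least one enqueue/dequeue in the meantime.
OpDesc* protectStateBounded(std::atomic<OpDesc*>* state, std::atomic<OpDesc*>* hp,
                            int otherTid, int myTid, int maxThreads) {
    OpDesc* desc = state[otherTid].load();
    for (int attempt = 0; attempt < maxThreads*2; attempt++) {
        hp[myTid].store(desc);                     // publish the hazard pointer
        OpDesc* recheck = state[otherTid].load();  // re-read to validate
        if (recheck == desc) return desc;          // validated, safe to dereference
        desc = recheck;                            // it changed, retry with the new value
    }
    hp[myTid].store(nullptr);
    return nullptr;                                // never validated, ok to skip helping
}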

maxPhase():
- Line 51 requires a hazard pointer to read the other thread's phase, which is a problem similar to the one in help().
This one is even trickier because it has implications on whether or not the Lamport Bakery consensus is still valid if you skip one of the participants.
The answer is yes, it's still valid. Explaining why would take a small blog post of its own, so I'm not going to. The conclusion is that waiting for NUM_THRDS+1 attempts per hazard pointer is enough, and if after that it still hasn't validated, just skip over to the next entry of state[] (see the sketch below). Notice that this means that when computing the maxPhase the return value can be -1, even if all the other threads are actively participating with a high phase, and that the current thread will "jump in front"
of all of them... as surprising as this may seem, it's still wait-free.
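
Again just as an illustration, here is a sketch of a maxPhase() with a bounded number of hazard pointer attempts per entry of state[]. The OpDesc and hp[] below are the same simplified stand-ins as in the previous sketch, not the exact members of our implementation.

#include <atomic>

struct OpDesc { long long phase; };  // simplified: the real OpDesc has more fields

// Sketch: compute the maximum phase over state[], spending at most maxThreads+1
// hazard pointer attempts per entry and skipping entries that never validate.
// The result can be -1 even when other threads are actively enqueueing/dequeueing.
long long maxPhaseBounded(std::atomic<OpDesc*>* state, std::atomic<OpDesc*>* hp,
                          int myTid, int maxThreads) {
    long long maxPhase = -1;
    for (int other = 0; other < maxThreads; other++) {
        OpDesc* desc = state[other].load();
        bool validated = false;
        for (int attempt = 0; attempt < maxThreads+1; attempt++) {
            hp[myTid].store(desc);                   // publish the hazard pointer
            OpDesc* recheck = state[other].load();   // validate with a re-read
            if (recheck == desc) { validated = true; break; }
            desc = recheck;
        }
        if (!validated || desc == nullptr) continue; // skip this participant
        if (desc->phase > maxPhase) maxPhase = desc->phase;
    }
    hp[myTid].store(nullptr);                        // clear the hazard pointer
    return maxPhase;
}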


Ok, I could go on and on, but I'm bored and have better things to do with my time, and this is enough to get the picture: deploying hazard pointers in code that wasn't designed for them can be tricky... and I didn't even mention that we had to come up with a new variant of hazard pointers to improve the performance of the KP queue, but that's a topic for a different post. If you're interested, you can look at the Conditional Hazard Pointers class on github and read section 3.2 of our paper on the Turn queue:
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/CPP/queues/HazardPointersConditional.hpp
https://github.com/pramalhe/ConcurrencyFreaks/blob/master/papers/crturnqueue-2016.pdf


Next post we'll talk about another queue.

9 comments:

  1. I understand that the code is shorter in Java, but wouldn't garbage collection use a lot of non-wait-free locks to allocate and release memory buffers, in effect making the claim that even the Java version is lock-free a false claim?

    Replies
    1. Yes, I completely agree: because of the GC, the Java implementation of the Kogan-Petrank queue is _not_ lock-free.
      What they claim in their paper is that the "algorithm" itself is lock-free, which is indeed true, though that particular queue algorithm doesn't work without a GC and, as Petrank himself said, there are no lock-free GCs ;)
      http://concurrencyfreaks.blogspot.com/2017/08/lock-free-memory-reclamation-by-erez.html

  2. I am confused by the 'help_enq' function in KP's paper. Why are the checks 'last == tail.get()' and 'next == null' necessary?

  3. Lol, this goes to my point that hand-made wait-free data structures are just too complicated and that the best approach is to use a wait-free STM.
    You should be asking Alex or Erez about this stuff, not me ;) but here is why I _think_ they did it that way:

    http://www.cs.technion.ac.il/~erez/Papers/wfquque-ppopp.pdf
    In line 69 they read tail.get(), then last.next.get(), and then tail.get() again. The reason for this double check is to make sure that 'last' and 'next' are consistent. This kind of "double-check pattern" is common in lock-free and wait-free algorithms, and it only works if the loads are ordered, which is the case for the calls to .get() in the Java memory model.
    The check for 'next == null' in line 72 covers the case where another thread has inserted a new node but has not yet had time to advance the tail.
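    Roughly, the pattern looks like this (a generic Michael-Scott-style enqueue sketch in C++, just to show why both checks exist; it is not the paper's help_enq, which additionally deals with phases and the state[] array):

    #include <atomic>

    struct Node { std::atomic<Node*> next{nullptr}; };

    // Sketch of the double-check and the next==null branch (not the paper's code)
    void enqueueSketch(std::atomic<Node*>& tail, Node* newNode) {
        while (true) {
            Node* last = tail.load();            // 1st read of tail
            Node* next = last->next.load();      // read last->next
            if (last != tail.load()) continue;   // 2nd read: tail moved, 'last'/'next' are stale
            if (next == nullptr) {
                // 'last' really was the last node: try to link the new node after it
                if (last->next.compare_exchange_strong(next, newNode)) {
                    tail.compare_exchange_strong(last, newNode);   // advance the tail
                    return;
                }
            } else {
                // another thread linked a node but hasn't advanced the tail yet: help it
                tail.compare_exchange_strong(last, next);
            }
        }
    }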

    Cheers,
    Pedro

    Replies
    1. Can I consider this "double-check" just an optimization, rather than something needed for correctness? If I remove this "double-check", will the "help_enq" function still work?

    2. No no no, this is NOT an optimization. If you take out the double-check the algorithm will be INCORRECT!

      I'm afraid my previous explanation was unclear: 'last' and 'next' HAVE to be read consistently, and since there is no "atomic-load-instruction-for-two-non-adjacent-words-in-memory", the main trick is to use this double-check-with-load-acquire.

      ... if you have to ask, then changing any line of code in a lock-free or wait-free algorithm will turn it into an incorrect algorithm :(

    3. Yes, as you say, removing this check would make it incorrect. Thank you so much. I have another question, about line 73. KP's paper says the "element might have been already added to the queue by another concurrent thread". But I think a thread could also be suspended right after finishing the 'isStillPending(tid, phase)' check, and that situation would still exist. So is this check redundant?

    4. Better ask Alex or Erez:
      http://www.cs.technion.ac.il/~sakogan/
      http://www.cs.technion.ac.il/~erez/

      If you want to ask difficult questions about wait-free queues, you better ask about one that I was an author of, like this one:
      https://github.com/pramalhe/ConcurrencyFreaks/blob/master/papers/crturnqueue-2016.pdf

      ;)

    5. Thanks for your patient help :)
