CLR Inside Out

9 Reusable Parallel Data Structures and Algorithms

Joe Duffy

Code download available at: CLR Inside Out 2007_05.exe (156 KB)

Contents

Countdown Latch
Reusable Spin Wait
Barriers
Blocking Queue
Bounded Buffer
Thin Event
Lock-Free LIFO Stack
Loop Tiling
Parallel Reductions
Conclusion

This column is less about the mechanics of a common language runtime (CLR) feature and more about how to efficiently use what you’ve got at your disposal. Selecting the right data structures and algorithms is, of course, one of the most common yet important decisions a programmer must make. The wrong choice can make the difference between success and failure or, as is the case most of the time, good performance and, well, terrible performance. Given that parallel programming is often meant to improve performance and that it is generally more difficult than serial programming, the choices are even more fundamental to your success.

In this column, we’ll take a look at nine reusable data structures and algorithms that are common to many parallel programs and that you should be able to adapt with ease to your own .NET software. Each example is accompanied by fully working, though not completely hardened, tested, and tuned, code. The list is by no means exhaustive, but it represents some of the more common patterns. As you’ll notice, many of the examples build on each other.

There’s something I should mention at the outset. The Microsoft® .NET Framework has several existing concurrency primitives. While I’ll show you how to build your own primitives, the existing ones are completely suitable for most situations. I just want to demonstrate that alternatives are sometimes worth considering. Besides, seeing these techniques in action will deepen your understanding of parallel programming in general. I will assume a basic understanding of the existing primitives, though. See Vance Morrison’s "What Every Dev Must Know About Multithreaded Apps" in the August 2005 issue of MSDN® Magazine for a comprehensive overview.

Now, let’s have a look at the techniques.

Countdown Latch

Semaphores are one of the better known data structures in concurrent programming for many reasons, not the least of which is that semaphores have a long history in computer science dating back to operating system design in the 1960s. A semaphore is simply a data structure with a count field, and it supports two operations: put and take (often called V and P, respectively). A put operation increments the semaphore’s count by one, and a take decrements it by one. When the semaphore’s count becomes zero, any subsequent attempts to take from it will block (wait) until a concurrent put makes the count non-zero. Both are atomic, concurrency-safe operations, ensuring that concurrent puts and takes are serialized with respect to one another. Windows® has first-class kernel and Win32® support for semaphore objects (see CreateSemaphore and related APIs), and they are surfaced in the .NET Framework through the System.Threading.Semaphore class. Critical regions, as supported by Mutex and Monitor, are often characterized as a special semaphore with a count that toggles between 0 and 1—in other words, a binary semaphore.

A kind of "reverse semaphore" is often useful, too. That is to say, sometimes you want a data structure that supports waiting for the data structure’s count to reach zero. Fork/join parallelism—where a single "master" thread controls the execution of n "subservient" threads and then waits for them to finish—is quite common in data parallel programming, and having a reverse semaphore on hand is very useful for such cases. Most of the time, you don’t actually want waking threads to modify the count, so in this case we’ll call the structure a countdown "latch" to indicate that counts decrease, and that once set to the signaled state, the latch remains signaled (a property often associated with latches). Sadly, neither Windows nor the .NET Framework supports such a data structure. Thankfully, however, it’s simple to build one.

To build a countdown latch, you just initialize its counter to n, and have each subservient task atomically decrement it by one when it finishes, for example by surrounding the decrement operation with a lock or with a call to Interlocked.Decrement. Then, instead of a take operation, a thread could decrement and wait for the counter to become zero; when awoken, it will know that n signals have been registered with the latch. Instead of spinning on this condition, as in while (count != 0), it’s usually a good idea to let the waiting thread block, in which case you then have to use an event. Figure 1 is an example of a simple CountdownLatch type.

Figure 1 CountdownLatch

public class CountdownLatch {
    private int m_remain;
    private EventWaitHandle m_event;

    public CountdownLatch(int count) {
        m_remain = count;
        m_event = new ManualResetEvent(false);
    }

    public void Signal() {
        // The last thread to signal also sets the event.
        if (Interlocked.Decrement(ref m_remain) == 0)
            m_event.Set();
    }

    public void Wait() {
        m_event.WaitOne();
    }
}

This is surprisingly straightforward, but can be tricky to get right. We’ll see some examples of how you might use this data structure later. Note that the basic implementation shown here leaves room for various improvements, including: adding some degree of spin waiting before invoking WaitOne on the event, lazily allocating the event instead of doing it in the constructor (in case spinning is sufficient to avoid blocking altogether, as is demonstrated by the ThinEvent later in this column), adding reset capabilities, and providing a Dispose method so that the internal event object can be closed when no longer needed. These are all left as exercises for the reader.
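
As a taste of the simpler items on that list, here is a minimal sketch of what reset and disposal support might look like (the member shapes are illustrative and assume the class is made to implement IDisposable); the spinning and lazy-allocation refinements appear later in the column:

public void Reset(int count) {
    // Only safe once all signalers and waiters from the previous
    // "generation" are known to be finished.
    m_remain = count;
    m_event.Reset();
}

public void Dispose() {
    m_event.Close();
}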

Reusable Spin Wait

Though blocking is generally preferred to busy waiting, circumstances do arise in which you may want to spin for some time before falling back to a real wait. The reason that this can be useful is subtle, and most people initially avoid spin waiting because it seems like pure wasted work. But if a context switch (which happens any time a thread waits on a kernel event) costs several thousand cycles (we’ll call this c), which it does on Windows, and if the condition the thread is waiting for occurs in fewer than 2c cycles (1c for the wait itself, 1c for waking up), then spinning could decrease the overhead and latency caused by waiting, boosting your algorithm’s overall throughput and scalability.

Once you decide to use a spin wait, you must tread with care. There are many issues that may come up: making sure you call Thread.SpinWait inside the spin loop to make the processor available to other hardware threads on Intel Hyper-Threading-enabled machines; calling Thread.Sleep with an argument of 1 instead of 0 once in a while to avoid priority inversion; using a slight back-off to introduce randomization, improving locality (assuming the caller is continuously rereading shared state), and possibly avoiding livelock; and, of course, always yielding on a single-CPU machine (since spinning in such environments is entirely wasteful).

The SpinWait type is defined as a value type so that it’s cheap to allocate (see Figure 2). We can then use it to avoid blocking in the CountdownLatch’s Wait method shown previously:

Figure 2 SpinWait

public struct SpinWait {
    private int m_count;
    private static readonly bool s_isSingleProc =
        (Environment.ProcessorCount == 1);
    private const int s_yieldFrequency = 4000;
    private const int s_yieldOneFrequency = 3*s_yieldFrequency;

    public int Spin() {
        int oldCount = m_count;

        // On a single-CPU machine, we ensure our counter is always
        // a multiple of ‘s_yieldFrequency’, so we yield every time.
        // Else, we just increment by one.
        m_count += (s_isSingleProc ? s_yieldFrequency : 1);

        // If not a multiple of ‘s_yieldFrequency’ spin (w/ backoff).
        int countModFrequency = m_count % s_yieldFrequency;
        if (countModFrequency > 0)
            Thread.SpinWait((int)(1 + (countModFrequency * 0.05f)));
        else
            Thread.Sleep(m_count <= s_yieldOneFrequency ? 0 : 1);

        return oldCount;
    }

    private void Yield() {
        Thread.Sleep(m_count < s_yieldOneFrequency ? 0 : 1);
    }
}

private const int s_spinCount = 4000;
public void Wait() {
    SpinWait s = new SpinWait();
    while (m_remain > 0) {
        if (s.Spin() >= s_spinCount) m_event.WaitOne();
    }
}

The choice of yield frequency and spin count is admittedly arbitrary. Just like Win32 critical section spin counts, these numbers should be chosen based on testing and experimentation, and the right answer is apt to differ from one system to the next. The MSDN documentation, for instance, recommends a spin count of 4,000 for critical sections based on experience from the Microsoft Media Center and Windows kernel teams, but your mileage will vary. The perfect number depends on many factors, including the number of threads waiting for an event at any given time, the frequency with which events occur, and so on. In most cases, you’ll want to eliminate the explicit yields by waiting on an event instead, as shown with the latch example.

You can even choose a dynamically adjusting count: for example, start at a medium number of spins, and each time spinning is unsuccessful, increment the count. Once the count reaches a predetermined maximum, stop spinning altogether and just issue the WaitOne right away. The logic is as follows: you are willing to burn up to the predetermined maximum number of cycles, but no more. If you find that this maximum is not enough to prevent the context switch, then doing the context switch right away will be cheaper in total. Over time, you would hope the spin count reaches a stable value.
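
To make that concrete, here is a minimal sketch of an adaptive version of the latch’s Wait method; the field names and the numbers are purely illustrative, and a real implementation would want far more careful tuning:

private static int s_spinEstimate = 500;     // shared, adaptive spin count (a guess)
private const int s_maxSpinEstimate = 4000;  // past this, just block immediately

public void Wait() {
    int spinLimit = s_spinEstimate;
    if (spinLimit < s_maxSpinEstimate) {
        SpinWait s = new SpinWait();
        while (m_remain > 0) {
            if (s.Spin() >= spinLimit) {
                // Spinning didn't prevent a wait this time, so bump the shared
                // estimate and fall back to blocking on the event.
                Interlocked.CompareExchange(
                    ref s_spinEstimate, spinLimit + 500, spinLimit);
                m_event.WaitOne();
                return;
            }
        }
    } else if (m_remain > 0) {
        // Experience says spinning rarely helps here; block right away.
        m_event.WaitOne();
    }
}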

Barriers

A barrier, also known as a rendezvous point, is a concurrency primitive that permits threads to coordinate with each other without the need for another "master" thread orchestrating everything. Each of the threads signals and waits, atomically, once it reaches the barrier. All threads are allowed to proceed only when all n have reached it. This can be used for cooperative algorithms, as are common in scientific, mathematical, and graphics domains. Using barriers is suitable in many computations; in fact, the CLR’s garbage collector even uses them. Barriers simply break a bigger computation into smaller, cooperative phases, for example:

const int P = ...;
Barrier barrier = new Barrier(P);
Data[] partitions = new Data[P];

// Running on ‘P’ separate threads in parallel:
public void Body(int myIndex) {
    FillMyPartition(partitions[myIndex]);
    barrier.Await();
    ReadOtherPartition(partitions[P - myIndex - 1]);
    barrier.Await();
    // ...
}

You’ll quickly notice that a countdown latch could be used in this situation. Instead of calling Await, each thread could call Signal followed immediately by a call to Wait; all threads will be released once they all reach the barrier. But there is a problem: our earlier latch doesn’t support reusing the same object multiple times, a handy property all barriers should support. The above example, in fact, requires this. You can use separate barrier objects to achieve this, but that would be wasteful; there’s no need for more than one since all threads are only ever in one phase at a time.
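
For reference, a single phase of that latch-based approach would look something like the following sketch (using the CountdownLatch from Figure 1); one latch per phase is exactly the waste the reusable barrier below avoids:

CountdownLatch phase1 = new CountdownLatch(P);

public void Body(int myIndex) {
    FillMyPartition(partitions[myIndex]);
    phase1.Signal();  // "I have arrived at the barrier."
    phase1.Wait();    // Wait until all P threads have arrived.
    ReadOtherPartition(partitions[P - myIndex - 1]);
    // ... a second latch (and a third, and so on) would be needed here.
}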

To solve this problem, you can start with the same basic countdown latch algorithm to deal with decrementing the counter, signaling the event, waiting, and so forth, extending it to support reuse. To do this, you need a so-called sense-reversing barrier, which alternates between "even" and "odd" phases, using a separate event for each phase. Figure 3 is a sample implementation of such a Barrier data structure.

Figure 3 Barrier

using System;
using System.Threading;

public class Barrier {
    private volatile int m_count;
    private int m_originalCount;
    private EventWaitHandle m_oddEvent;
    private EventWaitHandle m_evenEvent;
    private volatile bool m_sense = false; // false==even, true==odd.

    public Barrier(int count) {
        m_count = count;
        m_originalCount = count;
        m_oddEvent = new ManualResetEvent(false);
        m_evenEvent = new ManualResetEvent(false);
    }

    public void Await() {
        bool sense = m_sense;

        // The last thread to signal also sets the event.
        if (m_count == 1 || Interlocked.Decrement(ref m_count) == 0) {
            m_count = m_originalCount;
            m_sense = !sense; // Reverse the sense.
            if (sense == true) { // odd
                m_evenEvent.Reset();
                m_oddEvent.Set();
            } else { // even
                m_oddEvent.Reset();
                m_evenEvent.Set();
            }
        } else {
            if (sense == true) m_oddEvent.WaitOne();
            else               m_evenEvent.WaitOne();
        }
    }
}

The reason you need two events is subtle. One approach would be to do a Set immediately followed by Reset in Await, but this is dangerous and will lead to deadlocks for two reasons. First, another thread may have decremented m_count, but not yet reached the call to WaitOne on the event when the thread calls Set and then Reset in quick succession. Second, though the waiting thread might have reached the call to WaitOne, alertable waits, like the ones the CLR always uses, can interrupt the wait, temporarily removing a waiting thread from the wait queue so it can run an asynchronous procedure call (APC). The waiting thread will never see the event in a set state. Both cases lead to missed events and probable deadlocks. Avoid this by using separate events for odd and even phases.

You might want to go ahead and add spinning to the Barrier, just as with the CountdownLatch. But you’ll encounter a problem if you try: normally the spinning thread would spin until it saw an m_count of 0. With the implementation above, however, m_count will never actually reach 0 before the last thread resets it to m_originalCount. The naïve approach to spinning would lead to one or more threads spinning (forever) and all other threads being blocked (forever) on the next phase. The solution is easy. You spin, waiting for the sense to change, as shown in Figure 4.

Figure 4 Waiting

public void Await() {
    bool sense = m_sense;

    // The last thread to signal also sets the event.
    if (m_count == 1 || Interlocked.Decrement(ref m_count) == 0) {
        m_count = m_originalCount;
        m_sense = !sense; // Reverse the sense.
        if (sense == true) { // odd
            m_evenEvent.Set();
            m_oddEvent.Reset();
        } else { // even
            m_oddEvent.Set();
            m_evenEvent.Reset();
        }
    } else {
        SpinWait s = new SpinWait();
        while (sense == m_sense) {
            if (s.Spin() >= s_spinCount) {
                if (sense == true) m_oddEvent.WaitOne();
                else               m_evenEvent.WaitOne();
            }
        }
    }
}

Because all threads must leave the Await from a previous phase before a subsequent phase can complete, you can be assured that all threads will either observe that the sense changed or will end up waiting on our event and being awakened that way.

Blocking Queue

In shared-memory architectures, the sole point of synchronization between two or more tasks is often a central, shared-collection data structure. Frequently one or more tasks are responsible for generating "work" for one or more other tasks to consume, referred to as a producer/consumer relationship. Simple synchronization for such data structures is typically straightforward—using a Monitor or ReaderWriterLock will do the trick—but coordination among tasks when the buffer becomes empty is harder. This problem is normally solved with a blocking queue.

Actually, there are several slight variants of blocking queues, ranging from the simple—where the consumer blocks only when the queue is empty—to the complex—in which each producer is "paired" with exactly one consumer; that is, the producer is blocked until a consumer arrives to process the enqueued item and, similarly, the consumer is blocked until a producer delivers an item. FIFO (first in first out) ordering is common, but not always necessary. Buffers can also be bounded, as we’ll see later. We only look at the pairing variant here, since the bounded buffer shown later includes the simpler blocking-when-empty behavior.

To implement this, just wrap a simple Queue<T> with synchronization on top. What kind of synchronization? Whenever a thread enqueues an element, it waits for a consumer to dequeue the element before returning. When a thread dequeues an element, should it find the buffer to be empty, it must wait for a new element to arrive. And of course after dequeueing, the consumer must signal the producer that it has taken its item (see Figure 5).

Figure 5 Blocking Queue

class Cell<T> {
    internal T m_obj;
    internal Cell(T obj) { m_obj = obj; }
}
 
public class BlockingQueue<T> {
    private Queue<Cell<T>> m_queue = new Queue<Cell<T>>();
    public void Enqueue(T obj) {
        Cell<T> c = new Cell<T>(obj);
        lock (m_queue) {
            m_queue.Enqueue(c);
            // Wake a waiting consumer, then wait for it to take our item.
            Monitor.Pulse(m_queue);
            Monitor.Wait(m_queue);
        }
    }
    public T Dequeue() {
        Cell<T> c;
        lock (m_queue) {
            // Wait until there is at least one item to take.
            while (m_queue.Count == 0)
                Monitor.Wait(m_queue);
            c = m_queue.Dequeue();
            // Let the producer know its item has been taken.
            Monitor.Pulse(m_queue);
        }
        return c.m_obj;
    }
}

Notice that we call Pulse and then Wait in the Enqueue method, and similarly Wait and then Pulse in Dequeue. Because of the way monitors are implemented (internal events are set before the monitor is released), this can cause some thread scheduling ping-pong. We might consider building a finer-grained notification mechanism instead, perhaps using Win32 events. However, using full-fledged Win32 events like this can introduce quite a bit of overhead—particularly the allocation cost and kernel transitions when using them—so it might be time to consider alternatives. You could pool them, just as the CLR’s ReaderWriterLock does, or lazily allocate them, as our ThinEvent type (shown later) does. This implementation also has the disadvantage of allocating an object for each new element; an alternative approach might be to pool these objects too, but that also comes with some additional complexities.

Bounded Buffer

A resource consumption problem can arise in some kinds of queues. If the producer task or tasks create items at a rate faster than the consumer can process them, the system can run into unbounded memory usage.

To illustrate this, imagine a system in which a single producer enqueues 50 items/second, and the consumer only consumes at a rate of 10 items/second. First, the system is imbalanced and will not scale well with a 1-to-1 producer-to-consumer configuration. After just one minute, 2,400 items will have piled up in the buffer. If these items consume, say, 10KB apiece, then this represents 24MB of memory just for the buffer itself. After an hour, this will have grown to over 1GB. Weighting the number of producer threads to consumer threads is one solution, which, in this case, means a ratio of one producer to five consumers. But arrival rates are often volatile, causing periodic imbalances and resulting in dramatic problems; a simple, fixed ratio won’t solve this.

On a server, where programs are often long-running and expected to exhibit good uptimes, the potential for unbounded memory usage can wreak absolute havoc, possibly leading to a situation in which the server process must be recycled regularly.

A bounded buffer allows you to place a limit on the size that the buffer may reach before the producer is forced to block. Blocking the producer gives the consumer a chance to "catch up" (by allowing its thread to receive a scheduling timeslice) while at the same time eliminating the memory build-up issue. Our approach, again, is to simply wrap a Queue<T>, adding two wait conditions and two event notification conditions: a producer waits when the queue is full (until it becomes non-full) and a consumer waits when the queue is empty (until it becomes non-empty); a producer signals waiting consumers when it has produced an item, and a consumer signals a producer when it has taken an item, as shown in Figure 6.

Figure 6 Bounded Buffer

public class BoundedBuffer<T> {
    private Queue<T> m_queue = new Queue<T>();
    private int m_consumersWaiting;
    private int m_producersWaiting;
    private const int s_maxBufferSize = 128;

    public void Enqueue(T obj) {
        lock (m_queue) {
            while (m_queue.Count == (s_maxBufferSize - 1)) {
                m_producersWaiting++;
                Monitor.Wait(m_queue);
                m_producersWaiting--;
            }
            m_queue.Enqueue(obj);
            if (m_consumersWaiting > 0)
                Monitor.PulseAll(m_queue);
        }
    }

    public T Dequeue() {
        T e;
        lock (m_queue) {
            while (m_queue.Count == 0) {
                m_consumersWaiting++;
                Monitor.Wait(m_queue);
                m_consumersWaiting--;
            }
            e = m_queue.Dequeue();
            if (m_producersWaiting > 0)
                Monitor.PulseAll(m_queue);
        }
        return e;
    }
}

Again, a somewhat naïve approach has been taken. But we do optimize the calls to PulseAll—since they are anything but cheap—by maintaining two counters, m_consumersWaiting and m_producersWaiting, and only signaling if the respective value is non-zero. There are further opportunities for improvement. For example, sharing a single event like this might wake up too many threads: if a consumer reduces the queue’s size to 0, and there are both waiting producers and waiting consumers, clearly you want to wake only the producers (at least at first). This implementation will service all waiters in FIFO order, meaning you might have to wake the consumers before any producer runs, only for them to find out that the queue is empty and subsequently wait again. Thankfully, ending up with simultaneous waiting producers and consumers is quite rare, but it can happen regularly when the buffer’s bound is small.

Thin Event

Win32 events have one big advantage over Monitor.Wait, Pulse, and PulseAll: they are "sticky." That means once an event has been signaled, any subsequent waits will immediately unblock, even if the threads had not begun waiting before the signal occurred. Without this feature, you frequently need to either write inefficient code where all waiting and signaling happens strictly inside a critical region—this is inefficient because the Windows scheduler always boosts an awoken thread’s priority, always incurring a context switch only for the waking thread to immediately wait again for the critical region—or you must employ some tricky, race-condition-prone code.

Instead of either approach, you can instead use a "thin event," a reusable data structure that spins briefly before blocking, lazily allocates a Win32 event only when necessary, and otherwise gives you manual-reset-event-like behavior. In other words, it encapsulates that tricky race-condition-prone code so that you don’t have to scatter it throughout your entire code-base. This example relies on some memory model guarantees described in Vance Morrison’s article, and should be used with extreme care (see Figure 7).

Figure 7 Thin Event

public struct ThinEvent {
    private int m_state; // 0 means unset, 1 means set.
    private EventWaitHandle m_eventObj;
    private const int s_spinCount = 4000;

    public void Set() {
        m_state = 1;
        Thread.MemoryBarrier(); // required.
        if (m_eventObj != null)
            m_eventObj.Set();
    }

    public void Reset() {
        m_state = 0;
        if (m_eventObj != null)
            m_eventObj.Reset();
    }

    public void Wait() {
        SpinWait s = new SpinWait();
        while (m_state == 0) {
            if (s.Spin() >= s_spinCount) {
                if (m_eventObj == null) {
                    ManualResetEvent newEvent =
                        new ManualResetEvent(m_state == 1);
                    if (Interlocked.CompareExchange<EventWaitHandle>(
                            ref m_eventObj, newEvent, null) == null) {
                       // If someone set the flag before seeing the new
                       // event obj, we must ensure it’s been set.
                       if (m_state == 1)
                           m_eventObj.Set();
                    } else {
                        // Lost the race w/ another thread. Just use
                        // its event.
                        newEvent.Close();
                    }
                }
                m_eventObj.WaitOne();
            }
        }
    }
}

This basically mirrors the event state in an m_state variable, where the value 0 means unset and 1 means set. Waiting on a set event is now really cheap; if m_state is 1 at the entrance to the Wait routine, we return immediately, no kernel transitions needed. The trickiness arises when a thread waits before the event has been set. The first thread to wait must allocate a new event object and compare-and-swap it into the m_eventObj field. If the CAS fails, another waiter already initialized the event, so we can just reuse it; if it succeeds, we must recheck that m_state hasn’t changed since we last read it, because otherwise m_state could be 1 while m_eventObj is unsignaled, leading to a deadlock when we call WaitOne. The thread calling Set must set m_state first and then, if it sees a non-null m_eventObj, call Set on it. Two memory barriers are required: the second read of m_state must not be moved earlier, which we guarantee with the use of Interlocked.CompareExchange to set m_eventObj; and the read of m_eventObj in Set mustn’t be moved before the write to m_state (a somewhat surprising legal transformation on some Intel and AMD processors, and in the CLR 2.0 memory model, without the explicit call to Thread.MemoryBarrier). Resetting the event is generally not safe to do in parallel, so additional synchronization is needed by the caller.

You can now easily use this elsewhere, such as in the CountdownLatch and the queue examples above, typically with a sizeable performance gain, particularly if you use spinning intelligently.
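
For instance, a minimal sketch of the latch from Figure 1 reworked around the ThinEvent might look like this (the CountdownLatch2 name is just for illustration):

public class CountdownLatch2 {
    private int m_remain;
    private ThinEvent m_event; // a struct: no kernel object until a wait must block

    public CountdownLatch2(int count) {
        m_remain = count;
    }

    public void Signal() {
        // The last thread to signal sets the thin event.
        if (Interlocked.Decrement(ref m_remain) == 0)
            m_event.Set();
    }

    public void Wait() {
        // Spins briefly and only falls back to a lazily allocated
        // kernel event if the latch stays unsignaled.
        m_event.Wait();
    }
}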

We showed a tricky implementation above. Note that you can implement both auto- and manual-reset types using a single flag and monitors, considerably simpler than this example (but not always as efficient).
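
For comparison, here is a minimal sketch of a manual-reset flavor built from just a flag and a monitor (the MonitorResetEvent name is illustrative); it is simpler, but every operation takes a lock and waiters never spin:

public class MonitorResetEvent {
    private bool m_isSet;
    private readonly object m_lock = new object();

    public void Set() {
        lock (m_lock) {
            m_isSet = true;
            Monitor.PulseAll(m_lock); // wake everyone currently waiting
        }
    }

    public void Reset() {
        lock (m_lock) { m_isSet = false; }
    }

    public void Wait() {
        lock (m_lock) {
            // "Sticky": if the event is already set, we never wait at all.
            while (!m_isSet)
                Monitor.Wait(m_lock);
        }
    }
}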

Lock-Free LIFO Stack

Building a thread-safe collection using locks is pretty straightforward, even when bounding and blocking complicate things (as we saw above). When all coordination occurs via a simple last-in-first-out (LIFO) stack data structure, however, using a lock could be more expensive than is absolutely required. A thread’s critical region, the time during which a lock is held, has a beginning and an end, the duration of which may span many instructions’ worth of time. Holding the lock prevents other threads from reading and writing simultaneously. This achieves serialization, which is of course what we want, but is strictly stronger than we need: we’re just pushing and popping elements on and off a stack, both of which can be achieved with regular reads and a single compare-and-swap write. We can exploit this fact to build a more scalable, lock-free stack that doesn’t force threads to wait unnecessarily.

Our algorithm works as follows. You use a linked list to represent the stack, the head of which represents the top of the stack, stored in the m_head field. When pushing a new item onto the stack, you construct a new node with the value you’re going to push onto the stack, read the m_head field locally, store it into the new node’s m_next field, and then do an atomic Interlocked.CompareExchange to replace the stack’s current head. If the head has changed at any point in this sequence (since it was first read), the CompareExchange will fail, and the thread must loop back around and try the whole sequence again. Popping is similarly straightforward. You read the m_head and try to swap it with our local copy’s m_next reference; if it fails, you just keep trying, as you see in Figure 8. Win32 offers an analogous data structure, called an SList, built using a similar algorithm.

Figure 8 Lock-Free Stack

public class LockFreeStack<T> {
    private volatile StackNode<T> m_head;

    public void Push(T item) {
        StackNode<T> node = new StackNode<T>(item);
        StackNode<T> head;
        do {
            // Read the current head and link the new node in front of it.
            head = m_head;
            node.m_next = head;
            // Publish the new node, unless the head changed in the meantime.
        } while (m_head != head || Interlocked.CompareExchange(
                ref m_head, node, head) != head);
    }

    public T Pop() {
        StackNode<T> head;
        SpinWait s = new SpinWait();

        while (true) {
            StackNode<T> next;
            do {
                head = m_head;
                if (head == null) goto emptySpin;
                next = head.m_next;
                // Swing the head to the next node, unless someone beat us to it.
            } while (m_head != head || Interlocked.CompareExchange(
                    ref m_head, next, head) != head);
            break;

        emptySpin:
            // The stack is empty; spin until an item is pushed.
            s.Spin();
        }

        return head.m_value;
    }
}

class StackNode<T> {
    internal T m_value;
    internal StackNode<T> m_next;
    internal StackNode(T val) { m_value = val; }
}

Note that this is a form of optimistic concurrency control: instead of blocking other threads from accessing data, you just proceed with the hope that you’ll "win" the race. If this hope turns out to be wrong, you can encounter liveness issues such as livelock. This design choice also implies that you can’t reliably achieve FIFO scheduling. Individual threads make forward progress only probabilistically, but the system as a whole is guaranteed to make forward progress, since the failure of one thread’s CompareExchange always means at least one other thread succeeded (a requirement for calling this "lock-free"). Sometimes it is useful to use an exponential backoff when a CompareExchange fails to avoid massive memory contention on m_head.
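
As a rough sketch of that backoff idea, a variant of Push (it would live inside LockFreeStack<T>, and is illustrative rather than part of Figure 8) might look like this:

public void PushWithBackoff(T item) {
    StackNode<T> node = new StackNode<T>(item);
    int backoff = 1;
    while (true) {
        StackNode<T> head = m_head;
        node.m_next = head;
        if (Interlocked.CompareExchange(ref m_head, node, head) == head)
            return;
        // Lost the race: pause briefly before retrying, roughly doubling the
        // pause each time (capped) to take pressure off m_head.
        Thread.SpinWait(backoff);
        if (backoff < 4096) backoff *= 2;
    }
}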

Also, we’ve taken a rather naïve approach to the case in which the stack has become empty. We just spin forever, waiting for a new item to be pushed. It’s straightforward to rewrite Pop into a non-waiting TryPop method, and a bit more complex to make use of events for waiting. Both are important features and left as exercises for the motivated reader.
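
The non-waiting variant, at least, is simple enough to sketch here as a member of LockFreeStack<T>:

public bool TryPop(out T value) {
    StackNode<T> head;
    StackNode<T> next;
    do {
        head = m_head;
        if (head == null) {
            // Empty: report failure rather than spinning.
            value = default(T);
            return false;
        }
        next = head.m_next;
    } while (Interlocked.CompareExchange(ref m_head, next, head) != head);

    value = head.m_value;
    return true;
}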

We incur an object allocation for each Push, saving us from having to worry about so-called ABA problems. ABA occurs due to the internal reuse of nodes that have been popped off the list. Developers sometimes will try to pool nodes to reduce the number of object allocations, but this is problematic: the result can be that an atomic operation erroneously succeeds even though there have been a number of intervening writes to m_head. (For example: thread 1 reads head node A and its m_next; thread 2 then pops A, places it into the pool, pushes B, and finally pushes the recycled A back onto the stack. Thread 1’s CompareExchange now succeeds, because the head is once again the very same object A, even though the stack underneath it has changed and thread 1’s saved m_next is stale.) A similar problem occurs if you try to write this algorithm in native C/C++; because the memory allocator can reuse addresses as soon as they are freed, a node can be popped, freed, and then its address can be subsequently handed out to a new node allocation, causing the same problem. We’ll omit any further discussion of ABA here—it has been described at length elsewhere.

Lastly, it’s possible to write a FIFO queue using similar lock-free techniques. This is attractive because threads pushing and popping concurrently will not necessarily conflict with one another, in contrast with the LockFreeStack shown above in which pushers and poppers are always fighting for the same m_head field. The algorithm is rather complex, however, so if you’re curious, I encourage you to take a look at Maged M. Michael and Michael L. Scott’s 1996 paper, "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms".

Loop Tiling

Loop tiling is the practice of partitioning the input range or data for a loop and assigning each partition to a separate thread in order to achieve concurrency. This is the primary technique for achieving parallelism in some programming models, like OpenMP (see Kang Su Gatlin’s MSDN Magazine article on the subject), and is often referred to as a parallel forall loop, inspired by High Performance Fortran’s terminology. Whether the range is just a set of indices:

for (int i = 0; i < c; i++) { ... } 

or a range of data:

foreach (T e in list) { ... }

we can devise partitioning techniques that give us a tiled loop.

There are many data structure-specific partitioning techniques you might want to apply, certainly too many to enumerate in this column. So we’ll just focus on a common technique, which is to assign disjoint ranges of elements from the array to each partition. We simply calculate a stride which is roughly the number of elements divided by the number of partitions, and use that to calculate contiguous ranges (see Figure 9). This yields good spatial locality when the input is an array of value types, though other approaches are certainly valid, useful, and sometimes necessary.

Figure 9 Loop Tiling

public static void ForAll(int from, int to, Action<int> a, int p) {
    ForAll<int>(null, from, to, null, a, p);
}
public static void ForAll<T>(IList<T> data, Action<T> a, int p) {
    ForAll<T>(data, 0, data.Count, a, null, p);
}

private static void ForAll<T>(IList<T> data, int from, int to,
        Action<T> a0, Action<int> a1, int p) {
    int size = to - from;
    int stride = (size + p - 1) / p;
    CountdownLatch latch = new CountdownLatch(p);

    for (int i = 0; i < p; i++) {
        int idx = i;
        ThreadPool.QueueUserWorkItem(delegate {
            // Each work item processes one contiguous chunk of the range.
            int end = Math.Min(size, stride * (idx + 1));
            for (int j = stride * idx; j < end; j++) {
                if (data != null) a0(data[from + j]);
                else              a1(from + j);
            }

            latch.Signal();
        });
    }

    latch.Wait();
}

We offer two public versions of ForAll here: one that accepts a range of numbers, and the other that accepts an IList<T>, just like a foreach loop in C#. Both forward to the same helper overload, which invokes either the element-accepting action with the list element at the given index or the index-accepting action with the index itself. You could use the first overload where you’d typically place a normal for loop. For example, this code

for (int i = 0; i < 10; i++) { S; }

becomes:

Parallel.ForAll(0, 10, delegate(int i){ S; },
    Environment.ProcessorCount);

And you can use the second where you’d typically place a C# foreach loop, so that

List<T> list = ...;
foreach (T e in list) { S; }

becomes:

Parallel.ForAll(list, delegate(T e) { S; }, 
    Environment.ProcessorCount);

You need to be careful that no statements in S write to shared memory; otherwise you will need to add proper synchronization to the parallel versions. Versions can of course be written to accommodate any IEnumerable<T>, partition the iteration space in a different way, and so forth (all of which has been omitted from this column to conserve space). Also note that in this example the calling thread is "wasted" for the duration of the p subtasks. A better approach would be to use the calling thread to run one of the partitions itself and then join with the others upon its completion; extending the ForAll method to do this is trivial.
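
As a rough sketch of that last change, only p - 1 work items are queued and the caller runs the final partition itself (RunPartition is a hypothetical helper standing in for the per-chunk loop of Figure 9):

// Assumes p >= 1; with p == 1 everything simply runs on the calling thread.
CountdownLatch latch = (p > 1) ? new CountdownLatch(p - 1) : null;

for (int i = 0; i < p - 1; i++) {
    int idx = i;
    ThreadPool.QueueUserWorkItem(delegate {
        RunPartition(idx);       // same per-chunk loop as in Figure 9
        latch.Signal();
    });
}

RunPartition(p - 1);             // the calling thread does its share...
if (latch != null) latch.Wait(); // ...then joins with the other p - 1 tasks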

Parallel Reductions

There is a category of operations that can be performed using a reduction (also known as a fold or aggregation), where many values are combined in some way to produce a single output. A general reduction works as follows. You take a binary operator—that is, a function with two arguments—and compute it over a vector or set of elements of size n, from left to right. For j = 0 to n - 1, you invoke the binary operator, passing the output of the (j - 1)th invocation as the first argument and the jth element itself as the second argument. A special seed value is used as the first argument for the 0th invocation, since there is no previous output to use. A final (and optional) result selector is then used to convert an intermediate value into a final result.

Let’s see an example of this. If the binary operator is + and the input is a vector of 5 elements, {1, 2, 3, 4, 5}, then the expanded computation looks like ((((1 + 2) + 3) + 4) + 5). If you convert this expansion into function-calling form, it looks like this (assuming 0 for a seed): +(+(+(+(+(0, 1), 2), 3), 4), 5). In other words, you just compute the sum of all of the input numbers. This is called a sum reduction. A straightforward translation of this generalized algorithm into a serial algorithm might look like this:

delegate T Func<T>(T arg0, T arg1);

T Reduce<T>(T[] input, T seed, Func<T> r) {
    T result = seed;
    foreach (T e in input)
        result = r(result, e);
    return result;
}

When invoking it, you can now simply sum up a set of numbers like this (in C# 3.0):

int[] nums = ... some set of numbers ...;
int sum = Reduce(nums, 0, (x, y) => x + y);

All this is pretty abstract but, in addition to sum, many operations can be expressed as a reduction, as you see in Figure 10.

Figure 10 Operations Expressed as Reductions

Operation   Seed       Binary Operator                        Result Selector
Count       0          (a, b) => a + 1                        N/A
Sum         0          (a, b) => a + b                        N/A
Min         NaN        (a, b) => a < b ? a : b                N/A
Max         NaN        (a, b) => a > b ? a : b                N/A
Average     { 0, 0 }   (a, b) => new { a[0] + b, a[1] + 1 }   (a) => a[0] / a[1]

Given the nums array above, we can use our reduction routine to find the min and max of the array:

int min   = Reduce(nums, int.MaxValue, (x, y) => x < y ? x : y);
int max   = Reduce(nums, int.MinValue, (x, y) => x > y ? x : y);

(Count is omitted because the partial results must be summed, requiring two separate binary operators, and Average is omitted because it also requires some extra steps.)

You can use a similar technique to the one discussed for loop tiling to break apart the input data and perform the reduction in parallel. Each partition will compute its own intermediate value that we’ll then combine into a single final value, using the same operator used to calculate the intermediate values. Why is this possible? Because all of the operations mentioned above are associative; recall from your elementary mathematics that an associative binary operator + simply means that (a + b) + c = a + (b + c); that is, the order of evaluation doesn’t matter to the correctness of the calculation.

For example, consider the sum reduction. If you partition the input data { 1, 2, 3, 4 } into two partitions, {1, 2} and {3, 4}, then, because + is associative, when you add the results of the independent additions, the result is the same: (1 + 2) + (3 + 4) = ((1 + 2) + 3) + 4. In fact, any disjoint partitioning of the input yields correct results. Figure 11 shows a generalized Reduce method, which takes the seed and binary operator as arguments and uses a stride partitioning approach as described earlier.

Figure 11 A Reduce Method

public delegate T Func<T>(T arg0, T arg1);

public static T Reduce<T>(IList<T> data, int p, T seed, Func<T> r){
    T[] partial = new T[p];
    int stride = (data.Count + p - 1) / p;
    CountdownLatch latch = new CountdownLatch(p);

    for (int i = 0; i < p; i++) {
        int idx = i;
        ThreadPool.QueueUserWorkItem(delegate {
            // Do the ‘ith’ intermediate reduction in parallel.
            partial[idx] = seed;
            int end = Math.Min(data.Count, stride * (idx + 1));
            for (int j = stride * idx; j < end; j++)
                partial[idx] = r(partial[idx], data[j]);
            latch.Signal();
        });
    }

    latch.Wait();

    // Do the final reduction on the master thread.
    T final = seed;
    for (int i = 0; i < p; i++)
        final = r(final, partial[i]);

    return final;
}

The master thread forks off a set of p worker threads, each of which calculates an intermediate result by performing a reduction over its own partition of data and placing the value into its dedicated slot in the intermediates array. The master thread then waits for all children to finish (using the CountdownLatch from earlier), and performs a final reduction step using the partial results from each child as the input. In the literature, tree reductions are quite common, where each node in the tree does a partial reduction on a certain number of intermediate results. While this theoretically leads to a more scalable algorithm, with the number of threads you’re apt to deal with and the associated synchronization costs on Windows, the reality is that the serial reduction as shown performs better for common values of p and binary operators.

There are plenty of opportunities for optimization here—such as reusing the calling thread for one of the partitions—but nevertheless, the example should illustrate the point well enough. To support operations like average, where the per-element accumulation step differs from the final combining step and a result selection routine is needed, we need a slightly different API. This is a fairly simple exercise.
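
One possible shape for that API is sketched below. The delegate names are invented for illustration; the structure otherwise mirrors Figure 11, with a distinct combiner for the partial results plus a final result selector:

public delegate TAcc Accumulator<TAcc, T>(TAcc acc, T elem);
public delegate TAcc Combiner<TAcc>(TAcc left, TAcc right);
public delegate TResult Selector<TAcc, TResult>(TAcc acc);

public static TResult Reduce<T, TAcc, TResult>(
        IList<T> data, int p, TAcc seed,
        Accumulator<TAcc, T> accumulate, Combiner<TAcc> combine,
        Selector<TAcc, TResult> select) {
    TAcc[] partial = new TAcc[p];
    int stride = (data.Count + p - 1) / p;
    CountdownLatch latch = new CountdownLatch(p);

    for (int i = 0; i < p; i++) {
        int idx = i;
        ThreadPool.QueueUserWorkItem(delegate {
            // Accumulate this partition's elements into its own slot.
            partial[idx] = seed;
            int end = Math.Min(data.Count, stride * (idx + 1));
            for (int j = stride * idx; j < end; j++)
                partial[idx] = accumulate(partial[idx], data[j]);
            latch.Signal();
        });
    }

    latch.Wait();

    // Combine the partial results with a (possibly different) operator, then
    // project the final answer.
    TAcc result = partial[0];
    for (int i = 1; i < p; i++)
        result = combine(result, partial[i]);
    return select(result);
}

Average, for example, would accumulate each element into a { sum, count } pair, combine pairs by adding them componentwise, and select sum / count at the end.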

Conclusion

In this column, I’ve shown you some low-level parallel data structures and algorithms that can help you write managed code that takes advantage of multi-processor and multi-core architectures. As with all programming, abstractions tend to clump together in layers, with the most performance-critical typically residing at the bottom. It’s safe to say that many of the techniques shown in this column are at the very bottom, serving as a foundation for higher-level abstractions and application-specific parallel code. Though selecting the right data structures and basic algorithms is only one step in a longer process, I hope you’ll come away with a deeper understanding of parallel programming techniques, which will be instrumental to your success in this new landscape of programming.

Send your questions and comments to clrinout@microsoft.com.

Joe Duffy works on parallel programming models and infrastructure at Microsoft. Some of the code samples in this column are based on his upcoming book, Concurrent Programming on Windows, to be released by Addison Wesley sometime in 2007.