CLR 完全介绍
9 种可重复使用的并行数据结构和算法
Joe Duffy
代码下载位置:
CLRInsideOut2007_05.exe
(156 KB)
Browse the Code Online

目录
本专栏并未涉及很多公共语言运行库 (CLR) 功能的机制问题,而是更多介绍了如何有效使用您手头所具有的工具。身为一名程序员,必须做出很多决策,而选择正确的数据结构和算法无疑是最常见的,也是最重要的决策之一。错误的选择可能导致程序无法运行,而大多数情况下,则决定了性能的好坏。鉴于并行编程通常旨在改进性能,并且要难于串行编程,因此所作的选择对您程序的成功就更为重要。
在本专栏中,我们将介绍九种可重复使用的数据结构和算法,这些结构和算法是许多并行程序所常用的,您应该能够轻松将它们应用到自己的 .NET 软件中。专栏中每个示例随附的代码都是可用的,但尚未经过完全定型、测试和优化。这里列举的模式虽然并不详尽,但却代表了一些较为常见的模式。如您所见,很多示例都是互为补充的。
在开始前,我想还是先介绍一些相关内容。Microsoft
® .NET Framework 提供了几个现有的并发基元。虽然我要为您讲解如何构建自己的基元,但实际上现有基元是足以应付大多数情况的。我只是想说某些可选的方案有时也是有参考价值的。此外,了解这些技巧如何应用于实际操作也有助于加深您对并行编程的整体理解。在开始讲解前,我假定您对现有基元已经有了一个基本的了解。您也可以参阅《MSDN
® 杂志》2005 年 8 月版的文章“
关于多线程应用程序:每个开发人员都应了解的内容”,以全面了解其概念。
好了,下面就让我们一起认识这些技巧吧。
倒计数锁存 (Countdown Latch)
Semaphore 之所以成为并发编程中一种较为知名的数据结构,原因是多方面的,而并不只是因为它在计算机科学领域有着悠久的历史(可以追溯到 19 世纪 60 年代的操作系统设计)。Semaphore 只是一种带有一个计数字段的数据结构,它只支持两种操作:放置和取走(通常分别称为 P 和 V)。一次放置操作会增加一个 semaphore 计数,而一次取走操作会减少一个 semaphore 计数。当 semaphore 计数为零时,除非执行一项并发的放置操作使计数变为非零值,否则任何后续的取走尝试都将阻塞(等待)。这两种操作均为不可再分 (atomic) 操作,并发时不会产生错误,能够确保并发的放置和取走操作有序地进行。Windows® 具有基础内核和对 semaphore 对象的 Win32® 支持(请参阅 CreateSemaphore 和相关 API),并且在 .NET Framework 中这些对象可以通过 System.Threading.Semaphore 类公开到上层。Mutex 和 Monitor 所支持的临界区,通常被认为是一种特殊的 semaphore,其计数会在 0 和 1 之间来回切换,换句话说,是一个二进制的 semaphore。
另外还有一种“反向 semaphore”也是非常有用。也就是说,有时您需要数据结构能够等待数据结构计数归零。Fork/join 式并行模式在数据并行编程中是极为常见的,其中由单个“主”线程控制执行若干“辅助”线程并等待线程执行完毕。在这类情况下,使用反向 semaphore 会很有帮助。大多数时候,您其实并不想唤醒线程来修改计数。因此在这种情况下,我们将结构称为倒计数“锁存”,用来表示计数的减少,同时还表明一旦设置为“Signaled”状态,锁存将保持“signaled”(这是一个与锁存相关的属性)。遗憾的是,Windows 和 .NET Framework 均不支持这种数据结构。但令人欣慰的是,构建这种数据闭锁并不难。
要构建一个倒计数锁存,只需将其计数器的初始值设为 n,并让每项辅助任务在完成时不可再分地将 n 减掉一个计数,这一点可以通过为减量操作加上“锁”或调用 Interlocked.Decrement 来实现。接下来,线程可以不执行取走操作,而是减少计数并等待计数器归零;而当线程被唤醒时,它就可以得知已经有 n 个信号向锁存注册。
在 while (count != 0) 循环中,让等待的线程阻塞通常是不错的选择(这种情况下,您稍后将不得不使用事件),而不是使用旋转。
图 1 是一个简单的 CountdownLatch 类型的示例。

Figure 1 CountdownLatch
|
public class CountdownLatch {
private int m_remain;
private EventWaitHandle m_event;
public CountdownLatch(int count) {
m_remain = count;
m_event = new ManualResetEvent(false);
}
public void Signal() {
// The last thread to signal also sets the event.
if (Interlocked.Decrement(ref m_remain) == 0)
m_event.Set();
}
public void Wait() {
m_event.WaitOne();
}
}
|
这看上去极为简单,但要正确运用还需要技巧。稍后我们将通过一些示例来讲解如何使用这种数据结构。请注意,此处所示的基本实现还有很多可以改进的地方,例如:在事件上调用 WaitOne 之前添加某种程度的旋转等待、缓慢分配事件而不是在构造器中进行分配(以防足够的旋转会避免出现阻塞,如本专栏稍后介绍的 ThinEvent 演示的那样)、添加重置功能以及提供 Dispose 方法(以便在不再需要内部事件对象时将对象关闭)。这些都是留给读者作为练习之用。
可重用旋转等待 (Spin Wait)
虽然忙碌等待 (busy waiting) 更容易实现阻塞,但在某些情况下,您也许的确想在退回到真正的等待状态前先旋转 (spin) 一段时间。我们很难理解为何这样做会有帮助,而大多数人之所以一开始就避免旋转等待,是因为旋转看上去像是在做无用功;如果上下文切换(每当线程等待内核事件时都会发生)需要几千个周期(在 Windows 上确实是这样),我们称之为 c,并且线程所等待的条件出现的时间少于 2c 周期时间(1c 用于等待自身,1c 用于唤醒),则旋转可以降低等待所造成的系统开销和滞后时间,从而提升算法的整体吞吐量和可伸缩性。
如果您决定使用旋转等待,就必须谨慎行事。因为如果这样做,您可能需要注意很多问题,比如:要确保在旋转循环内调用 Thread.SpinWait,以提高 Intel 超线程技术的计算机上硬件对其他硬件线程的可用性;偶尔使用参数 1 而非 0 来调用 Thread.Sleep,以避免优先级反向问题;通过轻微的回退 (back-off) 来引入随机选择,从而改善访问的局部性(假定调用方持续重读共享状态)并可能避免活锁;当然,在单 CPU 的计算机最好不要采用这种方法(因为在这种环境下旋转是非常浪费资源的)。
SpinWait 类需要被定义为值类型,以便分配起来更加节省资源(请参见图 2)。现在,我们可以使用此算法来避免前述 CountdownLatch 算法中出现的阻塞。

Figure 2 SpinWait
|
public struct SpinWait {
private int m_count;
private static readonly bool s_isSingleProc =
(Environment.ProcessorCount == 1);
private const int s_yieldFrequency = 4000;
private const int s_yieldOneFrequency = 3*s_yieldFrequency;
public int Spin() {
int oldCount = m_count;
// On a single-CPU machine, we ensure our counter is always
// a multiple of ‘s_yieldFrequency’, so we yield every time.
// Else, we just increment by one.
m_count += (s_isSingleProc ? s_yieldFrequency : 1);
// If not a multiple of ‘s_yieldFrequency’ spin (w/ backoff).
int countModFrequency = m_count % s_yieldFrequency;
if (countModFrequency > 0)
Thread.SpinWait((int)(1 + (countModFrequency * 0.05f)));
else
Thread.Sleep(m_count <= s_yieldOneFrequency ? 0 : 1);
return oldCount;
}
private void Yield() {
Thread.Sleep(m_count < s_yieldOneFrequency ? 0 : 1);
}
}
|
|
private const int s_spinCount = 4000;
public void Wait() {
SpinWait s = new SpinWait();
while (m_remain > 0) {
if (s.Spin() >= s_spinCount) m_event.WaitOne();
}
}
|
不可否认,选择频率和旋转计数是不确定的。与 Win32 临界区旋转计数类似,我们应该根据测试和实验的结果来选择合理的数值,而且即使合理的数值在不同系统中也会发生变化。例如,根据 Microsoft Media Center 和 Windows kernel 团队的经验,MSDN 文档建议临界区旋转计数为 4,000 ,但您的选择可以有所不同。理想的计数取决于多种因素,包括在给定时间等待事件的线程数和事件出现的频率等。大多数情况下,您会希望通过等待事件来消除显式让出时间,如锁存的示例中所示。
您甚至可以选择动态调整计数:例如,从中等数量的旋转开始,每次旋转失败就增加计数。一旦计数达到预定的最大值,就完全停止旋转并立即发出 WaitOne。逻辑如下所示:您希望立即增加达到预定的最大周期数,但却无法超过最大周期数。如果您发现此最大值不足以阻止上下文切换,那么立即执行上下文切换总的算来占用的资源更少。慢慢您就会希望旋转计数能够达到一个稳定的值。
屏障 (Barrier)
屏障,又称集合点,是一种并发性基元,它无需另一“主”线程控制即可实现各线程之间简单的互相协调。每个线程在到达屏障时都会不可再分地发出信号并等待。仅当所有 n 都到达屏障时,才允许所有线程继续。这种方法可用于协调算法 (cooperative algorithms),该算法广泛应用于科学、数学和图形领域。很多计算中都适合使用屏障,实际上,甚至 CLR 的垃圾收集器都在使用它们。屏障只是将较大的计算分割为若干较小的协作阶段 (cooperative phase),例如:
|
const int P = ...;
Barrier barrier = new Barrier(P);
Data[] partitions = new Data[P];
// Running on ‘P’ separate threads in parallel:
public void Body(int myIndex) {
FillMyPartition(partitions[myIndex]);
barrier.Await();
ReadOtherPartition(partitions[P – myIndex - 1]);
barrier.Await();
// ...
}
|
您会很快发现在这种情况下是可以使用倒计数锁存的。每个线程都可以在调用 Signal 后立即调用 Wait,而不是调用 Await;在到达屏障后,所有线程都会被释放。但这里存在一个问题:前面所讲的锁存并不支持多次重复使用同一对象,而这却是所有屏障都支持的一个常用属性。实际上,上述示例就需要使用此属性。您可以通过单独的屏障对象来实现这一点,但这样做非常浪费资源;而由于所有线程每次只出现在一个阶段,因此根本无需多个屏障对象。
要解决这个问题,您可以使用相同的基础倒计数锁存算法来处理减少计数器计数、发信号指示事件、等待等方面的问题,从而将其扩展为可重复使用。要实现这一点,您需要使用所谓的感应反向屏障 (sense reversing barrier),该屏障需要在“偶数”和“奇数”阶段之间交替。在交替阶段需要使用单独的事件。图 3 是这类 Barrier 数据结构实现过程的一个示例。

Figure 3 屏障
|
using System;
using System.Threading;
public class Barrier {
private volatile int m_count;
private int m_originalCount;
private EventWaitHandle m_oddEvent;
private EventWaitHandle m_evenEvent;
private volatile bool m_sense = false; // false==even, true==odd.
public Barrier(int count) {
m_count = count;
m_originalCount = count;
m_oddEvent = new ManualResetEvent(false);
m_evenEvent = new ManualResetEvent(false);
}
public void Await() {
bool sense = m_sense;
// The last thread to signal also sets the event.
if (m_count == 1 || Interlocked.Decrement(ref m_count) == 0) {
m_count = m_originalCount;
m_sense = !sense; // Reverse the sense.
if (sense == true) { // odd
m_evenEvent.Reset();
m_oddEvent.Set();
} else { // even
m_oddEvent.Reset();
m_evenEvent.Set();
}
} else {
if (sense == true) m_oddEvent.WaitOne();
else m_evenEvent.WaitOne();
}
}
}
|
为何需要两个事件,其原因很难讲清楚。一种方法是在 Await 中先执行 Set 随后立即执行 Reset,但这很危险,并会导致死锁。原因有二:第一,另一线程的 m_count 可能已减少,但在依次快速调用 Set 和 Reset 时,计数的值还不足以达到可在事件上调用 WaitOne。第二,虽然等待的线程可能已经能够调用 WaitOne,但可报警等待(如 CLR 一贯使用的那些)可以中断等待,从而暂时从等待队列中删除等待的线程,以便运行异步过程调用 (APC)。等待的线程将始终看不到事件处于设置 (set) 状态。两种情况均会导致事件丢失,并可能导致死锁。针对奇数阶段和偶数阶段使用单独的事件即可避免这种情况。
您可能想继续像 CountdownLatch 中那样将旋转添加到 Barrier。但如果您尝试这样做,可能会遇到一个问题:一般情况下,旋转线程会开始旋转直到 m_count 归零。但通过上述实现,m_count 的值实际上永远不会为零,除非最后一个线程将其重置为 m_originalCount。这种想当然的方法将导致一个或多个线程进行旋转(永远地),而其他线程则会在下一阶段阻塞(也是永远地)。解决方法很简单。您旋转,等待感应发生变化,如图 4 中所示。

Figure 4 等待
|
public void Await() {
bool sense = m_sense;
// The last thread to signal also sets the event.
if (m_count == 1 || Interlocked.Decrement(ref m_count) == 0) {
m_count = m_originalCount;
m_sense = !sense; // Reverse the sense.
if (sense == true) { // odd
m_evenEvent.Set();
m_oddEvent.Reset();
} else { // even
m_oddEvent.Set();
m_evenEvent.Reset();
}
} else {
SpinWait s = new SpinWait();
while (sense == m_sense) {
if (s.Spin() >= s_spinCount) {
if (sense == true) m_oddEvent.WaitOne();
else m_evenEvent.WaitOne();
}
}
}
}
|
由于所有线程都必须离开前一阶段的 Await 才可以完成下一阶段,因此可以确定,所有线程要么会观察到感应发生变化,要么最终等待事件并被唤醒。
阻塞队列
在共享内存的体系结构中,两项或多项任务间唯一的同步点通常是一个位于中枢的共享集合的数据结构。通常,一项或多项任务会负责为其他一项或多项任务生成要执行的“工作”,我们称之为生产者/使用者关系 (producer/consumer)。这类数据结构的简单同步通常是非常简单的 — 使用 Monitor 或 ReaderWriterLock 即可解决,但当缓冲区为空时,任务间的协调就会变得比较困难。要解决这个问题,通常需要使用阻塞队列。
实际上,阻塞队列有几种稍微不同的变体,包括简单变体和复杂变体。在简单变量中,使用者仅在队列为空时才会阻塞,而在复杂变量中,每个生产者都正好“配有”一个使用者,也就是说,在使用者到达并开始处理排队项目之前,生产者会被阻塞,同理,在生产者交付项目前,所有使用者也会被阻塞。这时通常使用 FIFO(先进先出)排序,但我们并不总是有必要这么做。我们也可以对缓冲区进行限制,这一点稍后会为大家介绍。由于稍后将要介绍的受限缓冲区包含更为简单的“为空时阻塞”(blocking-when-empty) 行为,因此我们这里仅对配对变量做一了解。
要实现这个目的,我们只需封装一个简单的 Queue<T>,上方保持同步。那么到底需要何种同步?每当线程对元素进行排队时,在返回前都会等待使用者取消元素排队。当线程取消元素排队时,如果发现缓冲区为空,则必须等待新元素的进入。当然,取消排队后,使用者必须通知生产者已取到其项目(请参见图 5)。

Figure 5 阻塞队列
|
class Cell<T> {
internal T m_obj;
internal Cell(T obj) { m_obj = obj; }
}
public class BlockingQueue<T> {
private Queue<Cell<T>> m_queue = new Queue<Cell<T>>();
public void Enqueue(T obj) {
Cell<T> c = new Cell<T>(obj);
lock (m_queue) {
m_queue.Enqueue(c);
Monitor.Pulse(m_queue);
Monitor.Wait(m_queue);
}
}
public T Dequeue() {
Cell<T> c;
lock (m_queue) {
while (m_queue.Count == 0)
Monitor.Wait(m_queue);
c = m_queue.Dequeue();
Monitor.Pulse(m_queue);
}
return c.m_obj;
}
}
|
请注意,我们可以在 Enqueue 方法中依次调用 Pulse 和 Wait,类似地,在 Dequeue 方法中依次调用 Wait 和 Pulse。只有在释放监视器之后,才会对内部事件进行设置,而由于监视器的这种实现方法,会导致某个线程调度 ping-pong。我们可能会想,也许可以使用 Win32 事件来构建一个更加优化的通知机制。但是,使用这类完善的 Win32 事件会带来巨大开销,尤其是使用它们时所进行的成本分配和内核跳转 (kernel transitions),因此还是考虑其他选择吧。您可以像 CLR 的 ReaderWriterLock 那样将这些时间集中到池中,也可以像 ThinEvent 类型(稍后介绍)一样缓慢分配它们。这种实现方法也是有弊端的,即要为每个新元素分配对象,备选方法可能也会将这些对象加入到池中,但同样会带来其他麻烦。
受限缓冲区 (Bounded Buffer)
某些类别的队列中会出现资源使用问题。如果生产者任务创建项目的速度快于使用者处理项目的速度,则系统的内存使用将不受限制。
为了说明这个问题,我们假设一个系统,单个生产者每秒钟可将 50 个项目排入队列,而使用者每秒钟只能使用 10 个项目。首先,这样会打乱系统的平衡性,而且对于一对一的生产者 — 使用者配置无法进行很好的调整。只需一分钟,就会有 2,400 个项目堆积在缓冲区中。假设这些项目中每个项目使用 10KB 内存,那将意味着缓冲区本身就会占用 24MB 内存。一小时后,这个数字将飙升至 1GB 以上。解决这一问题的一个方法是调整生产者线程与使用者线程的比例,在上述情况中,要将比例调整为一个生产者对应五个使用者。但是到达速度通常是不稳定的,这会导致系统周期性的不稳定,并带来一些明显的问题,简单的固定比例是无法解决这个问题的。
服务器上的程序通常是长时间运行的,人们希望程序能够长期保持良好的运行状态,但如果内存使用不受限,就会造成不可避免的混乱,还可能导致必须定期回收服务器进程的情况。
受限缓冲区允许您对生产者强制阻塞前缓冲区可能达到的大小进行限制。阻塞生产者可让使用者有机会“赶上”(通过允许其线程接收调度时间片),同时还能够解决内存使用量增加的问题。我们的方法还是仅封装 Queue<T>,同时添加两个等待条件和两个事件通知条件:生产者在队列满时等待,直至队列变为非满,而使用者在队列空时等待,直至变为非空;生产者在生产出项目后通知等待的使用者,而使用者也会在取走项目后通知生产者,如图 6 中所示。

Figure 6 受限缓冲区 (Bounded Buffer)
|
public class BoundedBuffer<T> {
private Queue<T> m_queue = new Queue<T>();
private int m_consumersWaiting;
private int m_producersWaiting;
private const int s_maxBufferSize = 128;
public void Enqueue(T obj) {
lock (m_queue) {
while (m_queue.Count == (s_maxBufferSize - 1)) {
m_producersWaiting++;
Monitor.Wait(m_queue);
m_producersWaiting--;
}
m_queue.Enqueue(obj);
if (m_consumersWaiting > 0)
Monitor.PulseAll(m_queue);
}
}
public T Dequeue() {
T e;
lock (m_queue) {
while (m_queue.Count == 0) {
m_consumersWaiting++;
Monitor.Wait(m_queue);
m_consumersWaiting--;
}
e = m_queue.Dequeue();
if (m_producersWaiting > 0)
Monitor.PulseAll(m_queue);
}
return e;
}
}
|
这里我们再一次使用了一种比较天真的方法。但是我们确实优化了对 PulseAll 的调用(因为它们耗费的资源很多),方法是使用 m_consumersWaiting 和 m_producersWaiting 这两个计数器并仅就计数器值是否为零发出信号。除此以外,我们还可以再做进一步的改进。例如,共享与此类似的单个事件可能会唤醒过多线程:如果使用者将队列大小降为 0,并且生产者和使用者同时处于等待状态,显然您只希望唤醒生产者(至少开始时要这么做)。此实现将以先进先出的规则处理所有等待者,这意味着您可能需要在任一生产者运行前唤醒使用者,这样做仅仅是为了让使用者发现队列为空,然后再次等待。令人欣慰的是,最终出现生产者和使用者同时等待的情况是很少见的,但其出现也是有规律的,而且受限区域较小。
Thin 事件
与 Monitor.Wait、Pulse 和 PulseAll 相比,Win32 事件有一个很大的优势:它们是“粘滞的”(sticky)。也就是说,一旦有事件收到信号,任何后续等待都将立即取消阻止,即使线程在信号发出前尚未开始等待。如果没有这个功能,您就需要经常编写一些代码将所有等待和信号通知严格地限制在临界区域内,由于 Windows 计划程序总是会提升已唤醒线程的优先级,因此这些代码的效率很低;如果不采取这种方法,您就必须使用某种技巧型很强、容易出现竞态条件的代码。
作为替代方法,您可以使用“thin 事件”,这是一种可重用数据结构,可在阻塞前短暂地旋转,仅在必要时才缓慢分配 Win32 事件,否则允许您执行类似手动重置事件的行为。换句话说,它可以对那些复杂的容易导致竞态条件的代码进行封装,这样您就不必在整个基本代码内散播它。此示例依赖于
Vance Morrison 的文章中描述的一些内存模型保证,使用的时候要多加小心(请参见
图 7)。

Figure 7 Thin 事件
|
public struct ThinEvent {
private int m_state; // 0 means unset, 1 means set.
private EventWaitHandle m_eventObj;
private const int s_spinCount = 4000;
public void Set() {
m_state = 1;
Thread.MemoryBarrier(); // required.
if (m_eventObj != null)
m_eventObj.Set();
}
public void Reset() {
m_state = 0;
if (m_eventObj != null)
m_eventObj.Reset();
}
public void Wait() {
SpinWait s = new SpinWait();
while (m_state == 0) {
if (s.Spin() >= s_spinCount) {
if (m_eventObj == null) {
ManualResetEvent newEvent =
new ManualResetEvent(m_state == 1);
if (Interlocked.CompareExchange<EventWaitHandle>(
ref m_eventObj, newEvent, null) == null) {
// If someone set the flag before seeing the new
// event obj, we must ensure it’s been set.
if (m_state == 1)
m_eventObj.Set();
} else {
// Lost the race w/ another thread. Just use
// its event.
newEvent.Close();
}
}
m_eventObj.WaitOne();
}
}
}
}
|
这基本上反映了 m_state 变量中的事件状态,其中值为 0 表示未设置,值为 1 表示已设置。现在,等待一个已设置事件耗费资源是很低的;如果 m_state 在 Wait 例程的入口处值为 1,我们会立即返回,无需任何内核跳转。但如果线程在事件设置完毕之前进入等待状态,处理上就需要很多技巧。要等待的首个线程必须分配一个新的事件对象,并对其进行比较后交换至 m_eventObj 字段中;如果 CAS 失败,则意味着其他等待者对事件进行了初始化,所以我们只可重复使用它;否则就必须重新检查自上次看到 m_state 后其值是否发生更改。不然的话,m_state 的值也许会为 1,那么 m_eventObj 就无法收到信号通知,这将导致在调用 WaitOne 时出现死锁。调用 Set 的线程必须首先设置 m_state,随后如果发现值为非空的 m_eventObj,就会调用其上的 Set。这里需要两个内存屏障:需要注意的是切不可提前移动 m_state 的第二次读取,我们会使用 Interlocked.CompareExchange 设置 m_eventObj 来保证这点;在写入 m_eventObj 之前,不可移动 Set 中的对 m_eventObj 的读取(这是一种在某些 Intel 和 AMD 处理器以及 CLR 2.0 内存模型上出现的奇怪的合法转换,并未显式调用 Thread.MemoryBarrier)。并行重置事件通常是不安全的,因此调用方需要进行额外的同步化。
现在您可以轻松在其他地方使用它,例如在上述的 CountdownLatch 示例和队列示例中,而且通常这样做可以大幅度提升性能,尤其是当您可以巧妙地运用旋转时。
我们上面介绍了一个技巧性很强的实现方式。请注意,您可以使用单个标志和监视器来实现自动和手动重置类型,这远比本示例简单(但效率方面有时会不及本例)。
无锁定 LIFO 堆栈
使用锁定构建线程安全的集合是相当直接明了的,即使限制和阻塞方面会有些复杂(如上面看到的)。但是,当所有协调均通过简单的后进先出 (LIFO) 堆栈数据结构实现时,使用锁定的开销会比实际所需的高。线程的临界区(即保持锁定的时间)有开始点和结束点,其持续时间可能会跨越许多指令的有效期。保持锁定可阻止其他线程同时进行读取和写入操作。这样做可以实现序列化,这当然是我们所想要的结果,但严格地讲,这种序列化要比我们所需的序列化强很多。我们的目的是要在堆栈上推入和弹出元素,而这两项操作只需通过常规读取和单一的比较-交换写入即可实现。我们可以利用这点来构建伸缩性更强的无锁定堆栈,从而不会强制线程进行没必要的等待。
我们的算法工作方式如下。使用有链接的列表代表堆栈,其标头代表堆栈的顶端,并存储在 m_head 字段中。在将一个新项推入堆栈时,要构造一个新节点,其值等于您要推入堆栈的值,然后从本地读取 m_head 字段,并将其存储至新节点的 m_next 字段,随后执行不可再分的 Interlocked.CompareExchange 来替换堆栈当前的头部。如果顶端自首次读取后在序列中的任何点发生更改,CompareExchange 都会失败,并且线程必须返回循环并再次尝试整个序列。弹出同样是比较直截了当的。读取 m_head 并尝试将其与 m_next 引用的本地副本交换;如果失败,您只需一直尝试,如图 8 中所示。Win32 提供了一种名为 SList 的类比数据结构,其构建方法采用了类似的算法。

Figure 8 无锁定堆栈
|
public class LockFreeStack<T> {
private volatile StackNode<T> m_head;
public void Push(T item) {
StackNode<T> node = new StackNode<T>(item);
StackNode<T> head;
do {
head = m_head;
node.m_next = head;
} while (m_head != head || Interlocked.CompareExchange(
ref m_head, node, head) != head);
}
public T Pop() {
StackNode<T> head;
SpinWait s = new SpinWait();
while (true) {
StackNode<T> next;
do {
head = m_head;
if (head == null) goto emptySpin;
next = head.m_next;
} while (m_head != head || Interlocked.CompareExchange(
ref m_head, next, head) != head);
break;
emptySpin:
s.Spin();
}
return head.m_value;
}
}
class StackNode<T> {
internal T m_value;
internal StackNode<T> m_next;
internal StackNode(T val) { m_value = val; }
}
|
请注意,这是理想的并发控制的一种形式:无需阻止其他线程存取数据,只需抱着会在争用中“胜出”的信念继续即可。如果事实证明无法“胜出”,您将会遇到一些变化不定的问题,例如活锁。这种设计方案还表明您无法确保能够实现 FIFO 调度。根据概率统计,系统中所有线程都将向前推进。而实际上从整体来看,系统进度总是向前推进的,因为其中一个线程的失败总是意味着至少有一个其他线程是推进的(这是调用该“无锁定”的一个条件)。有些情况下,当 CompareExchange 无法避免 m_head 大量争用内存时,使用指数回退算法会很有用。
同时,我们还对堆栈变空的情况采取了相当直截了当的方法。我们只是一直旋转,等待有新项目推入。将 Pop 重新写入处于非等待状态的 TryPop 是很简单易懂的,但是要利用事件进行等待则会有一些复杂。这两个功能都是十分重要的,所以留给那些喜欢动手的读者作为练习之用。
我们为每个 Push 都分配了对象,这让我们无需担心所谓的 ABA 问题。在内部重复使用已从列表中弹出的节点就会导致 ABA 问题。开发人员有时会尝试将节点集中到池中以减少对象分配的数目,但这样做是有问题的:结果会是,即使对 m_head 执行了大量干扰性写入操作,一个不可再分割的操作也可以实现,但实际上却是不正确的。(例如:线程 1 读取节点 A,然后由线程 2 将节点 A 删除并置入池中;线程 2 将节点 B 作为新的头推入,然后节点 A 从池中返回到线程 2 并被推入;随后,线程 1 会成功执行 CompareExchange,但现在作为头的 A 已经与先前所读取的不同了。)尝试以本机 C/C++ 编写此算法时也会出现类似问题;因为一旦地址被释放,内存分配器就会立即重复使用这些地址,节点会被弹出并释放,然后该节点的地址会被用于新的节点分配,结果导致与上面相同的问题。接下来我们不再讨论 ABA,有关它的详细介绍我们可以从其他的资源获得。
最后,可以使用类似的无锁定技术编写一个 FIFO 队列。它的优势在于并行推入和弹出的线程之间并不一定发生冲突,而在上述 LockFreeStack 中,推入者和弹出者始终会争用同一 m_head 字段。然而,这种算法相当复杂,如果您好奇的话,我推荐您阅读 Maged M. Michael 和 Michael L. Scott 在 1996 年撰写的文章“
Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms”。
循环分块
循环分块是指对循环的输入范围或数据进行分区,并将每个分区分配给单独的线程以实现并发。这是某些编程模型(例如 OpenMP)实现并行性的一项基本技术(请参阅
Kang Su Gatlin 的《MSDN 杂志》文章),通常称为并行 Forall 循环,是从高性能的 FORTRAN 术语中得到启发而创造的。无论如何,范围都只是一组索引:
|
for (int i = 0; i < c; i++) { ... }
|
或者是一组数据:
|
foreach (T e in list) { ... }
|
我们都可以设计分区技术以提供分块的循环。
您可能想应用多种特定于数据结构的分区技术,这些技术确实很多,本专栏无法一一列举。因此这里我们只关注一种常用技术,可用于将数组中各种非连续范围的元素分配到每个分区。我们只需计算跨距的值(它大约等于元素数目除以分区数目的值),并用它来计算连续范围(参见图 9)。当然尽管其他方法是有效、有用并在某些情况下有必需的,但当输入内容为值类型数组时,此方法可以实现较好的空间局部性。

Figure 9 循环分块
|
public static void ForAll(int from, int to, Action<int> a, int p) {
ForAll<int>(null, from, to, null, a, p);
}
public static void ForAll<T>(IList<T> data, Action<T> a, int p) {
ForAll<T>(data, 0, data.Count, a, null, p);
}
private static void ForAll<T>(IList<T> data, int from, int to,
Action<T> a0, Action<int> a1, int p) {
int size = from - to;
int stride = (size + p - 1) / p;
CountdownLatch latch = new CountdownLatch(p);
for (int i = 0; i < p; i++) {
int idx = i;
ThreadPool.QueueUserWorkItem(delegate {
int end = Math.Min(size, stride * (idx + 1));
for (int j = stride * idx; j < end; j++) {
if (data != null) a0(data[j]);
else a1(j);
}
latch.Signal();
});
}
latch.Wait();
}
|
我们这里提供了两种公共版的 ForAll:一种接受一定范围内的数字,另一种接受 IList<T>(类似于 C# 中的 Foreach 循环)。两者都转发到同一帮助程序重载,重载可调用从给定索引的列表传递元素这一操作,或者传递索引本身。您应使用第一个重载(通常在此加入一个普通 For 循环)。例如,下面的代码
|
for (int i = 0; i < 10; i++) { S; }
|
将变为:
|
Parallel.ForAll(0, 10, delegate(int i){ S; },
Environment.ProcessorCount);
|
现在可以使用第二个重载(通常在此加入一个 C# foreach 循环),这样,
|
List<T> list = ...;
foreach (T e in list) { S; }
|
将变为:
|
Parallel.ForAll(list, delegate(T e) { S; },
Environment.ProcessorCount);
|
您需要谨慎以确保 S 中的语句不会写入共享内存;否则您需要向并行版本添加合适的同步操作。当然可以编写相应版本来支持 IEnumerable<T>、以其他方式对迭代空间进行分区等(为了节省空间,本专栏省略了这些内容)。在本示例中,在 n 项子任务的持续时间内调用线程被“浪费”了。更好的方法是使用调用线程来运行其中一项任务,然后在其完成时加入其他任务。没有必要通过扩展 ForAll 方法来执行此操作。
并行分拆
有一类操作可使用分拆(又称为折叠或聚合)来执行,其中会以某种方式将许多值进行组合以便生成单一输出。一般分拆的工作方式如下。使用一个二元运算符(即包含两个参数的函数),并对照向量或一组大小为 n 的元素从左至右对其进行计算。对于 j = 0 至 n – 1,调用该二元运算符,作为输入传递至 jth 迭代,并将在元素 j – 1 上调用运算符得到的输出作为第一个参数,将 jth 元素本身作为第二个参数。由于没有预先给定的值可用,因此会将一个特殊的种子值用作第 0 个元素的第一个参数。然后使用最终(可选)结果选择器将中间值转换为最终结果。
我们来看一个示例。
如果二元运算符为 +,输入为包含 5 个元素 {1, 2, 3, 4, 5} 的向量,那么展开的计算应类似于 ((((1 + 2) + 3) + 4) + 5)。
如果将此展开式转换为函数调用格式,则类似于以下形式(假定种子值为 0):+(+(+(+(+(0, 1), 2), 3), 4), 5) 换句话说,您只需计算所有输入数字的总和即可。这称为总和分拆。有种直接的方法可以将这种一般化的算法转换为串行算法,如下所示:
|
delegate T Func<T>(T arg0, T arg1);
T Reduce<T>(T[] input, T seed, Func<T> r) {
T result = seed;
foreach (T e in input)
result = r(result, e);
return result;
}
|
调用它时,您只需求得一组数字的和即可,如下所示(在 C# 3.0 中):
|
int[] nums = ... some set of numbers ...;
int sum = Reduce(nums, 0, (x,y) => x + y;);
|
所有这些内容都很抽象,但除了求和之外,还有很多操作也可以通过拆分表示,如
图 10 中所示。

Figure 10 表示为拆分的操作
| |
种子 |
二元运算符 |
结果选择器 |
| 计数 |
0 |
(a, b) => a + 1 |
N/A |
| 和 |
0 |
(a, b) => a + b |
N/A |
| 最小值 |
NaN |
(a, b) => a < b ? a :b |
N/A |
| 最大值 |
NaN |
(a, b) => a > b ? a :b |
N/A |
| 平均 |
{ 0, 0 } |
(a, b) => new { a[0] + b, a[1] + 1 } |
(a) => a[0] / a[1] |
如果给定上面的 nums 数组,我们就可以使用拆分例程找到数组的最小值和最大值:
|
int min = Reduce(nums, int.MaxValue, (x,y) => x < y ? x : y;);
int max = Reduce(nums, int.MinValue, (x,y) => x > y ? x : y;);
|
(省略 Count 是因为必须对部分结果求和,而这需要两个单独的二元运算符;省略 Average 是因为它还需要一些其他步骤。)
您可以使用与讨论循环分块时论述的技术相类似的技术来分割输入数据和并行执行拆分。每个分区都将计算其自己的中间值,然后我们会使用与计算中间值相同的运算符把这些值合并为单个最终值。这样为何可行呢?因为上述的所有操作都是相关联的;回想一下初等数学中学过的知识:关联性二元运算符 + 意味着 (a + b) + c = a + (b + c);也就是说,计算顺序不会影响计算结果的正确性。
例如,考虑一下总和拆分。如果您将输入数据 { 1, 2, 3, 4 } 分为两个分区({1, 2} 和 {3, 4}),那么由于 + 是关联的,因此当您将各独立求和的结果加到一起时,结果是保持不变:(1 + 2) + (3 + 4) = ((((1 + 2) + 3) + 4). 实际上,任何非连续的输入分区都会产生正确的结果。图 11 所展示的一般化 Reduce 方法将种子值和二元运算符作为参数,它使用的是前面所述的跨距分区法。

Figure 11 Reduce 方法
|
public delegate T Func<T>(T arg0, T arg1);
public static T Reduce<T>(IList<T> data, int p, T seed, Func<T> r){
T[] partial = new T[p];
int stride = (data.Count + p - 1) / p;
CountdownLatch latch = new CountdownLatch(p);
for (int i = 0; i < p; i++) {
int idx = i;
ThreadPool.QueueUserWorkItem(delegate {
// Do the ‘ith’ intermediate reduction in parallel.
partial[idx] = seed;
int end = Math.Min(data.Count, stride * (idx + 1));
for (int j = stride * idx; j < end; j++)
partial[idx] = r(partial[idx], data[j]);
latch.Signal();
});
}
latch.Wait();
// Do the final reduction on the master thread.
T final = seed;
for (int i = 0; i < p; i++)
final = r(final, partial[i]);
return final;
}
|
主线程会分出一组 p 工作线程,每个线程都会对其自己数据的分区执行拆分操作并将值置入其在中间值数组中的专属空位,来计算出一个中间结果。然后,主线程会等待所有子线程完成(使用前述的 CountdownLatch),然后将每个子线程产生的部分结果用作输入来执行最终的拆分。在文案中,树拆分相当常见,其中树中的每个节点会对特定数目的中间结果执行部分拆分。虽然这在理论上会产生伸缩性更强的算法,但是鉴于您要处理的线程数以及要在 Windows 上耗费的相关同步化成本,实际情况是串行拆分(如前所示)对 p 的公用值和二元运算符的执行效果更好。
此处还有大量的优化空间(例如对其中一个分区重复使用调用线程),但示例应该很好地说明了这一点。我们需要稍有不同的 API 才可支持“求平均值”这类操作,因为中间值拆分操作对于中间拆分和最终拆分两个阶段是有所不同的,而且需要具有最终结果选择例程。这是一个相当简单的练习。
总结
在本专栏中,我向您介绍了一些低级别并行数据结构和算法,可帮助您编写可利用多处理器和多核架构的托管代码。对于所有编程来说,会容易形成抽象层,而对性能影响最为至关重要的抽象通常驻留在底部。完全可以说,本专栏中介绍的很多技术都是最基本的技术,它们可为更高级抽象和特定于应用程序的并行码提供基础。虽然选择正确的数据结构和基本算法只是漫长流程中的一步,但我希望您能对并行编程技术有更深刻的理解,它会在这个新的编程领域助您一臂之力。
将您的问题和意见发送至:clrinout@microsoft.com clrinout@microsoft.com.
Joe Duffy在 Microsoft 从事并行编程模型和基础架构方面的工作,并定期将相关内容发布在
www.bluebytesoftware.com/blog 上。本专栏中的某些代码示例就源自他即将问世的新书《Concurrent Programming on Windows》
,该书将由 Addison Wesley 于 2007 年出版发行。