How to: Synchronize a Producer and a Consumer Thread (C# Programming Guide) 

The following example demonstrates thread synchronization between the primary thread and two worker threads using the lock keyword, and the AutoResetEvent and ManualResetEvent classes. For more information, see lock Statement (C# Reference).

The example creates two auxiliary, or worker, threads. One thread produces elements and stores them in a generic queue that is not thread-safe. For more information, see Queue. The other thread consumes items from this queue. In addition, the primary thread periodically displays the contents of the queue, so the queue is accessed by three threads. The lock keyword is used to synchronize access to the queue to ensure that the state of the queue is not corrupted.

In addition to simply preventing simultaneous access with the lock keyword, further synchronization is provided by two event objects. One is used to signal the worker threads to terminate, and the other is used by the producer thread to signal to the consumer thread when a new item has been added to the queue. These two event objects are encapsulated in a class called SyncEvents. This allows the events to be passed to the objects that represent the consumer and producer threads easily. The SyncEvents class is defined like this:

public class SyncEvents
{
    public SyncEvents()
    {

        _newItemEvent = new AutoResetEvent(false);
        _exitThreadEvent = new ManualResetEvent(false);
        _eventArray = new WaitHandle[2];
        _eventArray[0] = _newItemEvent;
        _eventArray[1] = _exitThreadEvent;
    }

    public EventWaitHandle ExitThreadEvent
    {
        get { return _exitThreadEvent; }
    }
    public EventWaitHandle NewItemEvent
    {
        get { return _newItemEvent; }
    }
    public WaitHandle[] EventArray
    {
        get { return _eventArray; }
    }

    private EventWaitHandle _newItemEvent;
    private EventWaitHandle _exitThreadEvent;
    private WaitHandle[] _eventArray;
}

The AutoResetEvent class is used for the "new item" event because you want this event to reset automatically each time the consumer thread responds to it. In contrast, the ManualResetEvent class is used for the "exit" event because you want multiple threads to respond when this event is signaled. If you used AutoResetEvent instead, the event would revert to a non-signaled state after just one thread responded to the event. The other thread would not respond and, in this case, would fail to terminate.
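
The difference between the two reset modes can be observed in isolation. The following is a minimal sketch, not part of the sample: the class name ResetBehaviorSketch and the helper CountSuccessfulWaits are hypothetical, and two consecutive zero-timeout waits on one thread stand in for two threads responding to the same signal.

```csharp
using System;
using System.Threading;

public static class ResetBehaviorSketch
{
    // Hypothetical helper: signals the handle once, then counts how many
    // consecutive zero-timeout waits find it signaled.
    public static int CountSuccessfulWaits(EventWaitHandle handle, int attempts)
    {
        handle.Set();
        int successes = 0;
        for (int i = 0; i < attempts; i++)
        {
            // WaitOne(0) tests the state and returns immediately.
            if (handle.WaitOne(0))
            {
                successes++;
            }
        }
        return successes;
    }

    public static void Main()
    {
        using (var auto = new AutoResetEvent(false))
        using (var manual = new ManualResetEvent(false))
        {
            // The auto-reset event is consumed by the first successful wait.
            Console.WriteLine("AutoResetEvent:   {0} of 2 waits succeeded",
                CountSuccessfulWaits(auto, 2));
            // The manual-reset event stays signaled until Reset is called.
            Console.WriteLine("ManualResetEvent: {0} of 2 waits succeeded",
                CountSuccessfulWaits(manual, 2));
        }
    }
}
```

With a single Set, only one wait on the AutoResetEvent succeeds, while both waits on the ManualResetEvent succeed. This is why the "exit" event can release both worker threads.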

The SyncEvents class creates the two events, and stores them in two different forms: as EventWaitHandle, which is the base class for both AutoResetEvent and ManualResetEvent, and in an array based on WaitHandle. As you will see in the consumer thread discussion, this array is necessary so that the consumer thread can respond to either event.

The consumer and producer threads are represented by classes named Consumer and Producer, both of which define a method called ThreadRun. These methods are used as the entry points for the worker threads that the Main method creates.

The ThreadRun method defined by the Producer class looks like this:

// Producer.ThreadRun
public void ThreadRun()
{
    int count = 0;
    Random r = new Random();
    while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
    {
        lock (((ICollection)_queue).SyncRoot)
        {
            while (_queue.Count < 20)
            {
                _queue.Enqueue(r.Next(0,100));
                _syncEvents.NewItemEvent.Set();
                count++;
            }
        }
    }
    Console.WriteLine("Producer thread: produced {0} items", count);
}

This method loops until the "exit thread" event becomes signaled. The state of this event is tested with the WaitOne method, using the ExitThreadEvent property defined by the SyncEvents class. In this case the state of the event is checked without blocking the current thread because the first argument used with WaitOne is zero, indicating that the method should return immediately. If WaitOne returns true, then the event in question is currently signaled. If so, the ThreadRun method returns, which has the effect of terminating the worker thread executing this method.
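
The non-blocking check can be tried on its own. The sketch below assumes a hypothetical class ExitPollingSketch with a helper SpinUntilSignaled; it uses the single-argument WaitOne(int) overload and omits the queue entirely, so it only illustrates the polling pattern.

```csharp
using System;
using System.Threading;

public static class ExitPollingSketch
{
    // Hypothetical helper: loops until the exit event is signaled and
    // returns the number of completed loop iterations.
    public static long SpinUntilSignaled(EventWaitHandle exitEvent)
    {
        long iterations = 0;
        // A zero timeout makes WaitOne a state check that never blocks.
        while (!exitEvent.WaitOne(0))
        {
            iterations++;
        }
        return iterations;
    }

    public static void Main()
    {
        using (var exitEvent = new ManualResetEvent(false))
        {
            long iterations = 0;
            var worker = new Thread(() => iterations = SpinUntilSignaled(exitEvent));
            worker.Start();

            Thread.Sleep(100);   // let the worker poll for a short time
            exitEvent.Set();     // request termination
            worker.Join();       // wait until the loop observes the signal

            Console.WriteLine("Worker polled {0} times before exiting", iterations);
        }
    }
}
```

Because the loop never blocks, the worker keeps a processor busy while it polls. That matches the producer in this example, which spends its time refilling the queue between checks.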

Until the "exit thread" event is signaled, the Producer.ThreadRun method attempts to keep 20 items in the queue. An item is simply a random integer from 0 through 99, because the upper bound passed to Random.Next is exclusive. The collection must be locked before adding new items to prevent the consumer and primary threads from accessing the collection simultaneously. This is accomplished with the lock keyword. The argument passed to lock is the SyncRoot property exposed by means of the ICollection interface. This property is provided specifically for synchronizing thread access. Exclusive access to the collection is granted for any instructions contained in the code block following lock. For each new item that the producer adds to the queue, a call to the Set method on the "new item" event is made. This signals the consumer thread to emerge from its suspended state to process the new item.

The Consumer object also defines a method called ThreadRun. Like the producer's version of ThreadRun, this method is executed by a worker thread created by the Main method. However, the consumer version of ThreadRun must respond to two events. The Consumer.ThreadRun method looks like this:

// Consumer.ThreadRun
public void ThreadRun()
{
    int count = 0;
    while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
    {
        lock (((ICollection)_queue).SyncRoot)
        {
            int item = _queue.Dequeue();
        }
        count++;
    } 
    Console.WriteLine("Consumer Thread: consumed {0} items", count);
}

This method uses WaitAny to block the consumer thread until any of the wait handles in the provided array becomes signaled. In this case there are two handles in the array, one for indicating that a new item has been added to the collection, and one for terminating the worker threads. WaitAny returns the index of the event that became signaled. The "new item" event is the first in the array, so an index of zero indicates a new item. The loop condition checks for an index of 1, which indicates the "exit thread" event, to determine whether the method continues to consume items. If the "new item" event was signaled, you get exclusive access to the collection with lock and consume the new item. Because this example produces and consumes thousands of items, you do not display each item consumed. Instead, Main periodically displays the contents of the queue, as will be demonstrated.
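
The index returned by WaitAny can be examined without any worker threads. The following sketch is an illustration only: the class WaitAnyIndexSketch and the helper SignalAndWait are hypothetical, and the handles are arranged in the same order as in SyncEvents. It uses the WaitAny overload that takes a timeout so that the call returns even when nothing is signaled.

```csharp
using System;
using System.Threading;

public static class WaitAnyIndexSketch
{
    // Hypothetical helper: optionally signals each event, then reports
    // what WaitAny returns for the two-element handle array.
    public static int SignalAndWait(bool newItem, bool exit)
    {
        using (var newItemEvent = new AutoResetEvent(false))
        using (var exitEvent = new ManualResetEvent(false))
        {
            // Same order as SyncEvents: index 0 = new item, index 1 = exit.
            WaitHandle[] handles = { newItemEvent, exitEvent };
            if (newItem) newItemEvent.Set();
            if (exit) exitEvent.Set();

            // Zero timeout: returns WaitHandle.WaitTimeout if no handle is signaled.
            return WaitHandle.WaitAny(handles, 0);
        }
    }

    public static void Main()
    {
        Console.WriteLine("new item only: {0}", SignalAndWait(true, false));   // 0
        Console.WriteLine("exit only:     {0}", SignalAndWait(false, true));   // 1
        Console.WriteLine("both signaled: {0}", SignalAndWait(true, true));    // 0
        Console.WriteLine("timed out:     {0}",
            SignalAndWait(false, false) == WaitHandle.WaitTimeout);            // True
    }
}
```

When more than one handle is signaled, WaitAny reports the smallest signaled index. In the consumer loop this means that a pending "new item" signal is handled before the "exit thread" event is observed.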

The Main method begins by creating the queue whose contents will be produced and consumed and an instance of SyncEvents, which you looked at earlier:

Queue<int> queue = new Queue<int>();
SyncEvents syncEvents = new SyncEvents();

Next, Main configures the Producer and Consumer objects for use with worker threads. This step does not, however, create or launch the actual worker threads:

Producer producer = new Producer(queue, syncEvents);
Consumer consumer = new Consumer(queue, syncEvents);
Thread producerThread = new Thread(producer.ThreadRun);
Thread consumerThread = new Thread(consumer.ThreadRun);

Notice that the queue and the synchronization event object are passed to both the Consumer and Producer objects as constructor arguments. This provides both objects with the shared resources they need to perform their respective tasks. Two new Thread objects are then created, using the ThreadRun method for each object as an argument. Each worker thread, when started, will use this argument as the entry point for the thread.
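
That constructing a Thread object does not start it can be checked with the IsAlive property. This is a small sketch under stated assumptions: the class ThreadLifecycleSketch and the helper ObserveLifecycle are hypothetical, and a short Sleep stands in for the ThreadRun methods.

```csharp
using System;
using System.Threading;

public static class ThreadLifecycleSketch
{
    // Hypothetical helper: returns IsAlive as observed before Start
    // and again after Join.
    public static bool[] ObserveLifecycle()
    {
        // Stand-in entry point; the sample passes producer.ThreadRun here.
        var worker = new Thread(() => Thread.Sleep(50));

        bool beforeStart = worker.IsAlive;   // false: the thread has not been started
        worker.Start();
        worker.Join();                       // block until the entry point returns
        bool afterJoin = worker.IsAlive;     // false: the thread has terminated

        return new[] { beforeStart, afterJoin };
    }

    public static void Main()
    {
        bool[] observed = ObserveLifecycle();
        Console.WriteLine("Alive before Start: {0}", observed[0]);
        Console.WriteLine("Alive after Join:   {0}", observed[1]);
    }
}
```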

Next, Main launches the two worker threads with a call to the Start method, like this:

producerThread.Start();
consumerThread.Start();

At this point, the two new worker threads are created and begin asynchronous execution, independent of the primary thread that is currently executing the Main method. In fact, the next thing Main does is suspend the primary thread with a call to the Sleep method. The method suspends the currently executing thread for a given number of milliseconds. Once this interval elapses, Main is reactivated, at which point it displays the contents of the queue. Main repeats this for four iterations, like this:

for (int i=0; i<4; i++)
{
    Thread.Sleep(2500);
    ShowQueueContents(queue);
}

Finally, Main signals the worker threads to terminate by invoking the Set method of the "exit thread" event, and then calls the Join method on each worker thread to block the primary thread until each worker thread responds to the event and terminates:

syncEvents.ExitThreadEvent.Set();

producerThread.Join();
consumerThread.Join();

There is one final example of thread synchronization: the ShowQueueContents method. This method, like the consumer and producer threads, uses lock to gain exclusive access to the queue. In this case, however, exclusive access is particularly important, because ShowQueueContents enumerates over the entire collection. Enumerating over a collection is an operation that is particularly prone to data corruption by asynchronous operations because it involves traversing the contents of the entire collection.

Notice that ShowQueueContents, because it is called by Main, is executed by the primary thread. This means that this method, when it achieves exclusive access to the item queue, is in fact blocking both the producer and consumer threads from accessing the queue. ShowQueueContents locks the queue and enumerates the contents:

private static void ShowQueueContents(Queue<int> q)
{
    lock (((ICollection)q).SyncRoot)
    {
        foreach (int item in q)
        {
            Console.Write("{0} ", item);
        }
    }
    Console.WriteLine();
}
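
ShowQueueContents holds the lock for the whole time it writes to the console. One possible variation, shown here only as a sketch and not used by the sample, is to copy the queue while holding the lock and print the copy afterward. The class SnapshotSketch and the helper Snapshot are hypothetical names.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

public static class SnapshotSketch
{
    // Hypothetical helper: copies the queue while holding the same lock
    // that the producer and consumer use, then releases the lock.
    public static int[] Snapshot(Queue<int> q)
    {
        lock (((ICollection)q).SyncRoot)
        {
            return q.ToArray();
        }
    }

    public static void Main()
    {
        var queue = new Queue<int>(new[] { 22, 92, 64 });

        // The console output happens outside the lock, on the copy.
        foreach (int item in Snapshot(queue))
        {
            Console.Write("{0} ", item);
        }
        Console.WriteLine();
    }
}
```

The trade-off is an extra array allocation per call in exchange for a shorter time during which the worker threads are blocked.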

The complete example follows.

Example

using System;
using System.Threading;
using System.Collections;
using System.Collections.Generic;

public class SyncEvents
{
    public SyncEvents()
    {

        _newItemEvent = new AutoResetEvent(false);
        _exitThreadEvent = new ManualResetEvent(false);
        _eventArray = new WaitHandle[2];
        _eventArray[0] = _newItemEvent;
        _eventArray[1] = _exitThreadEvent;
    }

    public EventWaitHandle ExitThreadEvent
    {
        get { return _exitThreadEvent; }
    }
    public EventWaitHandle NewItemEvent
    {
        get { return _newItemEvent; }
    }
    public WaitHandle[] EventArray
    {
        get { return _eventArray; }
    }

    private EventWaitHandle _newItemEvent;
    private EventWaitHandle _exitThreadEvent;
    private WaitHandle[] _eventArray;
}
public class Producer 
{
    public Producer(Queue<int> q, SyncEvents e)
    {
        _queue = q;
        _syncEvents = e;
    }
    // Producer.ThreadRun
    public void ThreadRun()
    {
        int count = 0;
        Random r = new Random();
        while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
        {
            lock (((ICollection)_queue).SyncRoot)
            {
                while (_queue.Count < 20)
                {
                    _queue.Enqueue(r.Next(0,100));
                    _syncEvents.NewItemEvent.Set();
                    count++;
                }
            }
        }
        Console.WriteLine("Producer thread: produced {0} items", count);
    }
    private Queue<int> _queue;
    private SyncEvents _syncEvents;
}

public class Consumer
{
    public Consumer(Queue<int> q, SyncEvents e)
    {
        _queue = q;
        _syncEvents = e;
    }
    // Consumer.ThreadRun
    public void ThreadRun()
    {
        int count = 0;
        while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
        {
            lock (((ICollection)_queue).SyncRoot)
            {
                int item = _queue.Dequeue();
            }
            count++;
        } 
        Console.WriteLine("Consumer Thread: consumed {0} items", count);
    }
    private Queue<int> _queue;
    private SyncEvents _syncEvents;
}

public class ThreadSyncSample
{
    private static void ShowQueueContents(Queue<int> q)
    {
        lock (((ICollection)q).SyncRoot)
        {
            foreach (int item in q)
            {
                Console.Write("{0} ", item);
            }
        }
        Console.WriteLine();
    }

    static void Main()
    {
        Queue<int> queue = new Queue<int>();
        SyncEvents syncEvents = new SyncEvents();

        Console.WriteLine("Configuring worker threads...");
        Producer producer = new Producer(queue, syncEvents);
        Consumer consumer = new Consumer(queue, syncEvents);
        Thread producerThread = new Thread(producer.ThreadRun);
        Thread consumerThread = new Thread(consumer.ThreadRun);

        Console.WriteLine("Launching producer and consumer threads...");        
        producerThread.Start();
        consumerThread.Start();

        for (int i=0; i<4; i++)
        {
            Thread.Sleep(2500);
            ShowQueueContents(queue);
        }

        Console.WriteLine("Signaling threads to terminate...");
        syncEvents.ExitThreadEvent.Set();

        producerThread.Join();
        consumerThread.Join();
    }

}

Sample Output

Configuring worker threads...
Launching producer and consumer threads...
22 92 64 70 13 59 9 2 43 52 91 98 50 96 46 22 40 94 24 87
79 54 5 39 21 29 77 77 1 68 69 81 4 75 43 70 87 72 59
0 69 98 54 92 16 84 61 30 45 50 17 86 16 59 20 73 43 21
38 46 84 59 11 87 77 5 53 65 7 16 66 26 79 74 26 37 56 92
Signaling threads to terminate...
Consumer Thread: consumed 1053771 items
Producer thread: produced 1053791 items

See Also

Tasks

Monitor Synchronization Technology Sample
Wait Synchronization Technology Sample

Reference

Thread Synchronization (C# Programming Guide)
lock Statement (C# Reference)
AutoResetEvent
ManualResetEvent
Set
Join
WaitOne
WaitAll
Queue
ICollection
Start
Sleep
WaitHandle
EventWaitHandle

Concepts

C# Programming Guide

Other Resources

Thread Class