CCR Coordination Primitives

Glossary Item Box

Microsoft Robotics Developer Studio Send feedback on this topic

CCR Coordination Primitives

Asynchronous programming is hard because there is no simple method to coordinate between multiple operations, deal with partial failure (one of many operations fail but others succeed) and also define execution behavior of asynchronous callbacks, so they don't violate some concurrency constraint. For example, they don't attempt to do something in parallel. The Concurrency and Coordination Runtime (CCR) enables and promotes concurrency by providing ways to express what coordination should happen. Plus, enforce the high level constraints between different code segments, all run due to some messages being received on ports.

At the most primitive level, you can run a method on a CCR thread using Spawn(Handler) which is defined in CcrServiceBase. There are a couple of additional overloads of Spawn that accept 1, 2 or 3 parameters to pass to the handler, e.g. Spawn<T0>(T0, Handler<T0>) which passes a parameter of type T0. You can call Spawn multiple times, but eventually you will use all of the available threads and they are free-running, i.e. there is no coordination amongst them apart from what they might do themselves.

It’s important to realize the relationship between asynchronous behavior and concurrency: The loose coupling, fast initiation of work, and consistent use of queues for interaction, promotes software design that scales and has well-defined dependencies. So if the drawbacks mentioned above can be addressed, it is an appropriate model for software components.

The coordination primitives provided by the CCR can be classified based on two primary use scenarios:

  1. Coordination of inbound requests, for long lived service-oriented components. A common example is a web service listening for HTTP requests on some network port, using a CCR port to post all inbound requests and attaching handlers that wake up and serve each request independent of each other. In addition, it uses some of the advanced primitives to guarantee that some handlers never run when others handlers are active.
  2. Coordination of responses from one or more outstanding requests, with multiple possible return types per response. One example is waiting for success or failure, on a PortSet associated with a pending request. When the request completes, a success item or a failure item will be posted, and the appropriate code should execute. Another example is scattering multiple requests at once. Then collecting all the responses using a single primitive, not caring what order the responses arrive across a single or multiple response ports.

Each arbiter will be described briefly, followed by a more detailed explanation in the context of the above scenarios.

Arbiter Static Class

The arbiter static class provides helper routines for creating instances of all CCR arbiter classes, in a discoverable, type-safe manner. All methods described below are members of this class. The arbiter static methods are not an exhaustive list of all the possible ways the CCR arbiters can be constructed. For more advanced configurations, each arbiter class can be constructed directly using the appropriate constructor parameters. The following list shows which CCR classes are created when invoking some of the most common arbiter class methods (this is not an exhaustive list):

  • Arbiter.FromTask -> Creates instance of Task
  • Arbiter.Choice -> Creates instance of Choice
  • Arbiter.Receive -> Creates instance of Receiver
  • Arbiter.Interleave -> Creates instance of Interleave
  • Arbiter.JoinedReceive ->Creates instance of JoinReceiver
  • Arbiter.MultipleItemReceive -> Creates instance JoinSinglePortReceiver

The above classes are described in the reference documentation and should be used for advanced scenarios when additional parameters/configurations are necessary.

Port Extension Methods

A more concise alternative to the static methods in the Arbiter class is available through the port extension methods. Using the new C# 3.0 extension method support, coordination primitives can be created by using the port instance. Most examples in the guide use the port extensions to create receivers, joins, etc. For example, get.ResponsePort.Choice() is an easy way to handle the response to a Get request to a DSS service. (DSS services expose PortSets that contain a port for the requested information and one for a Fault, so Choice is used to handle whichever message type is sent back. It effectively waits on the two ports at the same time and executes the corresponding delegate for whichever port receives a message).

Single Item Receiver

A single item receiver associates a user delegate, that takes a single parameter of type T, with an instance of Port<T>. If the persist option is true, the receiver will execute an instance of the user delegate for every item posted. If the persist option is false, the receiver will only execute the user delegate for one item and then un-register from the port.

Bb648749.hs-note(en-us,MSDN.10).gif If items are already queued on the Port

Example 7

var port = new Port<int>();
Arbiter.Activate(_taskQueue,
   Arbiter.Receive(
       true,
       port,
       item => Console.WriteLine(item)
   )
);

// post item, so delegate executes
port.Post(5);

Example 7 shows how to create a Port<int> instance and then activate a single item receiver. This receiver will execute the user delegate every time an item is posted on the port. Notice that the receiver is persisted and it will be active on the port until the port instance is garbage collected.

Also in the example notice the abbreviated syntax for an anonymous delegate. The statement:

       item => Console.WriteLine(item)

creates an anonymous method with one parameter (called item) that executes a single line of code to print the value on the console. If you are not an experienced C# programmer then this syntax might be new to you. An alternative using the older anonymous delegate style of syntax is: FakePre-7b6c0cf41b31468eb8a3ff4f5bef5559-d43cd72276ba42b5b514a72071e2012a

 

Example 8

// alternate version that explicitly constructs a Receiver by passing
// Arbiter class factory methods
var persistedReceiver = new Receiver<int>(
       true, // persisted
       port,
       null, // no predicate
       new Task<int>(item => Console.WriteLine(item)) // task to execute
    );
Arbiter.Activate(_taskQueue, persistedReceiver);            

Example 8 has exactly the same effect at runtime as example 7, but shows that the Arbiter.Receive() method is really just a thin wrapper around the constructor of the Receiver arbiter.

Choice Arbiter

The choice arbiter only executes one of its branches, and then, atomically (in one step that cant be interrupted) removes all other nested arbiters from their ports. This guarantees that only one branch of the choice will ever run and is a common way to express branching behavior, deal with responses that have success/failure, or guard against race conditions.

Example 9

// create a simple service listening on a port
ServicePort servicePort = SimpleService.Create(_taskQueue);

// create request
GetState get = new GetState();

// post request
servicePort.Post(get);

// use the extension method on the PortSet that creates a choice
// given two types found on one PortSet. This a common use of 
// Choice to deal with responses that have success or failure
Arbiter.Activate(_taskQueue,
    get.ResponsePort.Choice(
        s => Console.WriteLine(s), // delegate for success
        ex => Console.WriteLine(ex) // delegate for failure
));

Example 9 shows one common use of the choice arbiter to execute two different delegates, based on messages received on a PortSet. Note that the choice class can take an arbitrary number of receivers, and coordinate across them, not just two. The Choice extension method on PortSet is a concise alternative to creating a choice arbiter, and then creating two receiver arbiters, one for each delegate.

Bb648749.hs-note(en-us,MSDN.10).gif The choice arbiter is an example of a parent arbiter: other arbiters, such as single item receivers or joins, can be nested under a choice. The arbiter design allows for a hierarchy of arbiters, invoking each arbiter in the hierarchy in the correct order, before determining if a user handler should execute. This allows you to express complex coordination with just a few lines of code.

Joins and Multiple Item Receivers

Multiple item receiver arbiters come in two categories:

  1. Also known as joins, or WaitForMultiple in OS literature, they are receivers that attempt to receive from one or more ports. If one of the attempts fail, they post any items back and wait to try again when the right conditions are met. This two phase logic provides a type safe and deadlock free mechanism. It can be used to guarantee atomic access to multiple resources, without the fear of deadlock, since the order the items are received is not important. The number of items and ports can be specified at runtime or be fixed at compile time. The fact that the number of items in the join can be specified at runtime is an important extension the CCR provides over other forms of typed joins.
  2. Receivers that eagerly remove items from each port participating in the receive, and when the total item count is satisfied, execute the user delegate. This version is very fast but should not be used as a resource synchronization primitive. It is often used for gathering results for multiple pending requests, known as scatter/gather scenarios.

Joins

Example 10

var portDouble = new Port<double>();
var portString = new Port<string>();

// activate a joined receiver that will execute only when one
// item is available in each port.
Arbiter.Activate(_taskQueue,
    portDouble.Join(
        portString, // port to join with
        (value, stringValue) => // delegate
        {
            value /= 2.0;
            stringValue = value.ToString();
            // post back updated values
            portDouble.Post(value);
            portString.Post(stringValue);
        })
    );

// post items. The order does not matter, which is what Join its power
portDouble.Post(3.14159);
portString.Post("0.1");

//after the last post the delegate above will execute 

Example 10 demonstrates a simple static join, as specified at compile time with a fixed number of ports. A join receiver is activated across two ports, and then posts items to each port. The join logic then determines if it has everything it needs and schedules the delegate for execution.

Example 11

var portInt = new Port<int>();
var portDouble = new Port<double>();
var portString = new Port<string>();

// activate a joined receiver that will execute only when one
// item is available in each port.
Arbiter.Activate(_taskQueue,
    portDouble.Join(
        portString, // second port to listen
        (value, stringValue) =>
        {
            value /= 2.0;
            stringValue = value.ToString();
            // post back updated values
            portDouble.Post(value);
            portString.Post(stringValue);
        })
    );

// activate a second joined receiver that also listens on portDouble
// and on a new port, portInt. Because the two joins share a common port
// between them (portDouble), there is contention when items are posted on
// that port
Arbiter.Activate(_taskQueue,
    portDouble.Join( 
        portInt, // second port to listen
        (value, intValue) =>
        {
            value /= 2.0;
            intValue = (int)value;
            // post back updated values
            portDouble.Post(value);
            portInt.Post(intValue);
        })
    );

// post items. 
portString.Post("0.1");
portInt.Post(128);

// when the double is posted there will be a race
// between the two joins to determine who will execute first
// The delegate that executes first will then post back a double,
// allowing the delegate that "lost", to execute.
portDouble.Post(3.14159);

Example 11 is a simple extension of the previous join example showing a simple case of join contention. Two independent delegates listen on the same port, plus some other ports not common between them. Because the join implementation is two phase, there is no guarantee both will run, as soon as the value extracted from the shared port, is posted back. The order they run does not matter, so races will not affect the outcome. This basically shows how a traditional locking problem, across multiple resources, can become just a scheduling dependency, resolved by the CCR. Messages are both the resource being guarded from multiple concurrent access, and the signal that triggers the execution of the code that requires it.

Bb648749.hs-note(en-us,MSDN.10).gif Using joins this way is a good alternative to using nested locks, but it is still a very error-prone way to program resource access. The Interleave primitive described in a later section, is a much simpler, less error-prone, and faster alternative.

Example 12

int itemCount = 10;
var portDouble = new Port<double>();

// post N items to a port
for (int i = 0; i < itemCount; i++)
{
    portDouble.Post(i * 3.14159);
}

// activate a Join that
// waits for N items on the same port
Arbiter.Activate(_taskQueue,
    portDouble.Join(
        itemCount,
        items =>
        {
            foreach (double d in items)
            {
                Console.WriteLine(d);
            }
        }
    )
);

Example 12 shows a simple case of a dynamic  join: The number of items is known only at runtime, stored in variable itemCount. They are all read from one port. The example uses a version of join that executes a handler when N items are received from one port. The Join() extension method on the Port class is an an alternative to the Arbiter.JoinedReceive() and Arbiter.MultipleItemReceive() static methods.

Multiple Item receivers

Multiple item receivers are appropriate when no contention is expected on the ports. They can be used to aggregate responses from multiple outstanding requests.

Example 13

// create a simple service listening on a port
var servicePort = SimpleService.Create(_taskQueue);

// shared response port 
var responsePort = new PortSet<string, Exception>();

// number of requests
int requestCount = 10;

// scatter phase: Send N requests as fast as possible
for (int i = 0; i < requestCount; i++)
{
    // create request
    GetState get = new GetState();

    // set response port to shared port
    get.ResponsePort = responsePort;

    // post request
    servicePort.Post(get);
}

// gather phase: 
// activate a multiple item receiver that waits for a total
// of N responses, across the ports in the PortSet.
// The service could respond with K failures and M successes (K+M == N)
Arbiter.Activate(_taskQueue,
    responsePort.MultipleItemReceive(
        requestCount, // total responses expected
        (successes, failures) => Console.WriteLine("Total received:" + successes.Count + failures.Count)
        )
    );

Example 13 shows a common case for dealing with multiple pending asynchronous operations, using a single delegate to gather the results. Assuming for any N operations, K can fail, M can succeed, and K+M = N - the CCR MultipleItemReceiver gives a concise way to gather all the results, arriving in any order and in any combination across the types. A single delegate will be called, with two collections, containing the K failures and M successes. The MutipleItemReceive extension method can be used for two discrete types but the underlying MultipleItemGather CCR arbiter can work with an arbitrary number of types.

Note that the MultipleItemReceiver in Example 13 must be activated or it will have no effect. The code will compile, but the receiver will not be executed. Using yield return does an implicit activation, so there is no need to use Arbiter.Activate. However, yield return can only be used inside an Iterator.

Coordination for service-oriented components

Persisted Single Item Receivers

CCR was motivated from the beginning as the runtime capable of efficiently executing components that listen on some queues for messages, and activate handlers to process inbound messages. The simplest case is to use the receiver arbiter, in persisted mode, to listen on a port and activate a handler whenever an item is posted.

Example 14

/// <summary>
/// Base type for all service messages. Defines a response PortSet used
/// by all message types.
/// </summary>
public class ServiceOperation
{
    public PortSet<string, Exception> ResponsePort = new PortSet<string, Exception>();
}

public class Stop : ServiceOperation
{
}

public class UpdateState : ServiceOperation
{
    public string State;
}

public class GetState : ServiceOperation
{
}

/// <summary>
/// PortSet that defines which messages the services listens to
/// </summary>
public class ServicePort : PortSet<Stop, UpdateState, GetState>
{
}
/// <summary>
/// Simple example of a CCR component that uses a PortSet to abstract
/// its API for message passing
/// </summary>
public class SimpleService
{
    ServicePort _mainPort;
    DispatcherQueue _taskQueue;
    string _state;

    public static ServicePort Create(DispatcherQueue taskQueue)
    {
        var service = new SimpleService(taskQueue);
        service.Initialize();
        return service._mainPort;
    }

    private void Initialize()
    {
        // using the supplied taskQueue for scheduling, activate three
        // persisted receivers, that will run concurrently to each other,
        // one for each item type
        Arbiter.Activate(_taskQueue,
            Arbiter.Receive<UpdateState>(true, _mainPort, UpdateHandler),
            Arbiter.Receive<GetState>(true, _mainPort, GetStateHandler)
        );
    }

    private SimpleService(DispatcherQueue taskQueue)
    {
        // create PortSet instance used by external callers to post items
        _mainPort = new ServicePort();

        // cache dispatcher queue used to schedule tasks
        _taskQueue = taskQueue;
    }
    void GetStateHandler(GetState get)
    {
        if (_state == null)
        {
            // To demonstrate a failure response,
            // when state is null will post an exception
            get.ResponsePort.Post(new InvalidOperationException());
            return;
        }

        // return the state as a message on the response port
        get.ResponsePort.Post(_state);
    }
    void UpdateHandler(UpdateState update)
    {
        // update state from field in the message
        _state = update.State;

        // as success result, post the state itself
        update.ResponsePort.Post(_state);
    }
}

Example 14 shows a class implementing the common CCR pattern for a software component:

  • Definitions of message types used to interact with the component.
  • Definition of a PortSet derived class that accepts the message types defined. Its not necessary to derive from PortSet, but its a convenient way to reuse a PortSet with a particular number of types.
  • A static Create method that initializes an instance of the component and returns a PortSet instance used to communicate with the component instance.
  • A private Initialize method that attaches some arbiters on the public PortSet external code will use to talk to the service.

If no concurrency constraints exist between the different handlers, simple, persisted single item receivers can be used.

Interleave Arbiter

For non trivial components that listen on ports, often a private resource is used that should be carefully protected from concurrent access. A data structure stored internally requiring multiple updates that must be treated atomically is one case. Another scenario is a component implementing a complex multi-step process, that cannot be preempted when certain external requests arrive. The CCR helps you think only about implementing the complex process, and takes care of queueing requests and handler activations, until the process is complete. You use the interleave arbiter to declare what protection segments of code require.

For programmers familiar with the reader/writer lock primitive in thread programming, the interleave arbiter is a similar concept. It’s a writer biased reader/writer. But, instead of locking a specific object, sections of code are protected from each other. Avoiding contention on a lock, interleave uses internal queues to create scheduling dependencies. Plus, it manages execution so tasks that can run concurrently, do, and tasks that run exclusively, first wait for all other tasks to complete.

Example 15

/// <summary>
/// Simple example of a CCR component that uses a PortSet to abstract
/// its API for message passing
/// </summary>
public class ServiceWithInterleave
{
    ServicePort _mainPort;
    DispatcherQueue _taskQueue;
    string _state;

    public static ServicePort Create(DispatcherQueue taskQueue)
    {
        var service = new ServiceWithInterleave(taskQueue);
        service.Initialize();
        return service._mainPort;
    }

    private void Initialize()
    {
        // activate an Interleave Arbiter to coordinate how the handlers of the service
        // execute in relation to each other and to their own parallel activations 
        Arbiter.Activate(_taskQueue,
            Arbiter.Interleave(
            new TeardownReceiverGroup(
            // one time, atomic teardown
                Arbiter.Receive<Stop>(false, _mainPort, StopHandler)
            ),
            new ExclusiveReceiverGroup(
            // Persisted Update handler, only runs if no other handler running
                Arbiter.Receive<UpdateState>(true, _mainPort, UpdateHandler)
            ),
            new ConcurrentReceiverGroup(
            // Persisted Get handler, runs in parallel with all other activations of itself
            // but never runs in parallel with Update or Stop
                Arbiter.Receive<GetState>(true, _mainPort, GetStateHandler)
            ))
        );
    }

    private ServiceWithInterleave(DispatcherQueue taskQueue)
    {
        // create PortSet instance used by external callers to post items
        _mainPort = new ServicePort();

        // cache dispatcher queue used to schedule tasks
        _taskQueue = taskQueue;
    }

    void GetStateHandler(GetState get)
    {
        if (_state == null)
        {
            // when state is null will post an exception
            get.ResponsePort.Post(new InvalidOperationException());
            return;
        }

        // return the state as a message on the response port
        get.ResponsePort.Post(_state);
    }

    void UpdateHandler(UpdateState update)
    {
        // update state from field in the message
        // Because the update requires a read, a merge of two strings
        // and an update, this code needs to run un-interrupted by other updates.
        // The Interleave Arbiter makes this guarantee since the UpdateHandler is in the
        // ExclusiveReceiverGroup
        _state = update.State + _state;

        // as success result, post the state itself
        update.ResponsePort.Post(_state);
    }

    void StopHandler(Stop stop)
    {
        Console.WriteLine("Service stopping. No other handlers are running or will run after this");
    }
}

Example 15 extends the SimpleService class to use an interleave arbiter to coordinate the receivers that execute the various handlers. Interleave is another example of a parent arbiter that can have various other receivers nested. The example shows how you can concisely state intent in terms of concurrency. Certain handlers can run independently, others can not. The CCR does not need to know what resource or multi step process needs exclusive access. It only needs to know what code handler to protect. The handlers are very simple in this example. However, in a later section, iterator handlers demonstrate how interleave can protect complex code that runs in multiple steps.

If you are writing a DSS service, then your service is a subclass of the DsspServiceBase class which automatically creates a MainInterleave for you. Furthermore, the service handlers are automatically added to this interleave (when you call base.Start) if you mark them with the ServiceHandler attribute. For more information on this see the DSS documentation.

 

 

© 2012 Microsoft Corporation. All Rights Reserved.