CCR Interop with Non-CCR Code

Thread apartment constraints

Several legacy Windows components, especially COM objects, require a specific thread apartment policy on the threads that interact with them. Even more recent frameworks, such as .NET WinForms, require a single-threaded apartment (STA) policy on the thread that hosts the form.

The Concurrency and Coordination Runtime (CCR) can host and interoperate with single-threaded apartment (STA) components. A component creates an instance of the CCR Dispatcher class with one thread and passes the appropriate thread apartment policy to its constructor. DispatcherQueue instances can then use this dispatcher when activating handlers that need to interoperate with the legacy object. These handlers can safely access the COM or WinForm object, and their STA affinity stays hidden from other CCR components, which simply post to regular CCR ports without caring which dispatcher the attached handlers use.

The CCR WinForm adapter library (Ccr.Adapters.Winforms.dll) is one convenient way to host .NET Windows Forms within the CCR.
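
The one-thread arrangement described above can be illustrated without CCR. The following is a minimal sketch in plain .NET, not the CCR Dispatcher itself: SingleThreadQueue is a hypothetical helper invented for this illustration. It shows the underlying idea of a single dedicated thread, with an STA policy requested before it starts, that runs every handler posted to it, so callers on other threads never touch the apartment-bound object directly.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper (not a CCR type): one dedicated thread that runs queued
// handlers, analogous in spirit to a one-thread dispatcher with an STA policy.
public sealed class SingleThreadQueue : IDisposable
{
    private readonly BlockingCollection<Action> _work = new BlockingCollection<Action>();
    private readonly Thread _thread;

    public SingleThreadQueue(string name)
    {
        _thread = new Thread(Run) { Name = name, IsBackground = true };
        // Request STA before the thread starts. The call returns false instead
        // of throwing on platforms without COM apartment support.
        _thread.TrySetApartmentState(ApartmentState.STA);
        _thread.Start();
    }

    // Managed ID of the dedicated thread, exposed for diagnostics.
    public int ThreadId
    {
        get { return _thread.ManagedThreadId; }
    }

    // Callers on any thread enqueue a handler; it always runs on the dedicated thread.
    public void Post(Action handler)
    {
        _work.Add(handler);
    }

    private void Run()
    {
        // Blocks while the queue is empty; ends once CompleteAdding has been
        // called and the remaining handlers have run.
        foreach (Action handler in _work.GetConsumingEnumerable())
        {
            handler();
        }
    }

    public void Dispose()
    {
        // Must not be called from a handler, because it waits for the thread to finish.
        _work.CompleteAdding();
        _thread.Join();
        _work.Dispose();
    }
}
```

A caller would create one SingleThreadQueue next to the COM or WinForm object and route every access to that object through Post. In CCR, the Dispatcher and DispatcherQueue play this role, and ports replace the direct Post call.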

Coordination with main application thread

CCR software components often execute in the context of a Common Language Runtime (CLR) application, such as a standalone executable. The .NET runtime starts a program on one OS thread and terminates it when that thread exits. Since CCR applications are asynchronous and concurrent, they are not "active" when no messages are being sent, and they almost never block threads. The CCR dispatcher keeps its threads in an efficient sleep state, but if these threads were created as background threads, the application will exit even if the CCR is still executing items.

One common pattern for interfacing with the synchronous world of CLR startup is to use a System.Threading.AutoResetEvent to block the main application thread until the CCR application is finished. The event can be signaled by any CCR handler.

Example 25

void InteropBlockingExample()
{
    // create OS event used for signalling
    AutoResetEvent signal = new AutoResetEvent(false);

    // schedule a CCR task that will execute in parallel with the rest of
    // this method
    Arbiter.Activate(
        _taskQueue,
        new Task<AutoResetEvent>(signal, SomeTask)
    );
    // block main application thread from exiting
    signal.WaitOne();
}

void ThrottlingExample()
{
    int maximumDepth = 10;
    Dispatcher dispatcher = new Dispatcher(0, "throttling example");
    DispatcherQueue depthThrottledQueue = new DispatcherQueue("ConstrainQueueDepthDiscard",
          dispatcher,
          TaskExecutionPolicy.ConstrainQueueDepthDiscardTasks,
          maximumDepth);

    Port<int> intPort = new Port<int>();
    Arbiter.Activate(depthThrottledQueue,
       Arbiter.Receive(true, intPort,
       delegate(int i)
       {
           // only some items will be received since throttling will discard most of them
           Console.WriteLine(i);
       })
    );

    // post items as fast as possible so that the depth policy is activated and discards
    // all the oldest items in the dispatcher queue
    for (int i = 0; i < maximumDepth * 100000; i++)
    {
        intPort.Post(i);
    }
}



/// <summary>
/// Handler that executes in parallel with main application thread
/// </summary>
/// <param name="signal"></param>
void SomeTask(AutoResetEvent signal)
{
    try
    {
        for (int i = 0; i < 1000000; i++)
        {
            int k = i * i / 10;
        }
    }
    finally
    {
        // done, signal main application thread
        signal.Set();
    }
}

Example 25 demonstrates a trivial case of blocking the main application thread using an OS event.
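
The same blocking pattern can be tried without CCR. The sketch below is a plain .NET variation, offered only as an illustration: the class and method names are hypothetical, the work runs on a thread-pool (background) thread instead of a CCR dispatcher thread, and the wait is bounded by a timeout so that the main thread cannot hang forever if the work never signals.

```csharp
using System;
using System.Threading;

// Hypothetical illustration (not part of CCR): block the calling thread until
// background work signals completion, or until a timeout elapses.
public static class MainThreadBlockingSketch
{
    // Returns true if the work signalled before the timeout, false otherwise.
    public static bool RunAndWait(Action work, TimeSpan timeout)
    {
        // Intentionally not disposed here: after a timeout the work item may
        // still call Set(), which would fail on a disposed event.
        var signal = new AutoResetEvent(false);

        // Thread-pool threads are background threads, so without the wait
        // below the process could exit while the work is still running.
        ThreadPool.QueueUserWorkItem(delegate
        {
            try
            {
                work();
            }
            finally
            {
                // done (or failed), release the waiting thread
                signal.Set();
            }
        });

        // block the calling thread, but only up to the timeout
        return signal.WaitOne(timeout);
    }
}
```

For example, MainThreadBlockingSketch.RunAndWait(DoWork, TimeSpan.FromSeconds(30)) called from Main keeps the process alive for at most 30 seconds while a hypothetical DoWork method runs.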

Simplifying the .NET Asynchronous Programming Pattern

CCR can be used with any .NET class that implements the Asynchronous Programming Model (APM) pattern. It greatly simplifies the asynchronous pattern: in C#, the need for hand-written callback methods and continuation passing is largely eliminated. CCR iterator scheduling support allows you to yield directly to pending I/O operations and write readable code, using patterns traditionally available only to synchronous code.

Example 27

IEnumerator<ITask> CcrReadEnumerator(string filename)
{
    var resultPort = new Port<IAsyncResult>();
    // stage 1: open file and start the asynchronous operation
    using (FileStream fs = new FileStream(filename, 
        FileMode.Open, FileAccess.Read, FileShare.Read, 8192, FileOptions.Asynchronous))
    {
        Byte[] data = new Byte[fs.Length];
        fs.BeginRead(data, 0, data.Length, resultPort.Post, null);    
        // stage 2: suspend execution until the operation completes; the one-time
        // receiver passes the posted result to the handler, which stores it
        IAsyncResult ar = null;
        yield return Arbiter.Receive(false, resultPort,
            delegate(IAsyncResult result) { ar = result; });
        // stage 3: complete the operation using the captured result
        try
        {
            Int32 bytesRead = fs.EndRead(ar);
        }
        catch
        {
            // handle I/O exception
        }
        ProcessData(data);
    }
}

Example 27 demonstrates how a CCR iterator can use the file stream APM BeginRead/EndRead methods without writing a callback method. Instead, we supply the Post method of a CCR port, so the asynchronous result is posted directly to that port. The code then yields to a receive operation on the port. The yield return statement is what allows us to write logically sequential code without blocking an OS thread. The code retains the scalability of asynchronous, overlapped operations, but it is as readable as synchronous, sequential code.
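
For contrast, a conventional callback-based version of the same file read, written without CCR, might look like the following sketch. The class name, method name, and the onCompleted and onError parameters are hypothetical and exist only for this illustration. Note how the logic after the read has to move into a continuation, and how results and errors must be passed on through further delegates.

```csharp
using System;
using System.IO;

// Hypothetical illustration (not part of CCR): the classic APM style, where
// the code that follows the read lives inside a callback.
public static class ApmCallbackSketch
{
    public static void BeginReadFile(
        string filename,
        Action<byte[], int> onCompleted,
        Action<Exception> onError)
    {
        // stage 1: open the file for overlapped I/O
        var fs = new FileStream(filename, FileMode.Open, FileAccess.Read,
            FileShare.Read, 8192, FileOptions.Asynchronous);
        try
        {
            var data = new byte[fs.Length];
            fs.BeginRead(data, 0, data.Length, delegate(IAsyncResult ar)
            {
                // stage 2 and 3: the continuation runs when the read completes
                int bytesRead;
                try
                {
                    bytesRead = fs.EndRead(ar);
                }
                catch (IOException ex)
                {
                    onError(ex);
                    return;
                }
                finally
                {
                    // the stream cannot be wrapped in a using block, because
                    // it must outlive the method that started the read
                    fs.Dispose();
                }
                onCompleted(data, bytesRead);
            }, null);
        }
        catch
        {
            // the read never started, so the callback will not dispose the stream
            fs.Dispose();
            throw;
        }
    }
}
```

The iterator in Example 27 keeps the using block, the local variables, and the error handling in one method body, which is the simplification this section describes.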

Example 28

/// <summary>
/// Read from one stream into an HTTP request stream, asynchronously
/// </summary>
public virtual IEnumerator<ITask> UploadHttpStream(string contentUrl,
    Stream contentStream, PortSet<HttpStatusCode, Exception> resultPort)
{
    // Create HTTP request
    HttpWebRequest webRequest = (HttpWebRequest)WebRequest.Create(contentUrl);
    webRequest.Method = "POST";
    HttpStatusCode status = HttpStatusCode.OK;
    Exception ex = null;
    using (Stream requestStream = webRequest.GetRequestStream())
    {
        byte[] readBuffer = new byte[1024];
        int bytesRead = -1;
        // With CCR and iterators you can do loops and asynchronous I/O
        do
        {
            // use CCR stream adapter (or a custom APM adapter) to schedule operation
            var ioResultPort = StreamAdapter.Read(contentStream, readBuffer, 0, 1024);
            // yield to result (success or failure)
            yield return (Choice)ioResultPort;
            // check for error
            ex = ioResultPort;
            if (ex != null)
            {
                resultPort.Post(ex);
                // exit on error
                yield break;
            }
            bytesRead = ioResultPort;
            var writeResultPort = StreamAdapter.Write(requestStream, readBuffer, 0, bytesRead);
            // yield to write operation
            yield return (Choice)writeResultPort;
            // check for write error
            ex = writeResultPort;
            if (ex != null)
            {
                resultPort.Post(ex);
                yield break;
            }
        } while (bytesRead > 0);

        requestStream.Flush();
    }
    // Use built-in CCR adapter for reading HTTP response
    var webResultPort = WebRequestAdapter.ReadResponse(webRequest, _taskQueue);
    // yield to web response operation
    yield return (Choice)webResultPort;
    // check for any exceptions
    GetErrorDetails((Exception)webResultPort, out status);
    resultPort.Post(status);
}

Example 28 shows a more complex APM interaction: a CCR iterator is scheduled to read data asynchronously from a stream and then write the data asynchronously to an HTTP web request stream. Notice that CCR iterators can express while and for loops with asynchronous operations inside.

See Also 

MSDN Magazine: Concurrent Affairs -- Concurrency and Coordination Runtime

 

 

© 2012 Microsoft Corporation. All Rights Reserved.