I/O Asynchronous Completion

Using asynchronous I/O completion events, a thread from the thread pool processes data only when the data is received, and once the data has been processed, the thread returns to the thread pool.

To make an asynchronous I/O call, an operating-system I/O handle must be associated with the thread pool and a callback method must be specified. When the I/O operation completes, a thread from the thread pool invokes the callback method.

The following C# code example demonstrates a simple asynchronous I/O operation.

Note: This example requires more than 100 MB of available memory.

using System;
using System.IO;
using System.Threading;
using System.Runtime.InteropServices;

/// <summary>
/// Sample that writes a set of temporary "image" files, reads each one back
/// with an asynchronous FileStream read, applies a simple per-byte
/// transformation in the read-completion callback, and writes the result to
/// a ".done" file.
/// </summary>
public class BulkImageProcAsync{
   public const String ImageBaseName = "tmpImage-";
   public const int numImages = 200;
   public const int numPixels = 512*512;

   // ProcessImage has a simple O(N) loop, and you can vary the number
   // of times you repeat that loop to make the application more CPU-bound
   // or more I/O-bound.
   public static int processImageRepeats = 20;

   // Callbacks decrement NumImagesToFinish, and every access to it after
   // the reads have been started is protected by locking NumImagesMutex.
   public static int NumImagesToFinish = numImages;
   public static Object NumImagesMutex = new Object();

   // WaitObject is pulsed when all image processing is done.
   // Lock ordering: WaitObject may be held while taking NumImagesMutex,
   // never the other way around.
   public static Object WaitObject = new Object();

   /// <summary>
   /// Per-image state handed to BeginRead and recovered in the callback.
   /// </summary>
   public class ImageStateObject{
      public byte[] pixels;   // Destination buffer for the read; processed in place.
      public int imageNum;    // Index used to build the file names.
      public FileStream fs;   // The stream the asynchronous read was issued on.
   }

   /// <summary>
   /// Creates numImages files named ImageBaseName + index + ".tmp", each
   /// holding numPixels bytes with the pattern (byte) i.
   /// </summary>
   public static void MakeImageFiles(){
      int sides = (int) Math.Sqrt(numPixels);
      Console.Write("Making "+numImages+" "+sides+"x"+sides+" images...  ");
      byte[] pixels = new byte[numPixels];
      for(int i=0; i<numPixels; i++)
         pixels[i] = (byte) i;

      for(int i=0; i<numImages; i++){
         // The using statement closes the stream even if a write fails.
         using (FileStream fs = new FileStream(
            ImageBaseName+i+".tmp",
            FileMode.Create,
            FileAccess.Write,
            FileShare.None,
            8192,
            false
         )){
            fs.Write(pixels, 0, pixels.Length);
            // Ask the operating system to flush the file's buffers.
            FlushFileBuffers(fs.Handle);
         }
      }
      Console.WriteLine("Done.");
   }

   /// <summary>
   /// Completion callback for the reads started by ProcessImagesInBulk.
   /// Finishes the read, processes the pixels, writes them to the ".done"
   /// file, and records that one more image is finished.
   /// </summary>
   /// <param name="asyncResult">
   /// Result of the BeginRead call; its AsyncState is the ImageStateObject.
   /// </param>
   /// <exception cref="Exception">
   /// The read returned a byte count other than numPixels.
   /// </exception>
   public static void ReadInImageCallback(IAsyncResult asyncResult){

      ImageStateObject state = (ImageStateObject) asyncResult.AsyncState;

      Stream stream = state.fs;
      try{
         int bytesRead = stream.EndRead(asyncResult);
         if (bytesRead != numPixels)
            throw new Exception("In ReadInImageCallback, got wrong number of bytes from the image!  got: " + bytesRead);

         ProcessImage(state.pixels, state.imageNum);
      }
      finally{
         // Release the input file handle even when the read or the
         // processing fails.
         stream.Close();
      }

      // Now write out the image. No async here.
      using (FileStream fs = new FileStream(
         ImageBaseName + state.imageNum + ".done",
         FileMode.Create,
         FileAccess.Write,
         FileShare.None,
         4096,
         false
      )){
         fs.Write(state.pixels, 0, numPixels);
      }

      // This application model uses too much memory.
      // Releasing memory as soon as possible is a good idea,
      // especially global state.
      state.pixels = null;

      // Record that an image is done now.
      bool allDone;
      lock(NumImagesMutex){
         NumImagesToFinish--;
         allDone = (NumImagesToFinish == 0);
      }

      // Pulse outside NumImagesMutex so that this thread never holds both
      // locks; the waiter takes WaitObject first and NumImagesMutex second.
      if (allDone){
         lock(WaitObject){
            Monitor.Pulse(WaitObject);
         }
      }
   }

   /// <summary>
   /// Adds 1 to every byte of the image, processImageRepeats times.
   /// </summary>
   /// <param name="pixels">Buffer of at least numPixels bytes, modified in place.</param>
   /// <param name="imageNum">Image index, used only in the console messages.</param>
   public static void ProcessImage(byte[] pixels, int imageNum){

      Console.WriteLine("ProcessImage "+imageNum);

      // Do some CPU-intensive operation on the image.
      // The addition is meant to wrap at 255, so make that explicit; it
      // would otherwise throw when compiled with overflow checking on.
      for(int i=0; i<processImageRepeats; i++)
         for(int j=0; j<numPixels; j++)
            pixels[j] = unchecked((byte)(pixels[j] + 1));

      Console.WriteLine("ProcessImage "+imageNum+" done.");
   }

   /// <summary>
   /// Starts an asynchronous read for every ".tmp" image, then blocks until
   /// the callbacks have finished all of them and prints the elapsed time.
   /// </summary>
   public static void ProcessImagesInBulk(){

      Console.WriteLine("Processing images...  ");
      int t0 = Environment.TickCount;
      NumImagesToFinish = numImages;

      AsyncCallback readImageCallback = new AsyncCallback(ReadInImageCallback);
      for(int i=0; i<numImages; i++){

         ImageStateObject state = new ImageStateObject();
         state.pixels = new byte[numPixels];
         state.imageNum = i;

         // Very large items are read only once, so the
         // buffer on the file stream can be very small to save memory.

         FileStream fs = new FileStream(
            ImageBaseName+i+".tmp",
            FileMode.Open,
            FileAccess.Read,
            FileShare.Read,
            1,
            true
         );
         state.fs = fs;
         fs.BeginRead(state.pixels, 0, numPixels, readImageCallback, state);
      }

      // Determine whether all images are done being processed.
      // If not, block until all are finished.
      // The counter is checked while WaitObject is held: the final callback
      // needs WaitObject to pulse, so it cannot pulse between this check and
      // the Wait call (Wait releases WaitObject while blocked).
      lock(WaitObject){
         int numLeft;
         lock(NumImagesMutex){
            numLeft = NumImagesToFinish;
         }
         if (numLeft > 0){
            Console.WriteLine(
               "All worker threads are queued... Blocking until they complete.  numLeft: " + numLeft
            );
            // Re-check after every wake-up rather than trusting the pulse.
            do{
               Monitor.Wait(WaitObject);
               lock(NumImagesMutex){
                  numLeft = NumImagesToFinish;
               }
            } while (numLeft > 0);
         }
      }

      // Unchecked int subtraction stays correct if TickCount wraps around.
      int elapsed = unchecked(Environment.TickCount - t0);
      Console.WriteLine("Total time processing images: {0} ms", elapsed);
   }

   /// <summary>
   /// Deletes the ".tmp" and ".done" file of every image.
   /// </summary>
   public static void Cleanup(){
      for(int i=0; i<numImages; i++){
         File.Delete(ImageBaseName+i+".tmp");
         File.Delete(ImageBaseName+i+".done");
      }
   }

   /// <summary>
   /// Allocates and touches a 100 MB array, drops it, forces a garbage
   /// collection and sleeps for two seconds.
   /// </summary>
   public static void TryToClearDiskCache(){

      // Try to force all pending writes to disk, and clear the
      // disk cache of any data.
      byte[] bytes = new byte[100*(1<<20)];
      for(int i=0; i<bytes.Length; i++)
         bytes[i] = 0;
      bytes = null;
      GC.Collect();
      Thread.Sleep(2000);
   }

   /// <summary>
   /// Entry point: creates the image files, processes them, and cleans up.
   /// </summary>
   /// <param name="args">
   /// When exactly one argument is given it is parsed as the new value of
   /// processImageRepeats.
   /// </param>
   public static void Main(String[] args){

      Console.WriteLine("Bulk image processing sample application, using asynchronous I/O");
      Console.WriteLine("Simulates applying a simple transformation to "+numImages+" \"images\"");
      Console.WriteLine("(ie, Async FileStream & Threadpool benchmark)");
      Console.WriteLine("Warning - this test requires "+(numPixels * numImages * 2)+" bytes of tmp space");

      if (args.Length==1){
         processImageRepeats = Int32.Parse(args[0]);
         Console.WriteLine("ProcessImage inner loop - "+processImageRepeats);
      }

      MakeImageFiles();
      TryToClearDiskCache();
      ProcessImagesInBulk();
      Cleanup();
   }

   // Win32 call used by MakeImageFiles to flush a file's buffers.
   [DllImport("KERNEL32", SetLastError=true)]
   private static extern void FlushFileBuffers(IntPtr handle);

}

See Also

Threading Objects and Features | ThreadPool