非同期ファイル I/O

同期 I/O では、メソッドは I/O 操作が完了するまでブロックされ、I/O 操作の完了後にデータを返します。非同期 I/O については、BeginRead を呼び出すことができます。メイン スレッドで他の処理を継続でき、後からデータを処理できます。また、同時に複数の I/O 要求を保留できます。

このデータが利用可能になった時点を把握するには、発行した I/O 要求に対応する IAsyncResult を渡して EndRead または EndWrite を呼び出します。また、読み取ったバイト数や書き込んだバイト数を算出するには、EndRead または EndWrite を呼び出すコールバック メソッドを用意します。非同期 I/O は、同時に多数の I/O 要求を保留できるためパフォーマンスを向上させますが、一般に、アプリケーションを大幅に再構築しないと正しく機能しません。

Stream クラスでは、同じストリーム内での同期および非同期の読み書き操作の混合をサポートします。これは、オペレーティング システムがこれを許可していない場合でも同様です。Stream は、同期の読み書き操作の実装に対しては既定の非同期操作の実装を提供し、非同期の読み書き操作の実装に対しては既定の同期操作の実装を提供します。

Stream の派生クラスを実装するときは、同期または非同期のどちらかの Read メソッドおよび Write メソッドを実装する必要があります。Read と Write のオーバーライドは許可されており、非同期メソッド (BeginRead、EndRead、BeginWrite、および EndWrite) の既定の実装を同期メソッドの実装と併用できますが、この方法ではあまり効率的なパフォーマンスは得られません。同様に、同期の Read メソッドと Write メソッドは非同期のメソッドを実装した場合でも正しく機能しますが、同期メソッドに特定して実装した場合の方が一般にパフォーマンスは優れています。ReadByte および WriteByte の既定の実装は、単一の要素で構成されるバイト配列に対する同期の Read メソッドと Write メソッドを呼び出します。Stream からクラスを派生させるとき、内部バイト バッファがある場合は、パフォーマンス向上のため、これらのメソッドを内部バッファにアクセスするようにオーバーライドすることをお勧めします。

バッキング ストアに関連付けられているストリームは、同期または非同期のいずれかの Read メソッドおよび Write メソッドをオーバーライドし、既定によりもう一方の機能を実装します。ストリームが非同期操作または同期操作をサポートしていない場合は、該当するメソッドが例外をスローするようにする必要があります。

仮想のバルク イメージ プロセッサで非同期操作を実装する例を次に示します。その後で、同期操作の実装例を示します。このコードは、ディレクトリのすべてのファイルに対して CPU 集中型の操作を実行するデザインになっています。詳細については、「非同期プログラミングのデザイン パターン」を参照してください。

Imports System
Imports System.IO
Imports System.Threading
Imports System.Runtime.InteropServices
Imports System.Runtime.Remoting.Messaging
Imports System.Security.Permissions



' Sample bulk "image" processor that reads every input file with an
' asynchronous FileStream read, transforms the bytes on the callback
' thread, and writes the result with a synchronous FileStream.
Module BulkImageProcAsync
    Dim ImageBaseName As String = "tmpImage-"
    Dim numImages As Integer = 200
    Dim numPixels As Integer = 512 * 512

    ' ProcessImage has a simple O(N) loop, and you can vary the number
    ' of times you repeat that loop to make the application more CPU-
    ' bound or more IO-bound.
    Dim processImageRepeats As Integer = 20

    ' Number of images whose callback has not finished yet. Every access
    ' is made while holding NumImagesMutex.
    Dim NumImagesToFinish As Integer = numImages
    Dim NumImagesMutex(-1) As [Object]
    ' WaitObject is pulsed when all image processing is done.
    Dim WaitObject(-1) As [Object]

    ' Per-request state handed to BeginRead and recovered in the callback.
    Structure ImageStateObject
        Public pixels() As Byte
        Public imageNum As Integer
        Public fs As FileStream
    End Structure


    ' Creates numImages input files named <ImageBaseName><index>.tmp, each
    ' holding exactly numPixels bytes with the value 255.
    <SecurityPermissionAttribute(SecurityAction.Demand, Flags:=SecurityPermissionFlag.UnmanagedCode)> _
    Sub MakeImageFiles()
        Dim sides As Integer = CInt(Fix(Math.Sqrt(numPixels)))
        Console.Write("Making {0} {1}x{1} images... ", numImages, sides)
        ' A VB array declaration takes the upper bound, not the length, so
        ' numPixels elements require an upper bound of numPixels - 1.
        Dim pixels(numPixels - 1) As Byte
        For p As Integer = 0 To numPixels - 1
            pixels(p) = 255
        Next p
        For i As Integer = 0 To numImages - 1
            Using fs As New FileStream(ImageBaseName & i.ToString() & ".tmp", FileMode.Create, FileAccess.Write, FileShare.None, 8192, False)
                fs.Write(pixels, 0, pixels.Length)
                ' Hand any data still held by the FileStream to the OS, then
                ' ask the OS to commit the file to disk.
                fs.Flush()
                FlushFileBuffers(fs.SafeFileHandle.DangerousGetHandle())
            End Using
        Next i
        Console.WriteLine("Done.")

    End Sub


    ' Completion callback for the BeginRead issued in ProcessImagesInBulk.
    ' Finishes the read, processes the bytes, writes <name>.done and records
    ' that one more image is finished.
    ' Throws Exception when the read returned a byte count other than numPixels.
    Sub ReadInImageCallback(ByVal asyncResult As IAsyncResult)
        Dim state As ImageStateObject = CType(asyncResult.AsyncState, ImageStateObject)
        Dim bytesRead As Integer
        Try
            bytesRead = state.fs.EndRead(asyncResult)
        Finally
            ' The data is already in state.pixels (or the read failed), so
            ' the input handle is released on every path.
            state.fs.Close()
        End Try
        If bytesRead <> numPixels Then
            Throw New Exception(String.Format("In ReadInImageCallback, got the wrong number of " + "bytes from the image: {0}.", bytesRead))
        End If
        ProcessImage(state.pixels, state.imageNum)

        ' Now write out the image.  
        ' Using asynchronous I/O here appears not to be best practice.
        ' It ends up swamping the threadpool, because the threadpool
        ' threads are blocked on I/O requests that were just queued to
        ' the threadpool. 
        Using output As New FileStream(ImageBaseName & state.imageNum.ToString() & ".done", FileMode.Create, FileAccess.Write, FileShare.None, 4096, False)
            output.Write(state.pixels, 0, numPixels)
        End Using

        ' This application model uses too much memory.
        ' Releasing memory as soon as possible is a good idea.
        ' state is an unboxed copy of the structure, so this only drops the
        ' reference held by this callback.
        state.pixels = Nothing

        ' Record that an image is finished now.
        Dim lastImage As Boolean
        SyncLock NumImagesMutex
            NumImagesToFinish -= 1
            lastImage = (NumImagesToFinish = 0)
        End SyncLock
        ' The pulse is sent after NumImagesMutex is released. The waiter takes
        ' WaitObject first and NumImagesMutex second, so holding both here in
        ' the opposite order could deadlock.
        If lastImage Then
            SyncLock WaitObject
                Monitor.PulseAll(WaitObject)
            End SyncLock
        End If

    End Sub


    ' Simulates CPU-bound work: sets the first numPixels bytes of pixels to 1,
    ' processImageRepeats times over.
    Sub ProcessImage(ByVal pixels() As Byte, ByVal imageNum As Integer)
        Console.WriteLine("ProcessImage {0}", imageNum)
        ' Perform some CPU-intensive operation on the image.
        For pass As Integer = 0 To processImageRepeats - 1
            For y As Integer = 0 To numPixels - 1
                pixels(y) = 1
            Next y
        Next pass
        Console.WriteLine("ProcessImage {0} done.", imageNum)

    End Sub


    ' Issues one asynchronous read per image, blocks until every callback has
    ' reported completion, and prints the elapsed time.
    Sub ProcessImagesInBulk()
        Console.WriteLine("Processing images...  ")
        Dim t0 As Long = Environment.TickCount
        NumImagesToFinish = numImages
        Dim readImageCallback As New AsyncCallback(AddressOf ReadInImageCallback)
        ' Exactly numImages reads are issued so that the number of callbacks
        ' matches the NumImagesToFinish counter.
        For i As Integer = 0 To numImages - 1
            Dim state As New ImageStateObject()
            state.pixels = New Byte(numPixels - 1) {}
            state.imageNum = i
            ' Very large items are read only once, so you can make the 
            ' buffer on the FileStream very small to save memory.
            Dim fs As New FileStream(ImageBaseName & i.ToString() & ".tmp", FileMode.Open, FileAccess.Read, FileShare.Read, 1, True)
            state.fs = fs
            fs.BeginRead(state.pixels, 0, numPixels, readImageCallback, state)
        Next i

        ' Determine whether all images are done being processed.  
        ' If not, block until all are finished.
        ' The counter is re-checked while WaitObject is held, so the final
        ' pulse cannot be sent between the check and the call to Wait.
        SyncLock WaitObject
            Dim announced As Boolean = False
            Do
                Dim remaining As Integer
                SyncLock NumImagesMutex
                    remaining = NumImagesToFinish
                End SyncLock
                If remaining <= 0 Then
                    Exit Do
                End If
                If Not announced Then
                    Console.WriteLine("All worker threads are queued. " + " Blocking until they complete. numLeft: {0}", remaining)
                    announced = True
                End If
                Monitor.Wait(WaitObject)
            Loop
        End SyncLock
        Dim t1 As Long = Environment.TickCount
        Console.WriteLine("Total time processing images: {0}ms", t1 - t0)

    End Sub


    ' Deletes the .tmp and .done file of every image index.
    Sub Cleanup()
        For i As Integer = 0 To numImages - 1
            File.Delete(ImageBaseName & i.ToString() & ".tmp")
            File.Delete(ImageBaseName & i.ToString() & ".done")
        Next i

    End Sub


    Sub TryToClearDiskCache()
        ' Try to force all pending writes to disk, and clear the
        ' disk cache of any data.
        Dim bytes(100 * (1 << 20) - 1) As Byte
        For i As Integer = 0 To bytes.Length - 1
            bytes(i) = 0
        Next i
        bytes = Nothing
        GC.Collect()
        Thread.Sleep(2000)

    End Sub


    ' Entry point. An optional single argument overrides processImageRepeats.
    Sub Main(ByVal args() As String)
        Console.WriteLine("Bulk image processing sample application," + " using asynchronous IO")
        Console.WriteLine("Simulates applying a simple " + "transformation to {0} ""images""", numImages)
        Console.WriteLine("(Async FileStream & Threadpool benchmark)")
        Console.WriteLine("Warning - this test requires {0} " + "bytes of temporary space", numPixels * numImages * 2)

        If args.Length = 1 Then
            processImageRepeats = Int32.Parse(args(0))
            Console.WriteLine("ProcessImage inner loop - {0}.", processImageRepeats)
        End If
        MakeImageFiles()
        TryToClearDiskCache()
        ProcessImagesInBulk()
        Cleanup()

    End Sub

    ' P/Invoke declaration for the KERNEL32 FlushFileBuffers entry point.
    <DllImport("KERNEL32", SetLastError:=True)> _
    Sub FlushFileBuffers(ByVal handle As IntPtr)
    End Sub
End Module
using System;
using System.IO;
using System.Threading;
using System.Runtime.InteropServices;
using System.Runtime.Remoting.Messaging;
using System.Security.Permissions;

/// <summary>
/// Sample bulk "image" processor that reads every input file with an
/// asynchronous FileStream read, transforms the bytes on the callback
/// thread, and writes the result with a synchronous FileStream.
/// </summary>
public class BulkImageProcAsync
{
    public const String ImageBaseName = "tmpImage-";
    public const int numImages = 200;
    public const int numPixels = 512 * 512;

    // ProcessImage has a simple O(N) loop, and you can vary the number
    // of times you repeat that loop to make the application more CPU-
    // bound or more IO-bound.
    public static int processImageRepeats = 20;

    // Number of images whose callback has not finished yet. Every access
    // is made while holding NumImagesMutex.
    public static int NumImagesToFinish = numImages;
    public static Object[] NumImagesMutex = new Object[0];
    // WaitObject is pulsed when all image processing is done.
    public static Object[] WaitObject = new Object[0];

    /// <summary>
    /// Per-request state handed to BeginRead and recovered in the callback.
    /// </summary>
    public class ImageStateObject
    {
        public byte[] pixels;
        public int imageNum;
        public FileStream fs;
    }

    /// <summary>
    /// Creates numImages input files named ImageBaseName + index + ".tmp",
    /// each holding numPixels bytes.
    /// </summary>
    [SecurityPermissionAttribute(SecurityAction.Demand, Flags=SecurityPermissionFlag.UnmanagedCode)]
    public static void MakeImageFiles()
    {
        int sides = (int)Math.Sqrt(numPixels);
        Console.Write("Making {0} {1}x{1} images... ", numImages,
            sides);
        byte[] pixels = new byte[numPixels];
        for (int p = 0; p < numPixels; p++)
        {
            // Only the low eight bits of the index are kept.
            pixels[p] = unchecked((byte)p);
        }
        for (int i = 0; i < numImages; i++)
        {
            using (FileStream fs = new FileStream(ImageBaseName + i + ".tmp",
                FileMode.Create, FileAccess.Write, FileShare.None,
                8192, false))
            {
                fs.Write(pixels, 0, pixels.Length);
                // Hand any data still held by the FileStream to the OS, then
                // ask the OS to commit the file to disk.
                fs.Flush();
                FlushFileBuffers(fs.SafeFileHandle.DangerousGetHandle());
            }
        }
        Console.WriteLine("Done.");
    }

    /// <summary>
    /// Completion callback for the BeginRead issued in ProcessImagesInBulk.
    /// Finishes the read, processes the bytes, writes the ".done" file and
    /// records that one more image is finished.
    /// </summary>
    /// <param name="asyncResult">Result whose AsyncState is the ImageStateObject of the request.</param>
    /// <exception cref="Exception">The read returned a byte count other than numPixels.</exception>
    public static void ReadInImageCallback(IAsyncResult asyncResult)
    {
        ImageStateObject state = (ImageStateObject)asyncResult.AsyncState;
        int bytesRead;
        try
        {
            bytesRead = state.fs.EndRead(asyncResult);
        }
        finally
        {
            // The data is already in state.pixels (or the read failed), so
            // the input handle is released on every path.
            state.fs.Close();
        }
        if (bytesRead != numPixels)
        {
            throw new Exception(String.Format
                ("In ReadInImageCallback, got the wrong number of " +
                "bytes from the image: {0}.", bytesRead));
        }
        ProcessImage(state.pixels, state.imageNum);

        // Now write out the image.  
        // Using asynchronous I/O here appears not to be best practice.
        // It ends up swamping the threadpool, because the threadpool
        // threads are blocked on I/O requests that were just queued to
        // the threadpool. 
        using (FileStream output = new FileStream(ImageBaseName + state.imageNum +
            ".done", FileMode.Create, FileAccess.Write, FileShare.None,
            4096, false))
        {
            output.Write(state.pixels, 0, numPixels);
        }

        // This application model uses too much memory.
        // Releasing memory as soon as possible is a good idea, 
        // especially global state.
        state.pixels = null;

        // Record that an image is finished now.
        bool lastImage;
        lock (NumImagesMutex)
        {
            NumImagesToFinish--;
            lastImage = (NumImagesToFinish == 0);
        }
        // The pulse is sent after NumImagesMutex is released. The waiter takes
        // WaitObject first and NumImagesMutex second, so holding both here in
        // the opposite order could deadlock.
        if (lastImage)
        {
            lock (WaitObject)
            {
                Monitor.PulseAll(WaitObject);
            }
        }
    }

    /// <summary>
    /// Simulates CPU-bound work: increments each of the first numPixels bytes
    /// of <paramref name="pixels"/> once per pass, for processImageRepeats passes.
    /// </summary>
    /// <param name="pixels">Buffer to transform in place.</param>
    /// <param name="imageNum">Image index, used only in the console output.</param>
    public static void ProcessImage(byte[] pixels, int imageNum)
    {
        Console.WriteLine("ProcessImage {0}", imageNum);
        // Perform some CPU-intensive operation on the image.
        for (int pass = 0; pass < processImageRepeats; pass += 1)
        {
            for (int y = 0; y < numPixels; y += 1)
            {
                pixels[y] += 1;
            }
        }
        Console.WriteLine("ProcessImage {0} done.", imageNum);
    }

    /// <summary>
    /// Issues one asynchronous read per image, blocks until every callback has
    /// reported completion, and prints the elapsed time.
    /// </summary>
    public static void ProcessImagesInBulk()
    {
        Console.WriteLine("Processing images...  ");
        long t0 = Environment.TickCount;
        NumImagesToFinish = numImages;
        AsyncCallback readImageCallback = new
            AsyncCallback(ReadInImageCallback);
        for (int i = 0; i < numImages; i++)
        {
            ImageStateObject state = new ImageStateObject();
            state.pixels = new byte[numPixels];
            state.imageNum = i;
            // Very large items are read only once, so you can make the 
            // buffer on the FileStream very small to save memory.
            FileStream fs = new FileStream(ImageBaseName + i + ".tmp",
                FileMode.Open, FileAccess.Read, FileShare.Read, 1, true);
            state.fs = fs;
            fs.BeginRead(state.pixels, 0, numPixels, readImageCallback,
                state);
        }

        // Determine whether all images are done being processed.  
        // If not, block until all are finished.
        // The counter is re-checked while WaitObject is held, so the final
        // pulse cannot be sent between the check and the call to Wait.
        lock (WaitObject)
        {
            bool announced = false;
            while (true)
            {
                int remaining;
                lock (NumImagesMutex)
                {
                    remaining = NumImagesToFinish;
                }
                if (remaining <= 0)
                {
                    break;
                }
                if (!announced)
                {
                    Console.WriteLine("All worker threads are queued. " +
                        " Blocking until they complete. numLeft: {0}",
                        remaining);
                    announced = true;
                }
                Monitor.Wait(WaitObject);
            }
        }
        long t1 = Environment.TickCount;
        Console.WriteLine("Total time processing images: {0}ms",
            (t1 - t0));
    }

    /// <summary>
    /// Deletes the ".tmp" and ".done" file of every image index.
    /// </summary>
    public static void Cleanup()
    {
        for (int i = 0; i < numImages; i++)
        {
            File.Delete(ImageBaseName + i + ".tmp");
            File.Delete(ImageBaseName + i + ".done");
        }
    }

    public static void TryToClearDiskCache()
    {
        // Try to force all pending writes to disk, and clear the
        // disk cache of any data.
        byte[] bytes = new byte[100 * (1 << 20)];
        for (int i = 0; i < bytes.Length; i++)
        {
            bytes[i] = 0;
        }
        bytes = null;
        GC.Collect();
        Thread.Sleep(2000);
    }

    /// <summary>
    /// Entry point. An optional single argument overrides processImageRepeats.
    /// </summary>
    public static void Main(String[] args)
    {
        Console.WriteLine("Bulk image processing sample application," +
            " using asynchronous IO");
        Console.WriteLine("Simulates applying a simple " +
            "transformation to {0} \"images\"", numImages);
        Console.WriteLine("(Async FileStream & Threadpool benchmark)");
        Console.WriteLine("Warning - this test requires {0} " +
            "bytes of temporary space", (numPixels * numImages * 2));

        if (args.Length == 1)
        {
            processImageRepeats = Int32.Parse(args[0]);
            Console.WriteLine("ProcessImage inner loop - {0}.",
                processImageRepeats);
        }
        MakeImageFiles();
        TryToClearDiskCache();
        ProcessImagesInBulk();
        Cleanup();
    }

    // P/Invoke declaration for the KERNEL32 FlushFileBuffers entry point.
    [DllImport("KERNEL32", SetLastError = true)]
    private static extern void FlushFileBuffers(IntPtr handle);
}

同様に、同期操作を実装する例を次に示します。

Imports System
Imports System.IO
Imports System.Threading
Imports System.Runtime.InteropServices
Imports System.Runtime.Remoting.Messaging
Imports System.Security.Permissions



' Sample bulk "image" processor that reads, transforms and writes every
' file one after another with synchronous FileStream calls.
Module BulkImageProcSync
    Dim ImageBaseName As String = "tmpImage-"
    Dim numImages As Integer = 200
    Dim numPixels As Integer = 512 * 512

    ' ProcessImage has a simple O(N) loop, and you can vary the number
    ' of times you repeat that loop to make the application more CPU-
    ' bound or more IO-bound.
    Dim processImageRepeats As Integer = 20

    ' Creates numImages input files named <ImageBaseName><index>.tmp, each
    ' holding exactly numPixels bytes with the value 255.
    <SecurityPermissionAttribute(SecurityAction.Demand, Flags:=SecurityPermissionFlag.UnmanagedCode)> _
    Sub MakeImageFiles()
        Dim sides As Integer = CInt(Fix(Math.Sqrt(numPixels)))
        Console.Write("Making {0} {1}x{1} images... ", numImages, sides)
        ' A VB array declaration takes the upper bound, not the length, so
        ' numPixels elements require an upper bound of numPixels - 1.
        Dim pixels(numPixels - 1) As Byte
        For p As Integer = 0 To numPixels - 1
            pixels(p) = 255
        Next p
        For i As Integer = 0 To numImages - 1
            Using fs As New FileStream(ImageBaseName & i.ToString() & ".tmp", FileMode.Create, FileAccess.Write, FileShare.None, 8192, False)
                fs.Write(pixels, 0, pixels.Length)
                ' Hand any data still held by the FileStream to the OS, then
                ' ask the OS to commit the file to disk.
                fs.Flush()
                FlushFileBuffers(fs.SafeFileHandle.DangerousGetHandle())
            End Using
        Next i
        Console.WriteLine("Done.")

    End Sub


    ' Simulates CPU-bound work: sets the first numPixels bytes of pixels to 1,
    ' processImageRepeats times over.
    Sub ProcessImage(ByVal pixels() As Byte, ByVal imageNum As Integer)
        Console.WriteLine("ProcessImage {0}", imageNum)
        ' Perform some CPU-intensive operation on the image.
        For pass As Integer = 0 To processImageRepeats - 1
            For y As Integer = 0 To numPixels - 1
                pixels(y) = 1
            Next y
        Next pass
        Console.WriteLine("ProcessImage {0} done.", imageNum)

    End Sub


    ' Fills buffer(0 .. count - 1) from source. Stream.Read may return fewer
    ' bytes than requested, so the read is repeated until count bytes arrive.
    ' Throws EndOfStreamException when the stream ends first.
    Private Sub ReadFully(ByVal source As Stream, ByVal buffer() As Byte, ByVal count As Integer)
        Dim total As Integer = 0
        While total < count
            Dim got As Integer = source.Read(buffer, total, count - total)
            If got = 0 Then
                Throw New EndOfStreamException(String.Format("Expected {0} bytes but the stream ended after {1}.", count, total))
            End If
            total += got
        End While
    End Sub


    ' Reads, processes and writes each image in turn, reusing one buffer,
    ' then prints the elapsed time.
    Sub ProcessImagesInBulk()
        Console.WriteLine("Processing images... ")
        Dim t0 As Long = Environment.TickCount
        Dim pixels(numPixels - 1) As Byte
        For i As Integer = 0 To numImages - 1
            Using input As New FileStream(ImageBaseName & i.ToString() & ".tmp", FileMode.Open, FileAccess.Read, FileShare.Read, 4196, False)
                ReadFully(input, pixels, numPixels)
            End Using
            ProcessImage(pixels, i)
            Using output As New FileStream(ImageBaseName & i.ToString() & ".done", FileMode.Create, FileAccess.Write, FileShare.None, 4196, False)
                output.Write(pixels, 0, numPixels)
            End Using
        Next i
        Dim t1 As Long = Environment.TickCount
        Console.WriteLine("Total time processing images: {0}ms", t1 - t0)

    End Sub


    ' Deletes the .tmp and .done file of every image index.
    Sub Cleanup()
        For i As Integer = 0 To numImages - 1
            File.Delete(ImageBaseName & i.ToString() & ".tmp")
            File.Delete(ImageBaseName & i.ToString() & ".done")
        Next i

    End Sub


    ' Allocates and touches a 100 MB array, forces a collection and sleeps
    ' for two seconds before the timed run.
    Sub TryToClearDiskCache()
        Dim bytes(100 * (1 << 20) - 1) As Byte
        For i As Integer = 0 To bytes.Length - 1
            bytes(i) = 0
        Next i
        bytes = Nothing
        GC.Collect()
        Thread.Sleep(2000)

    End Sub


    ' Entry point. An optional single argument overrides processImageRepeats.
    Sub Main(ByVal args() As String)
        Console.WriteLine("Bulk image processing sample application," + " using synchronous I/O.")
        Console.WriteLine("Simulates applying a simple " + "transformation to {0} ""images.""", numImages)
        Console.WriteLine("(ie, Sync FileStream benchmark).")
        Console.WriteLine("Warning - this test requires {0} " + "bytes of temporary space", numPixels * numImages * 2)

        If args.Length = 1 Then
            processImageRepeats = Int32.Parse(args(0))
            Console.WriteLine("ProcessImage inner loop  {0}", processImageRepeats)
        End If

        MakeImageFiles()
        TryToClearDiskCache()
        ProcessImagesInBulk()
        Cleanup()

    End Sub


    ' P/Invoke declaration for the KERNEL32 FlushFileBuffers entry point.
    <DllImport("KERNEL32", SetLastError:=True)> _
    Sub FlushFileBuffers(ByVal handle As IntPtr)
    End Sub
End Module
using System;
using System.IO;
using System.Threading;
using System.Runtime.InteropServices;
using System.Runtime.Remoting.Messaging;
using System.Security.Permissions;

/// <summary>
/// Sample bulk "image" processor that reads, transforms and writes every
/// file one after another with synchronous FileStream calls.
/// </summary>
public class BulkImageProcSync
{
    public const String ImageBaseName = "tmpImage-";
    public const int numImages = 200;
    public const int numPixels = 512 * 512;

    // ProcessImage has a simple O(N) loop, and you can vary the number
    // of times you repeat that loop to make the application more CPU-
    // bound or more IO-bound.
    public static int processImageRepeats = 20;

    /// <summary>
    /// Creates numImages input files named ImageBaseName + index + ".tmp",
    /// each holding numPixels bytes.
    /// </summary>
    [SecurityPermissionAttribute(SecurityAction.Demand, Flags=SecurityPermissionFlag.UnmanagedCode)]
    public static void MakeImageFiles()
    {
        int sides = (int)Math.Sqrt(numPixels);
        Console.Write("Making {0} {1}x{1} images... ", numImages,
            sides);
        byte[] pixels = new byte[numPixels];
        for (int p = 0; p < numPixels; p++)
        {
            // Only the low eight bits of the index are kept.
            pixels[p] = unchecked((byte)p);
        }
        for (int i = 0; i < numImages; i++)
        {
            using (FileStream fs = new FileStream(ImageBaseName + i + ".tmp",
                FileMode.Create, FileAccess.Write, FileShare.None,
                8192, false))
            {
                fs.Write(pixels, 0, pixels.Length);
                // Hand any data still held by the FileStream to the OS, then
                // ask the OS to commit the file to disk.
                fs.Flush();
                FlushFileBuffers(fs.SafeFileHandle.DangerousGetHandle());
            }
        }
        Console.WriteLine("Done.");
    }

    /// <summary>
    /// Simulates CPU-bound work: increments each of the first numPixels bytes
    /// of <paramref name="pixels"/> once per pass, for processImageRepeats passes.
    /// </summary>
    /// <param name="pixels">Buffer to transform in place.</param>
    /// <param name="imageNum">Image index, used only in the console output.</param>
    public static void ProcessImage(byte[] pixels, int imageNum)
    {
        Console.WriteLine("ProcessImage {0}", imageNum);
        // Perform some CPU-intensive operation on the image.
        for (int pass = 0; pass < processImageRepeats; pass += 1)
        {
            for (int y = 0; y < numPixels; y += 1)
            {
                pixels[y] += 1;
            }
        }
        Console.WriteLine("ProcessImage {0} done.", imageNum);
    }

    /// <summary>
    /// Fills buffer[0 .. count - 1] from <paramref name="source"/>. Stream.Read
    /// may return fewer bytes than requested, so the read is repeated until
    /// count bytes have arrived.
    /// </summary>
    /// <exception cref="EndOfStreamException">The stream ended before count bytes were read.</exception>
    private static void ReadFully(Stream source, byte[] buffer, int count)
    {
        int total = 0;
        while (total < count)
        {
            int got = source.Read(buffer, total, count - total);
            if (got == 0)
            {
                throw new EndOfStreamException(String.Format(
                    "Expected {0} bytes but the stream ended after {1}.",
                    count, total));
            }
            total += got;
        }
    }

    /// <summary>
    /// Reads, processes and writes each image in turn, reusing one buffer,
    /// then prints the elapsed time.
    /// </summary>
    public static void ProcessImagesInBulk()
    {
        Console.WriteLine("Processing images... ");
        long t0 = Environment.TickCount;
        byte[] pixels = new byte[numPixels];
        for (int i = 0; i < numImages; i++)
        {
            using (FileStream input = new FileStream(ImageBaseName + i + ".tmp",
                FileMode.Open, FileAccess.Read, FileShare.Read,
                4196, false))
            {
                ReadFully(input, pixels, numPixels);
            }
            ProcessImage(pixels, i);
            using (FileStream output = new FileStream(ImageBaseName + i + ".done",
                FileMode.Create, FileAccess.Write, FileShare.None,
                4196, false))
            {
                output.Write(pixels, 0, numPixels);
            }
        }
        long t1 = Environment.TickCount;
        Console.WriteLine("Total time processing images: {0}ms",
            (t1 - t0));
    }

    /// <summary>
    /// Deletes the ".tmp" and ".done" file of every image index.
    /// </summary>
    public static void Cleanup()
    {
        for (int i = 0; i < numImages; i++)
        {
            File.Delete(ImageBaseName + i + ".tmp");
            File.Delete(ImageBaseName + i + ".done");
        }
    }

    /// <summary>
    /// Allocates and touches a 100 MB array, forces a collection and sleeps
    /// for two seconds before the timed run.
    /// </summary>
    public static void TryToClearDiskCache()
    {
        byte[] bytes = new byte[100 * (1 << 20)];
        for (int i = 0; i < bytes.Length; i++)
        {
            bytes[i] = 0;
        }
        bytes = null;
        GC.Collect();
        Thread.Sleep(2000);
    }

    /// <summary>
    /// Entry point. An optional single argument overrides processImageRepeats.
    /// </summary>
    public static void Main(String[] args)
    {
        Console.WriteLine("Bulk image processing sample application," +
            " using synchronous I/O.");
        Console.WriteLine("Simulates applying a simple " +
            "transformation to {0} \"images.\"", numImages);
        Console.WriteLine("(ie, Sync FileStream benchmark).");
        Console.WriteLine("Warning - this test requires {0} " +
            "bytes of temporary space", (numPixels * numImages * 2));

        if (args.Length == 1)
        {
            processImageRepeats = Int32.Parse(args[0]);
            Console.WriteLine("ProcessImage inner loop  {0}",
                processImageRepeats);
        }

        MakeImageFiles();
        TryToClearDiskCache();
        ProcessImagesInBulk();
        Cleanup();
    }

    // P/Invoke declaration for the KERNEL32 FlushFileBuffers entry point.
    [DllImport("KERNEL32", SetLastError = true)]
    private static extern void FlushFileBuffers(IntPtr handle);
}

参照

関連項目

Stream
Stream.Read
Stream.Write
Stream.BeginRead
Stream.BeginWrite
Stream.EndRead
Stream.EndWrite
IAsyncResult
Mutex

その他の技術情報

ファイルおよびストリーム入出力