Introduction à TPL DataFlow en VB

Créer un réseau de communication Asynchrone (Pattern Pipeline)

Téléchargez le tutorial et la source

Article par Eric Vernié Microsoft France Division Plate-forme & Ecosystème

Je rencontre les développeurs pour leur faire partager ma passion pour le développement autour des technologies .NET sans oublier le monde natif autour des Apis Win32.

J'ai également en charge la préparation de certains contenus pour des séminaires tel que les TechDays. Retrouvez moi sur les centres de développement Visual C++, Le Framework .NET, ainsi que le site sécurité.

Sommaire de l'article

  • Introduction
  • Tour d'horizon de TPL DataFlow
  • Pré requis
  • Utilisation de TPL DataFlow
  • Conclusion

Introduction

La librairie **Task Parallel Library (TPL)**introduit dans le Framework .NET 4, fournit les fonctionnalités et les algorithmes de bases pour l'asynchronisme et le développement parallèle. Cette librairie disponible dans l'espace de nom System.Threading.Task, permet de s'affranchir de la complexité apportée lors du développement à base de threads. Elle apporte des constructions de haut niveaux pour mettre en œuvre un sous ensemble de modèles commun au développement parallèle, tel que Parallel.for/ForEach, pour  paralléliser des boucles, la notion de tâches, et de planificateur de tâches dit voleur de tâches pour la performance, la gestion des exceptions, des arrêts en "douceur" d'une tâche, et d'autres modèles prédéfinis qui simplifient la modélisation d'une application parallèle.

Note : Peut importe la manière d'implémenter son application et les librairies utilisées (même si elles peuvent vous aider dans ce sens), il faut avant toute chose penser parallèle, cela doit devenir une seconde nature. En effet, en tant que développeur, lorsque nous avons à développer une application, nous nous penchons principalement en premier lieu sur le problème à résoudre dans notre domaine d’activité. Il en résulte que nos structures de données, nos algorithmes et autres constructions, ont été principalement pensées en mode séquentiel, ce qui rendra la programmation parallèle d’autant plus difficile par la suite.
Penser parallèle c'est avant tout décomposer son application.
Décomposition par tâche**
Dans certain cas le problème à résoudre peut se décomposer en plusieurs tâches relativement indépendantes, si tel n’est pas le cas le développeur devra s’orienter alors sur la décomposition de données. Bien évidement comme nous le verrons dans la suite de cet article, un mixte des deux est aussi envisageable.**
Que l’on commence par une décomposition de taches ou de données, l’algorithme parallèle exécutera des taches concurrentielles, il est donc important de les identifier à cette étape.**
Un exemple de décomposition par taches, consiste par exemple, dans un traitement texte, en une impression en arrière plan et le comptage des pages lors du chargement.

La clé de la décomposition par tâches est de s’assurer qu’elles soient suffisamment indépendantes les unes des autres, et de s’assurer également qu’elles puissent être réparties correctement sur un ensemble de cœurs. C'est un des points ou l'utilisation de la TPL peut vous aider efficacement grâce à son planificateur de tâches.

La première passe est d’identifier le plus de tâches possibles, même si on doit en fusionner plus tard. Chacune des tâches devant faire assez de travail pour compenser la surcharge de leur création et de leur gestion.
Les Tâches peuvent être trouvées à différents endroits du code
Une tâche peut correspondre à une fonctionnalité de l’application => Décomposition Fonctionnelle
Dans les itérations des boucles.
Les tâches jouent également un rôle lors de la décomposition de grosse structure de données, ou elles travaillent chacune indépendamment des autres sur une portion des données.
Décomposition des données :**
Pour écrire un algorithme parallèle, les deux décompositions ont besoin d’être adressée. La question n’est donc pas de savoir laquelle il faut utiliser, mais plutôt par laquelle il faut commencer.
On choisira alors de débuter par la décomposition de données si les points suivants sont vrais :
Le problème tourne autour d’une structure de données importante
Des opérations similaires peuvent être fait sur différentes parties de la structure de données, à partir du moment où les différentes parties, sont relativement indépendante

Si on souhaite développer une application parallèle efficace, il est donc important d’introduire dés le début de la phase de conception, une réflexion sur le parallélisme, afin d’identifier en amont, les parties qui pourront être concurrentielles et si cela est justifié. L’application doit pouvoir s’adapter automatiquement, par exemple au nombre de processeurs de la machine (qui sont en constante évolution).
La décomposition, permet également d'identifier des tâches qui auraient des dépendances entre elles (Partage de la mémoire, principal source de bogues en programmation parallèle), afin de les regrouper, de les ordonner efficacement, de mieux partager les données, donc d’identifier les verrous (source de problèmes de performances).
Enfin, il est important d'évaluer et de mesurer l'application, afin d'ajuster de façon fine la manière dont l'application va se comporter.

Comme je le disais plus haut, il peut également arriver, que nous ayons besoins de faire un mixte des deux, et mettre en place une structure algorithmique organisée par flux de données. Une des structures, plus connue sous le nom de modèle en pipeline, ce base sur le concept d'agents ou d'acteurs, ou chaque agent implémente une étape particulière du pipeline et informe les autres agents par envoi de messages.

L'exemple le plus courant que nous prenons et la ligne d'assemblage d'une voiture, ou chaque ouvrier à une tâche bien particulière, mais travaillant simultanément, sur une voiture différente.
La décomposition par tâches, c'est par exemple :

  • Un ouvrier soude le pare-chocs
  • Un ouvrier soude les ailes
  • Un ouvrier soude le toit
  • Un ouvrier pause le pare-brise
  • Etc..

Après une phase d'évaluation, peut être que le plus efficace c'est qu'un seul ouvrier soude à la fois les ailes et le pare-chocs.
La décomposition par données, ce sont les éléments de structure qui composent la voiture.
Sur la photo, une des lignes d'assemblage de la carrosserie, se sépare en deux, afin de monter par exemple des options supplémentaires, puis elle se joint de nouveau pour la finition.
Ce n'est sans doute pas exactement comme cela que ça se passe, mais vous avez le principe.

01**
Diagram by Bruno Boucard - Techday 2011**

La difficulté en programmation, vient du fait que chaque agents doivent dialoguer et se synchroniser. L'envoi de message, comme mentionné précédemment, est une réponse à cette difficulté afin d'éviter de partager inutilement des données en mémoire (rappelez vous c'est source de bogues). Néanmoins, la TPL, bien que l'on puisse y recourir pour implémenter ce type de structure algorithmique, ne fournit pas les éléments pour le faire aisément. C'est la que rentre en jeux la bibliothèque TPL DataFlow.

Tour d’horizon de TPL DataFlow

TPL Dataflow (TDF), a été conçu au dessus de TPL, c'est un ensemble de primitives qui permet d'adresser le type de scénarios vu plus haut et qui vient combler un vide de la TPL.

Décomposition du code Modèle de programmation Parallèle
Visual Studio 2010
Données Task Parallel Library

Parallel LINQ
Tâches Task Parallel Library
Flux de données TPL DataFlow

TDF, repose sur des principes vieux de dix ans, et a été fortement inspirée par les librairies CCR de Microsoft Robotics Studio, et par l'Asynchronous Agent Library de Visual C++ 2010.
Elle implémente des blocs de flux de données, qui permettent de communiquer par messages, ces messages contiendront une copie des données à partager. Ces blocs, peuvent être mis en liaisons afin de créer des réseaux de communication.

Il existe 3 catégories de blocs de flux de données.

  • Les blocs Tampon, blocs qui permettent de stocker dans un ordre FIFO des messages.
    BufferBlock(Of T) le bloc le plus fondamental, qui permet de poster ou de recevoirune instance de T.
    Dim bb As New BufferBlock(Of String)
    bb.Post("Bonjour")
    bb.Post("Le monde")
    Dim ab As New ActionBlock(Of String)(Sub(message)
                                             Console.WriteLine(message)
                                         End Sub)
    bb.LinkTo(ab)

    Ici on lie le bloc ActionBlock au BufferBlock bb.LinkTo(ab), puis on poste des données, bb.Post("Bonjour") qui seront reçu par le bloc action.

    BroadcastBlock(Of T), qui comme son nom l'indique, permet de diffuser un message à plusieurs autres blocs.
    Dim bcb As New BroadcastBlock(Of Integer)(Nothing)
    Dim ab1 As New ActionBlock(Of Integer)(Sub(message)
                                               Console.WriteLine(message)
                                           End Sub)
    Dim ab2 As New ActionBlock(Of Integer)(Sub(message)
                                               Console.WriteLine(message)
                                           End Sub)
    bcb.LinkTo(ab1)
    bcb.LinkTo(ab1)
    For i As Integer = 0 To 3
        bcb.SendAsync(i)
    Next
    Le bloc BrodcastBlock est lié aux deux blocs actions ab1, et ab2, les deux blocs recevront chacun une copie des données.

  • WriteOnceBlock(Of T), bloc le plus simple, qui ne peut stocker qu'une seule valeur et qui est immuable.

  • Les blocs Action, qui permettent l'exécution d'un délégué, afin d'exécuter une action particulière sur les données entrante
    ActionBlock(Of TInput), pour exécuter une action particulière.
    Dim ab As New ActionBlock(Of Integer)(Sub(message)
                                              Console.WriteLine(message)
                                          End Sub)

    For i As Integer = 0 To 9
        ab.SendAsync(i)
        If i = 5 Then ab.Complete()
    Next
    ab.Completion.Wait()

    Il est possible d'attendre qu'un bloc est fini de traiter les données qu'on lui envoi ab.Completion.Wait(). Il est également possible d'indiquer que le bloc ne doit plus traiter les messages entrant ab.Complete().

    TransformBlock(Of TInput, Of TOuput) et TransformManyBlock(Of TInput, Of TOuput) , qui permettent la transformation d'une donnée en entrée TInput, en donnée de sortie TOutput.
    Dim bb As New BufferBlock(Of Integer)()
    Dim ab As New ActionBlock(Of String)(Sub(message)
                                             Console.WriteLine(message)
                                         End Sub)

    Dim tb As New TransformBlock(Of Integer, String)(Function(message)
                                                    Return "#" + message.ToString()
                                                     End Function)
    tb.LinkTo(ab)
    bb.LinkTo(tb)
    For i As Integer = 0 To 9
        bb.Post(i)
    Next
    Le bloc TransformBlock, transforme ici un integer en String. Vous noterez que nous avons déjà commencé à créer un pipeline, car le bloc de transformation est lié au bloc BufferBlockbb.LinkTo(tb) et que le bloc action est lié au bloc de transformation tb.LinkTo(ab).

  • Les blocs de Jointure, qui permettent de combiner et de grouper plusieurs messages provenant de blocs différents
    BatchBlock(Of T), combine N éléments simple et envoi N éléments simultanément.
    Dim options As New ExecutionDataflowBlockOptions()
    options.TaskScheduler = TaskScheduler.Current
    options.MaxDegreeOfParallelism = Environment.ProcessorCount
    Dim bb As New BatchBlock(Of String)(2)
    Dim ab As New ActionBlock(Of String())(Sub(message)
                                            For Each s As String In message
                                              Console.WriteLine(s + " ID : " +
    Task.CurrentId.Value.ToString())
                                            Next
                                           End Sub, options)

    bb.LinkTo(ab)
    For i As Integer = 0 To 9
        bb.Post(String.Format("Message #{0}", i))
    Next
    Ici nous indiquons au bloc BatchBlock de combiner 2 éléments BatchBlock(Of String)(2)
    Vous remarquerez également, que les blocs, peuvent prendre comme paramètre un type ExecutionDataflowBlockOptions qui définit les options du flux de données par exemple.
    Le planificateur à utiliser (TaskScheduler), le degré de parallélisme souhaité, le nombre de messages par Tâches, etc.
    Dim options As New ExecutionDataflowBlockOptions()
    options.TaskScheduler = TaskScheduler.Current
    options.MaxDegreeOfParallelism = Environment.ProcessorCount
    options.MaxMessagesPerTask = 10
    options.CancellationToken = token

    JoinBlock, est capable de grouper les données provenant de multiple messages.
    Dim bb1 As New BufferBlock(Of String)
    Dim bb2 As New BufferBlock(Of Integer)
    Dim jb As New JoinBlock(Of String, Integer)
    bb1.LinkTo(jb.Target1)
    bb2.LinkTo(jb.Target2)
    Dim ab = New ActionBlock(Of Tuple(Of String, Integer))(Sub(message)
                                                        Console.WriteLine(message)                                                       End Sub)
    jb.LinkTo(ab)
    For i As Integer = 0 To 9
       bb1.Post(String.Format("bb1 {0}", i))
       bb2.Post(i)
    Next

    BatchedJoinBlock, c'est la combinaison des deux précédents blocs

Pour de plus amples informations sur TDF, reportez-vous à l’excellent article de StephenToubIntroduction to TPL DataFlow
Maintenant que nous avons rapidement parcourus TDF, nous allons l'utiliser, dans un exemple concret.
L'exemple classique que je vous propose ici, consiste à créer un réseau de communication entre différents blocs, afin de :

  1. Parcourir le disque en fonction d’un chemin spécifique
  2. Ouvrir le fichier en flux de données
  3. Crypter les flux de données
  4. Copier les fichiers dans un répertoire destination

Pré requis

Pour réaliser les exemples de cet article il vous faut :

Utilisation de TPL DataFlow

Une fois installé, vous retrouverez les binaires et la documentation de Visual Studio Async CTP Refresh sur C:\Users\[VOUS]\Documents\Microsoft Visual Studio Async CTP. Ce que nous allons faire maintenant, c'est créer la structure d'un projet, afin de référencer correctement les librairies.

  1. Dans une nouvelle application console, ajoutez comme référence les librairies AsyncCTPLibrary.dll, et System.Threading.Tasks.Dataflow.dll
    02

  2. Puis la librairie System.ServiceModel.dll.

  3. Ajoutez les imports suivants :
    Imports System.Threading
    Imports System.Threading.Tasks
    Imports System.Threading.Tasks.Dataflow
    Imports System.Security.Cryptography
    Imports System.IO

  4. Nous allons maintenant, utiliser nos deux premiers blocs.
    Un BufferBlock, que nous alimenterons avec la liste des fichiers parcourus.
    Un ActionBlock, qui sera lié au BufferBlock et qui aura pour simple rôle d'afficher le nom et le chemin des fichiers.
    Dim browserBlock As New BufferBlock(Of String)()
    Dim ab As New ActionBlock(Of String)(Sub(messageEntrant)
                                         'Le block reçoit le message et l'affiche
                                          Console.WriteLine(messageEntrant)
                                        End Sub)
    'Liaison du BufferBlock à l'ActionBlock
    browserBlock.LinkTo(ab)
    'Démarre l'envoi des données dans une nouvelle tâche
    TaskEx.Run(Sub()
               'Parcours des fichiers
               Dim enumFiles = Directory.EnumerateFiles("g:\temp", "*.*",
    SearchOption.AllDirectories)
        For Each filePath As String In enumFiles
                      'Je poste le message
                      browserBlock.Post(filePath)
                   Next
              End Sub)
    L'ActionBlock étant lié au BufferBlock (méthode LinkTo()), il recevra tous les messages postés sur le BufferBlock. Pour éviter que l'ActionBlock n'ai à traiter les messages d'un seul coup, nous démarrons le parcours de fichiers dans une tâche indépendante TaskEx.Run() (Méthode apporté par la CTP async). Pour que le chaînage fonctionne, il faut que les blocs liées entre eux manipule le ou les mêmes types de paramètres. En l'occurrence ici un type String.

  5. Maintenant nous allons corser un peu la difficulté. Ce que nous souhaitons, c'est pouvoir ouvrir le fichier dans un flux de données, transformer le message type chaîne de caractères en flux de données, tout en affichant en même temps les chemins de fichiers. Pour ce faire, nous allons utiliser le bloc TransformBlock pour transformer un type String en Stream, et le block BroadcastBlock pour diffuser le message à plus d'un block. Nous garderons notre bloc ActionBlockpour afficher les chemins des fichiers.
    Dim browserBroadCastBlock As New BroadcastBlock(Of String)(Nothing)
    Dim ab As New ActionBlock(Of String)(Sub(messageEntrant)
                                         'Le block reçoit le message et l'affiche
                                          Console.WriteLine(messageEntrant)
                                          End Sub)

    Dim transformBlock As New TransformBlock(Of String, Stream)
    (Function(messageEntrant)
                                              Return New FileStream(messageEntrant,                                                                 FileMode.Open)
                                             End Function)
    'Liaison des deux blocks avec le BroadcastBlock
    browserBroadCastBlock.LinkTo(ab)
    browserBroadCastBlock.LinkTo(transformBlock)
    'Démarre l'envoi des données dans une nouvelle tâche
    TaskEx.Run(Sub()
              'Parcours des fichiers
             Dim enumFiles = Directory.EnumerateFiles("g:\temp", "*.*",
    SearchOption.AllDirectories)
              For Each filePath As String In enumFiles
                 'Je poste le message
                 browserBroadCastBlock.Post(filePath)
              Next
      End Sub)

    Le BroadcastBlock est lié aux blocs Action et Transformation. Le bloc de transformation reçoit le message contenant le chemin d'accès au fichier, ouvre le fichier et retourne le flux de données. L'instruction returnici agit comme l'instruction Post() et poste un message à qui veut bien le traiter. Pour l'instant comme aucun autre bloc n'est lié au bloc de transformation, le flux est perdu, il retourne le flux dans le vide et surtout ce flux n'est pas fermé!!!

  6. A cette étape, nous créons notre réseau de communication, avec deux blocs Action supplémentaires, l'un pour crypter et l'autre pour copier les fichiers. Vous noterez que nous employons désormais des types Tuple(Of) afin de faire voyager des messages plus complexes. Le principe de chaînage des blocs reste le même qu'a l'étape 5, à l'exception du faite, que notre bloc transformation, transforme un type String en un type Tuple(Of Stream,String), et ceci afin de faire transiter un message plus complexe, qui contient plus d'information. En l'occurrence ici, le flux de données du fichier, ainsi que son nom que nous utiliserons pour la copie de fichiers.
    'Définir les blocs
    Dim browserBroadCastBlock As New BroadcastBlock(Of String)(Nothing)
    Dim abAffichageMessage As New ActionBlock(Of String)
    (Sub(messageEntrant)
                                                  Console.WriteLine(messageEntrant)
                                              End Sub)
    Dim transformBlock As New TransformBlock(Of String, Tuple(Of Stream, String))
    (Function(messageEntrant)
    Dim fs As New FileStream(messageEntrant, FileMode.Open)
    fs.Position = 0
    Dim messageSortant = New Tuple(Of Stream, String)(fs, messageEntrant)
        Return messageSortant
    End Function)
    Dim bufferFinblock As New BufferBlock(Of Tuple(Of Stream, String))
    'Bloc cryptage des données
    Dim abCryptage As New ActionBlock(Of Tuple(Of Stream, String))
    (Sub(messageEntrant)
                      Dim cryptedStream As Stream = Encrypt(messageEntrant.Item1)
                      cryptedStream.Position = 0
                      messageEntrant.Item1.Close()
                     Dim messageSortant As New Tuple(Of Stream, String)
    (cryptedStream, messageEntrant.Item2)
                     bufferFinblock.Post(messageSortant)
                    End Sub)
    'bloc copie de fichiers
    Dim abCopieFichier As New ActionBlock(Of Tuple(Of Stream, String))
    (Sub(messageEntrant)
      Dim filePath As String = FormatAndCreateFilePath(messageEntrant.Item2, "g:\crypter")
      Dim fs As New FileStream(filePath, FileMode.Create)
      Console.WriteLine(filePath)
      messageEntrant.Item1.CopyToAsync(fs)
      fs.Flush()
    End Sub)
    'création du réseau de communication
    browserBroadCastBlock.LinkTo(abAffichageMessage)
    browserBroadCastBlock.LinkTo(transformBlock)
    transformBlock.LinkTo(abCryptage)
    bufferFinblock.LinkTo(abCopieFichier)

    'démarrer le pipeline
    TaskEx.Run(
    Sub()
    Dim enumFiles As IEnumerable(Of String) = Directory.EnumerateFiles("G:\temp", "*.*",
     SearchOption.AllDirectories)
      Dim filePath As String
      For Each filePath In enumFiles
            browserBroadCastBlock.Post(filePath)
      Next
    End Sub)
    Les méthodes Encrypt et FormatAndCreatePath, sont fournies avec le code complet en téléchargement avec cet article.

Conclusion

Vous venez de voir très succinctement la manière de créer un réseau de communication, entre différent bloc afin d'éviter le partage des données en mémoire. Néanmoins, cette façon de faire reste encore très primitive, car il manque beaucoup de choses. Par exemple, la possibilité d'arrêter à tout moment la communication entre chaque bloc, la gestion des erreurs, la gestion d'une sentinelle, qui signale la fin du traitement des fichiers. Comme il n'était pas possible de tout traiter dans cet article, vous retrouverez en téléchargement une ébauche de projet qui gère ces aspects.