Lorsque vous vous arrêtez et pensez à la vie quotidienne, vous pouvez facilement tout voir comme un événement. Considérez la séquence suivante :
Nous pourrions continuer encore et encore ici, mais j'ai fait valoir mon point de vue : la vie est une séquence d'événements. Compte tenu de ce fait, comment concevriez-vous un nouveau système logiciel aujourd'hui ? Collecteriez-vous différents résultats et les traiteriez-vous à un intervalle arbitraire ou attendriez-vous jusqu'à la fin de la journée pour les traiter ? Non, vous ne le feriez pas; vous voudriez agir sur chaque événement dès qu'il se produit. Bien sûr, il peut y avoir des cas où vous ne pouvez pas réagir immédiatement à des circonstances individuelles… pensez à obtenir un vidage de la valeur d'une journée de transactions en une seule fois. Mais quand même, vous agiriez dès que vous auriez reçu les données, un événement forfaitaire important si vous voulez.
Alors, comment implémentez-vous un système logiciel pour travailler avec des événements ? La réponse est le traitement de flux.
Devenu la technologie de facto pour traiter les données d'événement, le traitement de flux est une approche du développement logiciel qui considère les événements comme l'entrée ou la sortie principale d'une application. Par exemple, il n'y a aucun sens à attendre pour agir sur des informations ou répondre à un achat potentiellement frauduleux par carte de crédit. D'autres fois, cela peut impliquer la gestion d'un flux entrant d'enregistrements dans un microservice, et leur traitement le plus efficace est le mieux pour votre application.
Quel que soit le cas d'utilisation, il est sûr de dire qu'une approche de streaming d'événements est la meilleure approche pour gérer les événements.
Dans cet article de blog, nous allons créer une application de streaming d'événements à l'aide d'Apache Kafka®, le producteur .NET et les clients consommateurs, et la bibliothèque parallèle de tâches (TPL) de Microsoft. À première vue, vous ne regrouperez peut-être pas automatiquement ces trois éléments en tant que candidats susceptibles de travailler ensemble. Bien sûr, Kafka et les clients .NET forment une excellente paire, mais où TPL s'intègre-t-il dans l'image ?
Le plus souvent, le débit est une exigence clé et afin d'éviter les goulots d'étranglement dus aux inadéquations d'impédance entre la consommation de Kafka et le traitement en aval, nous suggérons généralement la parallélisation en cours chaque fois que l'opportunité se présente.
Lisez la suite pour voir comment les trois composants fonctionnent ensemble pour créer une application de streaming d'événements robuste et efficace. La meilleure partie est que le client Kafka et TPL s'occupent de la plupart des gros travaux ; vous n'avez qu'à vous concentrer sur votre logique métier.
Avant de plonger dans l'application, donnons une brève description de chaque composant.
Si le traitement de flux est la norme de facto pour la gestion des flux d'événements, alors Apache Kafka est la norme de facto pour la création d'applications de streaming d'événements. Apache Kafka est un journal distribué fourni de manière hautement évolutive, élastique, tolérante aux pannes et sécurisée. En un mot, Kafka utilise des courtiers (serveurs) et des clients. Les courtiers forment la couche de stockage distribué du cluster Kafka, qui peut s'étendre sur des centres de données ou des régions cloud. Les clients offrent la possibilité de lire et d'écrire des données d'événement à partir d'un cluster de courtiers. Les clusters Kafka sont tolérants aux pannes : si un courtier tombe en panne, d'autres courtiers prendront le relais pour assurer la continuité des opérations.
J'ai mentionné dans le paragraphe précédent que les clients écrivent ou lisent à partir d'un cluster de courtiers Kafka. Apache Kafka est fourni avec des clients Java, mais plusieurs autres clients sont disponibles, à savoir le producteur et consommateur .NET Kafka, qui est au cœur de l'application dans cet article de blog. Le producteur et le consommateur .NET apportent la puissance du streaming d'événements avec Kafka au développeur .NET. Pour plus d'informations sur les clients .NET, consultez la documentation .
La bibliothèque parallèle de tâches ( TPL ) est "un ensemble de types publics et d'API dans les espaces de noms System.Threading et System.Threading.Tasks", simplifiant le travail d'écriture d'applications simultanées. Le TPL fait de l'ajout de la simultanéité une tâche plus facile à gérer en gérant les détails suivants :
1. Gestion du partitionnement du travail 2. Planification des threads sur le ThreadPool 3. Détails de bas niveau tels que l'annulation, la gestion de l'état, etc.
L'essentiel est que l'utilisation du TPL peut optimiser les performances de traitement de votre application tout en vous permettant de vous concentrer sur la logique métier. Plus précisément, vous utiliserez le sous-ensemble de la bibliothèque de flux de données de la TPL.
La bibliothèque de flux de données est un modèle de programmation basé sur des acteurs qui permet de transmettre des messages en cours de processus et de mettre en pipeline des tâches. Les composants Dataflow s'appuient sur les types et l'infrastructure de planification du TPL et s'intègrent de manière transparente au langage C#. La lecture à partir de Kafka est généralement assez rapide, mais le traitement (un appel DB ou un appel RPC) est généralement un goulot d'étranglement. Toutes les opportunités de parallélisation que nous pouvons utiliser et qui permettraient d'obtenir un débit plus élevé sans sacrifier les garanties de commande méritent d'être prises en considération.
Dans cet article de blog, nous allons exploiter ces composants Dataflow avec les clients .NET Kafka pour créer une application de traitement de flux qui traitera les données dès qu'elles seront disponibles.
Avant d'entrer dans l'application que vous allez créer ; nous devrions donner quelques informations générales sur ce qui compose la bibliothèque de flux de données TPL. L'approche détaillée ici est plus applicable lorsque vous avez des tâches gourmandes en CPU et en E/S qui nécessitent un débit élevé. La bibliothèque de flux de données TPL se compose de blocs qui peuvent mettre en mémoire tampon et traiter des données ou des enregistrements entrants, et les blocs appartiennent à l'une des trois catégories suivantes :
Blocs source - Agissent comme une source de données et d'autres blocs peuvent y lire.
Blocs cibles - Un récepteur de données ou un récepteur, sur lequel d'autres blocs peuvent écrire.
Blocs propagateurs - Se comportent à la fois comme un bloc source et cible.
Vous prenez les différents blocs et les connectez pour former soit un pipeline de traitement linéaire, soit un graphique de traitement plus complexe. Considérez les illustrations suivantes :
La bibliothèque de flux de données fournit plusieurs types de blocs prédéfinis qui se répartissent en trois catégories : mise en mémoire tampon, exécution et regroupement. Nous utilisons les types de mise en mémoire tampon et d'exécution pour le projet développé pour cet article de blog. Le BufferBlock<T> est une structure à usage général qui met les données en mémoire tampon et est idéale pour une utilisation dans les applications producteur/consommateur. Le BufferBlock utilise une file d'attente premier entré, premier sorti pour gérer les données entrantes.
Le BufferBlock (et les classes qui l'étendent) est le seul type de bloc de la bibliothèque de flux de données qui permet d'écrire et de lire directement des messages ; d'autres types s'attendent à recevoir des messages ou à envoyer des messages à des blocs. Pour cette raison, nous avons utilisé un BufferBlock
comme délégué lors de la création du bloc source et de l'implémentation de l'interface ISourceBlock
et du bloc récepteur implémentant l'interface ITargetBlock
.
L'autre type de bloc Dataflow utilisé dans notre application est un TransformBlock <TInput, TOutput> . Comme la plupart des types de blocs dans la bibliothèque de flux de données, vous créez une instance de TransformBlock en fournissant un Func<TInput, TOutput>
pour agir en tant que délégué que le bloc de transformation exécute pour chaque enregistrement d'entrée qu'il reçoit.
Deux fonctionnalités essentielles des blocs Dataflow sont que vous pouvez contrôler le nombre d'enregistrements qu'il mettra en mémoire tampon et le niveau de parallélisme.
En définissant une capacité de mémoire tampon maximale, votre application appliquera automatiquement une contre-pression lorsque l'application rencontre une attente prolongée à un moment donné du pipeline de traitement. Cette contre-pression est nécessaire pour éviter une suraccumulation de données. Ensuite, une fois que le problème disparaît et que la taille du tampon diminue, il consommera à nouveau des données.
La possibilité de définir la simultanéité d'un bloc est essentielle pour les performances. Si un bloc exécute une tâche gourmande en CPU ou en E/S, il y a une tendance naturelle à paralléliser le travail pour augmenter le débit. Mais l'ajout de la simultanéité peut causer un problème de traitement de l'ordre. Si vous ajoutez un thread à la tâche d'un bloc, vous ne pouvez pas garantir l'ordre de sortie des données. Dans certains cas, l'ordre n'a pas d'importance, mais lorsqu'il est important, c'est un compromis important à prendre en compte : un débit plus élevé avec simultanéité par rapport au traitement de la sortie de l'ordre. Heureusement, vous n'avez pas à faire ce compromis avec la bibliothèque de flux de données.
Lorsque vous définissez le parallélisme d'un bloc sur plusieurs, le framework garantit qu'il conservera l'ordre d'origine des enregistrements d'entrée (notez que le maintien de l'ordre avec le parallélisme est configurable, la valeur par défaut étant true). Si l'ordre original des données est A, B, C, alors l'ordre de sortie sera A, B, C. Sceptique ? Je sais que je l'étais, alors je l'ai testé et j'ai découvert qu'il fonctionnait comme annoncé. Nous parlerons de ce test un peu plus loin dans ce post. Notez que l'augmentation du parallélisme ne doit être effectuée qu'avec des opérations sans état ou avec état qui sont associatives et commutatives , ce qui signifie que la modification de l'ordre ou du regroupement des opérations n'affectera pas le résultat.
À ce stade, vous pouvez voir où cela se passe. Vous avez un sujet Kafka représentant les événements que vous devez gérer le plus rapidement possible. Vous allez donc créer une application de streaming composée d'un bloc source avec un .NET KafkaConsumer, des blocs de traitement pour accomplir la logique métier et un bloc récepteur contenant un .NET KafkaProducer pour réécrire les résultats finaux dans un sujet Kafka. Voici une illustration d'une vue de haut niveau de l'application :
L'application aura la structure suivante :
BufferBlock
BufferBlock
Vient ensuite une description du flux global de l'application et quelques points critiques sur l'utilisation de Kafka et de la bibliothèque de flux de données pour créer une puissante application de diffusion d'événements.
Voici notre scénario : vous avez une rubrique Kafka qui reçoit des enregistrements d'achats de votre boutique en ligne, et le format des données entrantes est JSON. Vous souhaitez traiter ces événements d'achat en appliquant l'inférence ML aux détails de l'achat. De plus, vous souhaitez transformer les enregistrements JSON au format Protobuf, car il s'agit du format de données à l'échelle de l'entreprise. Bien sûr, le débit de l'application est essentiel. Les opérations ML sont gourmandes en CPU, vous avez donc besoin d'un moyen de maximiser le débit de l'application, afin de tirer parti de la parallélisation de cette partie de l'application.
Faisons le tour des points critiques de l'application de streaming, en commençant par le bloc source. J'ai déjà mentionné l'implémentation de l'interface ISourceBlock
, et puisque le BufferBlock
implémente également ISourceBlock
, nous l'utiliserons comme délégué pour satisfaire toutes les méthodes d'interface. Ainsi, l'implémentation du bloc source enveloppera un KafkaConsumer et le BufferBlock. À l'intérieur de notre bloc source, nous aurons un thread séparé dont la seule responsabilité est que le consommateur transmette les enregistrements qu'il a consommés dans la mémoire tampon. À partir de là, le tampon transmettra les enregistrements au bloc suivant dans le pipeline.
Avant de transférer l'enregistrement dans la mémoire tampon, ConsumeRecord
(renvoyé par l'appel Consumer.consume
) est encapsulé par une abstraction Record
qui, en plus de la clé et de la valeur, capture la partition et le décalage d'origine, ce qui est essentiel pour l'application - et J'expliquerai pourquoi sous peu. Il convient également de noter que l'ensemble du pipeline fonctionne avec l'abstraction Record
, de sorte que toute transformation se traduit par un nouvel objet Record
enveloppant la clé, la valeur et d'autres champs essentiels comme le décalage d'origine en les préservant tout au long du pipeline.
L'application décompose le traitement en plusieurs blocs différents. Chaque bloc est lié à l'étape suivante de la chaîne de traitement, de sorte que le bloc source est lié au premier bloc, qui gère la désérialisation. Alors que le .NET KafkaConsumer peut gérer la désérialisation des enregistrements, nous demandons au consommateur de transmettre la charge utile sérialisée et de la désérialiser dans un bloc Transform. La désérialisation peut être gourmande en CPU, donc le mettre dans son bloc de traitement nous permet de paralléliser l'opération si nécessaire.
Après la désérialisation, les enregistrements sont transférés dans un autre bloc Transform qui convertit la charge utile JSON en un objet de modèle de données Purchase au format Protobuf. La partie la plus intéressante survient lorsque les données entrent dans le bloc suivant, ce qui représente une tâche gourmande en CPU nécessaire pour mener à bien la transaction d'achat. L'application simule cette partie et la fonction fournie s'endort avec un temps aléatoire compris entre une et trois secondes.
C'est dans ce bloc de traitement simulé que nous exploitons la puissance du framework de blocs Dataflow. Lorsque vous instanciez un bloc Dataflow, vous fournissez une instance déléguée Func qu'il applique à chaque enregistrement qu'il rencontre et une instance ExecutionDataflowBlockOptions
. J'ai déjà mentionné la configuration des blocs Dataflow, mais nous les reverrons rapidement ici. ExecutionDataflowBlockOptions
contient deux propriétés essentielles : la taille maximale de la mémoire tampon pour ce bloc et le degré maximal de parallélisation.
Alors que nous définissons la configuration de la taille de la mémoire tampon pour tous les blocs du pipeline sur 10 000 enregistrements, nous nous en tenons au niveau de parallélisation par défaut de 1, à l'exception de notre utilisation intensive du processeur simulée, où nous l'avons définie sur 4. Notez que la taille de la mémoire tampon du flux de données par défaut est illimité. Nous discuterons des implications sur les performances dans la section suivante, mais pour l'instant, nous allons terminer la présentation de l'application.
Le bloc de traitement intensif est transmis à un bloc de transformation de sérialisation qui alimente le bloc récepteur, qui encapsule ensuite un KafkaProducer .NET et produit les résultats finaux dans un sujet Kafka. Le bloc récepteur utilise également un BufferBlock
délégué et un thread séparé pour la production. Le thread récupère le prochain enregistrement disponible dans la mémoire tampon. Ensuite, il appelle la méthode KafkaProducer.Produce
en transmettant un délégué Action
enveloppant le DeliveryReport
- le thread d'E/S du producteur exécutera le délégué d' Action
une fois la demande de production terminée.
Cela termine la procédure pas à pas de haut niveau de l'application. Maintenant, discutons d'une partie cruciale de notre configuration - comment gérer les décalages de validation - ce qui est vital étant donné que nous canalisons les enregistrements du consommateur.
Lors du traitement des données avec Kafka, vous validerez périodiquement les décalages (un décalage est la position logique d'un enregistrement dans une rubrique Kafka) des enregistrements que votre application a traités avec succès jusqu'à un point donné. Alors, pourquoi engage-t-on les compensations ? C'est une question à laquelle il est facile de répondre : lorsque votre consommateur s'arrête de manière contrôlée ou suite à une erreur, il reprend le traitement à partir du dernier décalage validé connu. En validant périodiquement les décalages, votre consommateur ne retraitera pas les enregistrements ou au moins une quantité minimale si votre application se ferme après avoir traité quelques enregistrements mais avant de valider. Cette approche est connue sous le nom de traitement au moins une fois, ce qui garantit que les enregistrements sont traités au moins une fois, et en cas d'erreurs, certains d'entre eux peuvent être retraités, mais c'est une excellente option lorsque l'alternative est de risquer la perte de données. Kafka fournit également des garanties de traitement une seule fois, et même si nous n'aborderons pas les transactions dans cet article de blog, vous pouvez en savoir plus sur les transactions dans Kafka dans
Bien qu'il existe plusieurs façons de valider des décalages, la plus simple et la plus basique est l'approche de validation automatique. Le consommateur lit les enregistrements et l'application les traite. Après un laps de temps configurable (basé sur les horodatages des enregistrements), le consommateur validera les décalages des enregistrements déjà consommés. Habituellement, la validation automatique est une approche raisonnable ; dans une boucle de processus de consommation typique, vous ne reviendrez pas au consommateur tant que vous n'aurez pas traité avec succès tous les enregistrements précédemment consommés. S'il y avait eu une erreur inattendue ou un arrêt, le code ne revient jamais au consommateur, donc aucune validation ne se produit. Mais dans notre application ici, nous mettons en pipeline - nous prenons les enregistrements consommés et les poussons dans une mémoire tampon et retournons pour en consommer davantage - il n'y a pas d'attente pour un traitement réussi.
Avec l'approche pipeline, comment garantissons-nous un traitement au moins une fois ? Nous allons tirer parti de la méthode IConsumer.StoreOffset
, qui charge un seul paramètre - un TopicPartitionOffset
- et le stocke (avec d'autres décalages) pour le prochain commit. Notez que cette approche de la gestion des décalages contraste le fonctionnement de la validation automatique avec l'API Java.
Ainsi, la procédure de validation fonctionne de cette manière : lorsque le bloc récepteur récupère un enregistrement à produire pour Kafka, il le fournit également au délégué Action. Lorsque le producteur exécute le rappel, il transmet le décalage d'origine au consommateur (la même instance dans le bloc source) et le consommateur utilise la méthode StoreOffset. La validation automatique est toujours activée pour le consommateur, mais vous fournissez les décalages à valider au lieu de demander au consommateur de valider aveuglément les derniers décalages qu'il a consommés jusqu'à présent.
Ainsi, même si l'application utilise le pipelining, elle ne s'engage qu'après avoir reçu un accusé de réception du courtier, ce qui signifie que le courtier et l'ensemble minimum de courtiers de répliques ont stocké l'enregistrement. Travailler de cette manière permet à l'application de progresser plus rapidement car le consommateur peut continuellement récupérer et alimenter le pipeline pendant que les blocs effectuent leur travail. Cette approche est possible car le client consommateur .NET est thread-safe (certaines méthodes ne le sont pas et sont documentées comme telles). Nous pouvons donc faire en sorte que notre seul consommateur fonctionne en toute sécurité dans les threads de bloc source et récepteur.
Pour toute erreur au cours de l'étape de production, l'application enregistre l'erreur et replace l'enregistrement dans le BufferBlock
imbriqué afin que le producteur réessaye d'envoyer l'enregistrement au courtier. Mais cette logique de nouvelle tentative est effectuée aveuglément et, en pratique, vous souhaiterez probablement une solution plus robuste.
Maintenant que nous avons couvert le fonctionnement de l'application, examinons les chiffres de performance. Tous les tests ont été exécutés localement sur un ordinateur portable macOS Big Sur (11.6), votre kilométrage peut donc varier dans ce scénario. La configuration du test de performance est simple :
Produisez 1 million d'enregistrements dans un sujet Kafka au format JSON. Cette étape a été effectuée à l'avance et n'a pas été incluse dans les mesures du test.
Démarrez l'application compatible Kafka Dataflow et définissez la parallélisation sur tous les blocs sur 1 (valeur par défaut).
L'application s'exécute jusqu'à ce qu'elle ait traité avec succès 1 million d'enregistrements, puis elle s'arrête
Enregistrer le temps qu'il a fallu pour traiter tous les enregistrements
La seule différence pour le deuxième tour était de définir le MaxDegreeOfParallelism pour le bloc simulé gourmand en CPU à quatre.
Voici les résultats:
Nombre d'enregistrements | Facteur de simultanéité | Temps (minutes) |
---|---|---|
1M | 1 | 38 |
1M | 4 | 9 |
Ainsi, en définissant simplement une configuration, nous avons considérablement amélioré le débit tout en maintenant l'ordre des événements. Ainsi, en activant un degré maximum de parallélisme à quatre, nous obtenons l'accélération attendue d'un facteur supérieur à quatre. Mais la partie critique de cette amélioration des performances est que vous n'avez pas écrit de code concurrent, ce qui serait difficile à faire correctement.
Plus tôt dans le billet de blog, j'ai mentionné un test pour valider que la simultanéité avec les blocs Dataflow préserve l'ordre des événements, alors parlons-en maintenant. L'essai comportait les étapes suivantes :
Produire des entiers 1M (0-999,999) dans un sujet Kafka
Modifier l'application de référence pour travailler avec des types entiers
Exécutez l'application avec un niveau de simultanéité de un pour le bloc de processus distant simulé - produire vers un sujet Kafka
Réexécutez l'application avec un niveau de simultanéité de quatre et produisez les numéros vers un autre sujet Kafka
Exécutez un programme pour consommer les nombres entiers des deux rubriques de résultats et les stocker dans un tableau en mémoire
Comparez les deux tableaux et confirmez qu'ils sont dans le même ordre
Le résultat de ce test était que les deux tableaux contenaient les entiers dans l'ordre de 0 à 999 999, ce qui prouve que l'utilisation d'un bloc Dataflow avec un niveau de parallélisme supérieur à un maintenait l'ordre de traitement des données entrantes. Vous trouverez des informations plus détaillées sur le parallélisme Dataflow dans la documentation .
Dans cet article, nous avons expliqué comment utiliser les clients .NET Kafka et la bibliothèque parallèle de tâches pour créer une application de streaming d'événements robuste et à haut débit. Kafka fournit un streaming d'événements hautes performances et la bibliothèque parallèle de tâches vous fournit les éléments de base pour créer des applications simultanées avec une mise en mémoire tampon pour gérer tous les détails, permettant aux développeurs de se concentrer sur la logique métier. Bien que le scénario de l'application soit un peu artificiel, j'espère que vous pourrez voir l'utilité de combiner les deux technologies. Essaie-
Également publié ici.