Vous êtes-vous déjà demandé comment certaines de vos applications préférées gèrent les mises à jour en temps réel ? Résultats sportifs en direct, tickers boursiers ou même notifications sur les réseaux sociaux : ils s'appuient tous sur une architecture événementielle (EDA) pour traiter les données instantanément. EDA, c'est comme avoir une conversation où chaque nouvel élément d'information déclenche une réponse immédiate. C'est ce qui rend une application plus interactive et réactive.
Dans cette procédure pas à pas, nous vous guiderons dans la création d'une application simple basée sur les événements à l'aide d'Apache Kafka sur Heroku. Nous couvrirons :
Apache Kafka est un outil puissant pour créer des systèmes EDA. Il s'agit d'une plate-forme open source conçue pour gérer les flux de données en temps réel. Apache Kafka sur Heroku est un module complémentaire Heroku qui fournit Kafka en tant que service. Heroku facilite le déploiement et la gestion d'applications, et je l'utilise davantage dans mes projets récemment. La combinaison de Kafka avec Heroku simplifie le processus de configuration lorsque vous souhaitez exécuter une application basée sur des événements.
À la fin de ce guide, vous disposerez d'une application en cours d'exécution qui démontre la puissance d'EDA avec Apache Kafka sur Heroku. Commençons!
Avant de plonger dans le code, passons rapidement en revue quelques concepts fondamentaux. Une fois que vous les aurez compris, il sera plus facile de suivre.
Nous allons créer une application Node.js en utilisant la bibliothèque KafkaJS . Voici un bref aperçu du fonctionnement de notre application :
Nos capteurs météorologiques (les producteurs) généreront périodiquement des données, telles que la température, l'humidité et la pression barométrique, et enverront ces événements à Apache Kafka. À des fins de démonstration, les données seront générées aléatoirement.
Nous aurons un consommateur qui écoutera les sujets. Lorsqu'un nouvel événement est reçu, il écrira les données dans un journal.
Nous déploierons l'intégralité de la configuration sur Heroku et utiliserons les journaux Heroku pour surveiller les événements au fur et à mesure qu'ils se produisent.
Avant de commencer, assurez-vous d'avoir les éléments suivants :
La base de code de l'ensemble de ce projet est disponible dans ce référentiel GitHub . N'hésitez pas à cloner le code et à suivre tout au long de cet article.
Maintenant que nous avons couvert les bases, configurons notre cluster Kafka sur Heroku et commençons à le construire.
Mettons tout en place sur Heroku. C'est un processus assez rapide et facile.
~/project$ heroku login
~/project$ heroku create weather-eda
(J'ai nommé mon application Heroku weather-eda
, mais vous pouvez choisir un nom unique pour votre application.)
~/project$ heroku addons:create heroku-kafka:basic-0 Creating heroku-kafka:basic-0 on ⬢ weather-eda... ~$0.139/hour (max $100/month) The cluster should be available in a few minutes. Run `heroku kafka:wait` to wait until the cluster is ready. You can read more about managing Kafka at https://devcenter.heroku.com/articles/kafka-on-heroku#managing-kafka kafka-adjacent-07560 is being created in the background. The app will restart when complete... Use heroku addons:info kafka-adjacent-07560 to check creation progress Use heroku addons:docs heroku-kafka to view documentation
Vous pouvez trouver plus d'informations sur le module complémentaire Apache Kafka sur Heroku ici . Pour notre démo, j'ajoute le niveau Basic 0 du module complémentaire. Le coût du module complémentaire est de 0,139 $/heure. Au cours de la création de cette application de démonstration, j'ai utilisé le module complémentaire pendant moins d'une heure, puis je l'ai ralenti.
Il faut quelques minutes à Heroku pour que Kafka soit lancé et prêt pour vous. Très bientôt, voici ce que vous verrez :
~/project$ heroku addons:info kafka-adjacent-07560 === kafka-adjacent-07560 Attachments: weather-eda::KAFKA Installed at: Mon May 27 2024 11:44:37 GMT-0700 (Mountain Standard Time) Max Price: $100/month Owning app: weather-eda Plan: heroku-kafka:basic-0 Price: ~$0.139/hour State: created
Une fois notre cluster Kafka lancé, nous devrons obtenir des informations d'identification et d'autres configurations. Heroku crée plusieurs variables de configuration pour notre application, en les remplissant avec les informations du cluster Kafka qui vient d'être créé. Nous pouvons voir toutes ces variables de configuration en exécutant ce qui suit :
~/project$ heroku config === weather-eda Config Vars KAFKA_CLIENT_CERT: -----BEGIN CERTIFICATE----- MIIDQzCCAiugAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1h ... -----END CERTIFICATE----- KAFKA_CLIENT_CERT_KEY: -----BEGIN RSA PRIVATE KEY----- MIIEowIBAAKCAQEAsgv1oBiF4Az/IQsepHSh5pceL0XLy0uEAokD7ety9J0PTjj3 ... -----END RSA PRIVATE KEY----- KAFKA_PREFIX: columbia-68051. KAFKA_TRUSTED_CERT: -----BEGIN CERTIFICATE----- MIIDfzCCAmegAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1h ... F+f3juViDqm4eLCZBAdoK/DnI4fFrNH3YzhAPdhoHOa8wi4= -----END CERTIFICATE----- KAFKA_URL: kafka+ssl://ec2-18-233-140-74.compute-1.amazonaws.com:9096,kafka+ssl://ec2-18-208-61-56.compute-1.amazonaws.com:9096...kafka+ssl://ec2-34-203-24-91.compute-1.amazonaws.com:9096
Comme vous pouvez le voir, nous avons plusieurs variables de configuration. Nous aurons besoin d'un fichier dans le dossier racine de notre projet appelé .env
avec toutes ces valeurs de variable de configuration. Pour ce faire, nous exécutons simplement la commande suivante :
~/project$ heroku config --shell > .env
Notre fichier .env
ressemble à ceci :
KAFKA_CLIENT_CERT="-----BEGIN CERTIFICATE----- ... -----END CERTIFICATE-----" KAFKA_CLIENT_CERT_KEY="-----BEGIN RSA PRIVATE KEY----- ... -----END RSA PRIVATE KEY-----" KAFKA_PREFIX="columbia-68051." KAFKA_TRUSTED_CERT="-----BEGIN CERTIFICATE----- ... -----END CERTIFICATE-----" KAFKA_URL="kafka+ssl://ec2-18-233-140-74.compute-1.amazonaws.com:9096,kafka+ssl://ec2-18-208-61-56.compute-1.amazonaws.com:9096...kafka+ssl://ec2-34-203-24-91.compute-1.amazonaws.com:9096"
Nous veillons également à ajouter .env à notre fichier .gitignore. Nous ne voudrions pas confier ces données sensibles à notre référentiel.
La CLI Heroku n'est pas fournie directement avec les commandes liées à Kafka. Puisque nous utilisons Kafka, nous devrons installer le plugin CLI .
~/project$ heroku plugins:install heroku-kafka Installing plugin heroku-kafka... installed v2.12.0
Désormais, nous pouvons gérer notre cluster Kafka depuis la CLI.
~/project$ heroku kafka:info === KAFKA_URL Plan: heroku-kafka:basic-0 Status: available Version: 2.8.2 Created: 2024-05-27T18:44:38.023+00:00 Topics: [··········] 0 / 40 topics, see heroku kafka:topics Prefix: columbia-68051. Partitions: [··········] 0 / 240 partition replicas (partitions × replication factor) Messages: 0 messages/s Traffic: 0 bytes/s in / 0 bytes/s out Data Size: [··········] 0 bytes / 4.00 GB (0.00%) Add-on: kafka-adjacent-07560 ~/project$ heroku kafka:topics === Kafka Topics on KAFKA_URL No topics found on this Kafka cluster. Use heroku kafka:topics:create to create a topic (limit 40)
Juste pour vérifier la cohérence, jouons avec notre cluster Kafka. On commence par créer un sujet.
~/project$ heroku kafka:topics:create test-topic-01 Creating topic test-topic-01 with compaction disabled and retention time 1 day on kafka-adjacent-07560... done Use `heroku kafka:topics:info test-topic-01` to monitor your topic. Your topic is using the prefix columbia-68051.. ~/project$ heroku kafka:topics:info test-topic-01 ▸ topic test-topic-01 is not available yet
En une minute environ, notre sujet devient disponible.
~/project$ heroku kafka:topics:info test-topic-01 === kafka-adjacent-07560 :: test-topic-01 Topic Prefix: columbia-68051. Producers: 0 messages/second (0 bytes/second) total Consumers: 0 bytes/second total Partitions: 8 partitions Replication Factor: 3 Compaction: Compaction is disabled for test-topic-01 Retention: 24 hours
Ensuite, dans cette fenêtre de terminal, nous agirons en tant que consommateur, écoutant ce sujet en le suivant.
~/project$ heroku kafka:topics:tail test-topic-01
A partir de là, le terminal attend simplement les événements publiés sur le sujet.
Dans une fenêtre de terminal séparée, nous agirons en tant que producteur et publierons quelques messages sur le sujet.
~/project$ heroku kafka:topics:write test-topic-01 "hello world!"
De retour dans la fenêtre du terminal de notre consommateur, voici ce que nous voyons :
~/project$ heroku kafka:topics:tail test-topic-01 test-topic-01 0 0 12 hello world!
Excellent! Nous avons produit et consommé avec succès un événement sur un sujet de notre cluster Kafka. Nous sommes prêts à passer à notre application Node.js. Détruisons ce sujet de test pour garder notre terrain de jeu bien rangé.
~/project$ heroku kafka:topics:destroy test-topic-01 ▸ This command will affect the cluster: kafka-adjacent-07560, which is on weather-eda ▸ To proceed, type weather-eda or re-run this command with --confirm weather-eda > weather-eda Deleting topic test-topic-01... done Your topic has been marked for deletion, and will be removed from the cluster shortly ~/project$ heroku kafka:topics === Kafka Topics on KAFKA_URL No topics found on this Kafka cluster. Use heroku kafka:topics:create to create a topic (limit 40).
Pour préparer notre application à utiliser Kafka, nous devrons créer deux éléments : un sujet et un groupe de consommateurs.
Créons le sujet que notre application utilisera.
~/project$ heroku kafka:topics:create weather-data
Ensuite, nous allons créer le groupe de consommateurs dont le consommateur de notre application fera partie :
~/project$ heroku kafka:consumer-groups:create weather-consumers
Nous sommes prêts à créer notre application Node.js !
Initialisons un nouveau projet et installons nos dépendances.
~/project$ npm init -y ~/project$ npm install kafkajs dotenv @faker-js/faker pino pino-pretty
Notre projet comportera deux processus en cours :
consumer.js
, qui est abonné au sujet et enregistre tous les événements publiés.
producer.js
, qui publiera des données météorologiques aléatoires sur le sujet toutes les quelques secondes.
Ces deux processus devront utiliser KafkaJS pour se connecter à notre cluster Kafka, nous allons donc modulariser notre code pour le rendre réutilisable.
Dans le dossier src
du projet, nous créons un fichier appelé kafka.js
. Cela ressemble à ceci :
const { Kafka } = require('kafkajs'); const BROKER_URLS = process.env.KAFKA_URL.split(',').map(uri => uri.replace('kafka+ssl://','' )) const TOPIC = `${process.env.KAFKA_PREFIX}weather-data` const CONSUMER_GROUP = `${process.env.KAFKA_PREFIX}weather-consumers` const kafka = new Kafka({ clientId: 'weather-eda-app-nodejs-client', brokers: BROKER_URLS, ssl: { rejectUnauthorized: false, ca: process.env.KAFKA_TRUSTED_CERT, key: process.env.KAFKA_CLIENT_CERT_KEY, cert: process.env.KAFKA_CLIENT_CERT, }, }) const producer = async () => { const p = kafka.producer() await p.connect() return p; } const consumer = async () => { const c = kafka.consumer({ groupId: CONSUMER_GROUP, sessionTimeout: 30000 }) await c.connect() await c.subscribe({ topics: [TOPIC] }); return c; } module.exports = { producer, consumer, topic: TOPIC, groupId: CONSUMER_GROUP };
Dans ce fichier, nous commençons par créer un nouveau client Kafka. Cela nécessite des URL pour les courtiers Kafka, que nous pouvons analyser à partir de la variable KAFKA_URL
dans notre fichier .env
(qui provenait à l'origine de l'appel de heroku config). Pour authentifier la tentative de connexion, nous devons fournir KAFKA_TRUSTED_CERT
, KAFKA_CLIENT_CERT_KEY
et KAFKA_CLIENT_CERT
.
Ensuite, à partir de notre client Kafka, nous créons un producer
et un consumer
, en veillant à bien abonner nos consommateurs au sujet weather-data
.
Notez dans kafka.js
que nous ajoutons KAFKA_PREFIX
au nom de notre sujet et de notre groupe de consommateurs. Nous utilisons le plan Basic 0 pour Apache Kafka sur Heroku, qui est un plan Kafka multi-tenant. Cela signifie que nous travaillons avec un KAFKA_PREFIX
. Même si nous avons nommé notre sujet weather-data
et notre groupe de consommateurs weather-consumers
, leurs noms réels dans notre cluster Kafka multi-tenant doivent avoir le préfixe KAFKA_PREFIX
(pour garantir qu'ils sont uniques).
Donc, techniquement, pour notre démo, le nom réel du sujet est columbia-68051.weather-data
, et non weather-data
. (De même pour le nom du groupe de consommateurs.)
Maintenant, créons notre processus d'arrière-plan qui agira en tant que producteur de capteurs météorologiques. Dans le dossier racine de notre projet, nous avons un fichier appelé producer.js
. Cela ressemble à ceci :
require('dotenv').config(); const kafka = require('./src/kafka.js'); const { faker } = require('@faker-js/faker'); const SENSORS = ['sensor01','sensor02','sensor03','sensor04','sensor05']; const MAX_DELAY_MS = 20000; const READINGS = ['temperature','humidity','barometric_pressure']; const MAX_TEMP = 130; const MIN_PRESSURE = 2910; const PRESSURE_RANGE = 160; const getRandom = (arr) => arr[faker.number.int(arr.length - 1)]; const getRandomReading = { temperature: () => faker.number.int(MAX_TEMP) + (faker.number.int(100) / 100), humidity: () => faker.number.int(100) / 100, barometric_pressure: () => (MIN_PRESSURE + faker.number.int(PRESSURE_RANGE)) / 100 }; const sleep = (ms) => { return new Promise((resolve) => { setTimeout(resolve, ms); }); }; (async () => { const producer = await kafka.producer() while(true) { const sensor = getRandom(SENSORS) const reading = getRandom(READINGS) const value = getRandomReading[reading]() const data = { reading, value } await producer.send({ topic: kafka.topic, messages: [{ key: sensor, value: JSON.stringify(data) }] }) await sleep(faker.number.int(MAX_DELAY_MS)) } })()
Une grande partie du code du fichier concerne la génération de valeurs aléatoires. Je vais souligner les parties importantes :
SENSORS
.
temperature
, humidity
ou barometric_pressure
. L'objet getRandomReading
a une fonction pour chacune de ces lectures, pour générer une valeur correspondante raisonnable.
async
avec une boucle while
infinie.
Dans la boucle while
, nous :
sensor
au hasard.reading
au hasard.value
aléatoire pour cette lecture.producer.send
pour publier ces données dans le sujet. Le sensor
sert de key
pour l'événement, tandis que la reading
et value
formeront le message d'événement. Le processus en arrière-plan dans consumer.js
est considérablement plus simple.
require('dotenv').config(); const logger = require('./src/logger.js'); const kafka = require('./src/kafka.js'); (async () => { const consumer = await kafka.consumer() await consumer.run({ eachMessage: async ({ topic, partition, message }) => { const sensorId = message.key.toString() const messageObj = JSON.parse(message.value.toString()) const logMessage = { sensorId } logMessage[messageObj.reading] = messageObj.value logger.info(logMessage) } }) })()
Notre consumer
est déjà abonné au sujet weather-data
. Nous appelons consumer.run
, puis nous configurons un gestionnaire pour eachMessage
. Chaque fois que Kafka informe le consumer
d'un message, il enregistre le message. C'est tout ce qu'on peut en dire.
Procfile
Dans le fichier package.json
, nous devons ajouter quelques scripts
qui démarrent nos processus d'arrière-plan producteur et consommateur. Le fichier devrait maintenant inclure les éléments suivants :
... "scripts": { "start": "echo 'do nothing'", "start:consumer": "node consumer.js", "start:producer": "node producer.js" }, ...
Les plus importants sont start:consumer
et start:producer
. Mais nous gardons start
dans notre fichier (même si cela ne fait rien de significatif) car le constructeur Heroku s'attend à ce qu'il soit là.
Ensuite, nous créons un Procfile
qui indiquera à Heroku comment démarrer les différents travailleurs dont nous avons besoin pour notre application Heroku. Dans le dossier racine de notre projet, le Procfile
devrait ressembler à ceci :
consumer_worker: npm run start:consumer producer_worker: npm run start:producer
Assez simple, non ? Nous aurons un travailleur de processus en arrière-plan appelé consumer_worker
et un autre appelé producer_worker
. Vous remarquerez que nous n'avons pas de web
Worker, ce que vous verriez généralement dans Procfile
pour une application Web. Pour notre application Heroku, nous avons juste besoin des deux travailleurs en arrière-plan. Nous n'avons pas besoin web
.
Avec cela, tout notre code est défini. Nous avons engagé tout notre code dans le dépôt et nous sommes prêts à le déployer.
~/project$ git push heroku main … remote: -----> Build succeeded! … remote: -----> Compressing... remote: Done: 48.6M remote: -----> Launching... … remote: Verifying deploy... done
Après le déploiement, nous voulons nous assurer que nous adaptons correctement nos dynos. Nous n'avons pas besoin d'un banc d'essai pour un processus Web, mais nous en aurons besoin à la fois pour consumer_worker
et producer_worker
. Nous exécutons la commande suivante pour définir ces processus en fonction de nos besoins.
~/project$ heroku ps:scale web=0 consumer_worker=1 producer_worker=1 Scaling dynos... done, now running producer_worker at 1:Eco, consumer_worker at 1:Eco, web at 0:Eco
Maintenant, tout devrait être opérationnel. En coulisses, notre producer_worker
doit se connecter au cluster Kafka, puis commencer à publier les données des capteurs météorologiques toutes les quelques secondes. Ensuite, notre consumer_worker
doit se connecter au cluster Kafka et enregistrer tous les messages qu'il reçoit du sujet auquel il est abonné.
Pour voir ce que fait notre consumer_worker
, nous pouvons consulter nos journaux Heroku.
~/project$ heroku logs --tail … heroku[producer_worker.1]: Starting process with command `npm run start:producer` heroku[producer_worker.1]: State changed from starting to up app[producer_worker.1]: app[producer_worker.1]: > [email protected] start:producer app[producer_worker.1]: > node producer.js app[producer_worker.1]: … heroku[consumer_worker.1]: Starting process with command `npm run start:consumer` heroku[consumer_worker.1]: State changed from starting to up app[consumer_worker.1]: app[consumer_worker.1]: > [email protected] start:consumer app[consumer_worker.1]: > node consumer.js app[consumer_worker.1]: app[consumer_worker.1]: {"level":"INFO","timestamp":"2024-05-28T02:31:20.660Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"columbia-68051.weather-consumers"} app[consumer_worker.1]: {"level":"INFO","timestamp":"2024-05-28T02:31:23.702Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"columbia-68051.weather-consumers","memberId":"weather-eda-app-nodejs-client-3ee5d1fa-eba9-4b59-826c-d3b924a6e4e4","leaderId":"weather-eda-app-nodejs-client-3ee5d1fa-eba9-4b59-826c-d3b924a6e4e4","isLeader":true,"memberAssignment":{"columbia-68051.test-topic-1":[0,1,2,3,4,5,6,7]},"groupProtocol":"RoundRobinAssigner","duration":3041} app[consumer_worker.1]: [2024-05-28 02:31:23.755 +0000] INFO (21): {"sensorId":"sensor01","temperature":87.84} app[consumer_worker.1]: [2024-05-28 02:31:23.764 +0000] INFO (21): {"sensorId":"sensor01","humidity":0.3} app[consumer_worker.1]: [2024-05-28 02:31:23.777 +0000] INFO (21): {"sensorId":"sensor03","temperature":22.11} app[consumer_worker.1]: [2024-05-28 02:31:37.773 +0000] INFO (21): {"sensorId":"sensor01","barometric_pressure":29.71} app[consumer_worker.1]: [2024-05-28 02:31:54.495 +0000] INFO (21): {"sensorId":"sensor05","barometric_pressure":29.55} app[consumer_worker.1]: [2024-05-28 02:32:02.629 +0000] INFO (21): {"sensorId":"sensor04","temperature":90.58} app[consumer_worker.1]: [2024-05-28 02:32:03.995 +0000] INFO (21): {"sensorId":"sensor02","barometric_pressure":29.25} app[consumer_worker.1]: [2024-05-28 02:32:12.688 +0000] INFO (21): {"sensorId":"sensor04","humidity":0.1} app[consumer_worker.1]: [2024-05-28 02:32:32.127 +0000] INFO (21): {"sensorId":"sensor01","humidity":0.34} app[consumer_worker.1]: [2024-05-28 02:32:32.851 +0000] INFO (21): {"sensorId":"sensor02","humidity":0.61} app[consumer_worker.1]: [2024-05-28 02:32:37.200 +0000] INFO (21): {"sensorId":"sensor01","barometric_pressure":30.36} app[consumer_worker.1]: [2024-05-28 02:32:50.388 +0000] INFO (21): {"sensorId":"sensor03","temperature":104.55}
Ça marche! Nous savons que notre producteur publie périodiquement des messages sur Kafka parce que notre consommateur les reçoit et les enregistre ensuite.
Bien entendu, dans une application EDA plus vaste, chaque capteur est un producteur. Ils peuvent publier sur plusieurs sujets à des fins diverses, ou ils peuvent tous publier sur le même sujet. Et votre consommateur peut être abonné à plusieurs sujets. De plus, dans notre application de démonstration, nos consommateurs émettaient simplement beaucoup de choses sur eachMessage
; mais dans une application EDA, un consommateur peut répondre en appelant une API tierce, en envoyant une notification par SMS ou en interrogeant une base de données.
Maintenant que vous avez une compréhension de base des événements, des sujets, des producteurs et des consommateurs, et que vous savez comment travailler avec Kafka, vous pouvez commencer à concevoir et à créer vos propres applications EDA pour répondre à des cas d'utilisation métier plus complexes.
EDA est assez puissant : vous pouvez découpler vos systèmes tout en bénéficiant de fonctionnalités clés telles qu'une évolutivité facile et le traitement des données en temps réel. Pour EDA, Kafka est un outil clé, vous aidant à gérer facilement les flux de données à haut débit. L'utilisation d'Apache Kafka sur Heroku vous aide à démarrer rapidement. Puisqu'il s'agit d'un service géré, vous n'avez pas à vous soucier des éléments complexes de la gestion du cluster Kafka. Vous pouvez simplement vous concentrer sur la création de vos applications.
À partir de là, il est temps pour vous d’expérimenter et de prototyper. Identifiez les cas d'utilisation qui correspondent bien à l'EDA. Plongez, testez-le sur Heroku et créez quelque chose d'incroyable. Bon codage !