¿Alguna vez te has preguntado cómo manejan algunas de tus aplicaciones favoritas las actualizaciones en tiempo real? Resultados deportivos en vivo, tickers del mercado de valores o incluso notificaciones de redes sociales: todos ellos dependen de la arquitectura basada en eventos (EDA) para procesar datos al instante. EDA es como tener una conversación en la que cada nueva información desencadena una respuesta inmediata. Es lo que hace que una aplicación sea más interactiva y receptiva.
En este tutorial, lo guiaremos en la creación de una aplicación sencilla basada en eventos utilizando Apache Kafka en Heroku. Cubriremos:
Apache Kafka es una poderosa herramienta para construir sistemas EDA. Es una plataforma de código abierto diseñada para manejar fuentes de datos en tiempo real. Apache Kafka en Heroku es un complemento de Heroku que proporciona Kafka como servicio. Heroku hace que sea bastante fácil implementar y administrar aplicaciones, y últimamente lo he estado usando más en mis proyectos. La combinación de Kafka con Heroku simplifica el proceso de configuración cuando desea ejecutar una aplicación basada en eventos.
Al final de esta guía, tendrá una aplicación en ejecución que demuestra el poder de EDA con Apache Kafka en Heroku. ¡Empecemos!
Antes de profundizar en el código, repasemos rápidamente algunos conceptos básicos. Una vez que los comprenda, será más fácil seguirlos.
Construiremos una aplicación Node.js usando la biblioteca KafkaJS . Aquí hay una descripción general rápida de cómo funcionará nuestra aplicación:
Nuestros sensores meteorológicos (los productores) generarán datos periódicamente, como temperatura, humedad y presión barométrica, y enviarán estos eventos a Apache Kafka. Para fines de demostración, los datos se generarán aleatoriamente.
Tendremos un consumidor escuchando los temas. Cuando se recibe un nuevo evento, escribirá los datos en un registro.
Implementaremos toda la configuración en Heroku y usaremos los registros de Heroku para monitorear los eventos a medida que ocurren.
Antes de comenzar, asegúrese de tener lo siguiente:
El código base para todo este proyecto está disponible en este repositorio de GitHub . Siéntete libre de clonar el código y seguir esta publicación.
Ahora que hemos cubierto los conceptos básicos, configuremos nuestro clúster Kafka en Heroku y comencemos a construir.
Configuremos todo en Heroku. Es un proceso bastante rápido y sencillo.
~/project$ heroku login
~/project$ heroku create weather-eda
(Le puse el nombre a mi aplicación Heroku weather-eda
, pero puedes elegir un nombre único para tu aplicación).
~/project$ heroku addons:create heroku-kafka:basic-0 Creating heroku-kafka:basic-0 on ⬢ weather-eda... ~$0.139/hour (max $100/month) The cluster should be available in a few minutes. Run `heroku kafka:wait` to wait until the cluster is ready. You can read more about managing Kafka at https://devcenter.heroku.com/articles/kafka-on-heroku#managing-kafka kafka-adjacent-07560 is being created in the background. The app will restart when complete... Use heroku addons:info kafka-adjacent-07560 to check creation progress Use heroku addons:docs heroku-kafka to view documentation
Puede encontrar más información sobre el complemento Apache Kafka en Heroku aquí . Para nuestra demostración, agregaré el nivel Básico 0 del complemento. El costo del complemento es $0.139/hora. Mientras creaba esta aplicación de demostración, usé el complemento durante menos de una hora y luego lo dejé de usar.
Heroku tarda unos minutos en hacer girar a Kafka y prepararlo para ti. Muy pronto, esto es lo que verás:
~/project$ heroku addons:info kafka-adjacent-07560 === kafka-adjacent-07560 Attachments: weather-eda::KAFKA Installed at: Mon May 27 2024 11:44:37 GMT-0700 (Mountain Standard Time) Max Price: $100/month Owning app: weather-eda Plan: heroku-kafka:basic-0 Price: ~$0.139/hour State: created
Con nuestro clúster Kafka en funcionamiento, necesitaremos obtener credenciales y otras configuraciones. Heroku crea varias variables de configuración para nuestra aplicación y las completa con información del clúster Kafka que se acaba de crear. Podemos ver todas estas variables de configuración ejecutando lo siguiente:
~/project$ heroku config === weather-eda Config Vars KAFKA_CLIENT_CERT: -----BEGIN CERTIFICATE----- MIIDQzCCAiugAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1h ... -----END CERTIFICATE----- KAFKA_CLIENT_CERT_KEY: -----BEGIN RSA PRIVATE KEY----- MIIEowIBAAKCAQEAsgv1oBiF4Az/IQsepHSh5pceL0XLy0uEAokD7ety9J0PTjj3 ... -----END RSA PRIVATE KEY----- KAFKA_PREFIX: columbia-68051. KAFKA_TRUSTED_CERT: -----BEGIN CERTIFICATE----- MIIDfzCCAmegAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1h ... F+f3juViDqm4eLCZBAdoK/DnI4fFrNH3YzhAPdhoHOa8wi4= -----END CERTIFICATE----- KAFKA_URL: kafka+ssl://ec2-18-233-140-74.compute-1.amazonaws.com:9096,kafka+ssl://ec2-18-208-61-56.compute-1.amazonaws.com:9096...kafka+ssl://ec2-34-203-24-91.compute-1.amazonaws.com:9096
Como puede ver, tenemos varias variables de configuración. Querremos un archivo en la carpeta raíz de nuestro proyecto llamado .env
con todos estos valores de configuración var. Para ello simplemente ejecutamos el siguiente comando:
~/project$ heroku config --shell > .env
Nuestro archivo .env
se ve así:
KAFKA_CLIENT_CERT="-----BEGIN CERTIFICATE----- ... -----END CERTIFICATE-----" KAFKA_CLIENT_CERT_KEY="-----BEGIN RSA PRIVATE KEY----- ... -----END RSA PRIVATE KEY-----" KAFKA_PREFIX="columbia-68051." KAFKA_TRUSTED_CERT="-----BEGIN CERTIFICATE----- ... -----END CERTIFICATE-----" KAFKA_URL="kafka+ssl://ec2-18-233-140-74.compute-1.amazonaws.com:9096,kafka+ssl://ec2-18-208-61-56.compute-1.amazonaws.com:9096...kafka+ssl://ec2-34-203-24-91.compute-1.amazonaws.com:9096"
Además, nos aseguramos de agregar .env a nuestro archivo .gitignore. No queremos enviar estos datos confidenciales a nuestro repositorio.
La CLI de Heroku no viene con comandos relacionados con Kafka listos para usar. Como usamos Kafka, necesitaremos instalar el complemento CLI .
~/project$ heroku plugins:install heroku-kafka Installing plugin heroku-kafka... installed v2.12.0
Ahora podemos administrar nuestro clúster Kafka desde la CLI.
~/project$ heroku kafka:info === KAFKA_URL Plan: heroku-kafka:basic-0 Status: available Version: 2.8.2 Created: 2024-05-27T18:44:38.023+00:00 Topics: [··········] 0 / 40 topics, see heroku kafka:topics Prefix: columbia-68051. Partitions: [··········] 0 / 240 partition replicas (partitions × replication factor) Messages: 0 messages/s Traffic: 0 bytes/s in / 0 bytes/s out Data Size: [··········] 0 bytes / 4.00 GB (0.00%) Add-on: kafka-adjacent-07560 ~/project$ heroku kafka:topics === Kafka Topics on KAFKA_URL No topics found on this Kafka cluster. Use heroku kafka:topics:create to create a topic (limit 40)
Solo como control de cordura, juguemos con nuestro clúster Kafka. Empezamos creando un tema.
~/project$ heroku kafka:topics:create test-topic-01 Creating topic test-topic-01 with compaction disabled and retention time 1 day on kafka-adjacent-07560... done Use `heroku kafka:topics:info test-topic-01` to monitor your topic. Your topic is using the prefix columbia-68051.. ~/project$ heroku kafka:topics:info test-topic-01 ▸ topic test-topic-01 is not available yet
En aproximadamente un minuto, nuestro tema estará disponible.
~/project$ heroku kafka:topics:info test-topic-01 === kafka-adjacent-07560 :: test-topic-01 Topic Prefix: columbia-68051. Producers: 0 messages/second (0 bytes/second) total Consumers: 0 bytes/second total Partitions: 8 partitions Replication Factor: 3 Compaction: Compaction is disabled for test-topic-01 Retention: 24 hours
A continuación, en esta ventana de terminal, actuaremos como consumidores y escucharemos este tema siguiéndolo.
~/project$ heroku kafka:topics:tail test-topic-01
Desde aquí, el terminal simplemente espera cualquier evento publicado sobre el tema.
En una ventana de terminal separada, actuaremos como productores y publicaremos algunos mensajes sobre el tema.
~/project$ heroku kafka:topics:write test-topic-01 "hello world!"
De vuelta en la ventana de terminal de nuestro consumidor, esto es lo que vemos:
~/project$ heroku kafka:topics:tail test-topic-01 test-topic-01 0 0 12 hello world!
¡Excelente! Hemos producido y consumido con éxito un evento sobre un tema en nuestro clúster Kafka. Estamos listos para pasar a nuestra aplicación Node.js. Destruyamos este tema de prueba para mantener ordenado nuestro patio de recreo.
~/project$ heroku kafka:topics:destroy test-topic-01 ▸ This command will affect the cluster: kafka-adjacent-07560, which is on weather-eda ▸ To proceed, type weather-eda or re-run this command with --confirm weather-eda > weather-eda Deleting topic test-topic-01... done Your topic has been marked for deletion, and will be removed from the cluster shortly ~/project$ heroku kafka:topics === Kafka Topics on KAFKA_URL No topics found on this Kafka cluster. Use heroku kafka:topics:create to create a topic (limit 40).
Para prepararnos para que nuestra aplicación utilice Kafka, necesitaremos crear dos cosas: un tema y un grupo de consumidores.
Creemos el tema que utilizará nuestra aplicación.
~/project$ heroku kafka:topics:create weather-data
A continuación, crearemos el grupo de consumidores del que formará parte el consumidor de nuestra aplicación:
~/project$ heroku kafka:consumer-groups:create weather-consumers
¡Estamos listos para construir nuestra aplicación Node.js!
Inicialicemos un nuevo proyecto e instalemos nuestras dependencias.
~/project$ npm init -y ~/project$ npm install kafkajs dotenv @faker-js/faker pino pino-pretty
Nuestro proyecto tendrá dos procesos en ejecución:
consumer.js
, que está suscrito al tema y registra cualquier evento que se publique.
producer.js
, que publicará algunos datos meteorológicos aleatorios sobre el tema cada pocos segundos.
Ambos procesos necesitarán usar KafkaJS para conectarse a nuestro clúster Kafka, por lo que modularizaremos nuestro código para hacerlo reutilizable.
En la carpeta src
del proyecto, creamos un archivo llamado kafka.js
. Se parece a esto:
const { Kafka } = require('kafkajs'); const BROKER_URLS = process.env.KAFKA_URL.split(',').map(uri => uri.replace('kafka+ssl://','' )) const TOPIC = `${process.env.KAFKA_PREFIX}weather-data` const CONSUMER_GROUP = `${process.env.KAFKA_PREFIX}weather-consumers` const kafka = new Kafka({ clientId: 'weather-eda-app-nodejs-client', brokers: BROKER_URLS, ssl: { rejectUnauthorized: false, ca: process.env.KAFKA_TRUSTED_CERT, key: process.env.KAFKA_CLIENT_CERT_KEY, cert: process.env.KAFKA_CLIENT_CERT, }, }) const producer = async () => { const p = kafka.producer() await p.connect() return p; } const consumer = async () => { const c = kafka.consumer({ groupId: CONSUMER_GROUP, sessionTimeout: 30000 }) await c.connect() await c.subscribe({ topics: [TOPIC] }); return c; } module.exports = { producer, consumer, topic: TOPIC, groupId: CONSUMER_GROUP };
En este archivo, comenzamos creando un nuevo cliente Kafka. Esto requiere URL para los corredores de Kafka, que podemos analizar a partir de la variable KAFKA_URL
en nuestro archivo .env
(que originalmente surgió al llamar a heroku config). Para autenticar el intento de conexión, debemos proporcionar KAFKA_TRUSTED_CERT
, KAFKA_CLIENT_CERT_KEY
y KAFKA_CLIENT_CERT
.
Luego, desde nuestro cliente Kafka, creamos un producer
y un consumer
, asegurándonos de suscribir a nuestros consumidores al tema weather-data
.
Observe que en kafka.js
anteponemos KAFKA_PREFIX
a nuestro tema y nombre de grupo de consumidores. Estamos usando el plan Básico 0 para Apache Kafka en Heroku, que es un plan Kafka multiinquilino. Esto significa que trabajamos con un KAFKA_PREFIX
. Aunque nombramos a nuestro tema weather-data
y a nuestro grupo de consumidores weather-consumers
, sus nombres reales en nuestro clúster Kafka multiinquilino deben tener KAFKA_PREFIX
antepuesto (para garantizar que sean únicos).
Entonces, técnicamente, para nuestra demostración, el nombre real del tema es columbia-68051.weather-data
, no weather-data
. (Lo mismo ocurre con el nombre del grupo de consumidores).
Ahora, creemos nuestro proceso en segundo plano que actuará como nuestros productores de sensores meteorológicos. En la carpeta raíz de nuestro proyecto, tenemos un archivo llamado producer.js
. Se parece a esto:
require('dotenv').config(); const kafka = require('./src/kafka.js'); const { faker } = require('@faker-js/faker'); const SENSORS = ['sensor01','sensor02','sensor03','sensor04','sensor05']; const MAX_DELAY_MS = 20000; const READINGS = ['temperature','humidity','barometric_pressure']; const MAX_TEMP = 130; const MIN_PRESSURE = 2910; const PRESSURE_RANGE = 160; const getRandom = (arr) => arr[faker.number.int(arr.length - 1)]; const getRandomReading = { temperature: () => faker.number.int(MAX_TEMP) + (faker.number.int(100) / 100), humidity: () => faker.number.int(100) / 100, barometric_pressure: () => (MIN_PRESSURE + faker.number.int(PRESSURE_RANGE)) / 100 }; const sleep = (ms) => { return new Promise((resolve) => { setTimeout(resolve, ms); }); }; (async () => { const producer = await kafka.producer() while(true) { const sensor = getRandom(SENSORS) const reading = getRandom(READINGS) const value = getRandomReading[reading]() const data = { reading, value } await producer.send({ topic: kafka.topic, messages: [{ key: sensor, value: JSON.stringify(data) }] }) await sleep(faker.number.int(MAX_DELAY_MS)) } })()
Gran parte del código del archivo tiene que ver con la generación de valores aleatorios. Destacaré las partes importantes:
SENSORS
.
temperature
, humidity
o barometric_pressure
. El objeto getRandomReading
tiene una función para cada una de estas lecturas, para generar un valor correspondiente razonable.
async
con un bucle while
infinito.
Dentro del ciclo while
, nosotros:
sensor
al azar.reading
al azar.value
aleatorio para esa lectura.producer.send
para publicar estos datos en el tema. El sensor
sirve como key
para el evento, mientras que la reading
y value
formarán el mensaje del evento. El proceso en segundo plano en consumer.js
es considerablemente más sencillo.
require('dotenv').config(); const logger = require('./src/logger.js'); const kafka = require('./src/kafka.js'); (async () => { const consumer = await kafka.consumer() await consumer.run({ eachMessage: async ({ topic, partition, message }) => { const sensorId = message.key.toString() const messageObj = JSON.parse(message.value.toString()) const logMessage = { sensorId } logMessage[messageObj.reading] = messageObj.value logger.info(logMessage) } }) })()
Nuestro consumer
ya está suscrito al tema weather-data
. Llamamos consumer.run
y luego configuramos un controlador para eachMessage
. Cada vez que Kafka notifica al consumer
sobre un mensaje, lo registra. Eso es todo al respecto.
Procfile
En el archivo package.json
, necesitamos agregar algunos scripts
que inician nuestros procesos en segundo plano de productor y consumidor. El archivo ahora debería incluir lo siguiente:
... "scripts": { "start": "echo 'do nothing'", "start:consumer": "node consumer.js", "start:producer": "node producer.js" }, ...
Los importantes son start:consumer
y start:producer
. Pero mantenemos start
en nuestro archivo (aunque no hace nada significativo) porque el constructor Heroku espera que esté allí.
A continuación, creamos un Procfile
que le indicará a Heroku cómo iniciar los distintos trabajadores que necesitamos para nuestra aplicación Heroku. En la carpeta raíz de nuestro proyecto, el Procfile
debería verse así:
consumer_worker: npm run start:consumer producer_worker: npm run start:producer
Bastante simple, ¿verdad? Tendremos un trabajador de proceso en segundo plano llamado consumer_worker
y otro llamado producer_worker
. Notarás que no tenemos un trabajador web
, que es lo que normalmente verías en Procfile
para una aplicación web. Para nuestra aplicación Heroku, solo necesitamos los dos trabajadores en segundo plano. No necesitamos web
.
Con eso, todo nuestro código está configurado. Hemos enviado todo nuestro código al repositorio y estamos listos para implementarlo.
~/project$ git push heroku main … remote: -----> Build succeeded! … remote: -----> Compressing... remote: Done: 48.6M remote: -----> Launching... … remote: Verifying deploy... done
Después de haberlo implementado, queremos asegurarnos de escalar nuestros dinamómetros correctamente. No necesitamos un banco de pruebas para un proceso web, pero necesitaremos uno tanto para consumer_worker
como producer_worker
. Ejecutamos el siguiente comando para configurar estos procesos según nuestras necesidades.
~/project$ heroku ps:scale web=0 consumer_worker=1 producer_worker=1 Scaling dynos... done, now running producer_worker at 1:Eco, consumer_worker at 1:Eco, web at 0:Eco
Ahora todo debería estar en funcionamiento. Detrás de escena, nuestro producer_worker
debería conectarse al clúster de Kafka y luego comenzar a publicar datos de los sensores meteorológicos cada pocos segundos. Luego, nuestro consumer_worker
debe conectarse al clúster de Kafka y registrar cualquier mensaje que reciba del tema al que está suscrito.
Para ver qué está haciendo nuestro consumer_worker
, podemos consultar nuestros registros de Heroku.
~/project$ heroku logs --tail … heroku[producer_worker.1]: Starting process with command `npm run start:producer` heroku[producer_worker.1]: State changed from starting to up app[producer_worker.1]: app[producer_worker.1]: > [email protected] start:producer app[producer_worker.1]: > node producer.js app[producer_worker.1]: … heroku[consumer_worker.1]: Starting process with command `npm run start:consumer` heroku[consumer_worker.1]: State changed from starting to up app[consumer_worker.1]: app[consumer_worker.1]: > [email protected] start:consumer app[consumer_worker.1]: > node consumer.js app[consumer_worker.1]: app[consumer_worker.1]: {"level":"INFO","timestamp":"2024-05-28T02:31:20.660Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"columbia-68051.weather-consumers"} app[consumer_worker.1]: {"level":"INFO","timestamp":"2024-05-28T02:31:23.702Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"columbia-68051.weather-consumers","memberId":"weather-eda-app-nodejs-client-3ee5d1fa-eba9-4b59-826c-d3b924a6e4e4","leaderId":"weather-eda-app-nodejs-client-3ee5d1fa-eba9-4b59-826c-d3b924a6e4e4","isLeader":true,"memberAssignment":{"columbia-68051.test-topic-1":[0,1,2,3,4,5,6,7]},"groupProtocol":"RoundRobinAssigner","duration":3041} app[consumer_worker.1]: [2024-05-28 02:31:23.755 +0000] INFO (21): {"sensorId":"sensor01","temperature":87.84} app[consumer_worker.1]: [2024-05-28 02:31:23.764 +0000] INFO (21): {"sensorId":"sensor01","humidity":0.3} app[consumer_worker.1]: [2024-05-28 02:31:23.777 +0000] INFO (21): {"sensorId":"sensor03","temperature":22.11} app[consumer_worker.1]: [2024-05-28 02:31:37.773 +0000] INFO (21): {"sensorId":"sensor01","barometric_pressure":29.71} app[consumer_worker.1]: [2024-05-28 02:31:54.495 +0000] INFO (21): {"sensorId":"sensor05","barometric_pressure":29.55} app[consumer_worker.1]: [2024-05-28 02:32:02.629 +0000] INFO (21): {"sensorId":"sensor04","temperature":90.58} app[consumer_worker.1]: [2024-05-28 02:32:03.995 +0000] INFO (21): {"sensorId":"sensor02","barometric_pressure":29.25} app[consumer_worker.1]: [2024-05-28 02:32:12.688 +0000] INFO (21): {"sensorId":"sensor04","humidity":0.1} app[consumer_worker.1]: [2024-05-28 02:32:32.127 +0000] INFO (21): {"sensorId":"sensor01","humidity":0.34} app[consumer_worker.1]: [2024-05-28 02:32:32.851 +0000] INFO (21): {"sensorId":"sensor02","humidity":0.61} app[consumer_worker.1]: [2024-05-28 02:32:37.200 +0000] INFO (21): {"sensorId":"sensor01","barometric_pressure":30.36} app[consumer_worker.1]: [2024-05-28 02:32:50.388 +0000] INFO (21): {"sensorId":"sensor03","temperature":104.55}
¡Funciona! Sabemos que nuestro productor publica mensajes periódicamente en Kafka porque nuestro consumidor los recibe y luego los registra.
Por supuesto, en una aplicación EDA más grande, cada sensor es un productor. Es posible que publiquen sobre varios temas para diversos fines o que todos publiquen sobre el mismo tema. Y su consumidor puede estar suscrito a múltiples temas. Además, en nuestra aplicación de demostración, nuestros consumidores simplemente emitieron mucho en eachMessage
; pero en una aplicación EDA, un consumidor podría responder llamando a una API de terceros, enviando una notificación por SMS o consultando una base de datos.
Ahora que tiene un conocimiento básico de eventos, temas, productores y consumidores, y sabe cómo trabajar con Kafka, puede comenzar a diseñar y crear sus propias aplicaciones EDA para satisfacer casos de uso empresarial más complejos.
EDA es bastante poderoso: puede desacoplar sus sistemas mientras disfruta de funciones clave como una fácil escalabilidad y procesamiento de datos en tiempo real. Para EDA, Kafka es una herramienta clave que le ayuda a manejar flujos de datos de alto rendimiento con facilidad. Usar Apache Kafka en Heroku le ayuda a empezar rápidamente. Dado que es un servicio administrado, no necesita preocuparse por las partes complejas de la administración del clúster Kafka. Puedes concentrarte simplemente en crear tus aplicaciones.
A partir de aquí, es hora de que experimentes y crees prototipos. Identifique qué casos de uso encajan bien con EDA. Sumérgete, pruébalo en Heroku y construye algo increíble. ¡Feliz codificación!