Вы когда-нибудь задумывались, как некоторые из ваших любимых приложений обрабатывают обновления в режиме реального времени? Прямые спортивные результаты, тикеры фондового рынка или даже уведомления в социальных сетях — все они используют событийно-ориентированную архитектуру (EDA) для мгновенной обработки данных. EDA похожа на разговор, в котором каждая новая информация вызывает немедленную реакцию. Это то, что делает приложение более интерактивным и отзывчивым.
В этом пошаговом руководстве мы покажем вам, как создать простое приложение, управляемое событиями, с использованием Apache Kafka на Heroku. Мы рассмотрим:
Apache Kafka — мощный инструмент для создания систем EDA. Это платформа с открытым исходным кодом, предназначенная для обработки потоков данных в реальном времени. Apache Kafka on Heroku — это надстройка Heroku, которая предоставляет Kafka как услугу. Heroku позволяет довольно легко развертывать приложения и управлять ими, и в последнее время я все чаще использую его в своих проектах. Объединение Kafka с Heroku упрощает процесс установки, если вы хотите запустить приложение, управляемое событиями.
К концу этого руководства у вас будет работающее приложение, демонстрирующее возможности EDA с Apache Kafka на Heroku. Давайте начнем!
Прежде чем мы углубимся в код, давайте быстро рассмотрим некоторые основные концепции. Как только вы это поймете, следовать дальше будет легче.
Мы создадим приложение Node.js, используя библиотеку KafkaJS . Вот краткий обзор того, как будет работать наше приложение:
Наши погодные датчики (производители) будут периодически генерировать данные, такие как температура, влажность и барометрическое давление, и отправлять эти события в Apache Kafka. Для демонстрационных целей данные будут генерироваться случайным образом.
У нас будет потребитель, слушающий темы. При получении нового события данные записываются в журнал.
Мы развернем всю настройку в Heroku и будем использовать журналы Heroku для отслеживания событий по мере их возникновения.
Прежде чем мы начнем, убедитесь, что у вас есть следующее:
Кодовая база всего этого проекта доступна в этом репозитории GitHub . Не стесняйтесь клонировать код и следовать инструкциям в этом посте.
Теперь, когда мы рассмотрели основы, давайте настроим наш кластер Kafka на Heroku и начнем сборку.
Давайте настроим все на Heroku. Это довольно быстрый и простой процесс.
~/project$ heroku login
~/project$ heroku create weather-eda
(Я назвал свое приложение Heroku weather-eda
, но вы можете выбрать для своего приложения уникальное имя.)
~/project$ heroku addons:create heroku-kafka:basic-0 Creating heroku-kafka:basic-0 on ⬢ weather-eda... ~$0.139/hour (max $100/month) The cluster should be available in a few minutes. Run `heroku kafka:wait` to wait until the cluster is ready. You can read more about managing Kafka at https://devcenter.heroku.com/articles/kafka-on-heroku#managing-kafka kafka-adjacent-07560 is being created in the background. The app will restart when complete... Use heroku addons:info kafka-adjacent-07560 to check creation progress Use heroku addons:docs heroku-kafka to view documentation
Дополнительную информацию о дополнении Apache Kafka on Heroku можно найти здесь . Для нашей демонстрации я добавляю базовый уровень дополнения 0. Стоимость дополнения составляет $0,139/час. Создавая это демонстрационное приложение, я использовал надстройку менее часа, а затем отключил ее.
Heroku требуется несколько минут, чтобы развернуть Kafka и подготовить его для вас. Совсем скоро вы увидите вот что:
~/project$ heroku addons:info kafka-adjacent-07560 === kafka-adjacent-07560 Attachments: weather-eda::KAFKA Installed at: Mon May 27 2024 11:44:37 GMT-0700 (Mountain Standard Time) Max Price: $100/month Owning app: weather-eda Plan: heroku-kafka:basic-0 Price: ~$0.139/hour State: created
После развертывания нашего кластера Kafka нам потребуется получить учетные данные и другие конфигурации. Heroku создает несколько переменных конфигурации для нашего приложения, заполняя их информацией из только что созданного кластера Kafka. Мы можем увидеть все эти параметры конфигурации, выполнив следующее:
~/project$ heroku config === weather-eda Config Vars KAFKA_CLIENT_CERT: -----BEGIN CERTIFICATE----- MIIDQzCCAiugAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1h ... -----END CERTIFICATE----- KAFKA_CLIENT_CERT_KEY: -----BEGIN RSA PRIVATE KEY----- MIIEowIBAAKCAQEAsgv1oBiF4Az/IQsepHSh5pceL0XLy0uEAokD7ety9J0PTjj3 ... -----END RSA PRIVATE KEY----- KAFKA_PREFIX: columbia-68051. KAFKA_TRUSTED_CERT: -----BEGIN CERTIFICATE----- MIIDfzCCAmegAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1h ... F+f3juViDqm4eLCZBAdoK/DnI4fFrNH3YzhAPdhoHOa8wi4= -----END CERTIFICATE----- KAFKA_URL: kafka+ssl://ec2-18-233-140-74.compute-1.amazonaws.com:9096,kafka+ssl://ec2-18-208-61-56.compute-1.amazonaws.com:9096...kafka+ssl://ec2-34-203-24-91.compute-1.amazonaws.com:9096
Как видите, у нас есть несколько переменных конфигурации. Нам понадобится файл в корневой папке нашего проекта с именем .env
со всеми этими значениями конфигурационных переменных. Для этого мы просто запускаем следующую команду:
~/project$ heroku config --shell > .env
Наш файл .env
выглядит так:
KAFKA_CLIENT_CERT="-----BEGIN CERTIFICATE----- ... -----END CERTIFICATE-----" KAFKA_CLIENT_CERT_KEY="-----BEGIN RSA PRIVATE KEY----- ... -----END RSA PRIVATE KEY-----" KAFKA_PREFIX="columbia-68051." KAFKA_TRUSTED_CERT="-----BEGIN CERTIFICATE----- ... -----END CERTIFICATE-----" KAFKA_URL="kafka+ssl://ec2-18-233-140-74.compute-1.amazonaws.com:9096,kafka+ssl://ec2-18-208-61-56.compute-1.amazonaws.com:9096...kafka+ssl://ec2-34-203-24-91.compute-1.amazonaws.com:9096"
Кроме того, мы обязательно добавляем .env в наш файл .gitignore. Мы не хотели бы передавать эти конфиденциальные данные в наш репозиторий.
Интерфейс командной строки Heroku не поставляется с командами, связанными с Kafka, прямо из коробки. Поскольку мы используем Kafka, нам нужно установить плагин CLI .
~/project$ heroku plugins:install heroku-kafka Installing plugin heroku-kafka... installed v2.12.0
Теперь мы можем управлять нашим кластером Kafka из CLI.
~/project$ heroku kafka:info === KAFKA_URL Plan: heroku-kafka:basic-0 Status: available Version: 2.8.2 Created: 2024-05-27T18:44:38.023+00:00 Topics: [··········] 0 / 40 topics, see heroku kafka:topics Prefix: columbia-68051. Partitions: [··········] 0 / 240 partition replicas (partitions × replication factor) Messages: 0 messages/s Traffic: 0 bytes/s in / 0 bytes/s out Data Size: [··········] 0 bytes / 4.00 GB (0.00%) Add-on: kafka-adjacent-07560 ~/project$ heroku kafka:topics === Kafka Topics on KAFKA_URL No topics found on this Kafka cluster. Use heroku kafka:topics:create to create a topic (limit 40)
В качестве проверки работоспособности давайте поиграемся с нашим кластером Kafka. Начнем с создания темы.
~/project$ heroku kafka:topics:create test-topic-01 Creating topic test-topic-01 with compaction disabled and retention time 1 day on kafka-adjacent-07560... done Use `heroku kafka:topics:info test-topic-01` to monitor your topic. Your topic is using the prefix columbia-68051.. ~/project$ heroku kafka:topics:info test-topic-01 ▸ topic test-topic-01 is not available yet
Примерно через минуту наша тема станет доступной.
~/project$ heroku kafka:topics:info test-topic-01 === kafka-adjacent-07560 :: test-topic-01 Topic Prefix: columbia-68051. Producers: 0 messages/second (0 bytes/second) total Consumers: 0 bytes/second total Partitions: 8 partitions Replication Factor: 3 Compaction: Compaction is disabled for test-topic-01 Retention: 24 hours
Далее в этом окне терминала мы будем действовать как потребитель, слушая эту тему, следя за ней.
~/project$ heroku kafka:topics:tail test-topic-01
Отсюда терминал просто ждет любых событий, опубликованных по теме.
В отдельном окне терминала мы выступим в роли продюсера и опубликуем некоторые сообщения по теме.
~/project$ heroku kafka:topics:write test-topic-01 "hello world!"
Вернувшись в окно терминала нашего потребителя, мы видим вот что:
~/project$ heroku kafka:topics:tail test-topic-01 test-topic-01 0 0 12 hello world!
Отличный! Мы успешно создали и использовали событие для темы в нашем кластере Kafka. Мы готовы перейти к нашему приложению Node.js. Давайте уничтожим эту тестовую тему, чтобы поддерживать порядок на нашей игровой площадке.
~/project$ heroku kafka:topics:destroy test-topic-01 ▸ This command will affect the cluster: kafka-adjacent-07560, which is on weather-eda ▸ To proceed, type weather-eda or re-run this command with --confirm weather-eda > weather-eda Deleting topic test-topic-01... done Your topic has been marked for deletion, and will be removed from the cluster shortly ~/project$ heroku kafka:topics === Kafka Topics on KAFKA_URL No topics found on this Kafka cluster. Use heroku kafka:topics:create to create a topic (limit 40).
Чтобы подготовить наше приложение к использованию Kafka, нам нужно будет создать две вещи: тему и группу потребителей.
Давайте создадим тему, которую будет использовать наше приложение.
~/project$ heroku kafka:topics:create weather-data
Далее мы создадим группу потребителей, частью которой будет потребитель нашего приложения:
~/project$ heroku kafka:consumer-groups:create weather-consumers
Мы готовы создать наше приложение Node.js!
Давайте инициализируем новый проект и установим наши зависимости.
~/project$ npm init -y ~/project$ npm install kafkajs dotenv @faker-js/faker pino pino-pretty
В нашем проекте будут запущены два процесса:
consumer.js
, который подписан на тему и регистрирует любые публикуемые события.
producer.js
, который каждые несколько секунд будет публиковать рандомизированные данные о погоде по этой теме.
Оба этих процесса должны будут использовать KafkaJS для подключения к нашему кластеру Kafka, поэтому мы модульизируем наш код, чтобы сделать его пригодным для повторного использования.
В папке src
проекта мы создаем файл kafka.js
. Это выглядит так:
const { Kafka } = require('kafkajs'); const BROKER_URLS = process.env.KAFKA_URL.split(',').map(uri => uri.replace('kafka+ssl://','' )) const TOPIC = `${process.env.KAFKA_PREFIX}weather-data` const CONSUMER_GROUP = `${process.env.KAFKA_PREFIX}weather-consumers` const kafka = new Kafka({ clientId: 'weather-eda-app-nodejs-client', brokers: BROKER_URLS, ssl: { rejectUnauthorized: false, ca: process.env.KAFKA_TRUSTED_CERT, key: process.env.KAFKA_CLIENT_CERT_KEY, cert: process.env.KAFKA_CLIENT_CERT, }, }) const producer = async () => { const p = kafka.producer() await p.connect() return p; } const consumer = async () => { const c = kafka.consumer({ groupId: CONSUMER_GROUP, sessionTimeout: 30000 }) await c.connect() await c.subscribe({ topics: [TOPIC] }); return c; } module.exports = { producer, consumer, topic: TOPIC, groupId: CONSUMER_GROUP };
В этом файле мы начинаем с создания нового клиента Kafka. Для этого требуются URL-адреса брокеров Kafka, которые мы можем проанализировать из переменной KAFKA_URL
в нашем файле .env
(который изначально был получен при вызове конфигурации Heroku). Чтобы аутентифицировать попытку подключения, нам необходимо предоставить KAFKA_TRUSTED_CERT
, KAFKA_CLIENT_CERT_KEY
и KAFKA_CLIENT_CERT
.
Затем из нашего клиента Kafka мы создаем producer
и consumer
, обязательно подписывая наших потребителей на тему weather-data
.
Обратите внимание, что в kafka.js
мы добавляем KAFKA_PREFIX
к названию нашей темы и группы потребителей. Мы используем план Basic 0 для Apache Kafka на Heroku, который представляет собой многопользовательский план Kafka. Это означает, что мы работаем с KAFKA_PREFIX
. Несмотря на то, что мы назвали нашу тему weather-data
и нашу группу потребителей weather-consumers
, к их фактическим именам в нашем мультитенантном кластере Kafka должен быть добавлен KAFKA_PREFIX
(чтобы гарантировать их уникальность).
Итак, технически для нашей демонстрации фактическое название темы — columbia-68051.weather-data
, а не weather-data
. (Аналогично и для имени группы потребителей.)
Теперь давайте создадим наш фоновый процесс, который будет действовать как производитель датчиков погоды. В корневой папке нашего проекта есть файл producer.js
. Это выглядит так:
require('dotenv').config(); const kafka = require('./src/kafka.js'); const { faker } = require('@faker-js/faker'); const SENSORS = ['sensor01','sensor02','sensor03','sensor04','sensor05']; const MAX_DELAY_MS = 20000; const READINGS = ['temperature','humidity','barometric_pressure']; const MAX_TEMP = 130; const MIN_PRESSURE = 2910; const PRESSURE_RANGE = 160; const getRandom = (arr) => arr[faker.number.int(arr.length - 1)]; const getRandomReading = { temperature: () => faker.number.int(MAX_TEMP) + (faker.number.int(100) / 100), humidity: () => faker.number.int(100) / 100, barometric_pressure: () => (MIN_PRESSURE + faker.number.int(PRESSURE_RANGE)) / 100 }; const sleep = (ms) => { return new Promise((resolve) => { setTimeout(resolve, ms); }); }; (async () => { const producer = await kafka.producer() while(true) { const sensor = getRandom(SENSORS) const reading = getRandom(READINGS) const value = getRandomReading[reading]() const data = { reading, value } await producer.send({ topic: kafka.topic, messages: [{ key: sensor, value: JSON.stringify(data) }] }) await sleep(faker.number.int(MAX_DELAY_MS)) } })()
Большая часть кода в файле связана с генерацией случайных значений. Выделю важные части:
SENSORS
.
temperature
, humidity
или barometric_pressure
. Объект getRandomReading
имеет функцию для каждого из этих показаний, позволяющую генерировать соответствующее соответствующее значение.
async
функция с бесконечным циклом while
.
В цикле while
мы:
sensor
наугад.reading
наугад.value
для этого чтения.producer.send
, чтобы опубликовать эти данные в теме. sensor
служит key
к событию, а reading
и value
формируют сообщение о событии. Фоновый процесс в consumer.js
значительно проще.
require('dotenv').config(); const logger = require('./src/logger.js'); const kafka = require('./src/kafka.js'); (async () => { const consumer = await kafka.consumer() await consumer.run({ eachMessage: async ({ topic, partition, message }) => { const sensorId = message.key.toString() const messageObj = JSON.parse(message.value.toString()) const logMessage = { sensorId } logMessage[messageObj.reading] = messageObj.value logger.info(logMessage) } }) })()
Наш consumer
уже подписан на тему weather-data
. Мы вызываем consumer.run
, а затем настраиваем обработчик для eachMessage
. Всякий раз, когда Kafka уведомляет consumer
о сообщении, оно регистрируется. Вот и все.
Procfile
В файл package.json
нам нужно добавить несколько scripts
, которые запускают фоновые процессы производителя и потребителя. Теперь файл должен включать следующее:
... "scripts": { "start": "echo 'do nothing'", "start:consumer": "node consumer.js", "start:producer": "node producer.js" }, ...
Важными из них являются start:consumer
и start:producer
. Но мы продолжаем start
над нашим файлом (хотя это не делает ничего значимого), потому что сборщик Heroku ожидает, что он будет там.
Затем мы создаем Procfile
, который расскажет Heroku, как запускать различные рабочие процессы, необходимые для нашего приложения Heroku. В корневой папке нашего проекта Procfile
должен выглядеть так:
consumer_worker: npm run start:consumer producer_worker: npm run start:producer
Довольно просто, правда? У нас будет рабочий фоновый процесс с именем consumer_worker
и еще один с именем producer_worker
. Вы заметите, что у нас нет web
работника, который вы обычно видите в Procfile
для веб-приложения. Для нашего приложения Heroku нам нужны только два фоновых работника. Нам не нужен web
.
На этом весь наш код установлен. Мы зафиксировали весь наш код в репозитории и готовы к развертыванию.
~/project$ git push heroku main … remote: -----> Build succeeded! … remote: -----> Compressing... remote: Done: 48.6M remote: -----> Launching... … remote: Verifying deploy... done
После развертывания мы хотим убедиться, что мы правильно масштабируем наши динамометры. Нам не нужен динамометрический стенд для веб-процесса, но он нам понадобится как для consumer_worker
, так и producer_worker
. Мы запускаем следующую команду, чтобы настроить эти процессы в соответствии с нашими потребностями.
~/project$ heroku ps:scale web=0 consumer_worker=1 producer_worker=1 Scaling dynos... done, now running producer_worker at 1:Eco, consumer_worker at 1:Eco, web at 0:Eco
Теперь все должно работать. За кулисами наш producer_worker
должен подключиться к кластеру Kafka, а затем начать публиковать данные датчиков погоды каждые несколько секунд. Затем наш consumer_worker
должен подключиться к кластеру Kafka и регистрировать все сообщения, которые он получает из темы, на которую он подписан.
Чтобы увидеть, что делает наш consumer_worker
, мы можем посмотреть журналы Heroku.
~/project$ heroku logs --tail … heroku[producer_worker.1]: Starting process with command `npm run start:producer` heroku[producer_worker.1]: State changed from starting to up app[producer_worker.1]: app[producer_worker.1]: > [email protected] start:producer app[producer_worker.1]: > node producer.js app[producer_worker.1]: … heroku[consumer_worker.1]: Starting process with command `npm run start:consumer` heroku[consumer_worker.1]: State changed from starting to up app[consumer_worker.1]: app[consumer_worker.1]: > [email protected] start:consumer app[consumer_worker.1]: > node consumer.js app[consumer_worker.1]: app[consumer_worker.1]: {"level":"INFO","timestamp":"2024-05-28T02:31:20.660Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"columbia-68051.weather-consumers"} app[consumer_worker.1]: {"level":"INFO","timestamp":"2024-05-28T02:31:23.702Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"columbia-68051.weather-consumers","memberId":"weather-eda-app-nodejs-client-3ee5d1fa-eba9-4b59-826c-d3b924a6e4e4","leaderId":"weather-eda-app-nodejs-client-3ee5d1fa-eba9-4b59-826c-d3b924a6e4e4","isLeader":true,"memberAssignment":{"columbia-68051.test-topic-1":[0,1,2,3,4,5,6,7]},"groupProtocol":"RoundRobinAssigner","duration":3041} app[consumer_worker.1]: [2024-05-28 02:31:23.755 +0000] INFO (21): {"sensorId":"sensor01","temperature":87.84} app[consumer_worker.1]: [2024-05-28 02:31:23.764 +0000] INFO (21): {"sensorId":"sensor01","humidity":0.3} app[consumer_worker.1]: [2024-05-28 02:31:23.777 +0000] INFO (21): {"sensorId":"sensor03","temperature":22.11} app[consumer_worker.1]: [2024-05-28 02:31:37.773 +0000] INFO (21): {"sensorId":"sensor01","barometric_pressure":29.71} app[consumer_worker.1]: [2024-05-28 02:31:54.495 +0000] INFO (21): {"sensorId":"sensor05","barometric_pressure":29.55} app[consumer_worker.1]: [2024-05-28 02:32:02.629 +0000] INFO (21): {"sensorId":"sensor04","temperature":90.58} app[consumer_worker.1]: [2024-05-28 02:32:03.995 +0000] INFO (21): {"sensorId":"sensor02","barometric_pressure":29.25} app[consumer_worker.1]: [2024-05-28 02:32:12.688 +0000] INFO (21): {"sensorId":"sensor04","humidity":0.1} app[consumer_worker.1]: [2024-05-28 02:32:32.127 +0000] INFO (21): {"sensorId":"sensor01","humidity":0.34} app[consumer_worker.1]: [2024-05-28 02:32:32.851 +0000] INFO (21): {"sensorId":"sensor02","humidity":0.61} app[consumer_worker.1]: [2024-05-28 02:32:37.200 +0000] INFO (21): {"sensorId":"sensor01","barometric_pressure":30.36} app[consumer_worker.1]: [2024-05-28 02:32:50.388 +0000] INFO (21): {"sensorId":"sensor03","temperature":104.55}
Оно работает! Мы знаем, что наш производитель периодически публикует сообщения в Kafka, потому что наш потребитель их получает, а затем регистрирует.
Конечно, в более крупном приложении EDA каждый датчик является производителем. Они могут публиковать публикации по нескольким темам для разных целей или все они могут публиковаться по одной и той же теме. И ваш потребитель может быть подписан на несколько тем. Кроме того, в нашем демонстрационном приложении наши потребители просто отправляли много сообщений в eachMessage
; но в приложении EDA потребитель может ответить, вызвав сторонний API, отправив SMS-уведомление или запросив базу данных.
Теперь, когда у вас есть базовое представление о событиях, темах, производителях и потребителях и вы знаете, как работать с Kafka, вы можете начать проектировать и создавать свои собственные приложения EDA для удовлетворения более сложных случаев использования в бизнесе.
EDA довольно мощный инструмент — вы можете отделить свои системы, наслаждаясь такими ключевыми функциями, как простая масштабируемость и обработка данных в реальном времени. Для EDA Kafka — ключевой инструмент, помогающий легко обрабатывать потоки данных с высокой пропускной способностью. Использование Apache Kafka в Heroku поможет вам быстро приступить к работе. Поскольку это управляемая служба, вам не нужно беспокоиться о сложных частях управления кластером Kafka. Вы можете просто сосредоточиться на создании своих приложений.
Отсюда пришло время экспериментировать и создавать прототипы. Определите, какие варианты использования лучше всего подходят для EDA. Погрузитесь, протестируйте его на Heroku и создайте что-то потрясающее. Приятного кодирования!