您是否想知道您最喜欢的一些应用程序如何处理实时更新?实时体育比分、股票行情甚至社交媒体通知 — 它们都依赖事件驱动架构 (EDA) 来即时处理数据。EDA 就像进行对话一样,每一条新信息都会触发即时响应。它使应用程序更具交互性和响应性。
在本演练中,我们将指导您在 Heroku 上使用 Apache Kafka 构建一个简单的事件驱动应用程序。我们将介绍:
Apache Kafka是构建 EDA 系统的强大工具。它是一个专为处理实时数据馈送而设计的开源平台。Heroku上的 Apache Kafka是 Heroku 的一个附加组件,它提供 Kafka 即服务。Heroku 使应用程序的部署和管理变得非常容易,我最近在我的项目中更多地使用它。当您想要运行事件驱动的应用程序时,将 Kafka 与 Heroku 结合起来可以简化设置过程。
在本指南结束时,您将拥有一个正在运行的应用程序,该应用程序展示了 Heroku 上 Apache Kafka 的 EDA 功能。让我们开始吧!
在深入研究代码之前,让我们快速回顾一些核心概念。一旦理解了这些概念,接下来的内容就会更容易。
我们将使用KafkaJS库构建一个 Node.js 应用程序。以下是我们应用程序工作原理的简要概述:
我们的天气传感器(生产者)将定期生成数据(例如温度、湿度和气压),并将这些事件发送到 Apache Kafka。出于演示目的,数据将随机生成。
我们将让消费者监听主题。当收到新事件时,它会将数据写入日志。
我们将把整个设置部署到 Heroku,并使用 Heroku 日志来监控发生的事件。
在开始之前,请确保您已准备好以下内容:
整个项目的代码库可以在这个GitHub 存储库中找到。欢迎随意克隆代码并关注本篇文章。
现在我们已经介绍了基础知识,让我们在 Heroku 上设置我们的 Kafka 集群并开始构建。
让我们在 Heroku 上完成所有设置。这是一个非常快速和简单的过程。
~/project$ heroku login
~/project$ heroku create weather-eda
(我将我的 Heroku 应用程序命名为weather-eda
,但您可以为您的应用程序选择一个唯一的名称。)
~/project$ heroku addons:create heroku-kafka:basic-0 Creating heroku-kafka:basic-0 on ⬢ weather-eda... ~$0.139/hour (max $100/month) The cluster should be available in a few minutes. Run `heroku kafka:wait` to wait until the cluster is ready. You can read more about managing Kafka at https://devcenter.heroku.com/articles/kafka-on-heroku#managing-kafka kafka-adjacent-07560 is being created in the background. The app will restart when complete... Use heroku addons:info kafka-adjacent-07560 to check creation progress Use heroku addons:docs heroku-kafka to view documentation
您可以在此处找到有关 Heroku 上的 Apache Kafka 插件的更多信息。对于我们的演示,我添加了插件的 Basic 0 层。插件的费用为每小时 0.139 美元。在构建此演示应用程序的过程中,我使用该插件的时间不到一小时,然后我就将其关闭了。
Heroku 需要几分钟才能启动 Kafka 并为您做好准备。很快,您将看到以下内容:
~/project$ heroku addons:info kafka-adjacent-07560 === kafka-adjacent-07560 Attachments: weather-eda::KAFKA Installed at: Mon May 27 2024 11:44:37 GMT-0700 (Mountain Standard Time) Max Price: $100/month Owning app: weather-eda Plan: heroku-kafka:basic-0 Price: ~$0.139/hour State: created
随着 Kafka 集群的启动,我们需要获取凭证和其他配置。Heroku 为我们的应用程序创建了几个配置变量,并使用刚刚创建的 Kafka 集群中的信息填充它们。我们可以通过运行以下命令查看所有这些配置变量:
~/project$ heroku config === weather-eda Config Vars KAFKA_CLIENT_CERT: -----BEGIN CERTIFICATE----- MIIDQzCCAiugAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1h ... -----END CERTIFICATE----- KAFKA_CLIENT_CERT_KEY: -----BEGIN RSA PRIVATE KEY----- MIIEowIBAAKCAQEAsgv1oBiF4Az/IQsepHSh5pceL0XLy0uEAokD7ety9J0PTjj3 ... -----END RSA PRIVATE KEY----- KAFKA_PREFIX: columbia-68051. KAFKA_TRUSTED_CERT: -----BEGIN CERTIFICATE----- MIIDfzCCAmegAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1h ... F+f3juViDqm4eLCZBAdoK/DnI4fFrNH3YzhAPdhoHOa8wi4= -----END CERTIFICATE----- KAFKA_URL: kafka+ssl://ec2-18-233-140-74.compute-1.amazonaws.com:9096,kafka+ssl://ec2-18-208-61-56.compute-1.amazonaws.com:9096...kafka+ssl://ec2-34-203-24-91.compute-1.amazonaws.com:9096
如您所见,我们有几个配置变量。我们需要在项目根文件夹中创建一个名为.env
的文件,其中包含所有这些配置变量值。为此,我们只需运行以下命令:
~/project$ heroku config --shell > .env
我们的.env
文件如下所示:
KAFKA_CLIENT_CERT="-----BEGIN CERTIFICATE----- ... -----END CERTIFICATE-----" KAFKA_CLIENT_CERT_KEY="-----BEGIN RSA PRIVATE KEY----- ... -----END RSA PRIVATE KEY-----" KAFKA_PREFIX="columbia-68051." KAFKA_TRUSTED_CERT="-----BEGIN CERTIFICATE----- ... -----END CERTIFICATE-----" KAFKA_URL="kafka+ssl://ec2-18-233-140-74.compute-1.amazonaws.com:9096,kafka+ssl://ec2-18-208-61-56.compute-1.amazonaws.com:9096...kafka+ssl://ec2-34-203-24-91.compute-1.amazonaws.com:9096"
另外,我们确保将 .env 添加到我们的 .gitignore 文件中。我们不想将这些敏感数据提交到我们的存储库。
Heroku CLI 不附带与 Kafka 相关的现成命令。由于我们使用 Kafka,因此我们需要安装 CLI 插件。
~/project$ heroku plugins:install heroku-kafka Installing plugin heroku-kafka... installed v2.12.0
现在,我们可以从 CLI 管理我们的 Kafka 集群。
~/project$ heroku kafka:info === KAFKA_URL Plan: heroku-kafka:basic-0 Status: available Version: 2.8.2 Created: 2024-05-27T18:44:38.023+00:00 Topics: [··········] 0 / 40 topics, see heroku kafka:topics Prefix: columbia-68051. Partitions: [··········] 0 / 240 partition replicas (partitions × replication factor) Messages: 0 messages/s Traffic: 0 bytes/s in / 0 bytes/s out Data Size: [··········] 0 bytes / 4.00 GB (0.00%) Add-on: kafka-adjacent-07560 ~/project$ heroku kafka:topics === Kafka Topics on KAFKA_URL No topics found on this Kafka cluster. Use heroku kafka:topics:create to create a topic (limit 40)
为了进行健全性检查,让我们来试用一下 Kafka 集群。首先创建一个主题。
~/project$ heroku kafka:topics:create test-topic-01 Creating topic test-topic-01 with compaction disabled and retention time 1 day on kafka-adjacent-07560... done Use `heroku kafka:topics:info test-topic-01` to monitor your topic. Your topic is using the prefix columbia-68051.. ~/project$ heroku kafka:topics:info test-topic-01 ▸ topic test-topic-01 is not available yet
大约一分钟后,我们的主题就可用了。
~/project$ heroku kafka:topics:info test-topic-01 === kafka-adjacent-07560 :: test-topic-01 Topic Prefix: columbia-68051. Producers: 0 messages/second (0 bytes/second) total Consumers: 0 bytes/second total Partitions: 8 partitions Replication Factor: 3 Compaction: Compaction is disabled for test-topic-01 Retention: 24 hours
接下来,在这个终端窗口中,我们将充当消费者,通过跟踪来收听这个主题。
~/project$ heroku kafka:topics:tail test-topic-01
从这里开始,终端只需等待有关该主题的任何事件发布。
在单独的终端窗口中,我们将充当生产者,并发布有关该主题的一些消息。
~/project$ heroku kafka:topics:write test-topic-01 "hello world!"
回到我们的消费者终端窗口,我们看到的是:
~/project$ heroku kafka:topics:tail test-topic-01 test-topic-01 0 0 12 hello world!
太棒了!我们已成功生成事件并将其消费到 Kafka 集群中的主题。我们已准备好继续使用 Node.js 应用程序。让我们销毁这个测试主题以保持我们的操场整洁。
~/project$ heroku kafka:topics:destroy test-topic-01 ▸ This command will affect the cluster: kafka-adjacent-07560, which is on weather-eda ▸ To proceed, type weather-eda or re-run this command with --confirm weather-eda > weather-eda Deleting topic test-topic-01... done Your topic has been marked for deletion, and will be removed from the cluster shortly ~/project$ heroku kafka:topics === Kafka Topics on KAFKA_URL No topics found on this Kafka cluster. Use heroku kafka:topics:create to create a topic (limit 40).
为了准备在我们的应用程序中使用 Kafka,我们需要创建两个东西:一个主题和一个消费者组。
让我们创建我们的应用程序将使用的主题。
~/project$ heroku kafka:topics:create weather-data
接下来,我们将创建应用程序消费者所属的消费者组:
~/project$ heroku kafka:consumer-groups:create weather-consumers
我们已准备好构建我们的 Node.js 应用程序!
让我们初始化一个新项目并安装我们的依赖项。
~/project$ npm init -y ~/project$ npm install kafkajs dotenv @faker-js/faker pino pino-pretty
我们的项目将运行两个进程:
consumer.js
,订阅该主题并记录已发布的任何事件。
producer.js
,它将每隔几秒发布一些关于该主题的随机天气数据。
这两个过程都需要使用 KafkaJS 连接到我们的 Kafka 集群,因此我们将模块化我们的代码以使其可重复使用。
在项目src
文件夹中,我们创建一个名为kafka.js
的文件。它看起来像这样:
const { Kafka } = require('kafkajs'); const BROKER_URLS = process.env.KAFKA_URL.split(',').map(uri => uri.replace('kafka+ssl://','' )) const TOPIC = `${process.env.KAFKA_PREFIX}weather-data` const CONSUMER_GROUP = `${process.env.KAFKA_PREFIX}weather-consumers` const kafka = new Kafka({ clientId: 'weather-eda-app-nodejs-client', brokers: BROKER_URLS, ssl: { rejectUnauthorized: false, ca: process.env.KAFKA_TRUSTED_CERT, key: process.env.KAFKA_CLIENT_CERT_KEY, cert: process.env.KAFKA_CLIENT_CERT, }, }) const producer = async () => { const p = kafka.producer() await p.connect() return p; } const consumer = async () => { const c = kafka.consumer({ groupId: CONSUMER_GROUP, sessionTimeout: 30000 }) await c.connect() await c.subscribe({ topics: [TOPIC] }); return c; } module.exports = { producer, consumer, topic: TOPIC, groupId: CONSUMER_GROUP };
在此文件中,我们首先创建一个新的 Kafka 客户端。这需要 Kafka 代理的 URL,我们可以从.env
文件中的KAFKA_URL
变量(最初来自调用 heroku 配置)中解析这些 URL。要验证连接尝试,我们需要提供KAFKA_TRUSTED_CERT
、 KAFKA_CLIENT_CERT_KEY
和KAFKA_CLIENT_CERT
。
然后,从我们的 Kafka 客户端,我们创建一个producer
和一个consumer
,确保我们的消费者订阅weather-data
主题。
请注意,在kafka.js
中,我们将KAFKA_PREFIX
添加到主题和消费者组名称前面。我们在 Heroku 上使用 Apache Kafka 的 Basic 0 计划,这是一个多租户 Kafka 计划。这意味着我们使用KAFKA_PREFIX
。尽管我们将主题命名为weather-data
,将消费者组命名为weather-consumers
,但它们在多租户 Kafka 集群中的实际名称必须以KAFKA_PREFIX
为前缀(以确保它们是唯一的)。
因此,从技术上讲,对于我们的演示,实际的主题名称是columbia-68051.weather-data
,而不是weather-data
。 (消费者组名称也是如此。)
现在,让我们创建后台进程,它将充当我们的天气传感器生产者。在我们的项目根文件夹中,我们有一个名为producer.js
的文件。它看起来像这样:
require('dotenv').config(); const kafka = require('./src/kafka.js'); const { faker } = require('@faker-js/faker'); const SENSORS = ['sensor01','sensor02','sensor03','sensor04','sensor05']; const MAX_DELAY_MS = 20000; const READINGS = ['temperature','humidity','barometric_pressure']; const MAX_TEMP = 130; const MIN_PRESSURE = 2910; const PRESSURE_RANGE = 160; const getRandom = (arr) => arr[faker.number.int(arr.length - 1)]; const getRandomReading = { temperature: () => faker.number.int(MAX_TEMP) + (faker.number.int(100) / 100), humidity: () => faker.number.int(100) / 100, barometric_pressure: () => (MIN_PRESSURE + faker.number.int(PRESSURE_RANGE)) / 100 }; const sleep = (ms) => { return new Promise((resolve) => { setTimeout(resolve, ms); }); }; (async () => { const producer = await kafka.producer() while(true) { const sensor = getRandom(SENSORS) const reading = getRandom(READINGS) const value = getRandomReading[reading]() const data = { reading, value } await producer.send({ topic: kafka.topic, messages: [{ key: sensor, value: JSON.stringify(data) }] }) await sleep(faker.number.int(MAX_DELAY_MS)) } })()
文件中的许多代码与生成随机值有关。我将重点介绍重要部分:
SENSORS
中找到。
temperature
、 humidity
或barometric_pressure
。 getRandomReading
对象针对每个读数都有一个函数,用于生成合理的对应值。
while
循环的async
函数运行。
在while
循环中,我们:
sensor
。reading
。value
。producer.send
将这些数据发布到主题。 sensor
作为事件的key
,而reading
和value
将构成事件消息。consumer.js
中的后台进程相当简单。
require('dotenv').config(); const logger = require('./src/logger.js'); const kafka = require('./src/kafka.js'); (async () => { const consumer = await kafka.consumer() await consumer.run({ eachMessage: async ({ topic, partition, message }) => { const sensorId = message.key.toString() const messageObj = JSON.parse(message.value.toString()) const logMessage = { sensorId } logMessage[messageObj.reading] = messageObj.value logger.info(logMessage) } }) })()
我们的consumer
已经订阅了weather-data
主题。我们调用consumer.run
,然后为eachMessage
设置一个处理程序。每当 Kafka 通知consumer
有消息时,它都会记录该消息。这就是全部内容。
Procfile
在package.json
文件中,我们需要添加一些scripts
来启动生产者和消费者的后台进程。该文件现在应包含以下内容:
... "scripts": { "start": "echo 'do nothing'", "start:consumer": "node consumer.js", "start:producer": "node producer.js" }, ...
重要的是start:consumer
和start:producer
。但我们保留start
在我们的文件中(即使它没有做任何有意义的事情),因为 Heroku 构建器希望它在那里。
接下来,我们创建一个Procfile
,它将告诉 Heroku 如何启动 Heroku 应用程序所需的各种工作程序。在我们项目的根文件夹中, Procfile
应如下所示:
consumer_worker: npm run start:consumer producer_worker: npm run start:producer
很简单,对吧?我们将有一个名为consumer_worker
的后台进程工作者,以及另一个名为producer_worker
后台进程工作者。您会注意到,我们没有web
工作者,而这通常是您在Procfile
中为 Web 应用程序看到的。对于我们的 Heroku 应用程序,我们只需要两个后台工作者。我们不需要web
。
这样,我们的所有代码都设置好了。我们已将所有代码提交到存储库,并准备好进行部署。
~/project$ git push heroku main … remote: -----> Build succeeded! … remote: -----> Compressing... remote: Done: 48.6M remote: -----> Launching... … remote: Verifying deploy... done
部署完成后,我们要确保正确扩展我们的 dyno。我们不需要为 Web 进程使用 dyno,但为consumer_worker
和producer_worker
都需要一个 dyno。我们运行以下命令根据我们的需求设置这些进程。
~/project$ heroku ps:scale web=0 consumer_worker=1 producer_worker=1 Scaling dynos... done, now running producer_worker at 1:Eco, consumer_worker at 1:Eco, web at 0:Eco
现在,一切都应该正常运行。在后台,我们的producer_worker
应该连接到 Kafka 集群,然后开始每隔几秒发布一次天气传感器数据。然后,我们的consumer_worker
应该连接到 Kafka 集群并记录它从订阅的主题收到的任何消息。
要了解我们的consumer_worker
正在做什么,我们可以查看 Heroku 日志。
~/project$ heroku logs --tail … heroku[producer_worker.1]: Starting process with command `npm run start:producer` heroku[producer_worker.1]: State changed from starting to up app[producer_worker.1]: app[producer_worker.1]: > [email protected] start:producer app[producer_worker.1]: > node producer.js app[producer_worker.1]: … heroku[consumer_worker.1]: Starting process with command `npm run start:consumer` heroku[consumer_worker.1]: State changed from starting to up app[consumer_worker.1]: app[consumer_worker.1]: > [email protected] start:consumer app[consumer_worker.1]: > node consumer.js app[consumer_worker.1]: app[consumer_worker.1]: {"level":"INFO","timestamp":"2024-05-28T02:31:20.660Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"columbia-68051.weather-consumers"} app[consumer_worker.1]: {"level":"INFO","timestamp":"2024-05-28T02:31:23.702Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"columbia-68051.weather-consumers","memberId":"weather-eda-app-nodejs-client-3ee5d1fa-eba9-4b59-826c-d3b924a6e4e4","leaderId":"weather-eda-app-nodejs-client-3ee5d1fa-eba9-4b59-826c-d3b924a6e4e4","isLeader":true,"memberAssignment":{"columbia-68051.test-topic-1":[0,1,2,3,4,5,6,7]},"groupProtocol":"RoundRobinAssigner","duration":3041} app[consumer_worker.1]: [2024-05-28 02:31:23.755 +0000] INFO (21): {"sensorId":"sensor01","temperature":87.84} app[consumer_worker.1]: [2024-05-28 02:31:23.764 +0000] INFO (21): {"sensorId":"sensor01","humidity":0.3} app[consumer_worker.1]: [2024-05-28 02:31:23.777 +0000] INFO (21): {"sensorId":"sensor03","temperature":22.11} app[consumer_worker.1]: [2024-05-28 02:31:37.773 +0000] INFO (21): {"sensorId":"sensor01","barometric_pressure":29.71} app[consumer_worker.1]: [2024-05-28 02:31:54.495 +0000] INFO (21): {"sensorId":"sensor05","barometric_pressure":29.55} app[consumer_worker.1]: [2024-05-28 02:32:02.629 +0000] INFO (21): {"sensorId":"sensor04","temperature":90.58} app[consumer_worker.1]: [2024-05-28 02:32:03.995 +0000] INFO (21): {"sensorId":"sensor02","barometric_pressure":29.25} app[consumer_worker.1]: [2024-05-28 02:32:12.688 +0000] INFO (21): {"sensorId":"sensor04","humidity":0.1} app[consumer_worker.1]: [2024-05-28 02:32:32.127 +0000] INFO (21): {"sensorId":"sensor01","humidity":0.34} app[consumer_worker.1]: [2024-05-28 02:32:32.851 +0000] INFO (21): {"sensorId":"sensor02","humidity":0.61} app[consumer_worker.1]: [2024-05-28 02:32:37.200 +0000] INFO (21): {"sensorId":"sensor01","barometric_pressure":30.36} app[consumer_worker.1]: [2024-05-28 02:32:50.388 +0000] INFO (21): {"sensorId":"sensor03","temperature":104.55}
成功了!我们知道我们的生产者会定期向 Kafka 发布消息,因为我们的消费者会接收这些消息,然后记录下来。
当然,在大型 EDA 应用中,每个传感器都是一个生产者。它们可能出于各种目的在多个主题上发布信息,也可能都在同一主题上发布信息。并且您的消费者可以订阅多个主题。此外,在我们的演示应用中,我们的消费者只是在eachMessage
上发出了很多信息;但在 EDA 应用中,消费者可能会通过调用第三方 API、发送短信通知或查询数据库来做出响应。
现在您已经对事件、主题、生产者和消费者有了基本的了解,并且知道如何使用 Kafka,您可以开始设计和构建自己的 EDA 应用程序来满足更复杂的业务用例。
EDA 非常强大 - 您可以解耦系统,同时享受易扩展性和实时数据处理等关键功能。对于 EDA,Kafka 是一个关键工具,可帮助您轻松处理高吞吐量数据流。在 Heroku 上使用 Apache Kafka 可帮助您快速入门。由于它是一项托管服务,因此您无需担心 Kafka 集群管理的复杂部分。您只需专注于构建应用程序即可。
从现在开始,您可以进行实验和原型设计。确定哪些用例适合 EDA。深入研究,在 Heroku 上进行测试,然后构建出令人惊叹的东西。祝您编码愉快!