paint-brush
Node.js 教程:如何使用 Kafka 构建简单的事件驱动应用程序经过@alvinslee
新歷史

Node.js 教程:如何使用 Kafka 构建简单的事件驱动应用程序

经过 Alvin Lee14m2024/06/28
Read on Terminal Reader

太長; 讀書

了解如何在 Heroku 上使用 Node.js 和 Apache Kafka 构建简单的事件驱动应用程序。本指南介绍如何设置 Kafka 集群、创建 Node.js 应用程序来生成和使用事件,以及如何在 Heroku 上部署应用程序。最后,您将获得一个具有实时数据处理的事件驱动架构的工作示例。
featured image - Node.js 教程:如何使用 Kafka 构建简单的事件驱动应用程序
Alvin Lee HackerNoon profile picture

您是否想知道您最喜欢的一些应用程序如何处理实时更新?实时体育比分、股票行情甚至社交媒体通知 — 它们都依赖事件驱动架构 (EDA) 来即时处理数据。EDA 就像进行对话一样,每一条新信息都会触发即时响应。它使应用程序更具交互性和响应性。


在本演练中,我们将指导您在 Heroku 上使用 Apache Kafka 构建一个简单的事件驱动应用程序。我们将介绍:

  • 在 Heroku 上设置 Kafka 集群


  • 构建一个生成和使用事件的 Node.js 应用程序


  • 将应用程序部署到 Heroku


Apache Kafka是构建 EDA 系统的强大工具。它是一个专为处理实时数据馈送而设计的开源平台。Heroku上的 Apache Kafka是 Heroku 的一个附加组件,它提供 Kafka 即服务。Heroku 使应用程序的部署和管理变得非常容易,我最近在我的项目中更多地使用它。当您想要运行事件驱动的应用程序时,将 Kafka 与 Heroku 结合起来可以简化设置过程。


在本指南结束时,您将拥有一个正在运行的应用程序,该应用程序展示了 Heroku 上 Apache Kafka 的 EDA 功能。让我们开始吧!

入门

在深入研究代码之前,让我们快速回顾一些核心概念。一旦理解了这些概念,接下来的内容就会更容易。


  • 事件是表示系统中某些事件的数据片段,例如传感器的温度读数。


  • 主题是发布事件的类别或渠道。可以将它们视为您在新闻通讯中订阅的主题。


  • 生产者是创建事件并将其发送到主题的实体。在我们的演示 EDA 应用程序中,我们的生产者将是一组天气传感器。


  • 消费者是读取和处理来自主题的事件的实体。我们的应用程序将有一个消费者,用于监听天气数据事件并记录这些事件。

应用程序简介

我们将使用KafkaJS库构建一个 Node.js 应用程序。以下是我们应用程序工作原理的简要概述:


  1. 我们的天气传感器(生产者)将定期生成数据(例如温度、湿度和气压),并将这些事件发送到 Apache Kafka。出于演示目的,数据将随机生成。


  2. 我们将让消费者监听主题。当收到新事件时,它会将数据写入日志。


  3. 我们将把整个设置部署到 Heroku,并使用 Heroku 日志来监控发生的事件。

先决条件

在开始之前,请确保您已准备好以下内容:


  • Heroku 帐户:如果您没有,请在 Heroku 上注册
  • Heroku CLI:下载并安装Heroku CLI。
  • 在您的本地机器上安装 Node.js 以供开发。在我的机器上,我使用 Node (v.20.9.0) 和 npm (10.4.0)。


整个项目的代码库可以在这个GitHub 存储库中找到。欢迎随意克隆代码并关注本篇文章。


现在我们已经介绍了基础知识,让我们在 Heroku 上设置我们的 Kafka 集群并开始构建。

在 Heroku 上设置 Kafka 集群

让我们在 Heroku 上完成所有设置。这是一个非常快速和简单的过程。

步骤 1:通过 Heroku CLI 登录

~/project$ heroku login

第 2 步:创建 Heroku 应用

~/project$ heroku create weather-eda


(我将我的 Heroku 应用程序命名为weather-eda ,但您可以为您的应用程序选择一个唯一的名称。)

步骤 3:在 Heroku 附加组件上添加 Apache Kafka

 ~/project$ heroku addons:create heroku-kafka:basic-0 Creating heroku-kafka:basic-0 on ⬢ weather-eda... ~$0.139/hour (max $100/month) The cluster should be available in a few minutes. Run `heroku kafka:wait` to wait until the cluster is ready. You can read more about managing Kafka at https://devcenter.heroku.com/articles/kafka-on-heroku#managing-kafka kafka-adjacent-07560 is being created in the background. The app will restart when complete... Use heroku addons:info kafka-adjacent-07560 to check creation progress Use heroku addons:docs heroku-kafka to view documentation


您可以在此处找到有关 Heroku 上的 Apache Kafka 插件的更多信息。对于我们的演示,我添加了插件的 Basic 0 层。插件的费用为每小时 0.139 美元。在构建此演示应用程序的过程中,我使用该插件的时间不到一小时,然后我就将其关闭了。


Heroku 需要几分钟才能启动 Kafka 并为您做好准备。很快,您将看到以下内容:


 ~/project$ heroku addons:info kafka-adjacent-07560 === kafka-adjacent-07560 Attachments: weather-eda::KAFKA Installed at: Mon May 27 2024 11:44:37 GMT-0700 (Mountain Standard Time) Max Price: $100/month Owning app: weather-eda Plan: heroku-kafka:basic-0 Price: ~$0.139/hour State: created

步骤 4:获取 Kafka 凭证和配置

随着 Kafka 集群的启动,我们需要获取凭证和其他配置。Heroku 为我们的应用程序创建了几个配置变量,并使用刚刚创建的 Kafka 集群中的信息填充它们。我们可以通过运行以下命令查看所有这些配置变量:


 ~/project$ heroku config === weather-eda Config Vars KAFKA_CLIENT_CERT: -----BEGIN CERTIFICATE----- MIIDQzCCAiugAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1h ... -----END CERTIFICATE----- KAFKA_CLIENT_CERT_KEY: -----BEGIN RSA PRIVATE KEY----- MIIEowIBAAKCAQEAsgv1oBiF4Az/IQsepHSh5pceL0XLy0uEAokD7ety9J0PTjj3 ... -----END RSA PRIVATE KEY----- KAFKA_PREFIX: columbia-68051. KAFKA_TRUSTED_CERT: -----BEGIN CERTIFICATE----- MIIDfzCCAmegAwIBAgIBADANBgkqhkiG9w0BAQsFADAyMTAwLgYDVQQDDCdjYS1h ... F+f3juViDqm4eLCZBAdoK/DnI4fFrNH3YzhAPdhoHOa8wi4= -----END CERTIFICATE----- KAFKA_URL: kafka+ssl://ec2-18-233-140-74.compute-1.amazonaws.com:9096,kafka+ssl://ec2-18-208-61-56.compute-1.amazonaws.com:9096...kafka+ssl://ec2-34-203-24-91.compute-1.amazonaws.com:9096


如您所见,我们有几个配置变量。我们需要在项目根文件夹中创建一个名为.env的文件,其中包含所有这些配置变量值。为此,我们只需运行以下命令:


 ~/project$ heroku config --shell > .env


我们的.env文件如下所示:


 KAFKA_CLIENT_CERT="-----BEGIN CERTIFICATE----- ... -----END CERTIFICATE-----" KAFKA_CLIENT_CERT_KEY="-----BEGIN RSA PRIVATE KEY----- ... -----END RSA PRIVATE KEY-----" KAFKA_PREFIX="columbia-68051." KAFKA_TRUSTED_CERT="-----BEGIN CERTIFICATE----- ... -----END CERTIFICATE-----" KAFKA_URL="kafka+ssl://ec2-18-233-140-74.compute-1.amazonaws.com:9096,kafka+ssl://ec2-18-208-61-56.compute-1.amazonaws.com:9096...kafka+ssl://ec2-34-203-24-91.compute-1.amazonaws.com:9096"


另外,我们确保将 .env 添加到我们的 .gitignore 文件中。我们不想将这些敏感数据提交到我们的存储库。

步骤 5:将 Kafka 插件安装到 Heroku CLI

Heroku CLI 不附带与 Kafka 相关的现成命令。由于我们使用 Kafka,因此我们需要安装 CLI 插件


 ~/project$ heroku plugins:install heroku-kafka Installing plugin heroku-kafka... installed v2.12.0


现在,我们可以从 CLI 管理我们的 Kafka 集群。


 ~/project$ heroku kafka:info === KAFKA_URL Plan: heroku-kafka:basic-0 Status: available Version: 2.8.2 Created: 2024-05-27T18:44:38.023+00:00 Topics: [··········] 0 / 40 topics, see heroku kafka:topics Prefix: columbia-68051. Partitions: [··········] 0 / 240 partition replicas (partitions × replication factor) Messages: 0 messages/s Traffic: 0 bytes/s in / 0 bytes/s out Data Size: [··········] 0 bytes / 4.00 GB (0.00%) Add-on: kafka-adjacent-07560 ~/project$ heroku kafka:topics === Kafka Topics on KAFKA_URL No topics found on this Kafka cluster. Use heroku kafka:topics:create to create a topic (limit 40)

步骤 6:测试与集群的交互

为了进行健全性检查,让我们来试用一下 Kafka 集群。首先创建一个主题。


 ~/project$ heroku kafka:topics:create test-topic-01 Creating topic test-topic-01 with compaction disabled and retention time 1 day on kafka-adjacent-07560... done Use `heroku kafka:topics:info test-topic-01` to monitor your topic. Your topic is using the prefix columbia-68051.. ~/project$ heroku kafka:topics:info test-topic-01 ▸ topic test-topic-01 is not available yet


大约一分钟后,我们的主题就可用了。


 ~/project$ heroku kafka:topics:info test-topic-01 === kafka-adjacent-07560 :: test-topic-01 Topic Prefix: columbia-68051. Producers: 0 messages/second (0 bytes/second) total Consumers: 0 bytes/second total Partitions: 8 partitions Replication Factor: 3 Compaction: Compaction is disabled for test-topic-01 Retention: 24 hours


接下来,在这个终端窗口中,我们将充当消费者,通过跟踪来收听这个主题。


 ~/project$ heroku kafka:topics:tail test-topic-01


从这里开始,终端只需等待有关该主题的任何事件发布。


在单独的终端窗口中,我们将充当生产者,并发布有关该主题的一些消息。


 ~/project$ heroku kafka:topics:write test-topic-01 "hello world!"


回到我们的消费者终端窗口,我们看到的是:


 ~/project$ heroku kafka:topics:tail test-topic-01 test-topic-01 0 0 12 hello world!


太棒了!我们已成功生成事件并将其消费到 Kafka 集群中的主题。我们已准备好继续使用 Node.js 应用程序。让我们销毁这个测试主题以保持我们的操场整洁。


 ~/project$ heroku kafka:topics:destroy test-topic-01 ▸ This command will affect the cluster: kafka-adjacent-07560, which is on weather-eda ▸ To proceed, type weather-eda or re-run this command with --confirm weather-eda > weather-eda Deleting topic test-topic-01... done Your topic has been marked for deletion, and will be removed from the cluster shortly ~/project$ heroku kafka:topics === Kafka Topics on KAFKA_URL No topics found on this Kafka cluster. Use heroku kafka:topics:create to create a topic (limit 40).

步骤 7:为我们的应用程序准备 Kafka

为了准备在我们的应用程序中使用 Kafka,我们需要创建两个东西:一个主题和一个消费者组。


让我们创建我们的应用程序将使用的主题。


 ~/project$ heroku kafka:topics:create weather-data


接下来,我们将创建应用程序消费者所属的消费者组:


 ~/project$ heroku kafka:consumer-groups:create weather-consumers


我们已准备好构建我们的 Node.js 应用程序!

构建应用程序

让我们初始化一个新项目并安装我们的依赖项。


 ~/project$ npm init -y ~/project$ npm install kafkajs dotenv @faker-js/faker pino pino-pretty


我们的项目将运行两个进程:


  1. consumer.js ,订阅该主题并记录已发布的任何事件。


  2. producer.js ,它将每隔几秒发布一些关于该主题的随机天气数据。


这两个过程都需要使用 KafkaJS 连接到我们的 Kafka 集群,因此我们将模块化我们的代码以使其可重复使用。

使用 Kafka 客户端

在项目src文件夹中,我们创建一个名为kafka.js的文件。它看起来像这样:


 const { Kafka } = require('kafkajs'); const BROKER_URLS = process.env.KAFKA_URL.split(',').map(uri => uri.replace('kafka+ssl://','' )) const TOPIC = `${process.env.KAFKA_PREFIX}weather-data` const CONSUMER_GROUP = `${process.env.KAFKA_PREFIX}weather-consumers` const kafka = new Kafka({ clientId: 'weather-eda-app-nodejs-client', brokers: BROKER_URLS, ssl: { rejectUnauthorized: false, ca: process.env.KAFKA_TRUSTED_CERT, key: process.env.KAFKA_CLIENT_CERT_KEY, cert: process.env.KAFKA_CLIENT_CERT, }, }) const producer = async () => { const p = kafka.producer() await p.connect() return p; } const consumer = async () => { const c = kafka.consumer({ groupId: CONSUMER_GROUP, sessionTimeout: 30000 }) await c.connect() await c.subscribe({ topics: [TOPIC] }); return c; } module.exports = { producer, consumer, topic: TOPIC, groupId: CONSUMER_GROUP };


在此文件中,我们首先创建一个新的 Kafka 客户端。这需要 Kafka 代理的 URL,我们可以从.env文件中的KAFKA_URL变量(最初来自调用 heroku 配置)中解析这些 URL。要验证连接尝试,我们需要提供KAFKA_TRUSTED_CERTKAFKA_CLIENT_CERT_KEYKAFKA_CLIENT_CERT


然后,从我们的 Kafka 客户端,我们创建一个producer和一个consumer ,确保我们的消费者订阅weather-data主题。

关于 Kafka 前缀的说明

请注意,在kafka.js中,我们将KAFKA_PREFIX添加到主题和消费者组名称前面。我们在 Heroku 上使用 Apache Kafka 的 Basic 0 计划,这是一个多租户 Kafka 计划。这意味着我们使用KAFKA_PREFIX 。尽管我们将主题命名为weather-data ,将消费者组命名为weather-consumers ,但它们在多租户 Kafka 集群中的实际名称必须以KAFKA_PREFIX为前缀(以确保它们是唯一的)。


因此,从技术上讲,对于我们的演示,实际的主题名称是columbia-68051.weather-data ,而不是weather-data 。 (消费者组名称也是如此。)

生产者流程

现在,让我们创建后台进程,它将充当我们的天气传感器生产者。在我们的项目根文件夹中,我们有一个名为producer.js的文件。它看起来像这样:


 require('dotenv').config(); const kafka = require('./src/kafka.js'); const { faker } = require('@faker-js/faker'); const SENSORS = ['sensor01','sensor02','sensor03','sensor04','sensor05']; const MAX_DELAY_MS = 20000; const READINGS = ['temperature','humidity','barometric_pressure']; const MAX_TEMP = 130; const MIN_PRESSURE = 2910; const PRESSURE_RANGE = 160; const getRandom = (arr) => arr[faker.number.int(arr.length - 1)]; const getRandomReading = { temperature: () => faker.number.int(MAX_TEMP) + (faker.number.int(100) / 100), humidity: () => faker.number.int(100) / 100, barometric_pressure: () => (MIN_PRESSURE + faker.number.int(PRESSURE_RANGE)) / 100 }; const sleep = (ms) => { return new Promise((resolve) => { setTimeout(resolve, ms); }); }; (async () => { const producer = await kafka.producer() while(true) { const sensor = getRandom(SENSORS) const reading = getRandom(READINGS) const value = getRandomReading[reading]() const data = { reading, value } await producer.send({ topic: kafka.topic, messages: [{ key: sensor, value: JSON.stringify(data) }] }) await sleep(faker.number.int(MAX_DELAY_MS)) } })()


文件中的许多代码与生成随机值有关。我将重点介绍重要部分:


  • 我们将模拟有五个不同的天气传感器。它们的名称可以在SENSORS中找到。


  • 传感器将发出(发布)三个可能读数之一的值: temperaturehumiditybarometric_pressuregetRandomReading对象针对每个读数都有一个函数,用于生成合理的对应值。


  • 整个过程作为具有无限while循环的async函数运行。


while循环中,我们:


  • 随机选择一个sensor
  • 随机选择一个reading
  • 为该读数生成一个随机value
  • 调用producer.send将这些数据发布到主题。 sensor作为事件的key ,而readingvalue将构成事件消息。
  • 然后,我们等待最多 20 秒,然后进行下一次循环。

消费者流程

consumer.js中的后台进程相当简单。


 require('dotenv').config(); const logger = require('./src/logger.js'); const kafka = require('./src/kafka.js'); (async () => { const consumer = await kafka.consumer() await consumer.run({ eachMessage: async ({ topic, partition, message }) => { const sensorId = message.key.toString() const messageObj = JSON.parse(message.value.toString()) const logMessage = { sensorId } logMessage[messageObj.reading] = messageObj.value logger.info(logMessage) } }) })()


我们的consumer已经订阅了weather-data主题。我们调用consumer.run ,然后为eachMessage设置一个处理程序。每当 Kafka 通知consumer有消息时,它都会记录该消息。这就是全部内容。

进程和Procfile

package.json文件中,我们需要添加一些scripts来启动生产者和消费者的后台进程。该文件现在应包含以下内容:


 ... "scripts": { "start": "echo 'do nothing'", "start:consumer": "node consumer.js", "start:producer": "node producer.js" }, ...


重要的是start:consumerstart:producer 。但我们保留start在我们的文件中(即使它没有做任何有意义的事情),因为 Heroku 构建器希望它在那里。


接下来,我们创建一个Procfile ,它将告诉 Heroku 如何启动 Heroku 应用程序所需的各种工作程序。在我们项目的根文件夹中, Procfile应如下所示:


 consumer_worker: npm run start:consumer producer_worker: npm run start:producer


很简单,对吧?我们将有一个名为consumer_worker的后台进程工作者,以及另一个名为producer_worker后台进程工作者。您会注意到,我们没有web工作者,而这通常是您在Procfile中为 Web 应用程序看到的。对于我们的 Heroku 应用程序,我们只需要两个后台工作者。我们不需要web

部署并测试应用程序

这样,我们的所有代码都设置好了。我们已将所有代码提交到存储库,并准备好进行部署。


 ~/project$ git push heroku main … remote: -----> Build succeeded! … remote: -----> Compressing... remote: Done: 48.6M remote: -----> Launching... … remote: Verifying deploy... done


部署完成后,我们要确保正确扩展我们的 dyno。我们不需要为 Web 进程使用 dyno,但为consumer_workerproducer_worker都需要一个 dyno。我们运行以下命令根据我们的需求设置这些进程。


 ~/project$ heroku ps:scale web=0 consumer_worker=1 producer_worker=1 Scaling dynos... done, now running producer_worker at 1:Eco, consumer_worker at 1:Eco, web at 0:Eco


现在,一切都应该正常运行。在后台,我们的producer_worker应该连接到 Kafka 集群,然后开始每隔几秒发布一次天气传感器数据。然后,我们的consumer_worker应该连接到 Kafka 集群并记录它从订阅的主题收到的任何消息。


要了解我们的consumer_worker正在做什么,我们可以查看 Heroku 日志。


 ~/project$ heroku logs --tail … heroku[producer_worker.1]: Starting process with command `npm run start:producer` heroku[producer_worker.1]: State changed from starting to up app[producer_worker.1]: app[producer_worker.1]: > [email protected] start:producer app[producer_worker.1]: > node producer.js app[producer_worker.1]: … heroku[consumer_worker.1]: Starting process with command `npm run start:consumer` heroku[consumer_worker.1]: State changed from starting to up app[consumer_worker.1]: app[consumer_worker.1]: > [email protected] start:consumer app[consumer_worker.1]: > node consumer.js app[consumer_worker.1]: app[consumer_worker.1]: {"level":"INFO","timestamp":"2024-05-28T02:31:20.660Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"columbia-68051.weather-consumers"} app[consumer_worker.1]: {"level":"INFO","timestamp":"2024-05-28T02:31:23.702Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"columbia-68051.weather-consumers","memberId":"weather-eda-app-nodejs-client-3ee5d1fa-eba9-4b59-826c-d3b924a6e4e4","leaderId":"weather-eda-app-nodejs-client-3ee5d1fa-eba9-4b59-826c-d3b924a6e4e4","isLeader":true,"memberAssignment":{"columbia-68051.test-topic-1":[0,1,2,3,4,5,6,7]},"groupProtocol":"RoundRobinAssigner","duration":3041} app[consumer_worker.1]: [2024-05-28 02:31:23.755 +0000] INFO (21): {"sensorId":"sensor01","temperature":87.84} app[consumer_worker.1]: [2024-05-28 02:31:23.764 +0000] INFO (21): {"sensorId":"sensor01","humidity":0.3} app[consumer_worker.1]: [2024-05-28 02:31:23.777 +0000] INFO (21): {"sensorId":"sensor03","temperature":22.11} app[consumer_worker.1]: [2024-05-28 02:31:37.773 +0000] INFO (21): {"sensorId":"sensor01","barometric_pressure":29.71} app[consumer_worker.1]: [2024-05-28 02:31:54.495 +0000] INFO (21): {"sensorId":"sensor05","barometric_pressure":29.55} app[consumer_worker.1]: [2024-05-28 02:32:02.629 +0000] INFO (21): {"sensorId":"sensor04","temperature":90.58} app[consumer_worker.1]: [2024-05-28 02:32:03.995 +0000] INFO (21): {"sensorId":"sensor02","barometric_pressure":29.25} app[consumer_worker.1]: [2024-05-28 02:32:12.688 +0000] INFO (21): {"sensorId":"sensor04","humidity":0.1} app[consumer_worker.1]: [2024-05-28 02:32:32.127 +0000] INFO (21): {"sensorId":"sensor01","humidity":0.34} app[consumer_worker.1]: [2024-05-28 02:32:32.851 +0000] INFO (21): {"sensorId":"sensor02","humidity":0.61} app[consumer_worker.1]: [2024-05-28 02:32:37.200 +0000] INFO (21): {"sensorId":"sensor01","barometric_pressure":30.36} app[consumer_worker.1]: [2024-05-28 02:32:50.388 +0000] INFO (21): {"sensorId":"sensor03","temperature":104.55}


成功了!我们知道我们的生产者会定期向 Kafka 发布消息,因为我们的消费者会接收这些消息,然后记录下来。


当然,在大型 EDA 应用中,每个传感器都是一个生产者。它们可能出于各种目的在多个主题上发布信息,也可能都在同一主题上发布信息。并且您的消费者可以订阅多个主题。此外,在我们的演示应用中,我们的消费者只是在eachMessage上发出了很多信息;但在 EDA 应用中,消费者可能会通过调用第三方 API、发送短信通知或查询数据库来做出响应。


现在您已经对事件、主题、生产者和消费者有了基本的了解,并且知道如何使用 Kafka,您可以开始设计和构建自己的 EDA 应用程序来满足更复杂的业务用例。

结论

EDA 非常强大 - 您可以解耦系统,同时享受易扩展性和实时数据处理等关键功能。对于 EDA,Kafka 是一个关键工具,可帮助您轻松处理高吞吐量数据流。在 Heroku 上使用 Apache Kafka 可帮助您快速入门。由于它是一项托管服务,因此您无需担心 Kafka 集群管理的复杂部分。您只需专注于构建应用程序即可。


从现在开始,您可以进行实验和原型设计。确定哪些用例适合 EDA。深入研究,在 Heroku 上进行测试,然后构建出令人惊叹的东西。祝您编码愉快!