paint-brush
如何在 .NET 中构建事件流应用程序经过@bbejeck
2,959 讀數
2,959 讀數

如何在 .NET 中构建事件流应用程序

经过 Bill Bejeck14m2023/02/13
Read on Terminal Reader

太長; 讀書

流处理是一种软件开发方法,它将事件视为应用程序的主要输入或输出。在这篇博文中,我们将使用 Apache Kafka、.NET 生产者和消费者客户端以及 Microsoft 的任务并行库 (TPL) 构建一个事件流应用程序。 Kafka 客户端和 TPL 负责大部分繁重的工作;您只需要关注您的业务逻辑。
featured image - 如何在 .NET 中构建事件流应用程序
Bill Bejeck HackerNoon profile picture
0-item


当您停下来思考日常生活时,您可以轻松地将所有事情视为一个事件。考虑以下顺序:


  1. 您汽车的“低油量”指示灯亮起
  2. 结果,你在下一个加油站停下来加油
  3. 当你给汽车加油时,系统会提示你加入公司的奖励俱乐部以获得折扣
  4. 你进去注册并获得下次购买的积分


我们可以在这里继续下去,但我已经阐明了我的观点:生活是一连串的事件。鉴于这一事实,您今天将如何设计一个新的软件系统?您会收集不同的结果并以任意时间间隔处理它们,还是等到一天结束时再处理它们?不,你不会;您希望在每个事件发生时立即采取行动。当然,在某些情况下,您可能无法立即对个别情况做出反应……想想一次转储一天的交易量。但是,您仍然会在收到数据后立即采取行动,如果您愿意的话,这是一个相当大的一次性事件。


那么,如何实现处理事件的软件系统呢?答案是流处理。


什么是流处理?

作为处理事件数据的实际技术,流处理是一种将事件视为应用程序的主要输入或输出的软件开发方法。例如,等待根据信息采取行动或响应潜在的欺诈性信用卡购买是没有意义的。其他时候,它可能涉及处理微服务中的传入记录流,并且最有效地处理它们最适合您的应用程序。

无论用例是什么,可以肯定地说事件流方法是处理事件的最佳方法。


在这篇博文中,我们将使用 Apache Kafka®、.NET 生产者和消费者客户端以及 Microsoft 的任务并行库 (TPL)构建一个事件流应用程序。乍一看,您可能不会自动将所有这三者放在一起作为可能一起工作的候选人。当然,Kafka 和 .NET 客户端是一对很好的组合,但是 TPL 在哪里适合呢?


通常情况下,吞吐量是一个关键要求,为了避免由于 Kafka 消费和下游处理之间的阻抗不匹配而导致的瓶颈,我们通常建议在机会出现时进行进程内并行化。


继续阅读以了解所有三个组件如何协同工作以构建健壮且高效的事件流应用程序。最好的部分是 Kafka 客户端和 TPL 负责大部分繁重的工作;您只需要关注您的业务逻辑。


在深入研究该应用程序之前,让我们简要介绍一下每个组件。

阿帕奇卡夫卡

如果流处理是处理事件流的事实标准,那么Apache Kafka就是构建事件流应用程序的事实标准。 Apache Kafka 是以高度可扩展、弹性、容错和安全的方式提供的分布式日志。简而言之,Kafka 使用代理(服务器)和客户端。代理构成了 Kafka 集群的分布式存储层,可以跨越数据中心或云区域。客户端提供从代理集群读取和写入事件数据的能力。 Kafka 集群是容错的:如果任何一个 broker 发生故障,其他 broker 将接管工作以确保持续运行。

Confluent .NET 客户端

我在上一段中提到,客户端要么写入 Kafka 代理集群,要么从中读取。 Apache Kafka 与 Java 客户端捆绑在一起,但还有其他几个客户端可用,即 .NET Kafka 生产者和消费者,这是本博文中应用程序的核心。 .NET 生产者和消费者为 .NET 开发人员带来了 Kafka 事件流的强大功能。有关 .NET 客户端的更多信息,请参阅文档

任务并行库

任务并行库 ( TPL ) 是“System.Threading 和 System.Threading.Tasks 命名空间中的一组公共类型和 API”,简化了编写并发应用程序的工作。 TPL 通过处理以下细节使添加并发成为一项更易于管理的任务:


1. 处理工作的划分 2. 在 ThreadPool 上调度线程 3. 取消、状态管理等底层细节


底线是使用 TPL 可以最大限度地提高应用程序的处理性能,同时让您专注于业务逻辑。具体来说,您将使用 TPL 的数据流库子集。


数据流库是一个基于参与者的编程模型,允许进程内消息传递和流水线任务。 Dataflow 组件建立在 TPL 的类型和调度基础结构之上,并与 C# 语言无缝集成。从 Kafka 读取通常很快,但处理(数据库调用或 RPC 调用)通常是瓶颈。我们可以利用的任何并行化机会都可以在不牺牲顺序保证的情况下实现更高的吞吐量,这些都值得考虑。


在这篇博文中,我们将利用这些 Dataflow 组件以及 .NET Kafka 客户端来构建一个流处理应用程序,该应用程序将在数据可用时对其进行处理。

数据流块

在我们进入您要构建的应用程序之前;我们应该提供一些关于什么构成 TPL 数据流库的背景信息。当您有需要高吞吐量的 CPU 和 I/O 密集型任务时,此处详述的方法最适用。 TPL 数据流库由可以缓冲和处理传入数据或记录的块组成,这些块属于以下三类之一:


  1. 源块——充当数据源,其他块可以从中读取。

  2. 目标块——数据的接收者或接收器,可以被其他块写入。

  3. 传播块——既作为源块又作为目标块。


您采用不同的块并将它们连接起来以形成线性处理管道或更复杂的处理图。考虑以下插图:



图上的每个节点代表不同的处理或计算任务。



数据流库提供了几种预定义的块类型,它们分为三类:缓冲、执行和分组。我们正在为为这篇博文开发的项目使用缓冲和执行类型。 BufferBlock<T> 是一种用于缓冲数据的通用结构,非常适合用于生产者/消费者应用程序。 BufferBlock 使用先进先出队列来处理传入数据。


BufferBlock(以及扩展它的类)是数据流库中唯一提供直接写入和读取消息的块类型;其他类型期望从块接收消息或向块发送消息。为此,我们在创建源块和实现ISourceBlock接口以及接收块实现ITargetBlock接口时使用BufferBlock作为委托。


我们的应用程序中使用的另一种数据流块类型是TransformBlock <TInput, TOutput> 。与数据流库中的大多数块类型一样,您可以通过提供Func<TInput, TOutput>来创建 TransformBlock 的实例,以充当转换块为其接收的每个输入记录执行的委托。


Dataflow 块的两个基本特征是您可以控制它将缓冲的记录数和并行度级别。


通过设置最大缓冲容量,当应用程序在处理管道中的某个点遇到长时间等待时,您的应用程序将自动施加背压。这种背压对于防止数据过度积累是必要的。然后,一旦问题消退并且缓冲区大小减小,它将再次消耗数据。


为块设置并发的能力对于性能至关重要。如果一个块执行 CPU 或 I/O 密集型任务,则自然会倾向于并行工作以提高吞吐量。但是添加并发会导致一个问题——处理顺序。如果给block的task加上threading,就不能保证数据的输出顺序。在某些情况下,顺序无关紧要,但当它确实重要时,需要考虑一个严峻的权衡:并发吞吐量与处理顺序输出的更高吞吐量。幸运的是,您不必与数据流库进行这种权衡。


当您将块的并行度设置为多个时,框架保证它将保持输入记录的原始顺序(请注意,保持并行度的顺序是可配置的,默认值为 true)。如果数据的原始顺序是A、B、C,那么输出顺序将是A、B、C。怀疑?我知道我是,所以我测试了它并发现它像宣传的那样有效。我们稍后会在这篇文章中讨论这个测试。请注意,增加并行性只能通过无状态操作或关联和交换有状态操作来完成,这意味着更改操作的顺序或分组不会影响结果。


在这一点上,你可以看到这是怎么回事。您有一个 Kafka 主题,代表您需要以最快的方式处理的事件。因此,您将构建一个流式应用程序,该应用程序包含一个带有 .NET KafkaConsumer 的源块、用于完成业务逻辑的处理块,以及一个包含用于将最终结果写回 Kafka 主题的 .NET KafkaProducer 的接收器块。这是应用程序的高级视图的说明:




该应用程序将具有以下结构:


  1. 源块:包装 .NET KafkaConsumer 和BufferBlock委托
  2. 转换块:反序列化
  3. 转换块:将传入的 JSON 数据映射到购买对象
  4. 转换块:CPU 密集型任务(模拟)
  5. 转换块:序列化
  6. 目标块:包装 .NET KafkaProducer 和BufferBlock委托


接下来是对应用程序整体流程的描述,以及关于利用 Kafka 和数据流库构建强大的事件流应用程序的一些关键点。


事件流应用程序

这是我们的场景:您有一个 Kafka 主题,它从您的在线商店接收购买记录,传入的数据格式为 JSON。您希望通过对购买详细信息应用 ML 推理来处理这些购买事件。此外,您希望将 JSON 记录转换为 Protobuf 格式,因为这是公司范围内的数据格式。当然,应用程序的吞吐量是必不可少的。 ML 操作是 CPU 密集型操作,因此您需要一种方法来最大化应用程序吞吐量,以便您可以利用并行化该应用程序的那一部分。


将数据消费到管道中

让我们从源代码块开始,了解流式应用程序的关键点。我之前提到过实现ISourceBlock接口,并且由于BufferBlock也实现了ISourceBlock ,我们将使用它作为委托来满足所有接口方法。所以源块实现将包装一个 KafkaConsumer 和 BufferBlock。在我们的源代码块中,我们将有一个单独的线程,其唯一职责是让消费者将其消费的记录传递到缓冲区中。从那里,缓冲区会将记录转发到管道中的下一个块。


在将记录转发到缓冲区之前, ConsumeRecord (由Consumer.consume调用返回)由Record抽象包装,除了键和值外,它还捕获原始分区和偏移量,这对应用程序至关重要 - 并且稍后我会解释原因。还值得注意的是,整个管道都与Record抽象一起工作,因此任何转换都会导致一个新的Record对象包装键、值和其他基本字段,例如在整个管道中保留它们的原始偏移量。


加工块

该应用程序将处理分解为几个不同的块。每个块链接到处理链中的下一步,因此源块链接到处理反序列化的第一个块。虽然 .NET KafkaConsumer 可以处理记录的反序列化,但我们让消费者传递序列化的有效负载并在 Transform 块中进行反序列化。反序列化可能是 CPU 密集型的,因此将其放入其处理块中可以让我们在必要时并行化操作。


反序列化后,记录流入另一个 Transform 块,该块将 JSON 有效负载转换为 Protobuf 格式的 Purchase 数据模型对象。更有趣的部分出现在数据进入下一个块时,代表完全完成购买交易所需的 CPU 密集型任务。应用程序模拟这部分,提供的函数以一到三秒之间的任意时间随机休眠。


这个模拟处理块是我们利用数据流块框架功能的地方。当您实例化数据流块时,您提供一个委托 Func 实例,它应用于它遇到的每条记录和一个ExecutionDataflowBlockOptions实例。我之前提到过配置数据流块,但我们将在这里再次快速回顾它们。 ExecutionDataflowBlockOptions包含两个基本属性:该块的最大缓冲区大小和最大并行化程度。


虽然我们将管道中所有块的缓冲区大小配置设置为 10,000 条记录,但我们坚持使用默认并行化级别 1,除了我们模拟的 CPU 密集型,我们将其设置为 4。请注意,默认数据流缓冲区大小为无限。我们将在下一节讨论性能影响,但现在,我们将完成应用程序概述。


密集处理块转发到一个序列化转换块,该块提供给接收器块,然后包装 .NET KafkaProducer 并将最终结果生成到 Kafka 主题。接收器块还使用一个委托BufferBlock和一个单独的线程来进行生产。线程从缓冲区中检索下一条可用记录。然后它调用KafkaProducer.Produce方法,传递一个包装DeliveryReport Action委托——生产者 I/O 线程将在生产请求完成后执行Action委托。


这完成了应用程序的高级演练。现在,让我们讨论我们设置的一个关键部分——如何处理提交偏移量——鉴于我们正在从消费者那里流水线记录,这是至关重要的。


提交偏移量

使用 Kafka 处理数据时,您将定期提交您的应用程序已成功处理到给定点的记录的偏移量(偏移量是 Kafka 主题中记录的逻辑位置)。那么为什么要提交抵消呢?这是一个很容易回答的问题:当您的消费者以受控方式或错误关闭时,它将从最后已知的提交偏移量恢复处理。通过定期提交偏移量,如果您的应用程序在处理了一些记录之后但在提交之前关闭,您的消费者将不会重新处理记录或至少最小数量。这种方法被称为至少一次处理,它保证记录至少被处理一次,并且在出现错误的情况下,可能其中一些会被重新处理,但是当备选方案是冒着数据丢失的风险时,这是一个很好的选择。 Kafka 还提供恰好一次处理保证,虽然我们不会在这篇博文中讨论事务,但您可以阅读更多关于 Kafka 事务的信息这篇博文.


虽然有几种不同的方法来提交偏移量,但最简单和最基本的是自动提交方法。消费者读取记录,应用程序处理它们。经过一段可配置的时间后(基于记录时间戳),消费者将提交已消费记录的偏移量。通常,自动提交是一种合理的方法;在典型的消费流程循环中,在成功处理之前消费的所有记录之前,您不会返回给消费者。如果出现意外错误或关闭,代码永远不会返回给消费者,因此不会发生提交。但是在我们这里的应用程序中,我们是流水线——我们获取消耗的记录并将它们推入缓冲区并返回以消耗更多——没有等待成功处理。


使用流水线方法,我们如何保证至少处理一次?我们将利用IConsumer.StoreOffset方法,该方法负责处理单个参数( TopicPartitionOffset )并将其(连同其他偏移量)存储起来用于下一次提交。请注意,这种偏移量管理方法对比了自动提交与 Java API 的工作方式。


因此,提交过程以这种方式运行:当接收器块检索到要生成给 Kafka 的记录时,它还会将其提供给 Action 委托。生产者执行回调时,将原始偏移量传递给消费者(源块中的同一实例),消费者使用 StoreOffset 方法。您仍然为消费者启用了自动提交,但是您提供了要提交的偏移量,而不是让消费者盲目地提交它到目前为止消耗的最新偏移量。



提交偏移量


因此,即使应用程序使用流水线,它也仅在收到来自代理的确认后才提交,这意味着代理和最小副本代理集已存储记录。以这种方式工作可以让应用程序更快地进行,因为消费者可以在块执行它们的工作时不断地获取和馈送管道。这种方法是可能的,因为 .NET 消费者客户端是线程安全的(有些方法不是,并且被记录为这样),所以我们可以让我们的单一消费者在源和接收器块线程中安全地工作。


对于生产阶段的任何错误,应用程序都会记录错误并将记录放回嵌套的BufferBlock中,以便生产者将重试将记录发送给代理。但是这种重试逻辑是盲目完成的,在实践中,您可能需要更健壮的解决方案。

性能影响

现在我们已经介绍了应用程序的工作原理,让我们看看性能数据。所有测试都是在 macOS Big Sur (11.6) 笔记本电脑上本地执行的,因此在这种情况下您的情况可能会有所不同。性能测试设置很简单:


  1. 以 JSON 格式向 Kafka 主题生成 1M 条记录。此步骤是提前完成的,未包含在测试测量中。

  2. 启动支持 Kafka Dataflow 的应用程序并将所有块的并行化设置为 1(默认值)

  3. 应用程序一直运行到成功处理了 1M 条记录,然后关闭

  4. 记录处理所有记录所花费的时间


第二轮的唯一区别是将模拟的 CPU 密集型块的 MaxDegreeOfParallelism 设置为 4。

以下是结果:


记录数

并发因素

时间(分钟)

1M

1个

38

1M

4个

9


因此,通过简单地设置配置,我们在保持事件顺序的同时显着提高了吞吐量。因此,通过将最大并行度设置为四,我们可以得到预期的加速比四倍以上。但这种性能改进的关键部分是您没有编写任何并发代码,这将很难正确完成。


在博客文章的前面,我提到了一个测试来验证与 Dataflow 块的并发性保留事件顺序,所以现在让我们谈谈这个。试验涉及以下步骤:


  1. 为 Kafka 主题生成 1M 个整数(0-999,999)

  2. 修改参考应用程序以使用整数类型

  3. 为模拟的远程进程块运行并发级别为 1 的应用程序——生成一个 Kafka 主题

  4. 重新运行并发级别为 4 的应用程序并生成另一个 Kafka 主题的数字

  5. 运行程序以使用来自两个结果主题的整数并将它们存储在内存中的数组中

  6. 比较两个数组并确认它们的顺序相同


该测试的结果是两个数组都包含从 0 到 999,999 的整数,证明使用并行级别超过 1 的 Dataflow 块确实保持了传入数据的处理顺序。您可以在文档中找到有关数据流并行性的更多详细信息。

概括

这篇文章中,我们介绍了如何使用 .NET Kafka 客户端和任务并行库来构建一个健壮的、高吞吐量的事件流应用程序。 Kafka 提供高性能事件流,Task Parallel Library 为您提供创建并发应用程序的构建块,并带有缓冲来处理所有细节,让开发人员可以专注于业务逻辑。虽然应用程序的场景有点做作,但希望您能看到结合这两种技术的用处。试一试-这是 GitHub 存储库.



也发布在这里。