paint-brush
为什么我构建了 MongoDB 支持的消息队列by@allquiet
10,359
10,359

为什么我构建了 MongoDB 支持的消息队列

All Quiet12m2023/08/27
Read on Terminal Reader

您可以使用 MongoDB 创建 HA 和高性能消息队列,因为它提供并发原子读取/更新操作以及更改流。
featured image - 为什么我构建了 MongoDB 支持的消息队列
All Quiet HackerNoon profile picture
0-item
1-item
2-item


嘿👋


我是麦兹·奎斯特, All Quiet的创始人。我们已经实现了一个基于MongoDB 的国产消息队列,我在这里谈谈:

  1. 为什么我们重新发明轮子
  2. 我们如何重新发明轮子


1. 为什么我们要重新发明轮子

为什么我们需要消息队列?

All Quiet 是一个现代事件管理平台,类似于 PagerDuty。


我们的平台需要以下功能:


  • 用户注册后异步发送双重选择加入电子邮件
  • 注册后24小时发送提醒电子邮件
  • 使用 Firebase Cloud Messaging (FCM) 发送推送通知,这可能会因网络或负载问题而失败。由于推送通知对于我们的应用程序至关重要,因此如果出现问题,我们需要重新尝试发送它们。
  • 接受来自我们集成外部的电子邮件并将其处理为事件。此过程可能会失败,因此我们希望将其解耦并处理队列上的每个电子邮件负载。




我们的技术栈

要了解我们的具体要求,深入了解我们的技术堆栈非常重要:


  • 我们运行一个基于 .NET Core 7 的整体 Web 应用程序。
  • .NET Core 应用程序在 Docker 容器中运行。
  • 我们并行运行多个容器。
  • HAProxy 实例将 HTTP 请求平均分配到每个容器,确保高可用性设置。
  • 我们使用 MongoDB 作为底层数据库,跨可用区进行复制。
  • 上述所有组件均由 AWS 在通用 EC2 虚拟机上托管。

为什么我们重新发明轮子

  • 我们需要一个简单的排队机制,可以同时在多个进程中运行,同时保证每条消息只被处理一次。
  • 我们不需要发布/订阅模式。
  • 我们的目标不是基于 CQRS/事件源的复杂分布式系统,因为,你知道,分布式系统的第一条规则是不分发
  • 我们希望让事情尽可能简单,遵循选择“无聊技术”的理念。


最终,它是为了最大限度地减少基础设施中移动部件的数量。我们的目标是为我们优秀的客户构建出色的功能,并且可靠地维护我们的服务势在必行。管理单个数据库系统以实现超过五个九的正常运行时间已经足够具有挑战性了。那么,为什么要给自己增加管理额外 HA RabbitMQ 集群的负担呢?


为什么不直接使用 AWS SQS?

是的……AWS SQS、Google Cloud Tasks 或 Azure Queue Storage 等云解决方案非常棒!然而,它们会导致供应商锁定。我们只是渴望独立且具有成本效益,同时仍然为客户提供可扩展的服务。



2.我们如何重新发明轮子

什么是消息队列?

消息队列是存储消息的系统。消息的生产者将这些消息存储在队列中,稍后由消费者将其出队进行处理。这对于解耦组件非常有利,特别是当处理消息是一项资源密集型任务时。


我们的队列应该表现出什么特征?

  • 使用 MongoDB 作为我们的数据存储
  • 保证每条消息只被消费一次
  • 允许多个消费者同时处理消息
  • 确保如果消息处理失败,可以重试
  • 启用未来消息消费的调度
  • 不需要保证订购
  • 确保高可用性
  • 确保消息及其状态持久且能够承受重新启动或延长停机时间


MongoDB 多年来已经取得了显着的发展,并且可以满足上面列出的标准。


执行

在接下来的部分中,我将指导您完成消息队列的 MongoDB 特定实现。虽然您需要一个适合您首选编程语言的客户端库,例如 NodeJS、Go 或 All Quiet 中的 C#,但我将分享的概念与平台无关。


队列

您想要使用的每个队列都表示为 MongoDB 数据库中的专用集合。

消息模型

以下是已处理消息的示例:

 { "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Processed", "Timestamp" : ISODate("2023-08-06T06:50:23.753+0000"), "NextReevaluation" : null }, { "Status" : "Processing", "Timestamp" : ISODate("2023-08-06T06:50:23.572+0000"), "NextReevaluation" : null }, { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } }


让我们看看消息的每个属性。


_ID

_id字段是 MongoDB 的规范唯一标识符属性。在这里,它包含一个NumberLong ,而不是一个ObjectId 。我们需要NumberLong而不是ObjectId因为:



虽然ObjectId值应该随着时间的推移而增加,但它们不一定是单调的。这是因为他们:

  • 仅包含一秒的时间分辨率,因此同一秒内创建的 ObjectId 值没有保证的顺序,并且
  • 由客户端生成,这些客户端可能具有不同的系统时钟。


在我们的 C# 实现中,我们生成一个具有毫秒精度的 Id,并根据插入时间保证排序。虽然我们在多消费者环境(类似于 RabbitMQ)中不需要严格的处理顺序,但在仅使用一个消费者进行操作时保持 FIFO 顺序至关重要。使用 ObjectId 来实现这一点是不可行的。如果这对您来说并不重要,您仍然可以使用 ObjectId。


状态

Statuses 属性由一个包含消息处理历史记录的数组组成。在索引 0 处,您将找到当前状态,这对于索引至关重要。


状态对象本身包含三个属性:

  • Status :可以是“已排队”、“正在处理”、“已处理”或“失败”。
  • Timestamp :捕获当前时间戳。
  • NextReevaluation :记录下一次评估应发生的时间,这对于重试和未来的计划执行都至关重要。


有效载荷

此属性包含消息的特定负载。


将消息放入队列

添加消息是向集合中执行的简单插入操作,状态设置为“已入队”。

  • 要立即处理,请将NextReevaluation设置为null
  • 对于将来的处理,请将NextReevaluation设置为将来您希望处理消息时的时间戳。
 db.yourQueueCollection.insert({ "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } });


消息出队

出队稍微复杂一些,但仍然相对简单。它严重依赖 MongoDB 的并发原子读取和更新能力。


MongoDB 的这一基本功能可确保:

  • 每条消息仅被处理一次。
  • 多个消费者可以安全地同时处理消息。


 db.yourQueueCollection.findAndModify({ "query": { "$and": [ { "Statuses.0.Status": "Enqueued" }, { "Statuses.0.NextReevaluation": null } ] }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:23.800+0000"), "NextReevaluation": null } ], "$position": 0 } } } });


因此,我们正在读取一条处于“Enqueued”状态的消息,同时通过在位置 0 设置状态“Processing”来修改它。由于此操作是原子操作,因此将保证该消息不会被另一个消费者获取。


将消息标记为已处理

消息处理完成后,只需使用消息 ID 将消息状态更新为“已处理”即可。

 db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });


将消息标记为失败

如果处理失败,我们需要对消息进行相应的标记。通常,您可能想重试处理消息。这可以通过重新排队消息来实现。在许多情况下,根据处理失败的性质,在特定延迟(例如 10 秒)后重新处理消息是有意义的。


 db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Failed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": ISODate("2023-08-06T07:00:24.100+0000") } ], "$position": 0 } } } });


出队循环

我们已经确定了如何轻松地将项目从“队列”中入队和出队,“队列”实际上只是一个 MongoDB 集合。我们甚至可以利用NextReevaluation字段来“安排”未来的消息。


缺少的是我们如何定期出队。消费者需要在某种循环中执行findAndModify命令。一种简单的方法是创建一个无限循环,在其中出队并处理消息。这种方法简单有效。但会给数据库和网络带来相当大的压力。


另一种选择是在循环迭代之间引入延迟,例如 100ms。这将显着减少负载,但也会降低出队速度。


该问题的解决方案就是 MongoDB 所说的变更流


MongoDB 变更流

什么是变更流?我无法比 MongoDB 的人更好地解释它:


更改流允许应用程序访问实时数据更改[...]。应用程序可以使用更改流来订阅单个集合上的所有数据更改[...]并立即对它们做出反应。


伟大的!我们能做的是监听队列集合中新创建的文档,这实际上意味着监听新入队的消息


这非常简单:

 const changeStream = db.yourQueueCollection.watch(); changeStream.on('insert', changeEvent => { // Dequeue the message db.yourQueueCollection.findAndModify({ "query": changeEvent.documentKey._id, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });



预定消息和孤立消息

然而,更改流方法不适用于计划消息和孤立消息,因为显然没有我们可以监听的更改。


  • 计划的消息只是位于集合中,状态为“已排队”,并且“NextReevaluation”字段设置为未来。
  • 孤立消息是指当其消费者进程终止时处于“处理”状态的消息。它们仍保留在集合中,状态为“正在处理”,但消费者不会将其状态更改为“已处理”或“失败”。


对于这些用例,我们需要恢复到简单的循环。然而,我们可以在迭代之间使用相当大的延迟。


把它包起来

“传统”数据库,如MySQLPostgreSQL或 MongoDB(我也认为是传统的),如今非常强大。如果使用正确(确保您的索引得到优化!),它们速度很快,可扩展性令人印象深刻,并且在传统托管平台上具有成本效益。


许多用例只需使用数据库和您喜欢的编程语言即可解决。并不总是需要“为正确的工作使用正确的工具”,这意味着维护一组不同的工具,如 Redis、Elasticsearch、RabbitMQ 等。通常,维护开销是不值得的。


虽然所提出的解决方案可能与 RabbitMQ 等的性能不匹配,但它通常已经足够,并且可以扩展到标志着您的初创公司取得重大成功的程度。


软件工程就是要进行权衡。明智地选择你的。