嘿👋 我是麦兹·奎斯特, 的创始人。我们已经实现了一个基于 国产消息队列,我在这里谈谈: All Quiet MongoDB 的 为什么我们重新发明轮子 我们如何重新发明轮子 1. 为什么我们要重新发明轮子 为什么我们需要消息队列? All Quiet 是一个现代事件管理平台,类似于 PagerDuty。 我们的平台需要以下功能: 用户注册后异步发送双重选择加入电子邮件 注册后24小时发送提醒电子邮件 使用 Firebase Cloud Messaging (FCM) 发送推送通知,这可能会因网络或负载问题而失败。由于推送通知对于我们的应用程序至关重要,因此如果出现问题,我们需要重新尝试发送它们。 接受来自我们集成外部的电子邮件并将其处理为事件。此过程可能会失败,因此我们希望将其解耦并处理队列上的每个电子邮件负载。 我们的技术栈 要了解我们的具体要求,深入了解我们的技术堆栈非常重要: 我们运行一个基于 .NET Core 7 的整体 Web 应用程序。 .NET Core 应用程序在 Docker 容器中运行。 我们并行运行多个容器。 HAProxy 实例将 HTTP 请求平均分配到每个容器,确保高可用性设置。 我们使用 MongoDB 作为底层数据库,跨可用区进行复制。 上述所有组件均由 AWS 在通用 EC2 虚拟机上托管。 为什么我们重新发明轮子 我们需要一个简单的排队机制,可以同时在多个进程中运行,同时保证每条消息只被处理一次。 我们不需要发布/订阅模式。 我们的目标不是基于 CQRS/事件源的复杂分布式系统,因为,你知道, 。 分布式系统的第一条规则是不分发 我们希望让事情尽可能简单,遵循选择“ ”的理念。 无聊技术 最终,它是为了最大限度地减少基础设施中移动部件的数量。我们的目标是为我们优秀的客户构建出色的功能,并且可靠地维护我们的服务势在必行。管理单个数据库系统以实现超过 已经足够具有挑战性了。那么,为什么要给自己增加管理额外 HA RabbitMQ 集群的负担呢? 五个九的正常运行时间 为什么不直接使用 AWS SQS? 是的……AWS SQS、Google Cloud Tasks 或 Azure Queue Storage 等云解决方案非常棒!然而,它们会导致供应商锁定。我们只是渴望独立且具有成本效益,同时仍然为客户提供可扩展的服务。 2.我们如何重新发明轮子 什么是消息队列? 消息队列是存储消息的系统。消息的生产者将这些消息存储在队列中,稍后由消费者将其出队进行处理。这对于解耦组件非常有利,特别是当处理消息是一项资源密集型任务时。 我们的队列应该表现出什么特征? 使用 MongoDB 作为我们的数据存储 保证每条消息只被消费一次 允许多个消费者同时处理消息 确保如果消息处理失败,可以重试 启用未来消息消费的调度 不需要保证订购 确保高可用性 确保消息及其状态持久且能够承受重新启动或延长停机时间 MongoDB 多年来已经取得了显着的发展,并且可以满足上面列出的标准。 执行 在接下来的部分中,我将指导您完成消息队列的 MongoDB 特定实现。虽然您需要一个适合您首选编程语言的客户端库,例如 NodeJS、Go 或 All Quiet 中的 C#,但我将分享的概念与平台无关。 队列 您想要使用的每个队列都表示为 MongoDB 数据库中的专用集合。 消息模型 以下是已处理消息的示例: { "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Processed", "Timestamp" : ISODate("2023-08-06T06:50:23.753+0000"), "NextReevaluation" : null }, { "Status" : "Processing", "Timestamp" : ISODate("2023-08-06T06:50:23.572+0000"), "NextReevaluation" : null }, { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } } 让我们看看消息的每个属性。 _ID 字段是 MongoDB 的规范唯一标识符属性。在这里,它包含一个 ,而不是一个 。我们需要 而不是 因为: _id NumberLong ObjectId NumberLong ObjectId 虽然 值应该随着时间的推移而增加,但它们不一定是单调的。这是因为他们: ObjectId 仅包含一秒的时间分辨率,因此同一秒内创建的 ObjectId 值没有保证的顺序,并且 由客户端生成,这些客户端可能具有不同的系统时钟。 在我们的 C# 实现中,我们生成一个具有毫秒精度的 Id,并根据插入时间保证排序。虽然我们在多消费者环境(类似于 RabbitMQ)中不需要严格的处理顺序,但在仅使用一个消费者进行操作时保持 FIFO 顺序至关重要。使用 ObjectId 来实现这一点是不可行的。如果这对您来说并不重要,您仍然可以使用 ObjectId。 状态 Statuses 属性由一个包含消息处理历史记录的数组组成。在索引 0 处,您将找到当前状态,这对于索引至关重要。 状态对象本身包含三个属性: :可以是“已排队”、“正在处理”、“已处理”或“失败”。 Status :捕获当前时间戳。 Timestamp :记录下一次评估应发生的时间,这对于重试和未来的计划执行都至关重要。 NextReevaluation 有效载荷 此属性包含消息的特定负载。 将消息放入队列 添加消息是向集合中执行的简单插入操作,状态设置为“已入队”。 要立即处理,请将 设置为 。 NextReevaluation null 对于将来的处理,请将 设置为将来您希望处理消息时的时间戳。 NextReevaluation db.yourQueueCollection.insert({ "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } }); 消息出队 出队稍微复杂一些,但仍然相对简单。它严重依赖 MongoDB 的并发原子读取和更新能力。 MongoDB 的这一基本功能可确保: 每条消息仅被处理一次。 多个消费者可以安全地同时处理消息。 db.yourQueueCollection.findAndModify({ "query": { "$and": [ { "Statuses.0.Status": "Enqueued" }, { "Statuses.0.NextReevaluation": null } ] }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:23.800+0000"), "NextReevaluation": null } ], "$position": 0 } } } }); 因此,我们正在读取一条处于“Enqueued”状态的消息,同时通过在位置 0 设置状态“Processing”来修改它。由于此操作是原子操作,因此将保证该消息不会被另一个消费者获取。 将消息标记为已处理 消息处理完成后,只需使用消息 ID 将消息状态更新为“已处理”即可。 db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } }); 将消息标记为失败 如果处理失败,我们需要对消息进行相应的标记。通常,您可能想重试处理消息。这可以通过重新排队消息来实现。在许多情况下,根据处理失败的性质,在特定延迟(例如 10 秒)后重新处理消息是有意义的。 db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Failed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": ISODate("2023-08-06T07:00:24.100+0000") } ], "$position": 0 } } } }); 出队循环 我们已经确定了如何轻松地将项目从“队列”中入队和出队,“队列”实际上只是一个 MongoDB 集合。我们甚至可以利用 字段来“安排”未来的消息。 NextReevaluation 缺少的是我们如何定期出队。消费者需要在某种循环中执行 命令。一种简单的方法是创建一个无限循环,在其中出队并处理消息。这种方法简单有效。但会给数据库和网络带来相当大的压力。 findAndModify 另一种选择是在循环迭代之间引入延迟,例如 100ms。这将显着减少负载,但也会降低出队速度。 该问题的解决方案就是 MongoDB 所说的 。 变更流 MongoDB 变更流 什么是变更流?我无法比 MongoDB 的人更好地解释它: 更改流允许应用程序访问实时数据更改[...]。应用程序可以使用更改流来订阅单个集合上的所有数据更改[...]并立即对它们做出反应。 伟大的!我们能做的是监听队列集合中新创建的文档,这实际上意味着监听新入队的消息 这非常简单: const changeStream = db.yourQueueCollection.watch(); changeStream.on('insert', changeEvent => { // Dequeue the message db.yourQueueCollection.findAndModify({ "query": changeEvent.documentKey._id, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } }); 预定消息和孤立消息 然而,更改流方法不适用于计划消息和孤立消息,因为显然没有我们可以监听的更改。 计划的消息只是位于集合中,状态为“已排队”,并且“NextReevaluation”字段设置为未来。 孤立消息是指当其消费者进程终止时处于“处理”状态的消息。它们仍保留在集合中,状态为“正在处理”,但消费者不会将其状态更改为“已处理”或“失败”。 对于这些用例,我们需要恢复到简单的循环。然而,我们可以在迭代之间使用相当大的延迟。 把它包起来 “传统”数据库,如 、 或 MongoDB(我也认为是传统的),如今非常强大。如果使用正确(确保您的索引得到优化!),它们速度很快,可扩展性令人印象深刻,并且在传统托管平台上具有成本效益。 MySQL PostgreSQL 许多用例只需使用数据库和您喜欢的编程语言即可解决。并不总是需要“为正确的工作使用正确的工具”,这意味着维护一组不同的工具,如 Redis、Elasticsearch、RabbitMQ 等。通常,维护开销是不值得的。 虽然所提出的解决方案可能与 RabbitMQ 等的性能不匹配,但它通常已经足够,并且可以扩展到标志着您的初创公司取得重大成功的程度。 软件工程就是要进行权衡。明智地选择你的。