嘿👋
我是麦兹·奎斯特, All Quiet的创始人。我们已经实现了一个基于MongoDB 的国产消息队列,我在这里谈谈:
All Quiet 是一个现代事件管理平台,类似于 PagerDuty。
我们的平台需要以下功能:
要了解我们的具体要求,深入了解我们的技术堆栈非常重要:
最终,它是为了最大限度地减少基础设施中移动部件的数量。我们的目标是为我们优秀的客户构建出色的功能,并且可靠地维护我们的服务势在必行。管理单个数据库系统以实现超过五个九的正常运行时间已经足够具有挑战性了。那么,为什么要给自己增加管理额外 HA RabbitMQ 集群的负担呢?
是的……AWS SQS、Google Cloud Tasks 或 Azure Queue Storage 等云解决方案非常棒!然而,它们会导致供应商锁定。我们只是渴望独立且具有成本效益,同时仍然为客户提供可扩展的服务。
消息队列是存储消息的系统。消息的生产者将这些消息存储在队列中,稍后由消费者将其出队进行处理。这对于解耦组件非常有利,特别是当处理消息是一项资源密集型任务时。
MongoDB 多年来已经取得了显着的发展,并且可以满足上面列出的标准。
在接下来的部分中,我将指导您完成消息队列的 MongoDB 特定实现。虽然您需要一个适合您首选编程语言的客户端库,例如 NodeJS、Go 或 All Quiet 中的 C#,但我将分享的概念与平台无关。
您想要使用的每个队列都表示为 MongoDB 数据库中的专用集合。
以下是已处理消息的示例:
{ "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Processed", "Timestamp" : ISODate("2023-08-06T06:50:23.753+0000"), "NextReevaluation" : null }, { "Status" : "Processing", "Timestamp" : ISODate("2023-08-06T06:50:23.572+0000"), "NextReevaluation" : null }, { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } }
让我们看看消息的每个属性。
_id
字段是 MongoDB 的规范唯一标识符属性。在这里,它包含一个NumberLong
,而不是一个ObjectId 。我们需要NumberLong
而不是ObjectId
因为:
虽然ObjectId值应该随着时间的推移而增加,但它们不一定是单调的。这是因为他们:
- 仅包含一秒的时间分辨率,因此同一秒内创建的 ObjectId 值没有保证的顺序,并且
- 由客户端生成,这些客户端可能具有不同的系统时钟。
在我们的 C# 实现中,我们生成一个具有毫秒精度的 Id,并根据插入时间保证排序。虽然我们在多消费者环境(类似于 RabbitMQ)中不需要严格的处理顺序,但在仅使用一个消费者进行操作时保持 FIFO 顺序至关重要。使用 ObjectId 来实现这一点是不可行的。如果这对您来说并不重要,您仍然可以使用 ObjectId。
Statuses 属性由一个包含消息处理历史记录的数组组成。在索引 0 处,您将找到当前状态,这对于索引至关重要。
状态对象本身包含三个属性:
Status
:可以是“已排队”、“正在处理”、“已处理”或“失败”。Timestamp
:捕获当前时间戳。NextReevaluation
:记录下一次评估应发生的时间,这对于重试和未来的计划执行都至关重要。
此属性包含消息的特定负载。
添加消息是向集合中执行的简单插入操作,状态设置为“已入队”。
NextReevaluation
设置为null
。NextReevaluation
设置为将来您希望处理消息时的时间戳。 db.yourQueueCollection.insert({ "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } });
出队稍微复杂一些,但仍然相对简单。它严重依赖 MongoDB 的并发原子读取和更新能力。
MongoDB 的这一基本功能可确保:
db.yourQueueCollection.findAndModify({ "query": { "$and": [ { "Statuses.0.Status": "Enqueued" }, { "Statuses.0.NextReevaluation": null } ] }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:23.800+0000"), "NextReevaluation": null } ], "$position": 0 } } } });
因此,我们正在读取一条处于“Enqueued”状态的消息,同时通过在位置 0 设置状态“Processing”来修改它。由于此操作是原子操作,因此将保证该消息不会被另一个消费者获取。
消息处理完成后,只需使用消息 ID 将消息状态更新为“已处理”即可。
db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });
如果处理失败,我们需要对消息进行相应的标记。通常,您可能想重试处理消息。这可以通过重新排队消息来实现。在许多情况下,根据处理失败的性质,在特定延迟(例如 10 秒)后重新处理消息是有意义的。
db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Failed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": ISODate("2023-08-06T07:00:24.100+0000") } ], "$position": 0 } } } });
我们已经确定了如何轻松地将项目从“队列”中入队和出队,“队列”实际上只是一个 MongoDB 集合。我们甚至可以利用NextReevaluation
字段来“安排”未来的消息。
缺少的是我们如何定期出队。消费者需要在某种循环中执行findAndModify
命令。一种简单的方法是创建一个无限循环,在其中出队并处理消息。这种方法简单有效。但会给数据库和网络带来相当大的压力。
另一种选择是在循环迭代之间引入延迟,例如 100ms。这将显着减少负载,但也会降低出队速度。
该问题的解决方案就是 MongoDB 所说的变更流。
什么是变更流?我无法比 MongoDB 的人更好地解释它:
更改流允许应用程序访问实时数据更改[...]。应用程序可以使用更改流来订阅单个集合上的所有数据更改[...]并立即对它们做出反应。
伟大的!我们能做的是监听队列集合中新创建的文档,这实际上意味着监听新入队的消息
这非常简单:
const changeStream = db.yourQueueCollection.watch(); changeStream.on('insert', changeEvent => { // Dequeue the message db.yourQueueCollection.findAndModify({ "query": changeEvent.documentKey._id, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });
然而,更改流方法不适用于计划消息和孤立消息,因为显然没有我们可以监听的更改。
对于这些用例,我们需要恢复到简单的循环。然而,我们可以在迭代之间使用相当大的延迟。
“传统”数据库,如MySQL 、 PostgreSQL或 MongoDB(我也认为是传统的),如今非常强大。如果使用正确(确保您的索引得到优化!),它们速度很快,可扩展性令人印象深刻,并且在传统托管平台上具有成本效益。
许多用例只需使用数据库和您喜欢的编程语言即可解决。并不总是需要“为正确的工作使用正确的工具”,这意味着维护一组不同的工具,如 Redis、Elasticsearch、RabbitMQ 等。通常,维护开销是不值得的。
虽然所提出的解决方案可能与 RabbitMQ 等的性能不匹配,但它通常已经足够,并且可以扩展到标志着您的初创公司取得重大成功的程度。
软件工程就是要进行权衡。明智地选择你的。