Hey👋 I'm Mads Quist, founder of . We've implemented a home-grown message queue based on and I'm here to talk about: All Quiet MongoDB Why we re-invented the wheel How we re-invented the wheel 1. Why we re-invented the wheel Why do we need message queuing? All Quiet is a modern . incident management platform, similar to PagerDuty Our platform requires features like: Sending a double-opt-in email asynchronously after a user registers Sending a reminder email 24 hours after registration Sending push notifications with Firebase Cloud Messaging (FCM), which can fail due to network or load problems. As push notifications are crucial to our app, we need to retry sending them if there's an issue. Accepting emails from outside our integration and processing them into incidents. This process can fail, so we wanted to decouple it and process each email payload on a queue. Our tech stack To understand our specific requirements, it's important to get some insights into our tech stack: We run a monolithic web application based on .NET Core 7. The .NET Core application runs in a Docker container. We run multiple containers in parallel. An HAProxy instance distributes HTTP requests equally to each container, ensuring a highly available setup. We use MongoDB as our underlying database, replicated across availability zones. All of the above components are hosted by AWS on generic EC2 VMs. Why we re-invented the wheel We desired a simple queuing mechanism that could run in multiple processes simultaneously while guaranteeing that each message was processed only once. We didn't need a pub/sub pattern. We didn't aim for a complex distributed system based on CQRS / event sourcing because, you know, . the first rule of distributed systems is to not distribute We wanted to keep things as simple as possible, following the philosophy of choosing " ". boring technology Ultimately, it's about minimizing the number of moving parts in your infrastructure. We aim to build fantastic features for our excellent customers, and it's imperative to maintain our services reliably. Managing a single database system to achieve more than is challenging enough. So why burden yourself with managing an additional HA RabbitMQ cluster? five nines of uptime Why not just use AWS SQS? Yeah… cloud solutions like AWS SQS, Google Cloud Tasks, or Azure Queue Storage are fantastic! However, they would have resulted in vendor lock-in. We simply aspire to be independent and cost-effective while still providing a scalable service to our clients. 2. How we re-invented the wheel What is a message queue? A message queue is a system that stores messages. Producers of messages store these in the queue, which are later dequeued by consumers for processing. This is incredibly beneficial for decoupling components, especially when processing messages is a resource-intensive task. What characteristics should our queue show? Utilizing MongoDB as our data storage Guaranteeing that each message is consumed only once Allowing multiple consumers to process messages simultaneously Ensuring that if message processing fails, retries are possible Enabling scheduling of message consumption for the future Not needing guaranteed ordering Ensuring high availability Ensuring messages and their states are durable and can withstand restarts or extended downtimes MongoDB has significantly evolved over the years and can meet the criteria listed above. Implementation In the sections that follow, I'll guide you through the MongoDB-specific implementation of our message queue. While you'll need a client library suitable for your preferred programming language, such as NodeJS, Go, or C# in the case of All Quiet, the concepts I'll share are platform agnostic. Queues Each queue you want to utilize is represented as a dedicated collection in your MongoDB database. Message Model Here's an example of a processed message: { "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Processed", "Timestamp" : ISODate("2023-08-06T06:50:23.753+0000"), "NextReevaluation" : null }, { "Status" : "Processing", "Timestamp" : ISODate("2023-08-06T06:50:23.572+0000"), "NextReevaluation" : null }, { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } } Let’s look at each property of the message. _id The field is the canonical unique identifier property of MongoDB. Here, it contains a , not an . We need instead of because: _id NumberLong ObjectId NumberLong ObjectId While values should increase over time, they are not necessarily monotonic. This is because they: ObjectId Only contain one second of temporal resolution, so ObjectId values created within the same second do not have a guaranteed ordering, and Are generated by clients, which may have differing system clocks. In our C# implementation, we generate an Id with millisecond precision and guaranteed ordering based on insertion time. Although we don't require strict processing order in a multi-consumer environment (similar to RabbitMQ), it's essential to maintain FIFO order when operating with just one consumer. Achieving this with ObjectId is not feasible. If this isn't crucial for you, you can still use ObjectId. Statuses The Statuses property consists of an array containing the message processing history. At index 0, you'll find the current status, which is crucial for indexing. The status object itself contains three properties: : Can be "Enqueued", "Processing", "Processed", or "Failed". Status : This captures the current timestamp. Timestamp : Records when the next evaluation should occur, which is essential for both retries and future scheduled executions. NextReevaluation Payload This property contains the specific payload of your message. Enqueuing a message Adding a message is a straightforward insert operation into the collection with the status set to "Enqueued". For immediate processing, set to . NextReevaluation null For future processing, set to a timestamp in the future, when you want your message to be processed. NextReevaluation db.yourQueueCollection.insert({ "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } }); Dequeuing a message Dequeuing is slightly more complex but still relatively straightforward. It heavily relies on the concurrent atomic read and update capabilities of MongoDB. This essential feature of MongoDB ensures: Each message is processed only once. Multiple consumers can safely process messages simultaneously. db.yourQueueCollection.findAndModify({ "query": { "$and": [ { "Statuses.0.Status": "Enqueued" }, { "Statuses.0.NextReevaluation": null } ] }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:23.800+0000"), "NextReevaluation": null } ], "$position": 0 } } } }); So we are reading one message that is in state “Enqueued” and at the same time modify it by setting the status “Processing” at position 0. Since this operation is atomic it will guarantee that the message will not be picked up by another consumer. Marking a message as processed Once the processing of the message is complete, it's a simple matter of updating the message status to "Processed" using the message’s id. db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } }); Marking a message as failed If processing fails, we need to mark the message accordingly. Often, you might want to retry processing the message. This can be achieved by re-enqueuing the message. In many scenarios, it makes sense to reprocess the message after a specific delay, such as 10 seconds, depending on the nature of the processing failure. db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Failed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": ISODate("2023-08-06T07:00:24.100+0000") } ], "$position": 0 } } } }); The dequeuing loop We've established how we can easily enqueue and dequeue items from our "queue," which is, in fact, simply a MongoDB collection. We can even "schedule" messages for the future by leveraging the field. NextReevaluation What's missing is how we will dequeue regularly. Consumers need to execute the command in some kind of loop. A straightforward approach would be to create an endless loop in which we dequeue and process a message. This method is straightforward and effective. However, it will exert considerable pressure on the database and the network. findAndModify An alternative would be to introduce a delay, e.g., 100ms, between loop iterations. This will significantly reduce the load but will also decrease the speed of dequeuing. The solution to the problem is what MongoDB refers to as a . change stream MongoDB Change Streams What are change streams? I can’t explain it better than the guys at MongoDB: Change streams allow applications to access real-time data changes […]. Applications can use change streams to subscribe to all data changes on a single collection […] and immediately react to them. Great! What we can do is listen to newly created documents in our queue collection, which effectively means listening to newly enqueued messages This is dead simple: const changeStream = db.yourQueueCollection.watch(); changeStream.on('insert', changeEvent => { // Dequeue the message db.yourQueueCollection.findAndModify({ "query": changeEvent.documentKey._id, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } }); Scheduled and Orphaned Messages The change stream approach, however, does not work for both scheduled and orphaned messages because there is obviously no change that we can listen to. Scheduled messages simply sit in the collection with the status "Enqueued" and a "NextReevaluation" field set to the future. Orphaned messages are those that were in the "Processing" status when their consumer process died. They remain in the collection with the status "Processing," but no consumer will ever change their status to "Processed" or "Failed." For these use cases, we need to revert to our simple loop. However, we can use a rather generous delay between iterations. Wrapping it up "Traditional" databases, like , , or MongoDB (which I also view as traditional), are incredibly powerful today. If used correctly (ensure your indexes are optimized!), they are swift, scale impressively, and are cost-effective on traditional hosting platforms. MySQL PostgreSQL Many use cases can be addressed using just a database and your preferred programming language. It's not always necessary to have the "right tool for the right job," meaning maintaining a diverse set of tools like Redis, Elasticsearch, RabbitMQ, etc. Often, the maintenance overhead isn't worth it. While the solution proposed might not match the performance of, for instance, RabbitMQ, it's usually sufficient and can scale to a point that would mark significant success for your startup. Software engineering is about navigating trade-offs. Choose yours wisely.