Hé👋
Je m'appelle Mads Quist, fondateur d' All Quiet . Nous avons implémenté une file d'attente de messages maison basée sur MongoDB et je suis ici pour parler de :
All Quiet est une plateforme moderne de gestion des incidents, similaire à PagerDuty.
Notre plateforme nécessite des fonctionnalités telles que :
Pour comprendre nos besoins spécifiques, il est important d'avoir un aperçu de notre pile technologique :
En fin de compte, il s'agit de minimiser le nombre de pièces mobiles dans votre infrastructure. Nous visons à créer des fonctionnalités fantastiques pour nos excellents clients, et il est impératif de maintenir nos services de manière fiable. Gérer un système de base de données unique pour atteindre un temps de disponibilité supérieur à cinq neuf est déjà un véritable défi. Alors pourquoi vous charger de la gestion d’un cluster HA RabbitMQ supplémentaire ?
Ouais… les solutions cloud comme AWS SQS, Google Cloud Tasks ou Azure Queue Storage sont fantastiques ! Cependant, elles auraient entraîné une dépendance vis-à-vis du fournisseur. Nous aspirons simplement à être indépendants et rentables tout en fournissant un service évolutif à nos clients.
Une file d'attente de messages est un système qui stocke les messages. Les producteurs de messages les stockent dans la file d'attente, qui sont ensuite retirés de la file d'attente par les consommateurs pour traitement. Ceci est incroyablement bénéfique pour le découplage des composants, en particulier lorsque le traitement des messages est une tâche gourmande en ressources.
MongoDB a considérablement évolué au fil des années et peut répondre aux critères énumérés ci-dessus.
Dans les sections qui suivent, je vais vous guider à travers l'implémentation spécifique à MongoDB de notre file d'attente de messages. Bien que vous ayez besoin d'une bibliothèque client adaptée à votre langage de programmation préféré, tel que NodeJS, Go ou C# dans le cas d'All Quiet, les concepts que je partagerai sont indépendants de la plate-forme.
Chaque file d'attente que vous souhaitez utiliser est représentée comme une collection dédiée dans votre base de données MongoDB.
Voici un exemple de message traité :
{ "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Processed", "Timestamp" : ISODate("2023-08-06T06:50:23.753+0000"), "NextReevaluation" : null }, { "Status" : "Processing", "Timestamp" : ISODate("2023-08-06T06:50:23.572+0000"), "NextReevaluation" : null }, { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } }
Examinons chaque propriété du message.
Le champ _id
est la propriété d'identifiant unique canonique de MongoDB. Ici, il contient un NumberLong
, pas un ObjectId . Nous avons besoin NumberLong
au lieu d' ObjectId
car :
Même si les valeurs ObjectId devraient augmenter avec le temps, elles ne sont pas nécessairement monotones. C'est parce qu'ils :
- Ne contiennent qu'une seconde de résolution temporelle, donc les valeurs ObjectId créées dans la même seconde n'ont pas d'ordre garanti, et
- Sont générés par les clients, qui peuvent avoir des horloges système différentes.
Dans notre implémentation C#, nous générons un identifiant avec une précision à la milliseconde et un ordre garanti basé sur le temps d'insertion. Bien que nous n'exigeions pas un ordre de traitement strict dans un environnement multi-consommateur (similaire à RabbitMQ), il est essentiel de maintenir l'ordre FIFO lorsque l'on fonctionne avec un seul consommateur. Y parvenir avec ObjectId n’est pas réalisable. Si ce n'est pas crucial pour vous, vous pouvez toujours utiliser ObjectId.
La propriété Statuses est constituée d'un tableau contenant l'historique de traitement des messages. À l'index 0, vous trouverez l'état actuel, ce qui est crucial pour l'indexation.
L'objet status lui-même contient trois propriétés :
Status
: peut être "En file d'attente", "En cours de traitement", "Traité" ou "Échec".Timestamp
: Ceci capture l'horodatage actuel.NextReevaluation
: enregistre le moment où la prochaine évaluation doit avoir lieu, ce qui est essentiel à la fois pour les nouvelles tentatives et les futures exécutions planifiées.
Cette propriété contient la charge utile spécifique de votre message.
L'ajout d'un message est une opération d'insertion simple dans la collection avec le statut défini sur « En file d'attente ».
NextReevaluation
sur null
.NextReevaluation
sur un horodatage futur, lorsque vous souhaitez que votre message soit traité. db.yourQueueCollection.insert({ "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } });
Le retrait de la file d'attente est légèrement plus complexe mais reste relativement simple. Il s'appuie fortement sur les capacités simultanées de lecture et de mise à jour atomiques de MongoDB.
Cette fonctionnalité essentielle de MongoDB assure :
db.yourQueueCollection.findAndModify({ "query": { "$and": [ { "Statuses.0.Status": "Enqueued" }, { "Statuses.0.NextReevaluation": null } ] }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:23.800+0000"), "NextReevaluation": null } ], "$position": 0 } } } });
Nous lisons donc un message qui est dans l'état « Enqueued » et en même temps le modifions en mettant le statut « Processing » à la position 0. Puisque cette opération est atomique, elle garantira que le message ne sera pas récupéré par un autre consommateur. .
Une fois le traitement du message terminé, il suffit de mettre à jour le statut du message sur « Traité » en utilisant l'identifiant du message.
db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });
Si le traitement échoue, nous devons marquer le message en conséquence. Souvent, vous souhaiterez peut-être réessayer de traiter le message. Ceci peut être réalisé en remettant le message en file d'attente. Dans de nombreux scénarios, il est judicieux de retraiter le message après un délai spécifique, par exemple 10 secondes, en fonction de la nature de l'échec du traitement.
db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Failed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": ISODate("2023-08-06T07:00:24.100+0000") } ], "$position": 0 } } } });
Nous avons établi comment nous pouvons facilement mettre et retirer des éléments de notre « file d'attente », qui est en fait simplement une collection MongoDB. Nous pouvons même « planifier » des messages pour le futur en tirant parti du champ NextReevaluation
.
Ce qui manque, c'est la façon dont nous retirerons régulièrement la file d'attente. Les consommateurs doivent exécuter la commande findAndModify
dans une sorte de boucle. Une approche simple consisterait à créer une boucle sans fin dans laquelle nous retirons et traitons un message. Cette méthode est simple et efficace. Cependant, cela exercera une pression considérable sur la base de données et le réseau.
Une alternative serait d'introduire un délai, par exemple de 100 ms, entre les itérations de boucle. Cela réduira considérablement la charge mais diminuera également la vitesse de retrait de la file d'attente.
La solution au problème est ce que MongoDB appelle un flux de changement .
Que sont les flux de changement ? Je ne peux pas l'expliquer mieux que les gars de MongoDB :
Les flux de modifications permettent aux applications d'accéder aux modifications des données en temps réel […]. Les applications peuvent utiliser des flux de modifications pour s'abonner à toutes les modifications de données sur une seule collection […] et y réagir immédiatement.
Super! Ce que nous pouvons faire, c'est écouter les documents nouvellement créés dans notre collection de files d'attente, ce qui signifie effectivement écouter les messages nouvellement mis en file d'attente.
C'est très simple :
const changeStream = db.yourQueueCollection.watch(); changeStream.on('insert', changeEvent => { // Dequeue the message db.yourQueueCollection.findAndModify({ "query": changeEvent.documentKey._id, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });
Cependant, l'approche du flux de modifications ne fonctionne pas pour les messages planifiés et orphelins, car il n'y a évidemment aucun changement que nous pouvons écouter.
Pour ces cas d'utilisation, nous devons revenir à notre simple boucle. Cependant, on peut utiliser un délai assez généreux entre les itérations.
Les bases de données « traditionnelles », comme MySQL , PostgreSQL ou MongoDB (que je considère également comme traditionnelles), sont aujourd'hui incroyablement puissantes. S'ils sont utilisés correctement (assurez-vous que vos index sont optimisés !), ils sont rapides, évolutifs de manière impressionnante et rentables sur les plateformes d'hébergement traditionnelles.
De nombreux cas d'utilisation peuvent être traités en utilisant simplement une base de données et votre langage de programmation préféré. Il n'est pas toujours nécessaire d'avoir le « bon outil pour le bon travail », ce qui signifie maintenir un ensemble diversifié d'outils comme Redis, Elasticsearch, RabbitMQ, etc. Souvent, les frais de maintenance n'en valent pas la peine.
Bien que la solution proposée puisse ne pas correspondre aux performances de RabbitMQ, par exemple, elle est généralement suffisante et peut évoluer jusqu'à un point qui marquerait un succès significatif pour votre startup.
Le génie logiciel consiste à faire des compromis. Choisissez le vôtre judicieusement.