paint-brush
Pourquoi j'ai créé une file d'attente de messages alimentée par MongoDBby@allquiet
10,359
10,359

Pourquoi j'ai créé une file d'attente de messages alimentée par MongoDB

All Quiet12m2023/08/27
Read on Terminal Reader

Vous pouvez créer une file d'attente de messages haute disponibilité et performante avec MongoDB car elle offre des opérations de lecture/mise à jour atomiques simultanées ainsi que des flux de modifications.
featured image - Pourquoi j'ai créé une file d'attente de messages alimentée par MongoDB
All Quiet HackerNoon profile picture
0-item
1-item
2-item


Hé👋


Je m'appelle Mads Quist, fondateur d' All Quiet . Nous avons implémenté une file d'attente de messages maison basée sur MongoDB et je suis ici pour parler de :

  1. Pourquoi nous avons réinventé la roue
  2. Comment nous avons réinventé la roue


1. Pourquoi nous avons réinventé la roue

Pourquoi avons-nous besoin d’une file d’attente de messages ?

All Quiet est une plateforme moderne de gestion des incidents, similaire à PagerDuty.


Notre plateforme nécessite des fonctionnalités telles que :


  • Envoi d'un e-mail de double opt-in de manière asynchrone après l'inscription d'un utilisateur
  • Envoi d'un email de rappel 24h après l'inscription
  • Envoi de notifications push avec Firebase Cloud Messaging (FCM), qui peuvent échouer en raison de problèmes de réseau ou de charge. Comme les notifications push sont cruciales pour notre application, nous devons réessayer de les envoyer en cas de problème.
  • Accepter les e-mails extérieurs à notre intégration et les traiter en incidents. Ce processus peut échouer, nous avons donc voulu le découpler et traiter chaque charge utile de courrier électronique dans une file d'attente.




Notre pile technologique

Pour comprendre nos besoins spécifiques, il est important d'avoir un aperçu de notre pile technologique :


  • Nous exécutons une application Web monolithique basée sur .NET Core 7.
  • L'application .NET Core s'exécute dans un conteneur Docker.
  • Nous gérons plusieurs conteneurs en parallèle.
  • Une instance HAProxy distribue les requêtes HTTP de manière égale à chaque conteneur, garantissant une configuration hautement disponible.
  • Nous utilisons MongoDB comme base de données sous-jacente, répliquée dans les zones de disponibilité.
  • Tous les composants ci-dessus sont hébergés par AWS sur des machines virtuelles EC2 génériques.

Pourquoi nous avons réinventé la roue

  • Nous souhaitions un mécanisme de file d'attente simple qui puisse s'exécuter dans plusieurs processus simultanément tout en garantissant que chaque message n'était traité qu'une seule fois.
  • Nous n’avions pas besoin d’un modèle pub/sub.
  • Nous n'avions pas visé un système distribué complexe basé sur le CQRS/le sourcing d'événements car, vous savez, la première règle des systèmes distribués est de ne pas distribuer .
  • Nous voulions garder les choses aussi simples que possible, en suivant la philosophie du choix d'une « technologie ennuyeuse ».


En fin de compte, il s'agit de minimiser le nombre de pièces mobiles dans votre infrastructure. Nous visons à créer des fonctionnalités fantastiques pour nos excellents clients, et il est impératif de maintenir nos services de manière fiable. Gérer un système de base de données unique pour atteindre un temps de disponibilité supérieur à cinq neuf est déjà un véritable défi. Alors pourquoi vous charger de la gestion d’un cluster HA RabbitMQ supplémentaire ?


Pourquoi ne pas simplement utiliser AWS SQS ?

Ouais… les solutions cloud comme AWS SQS, Google Cloud Tasks ou Azure Queue Storage sont fantastiques ! Cependant, elles auraient entraîné une dépendance vis-à-vis du fournisseur. Nous aspirons simplement à être indépendants et rentables tout en fournissant un service évolutif à nos clients.



2. Comment nous avons réinventé la roue

Qu'est-ce qu'une file d'attente de messages ?

Une file d'attente de messages est un système qui stocke les messages. Les producteurs de messages les stockent dans la file d'attente, qui sont ensuite retirés de la file d'attente par les consommateurs pour traitement. Ceci est incroyablement bénéfique pour le découplage des composants, en particulier lorsque le traitement des messages est une tâche gourmande en ressources.


Quelles caractéristiques notre file d'attente doit-elle présenter ?

  • Utiliser MongoDB comme stockage de données
  • Garantir que chaque message n'est consommé qu'une seule fois
  • Permettre à plusieurs consommateurs de traiter les messages simultanément
  • S'assurer qu'en cas d'échec du traitement des messages, de nouvelles tentatives sont possibles
  • Permettre la planification de la consommation des messages pour l'avenir
  • Pas besoin de commande garantie
  • Assurer une haute disponibilité
  • S'assurer que les messages et leurs états sont durables et peuvent résister aux redémarrages ou aux temps d'arrêt prolongés


MongoDB a considérablement évolué au fil des années et peut répondre aux critères énumérés ci-dessus.


Mise en œuvre

Dans les sections qui suivent, je vais vous guider à travers l'implémentation spécifique à MongoDB de notre file d'attente de messages. Bien que vous ayez besoin d'une bibliothèque client adaptée à votre langage de programmation préféré, tel que NodeJS, Go ou C# dans le cas d'All Quiet, les concepts que je partagerai sont indépendants de la plate-forme.


Files d'attente

Chaque file d'attente que vous souhaitez utiliser est représentée comme une collection dédiée dans votre base de données MongoDB.

Modèle de message

Voici un exemple de message traité :

 { "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Processed", "Timestamp" : ISODate("2023-08-06T06:50:23.753+0000"), "NextReevaluation" : null }, { "Status" : "Processing", "Timestamp" : ISODate("2023-08-06T06:50:23.572+0000"), "NextReevaluation" : null }, { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } }


Examinons chaque propriété du message.


_identifiant

Le champ _id est la propriété d'identifiant unique canonique de MongoDB. Ici, il contient un NumberLong , pas un ObjectId . Nous avons besoin NumberLong au lieu d' ObjectId car :



Même si les valeurs ObjectId devraient augmenter avec le temps, elles ne sont pas nécessairement monotones. C'est parce qu'ils :

  • Ne contiennent qu'une seconde de résolution temporelle, donc les valeurs ObjectId créées dans la même seconde n'ont pas d'ordre garanti, et
  • Sont générés par les clients, qui peuvent avoir des horloges système différentes.


Dans notre implémentation C#, nous générons un identifiant avec une précision à la milliseconde et un ordre garanti basé sur le temps d'insertion. Bien que nous n'exigeions pas un ordre de traitement strict dans un environnement multi-consommateur (similaire à RabbitMQ), il est essentiel de maintenir l'ordre FIFO lorsque l'on fonctionne avec un seul consommateur. Y parvenir avec ObjectId n’est pas réalisable. Si ce n'est pas crucial pour vous, vous pouvez toujours utiliser ObjectId.


Statuts

La propriété Statuses est constituée d'un tableau contenant l'historique de traitement des messages. À l'index 0, vous trouverez l'état actuel, ce qui est crucial pour l'indexation.


L'objet status lui-même contient trois propriétés :

  • Status : peut être "En file d'attente", "En cours de traitement", "Traité" ou "Échec".
  • Timestamp : Ceci capture l'horodatage actuel.
  • NextReevaluation : enregistre le moment où la prochaine évaluation doit avoir lieu, ce qui est essentiel à la fois pour les nouvelles tentatives et les futures exécutions planifiées.


Charge utile

Cette propriété contient la charge utile spécifique de votre message.


Mettre un message en file d'attente

L'ajout d'un message est une opération d'insertion simple dans la collection avec le statut défini sur « En file d'attente ».

  • Pour un traitement immédiat, définissez NextReevaluation sur null .
  • Pour un traitement futur, définissez NextReevaluation sur un horodatage futur, lorsque vous souhaitez que votre message soit traité.
 db.yourQueueCollection.insert({ "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } });


Retirer un message de la file d'attente

Le retrait de la file d'attente est légèrement plus complexe mais reste relativement simple. Il s'appuie fortement sur les capacités simultanées de lecture et de mise à jour atomiques de MongoDB.


Cette fonctionnalité essentielle de MongoDB assure :

  • Chaque message n'est traité qu'une seule fois.
  • Plusieurs consommateurs peuvent traiter les messages simultanément et en toute sécurité.


 db.yourQueueCollection.findAndModify({ "query": { "$and": [ { "Statuses.0.Status": "Enqueued" }, { "Statuses.0.NextReevaluation": null } ] }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:23.800+0000"), "NextReevaluation": null } ], "$position": 0 } } } });


Nous lisons donc un message qui est dans l'état « Enqueued » et en même temps le modifions en mettant le statut « Processing » à la position 0. Puisque cette opération est atomique, elle garantira que le message ne sera pas récupéré par un autre consommateur. .


Marquer un message comme traité

Une fois le traitement du message terminé, il suffit de mettre à jour le statut du message sur « Traité » en utilisant l'identifiant du message.

 db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });


Marquer un message comme ayant échoué

Si le traitement échoue, nous devons marquer le message en conséquence. Souvent, vous souhaiterez peut-être réessayer de traiter le message. Ceci peut être réalisé en remettant le message en file d'attente. Dans de nombreux scénarios, il est judicieux de retraiter le message après un délai spécifique, par exemple 10 secondes, en fonction de la nature de l'échec du traitement.


 db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Failed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": ISODate("2023-08-06T07:00:24.100+0000") } ], "$position": 0 } } } });


La boucle de retrait de la file d'attente

Nous avons établi comment nous pouvons facilement mettre et retirer des éléments de notre « file d'attente », qui est en fait simplement une collection MongoDB. Nous pouvons même « planifier » des messages pour le futur en tirant parti du champ NextReevaluation .


Ce qui manque, c'est la façon dont nous retirerons régulièrement la file d'attente. Les consommateurs doivent exécuter la commande findAndModify dans une sorte de boucle. Une approche simple consisterait à créer une boucle sans fin dans laquelle nous retirons et traitons un message. Cette méthode est simple et efficace. Cependant, cela exercera une pression considérable sur la base de données et le réseau.


Une alternative serait d'introduire un délai, par exemple de 100 ms, entre les itérations de boucle. Cela réduira considérablement la charge mais diminuera également la vitesse de retrait de la file d'attente.


La solution au problème est ce que MongoDB appelle un flux de changement .


Flux de modifications MongoDB

Que sont les flux de changement ? Je ne peux pas l'expliquer mieux que les gars de MongoDB :


Les flux de modifications permettent aux applications d'accéder aux modifications des données en temps réel […]. Les applications peuvent utiliser des flux de modifications pour s'abonner à toutes les modifications de données sur une seule collection […] et y réagir immédiatement.


Super! Ce que nous pouvons faire, c'est écouter les documents nouvellement créés dans notre collection de files d'attente, ce qui signifie effectivement écouter les messages nouvellement mis en file d'attente.


C'est très simple :

 const changeStream = db.yourQueueCollection.watch(); changeStream.on('insert', changeEvent => { // Dequeue the message db.yourQueueCollection.findAndModify({ "query": changeEvent.documentKey._id, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });



Messages programmés et orphelins

Cependant, l'approche du flux de modifications ne fonctionne pas pour les messages planifiés et orphelins, car il n'y a évidemment aucun changement que nous pouvons écouter.


  • Les messages planifiés se trouvent simplement dans la collection avec le statut « En file d'attente » et un champ « NextReevaluation » défini sur le futur.
  • Les messages orphelins sont ceux qui étaient dans le statut « En cours de traitement » lorsque leur processus consommateur est mort. Ils restent dans la collection avec le statut « En cours de traitement », mais aucun consommateur ne changera jamais leur statut en « Traité » ou « Échec ».


Pour ces cas d'utilisation, nous devons revenir à notre simple boucle. Cependant, on peut utiliser un délai assez généreux entre les itérations.


L'emballer

Les bases de données « traditionnelles », comme MySQL , PostgreSQL ou MongoDB (que je considère également comme traditionnelles), sont aujourd'hui incroyablement puissantes. S'ils sont utilisés correctement (assurez-vous que vos index sont optimisés !), ils sont rapides, évolutifs de manière impressionnante et rentables sur les plateformes d'hébergement traditionnelles.


De nombreux cas d'utilisation peuvent être traités en utilisant simplement une base de données et votre langage de programmation préféré. Il n'est pas toujours nécessaire d'avoir le « bon outil pour le bon travail », ce qui signifie maintenir un ensemble diversifié d'outils comme Redis, Elasticsearch, RabbitMQ, etc. Souvent, les frais de maintenance n'en valent pas la peine.


Bien que la solution proposée puisse ne pas correspondre aux performances de RabbitMQ, par exemple, elle est généralement suffisante et peut évoluer jusqu'à un point qui marquerait un succès significatif pour votre startup.


Le génie logiciel consiste à faire des compromis. Choisissez le vôtre judicieusement.