paint-brush
Por qué construí una cola de mensajes basada en MongoDBby@allquiet
10,337
10,337

Por qué construí una cola de mensajes basada en MongoDB

All Quiet12m2023/08/27
Read on Terminal Reader

Puede crear una cola de mensajes HA y de alto rendimiento con MongoDB porque ofrece operaciones atómicas de lectura/actualización simultáneas, así como flujos de cambio.
featured image - Por qué construí una cola de mensajes basada en MongoDB
All Quiet HackerNoon profile picture
0-item
1-item
2-item


Hola👋


Soy Mads Quist, fundador de All Quiet . Hemos implementado una cola de mensajes propia basada en MongoDB y estoy aquí para hablar sobre:

  1. Por qué reinventamos la rueda
  2. Cómo reinventamos la rueda


1. Por qué reinventamos la rueda

¿Por qué necesitamos colas de mensajes?

All Quiet es una plataforma moderna de gestión de incidentes, similar a PagerDuty.


Nuestra plataforma requiere características como:


  • Envío de un correo electrónico de doble suscripción de forma asincrónica después de que un usuario se registra
  • Envío de un correo electrónico de recordatorio 24 horas después del registro
  • Envío de notificaciones push con Firebase Cloud Messaging (FCM), que pueden fallar debido a problemas de carga o de red. Como las notificaciones automáticas son cruciales para nuestra aplicación, debemos volver a intentar enviarlas si hay algún problema.
  • Aceptar correos electrónicos ajenos a nuestra integración y procesarlos como incidencias. Este proceso puede fallar, por lo que queríamos desacoplarlo y procesar cada carga útil de correo electrónico en una cola.




Nuestra pila tecnológica

Para comprender nuestros requisitos específicos, es importante obtener algunos conocimientos sobre nuestra pila tecnológica:


  • Ejecutamos una aplicación web monolítica basada en .NET Core 7.
  • La aplicación .NET Core se ejecuta en un contenedor Docker.
  • Ejecutamos varios contenedores en paralelo.
  • Una instancia de HAProxy distribuye las solicitudes HTTP por igual a cada contenedor, lo que garantiza una configuración de alta disponibilidad.
  • Usamos MongoDB como nuestra base de datos subyacente, replicada en zonas de disponibilidad.
  • Todos los componentes anteriores están alojados en AWS en máquinas virtuales EC2 genéricas.

Por qué reinventamos la rueda

  • Queríamos un mecanismo de cola simple que pudiera ejecutarse en múltiples procesos simultáneamente y al mismo tiempo garantizar que cada mensaje se procesara solo una vez.
  • No necesitábamos un patrón pub/sub.
  • No buscamos un sistema distribuido complejo basado en CQRS / abastecimiento de eventos porque, ya sabes, la primera regla de los sistemas distribuidos es no distribuir .
  • Queríamos mantener las cosas lo más simples posible, siguiendo la filosofía de elegir " tecnología aburrida ".


En última instancia, se trata de minimizar la cantidad de piezas móviles en su infraestructura. Nuestro objetivo es crear características fantásticas para nuestros excelentes clientes y es imperativo mantener nuestros servicios de manera confiable. Administrar un único sistema de base de datos para lograr más de cinco nueves de tiempo de actividad ya es bastante desafiante. Entonces, ¿por qué preocuparse por administrar un clúster HA RabbitMQ adicional?


¿Por qué no utilizar simplemente AWS SQS?

Sí... ¡las soluciones en la nube como AWS SQS, Google Cloud Tasks o Azure Queue Storage son fantásticas! Sin embargo, habrían dado lugar a una dependencia del proveedor. Simplemente aspiramos a ser independientes y rentables y al mismo tiempo brindar un servicio escalable a nuestros clientes.



2. Cómo reinventamos la rueda

¿Qué es una cola de mensajes?

Una cola de mensajes es un sistema que almacena mensajes. Los productores de mensajes los almacenan en la cola, que luego los consumidores retiran de la cola para su procesamiento. Esto es increíblemente beneficioso para desacoplar componentes, especialmente cuando el procesamiento de mensajes es una tarea que consume muchos recursos.


¿Qué características debe tener nuestra cola?

  • Utilizando MongoDB como nuestro almacenamiento de datos
  • Garantizar que cada mensaje se consuma una sola vez
  • Permitir que varios consumidores procesen mensajes simultáneamente
  • Garantizar que si falla el procesamiento del mensaje, sea posible volver a intentarlo
  • Permitir la programación del consumo de mensajes para el futuro
  • No necesita pedido garantizado
  • Garantizar una alta disponibilidad
  • Garantizar que los mensajes y sus estados sean duraderos y puedan resistir reinicios o tiempos de inactividad prolongados.


MongoDB ha evolucionado significativamente a lo largo de los años y puede cumplir con los criterios enumerados anteriormente.


Implementación

En las secciones siguientes, lo guiaré a través de la implementación específica de MongoDB de nuestra cola de mensajes. Si bien necesitará una biblioteca cliente adecuada para su lenguaje de programación preferido, como NodeJS, Go o C# en el caso de All Quiet, los conceptos que compartiré son independientes de la plataforma.


Colas

Cada cola que desee utilizar se representa como una colección dedicada en su base de datos MongoDB.

Modelo de mensaje

A continuación se muestra un ejemplo de un mensaje procesado:

 { "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Processed", "Timestamp" : ISODate("2023-08-06T06:50:23.753+0000"), "NextReevaluation" : null }, { "Status" : "Processing", "Timestamp" : ISODate("2023-08-06T06:50:23.572+0000"), "NextReevaluation" : null }, { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } }


Veamos cada propiedad del mensaje.


_identificación

El campo _id es la propiedad de identificador único canónico de MongoDB. Aquí contiene un NumberLong , no un ObjectId . Necesitamos NumberLong en lugar de ObjectId porque:



Si bien los valores de ObjectId deberían aumentar con el tiempo, no son necesariamente monótonos. Esto se debe a que ellos:

  • Solo contienen un segundo de resolución temporal, por lo que los valores de ObjectId creados dentro del mismo segundo no tienen un orden garantizado, y
  • Son generados por clientes, que pueden tener diferentes relojes del sistema.


En nuestra implementación de C#, generamos una identificación con precisión de milisegundos y orden garantizado según el tiempo de inserción. Aunque no requerimos un orden de procesamiento estricto en un entorno de múltiples consumidores (similar a RabbitMQ), es esencial mantener el orden FIFO cuando se opera con un solo consumidor. Lograr esto con ObjectId no es factible. Si esto no es crucial para usted, aún puede usar ObjectId.


Estados

La propiedad Estados consta de una matriz que contiene el historial de procesamiento de mensajes. En el índice 0, encontrará el estado actual, que es crucial para la indexación.


El objeto de estado en sí contiene tres propiedades:

  • Status : puede ser "En cola", "Procesando", "Procesado" o "Error".
  • Timestamp : esto captura la marca de tiempo actual.
  • NextReevaluation : registra cuándo debe ocurrir la siguiente evaluación, lo cual es esencial tanto para los reintentos como para futuras ejecuciones programadas.


Carga útil

Esta propiedad contiene la carga útil específica de su mensaje.


Poner en cola un mensaje

Agregar un mensaje es una operación de inserción sencilla en la colección con el estado establecido en "En cola".

  • Para un procesamiento inmediato, establezca NextReevaluation en null .
  • Para procesamiento futuro, configure NextReevaluation en una marca de tiempo en el futuro, cuando desee que se procese su mensaje.
 db.yourQueueCollection.insert({ "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } });


Quitar un mensaje de la cola

Quitar la cola es un poco más complejo pero relativamente sencillo. Depende en gran medida de las capacidades simultáneas de lectura y actualización atómica de MongoDB.


Esta característica esencial de MongoDB garantiza:

  • Cada mensaje se procesa solo una vez.
  • Varios consumidores pueden procesar mensajes de forma segura y simultánea.


 db.yourQueueCollection.findAndModify({ "query": { "$and": [ { "Statuses.0.Status": "Enqueued" }, { "Statuses.0.NextReevaluation": null } ] }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:23.800+0000"), "NextReevaluation": null } ], "$position": 0 } } } });


Entonces, estamos leyendo un mensaje que está en estado "En cola" y al mismo tiempo lo modificamos estableciendo el estado "Procesando" en la posición 0. Dado que esta operación es atómica, garantizará que el mensaje no será recogido por otro consumidor. .


Marcar un mensaje como procesado

Una vez que se completa el procesamiento del mensaje, es sencillo actualizar el estado del mensaje a "Procesado" utilizando la identificación del mensaje.

 db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });


Marcar un mensaje como fallido

Si el procesamiento falla, debemos marcar el mensaje en consecuencia. A menudo, es posible que desee volver a intentar procesar el mensaje. Esto se puede lograr volviendo a poner en cola el mensaje. En muchos escenarios, tiene sentido reprocesar el mensaje después de un retraso específico, como 10 segundos, según la naturaleza del error de procesamiento.


 db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Failed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": ISODate("2023-08-06T07:00:24.100+0000") } ], "$position": 0 } } } });


El bucle de salida de la cola

Hemos establecido cómo podemos poner y quitar fácilmente elementos de nuestra "cola", que es, de hecho, simplemente una colección de MongoDB. Incluso podemos "programar" mensajes para el futuro aprovechando el campo NextReevaluation .


Lo que falta es cómo eliminaremos la cola con regularidad. Los consumidores deben ejecutar el comando findAndModify en algún tipo de bucle. Un enfoque sencillo sería crear un bucle sin fin en el que retiramos de la cola y procesamos un mensaje. Este método es sencillo y eficaz. Sin embargo, ejercerá una presión considerable sobre la base de datos y la red.


Una alternativa sería introducir un retraso, por ejemplo, 100 ms, entre las iteraciones del bucle. Esto reducirá significativamente la carga pero también disminuirá la velocidad de eliminación de la cola.


La solución al problema es lo que MongoDB denomina flujo de cambios .


Flujos de cambios de MongoDB

¿Qué son las corrientes de cambio? No puedo explicarlo mejor que los chicos de MongoDB:


Los flujos de cambios permiten que las aplicaciones accedan a cambios de datos en tiempo real […]. Las aplicaciones pueden utilizar flujos de cambios para suscribirse a todos los cambios de datos en una sola colección […] y reaccionar inmediatamente ante ellos.


¡Excelente! Lo que podemos hacer es escuchar los documentos recién creados en nuestra colección de colas, lo que efectivamente significa escuchar los mensajes recién creados en cola.


Esto es muy simple:

 const changeStream = db.yourQueueCollection.watch(); changeStream.on('insert', changeEvent => { // Dequeue the message db.yourQueueCollection.findAndModify({ "query": changeEvent.documentKey._id, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });



Mensajes programados y huérfanos

Sin embargo, el enfoque de flujo de cambios no funciona tanto para mensajes programados como para mensajes huérfanos porque obviamente no hay ningún cambio que podamos escuchar.


  • Los mensajes programados simplemente se ubican en la colección con el estado "En cola" y un campo "PróximaReevaluación" configurado para el futuro.
  • Los mensajes huérfanos son aquellos que estaban en estado "Procesando" cuando su proceso de consumo finalizó. Permanecen en la colección con el estado "Procesando", pero ningún consumidor cambiará su estado a "Procesado" o "Error".


Para estos casos de uso, debemos volver a nuestro bucle simple. Sin embargo, podemos utilizar un retraso bastante generoso entre iteraciones.


Envolviendolo

Las bases de datos "tradicionales", como MySQL , PostgreSQL o MongoDB (que también considero tradicionales), son increíblemente poderosas hoy en día. Si se usan correctamente (¡asegúrese de que sus índices estén optimizados!), son rápidos, escalan de manera impresionante y rentables en las plataformas de alojamiento tradicionales.


Muchos casos de uso se pueden abordar utilizando solo una base de datos y su lenguaje de programación preferido. No siempre es necesario tener la "herramienta adecuada para el trabajo correcto", lo que significa mantener un conjunto diverso de herramientas como Redis, Elasticsearch, RabbitMQ, etc. A menudo, los gastos generales de mantenimiento no valen la pena.


Si bien es posible que la solución propuesta no iguale el rendimiento de, por ejemplo, RabbitMQ, generalmente es suficiente y puede escalar hasta un punto que marcaría un éxito significativo para su startup.


La ingeniería de software se trata de negociar compensaciones. Elige el tuyo sabiamente.