Hola👋
Soy Mads Quist, fundador de All Quiet . Hemos implementado una cola de mensajes propia basada en MongoDB y estoy aquí para hablar sobre:
All Quiet es una plataforma moderna de gestión de incidentes, similar a PagerDuty.
Nuestra plataforma requiere características como:
Para comprender nuestros requisitos específicos, es importante obtener algunos conocimientos sobre nuestra pila tecnológica:
En última instancia, se trata de minimizar la cantidad de piezas móviles en su infraestructura. Nuestro objetivo es crear características fantásticas para nuestros excelentes clientes y es imperativo mantener nuestros servicios de manera confiable. Administrar un único sistema de base de datos para lograr más de cinco nueves de tiempo de actividad ya es bastante desafiante. Entonces, ¿por qué preocuparse por administrar un clúster HA RabbitMQ adicional?
Sí... ¡las soluciones en la nube como AWS SQS, Google Cloud Tasks o Azure Queue Storage son fantásticas! Sin embargo, habrían dado lugar a una dependencia del proveedor. Simplemente aspiramos a ser independientes y rentables y al mismo tiempo brindar un servicio escalable a nuestros clientes.
Una cola de mensajes es un sistema que almacena mensajes. Los productores de mensajes los almacenan en la cola, que luego los consumidores retiran de la cola para su procesamiento. Esto es increíblemente beneficioso para desacoplar componentes, especialmente cuando el procesamiento de mensajes es una tarea que consume muchos recursos.
MongoDB ha evolucionado significativamente a lo largo de los años y puede cumplir con los criterios enumerados anteriormente.
En las secciones siguientes, lo guiaré a través de la implementación específica de MongoDB de nuestra cola de mensajes. Si bien necesitará una biblioteca cliente adecuada para su lenguaje de programación preferido, como NodeJS, Go o C# en el caso de All Quiet, los conceptos que compartiré son independientes de la plataforma.
Cada cola que desee utilizar se representa como una colección dedicada en su base de datos MongoDB.
A continuación se muestra un ejemplo de un mensaje procesado:
{ "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Processed", "Timestamp" : ISODate("2023-08-06T06:50:23.753+0000"), "NextReevaluation" : null }, { "Status" : "Processing", "Timestamp" : ISODate("2023-08-06T06:50:23.572+0000"), "NextReevaluation" : null }, { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } }
Veamos cada propiedad del mensaje.
El campo _id
es la propiedad de identificador único canónico de MongoDB. Aquí contiene un NumberLong
, no un ObjectId . Necesitamos NumberLong
en lugar de ObjectId
porque:
Si bien los valores de ObjectId deberían aumentar con el tiempo, no son necesariamente monótonos. Esto se debe a que ellos:
- Solo contienen un segundo de resolución temporal, por lo que los valores de ObjectId creados dentro del mismo segundo no tienen un orden garantizado, y
- Son generados por clientes, que pueden tener diferentes relojes del sistema.
En nuestra implementación de C#, generamos una identificación con precisión de milisegundos y orden garantizado según el tiempo de inserción. Aunque no requerimos un orden de procesamiento estricto en un entorno de múltiples consumidores (similar a RabbitMQ), es esencial mantener el orden FIFO cuando se opera con un solo consumidor. Lograr esto con ObjectId no es factible. Si esto no es crucial para usted, aún puede usar ObjectId.
La propiedad Estados consta de una matriz que contiene el historial de procesamiento de mensajes. En el índice 0, encontrará el estado actual, que es crucial para la indexación.
El objeto de estado en sí contiene tres propiedades:
Status
: puede ser "En cola", "Procesando", "Procesado" o "Error".Timestamp
: esto captura la marca de tiempo actual.NextReevaluation
: registra cuándo debe ocurrir la siguiente evaluación, lo cual es esencial tanto para los reintentos como para futuras ejecuciones programadas.
Esta propiedad contiene la carga útil específica de su mensaje.
Agregar un mensaje es una operación de inserción sencilla en la colección con el estado establecido en "En cola".
NextReevaluation
en null
.NextReevaluation
en una marca de tiempo en el futuro, cuando desee que se procese su mensaje. db.yourQueueCollection.insert({ "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } });
Quitar la cola es un poco más complejo pero relativamente sencillo. Depende en gran medida de las capacidades simultáneas de lectura y actualización atómica de MongoDB.
Esta característica esencial de MongoDB garantiza:
db.yourQueueCollection.findAndModify({ "query": { "$and": [ { "Statuses.0.Status": "Enqueued" }, { "Statuses.0.NextReevaluation": null } ] }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:23.800+0000"), "NextReevaluation": null } ], "$position": 0 } } } });
Entonces, estamos leyendo un mensaje que está en estado "En cola" y al mismo tiempo lo modificamos estableciendo el estado "Procesando" en la posición 0. Dado que esta operación es atómica, garantizará que el mensaje no será recogido por otro consumidor. .
Una vez que se completa el procesamiento del mensaje, es sencillo actualizar el estado del mensaje a "Procesado" utilizando la identificación del mensaje.
db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });
Si el procesamiento falla, debemos marcar el mensaje en consecuencia. A menudo, es posible que desee volver a intentar procesar el mensaje. Esto se puede lograr volviendo a poner en cola el mensaje. En muchos escenarios, tiene sentido reprocesar el mensaje después de un retraso específico, como 10 segundos, según la naturaleza del error de procesamiento.
db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Failed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": ISODate("2023-08-06T07:00:24.100+0000") } ], "$position": 0 } } } });
Hemos establecido cómo podemos poner y quitar fácilmente elementos de nuestra "cola", que es, de hecho, simplemente una colección de MongoDB. Incluso podemos "programar" mensajes para el futuro aprovechando el campo NextReevaluation
.
Lo que falta es cómo eliminaremos la cola con regularidad. Los consumidores deben ejecutar el comando findAndModify
en algún tipo de bucle. Un enfoque sencillo sería crear un bucle sin fin en el que retiramos de la cola y procesamos un mensaje. Este método es sencillo y eficaz. Sin embargo, ejercerá una presión considerable sobre la base de datos y la red.
Una alternativa sería introducir un retraso, por ejemplo, 100 ms, entre las iteraciones del bucle. Esto reducirá significativamente la carga pero también disminuirá la velocidad de eliminación de la cola.
La solución al problema es lo que MongoDB denomina flujo de cambios .
¿Qué son las corrientes de cambio? No puedo explicarlo mejor que los chicos de MongoDB:
Los flujos de cambios permiten que las aplicaciones accedan a cambios de datos en tiempo real […]. Las aplicaciones pueden utilizar flujos de cambios para suscribirse a todos los cambios de datos en una sola colección […] y reaccionar inmediatamente ante ellos.
¡Excelente! Lo que podemos hacer es escuchar los documentos recién creados en nuestra colección de colas, lo que efectivamente significa escuchar los mensajes recién creados en cola.
Esto es muy simple:
const changeStream = db.yourQueueCollection.watch(); changeStream.on('insert', changeEvent => { // Dequeue the message db.yourQueueCollection.findAndModify({ "query": changeEvent.documentKey._id, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });
Sin embargo, el enfoque de flujo de cambios no funciona tanto para mensajes programados como para mensajes huérfanos porque obviamente no hay ningún cambio que podamos escuchar.
Para estos casos de uso, debemos volver a nuestro bucle simple. Sin embargo, podemos utilizar un retraso bastante generoso entre iteraciones.
Las bases de datos "tradicionales", como MySQL , PostgreSQL o MongoDB (que también considero tradicionales), son increíblemente poderosas hoy en día. Si se usan correctamente (¡asegúrese de que sus índices estén optimizados!), son rápidos, escalan de manera impresionante y rentables en las plataformas de alojamiento tradicionales.
Muchos casos de uso se pueden abordar utilizando solo una base de datos y su lenguaje de programación preferido. No siempre es necesario tener la "herramienta adecuada para el trabajo correcto", lo que significa mantener un conjunto diverso de herramientas como Redis, Elasticsearch, RabbitMQ, etc. A menudo, los gastos generales de mantenimiento no valen la pena.
Si bien es posible que la solución propuesta no iguale el rendimiento de, por ejemplo, RabbitMQ, generalmente es suficiente y puede escalar hasta un punto que marcaría un éxito significativo para su startup.
La ingeniería de software se trata de negociar compensaciones. Elige el tuyo sabiamente.