A queue is an abstract data structure that is open at both ends. One end is used to insert data items (“enqueue”) and the other is used to remove them (“dequeue”). Queues follow the “First-In-First-Out” (FIFO) principle: the data item stored first is the first to be accessed. A related data structure, the stack, follows the “Last-In-First-Out” (LIFO) principle instead, but we won’t cover it in this article.
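To make the enqueue/dequeue behaviour concrete, here is a minimal in-memory sketch in plain JavaScript. The `SimpleQueue` class is a hypothetical name used only for illustration; it is not part of BullMQ and says nothing about how BullMQ stores jobs.

```javascript
// Minimal in-memory FIFO queue, for illustration only.
class SimpleQueue {
  constructor() {
    this.items = [];
  }

  // Insert an item at the back of the queue.
  enqueue(item) {
    this.items.push(item);
  }

  // Remove and return the item at the front (undefined if the queue is empty).
  dequeue() {
    return this.items.shift();
  }

  // Number of items currently waiting.
  get size() {
    return this.items.length;
  }
}

const queue = new SimpleQueue();
queue.enqueue('first');
queue.enqueue('second');
queue.enqueue('third');

console.log(queue.dequeue()); // prints: first
console.log(queue.dequeue()); // prints: second
console.log(queue.size);      // prints: 1
```

Items leave in the same order they arrived, which is exactly the First-In-First-Out property described above.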
Like any other data structure, queues are useful in some places but not in others.
In particular, a queue belongs in specific parts of a system, not in the system as a whole; each part of the system deserves its own design decisions. Queues tend to be useful in parts dedicated to transferring data asynchronously between processes, in distributed components that require high availability, and in sharing resources among multiple consumers. In what follows, we’ll focus on implementing a queue for delayed jobs.
BullMQ is a Node.js library that implements a fast and reliable queue system based on Redis, which aids in building many modern-day micro-service architectures by encapsulating the complex logic of managing queues and providing an easy-to-use API.
The library is designed to fulfil the following goals:
- Exactly-once queue semantics: it attempts to deliver every message exactly once, but in the worst case it delivers a message at least once.
- Easy to scale horizontally: add more workers to process jobs in parallel.
- Consistent.
- High performance: it tries to get the highest possible throughput from Redis by combining efficient Lua scripts and pipelining.
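Because a job may be delivered more than once in the worst case, it can help to make job handling idempotent. The following is a minimal sketch of that idea in plain JavaScript, not something BullMQ provides: `processOnce` and `handledJobIds` are hypothetical names, and the in-memory `Set` is an assumption that only works within a single process (a real system would need a store shared by all workers).

```javascript
// Hypothetical in-memory record of job ids that were already handled.
const handledJobIds = new Set();

// Runs `handler` for a job unless a job with the same id was handled before.
// Returns true if the handler ran, false if the delivery was a duplicate.
function processOnce(job, handler) {
  if (handledJobIds.has(job.id)) {
    return false; // duplicate delivery, nothing to do
  }
  handler(job);
  handledJobIds.add(job.id); // only recorded when the handler did not throw
  return true;
}

const delivered = [];
const exampleJob = { id: '42', data: { email: 'user@example.com' } };

processOnce(exampleJob, (j) => delivered.push(j.data.email)); // handled
processOnce(exampleJob, (j) => delivered.push(j.data.email)); // same job again: skipped

console.log(delivered.length); // prints: 1
```

The id is recorded only after the handler returns, so a handler that throws leaves the job eligible for another attempt.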
You will need Node.js and a Redis server that the examples can connect to.
Then install the BullMQ module:
$ npm install bullmq
Let’s first create the Consumer that will read messages from the queue as they become available:
// consumer.js
const { Worker } = require('bullmq')

// Add your own configuration here
const redisConfiguration = {
  connection: {
    host: "localhost",
    port: 49153,
    username: "default",
    password: "redispw"
  }
}

// Processor: called once for every job the worker picks up from the queue.
async function sendEmail(job) {
  const { email, message } = job.data;
  console.log(`Message ${message} was sent to ${email}.`)
}

// The worker listens on the 'emailSchedule' queue and runs sendEmail for each job.
const worker = new Worker('emailSchedule', sendEmail, redisConfiguration);

worker.on('completed', job => {
  console.info(`${job.id} has completed!`);
});

worker.on('failed', (job, err) => {
  console.error(`${job.id} has failed with ${err.message}`);
});
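The 'failed' handler above fires when the processor throws an error or returns a rejected promise. As a rough sketch of how that could be used, the processor below validates the job payload before "sending". The names `validateEmailJob` and `sendValidatedEmail` and the validation rules themselves are hypothetical and deliberately simplistic; they are not part of BullMQ.

```javascript
// Hypothetical validation step for the { email, message } payload used in this article.
function validateEmailJob(data) {
  if (typeof data.email !== 'string' || !data.email.includes('@')) {
    throw new Error(`Invalid email address: ${data.email}`);
  }
  if (typeof data.message !== 'string' || data.message.length === 0) {
    throw new Error('Message must be a non-empty string');
  }
  return data;
}

// Processor with the same shape as sendEmail. An error thrown during
// validation rejects the returned promise, which marks the job as failed.
async function sendValidatedEmail(job) {
  const { email, message } = validateEmailJob(job.data);
  console.log(`Message ${message} was sent to ${email}.`);
}
```

Passing `sendValidatedEmail` to the `Worker` constructor in place of `sendEmail` would route malformed jobs to the 'failed' handler instead of logging a bogus success.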
Now let’s create the Producer that will add data to the Queue:
// producer.js
const { Queue, QueueScheduler } = require('bullmq')

// Add your own configuration here
const redisConfiguration = {
  connection: {
    host: "localhost",
    port: 49153,
    username: "default",
    password: "redispw"
  }
}

// In BullMQ 1.x, delayed jobs are only processed if at least one QueueScheduler instance exists for the queue.
// BullMQ 2.0 and later no longer need QueueScheduler; on those versions, remove it from the import and delete the next line.
new QueueScheduler('emailSchedule', redisConfiguration);

const myQueue = new Queue('emailSchedule', redisConfiguration);

// Adds an 'email' job that becomes available to workers after `delay` milliseconds.
async function emailSchedule(email, message, delay) {
  await myQueue.add('email', { email, message }, { delay });
}

emailSchedule("[email protected]", "Hello World!", 5000); // The email will be available for consumption after 5 seconds.
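The delay is expressed in milliseconds relative to the moment the job is added. If you would rather schedule a job for a specific point in time, one possible approach is to compute the delay from a target date. The helper below is a small sketch of that; `delayUntil` is a hypothetical name, not a BullMQ function, and the dates are made up for the example.

```javascript
// Hypothetical helper: milliseconds from `now` until `targetDate`.
// Dates in the past yield 0, so the job would be available immediately.
function delayUntil(targetDate, now = Date.now()) {
  return Math.max(0, targetDate.getTime() - now);
}

// Example with fixed timestamps so the result is predictable.
const exampleNow = new Date('2024-01-01T09:00:00Z').getTime();
const exampleTarget = new Date('2024-01-01T09:00:05Z');

console.log(delayUntil(exampleTarget, exampleNow)); // prints: 5000
```

The result can then be passed as the third argument of `emailSchedule`, for example `emailSchedule(email, message, delayUntil(someDate))`.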
After the 5-second delay, the consumer prints a line like this to its console, followed by the job's completion message:
Message Hello World! was sent to [email protected].
That's all folks!