This is part 3 of a 3-part series, where we will explore how to use Redis streams with NestJS.
It is structured in 3 parts:
Populating Redis streams and reading from in fan-out mode
Using consumer groups to handle one stream from multiple actors in a way that one message is sent to and processed only by a single actor (consumer)
By the end of this series, you will have the knowledge and tools necessary to create your own NestJS app that utilizes Redis streams to handle real-time data.
Full code is available on the github
In the previous parts, we created a NestJS application and connected it to the Redis server. Then, we implemented functionality for adding messages to the Redis stream and reading from it in a fan-out fashion. Finally, we looked at how we could use the Redis stream data structure for real-time data stream processing by continuously fetching messages from the Redis stream.
In this post, we will build upon this idea by using consumer groups, thus adding the ability for distributed data processing.
In Redis, consumer groups are a way to distribute messages or tasks across multiple consumers while ensuring that each message is processed only once. When you have many messages to process, a single consumer might need help to keep up with the incoming message rate. You can create multiple consumer instances to share the workload in this scenario.
Redis consumer groups enable you to distribute messages to multiple consumers in a coordinated, fault-tolerant way. Each consumer in the group is assigned a unique name and a subset of the stream's messages to process. Redis uses a mechanism called "message acknowledgment" to ensure that each message is processed only once, even if there are multiple consumers in the group.
We used the XREAD
command to fetch data from a stream. If we want to use consumer groups, we will need to use a different command - XREADGROUP
. You can find general info on Redis Docs
Let's create a method in RedisService
for reading data in a consumer group manner:
// redis.service.ts
public async readConsumerGroup({
streamName,
group,
consumer,
blockMs,
count,
}: CosnumeStreamParams): Promise<RedisStreamMessage[] | null> {
let response: RedsXReadGroupResponse = null;
try {
response = await this.redis.xReadGroup(
commandOptions({ isolated: true }), // uses new connection from pool not to block other redis calls
group,
consumer,
{
key: streamName,
id: '>',
},
{ BLOCK: blockMs, COUNT: count },
);
} catch (error) {
if (error instanceof ClientClosedError) {
console.log(`${error.message} ...RECONNECTING`);
await this.connectToRedis();
return null;
}
if (error.message.includes('NOGROUP')) {
console.log(`${error.message} ...CREATING GROUP`);
await this.createConsumerGroup(streamName, group);
return null;
}
console.error(
`Failed to xReadGroup from Redis stream: ${error.message}`,
error,
);
return null;
}
const messages = response?.[0]?.messages; // returning first stream (since only 1 stream used)
return messages || null;
}
We have introduced a new interface for params CosnumeStreamParams
. Since this shares many similarities with ReadStreamParams
, we extracted those to a base interface, StreamParamsBase
, and extended them from both of these interfaces.
// interfaces.ts
export interface StreamParamsBase {
/** Name of stream to read from */
streamName: string;
/** Max time in ms for how long to block Redis connection before returning
* If 0 is passed, it will block until at least one message is fetched, or timeout happens
* */
blockMs: number;
/** Max how many messages to fetch at a time from Redis */
count: number;
}
export interface ReadStreamParams extends StreamParamsBase {
/** ID of last fetched message */
lastMessageId: string;
}
export interface CosnumeStreamParams extends StreamParamsBase {
/** Name of consumer group */
group: string;
/** Name of consumer, must be unique within group */
consumer: string;
}
// --snip--
The resulting stream message - RedisStreamMessage
stays the same as we had for reading the stream.Similarly, as we did for XREAD
, we have extracted a response type RedsXReadGroupResponse
// redis-client.type.ts
export type RedsXReadGroupResponse = Awaited<
ReturnType<RedisClient['xReadGroup']>
The code should be straight forward:
// redis.service.ts
// --snip--
let response: RedsXReadGroupResponse = null;
try {
response = await this.redis.xReadGroup(
commandOptions({ isolated: true }), // uses new connection from pool not to block other redis calls
group,
consumer,
{
key: streamName,
id: '>',
},
{ BLOCK: blockMs, COUNT: count },
);
}
// --snip--
We call xReadGroup
method of node-redis
client, and wrap it in try-catch to handle any error we might get from RedisClient
.
Same as with xRead, when using blocking commands, we want to utilize connection pool and execute these commands in isolation via commandOptions({ isolated: true })
Then we pass the name of the consumer group
this consumer belongs to and is sharing messages with.
Next, we need to identify the consumer
- entity reading this message.
Next, we specify from which stream we want to fetch a stream by key
parameter and starting from which message ID. >
is a special symbol meaning we want to fetch only messages that were never delivered to any other consumer. If we used 0 or any other valid ID, we would risk also getting messages that have been given to other users but have yet to be acknowledged.
Here, we are specifying only 1 stream. However, we could read from multiple streams by adding an array of objects containing key and id values.
Finally, we set read options. BLOCK
for max blocking time in ms and COUNT
for the max count of messages. These work exactly the same as with XREAD
option.
Next, we have some error handling to do:
// redis.service.ts
// --snip--
} catch (error) {
if (error instanceof ClientClosedError) {
console.log(`${error.message} ...RECONNECTING`);
await this.connectToRedis();
return null;
}
if (error.message.includes('NOGROUP')) {
console.log(`${error.message} ...CREATING GROUP`);
await this.createConsumerGroup(streamName, group);
return null;
}
console.error(
`Failed to xReadGroup from Redis stream: ${error.message}`,
error,
);
return null;
}
// --snip--
As with other Redis calls, we check for a closed connection - ClientClosedError
and try to reconnect in this case. Then We check for an error with a message including NOGROUP
The full error might look similar to this - NOGROUP No such key 'example-stream' or consumer group 'example-group' in XREADGROUP with GROUP option
, but if we see NOGROUP
, we know that this stream does not have the group we want our consumers to be part of.
So we need to create this group.: await this.createConsumerGroup(streamName, group);
// redis.service.ts
private async createConsumerGroup(streamName: string, group: string) {
try {
await this.redis.xGroupCreate(
streamName,
group,
'0', // use 0 to create group from the beginning of the stream, use '$' to create group from the end of the stream
{
MKSTREAM: true,
},
);
} catch (error) {
if (error.message.includes('BUSYGROUP')) {
// Consumer group already exists
return;
}
if (error instanceof ClientClosedError) {
console.log(`${error.message} ...RECONNECTING`);
await this.connectToRedis();
return null;
}
console.error(`Failed to xGroupCreate: ${error.message}`);
return null;
}
}
We are calling XGROUP CREATE
command providing streamName
, the group
name we want to create. We can also choose the start position where we want to start consuming messages. Here it's set to 0
, which means the latest message in the stream. It could be any valid ID or a special character $
- this would start consuming messages added only after group creation.
We also set MKSTREAM
option to true, so in case we don't have this stream yet for some reason, e.g., no messages have been added, Redis gets restarted/switched, or in case of bad memory management, the stream has evicted, we will create a new stream with this name (as well as add a consumer group).
We added error handling for BUSYGROUP
- meaning that this group (and stream) exists after all ( e.g., in case it has been just created by another group consumer) The rest of the error handling stays the same as for other Redis calls.
Finally, jumping back to readConsumerGroup
, after we have fetched messages and done our error handling, we extract the first stream's messages (since we added only one key
and id
object) and return them.
// redis.service.ts
// --snip--
const messages = response?.[0]?.messages; // returning first stream (since only 1 stream used)
return messages || null;
// --snip--
Consumer groups is a great way to distribute data on stream to different consumers only once per group. But how to guarantee that a message sent out by the Redis server was actually received and processed by the consumer it was sent to? The answer is "acknowledgment".
When a message is sent to a consumer, as a side effect, Redis puts the message ID and that consumer in the Pending Entries List (PEL) of that stream consumer group.
The consumer must send an acknowledgment (also known as an "ack") to the Redis server to signal that it has successfully processed the message. This acknowledgment is sent using the XACK
command, which takes the name of the consumer group, the stream's name, and the IDs of the messages that were processed.
Once the Redis server receives an acknowledgment from a consumer, it updates its internal tracking data to mark the message as processed by that consumer. The server also removes the message from the PEL for that consumer.
If a consumer fails to send an acknowledgment within a configurable period, the message is considered unacknowledged. The Redis server will then deliver the message again to another consumer in the same consumer group. This ensures that messages are not lost if a consumer fails or crashes before it can process a message.
Let's handle acknowledgment using the xAck
method:
// redis.service.ts
public async acknowledgeMessages({
streamName,
group,
messageIds,
}: AcknowledgeMessageParams) {
try {
await this.redis.xAck(streamName, group, messageIds);
} catch (error) {
if (error instanceof ClientClosedError) {
console.log(`${error.message} ...RECONNECTING`);
await this.connectToRedis();
return null;
}
console.error(`Failed to xAck from Redis stream: ${error.message}`);
return null;
}
}
For this, we have created yet another interface - AcknowledgeMessageParams
// interfaces.ts
export interface AcknowledgeMessageParams {
/** Name of stream to acknowledge message in */
streamName: string;
/** Name of consumer group */
group: string;
/** ID of messages to acknowledge */
messageIds: string[];
}
XACK
takes single or multiple message IDs, so we can batch multiple acks in one call.
The rest is straightforward - we send the command and handle errors as we do for other Redis methods.
XAUTOCLAIM
is a command in Redis that automates the process of claiming unacknowledged messages from a consumer group. It is similar to the XPENDING
and XCLAIM
commands, but it simplifies the process by automatically allowing Redis to claim unacknowledged messages on behalf of a consumer.
A consumer sends a XAUTOCLAIM
command with the consumer group's name, the consumer's name, and the name of the stream to consume from. Redis returns unacknowledged messages for the consumer to claim, which can be processed and acknowledged like a message received from XREADGROUP
. Another consumer can claim unacknowledged messages if the acknowledgment window expires. On subsequent XAUTOCLAIM
commands, Redis only returns unacknowledged messages that have not been previously returned to the consumer and have yet reached the maximum delivery attempts.
xAutoClaim
Here the code is almost the same as we had for xReadGroup
:
// redis.service.ts
public async autoClaimMessage({
streamName,
group,
consumer,
minIdleTimeMs,
count,
}: AutoclaimMessageParams) {
let response: RedsXAutoClaimResponse = null;
try {
response = await this.redis.xAutoClaim(
streamName,
group,
consumer,
minIdleTimeMs,
'0-0', // use 0-0 to claim all messages. In case of multiple consumers, this will be used to claim messages from other consumers
{
COUNT: count,
},
);
} catch (error) {
if (error instanceof ClientClosedError) {
console.log(`${error.message} ...RECONNECTING`);
await this.connectToRedis();
return null;
}
console.error(`Failed to xAutoClaim from Redis stream: ${error.message}`);
return null;
}
return response?.messages || null;
}
We have created a params interface AutoclaimMessageParams
// interfaces.ts
export interface AutoclaimMessageParams {
streamName: string;
group: string;
consumer: string;
minIdleTimeMs: number;
count: number;
}
Here the distinct params is minIdleTimeMs
- this is a minimum idle time in ms for message to be eligible for auto-claim.
We have the functionality to consume, ack, and even re-claim messages. Let's add all of that together and abstract all of that logic inside a generator so that the StreamHandlerService
clients don't need to worry about all of these mechanisms.
// stream-handler.service
public async *getConsumerMessageGenerator({
streamName,
group,
consumer,
count,
autoClaimMinIdleTimeMs,
autoAck = true,
}: ReadConsumerGroupParams): AsyncRedisStreamGenerator {
let fetchNewMessages = true; // Toggle for switching between fetching new messages and auto claiming messages
while (this.isAlive) {
let response: RedisStreamMessage[];
if (fetchNewMessages) {
response = await this.redisService.readConsumerGroup({
streamName,
group,
consumer,
blockMs: 0, // 0 = infinite blocking until at least one message is fetched, or timeout happens
count,
});
} else {
// Try to auto claim messages that are idle for a certain amount of time
response = await this.redisService.autoClaimMessage({
streamName,
group,
consumer,
count,
minIdleTimeMs:
autoClaimMinIdleTimeMs || StreamHandlerService.DEFAULT_IDLE_TIME_MS,
});
}
// Acknowledge messages if autoAck is enabled
if (autoAck && response?.length > 0) {
await this.redisService.acknowledgeMessages({
streamName,
group,
messageIds: response.map((m) => m.id),
});
}
// Toggle between fetching new messages and auto claiming messages
fetchNewMessages = !fetchNewMessages;
// If no messages returned, continue to next iteration without yielding
if (!response || response.length === 0) {
continue;
}
for (const message of response) {
yield message;
}
}
}
Here's the new params interface ReadConsumerGroupParams
// interfaces.ts
export interface ReadConsumerGroupParams {
streamName: string;
group: string;
consumer: string;
count: number;
autoClaimMinIdleTimeMs?: number;
autoAck?: boolean;
}
We will expect streamName
, consumer group
name, and a unique consumer
name. A count
of how many messages to try to fetch in one call, as well as
autoClaimMinIdleTimeMs
- minimum idle time in ms for the message to be eligible for auto-claim, re-claim from another consumer, if not acknowledged.
And finally, autoAck
- Should messages be acknowledged automatically after being read. If set to false, the client must manually acknowledge messages using acknowledgeMessage
We will try to alternate between fetching new messages and auto-claiming forgotten ones. For this, a variable fetchNewMessages
is introduced.
// --snip--
let fetchNewMessages = true;
while (this.isAlive) {
// --snip--
As for our fan-out generator, we will use a while loop that will become invalid when the module is destroyed. Then we alternate between new messages and auto-claim and assigning the result to a response
variable.
// --snip--
let response: RedisStreamMessage[];
if (fetchNewMessages) {
response = await this.redisService.readConsumerGroup({
streamName,
group,
consumer,
blockMs: 0, // 0 = infinite blocking until at least one message is fetched, or timeout happens
count,
});
} else {
// Try to auto claim messages that are idle for a certain amount of time
response = await this.redisService.autoClaimMessage({
streamName,
group,
consumer,
count,
minIdleTimeMs:
autoClaimMinIdleTimeMs || StreamHandlerService.DEFAULT_IDLE_TIME_MS,
});
}
// --snip--
Next, we'll handle the auto-ack functionality. This will allow us to remember to ack message after processing.
In production applications, it might or might not make sense. For example, if you need to ensure that a message has been received and successfully processed, you will need to do ack manually after processing.
// --snip--
if (autoAck && response?.length > 0) {
await this.redisService.acknowledgeMessages({
streamName,
group,
messageIds: response.map((m) => m.id),
});
// --snip--
Then we want to toggle the "fetching mode"
// --snip--
fetchNewMessages = !fetchNewMessages;
// --snip--
And finally, only if there are any messages, we want to yield
them:
// --snip--
if (!response || response.length === 0) {
continue;
}
for (const message of response) {
yield message;
}
}
The hard part is over. Now we can happily read the Redis stream via consumer groups.
Let's do that from our AppService
:
// app.service.ts
public async consumeMessageFromGroup(
group: string,
consumer: string,
count: number,
) {
const generator = this.streamService.getConsumerMessageGenerator({
streamName: EXAMPLE_STREAM_NAME,
group,
consumer,
count,
});
const messages: Record<string, string>[] = [];
let counter = 0;
for await (const messageObj of generator) {
messages.push(this.parseMessage(messageObj.message));
counter++;
if (counter >= count) {
break;
}
}
return {
group,
consumer,
messages,
};
}
The code is almost the same as for multiple new messages reads in a fan-out mode that we create in part 2. The only addition is group
and consumer
.
Let's also add a simple endpoint from which we can consume messages and specify the group, consumer name, and count:
// app.controller
@Get('consume/:group/:consumer/:count')
consumeMessages(
@Param('group') group: string,
@Param('consumer') consumer: string,
@Param('count') count: number,
) {
return this.appService.consumeMessageFromGroup(group, consumer, count);
}
In this case, we won't create multiple instances of our service, but it would work the same. Here is an example:
If we open our service with http://localhost:8081/consume/example-group/example-consumer/3
, we get messages from the stream beginning for a group named example-group
and for a consumer named example-consumer
.
If we try a different consumer, e.g., example-consumer-2
we will get new messages that are not pending and have not been ack by any of the consumers yet.
However, if we try to fetch messages from a different group, they will be fetched from the very beginning, no matter if another group or even the same consumer in another group has acknowledged them already.
Here is an example with example-consumer
consuming messages from the same stream, but on example-group-2
Notice that these messages are the same as first fetched by example-group
. That is because they were consumed and acknowledged by that group only. Different groups do not share the PEL.
Over this 3-part article series, we have successfully built a NestJS application that connects to a Redis server and utilizes the Redis stream data structure.
In part 1, we set up the Redis server, installed the Redis client library, and established a connection between the NestJS application and Redis.
In part 2, we explored the basics of Redis streams and learned how to use the Redis client to add messages to the stream. We also implemented fan-out mode, which allows multiple clients to consume messages from a single stream.
In part 3, we took our implementation to the next level by using Redis consumer groups. We learned how to create a consumer group and use it to consume messages in a more scalable and reliable way.
We also implemented some error handling to ensure our application can recover from errors and continue processing messages without interruption.
Overall, this article series has provided a comprehensive guide to building a NestJS application that utilizes Redis streams and consumer groups. By following along and experimenting with the code samples, you should now understand how to implement real-time systems that can easily handle large amounts of data, thanks to the power and flexibility of Redis and NestJS.