paint-brush
Using Redis Streams with NestJS: Part 2 - Reading from Streamby@magickriss
3,247 reads
3,247 reads

Using Redis Streams with NestJS: Part 2 - Reading from Stream

by Krisjanis KallingsFebruary 22nd, 2023
Read on Terminal Reader
Read this story w/o Javascript
tldt arrow

Too Long; Didn't Read

This is part 2 of a 3-part series on how to use Redis streams with NestJS. By the end of this series, you will have the knowledge and tools necessary to create your own NestJS app that utilizes Redis stream to handle real-time data. The series is structured in 3 parts: Setting up the NestJS application and connecting to Redis, Populating Redis. streams and reading from in **fan-out mode**
featured image - Using Redis Streams with NestJS: Part 2 - Reading from Stream
Krisjanis Kallings HackerNoon profile picture


This is part 2 of a 3-part series, where we will explore how to use Redis streams with NestJS.

It is structured in 3 parts:


  1. Setting up the NestJS application and connecting to Redis

  2. Populating Redis streams and reading from in fan-out mode

  3. Using consumer groups to handle one stream from multiple actors in a way that one message is sent to and processed only by a single actor (consumer)


By the end of this series, you will have the knowledge and tools necessary to create your own NestJS app that utilizes Redis streams to handle real-time data.

Full code is available on the github

What we have

In part 1, we have created a working NestJS application that can connect to the Redis server. We have also created an endpoint /ping-redis for our application that will call a ping command on the Redis server and return its response.


For our development convenience, we run our application and Redis server in a docker. We are using Docker Compose to set up our docker-copose.yml file for simplicity.

Now we are ready to focus on the main topic - using Redis streams.

What are Redis streams?

Before we start with our implementation, it's essential to understand what Redis streams are and clear some misconceptions that might have arisen about what Redis streams are not.

Data structure

At its core, Redis streams are a data structure that holds data - messages, in order by the message IDs. By default, this ID is a timestamp on when this data was added to the stream (followed by sequence number, in case multiple entries happened at the same timestamp).

Not a list

This all sounds very similar to a list. However, the difference here is that while a list can be popped, streams can't. This is because reading the stream does not mutate it. So when calling LPOP or RPOP on a list, the element returned is removed. In streams, XREAD leaves the item in the stream and can be read multiple times.

Not a socket - there is no "subscribe"

Just by the name "stream," it's easy to think that it's a continuous "stream" of data - something that you would connect to and send you new data once it's appended.


It's a stream in the sense that the data represents some direction - older data with smaller IDs and newer data with larger IDs. But to access it, we will need to read from it manually, just like with other more superficial data structures, like sets or lists. Once a request has been executed and data is sent from the Redis to the application, the command has been completed, and no new data will be received unless it is called again.


So it would also be wrong to think about streams in the form of Pub-Sub. With Pub-sub, once you subscribe to channels, the connection is blocked, and all new messages published will be delivered. No need to re-subscribe.

Basic functionality

We mostly add to stream (XADD) and read from it (XREAD, XRANGE XREADGROUP). There is also functionality for deletion (XDEL), but in most cases, we think of the stream as append-only.

Managing memory

To avoid using up all of the space and evicting data randomly, the length of streams is managed by trimming them. It can be done manually via XTRIM, or we can move the responsibility to the Redis by using capped streams - adding MAXLEN option when using XADD.

Adding to streams

Finally, let's get back into implementing streams in our NestJS application. Let's create a method to add a message to the stream:


// redis.service.ts

// --snip--
  public async addToStream({
    fieldsToStore,
    streamName,
  }: AddToStreamParams): Promise<string> {
    // Converting object to record to store in redis
    const messageObject = Object.entries(fieldsToStore).reduce(
      (acc, [key, value]) => {
        if (typeof value === 'undefined') {
          return acc;
        }
        acc[key] = typeof value === 'string' ? value : JSON.stringify(value);

        return acc;
      },
      {} as Record<string, string>,
    );

    // Adding to stream with trimming - approximately max 100 messages
    return this.redis.xAdd(streamName, '*', messageObject, {
      TRIM: {
        strategy: 'MAXLEN',
        strategyModifier: '~',
        threshold: 100,
      },
    });
  }


As you can see, we have defined the interface for the parameters of this method:

// interfaces.ts

export interface AddToStreamParams {
  fieldsToStore: Record<string, any>;
  streamName: string;
}


We will take in the name of the stream we want to add data to and specify data as POJO named fieldsToStore


Next, we have a transformation code that transforms our object of any depth into key-value pairs of string type. Later, when reading data, we will parse this back to our original POJO:

// redis.service.ts

// --snip--
    const messageObject = Object.entries(fieldsToStore).reduce(
      (acc, [key, value]) => {
        if (typeof value === 'undefined') {
          return acc;
        }
        acc[key] = typeof value === 'string' ? value : JSON.stringify(value);

        return acc;
      },
      {} as Record<string, string>,
    );
// --snip--


Then we execute a call to Redis service with the following parameters:

// redis.service.ts

// --snip--
return this.redis.xAdd(streamName, '*', messageObject, {
      TRIM: {
        strategy: 'MAXLEN',
        strategyModifier: '~',
        threshold: 100,
      },
    });
// --snip--


  • streamName - the name of our stream that we want to append to

  • * - asterisk specifies Redis to auto-generate a unique ID (based on timestamp). Here we could set our own ID if we want.

  • messageObject - key-value pairs that contain the data we want to store


  • Options:

    • TRIM - as mentioned before, we don't want to handle trimming manually, so we specify that we wish Redis to trim the stream by itself. This includes additional options:
      • strategy MAXLEN - here, we specify that we want to trim when the stream has reached the maximum length of a threshold. Other option would be MINID, where the threshold would specify the smallest ID to keep. The rest are marked for eviction.

      • strategyModifier ~ - this tells Redis that we don't want to be strict in size, and Redis can go above the threshold for a little while, but the trim will eventually happen. This optimizes Redis to execute eviction at the most convenient time, not immediately.

      • threshold 100 - that is the max length of the stream we want. In the case of MINID, it would be the smallest ID to keep. And since we have strategy modifier ~, this is an approximate value.


So we would read it like this: Trim stream if the length (MAXLEN) of the stream has reached approximately (~) 100.

Stream handler

Since we will do more stuff with our stream data and we want to keep the basic functionality of Redis server calls on RedisService, we will create a new service called StreamHandlerService for additional handling of the streams.


For now, we will just add "pass-through" method addToStream that will call RedisService addToStream method.


To generate it, we'll call: nest g service redis/stream-handler

// stream-handler.service.ts

import { Injectable } from '@nestjs/common';
import { RedisService } from './redis.service';

@Injectable()
export class StreamHandlerService {

constructor(private readonly redisService: RedisService) {}

  public addToStream(fieldsToStore: Record<string, any>, streamName: string) {
    return this.redisService.addToStream({ fieldsToStore, streamName });
  }
}


And since we don't want to expose our RedisService to other modules and will want them to interact with StreamHandlerService, we will move the ping call functionality to it and remove export from the module.

// stream-handler.service.ts

// --snip--
  public ping() {
    return this.redisService.ping();
  }
// --snip--


To use it in other modules, we need to export it:

// redis.module.ts

import { Module } from '@nestjs/common';
import { redisClientFactory } from './redis-client.factory';
import { RedisService } from './redis.service';
import { StreamHandlerService } from './stream-handler.service';

@Module({
  providers: [redisClientFactory, RedisService, StreamHandlerService],
  exports: [StreamHandlerService], // Removed RedisService from exports
})
export class RedisModule {}


And in AppService, we are going to call ping from StreamHandlerService instead:

// app.service.ts

// --snip--
  constructor(private readonly streamService: StreamHandlerService) {} // changed from RedisService to StreamHandlerService

  redisPing() {
    return this.streamService.ping(); // changed here as well
  }
// --snip--



In theory, other modules should not care whether it is Redis, RabbitMQ, Kafka, or even something else we are calling. . It should only know that it is a stream, and we get the specific data by calling methods made public on exported services. This way, we can easily change our implementation of streams without needing to change other parts of code - they become de-coupled. Since this is a project about Redis and Redis streams, we have called this module RedisModule, but expanding on the previous taught, IRL it would make more sense to call it StreamModule or MessagingModule, or something else, depending on what these streams are going to be used for.

Populating Redis with messages

Let's use our addToStream method to populate our Redis. We could go the same route as with ping and expose it to our API endpoint, but this time, since we want to "simulate" data flow, we'll call this method in an interval so that we have an ongoing flow of data in our stream.

Let's add it to our AppService:

// app.service.ts

// --snip--

@Injectable()
export class AppService implements OnModuleInit, OnModuleDestroy {
  private interval: NodeJS.Timeout = null;

// --snip--

  private populateStream() {
    this.interval = setInterval(() => {
      this.streamService.addToStream(
        {
          hello: 'world',
          date: new Date(),
          nestedObj: { num: Date.now() % 100 },
        },
        EXAMPLE_STREAM_NAME,
      );
    }, 1000);
  }

  async onModuleInit() {
    this.populateStream();
  }

  onModuleDestroy() {
    clearInterval(this.interval);
  }
}


As you can see, we have created a populateStream method that will add some dummy data into our stream.

EXAMPLE_STREAM_NAME is just a constant of the stream name we have defined in constants.ts file and imported it here:


// constants.ts
export const EXAMPLE_STREAM_NAME = 'example-stream';


We created an interval that we call on onModuleInit lifestyle event, and we clear it in our onModuleDestroy lifecycle event so that we don't leave dangling references. This will add a message to our Redis stream on every 1000 ms.

To verify that, let's now create a way to read from stream.

Reading from stream

To read from Redis streams, we can use either XREAD or XRANGE (and REVXRANGE) commands. Since the goal of this project is to read new messages, both commands could be used to achieve this goal. We are going to use XREAD in this article.

Let's add a method that wraps XREAD command call and does some extra error handling for us:

// redis.service.ts

// --snip--

  public async readStream({
    streamName,
    blockMs,
    count,
    lastMessageId,
  }: ReadStreamParams): Promise<RedisStreamMessage[] | null> {
    try {
      const response = await this.redis.xRead(
        commandOptions({ isolated: true }), // uses new connection from pool not to block other redis calls
        [
          {
            key: streamName,
            id: lastMessageId,
          },
        ],
        { BLOCK: blockMs, COUNT: count },
      );

      const { messages } = response?.[0]; // returning first stream (since only 1 stream used)

      return messages || null;
    } catch (error) {
      if (error instanceof ClientClosedError) {
        console.log(`${error.message} ...RECONNECTING`);
        await this.connectToRedis();
        return null;
      }
      console.error(
        `Failed to xRead from Redis Stream: ${error.message}`,
        error,
      );
      return null;
    }
  }


Let's break down this code:


First, notice that we have added a parameters interface ReadStreamParams:

// interfaces.ts

export interface ReadStreamParams {
  streamName: string;
  blockMs: number;
  lastMessageId: string;
}
// --snip--


as well as we have defined our response message RedisStreamMessage. Since node-redis is scarce with its type exports, we need to extract it on our own:



// redis-client.types

// --snip--

export type RedisStreamMessage = Awaited<
  ReturnType<RedisClient['xRead']>
>[number]['messages'][number];

// --snip--


In the try block, we have our command call:


// --snip--
      const response = await this.redis.xRead(
        commandOptions({ isolated: true }), // uses new connection from pool not to block other redis calls
        [
          {
            key: streamName,
            id: lastMessageId,
          },
        ],
        { BLOCK: blockMs, COUNT: count },
      );

      const { messages } = response?.[0]; // returning first stream (since only 1 stream used)

      return messages || null;
    } catch (error) {
// --snip--


We first use the call command option to run it in isolation commandOptions({ isolated: true }). This is needed because we are going to BLOCK the connection. This means that once this command call takes a connection to the Redis server if the results won't be on the Redis at the time the call arrives, it will wait for the specified amount of time (BLOCK parameter) and, if during that period, data arrives that matches the request, it will be returned immediately, or it will wait till the end of the specified time and return nothing.

During this time, the connection is used-up, and no other commands can be called. Essentially, our command BLOCKS the connection.


If we would like to create multiple calls, we would quickly run into delays and performance problems. For this, we can use isolated execution. This means that node-redis will create multiple connections using Generic resource pool under the hood, so we'll have multiple connections to use in parallel.


Next, we say which streams we want to read and starting from what ID. We need to pass it in an array, since we could call multiple streams in a single call. We'll handling of multiple streams in one request out of the scope of this article:


        [
          {
            key: streamName,
            id: lastMessageId,
          },
        ],


Next, we pass optional parameters:

        { BLOCK: blockMs, COUNT: count },


  • BLOCK - is blocking time in milliseconds. **If 0 is specified, it will block indefinitely until at least 1 message can be returned.

  • COUNT - this is a maximum count we will get returned. If there are fewer messages available on Redis server, it will not wait for the count to match but return that many messages how many there are.


Next, we unravel the data:

      const { messages } = response?.[0];

      return messages || null;


Since we called only 1 stream, the response array will always be of length 1. All messages are returned in an messages object. And finally, we return either the stream messages array or null.


Here is also the place we can handle some initial errors:

// --- snip---
    } catch (error) {
      if (error instanceof ClientClosedError) {
        console.log(`${error.message} ...RECONNECTING`);
        await this.connectToRedis();
        return null;
      }
      console.error(
        `Failed to xRead from Redis Stream: ${error.message}`,
        error,
      );
      return null;
    }


In case of an error, we will return null value. If we have ClientClosedError, there is no use trying to fetch anything further, so we try to reconnect to the Redis.


// --- snip---
  private async connectToRedis() {
    try {
      // Try to reconnect only if the connection socket is closed. Else let it be handled by reconnect strategy.
      if (!this.redis.isOpen) {
        await this.redis.connect();
      }
    } catch (error) {
      console.error(
        `[${error.name}] ${error.message}`,
        error,
      );
    }
  }


We check for isOpen to see if the socket is open. If it is, we will trust reconnect strategy to do its job and reconnect to the client. If it's not, then we are trying to do it ourselves. And that's how we can fetch a single message from a stream. Let's expand on this and add the ability to read multiple messages.

Exposing read to our API

Exposing the Redis stream to our API is similar to what we did with ping. Let's add another method to our StreamHandlerService:


//stream-handler.service.ts

// --- snip --
  public readFromStream(streamName, count) {
    return this.redisService.readStream({
      streamName,
      blockMs: 0, // 0 = infinite blocking until at least one message is fetched, or timeout happens
      count, // max how many messages to fetch at a time
      lastMessageId: '$',
    });
  }


Simple method where we pass streamName of the stream we want to read and count of messages, we would like to receive. As mentioned before, COUNT actually is "max count" since, if there are less or no messages on stream, Redis will return from 1 to count number of messages.

We are using special ID $ - this represents "anything that comes after the XREAD request.

Now let's handle reading a single message:


// app.service.ts

// --snip--
  public getSingleNewMessage() {
    return this.streamService.readFromStream(EXAMPLE_STREAM_NAME, 1);
  }
// --snip--


Again, using EXAMPLE_STREAM_NAME - same stream that we populate, and setting count to 1.

And finally, let's add a new endpoint to our controller:

// app.controller.ts

// --snip--
  @Get('message')
  getMessage() {
    return this.appService.getSingleNewMessage();
  }
// --snip--


Now calling the endpoint we should see single message generated by our populateStream method:




Handling results as a stream

Getting a single message is relatively straightforward.

But how do we get multiple messages?

What if we want to get a specific number of messages, e.g., precisely 3?


At first, it might seem simple - just set a count to the number of messages you want. But remember - COUNT does not guarantee that we will get the exact number of messages. It's more of a "max count". So you might be tempted to call XREAD multiple times. But, since we are passing $ as our last message ID, we might miss some messages that come in between our calls.

That means that we need to remember our last id.

And what if we want to read from a stream continuously and indefinitely?

This all adds additional complexity that we need to handle.

Generators

Generators are a valuable tool to have in our toolbox. Here they come in handy when we want to fetch the data from some external resource (Redis) lazily.


I will not focus on the "what and how" of generators. If you haven't used them before, I encourage you to check the JavaScript Generators reference and Dr. Axel Rauschmayer's book "JavaScript for impatient programmers" on asynchronous generators.


The main feature we can do is create an infinite loop that we can stop and restart whenever we want and need.


Let's add a new method to our StreamHandlerService:

// stream-handler.service.ts

export class StreamHandlerService implements OnModuleDestroy {

  private isAlive = true;

  onModuleDestroy() {
    this.isAlive = false;
  }

// --snip--
  public async *getStreamMessageGenerator(
    streamName: string,
    count: number,
  ): AsyncRedisStreamGenerator {
    // Start with latest data
    let lastMessageId = '$';
    while (this.isAlive) {
      const response = await this.redisService.readStream({
        streamName,
        blockMs: 0, // 0 = infinite blocking until at least one message is fetched, or timeout happens
        count, // max how many messages to fetch at a time
        lastMessageId,
      });

      // If no messages returned, continue to next iteration without yielding
      if (!response || response.length === 0) {
        continue;
      }
      // Update last message id to be the last message returned from redis
      lastMessageId = response[response.length - 1].id;
      for (const message of response) {
        yield message;
      }
    }
  }
// --snip--


Let's break down what we have written:


First, you will notice that we have added a new boolean isAlive that we will initially set to true, and only during the destruction of the module we will set it to false. This will serve as a while-true loop that we can exit from when the application ends.


Also, for our convenience, we have created a new type AsyncRedisStreamGenerator


// redis-client.type.ts

// --snip--
export type AsyncRedisStreamGenerator = AsyncGenerator<
  RedisStreamMessage,
  void,
  unknown
>;
// --snip--


This is an async generator that will yield or produce RedisStreamMessage values will return nothing (void) when it returns and will accept unknown messages, since we won't pass anything to next method of our generator.


We are going to call readStream method in an infinite loop:

    let lastMessageId = '$';
    while (this.isAlive) {
      const response = await this.redisService.readStream({
        streamName,
        blockMs: 0, // 0 = infinite blocking until at least one message is fetched, or timeout happens
        count, // max how many messages to fetch at a time
        lastMessageId,
      });
// -- snip --
    }


As in our previous example, we will start to read data only when the generator is created and calls the stream by using $ as lastMessageId.

The rest is also the same as in the previous example, except we are calling it in a loop.


Next, we will handle the case where we get an empty response. We will simply go into the next iteration without yielding any results

   // If no messages returned, continue to next iteration without yielding
      if (!response || response.length === 0) {
        continue;
      }


If we have some responses, we are going to handle them:

      // Update last message id to be the last message returned from redis
      lastMessageId = response[response.length - 1].id;
      for (const message of response) {
        yield message;
      }


To avoid gaps in messages and read all messages from the time we started this generator, we will update lastMessageId variable with the last message we received.


Since we have count, we will get an array of sizes ranging from 1 to count with messages. However, this generator produces only a single message at a time. So we will iterate through the array and yield only one message at once.


That is for the generator part. Let's see how we can use it to solve our previous questions:

Reading multiple results

Now let's use our generator to generate multiple results by creating a new method in AppService


// app.service.ts

// --snip--
  public async getMultipleNewMessages(count: number) {
    const generator = this.streamService.getStreamMessageGenerator(
      EXAMPLE_STREAM_NAME,
      count,
    );
    const messages: Record<string, string>[] = [];
    let counter = 0;
    for await (const messageObj of generator) {
      messages.push(this.parseMessage(messageObj.message));
      counter++;
      if (counter >= count) {
        break;
      }
    }
    return messages;
  }
// --snip--


First, we create a generator:

    const generator = this.streamService.getStreamMessageGenerator(
      EXAMPLE_STREAM_NAME,
      count,
    );


We are passing it our count for "optimistic" fetch. We will try to get everything in a single Redis call. But if that does not happen, we will call Redis as often as needed.

We create an array to collect our messages:

    const messages: Record<string, string>[] = [];


and define a counter where we will count how many we have received.


One convenient feature of generators is that they are iterable as they implement @@iterator method. So we can call for..of on them. In this case, since the generator is async it's for await .. of

    for await (const messageObj of generator) {
      messages.push(this.parseMessage(messageObj.message));
      counter++;
      if (counter >= count) {
        break;
      }
    }
    return messages;


Once the count is reached, we simply break out of the loop and then return our collected messages.


Since Redis stores all values as strings, we also have created a generic parser method:

//app.service.ts

// --snip--
  private parseMessage(message: Record<string, string>) {
    return Object.entries(message).reduce((acc, [key, value]) => {
      try{
      acc[key] = JSON.parse(value);
      }catch(e){
        acc[key] =value
      }
      return acc;
    }, {});
  }
// --snip--


To check that we indeed are getting specified amount of messages, we'll create a new endpoint /messages where we will fetch 3 messages :

// app.controller.ts

// --snip--
  @Get('messages')
  getMessages() {
    return this.appService.getMultipleNewMessages(3);
  }
// --snip--


Again, when we call our newly created endpoint, we indeed, have 3 messages fetched:




Continuous reading of the stream

The biggest power of a Redis stream comes from using it as a continuous data stream. To do that, we can re-use our generator:


// app.service.ts

// --snip--
  private isAlive = true;
// --snip--
  async onModuleInit() {
// --snip--
    this.continuousReadMessages();
  }
  onModuleDestroy() {
// --snip--

    this.isAlive = false;
  }
// --snip--
  private async continuousReadMessages() {
    const generator = this.streamService.getStreamMessageGenerator(
      EXAMPLE_STREAM_NAME,
      3,
    );
    for await (const messageObj of generator) {
      console.log(
        `Got message with ID: ${messageObj.id}`,
        JSON.stringify(this.parseMessage(messageObj.message), undefined, 2),
      );
      if (!this.isAlive) {
        break;
      }
    }
  }
// --snip--


Same as with multiple message fetch, we start by creating a generator:

    const generator = this.streamService.getStreamMessageGenerator(
      EXAMPLE_STREAM_NAME,
      10,
    );


We give it an arbitrary number for count since we want to try to fetch multiple responses in one Redis call, if possible.


And then we just do whatever we need to do with the data in the loop. So here it's simply logging the message.


And breaking the loop once the module is destroyed, we don't have any dangling references and can stop our loop.


    for await (const messageObj of generator) {
      console.log(
        `Got message with ID: ${messageObj.id}`,
        JSON.stringify(this.parseMessage(messageObj.message), undefined, 2),
      );
      if (!this.isAlive) {
        break;
      }
    }


And we call this method once on our module init, the same as we did with populating stream:


  async onModuleInit() {
    this.populateStream();
    this.continuousReadMessages();
  }


And in our console, we should see that we are fetching the messages continuously

...
app      | Got message with ID: 1675377159049-0 {
app      |   "hello": "world",
app      |   "date": "2023-02-02T22:32:39.048Z",
app      |   "nestedObj": {
app      |     "num": 48
app      |   }
app      | }
app      | Got message with ID: 1675377160050-0 {
app      |   "hello": "world",
app      |   "date": "2023-02-02T22:32:40.050Z",
app      |   "nestedObj": {
app      |     "num": 50
app      |   }
app      | }
...

Refactor for a single message

Since we are using the generator to handle stream messages, let's refactor our single message fetch example.

// app.service.ts

// --snip--
  public async getSingleNewMessage() {
    const generator = this.streamService.getStreamMessageGenerator(
      EXAMPLE_STREAM_NAME,
      1,
    );
    const messageObj = await generator.next();
    if (!messageObj.done && messageObj.value) {
      return this.parseMessage(messageObj.value.message);
    }
  }
// --snip--


Same as for all examples, we are creating a generator on our EXAMPLE_STREAM_NAME stream, this time with count 1.

    const generator = this.streamService.getStreamMessageGenerator(
      EXAMPLE_STREAM_NAME,
      1,
    );


Since we need only a single message, we do not need to use a loop. Instead, we call next method manually:

    const messageObj = await generator.next();


and then handle the response - if the generator has not returned/ended - it's not done and we have a message t return, we parse message and return it to the client:

    if (!messageObj.done && messageObj.value) {
      return this.parseMessage(messageObj.value.message);
    }


Everything else stays the same for the client, and the same result is returned.

This is the end of part 2 of the 3-part series. We built on what we created in part 1.


As a result, you should have an application that can fetch single or multiple responses and continuously fetch new results from the Redis stream.


In part 3, we are going to look at how we can use consumer groups to divide messages to multiple consumers so that each one gets a unique set of messages (not like we have now, where if we connected more clients, all clients would get getting all of the messages).