Jonathan Gros-Dubois

@jonathangrosdubois

Using async generators as custom data streams

One of the most recent additions to ECMAScript/JavaScript is the async generator, which builds on top of regular generators to add async functionality. In case you’re not already familiar with them, generators are special functions which can ‘return’ multiple times using the yield keyword, like this:

// Generator functions are declared with an asterisk (function*)
function* makeRangeGenerator(start, end) {
  let n = 0;
  for (let i = start; i < end; i++) {
    n += 1;
    yield i;
  }
  return n;
}
let rangeGenerator = makeRangeGenerator(0, 3);
console.log(rangeGenerator.next()); // { value: 0, done: false }
console.log(rangeGenerator.next()); // { value: 1, done: false }
console.log(rangeGenerator.next()); // { value: 2, done: false }
console.log(rangeGenerator.next()); // { value: 3, done: true }

Generator functions are different from regular functions in at least two fundamental ways:

  • A generator function can yield (~return) values multiple times without exiting and losing its internal state.
  • A caller can push values back into the generator function at the point where it last yielded — the generator will resume execution from that point until it encounters the next yield.

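Note that you can iterate over the output of a generator using a for-of loop like this:

// The rangeGenerator above is already exhausted, so we create a fresh one.
for (let integer of makeRangeGenerator(0, 3)) {
  console.log(integer);
}

^ This is slightly cleaner than using rangeGenerator.next() in a regular for loop; in this case our integer variable will be the raw value instead of the object {value: ..., done: false}. Note that for-of only visits yielded values, so the value returned at the end of the generator (3 in this case) is not logged.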
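Two details of for-of iteration are easy to trip over: the loop only sees yielded values, and a generator object can only be consumed once. The following is a minimal sketch which repeats makeRangeGenerator so that it runs on its own; the variable names are just for illustration.

```javascript
function* makeRangeGenerator(start, end) {
  let n = 0;
  for (let i = start; i < end; i++) {
    n += 1;
    yield i;
  }
  return n;
}

// for-of and the spread operator only collect yielded values;
// the final `return n` (the result with done: true) is dropped.
const yielded = [...makeRangeGenerator(0, 3)];
console.log(yielded); // [ 0, 1, 2 ]

// A generator object is single-use: once it is done,
// iterating over it again produces nothing.
const exhausted = makeRangeGenerator(0, 3);
const firstPass = [...exhausted];
const secondPass = [...exhausted];
console.log(firstPass.length, secondPass.length); // 3 0
```

If you need the return value, call next() manually and read the value from the result whose done flag is true.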

Sometimes, we may want to pass values into a generator like this:

function* pushPullGenerator() {
  let n = 0;
  while (true) {
    n = yield n * 2;
  }
}
let gen = pushPullGenerator();
console.log(gen.next());  // { value: 0, done: false }
console.log(gen.next(2)); // { value: 4, done: false }
console.log(gen.next(3)); // { value: 6, done: false }
console.log(gen.next(4)); // { value: 8, done: false }

^ Here we’re passing values to the generator using gen.next(...) calls. Note that the first invocation of gen.next() doesn’t have any arguments; this is because the first call only runs the generator from the start up to its first yield, so there is no paused yield expression to receive a value yet and any argument would be ignored. Each subsequent call to gen.next(value) passes the specified value into the generator function (it becomes the result of the yield expression where the generator was paused) and then resumes execution until the following yield, which pushes a new value out to the caller.
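To see how arguments to next() are handled at each step, here is a small sketch which reuses pushPullGenerator (repeated so that the snippet runs on its own). The values 100 and 5 are arbitrary.

```javascript
function* pushPullGenerator() {
  let n = 0;
  while (true) {
    n = yield n * 2;
  }
}

const gen = pushPullGenerator();

// The first next() runs the generator up to its first yield. No yield
// expression is waiting for a value yet, so the argument is ignored.
const first = gen.next(100);
console.log(first); // { value: 0, done: false }

// Now the generator is paused at `n = yield ...`, so 5 becomes the
// result of that yield expression and the next yield emits 5 * 2.
const second = gen.next(5);
console.log(second); // { value: 10, done: false }

// With no argument, the yield expression evaluates to undefined,
// and undefined * 2 is NaN.
const third = gen.next();
console.log(third); // { value: NaN, done: false }
```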

Coming up with use cases for generators might not be obvious at first. If overused, generators could make code more difficult to follow. However, if used correctly, generators can offer some unique capabilities. In this guide, we will look at a specific kind of generator: the async generator.

Async generators (as in async/await) are like regular generators except that they can await Promises internally and they deliver their values asynchronously: each call to next() returns a Promise which resolves to the usual {value, done} object. This allows us to do some useful things; in particular, async generators are ideal for representing asynchronous streams of data.
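A quick way to see the difference from a regular generator is to call next() by hand. This is a minimal sketch; countTo is a hypothetical generator used only for illustration.

```javascript
async function* countTo(limit) {
  for (let i = 1; i <= limit; i++) {
    yield i;
  }
}

const counter = countTo(2);

// next() on an async generator returns a Promise immediately...
const pending = counter.next();
console.log(pending instanceof Promise); // true

// ...which resolves to the familiar { value, done } object.
const results = (async () => {
  const a = await pending;
  const b = await counter.next();
  const c = await counter.next();
  return [a, b, c];
})();

results.then((r) => console.log(r));
// [
//   { value: 1, done: false },
//   { value: 2, done: false },
//   { value: undefined, done: true }
// ]
```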

If you’ve used RxJS Observables before, you should already be familiar with the idea of consuming asynchronous streams of data in a declarative (reactive) way. Async generators can be used as an alternative to Observables. One of their main benefits over Observables is that they make your code look more like regular synchronous JavaScript, which should make it more succinct and readable.

Here is an example of an async generator which streams some integers:

function waitForDelay(delay) {
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve();
    }, delay);
  });
}
async function* createRandomStream(randomness) {
  let n = 0;
  while (true) {
    let randomDelay = Math.round(Math.random() * randomness);
    await waitForDelay(randomDelay);
    yield n++;
  }
}
async function startConsumingStream() {
  let randomStream = createRandomStream(1000);
  for await (let value of randomStream) {
    console.log(value);
  }
}
startConsumingStream();

^ The randomStream in this case is a stream of integers which increments 0, 1, 2, 3, 4, … It’s only random in the sense that the delay between each iteration of the generator will be a random value between 0 and 1000 milliseconds; the result is that you can see the logging of numbers accelerate and decelerate randomly. Note that, in this case, we’ve used a for-await-of loop to iterate over the async generator; this is the async equivalent of the for-of loop which we used earlier to iterate over a normal generator. As a challenge, you could try to modify the code above to use a trigonometric function like Math.sin(...) to generate the delay value; it should only require a one-line change.
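If you would like to compare notes on the challenge, here is one possible answer, offered as a sketch rather than the intended solution. The sineDelay helper, the createWaveStream name and the divisor of 2 are assumptions made for this example; the loop also stops after a few values so that the snippet terminates.

```javascript
// Hypothetical helper: maps the counter n onto a delay which swings
// smoothly between 0 and maxDelay milliseconds.
function sineDelay(n, maxDelay) {
  return Math.round(((Math.sin(n / 2) + 1) / 2) * maxDelay);
}

function waitForDelay(delay) {
  return new Promise((resolve) => setTimeout(resolve, delay));
}

async function* createWaveStream(maxDelay) {
  let n = 0;
  while (true) {
    // This is the line that replaces the Math.random() based delay.
    await waitForDelay(sineDelay(n, maxDelay));
    yield n++;
  }
}

// Consume only the first few values; `break` ends the infinite stream.
async function consumeFirst(count) {
  const seen = [];
  for await (const value of createWaveStream(20)) {
    seen.push(value);
    if (seen.length === count) break;
  }
  return seen;
}

consumeFirst(5).then((seen) => console.log(seen)); // [ 0, 1, 2, 3, 4 ]
```

With a larger maxDelay, the logging should speed up and slow down in a regular wave instead of randomly.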

Random streams are fun, but what about something more useful? For example, maybe we want each iteration of our generator to correspond to an event, like receiving a message from a user.

We can do something like this:

// If the receiveMessage function is called before we
// started consuming the data stream, then we don't care about that
// data. In this case the function will be a no-op.
let receiveMessage = function () {};
function waitForNextMessage() {
  return new Promise((resolve) => {
    receiveMessage = resolve;
  });
}
async function* createMessageStream() {
  while (true) {
    yield waitForNextMessage();
  }
}
async function startConsumingMessageStream() {
  let messageStream = createMessageStream();
  for await (let message of messageStream) {
    console.log(message);
  }
}
startConsumingMessageStream();
setTimeout(() => {
  receiveMessage('Hello');
}, 500);
setTimeout(() => {
  receiveMessage('world');
}, 1000);
setTimeout(() => {
  receiveMessage('!!!');
}, 3000);

^ The trick here is to assign the resolve function from the Promise in the waitForNextMessage function to our own receiveMessage function; that way it can be invoked anytime from anywhere else in our code. The message in this case is triggered by a setTimeout, but it could also have come from any other source such as an event from an HTTP request or a WebSocket message. Note that because a pending Promise does not poll or block, it doesn’t cost any CPU to wait indefinitely.

A small problem…

There is a potential problem with the approach above; what would happen if you tried calling receiveMessage(message) synchronously multiple times? Like this:

setTimeout(() => {
  // Call receiveMessage multiple times synchronously.
  receiveMessage('Hello');
  receiveMessage('A');
  receiveMessage('B');
  receiveMessage('C');
}, 500);
setTimeout(() => {
  receiveMessage('world');
}, 1000);
setTimeout(() => {
  receiveMessage('!!!');
}, 3000);

^ In this case, we would miss the messages ‘A’, ‘B’ and ‘C’. This is because receiveMessage is only replaced with a new resolve function when the generator resumes and calls waitForNextMessage again, and that happens asynchronously, after the current call stack has finished. So if we call receiveMessage multiple times while in the same call stack, every call tries to resolve the same Promise (instead of a different Promise for each message); a Promise can only be resolved once, so only the first value is kept. There are several ways to get around this issue. One approach is to modify receiveMessage so that it buffers messages synchronously and resolves the Promise with the buffer; that way we can keep pushing more messages into the buffer synchronously before the consumer gets to read it.
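The underlying Promise behaviour can be reproduced in isolation. In this minimal sketch, resolveOnce is a hypothetical name standing in for receiveMessage.

```javascript
let resolveOnce;
const promise = new Promise((resolve) => {
  resolveOnce = resolve;
});

// All four calls happen in the same call stack, before anything
// waiting on the promise has had a chance to run.
resolveOnce('Hello');
resolveOnce('A'); // ignored: the promise is already resolved
resolveOnce('B'); // ignored
resolveOnce('C'); // ignored

const settled = promise.then((value) => {
  console.log(value); // Hello
  return value;
});
```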

The solution might look like this:

let receiveMessage = function () {};
function waitForNextMessageList() {
  let messageBuffer = [];
  return new Promise((resolve) => {
    receiveMessage = (message) => {
      messageBuffer.push(message);
      // Only resolve the promise for the first message in the
      // current call stack. After that we can keep pushing messages
      // to the buffer.
      if (messageBuffer.length === 1) {
        resolve(messageBuffer);
      }
    };
  });
}
async function* createMessageListStream() {
  while (true) {
    yield waitForNextMessageList();
  }
}
async function* createMessageStream() {
  let messageListStream = createMessageListStream();
  for await (let messageList of messageListStream) {
    for (let message of messageList) {
      yield message;
    }
  }
}
async function startConsumingMessageStream() {
  let messageStream = createMessageStream();
  // Each iteration receives a single message, not a list.
  for await (let message of messageStream) {
    console.log(message);
  }
}
startConsumingMessageStream();
setTimeout(() => {
  receiveMessage('Hello');
  receiveMessage('A');
  receiveMessage('B');
  receiveMessage('C');
}, 500);
setTimeout(() => {
  receiveMessage('world');
}, 1000);
setTimeout(() => {
  receiveMessage('!!!');
}, 3000);

This code became slightly more complex after we added the buffering, but if you think about it from the stream consumer’s point of view, it still looks pretty simple:

async function startConsumingMessageStream() {
  let messageStream = createMessageStream();
  for await (let message of messageStream) {
    console.log(message);
  }
}
startConsumingMessageStream();

A usable solution

The code above may be useful in some situations where a stream only has a single consumer. If we call startConsumingMessageStream() multiple times, the consumers will compete for each message and only one of them will receive it. This is because our stream currently behaves like a queue in which each new item can only have one consumer: every call to waitForNextMessageList() overwrites the single shared receiveMessage function, so a message can only reach whichever consumer registered last.
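The competition between consumers can be observed with a short experiment. To keep it small, this sketch uses the earlier non-buffered version of the stream; the consume function, the received object and the short timer values are assumptions made for this example.

```javascript
let receiveMessage = function () {};

function waitForNextMessage() {
  return new Promise((resolve) => {
    // Each call overwrites the resolver registered by the previous call.
    receiveMessage = resolve;
  });
}

async function* createMessageStream() {
  while (true) {
    yield waitForNextMessage();
  }
}

const received = { first: [], second: [] };

async function consume(name) {
  for await (const message of createMessageStream()) {
    received[name].push(message);
  }
}

consume('first');
consume('second'); // replaces the resolver registered by 'first'

setTimeout(() => receiveMessage('Hello'), 10);
setTimeout(() => receiveMessage('world'), 20);
setTimeout(() => console.log(received), 30);
// { first: [], second: [ 'Hello', 'world' ] }
```

The first consumer never receives anything because its resolver was overwritten before any message arrived.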

For practical scenarios, we may want to allow streams to be shared between multiple consumers. Also, we should expose this functionality through a simple Iterable object.

After trying out several different approaches, I settled on a singly-linked-list-based solution. Check out WritableAsyncIterableStream: https://github.com/SocketCluster/writable-async-iterable-stream
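To give a feel for why a linked list suits this problem, here is a minimal sketch of a stream which can be shared between consumers. It is not the implementation or the API of WritableAsyncIterableStream; the BroadcastStream class, its write method and the node fields are hypothetical names invented for this illustration.

```javascript
class BroadcastStream {
  constructor() {
    // The tail is always an empty placeholder node waiting for a value.
    this._tail = this._createNode();
  }

  _createNode() {
    const node = { value: undefined, next: null, resolve: null, ready: null };
    node.ready = new Promise((resolve) => {
      node.resolve = resolve;
    });
    return node;
  }

  write(value) {
    const filled = this._tail;
    filled.value = value;
    filled.next = this._createNode();
    this._tail = filled.next;
    filled.resolve(); // Wake every consumer waiting on this node.
  }

  async *[Symbol.asyncIterator]() {
    // Each consumer starts at the current tail and walks forward
    // through the list at its own pace.
    let node = this._tail;
    while (true) {
      await node.ready;
      yield node.value;
      node = node.next;
    }
  }
}

const stream = new BroadcastStream();
const log = { a: [], b: [] };

async function consume(name) {
  for await (const message of stream) {
    log[name].push(message);
  }
}

consume('a');
consume('b');

// Synchronous bursts are safe because every write gets its own node.
stream.write('Hello');
stream.write('A');
stream.write('B');

setTimeout(() => console.log(log), 10);
// { a: [ 'Hello', 'A', 'B' ], b: [ 'Hello', 'A', 'B' ] }
```

In this sketch every consumer holds its own pointer into the list, so consumers do not compete for messages, and a node can be garbage collected once every consumer has moved past it. A consumer only sees messages written after it started iterating.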

If you would like to use async iterables as alternatives to EventEmitter objects, you may want to also check out StreamDemux: https://github.com/socketcluster/stream-demux
