
Building Your Own Custom Pipable RxJS Operator

by Arman Murzabulatov, August 11th, 2023

Too Long; Didn't Read

RxJS is a toolkit for asynchronous JavaScript programming using observable sequences. This article guides developers through understanding basic RxJS operators, combining them, and crafting custom operators, including advanced ones like mergeMap and switchMap. For production-grade applications, the emphasis is on robust error handling, resource management, and the importance of testing. Mastery of custom RxJS operators aids in encapsulating complex logic, enhancing code readability, and improving maintainability.

Hello there, JavaScripters! If you’re into asynchronous programming, you’ve probably heard of RxJS — short for Reactive Extensions for JavaScript.


It’s a pretty cool library, a toolkit of sorts for creating programs that are asynchronous and event-based, thanks to the magic of observable sequences.


Now, if you’re here to level up your RxJS game, you’re in the right place! We’re going to dive into a nifty little thing called operators. They’re the superheroes of RxJS, transforming, combining, and controlling the flow of data through observables.


With a neat pipe function, you can string them together and create amazing data pipelines. So, buckle up as we're going to craft some custom pipeable RxJS operators!

Pipeable Operators: The Basics

Before we start crafting our custom operators, let’s go through a quick refresher on pipeable operators in RxJS. They’re essentially functions that take an observable, do something to the data emitted by the observable, and spit out a new observable.


Consider this: You’ve got an observable spitting out numbers. You can use the map() operator to transform each number by squaring it:

import { of } from 'rxjs';
import { map } from 'rxjs/operators';

const numbers$ = of(1, 2, 3, 4, 5);

numbers$.pipe(
  map(num => num * num)
).subscribe(square => console.log(square));
// Output: 1, 4, 9, 16, 25


In this scenario, we whipped up an Observable numbers$ using the of() function. Then, we used the map() operator to transform each number by squaring it, giving us the squared numbers as the output. Easy, right?
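
Because a pipeable operator is just a function from one observable to another, it can also be called directly, without going through pipe(). Here is a small sketch of that idea; the names square, squared$, and directResults are made up for this illustration:

```typescript
import { Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';

// map(...) returns a function that takes an Observable and returns a new one.
const square = map((n: number) => n * n);

// Calling the operator directly is equivalent to of(1, 2, 3).pipe(square).
const squared$: Observable<number> = square(of(1, 2, 3));

const directResults: number[] = [];
squared$.subscribe(value => directResults.push(value));
console.log(directResults); // [1, 4, 9]
```

In practice you would still use pipe(), since it reads top to bottom when several operators are chained.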

Mixing and Matching Operators

Operators can be mixed and matched. Check out this example:

import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';

const numbers$ = of(1, 2, 3, 4, 5);

numbers$.pipe(
  filter(number => number % 2 === 0),
  map(number => number * 2)
).subscribe(number => console.log(number));
// Output: 4, 8


We combined two operators, filter() and map(), to keep only the even numbers and then double them. It's like Lego blocks for code!

Meet the Custom Pipeable Operators

Now, RxJS has quite a number of built-in operators that cover most programming needs. But sometimes, you might need something a little more custom. Enter custom pipeable operators.


They’re just functions that take in an observable and return a new observable. These operators are perfect for reducing code redundancy, improving readability, and making testing easier.

Why Customize?

Creating your custom pipeable operators makes complex or reusable logic a breeze. They help in managing side effects, performing complex transformations, or wrapping business logic.


By encapsulating logic into a single function, they help you keep your code maintainable, simple, and less prone to errors.

How to Craft Custom Operators

Time to roll up our sleeves and start crafting custom pipeable RxJS operators. Let’s create a simple operator that doubles each value emitted by an observable:

import { map } from 'rxjs/operators';

function multiplyByTwo() {
  return function (source) {
    return source.pipe(map(value => value * 2));
  }
}


In this snippet, we crafted a function named multiplyByTwo that churns out another function. This new function takes in an observable (source) and returns a new observable that doubles every value.


You can use this operator just like you would use any other RxJS operator:

import { of } from 'rxjs';

of(1, 2, 3, 4, 5).pipe(multiplyByTwo()).subscribe(console.log);
// Output: 2, 4, 6, 8, 10
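
When a custom operator is only a combination of existing operators, the standalone pipe function exported by rxjs can bundle them without writing the inner function by hand. The sketch below assumes a made-up operator name, doubleEvens, that reuses the filter-then-map chain from earlier:

```typescript
import { OperatorFunction, of, pipe } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Hypothetical operator: keeps even numbers, then doubles them.
function doubleEvens(): OperatorFunction<number, number> {
  // pipe(...) composes operators into one function: Observable in, Observable out.
  return pipe(
    filter((n: number) => n % 2 === 0),
    map((n: number) => n * 2)
  );
}

const doubledEvens: number[] = [];
of(1, 2, 3, 4, 5)
  .pipe(doubleEvens())
  .subscribe(value => doubledEvens.push(value));
console.log(doubledEvens); // [4, 8]
```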

What About Parameters?

You can also create custom operators that accept parameters. Let’s make our multiplyByTwo operator more generic:

function multiplyBy(factor) {
  return function (source) {
    return source.pipe(map(value => value * factor));
  }
}


Here’s how to use this operator:

of(1, 2, 3, 4, 5).pipe(multiplyBy(3)).subscribe(console.log);
// Output: 3, 6, 9, 12, 15
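
If you write TypeScript, the same operator can carry types so that misuse is caught at compile time. This is one possible typing, using the MonoTypeOperatorFunction type from rxjs (an operator whose input and output element types are the same); the tripled array is only there for the demonstration:

```typescript
import { MonoTypeOperatorFunction, of } from 'rxjs';
import { map } from 'rxjs/operators';

// Typed variant of multiplyBy: numbers in, numbers out.
function multiplyBy(factor: number): MonoTypeOperatorFunction<number> {
  return source => source.pipe(map(value => value * factor));
}

const tripled: number[] = [];
of(1, 2, 3)
  .pipe(multiplyBy(3))
  .subscribe(value => tripled.push(value));
console.log(tripled); // [3, 6, 9]

// of('a', 'b').pipe(multiplyBy(3)) would be rejected by the compiler,
// because the source emits strings rather than numbers.
```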

An Operator From Scratch

Can we build an operator from scratch? Absolutely! Let’s build from scratch the same multiplyBy operator, but this time without using any of the existing operators:

import { Observable } from 'rxjs';

function multiplyBy(factor) {
  return function (source) {
    return new Observable(observer => {
      const subscription = source.subscribe({
        next(value) {
          observer.next(value * factor);
        },
        error(err) {
          observer.error(err);
        },
        complete() {
          observer.complete();
        }
      });
      
      return {
        unsubscribe() {
          subscription.unsubscribe();
        }
      }
    });
  }
}


Whew! We did it! We crafted an operator from scratch that takes care of next(), error(), and complete(). It also cleans up after itself: when a subscriber unsubscribes, the operator unsubscribes from the source. Don't forget this unsubscribe logic when you implement your own operators later.
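
To convince yourself that the cleanup really reaches the source, you can pipe a source that records its own teardown through the operator. The sketch below uses a compact, typed copy of multiplyBy; the source$ observable and the teardownLog array are hypothetical helpers for this check only:

```typescript
import { Observable, OperatorFunction } from 'rxjs';

// Compact typed copy of the from-scratch multiplyBy operator.
function multiplyBy(factor: number): OperatorFunction<number, number> {
  return source =>
    new Observable<number>(observer => {
      const subscription = source.subscribe({
        next: value => observer.next(value * factor),
        error: err => observer.error(err),
        complete: () => observer.complete(),
      });
      // Returning the subscription lets RxJS tear it down on unsubscribe.
      return subscription;
    });
}

const received: number[] = [];
const teardownLog: string[] = [];

// Hypothetical source: emits twice, never completes, records its teardown.
const source$ = new Observable<number>(observer => {
  observer.next(1);
  observer.next(2);
  return () => {
    teardownLog.push('source torn down');
  };
});

const sub = source$.pipe(multiplyBy(10)).subscribe(v => received.push(v));
console.log(received, teardownLog); // [10, 20] []

sub.unsubscribe();
console.log(teardownLog); // ['source torn down']
```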


You’re well on your way to becoming an RxJS operator maestro. But first, let’s get some more practice in. I’ll guide you through two more examples where we’ll recreate the existing tap and delay operators.


Creating a custom tap operator is pretty straightforward:

import { Observable, OperatorFunction } from 'rxjs';

function customTap<T>(callback: (value: T) => void): OperatorFunction<T, T> {
    return function(source) {
        return new Observable(observer => {
            const subscription = source.subscribe({
                next(value) {
                    callback(value);
                    observer.next(value);
                },
                error(err) { observer.error(err); },
                complete() { observer.complete(); }
            });

            return subscription;
        });
    }
}


So, what’s happening here? This operator just calls the callback function to do some side effect, then forwards the original value without changing it. Pretty neat, right?


Now, let’s create a custom delay operator. This one needs JavaScript's setTimeout:

function customDelay<T>(delayDuration: number): OperatorFunction<T, T> {
    return function(source) {
        return new Observable(observer => {
            const subscription = source.subscribe({
                next(value) {
                    setTimeout(() => observer.next(value), delayDuration);
                },
                error(err) { observer.error(err); },
                // Delay completion too; otherwise a source that completes right
                // away would close the observer before any delayed value arrives.
                complete() {
                    setTimeout(() => observer.complete(), delayDuration);
                }
            });

            return subscription;
        });
    }
}


In this case, we’re using setTimeout to hold off on sending each value from the source observable for a certain amount of time. Cool, right?


Please note that these implementations are simplified versions of these operators and don’t cover important aspects like error handling or unsubscription logic. But don’t worry, we’ll touch on those soon.

Building Complex Operators

So far, we’ve been making simple operators, which isn’t usually the case in the real world. So now, let’s roll up our sleeves and start handling some more complexity.


We’re going to design and recreate the well-known mergeMap and switchMap operators, which are some of the most commonly used built-in operators.


Making a mergeMap operator is a bit trickier. Here's a simple version:

function customMergeMap<T, R>(projection: (value: T, index: number) => ObservableInput<R>): OperatorFunction<T, R> {
    return function(source) {
        return new Observable(observer => {
            let index = 0;
            const outerSubscription = source.subscribe({
                next(value) {
                    const innerObservable = from(projection(value, index++));
                    innerObservable.subscribe({
                        next(innerValue) {
                            observer.next(innerValue);
                        },
                        error(err) { observer.error(err); }
                    });
                },
                error(err) { observer.error(err); },
                complete() { observer.complete(); }
            });

            return () => outerSubscription.unsubscribe();
        });
    }
}


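The customMergeMap function takes in the source Observable and, for each value it emits, uses the projection function to create an inner Observable. It then subscribes to this inner Observable and sends out its values. Keep in mind that this simple version completes as soon as the source completes, even if inner Observables are still running, and it only unsubscribes from the source on teardown, not from the inner Observables.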


Now, the switchMap operator works somewhat like mergeMap, but it cancels the inner subscription when the source sends out a new value:

function customSwitchMap<T, R>(projection: (value: T, index: number) => ObservableInput<R>): OperatorFunction<T, R> {
    return function(source) {
        return new Observable(observer => {
            let index = 0;
            let innerSubscription: Subscription | undefined;

            const outerSubscription = source.subscribe({
                next(value) {
                    if (innerSubscription) {
                        innerSubscription.unsubscribe();
                    }
                    const innerObservable = from(projection(value, index++));
                    innerSubscription = innerObservable.subscribe({
                        next(innerValue) {
                            observer.next(innerValue);
                        },
                        error(err) { observer.error(err); }
                    });
                },
                error(err) { observer.error(err); },
                complete() { observer.complete(); }
            });

            return () => {
                outerSubscription.unsubscribe();
                if (innerSubscription) {
                    innerSubscription.unsubscribe();
                }
            }
        });
    }
}


customSwitchMap uses a projection function to map each value from the source Observable to a new Observable. Every time the source emits a value, it unsubscribes from the current inner Observable and subscribes to a new one, hence "switching" Observables.
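
The switching behavior is easiest to see with Subjects, where you control exactly when each value is emitted. To keep the sketch short it uses the built-in switchMap, whose behavior the custom version imitates; outer$, a$, b$, and seen are illustrative names:

```typescript
import { Subject } from 'rxjs';
import { switchMap } from 'rxjs/operators';

const outer$ = new Subject<string>();
const a$ = new Subject<string>();
const b$ = new Subject<string>();

const seen: string[] = [];
outer$
  .pipe(switchMap(key => (key === 'a' ? a$ : b$)))
  .subscribe(value => seen.push(value));

outer$.next('a'); // subscribes to a$
a$.next('a1');    // delivered: a$ is the active inner observable
outer$.next('b'); // switches: unsubscribes from a$, subscribes to b$
a$.next('a2');    // dropped: nothing is listening to a$ any more
b$.next('b1');    // delivered
console.log(seen); // ['a1', 'b1']
```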

Crafting Robust Operators

Our current operator implementations, while they do work, are missing crucial error handling and resource management, which are vital for production-ready operators.


Consider adding try/catch blocks to handle errors in user-provided functions:

try {
    const result = projection(value);
    observer.next(result);
} catch (err) {
    observer.error(err);
}
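
In context, that guard could look like the following sketch of a map lookalike. The operator name safeMap is hypothetical, and the observer.next call is deliberately kept outside the try block so that only errors thrown by the user-provided function are caught here:

```typescript
import { Observable, OperatorFunction, of } from 'rxjs';

// Hypothetical operator: like map, but guards the user-supplied function.
function safeMap<T, R>(projection: (value: T) => R): OperatorFunction<T, R> {
  return source =>
    new Observable<R>(observer => {
      return source.subscribe({
        next(value) {
          if (observer.closed) return; // already errored or unsubscribed
          let result: R;
          try {
            result = projection(value);
          } catch (err) {
            observer.error(err); // forward the failure to the subscriber
            return;
          }
          observer.next(result);
        },
        error: err => observer.error(err),
        complete: () => observer.complete(),
      });
    });
}

const mapped: number[] = [];
const caught: string[] = [];
of(1, 2, 3)
  .pipe(
    safeMap((n: number) => {
      if (n === 2) throw new Error('boom');
      return n * 10;
    })
  )
  .subscribe({
    next: value => mapped.push(value),
    error: (err: Error) => caught.push(err.message),
  });
console.log(mapped, caught); // [10] ['boom']
```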


Also, make sure to handle errors and completions in inner subscriptions:

innerObservable.subscribe({
    next(innerValue) { observer.next(innerValue); },
    error(err) { observer.error(err); },
    complete() { /* Handle completion here */ }
});


Unsubscription logic can get a bit tricky when multiple subscriptions are involved. Always remember to clean up:

return () => {
    outerSubscription.unsubscribe();
    innerSubscription?.unsubscribe();
    /* Additional cleanup logic here */
};
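
Putting the three ideas together, a more defensive mergeMap might look like the sketch below. It is not the library's implementation: the name robustMergeMap is made up, and for brevity the projection must return an Observable rather than any ObservableInput. It guards the projection call, completes only once the source and every inner observable have completed, and collects all subscriptions in one parent Subscription for cleanup:

```typescript
import { Observable, OperatorFunction, Subscription, of } from 'rxjs';

function robustMergeMap<T, R>(
  projection: (value: T, index: number) => Observable<R>
): OperatorFunction<T, R> {
  return source =>
    new Observable<R>(observer => {
      const subscriptions = new Subscription(); // owns outer and inner subscriptions
      let index = 0;
      let active = 0; // inner observables that have not completed yet
      let outerDone = false;

      const completeIfDone = () => {
        if (outerDone && active === 0) observer.complete();
      };

      subscriptions.add(
        source.subscribe({
          next(value) {
            let inner$: Observable<R>;
            try {
              inner$ = projection(value, index++);
            } catch (err) {
              observer.error(err);
              return;
            }
            active++;
            subscriptions.add(
              inner$.subscribe({
                next: innerValue => observer.next(innerValue),
                error: err => observer.error(err),
                complete() {
                  active--;
                  completeIfDone();
                },
              })
            );
          },
          error: err => observer.error(err),
          complete() {
            outerDone = true;
            completeIfDone();
          },
        })
      );

      // Unsubscribing the parent unsubscribes the source and every inner observable.
      return subscriptions;
    });
}

const merged: number[] = [];
const events: string[] = [];
of(1, 2)
  .pipe(robustMergeMap(n => of(n, n * 10)))
  .subscribe({
    next: value => merged.push(value),
    complete: () => events.push('complete'),
  });
console.log(merged, events); // [1, 10, 2, 20] ['complete']
```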


In a nutshell, creating custom RxJS operators is about understanding the ins and outs of the RxJS library. Not only do you need to know how each operator works, but also how observables, operators, and subscribers interact with each other.


Remember to thoroughly test your custom operators, carefully handle exceptions, and properly clean up resources. And don’t hesitate to explore the source code of the RxJS library itself — it’s a gold mine of knowledge and advanced patterns.
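
As a starting point for testing, RxJS ships a TestScheduler in rxjs/testing that lets you describe emissions over time as marble strings. The sketch below checks the multiplyBy operator; the helper name testMultiplyBy is hypothetical, and the JSON comparison stands in for whatever deep-equality assertion your test framework provides:

```typescript
import { OperatorFunction } from 'rxjs';
import { map } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';

function multiplyBy(factor: number): OperatorFunction<number, number> {
  return source => source.pipe(map(value => value * factor));
}

// Hypothetical helper: runs one marble test and throws if the operator misbehaves.
function testMultiplyBy(): void {
  const scheduler = new TestScheduler((actual, expected) => {
    // Swap in your framework's deep-equality assertion here.
    if (JSON.stringify(actual) !== JSON.stringify(expected)) {
      throw new Error(
        `Expected ${JSON.stringify(expected)} but got ${JSON.stringify(actual)}`
      );
    }
  });

  scheduler.run(({ cold, expectObservable }) => {
    // Each character is one frame of virtual time; '|' marks completion.
    const source$ = cold('a-b-c|', { a: 1, b: 2, c: 3 });
    const expected = 'x-y-z|';

    expectObservable(source$.pipe(multiplyBy(2))).toBe(expected, {
      x: 2,
      y: 4,
      z: 6,
    });
  });
}

testMultiplyBy();
```

Because the scheduler runs on virtual time, the same approach works for time-based operators without making the test wait.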

The Takeaway

Being able to create custom pipeable RxJS operators is like having a superpower. It makes your code more manageable, easier to read, and lets you encapsulate complex logic into reusable chunks.


With this trick up your sleeve, you’ll be able to handle asynchronous JavaScript like a pro. Happy coding!

