Hello there, JavaScripters! If you’re into asynchronous programming, you’ve probably heard of RxJS — short for Reactive Extensions for JavaScript.
It’s a pretty cool library, a toolkit of sorts for creating programs that are asynchronous and event-based, thanks to the magic of observable sequences.
Now, if you’re here to level up your RxJS game, you’re in the right place! We’re going to dive into a nifty little thing called operators. They’re the superheroes of RxJS, transforming, combining, and controlling the flow of data through observables.
With a neat pipe function, you can string them together and create amazing data pipelines. So, buckle up: we're going to craft some custom pipeable RxJS operators!
Before we start crafting our custom operators, let’s go through a quick refresher on pipeable operators in RxJS. They’re essentially functions that take an observable, do something to the data emitted by the observable, and spit out a new observable.
Consider this: you’ve got an observable emitting numbers. You can use the map() operator to transform each number by squaring it:
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
const numbers$ = of(1, 2, 3, 4, 5);
numbers$.pipe(
  map(num => num * num)
).subscribe(square => console.log(square));
// Output: 1, 4, 9, 16, 25
In this scenario, we created an Observable numbers$ using the of() function. Then, we used the map() operator to transform each number by squaring it, giving us the squared numbers as the output. Easy, right?
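To make the "operators are just functions" idea concrete, here is a minimal sketch showing that the function returned by map() can be called directly on an observable, with the same result as going through pipe(). The variable names (square, viaPipe, viaCall) are illustrative only.

```typescript
import { Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';

const numbers$ = of(1, 2, 3);

// map(...) returns a plain function from Observable to Observable.
const square = map((num: number) => num * num);

// Usual style: hand the operator to pipe().
const viaPipe: number[] = [];
numbers$.pipe(square).subscribe(value => viaPipe.push(value));

// Equivalent: call the operator function on the source yourself.
const viaCall: number[] = [];
const squared$: Observable<number> = square(numbers$);
squared$.subscribe(value => viaCall.push(value));

console.log(viaPipe, viaCall); // both contain 1, 4, 9
```

This is the whole contract a custom operator has to honor: accept a source observable, return a new one.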
Operators can be mixed and matched. Check out this example:
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
const numbers$ = of(1, 2, 3, 4, 5);
numbers$.pipe(
  filter(number => number % 2 === 0),
  map(number => number * 2)
).subscribe(number => console.log(number));
// Output: 4, 8
We combined two operators, filter() and map(), to keep only the even numbers and double them. It's like Lego blocks for code!
Now, RxJS has quite a number of built-in operators that cover most programming needs. But sometimes, you might need something a little more custom. Enter custom pipeable operators.
They’re just functions that take in an observable and return a new observable. These operators are perfect for reducing code redundancy, improving readability, and making testing easier.
Creating your custom pipeable operators makes complex or reusable logic a breeze. They help in managing side effects, performing complex transformations, or wrapping business logic.
By encapsulating logic into a single function, they help you keep your code maintainable, simple, and less prone to errors.
Time to roll up our sleeves and start crafting custom pipeable RxJS operators. Let’s create a simple operator that doubles each value emitted by an observable:
import { map } from 'rxjs/operators';
function multiplyByTwo() {
  return function (source) {
    return source.pipe(map(value => value * 2));
  };
}
In this snippet, we crafted a function named multiplyByTwo that returns another function. This new function takes in an observable (source) and returns a new observable that doubles every value.
You can use this operator just like you would use any other RxJS operator:
import { of } from 'rxjs';
of(1, 2, 3, 4, 5).pipe(multiplyByTwo()).subscribe(console.log);
// Output: 2, 4, 6, 8, 10
You can also create custom operators that accept parameters. Let’s make our multiplyByTwo operator more generic:
function multiplyBy(factor) {
  return function (source) {
    return source.pipe(map(value => value * factor));
  };
}
Here’s how to use this operator:
of(1, 2, 3, 4, 5).pipe(multiplyBy(3)).subscribe(console.log);
// Output: 3, 6, 9, 12, 15
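When a custom operator is only a chain of existing operators, the standalone pipe function exported by rxjs can do the wrapping for you. The sketch below is one way to do it; multiplyEvensBy is a hypothetical operator made up for this illustration.

```typescript
import { of, pipe } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Hypothetical operator: keep the even numbers, then multiply them by `factor`.
function multiplyEvensBy(factor: number) {
  // pipe() composes the operators into one reusable Observable-to-Observable function.
  return pipe(
    filter((value: number) => value % 2 === 0),
    map((value: number) => value * factor)
  );
}

const evens: number[] = [];
of(1, 2, 3, 4, 5)
  .pipe(multiplyEvensBy(3))
  .subscribe(value => evens.push(value));

console.log(evens); // [6, 12]
```

The result plugs into any pipeline exactly like a built-in operator, and the filter/map pair now lives in one named place.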
Can we build an operator from scratch? Absolutely! Let’s rebuild the same multiplyBy operator, but this time without using any of the existing operators:
import { Observable } from 'rxjs';
function multiplyBy(factor) {
  return function (source) {
    return new Observable(observer => {
      const subscription = source.subscribe({
        next(value) {
          observer.next(value * factor);
        },
        error(err) {
          observer.error(err);
        },
        complete() {
          observer.complete();
        }
      });
      return {
        unsubscribe() {
          subscription.unsubscribe();
        }
      };
    });
  };
}
Whew! We did it! We crafted an operator from scratch that takes care of next(), error(), and complete(). It also cleans up after itself: when the subscriber unsubscribes, the operator unsubscribes from the source. Don’t forget this unsubscribe logic when you implement your own operators later.
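To see why the unsubscribe logic matters, here is a small sketch under a few assumptions: it repeats a typed copy of the from-scratch operator so the snippet runs on its own, and it uses a made-up source that never completes and records when it is torn down.

```typescript
import { Observable, OperatorFunction } from 'rxjs';

// Typed copy of the from-scratch operator, repeated so the sketch is self-contained.
function multiplyBy(factor: number): OperatorFunction<number, number> {
  return source =>
    new Observable<number>(observer => {
      const subscription = source.subscribe({
        next: value => observer.next(value * factor),
        error: err => observer.error(err),
        complete: () => observer.complete()
      });
      // Teardown: forward the unsubscribe call to the source.
      return () => subscription.unsubscribe();
    });
}

// Hypothetical source that never completes and logs its own teardown.
const teardownLog: string[] = [];
const source$ = new Observable<number>(observer => {
  observer.next(1);
  observer.next(2);
  return () => { teardownLog.push('source torn down'); };
});

const received: number[] = [];
const subscription = source$
  .pipe(multiplyBy(10))
  .subscribe(value => received.push(value));

subscription.unsubscribe(); // travels through the operator down to the source

console.log(received, teardownLog);
```

If the operator returned nothing from its subscribe function, the source here would never learn that its consumer had gone away.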
You’re well on your way to becoming an RxJS operator maestro. But first, let’s get some more practice in. I’ll guide you through two more examples where we’ll recreate the existing tap and delay operators.
Creating a custom tap operator is pretty straightforward:
import { Observable, OperatorFunction } from 'rxjs';
function customTap<T>(callback: (value: T) => void): OperatorFunction<T, T> {
  return function(source) {
    return new Observable(observer => {
      const subscription = source.subscribe({
        next(value) {
          callback(value);
          observer.next(value);
        },
        error(err) { observer.error(err); },
        complete() { observer.complete(); }
      });
      return subscription;
    });
  }
}
So, what’s happening here? This operator just calls the callback function to do some side effect, then forwards the original value without changing it. Pretty neat, right?
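Here is a quick usage sketch. It repeats a compact version of customTap so the snippet runs on its own; the seen and delivered arrays are just illustrative ways to observe the side effect and the forwarded values.

```typescript
import { Observable, OperatorFunction, of } from 'rxjs';

function customTap<T>(callback: (value: T) => void): OperatorFunction<T, T> {
  return source =>
    new Observable<T>(observer =>
      source.subscribe({
        next(value) {
          callback(value);      // side effect first
          observer.next(value); // then forward the value untouched
        },
        error(err) { observer.error(err); },
        complete() { observer.complete(); }
      })
    );
}

const seen: string[] = [];
const delivered: string[] = [];

of('a', 'b', 'c')
  .pipe(customTap<string>(value => seen.push(`tap:${value}`)))
  .subscribe(value => delivered.push(value));

console.log(seen);      // ['tap:a', 'tap:b', 'tap:c']
console.log(delivered); // ['a', 'b', 'c']
```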
Now, let’s create a custom delay operator. This one needs JavaScript's setTimeout:
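import { Observable, OperatorFunction } from 'rxjs';
function customDelay<T>(delayDuration: number): OperatorFunction<T, T> {
  return function(source) {
    return new Observable(observer => {
      const subscription = source.subscribe({
        next(value) {
          setTimeout(() => observer.next(value), delayDuration);
        },
        error(err) { observer.error(err); },
        // Delay completion too: completing right away would close the observer
        // before the pending values are delivered.
        complete() { setTimeout(() => observer.complete(), delayDuration); }
      });
      return subscription;
    });
  }
}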
In this case, we’re using setTimeout to hold off on sending each value from the source observable for a certain amount of time. Cool, right?
Please note that these implementations are simplified versions of these operators and don’t cover important aspects like error handling or unsubscription logic. For example, customDelay never clears its pending timers when the subscriber unsubscribes. But don’t worry, we’ll touch on those soon.
So far, we’ve been making simple operators, which isn’t usually the case in the real world. So now, let’s roll up our sleeves and start handling some more complexity.
We’re going to design and recreate the well-known mergeMap and switchMap operators, which are some of the most commonly used built-in operators.
Making a mergeMap operator is a bit trickier. Here's a simple version:
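import { Observable, ObservableInput, OperatorFunction, Subscription, from } from 'rxjs';
function customMergeMap<T, R>(projection: (value: T, index: number) => ObservableInput<R>): OperatorFunction<T, R> {
  return function(source) {
    return new Observable(observer => {
      let index = 0;
      let active = 0; // inner Observables that are still running
      let outerCompleted = false;
      const subscriptions = new Subscription(); // the outer plus every inner subscription
      subscriptions.add(source.subscribe({
        next(value) {
          const innerObservable = from(projection(value, index++));
          active++;
          subscriptions.add(innerObservable.subscribe({
            next(innerValue) {
              observer.next(innerValue);
            },
            error(err) { observer.error(err); },
            complete() {
              active--;
              if (outerCompleted && active === 0) observer.complete();
            }
          }));
        },
        error(err) { observer.error(err); },
        // Complete only once the source and all inner Observables are done.
        complete() {
          outerCompleted = true;
          if (active === 0) observer.complete();
        }
      }));
      return () => subscriptions.unsubscribe();
    });
  }
}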
The customMergeMap function takes in the source Observable and, for each value it emits, uses the projection function to create an inner Observable. It then subscribes to this inner Observable and sends out its values.
Now, the switchMap operator works somewhat like mergeMap, but it cancels the inner subscription when the source sends out a new value:
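import { Observable, ObservableInput, OperatorFunction, Subscription, from } from 'rxjs';
function customSwitchMap<T, R>(projection: (value: T, index: number) => ObservableInput<R>): OperatorFunction<T, R> {
  return function(source) {
    return new Observable(observer => {
      let index = 0;
      let innerSubscription: Subscription | undefined;
      let outerCompleted = false;
      const outerSubscription = source.subscribe({
        next(value) {
          // Cancel the previous inner Observable before switching to the new one.
          innerSubscription?.unsubscribe();
          const innerObservable = from(projection(value, index++));
          innerSubscription = innerObservable.subscribe({
            next(innerValue) {
              observer.next(innerValue);
            },
            error(err) { observer.error(err); },
            // The last inner Observable finishes the output once the source is done.
            complete() { if (outerCompleted) observer.complete(); }
          });
        },
        error(err) { observer.error(err); },
        complete() {
          outerCompleted = true;
          // Complete right away only if no inner Observable is still running.
          if (!innerSubscription || innerSubscription.closed) observer.complete();
        }
      });
      return () => {
        outerSubscription.unsubscribe();
        innerSubscription?.unsubscribe();
      };
    });
  }
}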
customSwitchMap uses a projection function to map each value from the source Observable to a new Observable. Every time the source emits a value, it unsubscribes from the current inner Observable and subscribes to a new one, hence "switching" Observables.
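The difference between merging and switching is easiest to see side by side. The sketch below uses the built-in mergeMap and switchMap (the behavior our custom versions aim to mirror) and drives everything by hand with Subjects, so no timers are involved. The helper name flattenWith and the Subject names are made up for this illustration.

```typescript
import { Subject } from 'rxjs';
import { mergeMap, switchMap } from 'rxjs/operators';

// Hypothetical helper: runs the same scenario with either flattening strategy.
function flattenWith(strategy: 'merge' | 'switch'): string[] {
  const outer = new Subject<Subject<string>>();
  const first = new Subject<string>();
  const second = new Subject<string>();
  const output: string[] = [];

  const flatten = strategy === 'merge'
    ? mergeMap((inner: Subject<string>) => inner)
    : switchMap((inner: Subject<string>) => inner);

  outer.pipe(flatten).subscribe(value => output.push(value));

  outer.next(first);
  first.next('first-1');
  outer.next(second);      // switchMap drops `first` here; mergeMap keeps it
  first.next('first-2');
  second.next('second-1');
  return output;
}

console.log(flattenWith('merge'));  // ['first-1', 'first-2', 'second-1']
console.log(flattenWith('switch')); // ['first-1', 'second-1']
```

With switching, 'first-2' never arrives because the subscription to the first inner Observable was cancelled as soon as the second one showed up.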
Our current operator implementations, while they do work, are missing crucial error handling and resource management, which are vital for production-ready operators.
Consider adding try/catch blocks to handle errors in user-provided functions:
try {
  const result = projection(value);
  observer.next(result);
} catch (err) {
  observer.error(err);
}
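Put into a complete operator, the try/catch idea might look like the sketch below. safeMap is a hypothetical name for a hand-written map(). Only the user-provided projection sits inside the try block, which is a design choice of this sketch rather than a rule.

```typescript
import { Observable, OperatorFunction, of } from 'rxjs';

// Hypothetical operator: a hand-written map() that guards the user's function.
function safeMap<T, R>(projection: (value: T) => R): OperatorFunction<T, R> {
  return source =>
    new Observable<R>(observer =>
      source.subscribe({
        next(value) {
          let result: R;
          try {
            result = projection(value);
          } catch (err) {
            observer.error(err); // report the failure through the error channel
            return;
          }
          observer.next(result);
        },
        error(err) { observer.error(err); },
        complete() { observer.complete(); }
      })
    );
}

const log: string[] = [];
of(1, 2, 3)
  .pipe(safeMap((n: number) => {
    if (n === 2) throw new Error('boom');
    return n * 10;
  }))
  .subscribe({
    next: value => log.push(`next:${value}`),
    error: (err: Error) => log.push(`error:${err.message}`),
    complete: () => log.push('complete')
  });

console.log(log); // ['next:10', 'error:boom']
```

Once the error is delivered, the subscriber is closed, so the value 3 and the completion never reach it.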
Also, make sure to handle errors and completions in inner subscriptions:
innerObservable.subscribe({
  next(innerValue) { observer.next(innerValue); },
  error(err) { observer.error(err); },
  complete() { /* Handle completion here */ }
});
Unsubscription logic can get a bit tricky when multiple subscriptions are involved. Always remember to clean up:
return () => {
  outerSubscription.unsubscribe();
  innerSubscription?.unsubscribe();
  /* Additional cleanup logic here */
};
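One way to keep this manageable is to collect every subscription in a single parent Subscription and unsubscribe from that. The sketch below shows the idea with a made-up trackedSource helper that records when each source is torn down.

```typescript
import { Observable, Subscription } from 'rxjs';

const cleanedUp: string[] = [];

// Hypothetical helper: an Observable that never emits and records its teardown.
function trackedSource(name: string): Observable<never> {
  return new Observable<never>(() => () => { cleanedUp.push(name); });
}

// Parent subscription that owns all the others.
const subscriptions = new Subscription();
subscriptions.add(trackedSource('outer').subscribe());
subscriptions.add(trackedSource('inner').subscribe());

subscriptions.unsubscribe(); // tears down every child subscription that was added

console.log(cleanedUp);
```

Inside an operator, returning `() => subscriptions.unsubscribe()` from the Observable constructor is then enough to clean up everything at once.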
In a nutshell, creating custom RxJS operators is about understanding the ins and outs of the RxJS library. Not only do you need to know how each operator works, but also how observables, operators, and subscribers interact with each other.
Remember to thoroughly test your custom operators, carefully handle exceptions, and properly clean up resources. And don’t hesitate to explore the source code of the RxJS library itself — it’s a gold mine of knowledge and advanced patterns.
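For testing, RxJS ships a TestScheduler in rxjs/testing that supports marble diagrams. Here is a minimal sketch of a marble test for a multiplyBy operator. The JSON-based comparison is only a stand-in for whatever deep-equality assertion your test framework provides, and the assertions counter is there just to confirm the check actually ran.

```typescript
import { OperatorFunction } from 'rxjs';
import { map } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';

// Typed copy of the operator under test, repeated so the sketch is self-contained.
function multiplyBy(factor: number): OperatorFunction<number, number> {
  return source => source.pipe(map(value => value * factor));
}

let assertions = 0;
const scheduler = new TestScheduler((actual, expected) => {
  assertions++;
  // Stand-in for a test framework's deep-equality assertion.
  if (JSON.stringify(actual) !== JSON.stringify(expected)) {
    throw new Error(`Expected ${JSON.stringify(expected)}, got ${JSON.stringify(actual)}`);
  }
});

scheduler.run(({ cold, expectObservable }) => {
  // Each marble character is one frame: values a, b, c, then completion.
  const source$ = cold('a-b-c|', { a: 1, b: 2, c: 3 });
  expectObservable(source$.pipe(multiplyBy(2))).toBe('a-b-c|', { a: 2, b: 4, c: 6 });
});

console.log(`marble assertions run: ${assertions}`);
```

The marble strings describe both the values and their timing, so the same test also confirms that the operator completes when the source does.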
Being able to create custom pipeable RxJS operators is like having a superpower. It makes your code more manageable, easier to read, and lets you encapsulate complex logic into reusable chunks.
With this trick up your sleeve, you’ll be able to handle asynchronous JavaScript like a pro. Happy coding!