Java 8 stream has made my programming life simpler as a software engineer. However there are a lot more that can be improved by incorporating the RXJava library. RXJava contains tons of functionalities to process streams or observables.
In Java 9 there is a similar concept called
Flow
[3]. If you are considering upgrading to Java 9 or already been using Java 9, but have not used reactive streams programming concept then you might want to try flow concept beforehand.In this article I will highlight a subset of RXJava features that in my opinion will be most useful in my codes. I will not cover all the functionalities that RXJava has because that will be too long. For more coverage in RXJava functionalities, please read [1].
Sometimes there are cases that you wanted to iterate together between 2 Collections. Say you have two lists of integers and you want to "combine" each element with the same index in list 1 with each element in list 2. Then you would probably think code like this.
List<Integer> list1 = IntStream.range(0, 10)
.boxed()
.collect(Collectors.toList());
List<Integer> list2 = IntStream.range(1, 11)
.boxed()
.collect(Collectors.toList());
for (int i = 0; i < list1.size(); i++) {
System.out.println(list1.get(i) + list2.get(i));
}
In python, there is an operator called
zip
, which makes this code shorter and luckily I can also do that with RXJava's zip
operator!Observable.zip(
Observable.range(0, 10),
Observable.range(1, 11),
(x, y) -> x + y)
.subscribe(System.out::println);
Java stream is designed to be used one time. If you’re wondering why there’s a great explanation of that at stackoverflow [2], but I will not get into that here.
Now, consider this example
Stream<Integer> intStream = IntStream.range(0, 10).boxed();
intStream.forEach(System.out::println);
intStream.forEach(System.out::println);
This code will print integer from 0 to 10, but then will the following exception.
java.lang.IllegalStateException: stream has already been operated upon or closed
Java 8 stream is not meant to be used more than once. The reason behind this behavior is, in my understanding, a unification behavior between iterator for non-one-shot source and one-shot source [2]. In one-shot source, like reading lines from a file, it is hard to determine the case of if we iterate this twice what should happen?
With Java 8 stream, you have to re-initiated the stream to make this works.
Stream<Integer> intStream = IntStream.range(0, 10).boxed();
intStream.forEach(System.out::println);
intStream = IntStream.range(0, 10).boxed();
intStream.forEach(System.out::println);
RXJava programming concept assumption is different from Java 8 stream, thus allowing us to re-use the stream in
Collection
.Observable<Integer> intObservable = Observable.range(0, 10);
intObservable.subscribe(System.out::println);
intObservable.subscribe(System.out::println);
Consider another example. You wanted to do division operation and use integer from -4 to 5 as the denominator.
IntStream.range(-4, 5)
.map(i -> 1 / i)
.forEach(System.out::println);
you will notice that there's a case where in the middle of the stream this will calculate
1/0
which will throw an errorjava.lang.ArithmeticException: / by zero
Now you have two choices
filter(i != 0)
stream operation for this.map
function, maybe to use try... catch...
. Okay, I will be honest with you, this option is sucks.In this case you can easily go with option 1, but not in real-world cases. In real-world cases you would very much likely go with option 2. You do not even know what are the values that caused the error. Or if you know the values causing the error you need to consider how many values you need to hardcode into the filter operation. Maybe it is just easier for you to place an error handling function.
RXJava, fortunately, will help you to handle that error case without writing a try-catch block! Here are some ways you can handle unwanted value causing an error, without
filter
function.Add On Error Subscription
subscribe()
may take additional subscription that handles when an error happens, for example:Observable.range(-4, 5)
.map(i -> 1 / i)
.subscribe(
System.out::println,
t -> System.out.println("some error happened"));
This block of code will print "some error happened" because it will still try to execute
1/0
, though it will not throw an exception now.Substitute Value When Error
Use
onErrorReturnItem()
to give default value if error happened Observable.range(-4, 5)
.map(i -> 1 / i)
.onErrorReturnItem(-1)
.subscribe(
System.out::println,
t -> System.out.println("some error happened"));
You need a little bit more careful when using
onErrorReturnItem()
. This code will not call on error subscription function because it will replace the values with -1
after an error occurs. This means, values after 0 will be replaced by -1 regardless next values will produce an error or not.RXJava has many ways to simplify your stream codes, but that's not the only function that it has. This article has not yet discussed other exciting functionalities of RXJava, such as how to handles "hot" or "cold" observables, how to handle backpressure, or how should we parallelize the operation. I encourage readers to explore more about the reactive concept and dig into the RXJava library to experience and thinking about making your applications to be reactive.
[1] Samoylov, N., Nield, T. 2020. Learning RxJava 2nd Edition.
[2] https://stackoverflow.com/questions/28459498/why-are-java-streams-once-off/28513908#28513908
[3] Java 9 Reactive Streams. https://www.baeldung.com/java-9-reactive-streams