The RxRecipes series generally refers to Android and RxJava2, but the same recipes can apply to any Rx implementation and platform (RxJs, RxSwift, a backend with RxJava, etc).
Rx Composition with External Code
Imagine you have a wonderful reactive data layer for your newest application. The API/Disk/Cache setup is beautiful, so fresh so clean, truly an oasis of joy and wonder.
A new requirement comes in from product: you need to initialize the application with some static data to have a wait-free/no-internet-required first run.
So you decide to throw a JSON document in your /assets folder.
Except you soon realize this new data source (and associated AssetManager API) doesn’t fit with your compos-able, schedul-able, chain-able, error-terminating-able, (and all the other-ables) reactive data layer.
What we really want is a reactive concise API like this:
But how do we get there?
Solution: Wrap with fromCallable()
From the official documentation:
Returns an Observable that, when an observer subscribes to it, invokes a function you specify and then emits the value returned from that function. This allows you to defer the execution of the function you specify until an observer subscribes to the ObservableSource. That is to say, it makes the function “lazy.”
Using fromCallable() is the simplest way to take declarative external code, and make it reactive.
The fromCallable() creation method expects a callable implementation matching the following simple interface:
Let’s see the code
I will be using some Guava classes in this specific implementation (Optional, CharStreams, Lists).
- We return a Single instance which executes the following code when subscribed:
- We attempt get an InputStream for the file and read the stream into a CharStream which produces a String result.
- If there is an IOException (file doesn’t exist, some disk issue) we return an absent Optional.
- We create an Observable which executes the following code when subscribed:
- We get the existing locales for our assets (String).
- We use the flatMapIterable() operator to transform the stream from String items into a stream of Strings which emits each item in each String (in our case there is just one array in the original stream).
- We return a Completable which executes the close method when subscribed to.
- Edit: We are using fromAction() which is like fromCallable() except it accepts a runnable (no return type). fromAction() is still deferred execution, but then we don’t have to do any weird return (thanks Dávid Karnok for the tip!).
- Only the Completable type has fromAction() of course, since the other reactive types contain values.
Why fromCallable() and not just(), or other creation methods
It’s critical that we use fromCallable() and not something like just(), or fromIterable(), etc. because we don’t want this code to run synchronously.
This is a critical point, so let’s look at an example:
We are now blocking the caller thread for a long time. If you are calling this from a the main thread in Android, you’ve just caused an ANR, or caused the UI to freeze and become unresponsive.
But even if you called this from a thread that you don’t mind blocking there are still issues.
Imagine this file get’s modified earlier in chain of operators, that change won’t be reflected here. The file could even be deleted after the source is created, and the reactive source won’t reflect that.
To put it another way: The reactive source that we create will emit the result of the operations that already ran when the source was created; NOT run the operation when the source is subscribed to and then emit the result.
That is a bit of a mouthful, but understanding the difference is crucial in creating Reactive applications, or wrapping imperative code.
We often need to interface some reactive code with imperative code we don’t own (libraries, operating system, internal shared libraries across projects).
We can wrap the imperative code with fromCallable() to create observable stream sources.
We reviewed an example Rx-fromCallable()-wrapped implementation of some of the AssetManager API.
fromCallable() (and fromAction() for Completables) creates a reactive source that executes it’s work when subscribed to, not when we create it.
Next in the RxRecipes Series:
Enjoy the recipe?
Disagree with the implementation?
Have burning-hot fiery opinions about some random sentence above?
I’d love to hear from you, leave your comments below!