The RxRecipes series generally refers to Android and RxJava2, but the same recipes can apply to any Rx implementation and platform (RxJS, RxSwift, a backend with RxJava, etc.).
Imagine you have a wonderful reactive data layer for your newest application. The API/Disk/Cache setup is beautiful, so fresh so clean, truly an oasis of joy and wonder.
A new requirement comes in from product: you need to initialize the application with some static data to have a wait-free/no-internet-required first run.
So you decide to throw a JSON document in your /assets folder.
Except you soon realize this new data source (and associated AssetManager API) doesn’t fit with your compos-able, schedul-able, chain-able, error-terminating-able, (and all the other-ables) reactive data layer.
What we really want is a concise, reactive API like this:
But how do we get there?
From the official documentation:
Returns an Observable that, when an observer subscribes to it, invokes a function you specify and then emits the value returned from that function. This allows you to defer the execution of the function you specify until an observer subscribes to the ObservableSource. That is to say, it makes the function “lazy.”
Using fromCallable() is the simplest way to take imperative external code and make it reactive.
The fromCallable() creation method expects a callable implementation matching the following simple interface:
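For reference, the interface that RxJava 2's fromCallable() accepts is java.util.concurrent.Callable, which declares a single method, `V call() throws Exception`. Below is a minimal sketch of implementing it with a lambda. The class name CallableSketch and the greeting() helper are purely illustrative and not part of the original recipe.

```java
import java.util.concurrent.Callable;

public class CallableSketch {
    // java.util.concurrent.Callable<V> declares one method: V call() throws Exception.
    // A lambda that takes no arguments and returns a value satisfies it.
    static Callable<String> greeting(String name) {
        return () -> "Hello, " + name;
    }

    public static void main(String[] args) throws Exception {
        Callable<String> task = greeting("Rx");
        // Creating the Callable ran nothing; the body executes only on call().
        System.out.println(task.call());
    }
}
```

Because call() is allowed to throw a checked Exception, code that does I/O can be placed in the lambda without wrapping every failure in a RuntimeException.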
I will be using some Guava classes in this specific implementation (Optional, CharStreams, Lists).
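The original implementation is not reproduced here. As a rough stand-in, the sketch below shows the shape such a wrapper might take using only the Java standard library, with no Guava and no Android dependency. AssetOpener is a hypothetical interface standing in for Android's AssetManager.open(String), and AssetReaderSketch and readAsset() are assumed names chosen for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;

public class AssetReaderSketch {
    // Hypothetical stand-in for Android's AssetManager.open(String fileName).
    interface AssetOpener {
        InputStream open(String fileName) throws IOException;
    }

    // Returns a Callable that opens and reads the asset only when call() runs.
    static Callable<String> readAsset(AssetOpener assets, String fileName) {
        return () -> {
            try (InputStream in = assets.open(fileName)) {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buffer = new byte[4096];
                int n;
                while ((n = in.read(buffer)) != -1) {
                    out.write(buffer, 0, n);
                }
                return new String(out.toByteArray(), StandardCharsets.UTF_8);
            }
        };
    }

    public static void main(String[] args) throws Exception {
        // Fake opener serving one in-memory "file" so the sketch runs off-device.
        AssetOpener fake = name ->
                new ByteArrayInputStream("{\"seed\":true}".getBytes(StandardCharsets.UTF_8));
        System.out.println(readAsset(fake, "seed.json").call());
    }
}
```

Handing the returned Callable to Single.fromCallable() or Observable.fromCallable() would then give a source that opens the asset on subscription rather than when the source is built.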
It’s critical that we use fromCallable() and not something like just() or fromIterable(), because we don’t want this code to run eagerly, at the moment the source is created.
This is a critical point, so let’s look at an example:
We are now blocking the caller thread for a long time. If you are calling this from the main thread in Android, you’ve just frozen the UI and possibly triggered an ANR.
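The threading consequence can be seen without RxJava at all. In the sketch below, a plain ExecutorService plays the role that subscribeOn() with a scheduler would play in a reactive chain. The names WhereDoesItRun, slowRead() and runOn() are assumptions made for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WhereDoesItRun {
    // Stand-in for a slow read; reports the thread that actually did the work.
    static String slowRead() {
        return Thread.currentThread().getName();
    }

    // Runs the deferred work on a dedicated, named background thread.
    static String runOn(String threadName, Callable<String> work) throws Exception {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
        try {
            return executor.submit(work).get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Eager: slowRead() executes right here, on the caller's thread,
        // exactly as it would inside Observable.just(slowRead()).
        String eager = slowRead();

        // Deferred: only a description of the work exists until it is invoked,
        // so the caller gets to choose the thread that pays for it.
        Callable<String> deferred = WhereDoesItRun::slowRead;
        String lazy = runOn("io-thread", deferred);

        System.out.println("eager ran on: " + eager);
        System.out.println("deferred ran on: " + lazy);
    }
}
```

With just(slowRead()), Java evaluates the argument before just() is even entered, so no downstream operator can move that work to another thread.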
But even if you called this from a thread that you don’t mind blocking, there are still issues.
Imagine this file gets modified earlier in the chain of operators: that change won’t be reflected here. The file could even be deleted after the source is created, and the reactive source won’t reflect that.
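The staleness problem can be reproduced with a temporary file and nothing but the standard library. This is a sketch under assumed names (StaleSnapshot, read(), compare()): the eager value is captured when it is created, while the Callable re-reads the file when it is invoked.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.Callable;

public class StaleSnapshot {
    static String read(Path file) throws IOException {
        return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
    }

    // Returns {eagerValue, deferredValue} after the file changes from "v1" to "v2".
    static String[] compare() throws Exception {
        Path file = Files.createTempFile("seed", ".json");
        try {
            Files.write(file, "v1".getBytes(StandardCharsets.UTF_8));

            String eager = read(file);                     // the read happens now
            Callable<String> deferred = () -> read(file);  // the read happens on call()

            // The file is modified after both were created.
            Files.write(file, "v2".getBytes(StandardCharsets.UTF_8));

            return new String[] { eager, deferred.call() };
        } finally {
            Files.deleteIfExists(file);
        }
    }

    public static void main(String[] args) throws Exception {
        String[] result = compare();
        // prints "eager: v1, deferred: v2"
        System.out.println("eager: " + result[0] + ", deferred: " + result[1]);
    }
}
```

The eager value corresponds to what a source built with just(read(file)) would emit, and the deferred value to what a source built with fromCallable(() -> read(file)) would emit when subscribed to after the change.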
To put it another way: the reactive source that we create will emit the result of operations that already ran when the source was created. It will NOT run the operation when the source is subscribed to and then emit the result.
That is a bit of a mouthful, but understanding the difference is crucial when creating reactive applications or wrapping imperative code.
We often need to interface some reactive code with imperative code we don’t own (libraries, operating system, internal shared libraries across projects).
We can wrap the imperative code with fromCallable() to create observable stream sources.
We reviewed an example Rx-fromCallable()-wrapped implementation of some of the AssetManager API.
fromCallable() (and fromAction() for Completables) create a reactive source that executes its work when subscribed to, not when we create it.
Enjoy the recipe?
Disagree with the implementation?
Have burning-hot fiery opinions about some random sentence above?
I’d love to hear from you. Leave your comments below!