Scott Meschke

@scottmeschke

RxRecipes: Wrap your way to Rx

The RxRecipes series generally refers to Android and RxJava2, but the same recipes can apply to any Rx implementation and platform (RxJS, RxSwift, a backend with RxJava, etc.).

Rx Composition with External Code

Imagine you have a wonderful reactive data layer for your newest application. The API/Disk/Cache setup is beautiful, so fresh so clean, truly an oasis of joy and wonder.

A new requirement comes in from product: you need to initialize the application with some static data to have a wait-free/no-internet-required first run.

So you decide to throw a JSON document in your /assets folder.

Except you soon realize this new data source (and associated AssetManager API) doesn’t fit with your compos-able, schedul-able, chain-able, error-terminating-able, (and all the other-ables) reactive data layer.

Ugh. InputStream and raw arrays, IOException to cover multiple cases (file doesn’t exist, some issue with the disk), synchronous-blocking close. No thanks.

What we really want is a reactive concise API like this:
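Here is a rough sketch of what such an API could look like. The interface name RxAssetManager and the parameter name are my assumptions for illustration; only getFileAsString() is named in this post. Optional is Guava's, and the close() and getLocales() names simply mirror the AssetManager methods they would wrap.

```java
import com.google.common.base.Optional;
import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.Single;

// Hypothetical interface name, used here only to illustrate the shape of the API.
public interface RxAssetManager {

    // Emits the file contents, or an absent Optional if the file could not be read.
    Single<Optional<String>> getFileAsString(String fileName);

    // Completes once the underlying AssetManager has been closed.
    Completable close();

    // Emits each available locale as its own item.
    Observable<String> getLocales();
}
```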

Optional, since the file may not exist. Completable for the no-return-value operation. And an Observable stream of locales. Sweet! See the discussion in the comments regarding the Maybe type for the getFileAsString() method.

But how do we get there?

Solution: Wrap with fromCallable()

From the official documentation:

Returns an Observable that, when an observer subscribes to it, invokes a function you specify and then emits the value returned from that function. This allows you to defer the execution of the function you specify until an observer subscribes to the ObservableSource. That is to say, it makes the function “lazy.”

Using fromCallable() is the simplest way to take imperative external code and make it reactive.

The fromCallable() creation method expects a callable implementation matching the following simple interface:
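The interface in question is java.util.concurrent.Callable from the JDK, which declares a single method, V call() throws Exception. The small sketch below uses no RxJava at all. It only shows that a lambda can stand in for a Callable and that the lambda body may throw a checked exception without its own try/catch. The class and the readGreeting() and describe() helpers are hypothetical names made up for this example.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

class CallableShapeDemo {

    // Hypothetical stand-in for blocking code that may throw a checked exception.
    static String readGreeting(boolean fileExists) throws IOException {
        if (!fileExists) {
            throw new IOException("file not found");
        }
        return "hello";
    }

    // Invokes the Callable and reports either its value or its failure as text.
    static String describe(Callable<String> work) {
        try {
            return "value: " + work.call();
        } catch (Exception e) {
            return "error: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // call() is declared as "throws Exception", so the lambdas need no try/catch.
        Callable<String> present = () -> readGreeting(true);
        Callable<String> missing = () -> readGreeting(false);

        System.out.println(describe(present));
        System.out.println(describe(missing));
    }
}
```

That "throws Exception" is what makes Callable a comfortable fit for wrapping code like the AssetManager calls: with fromCallable(), an exception thrown from call() is delivered to the subscriber's onError rather than having to be caught at the call site.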

Doesn’t get much simpler than that.

Let’s see the code

I will be using some Guava classes in this specific implementation (Optional, CharStreams, Lists).

  • We return a Single instance which executes the following code when subscribed:
      • We attempt to get an InputStream for the file and read the stream into a String using CharStreams.
      • If there is an IOException (file doesn’t exist, some disk issue) we return an absent Optional.
  • We create an Observable which executes the following code when subscribed:
      • We get the existing locales for our assets (String[]).
  • We then use the flatMapIterable() operator on that Observable to transform the stream of String[] items into a stream of Strings, emitting each item in each String[] (in our case there is just one array in the original stream).
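The steps above can be sketched roughly as follows. Treat this as one possible implementation, not the definitive one: the class name RxAssets, the UTF-8 charset, and the constructor injection are my assumptions, and the lambdas assume your Android build has Java 8 language features enabled.

```java
import android.content.res.AssetManager;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.io.CharStreams;
import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.Single;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

// Hypothetical wrapper class; the name is chosen for illustration only.
public class RxAssets {

    private final AssetManager assetManager;

    public RxAssets(AssetManager assetManager) {
        this.assetManager = assetManager;
    }

    public Single<Optional<String>> getFileAsString(String fileName) {
        // Nothing in this lambda runs until someone subscribes.
        return Single.fromCallable(() -> {
            try (InputStream in = assetManager.open(fileName);
                 InputStreamReader reader =
                         new InputStreamReader(in, StandardCharsets.UTF_8)) {
                return Optional.of(CharStreams.toString(reader));
            } catch (IOException e) {
                // Missing file or disk problem: report "no value" instead of an error.
                return Optional.<String>absent();
            }
        });
    }

    public Observable<String> getLocales() {
        // One String[] is emitted on subscription, then flattened into single Strings.
        return Observable.fromCallable(() -> assetManager.getLocales())
                .flatMapIterable(locales -> Lists.newArrayList(locales));
    }

    public Completable close() {
        // fromAction() is the Completable counterpart for work with no return value.
        return Completable.fromAction(() -> assetManager.close());
    }
}
```

Because the work is deferred, callers stay free to choose the thread, for example with subscribeOn(Schedulers.io()).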

Why fromCallable() and not just() or other creation methods?

It’s critical that we use fromCallable() and not something like just(), or fromIterable(), etc. because we don’t want this code to run eagerly, at the moment the source is created and on whichever thread happens to create it.

This is a critical point, so let’s look at an example:
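A minimal sketch of the problem, assuming a hypothetical readFileBlocking() helper that stands in for slow disk work (the five-second sleep is made up to make the point obvious):

```java
import io.reactivex.Observable;

public class EagerJustExample {

    // Hypothetical stand-in for slow, blocking work such as reading a large asset.
    static String readFileBlocking() {
        try {
            Thread.sleep(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "{ \"locales\": [] }";
    }

    static Observable<String> fileContents() {
        // Java evaluates the argument first, so readFileBlocking() runs right here,
        // on the calling thread, before just() is even invoked.
        return Observable.just(readFileBlocking());
    }
}
```

Adding subscribeOn() to the returned Observable does not help, because the blocking work has already finished by the time there is anything to subscribe to.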

We are now blocking the caller thread for a long time. If you are calling this from the main thread in Android, you’ve just caused an ANR, or caused the UI to freeze and become unresponsive.

But even if you called this from a thread that you don’t mind blocking, there are still issues.

Imagine this file gets modified earlier in the chain of operators: that change won’t be reflected here. The file could even be deleted after the source is created, and the reactive source won’t reflect that.

Words are hard…

To put it another way: The reactive source that we create will emit the result of the operations that already ran when the source was created; NOT run the operation when the source is subscribed to and then emit the result.

That is a bit of a mouthful, but understanding the difference is crucial in creating Reactive applications, or wrapping imperative code.
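The same distinction can be seen without RxJava at all. The sketch below uses only the JDK and a temp file: the eager read plays the role of just(readFile()), and the Callable plays the role of the function handed to fromCallable(). It is an analogy for the mechanism, not RxJava code, and the class and helper names are made up for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.Callable;

class EagerVsDeferredDemo {

    static String read(Path file) {
        try {
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static void write(Path file, String text) {
        try {
            Files.write(file, text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns { eager result, deferred result }.
    static String[] run() {
        try {
            Path file = Files.createTempFile("recipe", ".json");
            try {
                write(file, "v1");

                // Eager: the read happens now, at "creation" time.
                String eager = read(file);

                // Deferred: nothing is read until call() is invoked.
                Callable<String> deferred = () -> read(file);

                // The file changes before anyone "subscribes".
                write(file, "v2");

                return new String[] { eager, deferred.call() };
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        String[] results = run();
        System.out.println("eager:    " + results[0]); // prints "eager:    v1"
        System.out.println("deferred: " + results[1]); // prints "deferred: v2"
    }
}
```

The eager value is a snapshot of the past; the deferred one reflects the file as it is when the work actually runs.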

Let’s Review

We often need to interface some reactive code with imperative code we don’t own (libraries, operating system, internal shared libraries across projects).

We can wrap the imperative code with fromCallable() to create observable stream sources.

We reviewed an example Rx-fromCallable()-wrapped implementation of some of the AssetManager API.

fromCallable() (and fromAction() for Completables) creates a reactive source that executes its work when subscribed to, not when we create it.

Next in the RxRecipes Series:

TBD — I’ll update this with a link to the next post once it’s done.

Enjoy the recipe?

Disagree with the implementation?

Have burning-hot fiery opinions about some random sentence above?

I’d love to hear from you, leave your comments below!
