RxJava allows a different style of programming from the imperative approach. It is reactive: the code in a chain is not activated until it knows its end consumer, i.e. until something subscribes to it. Check out a simple analogy in my previous blog.
RxJava Analogy with 3 lines of Kotlin code
As promised in that blog, I will share the basic benefits of RxJava compared to the imperative approach.
With the imperative approach, moving a piece of code to a background thread is hard work. In RxJava, however, we can easily define which thread each part of the chain runs on.
Use subscribeOn() to define the default thread that the entire RxJava chain works on.
Use observeOn() to define the thread that the part of the chain below it works on.
E.g. the code below runs on Schedulers.io() by default, while displayResult() runs on AndroidSchedulers.mainThread().
Observable.create(ObservableOnSubscribe<String> { emitter ->
        emitter.onNext(fetchFromServer())
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { result -> displayResult(result) }
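For contrast, here is a rough sketch of what the same hand-off could look like when written imperatively on a plain JVM. The two executors and this version of fetchFromServer() are hypothetical stand-ins (on Android, the second executor would be the real main thread); the point is only that every thread hop has to be wired by hand.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for the network call used in the article.
fun fetchFromServer(): String = "server data"

// Runs the fetch on a background thread, then hands the result to a second
// single-thread executor that plays the role of the main thread.
// Returns the "displayed" text together with the name of the displaying thread.
fun fetchAndDisplayImperatively(): String {
    val background = Executors.newSingleThreadExecutor { r -> Thread(r, "background") }
    val mainThread = Executors.newSingleThreadExecutor { r -> Thread(r, "main-stand-in") }
    val done = CountDownLatch(1)
    var displayed = ""

    background.execute {
        // Hop 1: do the slow work off the "main" thread.
        val result = fetchFromServer()
        mainThread.execute {
            // Hop 2: go back to the "main" thread to display the result.
            displayed = "$result shown on ${Thread.currentThread().name}"
            done.countDown()
        }
    }

    // Wait for the display step, then release both threads.
    done.await(5, TimeUnit.SECONDS)
    background.shutdown()
    mainThread.shutdown()
    return displayed
}
```

Calling fetchAndDisplayImperatively() goes through two manual hops, which is what subscribeOn() and observeOn() express in one line each.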
In the imperative world, imagine you call a function that fetches from a server, which could potentially result in an error. You need to catch and report the error, and you also need a callback to pass it on to the caller.
Also, if you have some in-between operations that could result in an error, you need to handle those as well. Hence error handling could end up all over the place, or you end up with crazy callbacks to handle it.
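To make the problem concrete, here is a small sketch of callback-style error handling. All names (this fetchFromServer(fail), processResult, load, and the callback parameters) are hypothetical; note how each step needs its own try/catch and its own route to the error callback.

```kotlin
import java.io.IOException

// Hypothetical steps; either one may throw.
fun fetchFromServer(fail: Boolean): String =
    if (fail) throw IOException("network down") else "raw data"

fun processResult(raw: String): String {
    require(raw.isNotBlank()) { "nothing to process" }
    return "processed $raw"
}

// Imperative version: every step is wrapped separately, and the caller
// must pass in both a success callback and an error callback.
fun load(fail: Boolean, onSuccess: (String) -> Unit, onError: (Exception) -> Unit) {
    val raw = try {
        fetchFromServer(fail)
    } catch (e: Exception) {
        onError(e)      // error path #1
        return
    }
    val processed = try {
        processResult(raw)
    } catch (e: Exception) {
        onError(e)      // error path #2
        return
    }
    onSuccess(processed)
}
```

Every additional step in between would add another try/catch and another early return.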
In RxJava, you just report the error, and the subscriber can centralize how it wants to process it.
E.g. in the code below, the error could occur during fetchFromServer or processResult, but it is handled in one place, the subscriber, which e.g. logs the error message with Log.e.
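Observable.create(ObservableOnSubscribe<String> { emitter ->
        try {
            emitter.onNext(fetchFromServer())
            emitter.onComplete()
        } catch (error: Exception) {
            // Report the failure downstream instead of handling it here
            emitter.onError(error)
        }
    })
    .map { result ->
        try {
            processResult(result)
        } catch (error: Exception) {
            // Rethrow as a non-fatal exception so it reaches the subscriber's
            // error handler (RxJava rethrows fatal errors such as OutOfMemoryError)
            throw IllegalStateException("Failed to process result", error)
        }
    }
    .subscribe(
        { result -> displayResult(result) },
        { error -> Log.e("TAG", "${error.message}") },
        { Log.d("TAG", "completed") }
    )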
Also, if the error happens during fetchFromServer, the chain skips operators such as map and lets the subscriber handle the error directly.
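A small sketch of that skipping behaviour, assuming RxJava 2 (the io.reactivex package) is on the classpath. The function name runFailingChain and the simulated failure are made up for illustration, and no schedulers are used, so everything runs synchronously on the calling thread.

```kotlin
import io.reactivex.Observable

// Records every event that happens, in order, so the flow is easy to inspect.
fun runFailingChain(): List<String> {
    val events = mutableListOf<String>()

    Observable.create<String> { emitter ->
            // Simulated failure of fetchFromServer(): no item is ever emitted.
            emitter.onError(IllegalStateException("server unreachable"))
        }
        .map { result ->
            // Not reached, because map only runs for emitted items.
            events += "map called"
            result
        }
        .subscribe(
            { result -> events += "onNext: $result" },
            { error -> events += "onError: ${error.message}" },
            { events += "onComplete" }
        )

    return events
}
```

Here runFailingChain() returns a list with the single entry "onError: server unreachable": neither the map block nor the onNext and onComplete handlers run.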
Besides errors, there is also onComplete support, which lets the source indicate that emission has finished.
In the Android world, we don't have control over the lifecycle. The OS can terminate an activity as needed. Rotating the device also counts as a configuration change, which by default kills and restarts the activity.
Hence, if this happens while we are still performing some background process, e.g. fetching data from a server, we want to stop it immediately, to avoid the callback returning to a dead activity.
In RxJava (2.0), subscribing returns a Disposable object.
val disposable = Observable
    .just("rain water", "river water", "lake water")
    .map { water -> "Clean $water" }
    .subscribe { cleanWater -> Log.i("TAG", "Drink $cleanWater") }
We can store this disposable and use it to terminate the entire chain when needed.
E.g. you could call it in onDestroy to dispose of your process.
override fun onDestroy() {
    super.onDestroy()
    disposable.dispose()
}
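To see the effect of dispose() outside of Android, here is a minimal sketch, again assuming RxJava 2 on the classpath. The PublishSubject is only a hypothetical stand-in for a source that keeps emitting (such as an ongoing background fetch), and drinkUntilDisposed is an invented name.

```kotlin
import io.reactivex.subjects.PublishSubject

// Returns everything the subscriber received before and after disposal.
fun drinkUntilDisposed(): List<String> {
    val received = mutableListOf<String>()
    // Stand-in for a long-running source that we push items into by hand.
    val source = PublishSubject.create<String>()

    val disposable = source
        .map { water -> "Clean $water" }
        .subscribe { cleanWater -> received += cleanWater }

    source.onNext("rain water")     // delivered to the subscriber
    disposable.dispose()            // what onDestroy() would do
    source.onNext("river water")    // ignored: the subscription is gone

    return received
}
```

Only "Clean rain water" ends up in the returned list; anything emitted after dispose() never reaches the subscriber.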
The 3 benefits above are the most basic ones RxJava has over the imperative programming approach. There are other benefits too: it has loads of really useful operators; it is highly composable, so code can be linked together as a chain and moved across threads easily; it greatly reduces callbacks; etc.
We could use the RxJava approach for almost anything. A little practice is required, though, to get comfortable with it. But it's definitely beneficial, as you might discover that it solves some of the coding challenges you face with the imperative approach.
Thanks! ~Twitter:elye; Facebook:elye