" is a Java VM implementation of : a library for composing asynchronous and event-based programs by using observable sequences." by RxJava developers. RxJava Reactive Extensions That is it. It is basically allows you to follow a reactive programming paradigm. Purpose of this article is to introduce you to . To do that, I will walk you though a simple use case that I encountered and benefited using Observable. Observable Use case: Spawn up a external process using and use Observable to observe process output. ProcessBuilder I will first write the complete program if I weren't to use RxJava followed by an explanation. Then to the Reactive programming. Implementation 1: Conventional method using Java Consumer io.convert2pdf.commons.http; java.io.BufferedReader; java.io.IOException; java.io.InputStreamReader; java.util.List; java.util.function.Consumer; { ProcessBuilder processBuilder = ; Process process = ; Consumer<String> inputConsumer; Consumer<String> errorConsumer; Consumer<Void> completeConsumer; { processBuilder = ProcessBuilder(command); .inputConsumer = inputConsumer; .errorConsumer = errorConsumer; .completeConsumer = completeConsumer; } { process = processBuilder.start(); readInputStream(process); readErrorStream(process); process.waitFor(); } { Thread(() -> { BufferedReader br = BufferedReader( InputStreamReader(process.getInputStream())); String line = ; { ((line = br.readLine()) != ) { .inputConsumer.accept(line); } .completeConsumer.accept( ); } (IOException e) { e.printStackTrace(); } }).start(); } { Thread(() -> { BufferedReader br = BufferedReader( InputStreamReader(process.getErrorStream())); String line = ; { ((line = br.readLine()) != ) { .errorConsumer.accept(line); } } (IOException e) { e.printStackTrace(); } }).start(); } { RxJavaObservableProcess process = RxJavaObservableProcess( List.of( ), (input) -> System.out.println( + input), (error) -> System.out.println( + error), (complete) -> System.out.println( + complete) ); process.start(); } } 
package import import import import import public class RxJavaObservableProcess null null public RxJavaObservableProcess (List<String> command, Consumer<String> inputConsumer, Consumer<String> errorConsumer, Consumer<Void> completeConsumer) new this this this Exception public void start () throws private void readInputStream (Process process) new new new null try while null this this null catch private void readErrorStream (Process process) new new new null try while null this catch Exception public static void main (String[] args) throws new "tasklist" "Input: " "Error: " "Complete: " Note: if you are running this on a Unix, change List.of("tasklist") to List.of("ps", "aux") Explanation: On main method, I initialize RxJavaObservableProcess, which takes 4 arguments The command. In this case, I simply want to get current running processes on my Windows laptop. Input stream consumer. A consumer that get called when ever there is an input line available. Error stream consumer. A consumer that get called when ever there is an error line available. Complete state consumer. I added this consumer so that when you read the code with RxJava you can make a connection between these. I creates a Process with given arguments on my method and read input and error streams on and methods, respectively. Reading input/error streams are a blocking operations, thus I process those in a separate thread. start readInputStream readErrorStream Inside each stream read methods, I use to read a line and write it to a consumer. At the end of reading input stream, I invoke the with null to let user know that the process is complete. BufferedReader completeConsumer This is a simple task, but it took so much code to write it. Plus, I had to pass in all the consumers before hand so that I can attach them to corresponding stream before I invoke the blocking call, process.waitFor(). This is not a bad program, I actually have no issues with it. 
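The two read methods above differ only in which stream they read and which consumer they call, so the read loop could live in one shared helper. The following is a minimal sketch of that idea, not the code used in this article: LinePump, pump and pumpAsync are hypothetical names, and rethrowing the IOException as an UncheckedIOException (instead of printing the stack trace) is an assumption of the sketch.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.util.function.Consumer;

// Hypothetical helper, not part of the original program.
final class LinePump {

    /** Reads the stream line by line and hands each line to the consumer until end of stream. */
    static void pump(InputStream in, Consumer<String> consumer) {
        // try-with-resources closes the reader (and the underlying stream) when done.
        try (BufferedReader br = new BufferedReader(new InputStreamReader(in))) {
            String line;
            while ((line = br.readLine()) != null) {
                consumer.accept(line);
            }
        } catch (IOException e) {
            // Assumption of this sketch: surface the failure to the caller.
            throw new UncheckedIOException(e);
        }
    }

    /** Runs pump on a new thread, since reading blocks, and returns the thread so callers may join it. */
    static Thread pumpAsync(InputStream in, Consumer<String> consumer) {
        Thread t = new Thread(() -> pump(in, consumer));
        t.start();
        return t;
    }
}
```

With such a helper, start() could call LinePump.pumpAsync(process.getInputStream(), inputConsumer) and LinePump.pumpAsync(process.getErrorStream(), errorConsumer) before process.waitFor(). The completion signal would still have to be sent separately once the input stream ends.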
Still, I think the reactive programming paradigm can improve this program.

Implementation 2: Using RxJava

    package io.convert2pdf.commons.http;

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.subjects.ReplaySubject;

    public class RxJavaObservableProcessV2 {
        ProcessBuilder processBuilder = null;
        Process process = null;
        ReplaySubject<String> processInputs = ReplaySubject.create();

        public RxJavaObservableProcessV2(List<String> command) {
            processBuilder = new ProcessBuilder(command);
        }

        public void startAsync() throws Exception {
            // Start and wait for the process on its own thread so the caller is not blocked.
            new Thread(() -> {
                try {
                    process = processBuilder.start();
                    readInputStream(process);
                    readErrorStream(process);
                    process.waitFor();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }

        // Expose the subject as a plain Observable so callers cannot emit into it.
        public Observable<String> getInputs() {
            return processInputs;
        }

        private void readInputStream(Process process) {
            new Thread(() -> {
                BufferedReader br = new BufferedReader(
                        new InputStreamReader(process.getInputStream()));
                String line = null;
                try {
                    while ((line = br.readLine()) != null) {
                        this.processInputs.onNext(line);
                    }
                    this.processInputs.onComplete();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).start();
        }

        private void readErrorStream(Process process) {
            new Thread(() -> {
                BufferedReader br = new BufferedReader(
                        new InputStreamReader(process.getErrorStream()));
                String line = null;
                try {
                    // Collect all error lines and report them as a single error.
                    ArrayList<String> buffer = new ArrayList<String>();
                    while ((line = br.readLine()) != null) {
                        buffer.add(line);
                    }
                    if (buffer.size() > 0) {
                        this.processInputs.onError(
                                new Throwable(Arrays.toString(buffer.toArray())));
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).start();
        }

        public static void main(String[] args) throws Exception {
            RxJavaObservableProcessV2 process =
                    new RxJavaObservableProcessV2(List.of("tasklist"));
            process.startAsync();
            process.getInputs().subscribe(
                    (input) -> System.out.println("Input: " + input),
                    (error) -> System.out.println("Error: " + error),
                    () -> System.out.println("Completed")
            );
        }
    }

Note that a complete implementation should take care of unsubscribing as well. subscribe returns a Disposable, which should be disposed once we are done.

Explanation:

The code looks rather similar, with a minor change. Now, all the consumers are replaced by a single ReplaySubject, processInputs. This opens up many opportunities to decouple your program and handle operations in a contained manner (separation of concerns). The ReplaySubject I used here is a type of subject that emits events. These events can be emitted as:

- a simple message through onNext
- an error such as a Throwable through onError
- a complete state through onComplete

Once a subject completes, it cannot emit further next or error messages. The same goes for an error.

In RxJava, all Subjects act as both Observer and Observable. This is handy. After creating and kick-starting the process with startAsync, I can pass that object around and still get access to the input and error streams by subscribing to the Observable returned by getInputs(). It is the convention to return an Observable so that we do not expose the Subject to the outside world. Unlike an Observer, an Observable only allows the user to observe a source (the subject) and does not allow emitting events through it.

This is different from the previous example. Previously, once a consumer consumed a message, it was gone. If other services needed to handle data from the process, we had to explicitly call those services within our consumers. With the ReplaySubject processInputs, every service that subscribes to the observable receives every message, including those emitted before it subscribed, followed by the terminal onError or onComplete event, because a ReplaySubject replays past events to late subscribers. This improves event management by allowing the observable to be passed around and its messages to be handled by different services.

RxJava is a cool technology and has many benefits.
While I only touched upon the general use of Observable and ReplaySubject, you can read more on the RxJava or ReactiveX websites.

I used a variation of this code to manage processes on the PS2PDF website. Ps2pdf.com allows users to compress PDF files and convert images such as JPEG. All of these conversions spawn processes which execute FFMPEG or some other open source program underneath to do the actual conversion. One place I use this is when I want to report the percentage completion of a video conversion to the end user.

I spawn an FFMPEG process and pass that Observable process to a service called WebSocketService, which manages real-time communication between users and the system. This WebSocketService subscribes to the input stream Observable and waits for data. In the case of FFMPEG, each line of the input stream takes the following form:

    frame= 1185 fps=337 q=38.0 size= 2048kB time=00:00:20.94 bitrate= 801.1kbits/s speed=5.95x

WebSocketService decodes this message and sends the progress to the front end, where it is displayed as a progress bar so that the user knows what is going on.

Note: This code was written with Java 14 and RxJava 3. It uses List.of, which requires Java 9 or later (on Java 8, replace it with Arrays.asList), and the io.reactivex.rxjava3 imports are specific to RxJava 3; earlier RxJava versions use different package names.
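As a rough illustration of how a progress line like the FFMPEG one shown earlier might be decoded, here is a minimal sketch. It is not the WebSocketService code: FfmpegProgressParser, elapsedSeconds and percentComplete are hypothetical names, and the sketch assumes that the total duration of the video is already known from somewhere else.

```java
import java.util.OptionalDouble;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the original code.
final class FfmpegProgressParser {

    // Matches the "time=HH:MM:SS.xx" field of a progress line.
    private static final Pattern TIME =
            Pattern.compile("time=(\\d+):(\\d{2}):(\\d{2}(?:\\.\\d+)?)");

    /** Returns the elapsed media time in seconds, or empty if the line has no time field. */
    static OptionalDouble elapsedSeconds(String line) {
        Matcher m = TIME.matcher(line);
        if (!m.find()) {
            return OptionalDouble.empty();
        }
        int hours = Integer.parseInt(m.group(1));
        int minutes = Integer.parseInt(m.group(2));
        double seconds = Double.parseDouble(m.group(3));
        return OptionalDouble.of(hours * 3600 + minutes * 60 + seconds);
    }

    /**
     * Returns the percentage complete, clamped to [0, 100].
     * totalSeconds is the full duration of the video (assumed to be known by the caller).
     */
    static OptionalDouble percentComplete(String line, double totalSeconds) {
        OptionalDouble elapsed = elapsedSeconds(line);
        if (!elapsed.isPresent() || totalSeconds <= 0) {
            return OptionalDouble.empty();
        }
        double pct = 100.0 * elapsed.getAsDouble() / totalSeconds;
        return OptionalDouble.of(Math.max(0.0, Math.min(100.0, pct)));
    }
}
```

A subscriber to the process Observable could pass each received line through percentComplete and forward only the lines that yield a value, ignoring any output that does not carry a time field.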