
Introduction to RxJava: Observable Pattern

by Hari Ya (@printfmyname), March 26th, 2020

"RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences." by RxJava developers.

That is it. It basically allows you to follow the reactive programming paradigm.

The purpose of this article is to introduce you to Observable. To do that, I will walk you through a simple use case that I encountered and where I benefited from using Observable.

Use case:

Spawn an external process using ProcessBuilder and use an Observable to observe the process output.

I will first write the complete program as I would without RxJava, followed by an explanation. Then I will move on to the reactive version.

Implementation 1: Conventional method using Java Consumer

package io.convert2pdf.commons.http;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List;
import java.util.function.Consumer;

public class RxJavaObservableProcess {
    ProcessBuilder processBuilder = null;
    Process process = null;
    
    Consumer<String> inputConsumer;
    Consumer<String> errorConsumer;
    Consumer<Void> completeConsumer;
    
    public RxJavaObservableProcess(List<String> command, Consumer<String> inputConsumer, Consumer<String> errorConsumer, Consumer<Void> completeConsumer) {
        processBuilder = new ProcessBuilder(command);
        this.inputConsumer = inputConsumer;
        this.errorConsumer = errorConsumer;
        this.completeConsumer = completeConsumer;
    }
    
    public void start() throws Exception {
        process = processBuilder.start();
        readInputStream(process);
        readErrorStream(process);
        process.waitFor();
    }
    
    private void readInputStream(Process process) {
        new Thread(() -> {
            BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
            String line = null;
            try {
                while((line = br.readLine()) != null) {
                    this.inputConsumer.accept(line);
                }
                this.completeConsumer.accept(null);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }

    private void readErrorStream(Process process) {
        new Thread(() -> {
            BufferedReader br = new BufferedReader(new InputStreamReader(process.getErrorStream()));
            String line = null;
            try {
                while((line = br.readLine()) != null) {
                    this.errorConsumer.accept(line);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }

    public static void main(String[] args) throws Exception {
        RxJavaObservableProcess process = new RxJavaObservableProcess(
                List.of("tasklist"), 
                (input) -> System.out.println("Input: " + input), 
                (error) -> System.out.println("Error: " + error), 
                (complete) -> System.out.println("Complete: " + complete)
        );
        
        process.start();
    }
}

Note: if you are running this on a Unix-like system, change List.of("tasklist") to List.of("ps", "aux")

Explanation:

In the main method, I initialize RxJavaObservableProcess, which takes four arguments:

  1. The command. In this case, I simply want to list the processes currently running on my Windows laptop.
  2. Input stream consumer. A consumer that gets called whenever an input line is available.
  3. Error stream consumer. A consumer that gets called whenever an error line is available.
  4. Complete state consumer. I added this consumer so that you can make the connection to its RxJava counterpart when you read the RxJava code.

I create a Process with the given arguments in my start method and read the input and error streams in the readInputStream and readErrorStream methods, respectively. Reading the input/error streams is a blocking operation, so I process each stream in a separate thread.

Inside each stream-reading method, I use a BufferedReader to read a line and pass it to a consumer. At the end of the input stream, I invoke the completeConsumer with null to let the user know that the process is complete.

This is a simple task, but it took a lot of code to write. Plus, I had to pass in all the consumers beforehand so that I could attach them to the corresponding streams before invoking the blocking call, process.waitFor().

This is not a bad program; I actually have no issues with it. But I think the reactive programming paradigm can help me make it better.

Implementation 2: Using RxJava

package io.convert2pdf.commons.http;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.subjects.ReplaySubject;

public class RxJavaObservableProcessV2 {
    ProcessBuilder processBuilder = null;
    Process process = null;
    
    ReplaySubject<String> processInputs = ReplaySubject.create();
    
    public RxJavaObservableProcessV2(List<String> command) {
        processBuilder = new ProcessBuilder(command);
    }
    
    public void startAsync() throws Exception {
        new Thread(() -> {
            try {
                process = processBuilder.start();
                readInputStream(process);
                readErrorStream(process);
                process.waitFor();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
    
    public Observable<String> getInputs() {
        return processInputs;
    }
    
    private void readInputStream(Process process) {
        new Thread(() -> {
            BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()));
            String line = null;
            try {
                while((line = br.readLine()) != null) {
                    this.processInputs.onNext(line);
                }
                this.processInputs.onComplete();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }

    private void readErrorStream(Process process) {
        new Thread(() -> {
            BufferedReader br = new BufferedReader(new InputStreamReader(process.getErrorStream()));
            String line = null;
            try {
                ArrayList<String> buffer = new ArrayList<String>();
                while((line = br.readLine()) != null) {
                    buffer.add(line);
                }
                if(buffer.size() > 0) {
                    this.processInputs.onError(new Throwable(Arrays.toString(buffer.toArray())));                    
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }

    public static void main(String[] args) throws Exception {
        RxJavaObservableProcessV2 process = new RxJavaObservableProcessV2(List.of("tasklist"));
        process.startAsync();
        
        process.getInputs().subscribe(
                (input) -> System.out.println("Input: " + input), 
                (error) -> System.out.println("Error: " + error), 
                () -> System.out.println("Completed")
        );
    }
}

Note that a complete implementation should take care of unsubscribing as well. subscribe returns a Disposable, which should be disposed once we are done.
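To make the note about Disposable concrete, here is a minimal sketch of disposing a subscription. The class name DisposeExample and the emitted strings are made up for illustration; only subscribe, dispose, and onNext come from RxJava 3.

```java
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subjects.ReplaySubject;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of the article's program
class DisposeExample {

    // Emits two lines, disposes the subscription, then emits a third line.
    // Returns what the subscriber actually received.
    static List<String> receivedBeforeDispose() {
        ReplaySubject<String> lines = ReplaySubject.create();
        List<String> received = new ArrayList<>();

        // subscribe(...) hands back a Disposable for this one subscription
        Disposable subscription = lines.subscribe(
                received::add,
                error -> System.out.println("Error: " + error),
                () -> System.out.println("Completed"));

        lines.onNext("first");
        lines.onNext("second");

        // After dispose(), this subscriber no longer receives events
        subscription.dispose();
        lines.onNext("third");

        return received;
    }

    public static void main(String[] args) {
        System.out.println(receivedBeforeDispose()); // prints [first, second]
    }
}
```

In a long-running service, the Disposable would typically be kept in a field and disposed when the consumer shuts down.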

Explanation:

The code looks rather similar, with a minor change: all the consumers are now replaced by a single ReplaySubject, processInputs. This opens up many opportunities to decouple your program and handle operations in a contained manner (separation of concerns). The ReplaySubject I used here is a type of Subject that emits events to its subscribers. These events can be emitted as

  1. a simple message through onNext
  2. an error such as Throwable through onError
  3. a complete state through onComplete

Once an observable completes, it cannot emit further next or error events. The same goes for an error: it also ends the stream. In RxJava, all Subjects act as both Observer and Observable.
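The terminal-event rule can be seen in a small sketch. The class TerminalEventExample and the "next:"/"complete" labels are invented for this illustration; it assumes RxJava 3's ReplaySubject, which drops onNext calls made after onComplete.

```java
import io.reactivex.rxjava3.subjects.ReplaySubject;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of the article's program
class TerminalEventExample {

    // Records every event the subscriber sees, in order
    static List<String> eventsSeen() {
        ReplaySubject<String> subject = ReplaySubject.create();
        List<String> seen = new ArrayList<>();

        subject.subscribe(
                line -> seen.add("next:" + line),
                error -> seen.add("error:" + error.getMessage()),
                () -> seen.add("complete"));

        subject.onNext("a");
        subject.onComplete();
        subject.onNext("b"); // ignored: the subject has already terminated

        return seen;
    }

    public static void main(String[] args) {
        System.out.println(eventsSeen()); // prints [next:a, complete]
    }
}
```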

This is handy. After creating and kick-starting the process with startAsync, I can pass that object around and still access the input and error streams by subscribing to the Observable returned by getInputs().

It is the convention to return an Observable so that we do not expose the Subject to the outside world. Unlike Observer, Observable only allows the user to observe a source (subject) and does not allow emitting events through it.
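One caveat worth sketching: returning the subject typed as Observable still hands out the same ReplaySubject instance, so a caller could cast it back. RxJava's hide() operator wraps the source in a plain Observable. The class HideExample and the method names getInputsDirect and getInputsHidden below are hypothetical; this goes a step beyond the code in this article rather than describing it.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.subjects.ReplaySubject;
import io.reactivex.rxjava3.subjects.Subject;

// Hypothetical demo class, not part of the article's program
class HideExample {
    private final ReplaySubject<String> processInputs = ReplaySubject.create();

    // Same approach as getInputs(): the returned object is still the subject itself
    Observable<String> getInputsDirect() {
        return processInputs;
    }

    // hide() returns a wrapper that cannot be cast back to a Subject
    Observable<String> getInputsHidden() {
        return processInputs.hide();
    }

    public static void main(String[] args) {
        HideExample example = new HideExample();
        System.out.println(example.getInputsDirect() instanceof Subject); // prints true
        System.out.println(example.getInputsHidden() instanceof Subject); // prints false
    }
}
```

Whether the extra wrapper is worth it depends on how much you trust the callers; for code inside a single module, the plain return type is often enough.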

This is different from the previous example. Previously, once a consumer consumed a message, it was gone. If other services needed to handle data from the process, we had to explicitly call those services within our consumers. With ReplaySubject, every service that subscribes to the processInputs observable receives all messages, including those emitted before it subscribed, until onError/onComplete ends the stream. This improves event management by allowing the observable to be passed around so that different services can handle the messages.
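As a rough illustration of several services sharing one stream, the sketch below subscribes two consumers to the same ReplaySubject at different times. The class ReplayToManyExample and the "logger" and "webSocket" roles are placeholders invented for this example.

```java
import io.reactivex.rxjava3.subjects.ReplaySubject;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of the article's program
class ReplayToManyExample {
    final List<String> loggerLines = new ArrayList<>();
    final List<String> webSocketLines = new ArrayList<>();

    void run() {
        ReplaySubject<String> processInputs = ReplaySubject.create();

        // First service subscribes before anything is emitted
        processInputs.subscribe(loggerLines::add);

        processInputs.onNext("line 1");
        processInputs.onNext("line 2");

        // Second service subscribes late; the earlier lines are replayed to it
        processInputs.subscribe(webSocketLines::add);

        processInputs.onNext("line 3");
        processInputs.onComplete();
    }

    public static void main(String[] args) {
        ReplayToManyExample example = new ReplayToManyExample();
        example.run();
        System.out.println(example.loggerLines);    // prints [line 1, line 2, line 3]
        System.out.println(example.webSocketLines); // prints [line 1, line 2, line 3]
    }
}
```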

RxJava is a cool technology and has many benefits. While I only touched upon the general use of Observable and ReplaySubject, you can read more on the RxJava or ReactiveX websites.

I used a variation of this code to manage processes on the PS2PDF website. Ps2pdf.com allows users to compress PDFs and convert images such as JPEGs. All of these conversions spawn processes that execute FFMPEG or some other open source program underneath to do the actual conversion. One place I use this is when I want to report the percentage completion of a video conversion to the end user.

I spawn an FFMPEG process and pass its Observable to a service called WebSocketService, which manages real-time communication between users and the system. This WebSocketService subscribes to the input stream Observable and waits for data. In the case of FFMPEG, each line of the input stream takes the following form.

frame= 1185 fps=337 q=38.0 size=    2048kB time=00:00:20.94 bitrate= 801.1kbits/s speed=5.95x

WebSocketService decodes this message and sends the progress to the front end, where it is displayed as a progress bar so that the user knows what's going on.
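The article does not show how WebSocketService decodes the line, so the following is only one possible sketch: it pulls the time= field out with a regular expression and compares it to a total duration. The class FfmpegProgress, its method names, and the assumption that the total duration in seconds is already known are all hypothetical.

```java
import java.util.Locale;
import java.util.OptionalDouble;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not the actual PS2PDF implementation
class FfmpegProgress {
    // Matches the time=HH:MM:SS.ss field of a progress line
    private static final Pattern TIME =
            Pattern.compile("time=(\\d+):(\\d{2}):(\\d{2}(?:\\.\\d+)?)");

    // Returns the elapsed seconds encoded in the line, or empty if there is no time field
    static OptionalDouble elapsedSeconds(String line) {
        Matcher m = TIME.matcher(line);
        if (!m.find()) {
            return OptionalDouble.empty();
        }
        int hours = Integer.parseInt(m.group(1));
        int minutes = Integer.parseInt(m.group(2));
        double seconds = Double.parseDouble(m.group(3));
        return OptionalDouble.of(hours * 3600 + minutes * 60 + seconds);
    }

    // totalSeconds is assumed to be known up front (for example, probed before conversion)
    static OptionalDouble percentComplete(String line, double totalSeconds) {
        OptionalDouble elapsed = elapsedSeconds(line);
        if (!elapsed.isPresent() || totalSeconds <= 0) {
            return OptionalDouble.empty();
        }
        double percent = elapsed.getAsDouble() / totalSeconds * 100.0;
        return OptionalDouble.of(Math.min(100.0, percent));
    }

    public static void main(String[] args) {
        String line = "frame= 1185 fps=337 q=38.0 size=    2048kB "
                + "time=00:00:20.94 bitrate= 801.1kbits/s speed=5.95x";
        // For a 60-second video this comes out at roughly 34.9%
        percentComplete(line, 60.0).ifPresent(
                p -> System.out.println(String.format(Locale.ROOT, "%.1f%%", p)));
    }
}
```

A subscriber could apply such a function to each line it receives and forward only the lines for which a percentage is present.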

Note: This code was written with Java 14 and RxJava 3. It should work with Java 9+ (List.of was added in Java 9) and any RxJava 3 release; older RxJava versions use different package names, so the imports would need to change.