Reactive Programming Applied to Legacy Services: A WebFlux example

João Esperancinha (@jesperancinha)

1. Introduction

The concept of reactive programming can be traced back to the mid-1960s. It is a declarative programming paradigm mainly concerned with handling asynchronous data streams and propagating data through transformations with certain determined orders of execution. The modern term is often associated with Erik Meijer and his work on Reactive Extensions, around 2010. The history of reactive programming isn’t very clear as to where it started. However, we can find references to it in older literature under terms like “data flows”.
These ended up being what we have long been used to calling streams. Data flows were part of another programming paradigm called dataflow programming, a term coined by Jack Dennis and his graduate students at MIT in the 1960s.
Reactive programming is sometimes referred to as dataflow programming on steroids. If you would like to know more about the history of reactive programming, I placed a lot of links in the resource repo of this article. Please have a look at them at the end of this article.

2. Reactive Programming in General

Reactive programming has many implementations, and they are all based on the observer pattern, also known in reactive programming terms as the publisher-subscriber pattern. Before the publisher there is a producer of events, which is essentially the bridge between the triggered event and the publisher. Once a request comes in, the producer triggers the event handler; the publisher is an implementation of that.
It processes and emits data without any knowledge of the receiver. The receiver, on its own, is the one who knows that data is coming through via a specific channel and what type of data is coming in. The receiver is known as the subscriber of that data and it basically waits for the data to be delivered. A subscriber is subscribed to a publisher.
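
The pattern described above can be sketched with the JDK’s own java.util.concurrent.Flow API, which models exactly this publisher-subscriber contract. This is a minimal, hypothetical example (it is not code from the article’s repo, and the names are made up):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class PubSubDemo {

    // Publishes the given events and collects everything the subscriber receives.
    static List<String> publishAndCollect(List<String> events) throws InterruptedException {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                private Flow.Subscription subscription;

                public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1); // back-pressure: ask for one item at a time
                }

                public void onNext(String item) {
                    received.add(item);      // data is delivered to the subscriber
                    subscription.request(1); // ask for the next item
                }

                public void onError(Throwable t) { done.countDown(); }

                public void onComplete() { done.countDown(); }
            });
            events.forEach(publisher::submit); // the publisher emits without knowing the receiver
        } // close() signals onComplete to the subscriber

        done.await(5, TimeUnit.SECONDS); // delivery is asynchronous, so wait for completion
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(publishAndCollect(List.of("event-1", "event-2")));
    }
}
```

Note how the publisher emits data with no knowledge of who receives it; the subscriber only declares what to do when data arrives.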

3. Case study

3.1. Notes

In this article I will try to define in a few words what Reactive Programming actually is, in a language that anyone can understand, before making a deep dive into the code. We will not need any major technological background to understand it. I am going to show an example, and we will try to see and identify the Reactive principles as we go along. I will guide us through problems found while implementing reactive programming and the possible solutions, describing the pros and cons of each implementation. At the bottom of this tutorial we will find the most plausible solution I found for the case I’m presenting.

3.2. Introduction

In many currently running services across the globe, companies are still using software from quite a long time ago. Some systems even still use 8-inch (ca. 20 cm), 80 KB disks. For these systems, the software still has to be limited, still has to stay old and still cannot change. This happens mostly due to economic and effort constraints. Applications for the military, airports, airlines and banks sometimes aren’t really up to date.
We have seen news of airports still using Windows 3.11 and banks using COBOL. This is also a reason why companies like Pivotal are so busy with application transformations and 12-factor applications. They are offering solutions to bring systems up to date with current realities and challenges. Many times this just cannot be done in one step, no matter how positive we feel about it.
Let’s have a look at bank examples, or even telecommunication companies. In my experience, telecoms and banks many times still work with SOAP services, WSDL service definitions, XSDs, XSLTs and so forth. From a technological perspective, SOAP really only has the downside of adding a lot of overhead and payload. The rest is debatable. But this isn’t the problem I want to zoom in on.
If these SOAP services were efficient, resilient, scalable, responsive and fast enough, then discussions about them would really just boil down to aesthetics rather than anything else. Truth is that many times their design doesn’t really allow much flexibility and working around them can seriously complicate our development. Not only that, frequently SOAP services are designed with many dependencies in mind, many interdependent keys, different ports and methods, different types with the same name and many other intricacies also seen in typical REST JSON based services.
In order to avoid any personal relation to any company I’ve worked with in the past, I have decided to make up an example. Let’s say we have a population where people live in different Shells. That is our root element. Every shell has a slogan. In my example there are 16 slogans, and they are all sentences from Cardi B’s rap solo in Maroon 5’s Girls Like You. Shells will have people and will gather costumes.
For the people I picked 16 different characters from Game of Thrones. Each costume has a top and a lower, referring to the top clothing, like a shirt, and the lower clothing, like some knickers.
Each person will have a costume and an account with a value. Take note that in this example I am not concerned with validation, repeated data or circular data.
We are going to investigate the complexity of making sub-requests of a request in a Reactive Programming web application.
This is a summarized overview of what we are going to look at:
In the above we can see that there is an event loop which handles the events. The events are produced by the REST controller. Afterwards, they are handled by the event handler of the publisher. All our SOAP calls and requests are made here. At this point we have left the realm of the servlet thread. One way to see this is that, in our running application, at this point we won’t be able to pause execution with a breakpoint outside the programmed consumers or any of the handlers.
The event has just been triggered. At that point, our REST controller has already built a publisher (Flux or Mono) and our service is now executing it. Netty implements our subscriber to execute the callback to the browser via HTTP. We can only have one subscriber per publisher, but we can have many consumers that can be triggered in independent threads, via a doOnNext method consumer implementation for example.
We can find many other alternatives to doOnNext in the API. A publisher allows us to apply transformations in a similar way that lambdas do. We also have map and flatMap methods, for example. This pipeline represents a series of declarative code transformations which we can apply sequentially to our data. One of the very important things to remember is that nothing really happens until the subscriber is called.
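
This lazy, declarative behaviour has a loose analogy in plain JDK streams (this is Java’s Stream API, not Reactor itself, and the class below is a made-up illustration): intermediate operations only describe transformations, and nothing runs until a terminal operation, playing the role of the subscriber, is invoked.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyPipelineDemo {

    // Counts how many times the map transformation actually ran.
    static final AtomicInteger CALLS = new AtomicInteger();

    static Stream<String> declarePipeline() {
        // Only a description of the transformation; nothing executes here.
        return Stream.of("shell-1", "shell-2")
                .map(s -> {
                    CALLS.incrementAndGet();
                    return s.toUpperCase();
                });
    }

    public static void main(String[] args) {
        Stream<String> pipeline = declarePipeline();
        System.out.println("calls before terminal operation: " + CALLS.get());

        // The terminal operation triggers execution, just as a subscriber
        // triggers a Reactor pipeline; only now does map actually run.
        List<String> result = pipeline.collect(Collectors.toList());
        System.out.println("calls after: " + CALLS.get());
        System.out.println(result);
    }
}
```

The analogy is not perfect (streams are pull-based and synchronous), but it makes the “nothing happens until subscription” idea concrete.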
In our example application we won’t find any explicit subscriber in our code. This is because Netty is our subscriber and its code resides under the hood. If you are interested in digging deeper into the code, please investigate three important classes:
  • reactor.core.publisher.FluxMapFuseable
  • org.springframework.http.server.reactive.ChannelSendOperator
  • reactor.netty.http.server.HttpTrafficHandler
You’ll be able to breakpoint in between them and then get a better sense of what’s happening when executing the callback to the browser.
This concludes my introduction. We will now dig into the code I’ve written and review a few major examples:
  • Data Model
  • Architecture Overview
  • Making publishers
  • Services
  • In practice
  • A reactive solution
  • The ZIP solution
Bear in mind that this code is made specifically for tutorial purposes and, given its complexity, I will only explain the parts relevant to this tutorial. I will be adding a PDF to the repo at some point explaining how I set up the SOAP clients with Apache CXF.
Another constraint in this exercise is that I couldn’t make Blockhound work with Java 13. It just doesn’t seem to detect blocking calls with my Java 13 (13.0.1.hs-adpt) version. This is why everything I have is implemented with Java 12 (adoptopenjdk-12.0.1). We’ll see further on what Blockhound is and how it can be useful for us.
In this article I’m not showing complete class implementations because of the fair complexity and size of the whole code. It’s been prepared in a tutorial fashion.

3.3. Data model

In order to understand which data we are trying to get, let’s first have a look at our data model:

3.4. Making publishers

I have mentioned declarative programming techniques. Essentially what we are going to do is implement the Observer pattern. In Spring WebFlux that can be done very easily using Flux and Mono. These are two forms of Publishers which are very much analogous to Flowable and Observable in RxJava 2. So we are going to declare what we want to happen with our data, once we receive it, before our subscriber, Netty, picks it up.
I will guide us through the ShellRepositoryImpl class and another one in our repository layer. These should cover the concepts of using legacy code in a reactive way. Note that the repository concept I am using here is only to allow possible future implementations of an actual reactive JPA repository, using as an example the already reactive implementation of JDBC known as R2DBC. In our case, they are just calls to SOAP clients.
All the publishers in our example are being made with code very similar to this:
public Mono<Shell> findSeaShellById(final Long id) {
    return Mono.fromCallable(() -> seaShellsWSDLShellsClient.getItem(id)).subscribeOn(elastic());
}

public ParallelFlux<Shell> findAllSeaShells() {
    return findAllShellIds()
            .parallel(parallelism)
            .runOn(elastic())
            .map(seaShellsWSDLShellsClient::getItem)
            .runOn(single());
}

public Flux<Long> findAllShellIds() {
    return Mono.fromCallable(seaShellsWSDLShellsClient::getAllShellIds)
            .flux().flatMap(Flux::fromIterable);
}
Let’s examine the findAllSeaShells method. The goal of this method is to retrieve all the shells in the repo. In this code we have the seaShellsWSDLShellsClient client. This client communicates with one web service specifically responsible for the retrieval of SeaShells data.
The first method in the lambda chain is the findAllShellIds method. The main highlight of this code above is the Mono.fromCallable. This is the function to use if we intend to call a function from a black box. In our case our black boxes are our SOAP clients. Although we can predict all of our code, this is a simulation of a real case scenario. In such situations we won’t be able to completely predict what and how the client responds.
From the API we can read precisely how it works. The biggest advantage is that this Mono simply completes empty instead of failing when the client returns nothing: “If the Callable resolves to null, the resulting Mono completes empty.”. Mono publishers are used to represent single objects.
Flux publishers are used for lists of objects. The fromCallable method is implemented in the Mono publisher only. It might seem better to use the Flux.fromIterable method directly. However, by allowing a conversion from Mono to Flux, we are letting the WebFlux framework optimize the later execution and conversion from a List object to individual objects for the subscriber. A direct Flux.fromIterable conversion would skip that optimization step.
This conversion to Flux is explicitly implemented with a flatMap method. Notice how similar these methods are to typical Java lambdas? This similarity is part of why it’s so easy to use declarative programming paradigms in WebFlux instead of imperative programming. Essentially we are declaring the chain of events that is going to happen using lambdas! Now that the publisher knows it needs to get a list of IDs, all we need is the actual shells that belong to those IDs. At this point we still preserve a one-to-one relationship. In other words, for each ID we have a shell.
Therefore it is very clear that we do need to wait for the ID in order to process our following request. By default, processing is sequential. We need parallel processing because we want our application to be as reactive and responsive as possible. As a consequence we need to explicitly tell Flux that we are going to get our shells in a parallel fashion. This is where parallel comes into play. It is configurable with a parameter I’ve created called sea.shell.parallelism.
Here we can configure the maximum number of parallel rails we want to create. For Flux this is still not enough. After this, we need to specify the type of thread management we want, and for this we have different types available. In my example I am using the elastic type. This is ideal because we are dealing with legacy code which can potentially be very slow. At the same time, in my example, an unlimited number of threads can be generated per person-in-shell request.
From the API we have this definition: 
“Optimized for longer executions, an alternative for blocking tasks where the number of active tasks (and threads) can grow indefinitely”.
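
In plain JDK terms, the elastic scheduler behaves much like an unbounded cached thread pool: threads are created on demand for slow, blocking work and reused when idle. This is only a rough stdlib analogy, not Reactor’s actual implementation, and the slowCall method below is a made-up stand-in for a legacy SOAP call:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ElasticLikeDemo {

    // Runs the given slow tasks on an unbounded, cached pool (elastic-like)
    // and returns their results once all of them have completed.
    static List<String> runConcurrently(List<Callable<String>> tasks) throws Exception {
        ExecutorService elasticLike = Executors.newCachedThreadPool();
        try {
            List<Future<String>> futures = elasticLike.invokeAll(tasks);
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures) results.add(f.get());
            return results;
        } finally {
            elasticLike.shutdown();
        }
    }

    static String slowCall(String id) throws InterruptedException {
        Thread.sleep(200); // simulate a slow legacy SOAP call
        return "response for " + id;
    }

    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        List<String> results = runConcurrently(List.of(
                () -> slowCall("shell-1"),
                () -> slowCall("shell-2"),
                () -> slowCall("shell-3")));
        // Three 200 ms calls complete in roughly 200 ms, not 600 ms,
        // because the pool grows a thread per pending task.
        System.out.println(results + " in " + (System.currentTimeMillis() - start) + " ms");
    }
}
```

The key property shared with elastic is that slow, blocking calls never queue behind each other for lack of threads.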
Following this, we will have our ID. Notice that I am still talking in the future tense. We are still declaring code and nothing is happening yet. With our ID we will be able to map that stream to another transformation. We will then get our actual Shell. This is our instance coming out of our SOAP client. Since we are running everything in parallel and it’s a small request, we can safely determine that our Scheduler is of a single type. From the definition we have “Optimized for fast Runnable non-blocking executions”. Notice that because of all of the above our Flux is now of a ParallelFlux type.
All the other methods presented in this implementation are simple methods implemented in a very naïve way to get the data in one go, in a blocking way. Further on in this article we’ll have a look at the differences in behaviour.

3.5. Services

Let’s have a look at an example from our Services implementation. I have prepared three sorts:
All of them are different implementations of examples we could use to handle our data. The first implements a series of methods using a naïve approach to getting our data all at once. I think it’s important to delve into this, because we’ll see different behaviours we want to avoid.
The second is an example of how we could use a microservice approach, by making the client complete the chain. Finally, the last one is a complete reactive implementation using nothing but chained publishers. These are fundamental examples which will help us understand how Reactive Programming works. We will also gain a clear idea of the problems that we may face in seemingly “obvious and easy” solutions. The latter is the example which, in the end, makes the most sense to me to use. I will also explain the reasons why I came to that conclusion.
Let’s start by having a look at SeaShellServiceImpl:

@Value("${sea.shell.parallelism:20}")
private Integer parallelism;

@Value("${sea.shell.delay.ms:100}")
private Integer delay;
As we can see we have two injected properties: parallelism and delay. We already know what parallelism is. As for delay, it comes from a property called sea.shell.delay.ms in the application.properties file. We will see in detail what this property is about, but for the time being let’s look at all of the presented solutions. In this class, I’m using the word naïve to represent the blocking methods.
However, we can look at all of them as naïve implementations. Let’s have a look at the method implementations of this service. I will then guide us through the practical example with the running application. We’ll have a look at what we can learn from these examples.
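
For reference, the corresponding entries in application.properties would look something like the following. The values shown are just the defaults from the @Value annotations above; the exact file contents in the repo may differ:

```properties
# maximum number of parallel rails for the ParallelFlux
sea.shell.parallelism=20
# per-element delay (in ms) used by the waiting reactive approach
sea.shell.delay.ms=100
```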

3.5.1. Example 1 — Using consumers

Let’s look at getSeaShellById in SeaShellServiceImpl:
public Mono<SeaShellDto> getSeaShellById(Long id) {
        return shellRepository.findSeaShellById(id)
                .map(SeaShellConverter::toShellDto)
                .doOnNext(consumerPersons())
                .doOnNext(consumerCostumes());
}
It gets a shell, creates its matching DTO and then tries to change the DTO further using a chain of consumers. If we look inside any of the consumer methods, we’ll see an implementation of a subscriber. This subscriber is the last destination of the data being sent through. Notice also that the data type doesn’t change through the doOnNext methods. If we look in detail into the implementation, we see that I’m filling up the whole chained tree of a Shell with its Persons, Costumes, Accounts, Tops and Lowers.
Essentially I’m trying to get a complete Shell across and returning a ShellDto. Unfortunately, although the idea behind it seems good, it goes against Reactive Programming principles and it does not guarantee that the objects are returned completely filled with data anyway. It goes against Reactive Programming principles because we are waiting for the Person and Costumes, and all their underlying methods, to make all their calls and fetch results.

3.5.2. Example 2 — Using a very naïve approach

The naïve approach to this problem doesn’t use any publishers in its implementation. In this method we basically perform all necessary queries by asking and waiting for the response. This uses the traditional Spring MVC architecture, and its purpose in this exercise is to show us how this approach differs from a Reactive Programming approach. Note that I could have used ForkJoins, CompletableFutures, Futures, Executors, etc., to trigger parallelism. This is a more complicated option which doesn’t add anything to a reactive programming way of building applications. It would still force us to wait for all sub-threads to finish before letting go of the main thread. It is represented by the methods
  • getAllSeaShellsNaifBlock
  • getSeaShellNaifBlock 
  • and setMainRootElements.
These are all imperative programming examples:
public List<SeaShellDto> getAllSeaShellsNaifBlock() {
    return shellRepository.findAllSeaShellsBlock()
            .parallelStream()
            .map(SeaShellConverter::toShellDto)
            .peek(this::setMainRootElements)
            .collect(toList());
}

public SeaShellDto getSeaShellNaifBlock(Long id) {
    final SeaShellDto seaShellDto = SeaShellConverter
            .toShellDto(shellRepository
                    .findSeaShellBlockById(id));
    setMainRootElements(seaShellDto);
    return seaShellDto;
}
However, further in this article I will discuss precisely the ForkJoin method and why it’s very unlikely that we would be able to make the method non-blocking in this way.

3.5.3. Example 3 — A reactive-ish approach

Looking at example 2, you may have had this idea: why not make the naïve implementation multithreaded and then “wrap up” that method with a publisher? Truth is, we could, and that would indeed be a non-blocking example. We would get all the data in one go and we would not block the main thread. Unfortunately for this solution, Reactive Programming also means fast, scalable and, more importantly, very responsive. With this approach, we are still making the user wait, although in a different way, for all the requests to be done. Please look at the solution implemented in getAllSeaShellsReactiveBlock:
public ParallelFlux<SeaShellDto> getAllSeaShellsReactiveBlock() {
    return Mono.fromCallable(this::getAllSeaShellsNaifBlock)
            .flux().flatMap(Flux::fromIterable)
            .parallel(parallelism)
            .runOn(elastic());
}
In this example we moved the problem out of a main thread that waits for the SOAP call to be done. However, we only relocated it to the single thread in the event loop. Blockhound will confirm that it has indeed become a non-blocking method. However, in this way, we are taking the responsibility of dealing with this thread-blocking system upon ourselves. WebFlux is only responsible for essentially wrapping this otherwise blocking method.

3.5.4. Example 4 — A waiting reactive approach

In the same way that example 3 is an upgrade to example 2, example 4 is an upgrade to example 1. Notice the difference between getAllSeaShellsReactiveWithDelay and getAllSeaShells.
See the delayElements method? From the definition: “duration by which to delay each Subscriber.onNext(T) signal”. This means that each one of the signals will be delayed by a certain amount of time. By default we have set this delay to 100 ms. We will see that this is enough to get complete data. Furthermore, we will also see how it actually works. Remember that, being an upgrade of example 1, this is still not our solution:
public Flux<SeaShellDto> getAllSeaShellsReactiveWithDelay() {
    return getAllSeaShells()
            .sequential()
            .delayElements(ofMillis(delay))
            .subscribeOn(elastic());
}
Looking at the above code, we can see that we are basically just guessing the time it will take for getAllSeaShells() to complete. Furthermore, we are making sure that each request will wait for that specific time. In other words, we just made a non-blocking request that takes at least a specific time to return a response.

3.5.5. Example 5 — Wrapping and Fork Joining

Up until here we have made no mention of multithreading. Now we are going to do just that. In this same class, let’s have a look at the method getAllSeaShellsReactiveWithForkJoins:
public Flux<SeaShellDto> getAllSeaShellsReactiveWithForkJoins() {
    return Flux.fromStream(shellRepository.findAllSeaShellsBlock().parallelStream()
            .map(shell -> {
                final ForkJoinPool commonPool = new ForkJoinPool(100);
                final SeaShellDto seaShellDto = SeaShellConverter.toShellDto(shell);
                final Stream<ForkJoinTask<SeaShellPersonDto>> personDtoStream =
                        commonPool.invoke(getSeaShellPersonsForkJoinTask(commonPool, seaShellDto));
                final Stream<ForkJoinTask<SeaShellCostumeDto>>
                        costumeDtoStream = commonPool.invoke(getSeaShellCostumesForkJoinTask(commonPool, seaShellDto));
                seaShellDto.setPersons(personDtoStream.map(ForkJoinTask::join).collect(toList()));
                seaShellDto.setCostumes(costumeDtoStream.map(ForkJoinTask::join).collect(toList()));
                return seaShellDto;
            }));
}
In this method, we are using a fromStream method. Notice that there is a ForkJoinPool. If we run the unit tests and see how they are implemented:
@Test
public void findAllCompleteSeaShellsReactiveWithForkJoins_onCall_thenBlocking() {
    assertThrows(WebServiceException.class, () -> delay(Duration.ofSeconds(1))
            .doOnNext(it -> seaShellService.getAllSeaShellsReactiveWithForkJoins())
            .block());
}
The exception we’re catching here is WebServiceException. This exception is thrown in the chain when Blockhound finds a blocking call. We are using a publisher, the publisher is being used and yet Blockhound is still saying that we have a blocking method. The only difference between this method and all the other methods is that we are using a ForkJoinPool within the wrapper. In this case it seems that this happens because of where the ForkJoin pool is created. When a thread joins the commonPool, it will block the main thread. It’s as simple as that.
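
The blocking nature of invoke can be demonstrated with plain JDK code. This is a minimal sketch, unrelated to the article’s repo: the calling thread does not return from invoke until the whole task is done, which is precisely the kind of parked thread Blockhound objects to inside a reactive pipeline.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class InvokeBlocksDemo {

    // invoke() submits the task AND waits for its result:
    // the caller is parked until the task tree completes.
    static long timeBlockingInvoke(ForkJoinPool pool, long sleepMs) {
        long start = System.currentTimeMillis();
        Integer result = pool.invoke(ForkJoinTask.adapt(() -> {
            Thread.sleep(sleepMs); // simulate a slow SOAP call inside the task
            return 42;
        }));
        // We only get here after the task has finished; 'result' is already 42.
        return System.currentTimeMillis() - start;
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);
        System.out.println("caller was parked for " + timeBlockingInvoke(pool, 200) + " ms");
        pool.shutdown();
    }
}
```

The work itself runs in the pool, but the thread that called invoke gains nothing: it simply waits, exactly as a plain blocking call would.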

3.5.6. Example 6 — A reactive client

One simple way of making everything reactive is to wrap up all our methods with publishers. Not much work is involved in this solution, because we are only making sure that we pass the simple publishers we made in our repos all the way to the controllers. Our services work almost as simple proxies. This means that they do have one extra change added: the conversion of the data to a DTO.
Let’s take a look an example in the SeaShellReactiveOneServiceImpl class:
public Mono<SeaShellDto> getSeaShellById(Long id) {
    return this.shellRepository.findSeaShellById(id)
            .map(SeaShellConverter::toShellDto);
}
In this way we can see that the only change is a map to a ShellDto. This is one of the solutions I chose to consider when thinking about creating a reactive framework around legacy services.
We can find the example of our reactive client in the future package. This client is implemented in the class SeaShellWebReactiveOneClient. There is an implementation with a subscriber. At the end we wait for 1 second in order to let the subscriber finish its work:
private void consumeReactively() throws InterruptedException {
    getAllSeaShellsReactively().subscribe(x -> log.info("BLOCK->" + x.toString()));
    Thread.sleep(1000);
}

3.5.7. Example 7 — A fully reactive approach

In some cases, we may find it more convenient to make sure that we get a few important things:
  1. Performance
  2. All data at once
  3. Keep it non-blocking
  4. Keep it simple
  5. Achieve true parallelism
Achieving an improvement in all of these points at the same time is highly unlikely. If we get all the data at once, we will reduce performance. If we want to keep it simple, then chaining everything together may work against us. To keep it non-blocking, we may be moving a lot of the load to the single thread that deals with everything.
The best way to answer this is: experience will tell us everything we need to know about these decisions. If, after weighing all the pros and cons of taking a fully reactive approach, we are still convinced, then let’s look at how this would work.
Let’s start with the example on class SeaShellReactiveServiceImpl:
public ParallelFlux<SeaShellDto> getAllSeaShells() {
    return shellRepository
            .findAllSeaShells()
            .map(this::fetchSeaShellPublisher).flatMap(Flux::from);
}
The fetchSeaShellPublisher is a method that returns a publisher that will start our publishing process. Notice that we are mapping a flux of SeaShells coming from our SOAP services. When mapping from a Publisher to another publisher, we will need to flatMap everything. Let’s keep this in mind before we move on. It’s important to understand that we are still only declaring and nothing has actually happened in terms of processing.
Let’s have a look at fetchSeaShellPublisher in more detail:
private Mono<SeaShellDto> fetchSeaShellPublisher(Shell seaShell) {
    final SeaShellDto seaShellDtoReturn = SeaShellConverter.toShellDto(seaShell);

    return zip(
            fetchPersonsPublisher(seaShell, seaShellDtoReturn).subscribeOn(Schedulers.parallel()),
            fetchCostumesPublisher(seaShell, seaShellDtoReturn).subscribeOn(Schedulers.parallel()),
            (persons, costumes) -> seaShellDtoReturn);
}
See that we are using the Mono.zip method. This is one of the many ways to chain publishers together, and the one we are going to look at in particular. Later I may write a whole article about improving this implementation. What’s important for now is to see the chaining of publishers in action. With the zip method we can chain publishers together and work with their combined result.
A very nice feature of zip is that when we subscribe the participating publishers on a parallel scheduler, they actually work in parallel. Our data model actually has a lot of pairs. In the root of the shell we have two elements: a list of persons and a list of costumes. In the root of our costumes we have another pair: a top and a lower. In the root of person, yet another one: an account and a costume. Notice that the costume in a person follows the same costume data model.
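
A plain JDK analogy of this zipping of two parallel fetches can be sketched with CompletableFuture.thenCombine. This is not Reactor’s zip and not code from the repo; the fetch methods and the DTO below are hypothetical stand-ins for the SOAP calls:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ZipAnalogyDemo {

    record SeaShellDto(List<String> persons, List<String> costumes) {}

    // Stand-ins for the two SOAP sub-requests of a shell
    static List<String> fetchPersons()  { return List.of("Jon", "Arya"); }
    static List<String> fetchCostumes() { return List.of("top", "lower"); }

    static SeaShellDto fetchSeaShell() {
        CompletableFuture<List<String>> persons =
                CompletableFuture.supplyAsync(ZipAnalogyDemo::fetchPersons);
        CompletableFuture<List<String>> costumes =
                CompletableFuture.supplyAsync(ZipAnalogyDemo::fetchCostumes);

        // Like Mono.zip: both fetches run in parallel, and the combiner
        // fires only when both results are available.
        return persons.thenCombine(costumes, SeaShellDto::new).join();
    }

    public static void main(String[] args) {
        System.out.println(fetchSeaShell());
    }
}
```

The design point is the same as with zip: neither side waits for the other to start, and the combined result exists only when both halves are complete.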
Let’s look at one of these two methods to see how the chain works. This is fetchPersonsPublisher:
private Mono<?> fetchPersonsPublisher(Shell seaShell, SeaShellDto seaShellDtoReturn) {
    return from(from(just(
            seaShell.getPersons()).subscribeOn(parallel()).map(shellPersonRepository::findPersonsBlock).subscribeOn(parallel())
            .map(persons -> {
                persons.forEach(person -> seaShellDtoReturn.getPersons().add(SeaShellConverter.toShellPersonDto(person)));
                return seaShellDtoReturn.getPersons();
            }))
            .thenMany(zip(
                    fetchPersonAccountPublisher(seaShellDtoReturn).subscribeOn(parallel()),
                    fetchPersonFullCostumePublisher(seaShellDtoReturn).subscribeOn(parallel()))));
}
Notice the thenMany method. This method sets up the first element in our persons publisher chain. We know that from a persons element we should pick up an account and a costume at the same time. As we are now aware, this is where the zip method comes in, essentially zipping those two publishers together, in parallel, as one single publisher! We should be able to easily interpret the rest of the code with this explanation.
When all the chains are complete, we can make sure that our thread will run through multiple publishers in a non-blocking way. The returned publisher is a result of this chaining and it will be handled independently in one separate thread without interfering with the main thread. It will of course spawn different threads and it will evidently generate parallelism. The difference is that no interference with the main thread will take place.

3.6. Controllers

Let’s run our SOAP service and our Spring Boot application to analyse this situation.
Run the SOAP service:
cd sea-shell-soap-wiremock/sea-shell-soap-service/target
java -jar sea-shell-soap-service-0.0.0-SNAPSHOT-jar-with-dependencies.jar  
Run our Spring Boot service:

cd sea-shell-service-spring-web-flux/sea-shell-rest-service/target  
java -jar sea-shell-rest-service-0.0.0-SNAPSHOT.jar
Let’s study our four controllers in more detail:
If we take a look at them, there are quite a few methods implemented. We’ll have a look at the responses of all of these calls to our service. For every example, please take a moment to review the consumers chapter if you think you’ve missed something. We can use curl for this, or our browser.
For the purpose of readability, and to make a faster assessment of each call, I think it’s better to use a browser. I used Chrome and the JSONView plugin for this. In some examples I need to return objects of different types. Coincidentally, they are always pairs, and that’s why I’m using the Spring Data Pair type to return these objects.

3.6.1. Example 0

In order to have a clean start I decided to create an example 0 to show a couple of basic things that happen when making a request to the back end.
Let’s look at the getShellSlogans method. Make the following request:
In this method I’m just mapping the results of a non-blocking call, defined in the method getAllSeaShells, to its respective slogan. The reason I’m mapping them, and not calling one method to get the slogan directly, is so that we can see that, even though we have implemented it with several calls inside and several subscribers defined in the consumers, nothing changes in terms of performance.
Now let’s check our logs. We should be able to see the whole subtree of the shells being called. In some cases the values are null, in others empty, and in other cases the values are all there. In the following examples I will explain this further. But for the moment, the important part of this example is that our method is returning a ParallelFlux.
In this case the point is to see that we have achieved this and that every Pair we return is not following the actual order of Cardi B’s lyrics:
[
{
first: "Baltic tellin",
second: "I need you right here 'cause every time you call"
},
{
first: "Baltic tellin",
second: "I play with this kitty like you play with your guitar"
},
...
We may get lucky and indeed get the lyrics in the right order for the complete 16-slogan segment, but that is highly unlikely. We all know that Cardi B starts her rap with “Not too long ago, I was dancing for dollars (eeoow)”.

3.6.2. Example 1

Let’s recall from the Services explanation that this example relies on the doOnNext consumer declarations to run. Let’s try calling this multiple times in our browser:
If we are lucky, we’ll notice that sometimes we get all the data, but most of the time much of the data is actually null! Let’s look at the following two sequence diagrams, which explain what happens:
Above we can see a normal flow of data. We see that all subscribers are being called in parallel and that the doOnNext works perfectly and in sequence. By the time we get to our Netty subscriber, we are sure to get the seaShellDto object filled up with data.
In most cases we won’t get the full data:
From the above, we see that the event callback has been emitted before the doOnNext consumers have finished their execution. In this case, the seaShellDto object has not been completely filled, and therefore we get nulls.
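The race can be reproduced with a small sketch. Note that ShellDto and fetchPerson here are hypothetical stand-ins for the real DTO and the SOAP call, the 200 ms delay simulates legacy latency, and reactor-core is assumed to be on the classpath:

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;

public class Example1Race {

    // Hypothetical DTO mirroring seaShellDto: fields are filled in by side effects.
    static class ShellDto {
        volatile String person; // stays null until the inner publisher completes
    }

    // Simulated slow legacy call.
    static Mono<String> fetchPerson() {
        return Mono.just("John").delayElement(Duration.ofMillis(200));
    }

    static ShellDto racyShell() {
        ShellDto dto = new ShellDto();
        return Mono.just(dto)
                // doOnNext fires an inner subscription and returns immediately;
                // nothing downstream waits for that inner publisher to finish.
                .doOnNext(d -> fetchPerson()
                        .subscribeOn(Schedulers.boundedElastic())
                        .subscribe(p -> d.person = p))
                .block();
    }

    public static void main(String[] args) {
        ShellDto result = racyShell();
        // The outer publisher is already done, but person is most likely still null here.
        System.out.println("person right after block(): " + result.person);
    }
}
```

The outer pipeline completes as soon as doOnNext has *started* the inner subscription, not when it has finished, which is exactly why the response can carry nulls.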

3.6.3. Example 2

Our naive example is always sure to return its data completely. It has been implemented in a traditional blocking fashion. Still, it is important at this point to look at what happens behind the scenes in this kind of blocking processing:
Comparing this model with the reactive programming model, we see an enormous difference in complexity. The blocking model seems simpler. However, what happens in this case is that the main Servlet thread for each request is completely blocked until we get a response back from our SOAP service. This limits the ability of the service to cope in scenarios where high availability is required.
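The cost of the blocking model can be shown with plain Java, no Reactor involved. Here fetchShell is a made-up stand-in for the SOAP call, with a simulated latency of 100 ms:

```java
import java.util.ArrayList;
import java.util.List;

public class BlockingModelSketch {

    // Stand-in for the legacy SOAP call: the calling thread is parked
    // for the full round-trip latency (100 ms here).
    static String fetchShell(long id) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "shell-" + id;
    }

    // The blocking model: one request thread pays every latency in sequence.
    static List<String> fetchAllBlocking(int count) {
        List<String> shells = new ArrayList<>();
        for (long id = 1; id <= count; id++) {
            shells.add(fetchShell(id));
        }
        return shells;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        List<String> shells = fetchAllBlocking(5);
        long elapsed = System.currentTimeMillis() - start;
        // Latencies add up: 5 calls of ~100 ms each take ~500 ms in total.
        System.out.println(shells.size() + " shells in ~" + elapsed + " ms");
    }
}
```

The request thread does nothing but wait: five 100 ms calls cost at least 500 ms, and during that whole time the thread cannot serve anyone else.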

3.6.4. Example 3

Our reactive-ish example encapsulates our entire naive implementation from Example 2 and creates a publisher with it. It becomes a non-blocking call, but it still does not benefit from any parallelism. This is largely the reason why such an easy approach is also so often found to be inefficient.
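A minimal sketch of this wrapping idea, assuming reactor-core on the classpath (the fetchAllBlocking method here is a trivial placeholder for the naive implementation, not the project’s real code):

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.List;

public class ReactiveIshSketch {

    // Placeholder for the naive blocking implementation of Example 2.
    static List<String> fetchAllBlocking() {
        return List.of("shell-1", "shell-2", "shell-3");
    }

    // fromCallable defers the blocking work and subscribeOn moves it off
    // the caller's thread — but the work itself is still one serial block,
    // so nothing inside it runs in parallel.
    static Mono<List<String>> reactiveIsh() {
        return Mono.fromCallable(ReactiveIshSketch::fetchAllBlocking)
                .subscribeOn(Schedulers.boundedElastic());
    }

    public static void main(String[] args) {
        System.out.println(reactiveIsh().block());
    }
}
```

The caller’s thread is freed, which is genuinely useful, but the total latency of the wrapped work is unchanged: it is the same sequential computation, just on another thread.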

3.6.5. Example 4

The waiting approach of example 4 basically allows the publisher to delay its execution for however long is needed. We have set up 100ms in our example, and this value is configurable.
This is what happens:
Notice that with the 100ms delay, we guarantee that all doOnNext consumers are executed. However, we are also guaranteeing a waiting time of 100ms for every request we make from the front end. This has a detrimental effect on performance.
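The delay can be sketched with Reactor’s delayElement operator (again a hypothetical, stripped-down example rather than the project’s code, assuming reactor-core on the classpath):

```java
import reactor.core.publisher.Mono;

import java.time.Duration;

public class DelaySketch {

    // Every emission is postponed by a fixed 100 ms, buying time for the
    // doOnNext consumers to finish — at the cost of 100 ms on every request.
    static Mono<String> delayedShell() {
        return Mono.just("shell-1")
                .delayElement(Duration.ofMillis(100));
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        String shell = delayedShell().block();
        System.out.println(shell + " after " + (System.currentTimeMillis() - start) + " ms");
    }
}
```

The fix is also its flaw: the 100 ms is a floor on response time, paid even when the consumers would have finished in 5 ms, and it is still only a guess when they are slower.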

3.6.6. Example 5

We could have gotten the idea that by wrapping a method in a publisher such as Mono or Flux, we would always create a non-blocking call. This is why I found the ForkJoin example a very good one. The act of joining a pool or submitting a task is actually a blocking action on its own. This means that the simple fact that the ForkJoinPool is being used renders the whole publisher a blocking publisher, and BlockHound will detect it as such.
In the above diagram, everything is as simplified as possible to give an overview of what is happening behind the scenes. In this case WebFlux cannot work independently of the main Servlet thread per request.
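The blocking nature of the join can be seen with plain stdlib Java (the task and its 200 ms of simulated work are made up for the illustration):

```java
import java.util.concurrent.ForkJoinPool;

public class ForkJoinBlockingSketch {

    static int blockingSubmitAndJoin() {
        // submit() hands the task to the pool, but join() parks the calling
        // thread until the result is ready — this is the blocking call a tool
        // like BlockHound flags when it happens inside a publisher.
        return ForkJoinPool.commonPool().submit(() -> {
            Thread.sleep(200); // simulated work
            return 42;
        }).join();
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        int result = blockingSubmitAndJoin();
        long elapsed = System.currentTimeMillis() - start;
        System.out.println("result " + result + " after " + elapsed
                + " ms of blocked caller time");
    }
}
```

The caller waits the full 200 ms inside join(). If that caller happens to be a thread inside a Mono or Flux pipeline, the publisher built around it is a blocking publisher, whatever its type signature says.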

3.6.7. Example 6 and 7.

Finally, at the end of our investigation, we reach examples 6 and 7. These examples match everything we discussed in our introduction. They are non-blocking, fully supported by WebFlux, and they provide data as needed. They follow different paradigms, but these have already been explained in point 3.5. In example 7, though, the paradigm changed, and it’s important to realize that in this implementation we have avoided doing anything sequentially.
For example, we do not process pairs in the same thread or sub-threads. All the threads in example 7 only fill up one object, and they take the whole seaShellDto as a common shared resource. The following diagram is an overview of what happens during the processing of all publishers in example 7:
With the above example, what I am trying to explain with all the arrows is that each chained block is dependent on the other and that in reality they do wait for each other individually. For example, we cannot start requesting a Top if we don’t have a Costume. We can, however, ask for the Top individually without considering the Lower.
These two are completely separate publishers, which constantly walk through the seaShellDto objects in order to fill them up. There are no arguments being passed around between the different publishers created with zip. This is why, for this example, I’ve placed seaShellDto in a location where it is globally accessible to all the different publishers.
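A reduced sketch of this zip pattern, assuming reactor-core on the classpath (ShellDto, the field names, and the 50 ms delays are all hypothetical stand-ins for the real DTO and legacy calls):

```java
import reactor.core.publisher.Mono;

import java.time.Duration;

public class ZipChainSketch {

    // Hypothetical shared DTO, mirroring how seaShellDto is filled in example 7.
    static class ShellDto {
        volatile String person;
        volatile String costume;
    }

    static ShellDto fillShell() {
        ShellDto dto = new ShellDto();
        Mono<String> person = Mono.just("person-1").delayElement(Duration.ofMillis(50));
        Mono<String> costume = Mono.just("costume-1").delayElement(Duration.ofMillis(50));
        // zip combines the two independent publishers; neither receives
        // arguments from the other — both results are written into the
        // shared DTO only once both have completed.
        return Mono.zip(person, costume)
                .map(tuple -> {
                    dto.person = tuple.getT1();
                    dto.costume = tuple.getT2();
                    return dto;
                })
                .block();
    }

    public static void main(String[] args) {
        ShellDto dto = fillShell();
        System.out.println(dto.person + " / " + dto.costume);
    }
}
```

Because the two sources run independently, the zip completes in roughly the time of the slower one, not the sum of both, while the shared DTO is the only point of contact between them.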

4. Listings

4.1. Rest Services

http://localhost:8080/seashells
http://localhost:8080/seashells/{id}
http://localhost:8080/seashells/slogans
http://localhost:8080/seashells/block
http://localhost:8080/seashells/block/{id}
http://localhost:8080/seashells/reactiveblock
http://localhost:8080/seashells/reactiveWithDelay
http://localhost:8080/seashells/reactiveWithForkJoins
http://localhost:8080/seashells/one/
http://localhost:8080/seashells/one/{id}
http://localhost:8080/seashells/one/person/{id}
http://localhost:8080/seashells/one/costume/{id}
http://localhost:8080/seashells/one/account/{id}
http://localhost:8080/seashells/one/top/{id}
http://localhost:8080/seashells/one/lower/{id}
http://localhost:8080/seashells/reactive
http://localhost:8080/seashells/reactive/{id}

4.2. Test Rest Services

http://localhost:8080/seashells/reactive/rootCostume/{idTop}/{idLower}
http://localhost:8080/seashells/reactive/rootShell/{idPerson}/{idCostume}
http://localhost:8080/seashells/reactive/rootCostumeSlowTop/{idTop}/{idLower}
http://localhost:8080/seashells/reactive/rootCostumeSlowLower/{idTop}/{idLower}

5. Conclusion

With this article my main goal was to walk us through the process I’ve been through to discover reactive programming and most of its intricacies. In the end, our examples 6 and 7 seem to be the better approaches for whatever we want to develop. This way of programming is heavily based on publisher-subscriber principles.
This can make code slightly more difficult to understand, but on the other hand, good code is often hard to understand. Imperative programming means that you can follow the code along a debug line. With the principles of reactive programming, we are basing all of our code on declarative programming. We are thus creating code that, although it may improve efficiency, also requires more effort to understand, change, and update.
WebFlux provides a framework which mitigates the complexity issue very well. As we have seen, making publishers isn’t difficult at all, and neither is understanding them. Furthermore, we can easily debug them, though not in a traditional way. Reactive programming isn’t always the best decision.
As engineers we have to make decisions, and for reactive programming the reasons to take this step have to be more than just improving performance. Let’s enumerate some of the reasons to move to a reactive programming paradigm, in this case with WebFlux:
  1. High availability — Requests should not block each other.
  2. Performance — We need quick responses.
  3. High throughput — We need to manage high volumes of data.
The answer to why we need reactive programming is sometimes simplified to: it’s just faster! It is, but are we sure we need it? For systems that depend so much on legacy, is this really going to make our application faster? If so, how much faster? Remember that we still have to wait for our legacy services to respond. The only thing we are changing with reactive programming is eliminating the blocking that happens in the main thread.
Does this hypothetical change have the potential to change our application that much?
If we are thinking about changing our REST service model, my best advice is to just try it. Let’s run benchmark tests and check our needs. Reactive programming and WebFlux are amazing, top-notch technologies which should be used if we are managing high volumes of data. If we are managing legacy systems, it can also be a very good first step towards getting everyone thinking about a possible migration of the whole system to a better one.
Let’s make a quick recap of the terms we saw.
  • Flux, ParallelFlux, and Mono are often used publishers.
  • The doOnNext method declares consumers to be executed after the publisher is done. The subscriber does not wait for them. Calling the subscriber allows us to implement functions that happen after the publisher is done with the eventHandler.
  • The zip method, allows us to create one publisher out of others.
  • The thenMany method allows us to call another publisher after the current one, returning another publisher as a result.
  • ForkJoinTasks do not work with WebFlux, because they use the main thread when joining the ForkJoinPool.
I trust that sharing my experience and my project, helps anyone interested in WebFlux to understand how this whole process works. I also hope that the examples I’ve placed on GitHub, can help you see the difference between different implementations.
Take note that my idea of creating a chain of publishers using zip is just one of the many options WebFlux provides for chaining publishers.
I hope you enjoyed this article as much as I enjoyed making it! Please leave a review, comments, or any other feedback you want to give. I’m very grateful if you want to help me make this article better.
I have placed all the source code of this application on GitHub.
Thank you for reading!
