Efficiently handling and processing large volumes of data is a common requirement for data-driven applications. Spring Webflux, a part of the Spring framework, provides a non-blocking, reactive approach to handling data streams, which helps keep resource consumption low and throughput high. This article explores Spring Webflux through a practical application.
We will use the 4chan public API as our data source in this tutorial. The choice of the 4chan API is not an endorsement of the platform but a practical decision based on its public availability, lack of authentication requirements, and the nature of its data, which is suitable for demonstrating the capabilities of Spring Webflux.
Throughout the article, you will learn how to fetch data from the 4chan API, stream it continuously using Spring Webflux, and process the data to gain insights into the platform's metrics.
The git repository for this guide can be found here.
The 4chan Metrics Collector project is organized into two Gradle submodules: app and api-client. The settings.gradle.kts file is responsible for including these submodules in the project.
The build.gradle.kts file serves as the project's main build configuration. It defines the plugins, Java compatibility, configurations, repositories, and common dependencies for all submodules.
Notable dependencies for this project include:
The app submodule, configured in app/build.gradle.kts, has a dependency on the api-client submodule. The app submodule is a Spring Boot application responsible for running the metrics collector. The api-client submodule, configured in api-client/build.gradle.kts, is a library containing the implementation of the API client used by the app submodule to access the 4chan public API.
While the primary focus of this article is on using Spring Webflux for streaming data from the 4chan API, the project also demonstrates several other valuable features, such as Gradle project configuration, starting and stopping threads, and custom Spring configuration.
In this section, we will explore how the application fetches data from the 4chan API, streams the data using Spring Webflux, and processes it to collect and publish metrics.
The process starts with the MetricsCollector class, which schedules periodic requests to the 4chan API based on the configured boards in the application.yml file. For each board, a Flux is created to emit data, and these Flux instances are merged into a single Flux that the application subscribes to. As the data streams through the subscription, the application publishes metrics to Prometheus.
The diagram below describes the flow of the application. Each configured board continuously emits messages containing metadata about its active threads, obtained from the 4chan public API. Those streams are merged into a single continuous stream of all active threads across the configured boards, and as each message is processed in the subscription, metrics are published to Prometheus.
@PostConstruct
public void produce() {
    boards.stream()
            .map(board -> Flux.create(emit(board)))
            .reduce(Flux::merge)
            .orElse(Flux.empty())
            .flatMap(Function.identity())
            .doOnNext(this::publishThreadMetrics)
            .subscribe();
}
The entry point of the application is the produce method in the MetricsCollector class. Thanks to the @PostConstruct annotation, this method executes when the application starts up. In this method, all configured boards are streamed and each is mapped into a Flux responsible for emitting active thread data for the board.
At this point, we have a stream of separate Flux objects, which is then reduced into a single Flux containing all active threads for all configured boards. Finally, the method subscribes to that Flux to start consuming data, and as each element passes through the stream, thread metrics are published accordingly.
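The merge step can be tried in isolation. The following is a minimal sketch, assuming reactor-core is on the classpath; the class name, the board names, and the boardStream helper are hypothetical stand-ins and not part of the project. It uses the Iterable overload of Flux.merge, which gives the same result as reducing the stream with Flux::merge.

```java
import java.util.List;
import java.util.stream.Collectors;

import reactor.core.publisher.Flux;

class MergeBoardsSketch {

    // Hypothetical stand-in for a per-board stream: emits two labelled items.
    static Flux<String> boardStream(String board) {
        return Flux.just(board + "#1", board + "#2");
    }

    // Builds one Flux per board and merges them into a single Flux.
    // An empty board list yields an empty Flux.
    static Flux<String> mergeAll(List<String> boards) {
        List<Flux<String>> perBoard = boards.stream()
                .map(MergeBoardsSketch::boardStream)
                .collect(Collectors.toList());
        return Flux.merge(perBoard);
    }

    public static void main(String[] args) {
        // Nothing is emitted until subscribe() is called.
        mergeAll(List.of("g", "sci")).subscribe(System.out::println);
    }
}
```

Unlike concatenation, merge subscribes to all sources eagerly, so with asynchronous sources the items of different boards may interleave.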
The emit method in the MetricsCollector class configures a scheduler to execute requests periodically:
private Consumer<FluxSink<Flux<FourChanThread>>> emit(final String board) {
    return emitter -> scheduler.scheduleAtFixedRate(
            execute(emitter, board),
            random.nextInt(REQUEST_DELAY_SECONDS),
            REQUEST_INTERVAL_SECONDS,
            TimeUnit.SECONDS);
}
The random initial delay staggers the scheduled tasks, so that the boards do not all issue their requests to the 4chan API at the same moment.
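The staggering can be sketched with the JDK alone. In the example below, the class name, the constants (in milliseconds rather than seconds, to keep the demo short), and the board names are assumptions made for illustration. Note that Random.nextInt(bound) returns a value from 0 (inclusive) to bound (exclusive), so the first run of each task always starts before the maximum delay has elapsed.

```java
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class StaggeredPolling {

    // Hypothetical values, chosen only to keep the demo fast.
    static final int MAX_INITIAL_DELAY_MS = 200;
    static final int INTERVAL_MS = 500;

    // Schedules a periodic task whose first run is delayed by a random
    // amount in the range [0, MAX_INITIAL_DELAY_MS).
    static ScheduledFuture<?> schedule(ScheduledExecutorService scheduler,
                                       Random random,
                                       Runnable task) {
        long initialDelay = random.nextInt(MAX_INITIAL_DELAY_MS);
        return scheduler.scheduleAtFixedRate(
                task, initialDelay, INTERVAL_MS, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        Random random = new Random();
        for (String board : List.of("g", "sci", "diy")) {
            schedule(scheduler, random, () -> System.out.println("polling /" + board));
        }
        // Wait long enough for every task to have run at least once.
        Thread.sleep(MAX_INITIAL_DELAY_MS + 100L);
        scheduler.shutdownNow();
    }
}
```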
The execute method invokes the ApiClient's getThreads method to fetch data from the 4chan API:
private Runnable execute(final FluxSink<Flux<FourChanThread>> emitter, final String board) {
    return () -> {
        emitter.next(apiClient.getThreads(board));
    };
}
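Each scheduled run therefore pushes a whole Flux into the sink, which is why produce() flattens the result with flatMap. The following is a minimal sketch of that shape, assuming reactor-core is on the classpath; the class name and the string items are hypothetical placeholders for the real thread objects, and the sink is completed explicitly here only so that the demo terminates.

```java
import java.util.function.Function;

import reactor.core.publisher.Flux;

class NestedEmitSketch {

    // Emits two inner streams through a FluxSink, then flattens them
    // into a single stream of items.
    static Flux<String> flatten() {
        Flux<Flux<String>> batches = Flux.create(sink -> {
            sink.next(Flux.just("t1", "t2")); // first "request"
            sink.next(Flux.just("t3"));       // second "request"
            sink.complete();                  // the real collector never completes
        });
        return batches.flatMap(Function.identity());
    }

    public static void main(String[] args) {
        flatten().subscribe(System.out::println);
    }
}
```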
The ApiClient class uses a WebClient instance to make reactive HTTP requests to the 4chan Public API. The getThreads method returns a Flux<FourChanThread> that represents the data stream of active threads for a specific board.
Each call to the threads endpoint returns a list of pages, and each page contains a list of threads. The call to bodyToMono parses the response JSON into a Publisher of a single list of pages. The subsequent mapping calls are used to unpack the threads from the lists that contain them, generating a Flux of threads. The last mapping is used to enrich the thread metadata with the specific board to which the thread belongs.
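The unpacking can be sketched without any HTTP call. The example below assumes reactor-core on the classpath and Java 16 or later for records; ThreadInfo and Page are hypothetical, simplified stand-ins for the project's model classes. For the final enrichment step it uses map, which is sufficient for a synchronous one-to-one transformation and is a simpler alternative to wrapping the value in Mono.just inside flatMap.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class UnpackThreadsSketch {

    // Hypothetical, simplified model: a thread number plus its board.
    record ThreadInfo(long no, String board) {
        ThreadInfo withBoard(String newBoard) {
            return new ThreadInfo(no, newBoard);
        }
    }

    // Hypothetical page wrapper: one page holds several threads.
    record Page(int page, List<ThreadInfo> threads) {}

    // Mono<List<Page>> -> Flux<Page> -> Flux<ThreadInfo> tagged with the board.
    static Flux<ThreadInfo> unpack(Mono<List<Page>> response, String board) {
        return response
                .flatMapMany(Flux::fromIterable)
                .flatMap(page -> Flux.fromIterable(page.threads()))
                .map(thread -> thread.withBoard(board));
    }

    public static void main(String[] args) {
        Mono<List<Page>> response = Mono.just(List.of(
                new Page(1, List.of(new ThreadInfo(101, ""), new ThreadInfo(102, ""))),
                new Page(2, List.of(new ThreadInfo(201, "")))));
        unpack(response, "g").subscribe(System.out::println);
    }
}
```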
public Flux<FourChanThread> getThreads(String board) {
    log.debug("Issuing request to get threads for /{} board", board);
    // Endpoint is an enum that defines the API endpoints and is used to build the endpoint path
    Endpoint endpoint = Endpoint.THREADS;
    ParameterizedTypeReference<List<FourChanThreadList>> typeReference =
            new ParameterizedTypeReference<>() {
            };
    return webClient.get()
            .uri(endpoint.getEndpoint(board))
            .retrieve()
            .bodyToMono(typeReference)
            .timeout(TIMEOUT)
            .doOnError(throwable -> log.warn("Error when issuing request to get threads for /{} board", board, throwable))
            .flatMapMany(Flux::fromIterable)
            .flatMap(threadList -> Flux.fromIterable(threadList.getThreads()))
            .flatMap(thread -> Mono.just(thread.withBoard(board)));
}
Testing Webflux applications can be quite challenging due to the asynchronous and non-blocking nature of reactive programming. However, with the right approach and tools, you can write effective tests that ensure the proper functionality and performance of your application. In this section, we will discuss the challenges of testing Webflux applications, use our project's unit tests as examples, and provide tips for writing effective tests.
Some of the challenges when testing Webflux applications include:
Mocking the WebClient used to make HTTP calls. Without proper care, it is easy to get tangled in a complex structure of methods that need to be stubbed and subsequent objects that need to be mocked.
In our project, we have two test classes, MetricsCollectorTest and ApiClientTest, that cover different aspects of the application's functionality. These tests serve as examples of how to handle the challenges mentioned above:
The MetricsCollectorTest class tests the asynchronous behavior of the produce() method using Thread.sleep() to wait for the expected execution time before making assertions.
metricsCollector.produce();
Thread.sleep(TimeUnit.SECONDS.toMillis(REQUEST_DELAY_SECONDS) + 1L);
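A fixed sleep always waits for the full duration, even when the work finishes earlier. One possible alternative, which the project does not use, is to wait on a CountDownLatch with a timeout, so the test continues as soon as the task has run. The sketch below relies only on the JDK; the class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class AwaitInsteadOfSleep {

    // Schedules a task and waits until it has run, up to timeoutMs.
    // Returns true if the task ran in time, false otherwise.
    static boolean awaitFirstRun(ScheduledExecutorService scheduler,
                                 long delayMs,
                                 long timeoutMs) {
        CountDownLatch ran = new CountDownLatch(1);
        Runnable task = ran::countDown;
        scheduler.schedule(task, delayMs, TimeUnit.MILLISECONDS);
        try {
            // Returns as soon as the latch reaches zero.
            return ran.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        System.out.println("task ran in time: " + awaitFirstRun(scheduler, 50, 2_000));
        scheduler.shutdownNow();
    }
}
```

In a real test the latch would be counted down by a stubbed collaborator (for example, the mocked client) rather than by the scheduled task itself.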
The ApiClientTest class uses the StepVerifier from the reactor-test library to ensure the expected behavior of the getThreads() method.
Flux<FourChanThread> result = apiClient.getThreads(board);
StepVerifier.create(result)
        .expectNext(...)
        .verifyComplete();
The ApiClientTest class tests the error handling of the getThreads() method by simulating a missing board and ensuring that a WebClientResponseException is thrown.
Flux<FourChanThread> result = apiClient.getThreads(board);
StepVerifier.create(result)
        .expectErrorMatches(throwable -> throwable instanceof WebClientResponseException)
        .verify();
The ApiClientTest class tests the timeout handling of the getThreads() method by simulating a delayed response, ensuring that a TimeoutException is thrown.
Flux<FourChanThread> result = apiClient.getThreads(board);
StepVerifier.create(result)
        .expectErrorMatches(throwable -> throwable instanceof TimeoutException)
        .verify();
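The interaction between a delayed response and the timeout operator can be reproduced with Reactor alone. The following is a minimal sketch, assuming reactor-core on the classpath; the class name, the "body" value, and the durations are hypothetical. The stream reports either that it completed or the simple class name of the error it ended with.

```java
import java.time.Duration;

import reactor.core.publisher.Mono;

class TimeoutSketch {

    // Simulates a response that arrives after responseDelay and applies
    // a timeout to it. Yields "completed" on success, otherwise the simple
    // class name of the error signal.
    static Mono<String> outcome(Duration responseDelay, Duration timeout) {
        return Mono.just("body")
                .delayElement(responseDelay)
                .timeout(timeout)
                .map(body -> "completed")
                .onErrorResume(error -> Mono.just(error.getClass().getSimpleName()));
    }

    public static void main(String[] args) {
        // Response slower than the timeout.
        System.out.println(outcome(Duration.ofMillis(500), Duration.ofMillis(50)).block());
        // Response faster than the timeout.
        System.out.println(outcome(Duration.ofMillis(10), Duration.ofSeconds(2)).block());
    }
}
```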
In the ApiClientTest class, we inject a tailored WebClient, built with a custom ExchangeFunction, into the ApiClient instance being tested. This allows us to control the behavior of the WebClient and create test cases suitable for different scenarios.
private WebClient createWebClient(HttpStatus httpStatus, Duration delay, String body, String contentType) {
    ClientResponse clientResponse = ClientResponse.create(httpStatus, ExchangeStrategies.withDefaults())
            .header(HttpHeaders.CONTENT_TYPE, contentType)
            .body(body)
            .build();
    ExchangeFunction exchangeFunction = request -> Mono
            .just(clientResponse)
            .delayElement(delay);
    return WebClient.builder()
            .exchangeFunction(exchangeFunction)
            .build();
}
In this article, we have explored the power of Spring Webflux for handling large volumes of data from REST APIs, using the 4chan public API as our data source. We learned how to fetch, stream, and process data using Spring Webflux, and examined the project structure, dependencies, and various features such as Gradle project configuration, starting and stopping threads, and custom Spring configuration.
We also discussed the challenges of testing Webflux applications and provided examples from our project's unit tests to overcome these challenges. We highlighted the importance of handling asynchronous behavior, error handling, and timeouts in testing and provided tips for writing effective tests for Webflux applications.
By understanding and applying the concepts presented in this article, you will be better equipped to harness the capabilities of Spring Webflux in your own applications. As a result, you can build efficient, data-driven applications that effectively process and analyze large volumes of data from REST APIs.
Featured image by Snow White