In today's fast-paced, data-driven world, efficiently handling and processing large volumes of data can be crucial. Spring Webflux, part of the Spring framework, provides a non-blocking, reactive approach to handling data streams, designed to keep resource consumption low and throughput high. This article explores Spring Webflux and its practical applications.

We will use the 4chan public API as our data source in this tutorial. The choice of the 4chan API is not an endorsement of the platform. It is a practical decision based on the API's public availability, its lack of authentication requirements, and the nature of its data, which suits a demonstration of Spring Webflux. Throughout the article, you will learn how to fetch data from the 4chan API, stream it continuously using Spring Webflux, and process the data to gain insights into the platform's metrics.

The git repository for this guide can be found here.

Dependencies and Project Structure

The 4chan Metrics Collector project is organized into two Gradle submodules: app and api-client. The settings.gradle.kts file is responsible for including these submodules in the project.

The build.gradle.kts file serves as the project's main build configuration. It defines the plugins, Java compatibility, configurations, repositories, and common dependencies for all submodules. Notable dependencies for this project include:

- Spring Boot Actuator
- Spring Boot WebFlux
- Spring Boot AOP
- Lombok
- Micrometer Prometheus Registry

The app submodule, configured in app/build.gradle.kts, depends on the api-client submodule. The app submodule is a Spring Boot application responsible for running the metrics collector. The api-client submodule, configured in api-client/build.gradle.kts, is a library containing the API client implementation that the app submodule uses to access the 4chan public API.
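The Micrometer Prometheus Registry dependency listed above suggests that the collector records metrics through Micrometer's MeterRegistry and exposes them for Prometheus to scrape. The article does not show how publishThreadMetrics records its values. The following is therefore only a minimal sketch of per-board metrics with Micrometer, under these assumptions:

- The ThreadSnapshot record, the publish method, the metric names, and the replies field are all hypothetical.
- A SimpleMeterRegistry stands in for the Prometheus registry, so the example runs without Spring.

```java
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

public class ThreadMetricsSketch {

    // Hypothetical thread snapshot; the real FourChanThread fields may differ
    record ThreadSnapshot(long no, String board, int replies) { }

    private final MeterRegistry registry;

    ThreadMetricsSketch(MeterRegistry registry) {
        this.registry = registry;
    }

    // Hypothetical stand-in for the project's publishThreadMetrics method
    void publish(ThreadSnapshot thread) {
        // Registering a meter whose name and tags already exist returns the existing meter
        Counter.builder("chan.thread.observations")
                .tag("board", thread.board())
                .register(registry)
                .increment();
        // Distribution of reply counts per board (hypothetical metric)
        DistributionSummary.builder("chan.thread.replies")
                .tag("board", thread.board())
                .register(registry)
                .record(thread.replies());
    }

    public static void main(String[] args) {
        // In the real application, Spring Boot would provide a Prometheus-backed registry instead
        MeterRegistry registry = new SimpleMeterRegistry();
        ThreadMetricsSketch metrics = new ThreadMetricsSketch(registry);
        metrics.publish(new ThreadSnapshot(1L, "g", 10));
        metrics.publish(new ThreadSnapshot(2L, "g", 30));
        metrics.publish(new ThreadSnapshot(3L, "v", 5));

        double observations = registry.get("chan.thread.observations").tag("board", "g").counter().count();
        double totalReplies = registry.get("chan.thread.replies").tag("board", "g").summary().totalAmount();
        System.out.println(observations + " " + totalReplies);
    }
}
```

The collector polls each board repeatedly, so a counter like this one counts observations rather than unique threads. A gauge per board may fit a "currently active threads" metric better.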
While this article focuses on using Spring Webflux to stream data from the 4chan API, the project also demonstrates several other useful features, such as:

- Gradle project with Kotlin-based configuration
- Starting and stopping threads at application startup and shutdown
- Custom Spring configuration
- Using Docker Compose to start the application, Prometheus, and Grafana together
- Publishing metrics to Prometheus
- Using Grafana to graph published metrics

Fetching, Streaming, and Processing Data with Spring Webflux

In this section, we will explore how the application fetches data from the 4chan API, streams the data using Spring Webflux, and processes it to collect and publish metrics.

Overview

The process starts with the MetricsCollector class, which schedules periodic requests to the 4chan API for each board configured in the application.yml file. For each board, a Flux is created to emit data, and these Flux instances are merged into a single Flux that the application subscribes to. As the data streams through the subscription, the application publishes metrics to Prometheus.

The diagram below describes the flow of the application. Each configured board continuously emits messages containing metadata about its active threads, obtained from the 4chan public API. Those streams are merged into a single continuous stream of all active threads across the configured boards. As each message is processed in the subscription, metrics are published to Prometheus.

Create Flux and Subscribe to Data Stream

```java
@PostConstruct
public void produce() {
    boards.stream()
            .map(board -> Flux.create(emit(board)))   // one Flux<Flux<FourChanThread>> per board
            .reduce(Flux::merge)                      // merge the per-board streams pairwise
            .orElse(Flux.empty())                     // no boards configured: nothing to emit
            .flatMap(Function.identity())             // flatten Flux<Flux<FourChanThread>> into Flux<FourChanThread>
            .doOnNext(this::publishThreadMetrics)     // publish metrics for every thread that passes through
            .subscribe();
}
```

The entry point of the application is the produce method in the MetricsCollector class.
Thanks to the @PostConstruct annotation, this method runs when the application starts up. It streams all configured boards and maps each one into a Flux responsible for emitting active thread data for that board.

At this point, we have a stream of separate Flux objects, which is then reduced into a single merged Flux. Each board's Flux emits one inner Flux per request. The flatMap(Function.identity()) call therefore flattens the result into a single Flux of all active threads for all configured boards. Finally, subscribe() starts data consumption, and doOnNext publishes the corresponding thread metrics as each element passes through the stream.

Emitting Elements to the Flux

The emit method in the MetricsCollector class configures a scheduler to execute requests periodically:

```java
private Consumer<FluxSink<Flux<FourChanThread>>> emit(final String board) {
    // Flux.create hands over a FluxSink; schedule a task that periodically pushes request results into it
    return emitter -> scheduler.scheduleAtFixedRate(
            execute(emitter, board),
            random.nextInt(REQUEST_DELAY_SECONDS), // random initial delay, in seconds
            REQUEST_INTERVAL_SECONDS,              // fixed period between requests, in seconds
            TimeUnit.SECONDS);
}
```

The random initial delay staggers the scheduled tasks, so requests for the different boards are not all sent to the 4chan API at the same moment.

The execute method invokes the ApiClient's getThreads method to fetch data from the 4chan API:

```java
private Runnable execute(final FluxSink<Flux<FourChanThread>> emitter, final String board) {
    // Each run emits the Flux returned by getThreads; WebClient sends the HTTP request only when that Flux is subscribed to
    return () -> emitter.next(apiClient.getThreads(board));
}
```

Fetching Data from a Public API

The ApiClient class uses a WebClient instance to make reactive HTTP requests to the 4chan public API. The getThreads method returns a Flux<FourChanThread> that represents the data stream of active threads for a specific board.

Each call to the threads endpoint returns a list of pages, and each page contains a list of threads. The call to bodyToMono parses the JSON response into a Mono, a Publisher that emits a single list of pages. The subsequent mapping calls unpack the threads from the lists that contain them, producing a Flux of threads. The last mapping enriches each thread's metadata with the board it belongs to.
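The unpacking steps described above can be tried in isolation, without WebClient or the 4chan API. The following is a minimal sketch under assumed types:

- ThreadPage and ChanThread are hypothetical stand-ins for the project's FourChanThreadList and FourChanThread classes.
- Mono.just replaces the parsed HTTP response.
- flatMapIterable is used as an equivalent shorthand for flatMap(x -> Flux.fromIterable(...)).

The sketch needs reactor-core on the classpath.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class UnpackThreadsSketch {

    // Hypothetical stand-in for FourChanThread
    record ChanThread(long no, String board) {
        ChanThread withBoard(String board) {
            return new ChanThread(no, board);
        }
    }

    // Hypothetical stand-in for FourChanThreadList: one page of the threads endpoint response
    record ThreadPage(int page, List<ChanThread> threads) { }

    // Mirrors the tail of getThreads: one Mono<List<page>> becomes a Flux of board-tagged threads
    static Flux<ChanThread> unpack(Mono<List<ThreadPage>> pages, String board) {
        return pages
                .flatMapMany(Flux::fromIterable)         // Mono<List<ThreadPage>> -> Flux<ThreadPage>
                .flatMapIterable(ThreadPage::threads)    // Flux<ThreadPage> -> Flux<ChanThread>
                .map(thread -> thread.withBoard(board)); // tag each thread with its board
    }

    public static void main(String[] args) {
        // Simulated parsed response: two pages holding three threads in total
        Mono<List<ThreadPage>> response = Mono.just(List.of(
                new ThreadPage(1, List.of(new ChanThread(101, null), new ChanThread(102, null))),
                new ThreadPage(2, List.of(new ChanThread(201, null)))));

        // block() is acceptable in a demo; avoid it inside reactive pipelines
        List<ChanThread> threads = unpack(response, "g").collectList().block();
        System.out.println(threads);
    }
}
```

Running main prints the three threads from both pages in order, each tagged with the g board.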
```java
public Flux<FourChanThread> getThreads(String board) {
    log.debug("Issuing request to get threads for /{} board", board);
    // Endpoint is an enum that defines the API endpoints and is used to build the endpoint path
    Endpoint endpoint = Endpoint.THREADS;
    // The threads endpoint returns a JSON array of pages, each holding a list of threads
    ParameterizedTypeReference<List<FourChanThreadList>> typeReference = new ParameterizedTypeReference<>() { };
    return webClient.get()
            .uri(endpoint.getEndpoint(board))
            .retrieve()
            .bodyToMono(typeReference)                 // Mono<List<FourChanThreadList>>
            .timeout(TIMEOUT)                          // signal a TimeoutException if no response arrives in time
            .doOnError(throwable -> log.warn("Error when issuing request to get threads for /{} board", board, throwable))
            .flatMapMany(Flux::fromIterable)           // Flux<FourChanThreadList>: one element per page
            .flatMap(threadList -> Flux.fromIterable(threadList.getThreads())) // Flux<FourChanThread>
            .map(thread -> thread.withBoard(board));   // tag each thread with its board
}
```

Testing Webflux Applications

Testing Webflux applications can be challenging because reactive programming is asynchronous and non-blocking. With the right approach and tools, however, you can write effective tests that verify your application behaves correctly. In this section, we will discuss the challenges of testing Webflux applications and use our project's unit tests as examples of how to address them.

Challenges to Testing Webflux Applications

Some of the challenges when testing Webflux applications include:

- Asynchronous behavior: Webflux applications are built using reactive programming, so handling asynchronous behavior in tests can be tricky. Your test cases need to account for the asynchronous nature of the code they are testing.
- Error handling: Reactive applications can have complex error-handling scenarios. It is essential to test various error conditions to ensure that your application handles failures gracefully.
- Timeouts: Timeouts are a common concern in Webflux applications, and testing how your application handles them is crucial for reliable operation.
- Web Client: The reactive WebClient used to make HTTP calls is complicated to mock. Without proper care, it is easy to get tangled in a chain of methods that need to be stubbed and intermediate objects that need to be mocked.

Unit Testing

In our project, two test classes, MetricsCollectorTest and ApiClientTest, cover different aspects of the application's functionality. These tests serve as examples of how to handle the challenges mentioned above.

Asynchronous behavior

The MetricsCollectorTest class tests the asynchronous behavior of the produce() method. It uses Thread.sleep() to wait for the expected execution time before making assertions.

```java
metricsCollector.produce();
// Wait past the random initial delay so that the first scheduled request has been executed
Thread.sleep(TimeUnit.SECONDS.toMillis(REQUEST_DELAY_SECONDS) + 1L);
```

The ApiClientTest class uses StepVerifier from the reactor-test library to verify the expected behavior of the getThreads() method.

```java
Flux<FourChanThread> result = apiClient.getThreads(board);

StepVerifier.create(result)
        .expectNext(...)
        .verifyComplete();
```

Error handling

The ApiClientTest class tests the error handling of the getThreads() method by simulating a missing board. It verifies that the Flux terminates with a WebClientResponseException error signal.

```java
Flux<FourChanThread> result = apiClient.getThreads(board);

StepVerifier.create(result)
        .expectErrorMatches(throwable -> throwable instanceof WebClientResponseException)
        .verify();
```

Timeouts

The ApiClientTest class tests the timeout handling of the getThreads() method by simulating a delayed response. It verifies that the Flux terminates with a TimeoutException.

```java
Flux<FourChanThread> result = apiClient.getThreads(board);

StepVerifier.create(result)
        .expectErrorMatches(throwable -> throwable instanceof TimeoutException)
        .verify();
```

WebClient

In the ApiClientTest class, we inject a tailored WebClient, built with a custom ExchangeFunction, into the ApiClient instance being tested. This lets us control the responses the WebClient returns and create test cases for different scenarios.
```java
private WebClient createWebClient(HttpStatus httpStatus, Duration delay, String body, String contentType) {
    // Canned response returned for every request made through this WebClient
    ClientResponse clientResponse = ClientResponse.create(httpStatus, ExchangeStrategies.withDefaults())
            .header(HttpHeaders.CONTENT_TYPE, contentType)
            .body(body)
            .build();

    // The ExchangeFunction replaces the real HTTP layer; delayElement simulates a slow server
    ExchangeFunction exchangeFunction = request -> Mono
            .just(clientResponse)
            .delayElement(delay);

    return WebClient.builder()
            .exchangeFunction(exchangeFunction)
            .build();
}
```

Conclusion

In this article, we explored Spring Webflux for handling large volumes of data from REST APIs, using the 4chan public API as our data source. We learned how to fetch, stream, and process data with Spring Webflux. We also examined the project structure, its dependencies, and features such as Gradle project configuration, starting and stopping threads, and custom Spring configuration.

We then discussed the challenges of testing Webflux applications and used the project's unit tests to show how to handle asynchronous behavior, error handling, and timeouts.

By understanding and applying these concepts, you will be better equipped to use Spring Webflux in your own applications. You can then build efficient, data-driven applications that process and analyze large volumes of data from REST APIs.

Featured image by Snow White