In this post, we’ll learn how Kafka Streams Consumers behave differently from regular Kafka Consumers, the consequences for the application, as well as steps to minimize downtimes in event processing when consumer group members change.

With the default configuration, a containerised stateless Streams app pauses processing for >45s when one app instance (group member) is removed or restarted. For real-time data streaming workloads with a low e2e latency as an NFR (non-functional requirement), such a long ‘rebalance downtime’ is often unacceptable. Fortunately, there’s a simple yet efficient solution to address this problem.

As a bonus, we will look under the hood of Kafka Consumer Groups, the Group Coordinator & Rebalance Protocol, and measure, analyze, and evaluate a simulation of a group member (replica) re-creation running on Kubernetes.

TLDR? Here’s a spoiler:

```java
props.put("internal.leave.group.on.close", true);
```

## Content Overview

- Theory
- Example Scenario: Kubernetes Pod Evicted… and Replaced
- Wait, 48s? Really???
- Re-do the Example with ‘leaveGroupOnClose’
- Pro Tips
- Conclusion
- Footnotes
- References and Further Reading

## Theory

### Regular Consumer Behaviour

Let’s briefly recap Kafka Consumers and Consumer groups:

> An Apache Kafka® Consumer is a client application that subscribes to (reads and processes) events. A consumer group is a set of consumers which cooperate to consume data from some topics. The partitions of all the topics are divided among the consumers in the group. As new group members arrive and old members leave, the partitions are re-assigned so that each member receives a proportional share of the partitions. This is known as rebalancing the group.
>
> (…) One of the brokers is designated as the group’s coordinator and is responsible for managing the members of the group as well as their partition assignments.
>
> (…) When the consumer starts up, it finds the coordinator for its group and sends a request to join the group.
> The coordinator then begins a group rebalance so that the new member is assigned its fair share of the group’s partitions. Every rebalance results in a new generation of the group.
>
> Each member in the group must send heartbeats to the coordinator in order to remain a member of the group. If no heartbeat is received before expiration of the configured session timeout, then the coordinator will kick the member out of the group and reassign its partitions to another member.

(Source: Kafka Consumer | Confluent Documentation)

When a consumer leaves a group due to a controlled shutdown or a crash, its partitions are reassigned automatically to other consumers. Similarly, when a consumer (re)joins an existing group, all partitions are rebalanced among the group members. This dynamic group cooperation is facilitated by the Kafka Rebalance Protocol.

For a rebalance scenario where one instance is stopped, the Consumer sends a LeaveGroup request to the coordinator before stopping (as part of a graceful shutdown, `Consumer#close()`), which triggers a rebalance.

During the entire rebalancing process, i.e. as long as the partitions are not reassigned, consumers no longer process any data. Fortunately, rebalancing is very fast, typically taking anything from 50ms to a few seconds. It may vary depending on different factors, such as load on your Kafka cluster or the complexity of your Streams topology (no. of input topics, streams tasks := partitions, state stores, … -> total no. of consumers).

### Streams Consumer Behaviour

For Kafka Streams, some config properties are overridden via `StreamsConfig.CONSUMER_DEFAULT_OVERRIDES`. One of those properties is `internal.leave.group.on.close`, set to `false` (it is enabled by default for regular Consumers).

Please note it’s a non-public config, which may change without prior notice with new releases. Reference: `ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG`.
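To make this tangible, here is a small, self-contained Java sketch of the override mechanism. It deliberately does not use the real kafka-streams classes — the map below merely mimics how Kafka Streams layers its internal consumer defaults (such as `internal.leave.group.on.close=false`) underneath the user-supplied configuration. The config names are real; the merge logic is a simplification for illustration:

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerOverridesSketch {

    // Simplified stand-in for StreamsConfig.CONSUMER_DEFAULT_OVERRIDES
    // (the real map lives inside kafka-streams and is not public API).
    static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES = Map.of(
            "auto.offset.reset", "earliest",
            "internal.leave.group.on.close", false
    );

    // Kafka Streams layers its own defaults under the user-supplied config,
    // so anything the user sets explicitly still wins.
    static Map<String, Object> effectiveConsumerConfig(Map<String, Object> userConfig) {
        Map<String, Object> merged = new HashMap<>(CONSUMER_DEFAULT_OVERRIDES);
        merged.putAll(userConfig);
        return merged;
    }

    public static void main(String[] args) {
        // Default: Streams consumers do NOT leave the group on close.
        System.out.println(effectiveConsumerConfig(Map.of())
                .get("internal.leave.group.on.close"));

        // Setting the property explicitly flips the internal default back.
        System.out.println(effectiveConsumerConfig(
                Map.of("internal.leave.group.on.close", true))
                .get("internal.leave.group.on.close"));
    }
}
```

Passing `internal.leave.group.on.close=true` in your own properties is exactly the knob this post comes back to as a mitigation.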
This means Streams Consumers will not send LeaveGroup requests when stopped but will be removed by the coordinator only when the Consumer session times out (ref. `session.timeout.ms`). The default Consumer session timeout is 45s (note: it was 10s before the Kafka 3.0.0 release, ref. KIP-735). Consequently, no data is processed for more than 45 seconds for tasks assigned to the Consumer that had been stopped.

It gets even worse if a new Consumer (re)joins the group while the old one is suspected dead (no more heartbeats received): all consumers pause, and task assignment is blocked until the session timeout is exceeded and the coordinator evicts the old, stopped Consumer from the group. Until then, processing comes completely to a halt for all tasks, also known as ‘stop-the-world’ rebalancing. While the ‘incremental cooperative rebalancing protocol’ introduced with Kafka 2.5 avoids ‘stop-the-world’ rebalancing for regular Consumers, the mentioned Kafka Streams overrides nullify some of its benefits.

## Example Scenario: Kubernetes Pod Evicted… and Replaced

Running your apps on Kubernetes takes you a long way towards a robust, highly-available deployment. Kubernetes monitors your containers’ health, allows you to scale, and ensures all desired replicas are up and running according to your spec. But still, to be truly elastic and minimize downtime of your data stream processing, your application must be able to handle Pods (/containers) being restarted, evicted, and re-created gracefully. There are many potential causes, e.g. application upgrades (CI/CD), k8s cluster security patching, (auto-)scaling, resource shortage, or k8s nodes running on Spot instances being interrupted.

Next, we look at a simple yet common example: a stateless Kafka Streams app with 6 streams tasks, running on Kubernetes as a Deployment with 3 replicas.

Example infrastructure setup: one pod is terminated and successively replaced.
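To put numbers on “balanced task assignment”, here is a toy round-robin sketch in plain Java — this is not Kafka’s actual partition assignor, and the pod names and task numbering are illustrative only. It shows how 6 tasks spread over 3 replicas, and which tasks must be re-assigned when one replica disappears:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AssignmentSketch {

    // Distributes task ids round-robin over the currently live replicas
    // (toy model, NOT the real Kafka/Streams assignor).
    static Map<String, List<Integer>> assign(List<String> replicas, int numTasks) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        replicas.forEach(r -> assignment.put(r, new ArrayList<>()));
        for (int task = 0; task < numTasks; task++) {
            assignment.get(replicas.get(task % replicas.size())).add(task);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Initial state: 3 pods, 6 streams tasks -> 2 tasks per pod.
        System.out.println(assign(List.of("P1.1", "P2", "P3"), 6));

        // After P1.1 is evicted, its tasks must be rebalanced onto
        // the 2 surviving pods -> 3 tasks per pod.
        System.out.println(assign(List.of("P2", "P3"), 6));
    }
}
```

Until that second assignment happens, the tasks previously owned by the evicted pod are simply not processed — which is exactly the downtime window measured below.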
Scenario:

1. Initial state: all 3 pods are running & healthy, the streams app is processing, balanced task assignment
2. Pod (P1.1) is terminated (deleted) by k8s, shutting down gracefully
3. A replacement Pod (P1.2) is scheduled & placed
4. Final state: the replacement Pod is running & healthy, the streams app is processing, balanced task assignment

I would like to share a screenshot depicting the consumer lag metrics, rendered in Grafana, for a simulation of our scenario. Let’s walk through the results and explain the behavior:

- 18:34:00: the Pod (P1.1) is terminated and stops processing; the consumer lag of partitions [1,4] starts to build up
- 18:34:20: the replacement Pod (P1.2) has come up; the streams task sends a JoinGroup to the group coordinator
- 18:34:21: rebalancing is triggered, assignments are revoked, then the rebalance pauses - due to no heartbeats received from (P1.1)
- 18:34:21: all consumers pause processing, waiting for assignment; lag starts to build up for all partitions
- 18:34:45: rebalancing continues, new assignment, processing continues
- 18:34:48: all consumers caught up; consumer lags are back to healthy jitter

To better illustrate everything that is happening over time, here’s a time bar diagram highlighting all important steps:

Here are the corresponding application logs for the rebalancing, which occurred at 16:34:44.988 and took 92ms:

```
2023-06-17 16:34:44,988 INFO State transition from RUNNING to REBALANCING
2023-06-17 16:34:45,080 INFO State transition from REBALANCING to RUNNING
```

So, we can conclude the following downtimes:

- partitions [2,5]: 48s
- partitions [0,1,2,3,4]: 25s

…while the actual rebalancing took only 92ms. 😵

## Wait, 48s? Really???

Depending on your stream processing use case, 45s+ downtime might be no big deal, but for real-time low-latency data streams, it’s a massive breach of the NFR.

So let’s see what options we’ve got to mitigate:

### Option 1: Lower the consumer session timeout

Since the session timeout determines the downtime, one way to mitigate is to reduce `session.timeout.ms`. Don’t forget to decrease the value of `heartbeat.interval.ms` to ensure three heartbeats plus a buffer can fit within the timeout period.

```
session.timeout.ms=6000
heartbeat.interval.ms=1500
```

Read up on the configs here: Kafka Consumer Configurations

### Option 2: Enable ‘leaveGroupOnClose’

…but why work with timeouts when it’s perfectly valid to have your stateless Streams Consumers notify the coordinator when closing down?!?

To enable ‘leaveGroupOnClose’ (overriding the override 😜), configure your Kafka Streams app with the following property:

```
internal.leave.group.on.close=true
```

Please note it’s a non-public config, which may change without prior notice with new releases. Reference: `ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG`.

## Re-do the Example with ‘leaveGroupOnClose’

🚀 Drum roll 🥁 … and here, without further ado, are the results: as we can (not) see - the two rebalancings complete so fast that there’s not even the slightest consumer lag increase visible in the metrics. Here’s the visual explanation:

Finally, here are also the application logs showing the timings of the rebalancing, which happened twice: once at 17:46:00.332 taking 92ms, and again at 17:46:21.361 taking 97ms.

```
2023-06-17 17:46:00,332 INFO State transition from RUNNING to REBALANCING
2023-06-17 17:46:00,424 INFO State transition from REBALANCING to RUNNING
2023-06-17 17:46:21,361 INFO State transition from RUNNING to REBALANCING
2023-06-17 17:46:21,458 INFO State transition from REBALANCING to RUNNING
```

## Pro Tips

### Stateless <> Stateful

This post recommends setting `internal.leave.group.on.close=true` for stateless (!) Kafka Streams applications. Before enabling `internal.leave.group.on.close=true` for stateful applications, it is crucial to understand all potential consequences.

Unfortunately, my evaluation using `internal.leave.group.on.close=true` in combination with standby replicas was not very promising.
The expected fluent task re-assignment to a hot standby while one replica "restarts" - and the subsequent re-distribution of tasks - does not work. The Kafka Streams-specific `HighAvailabilityTaskAssignor` has known issues such as uneven task assignment, frozen warmup tasks ('task movement'), and not recognising caught-up standby tasks when the consumer group changes. Please note there are plans to address those issues with the next version of the Consumer Rebalance Protocol (see [footnotes](#footnotes)).

Often the best plan to keep downtimes low during rebalances for stateful apps is to stick with RocksDB + StatefulSet + PersistentVolumes + restart within (!) the session timeout => re-join with the previous assignment, re-use the RocksDB state, and avoid rebalancing entirely…

Alternatively, take a look at kafka-streams-cassandra-state-store, introduced in an earlier blog post.

### k8s Deployment `.spec.minReadySeconds`

Frequent rebalancing within a short timeframe can cause consumer delays and strain the Kafka cluster. If your application/container has a quick restart time, such as when running as a GraalVM native executable, it’s worth considering the use of `.spec.minReadySeconds` to ensure upgrades occur in a controlled manner and to prevent frequent rebalancing within a short timeframe.

## Conclusion

By configuring your Kafka Streams app with `internal.leave.group.on.close=true`, a graceful shutdown immediately triggers a rebalancing process and tasks are re-assigned to other active members within the group. The processing downtime is significantly reduced while also improving elasticity and resilience. As a result, your application supports interruption-free CI/CD and can be auto-scaled.

Please note that this recommendation only applies to stateless streams applications! Tread carefully for stateful topologies, and do your homework!

Remember that `internal.leave.group.on.close` is a non-public config, which may change without prior notice with new releases. Always check the source code for changes when upgrading the Kafka Streams dependency.

## Footnotes

When writing this blog post, the latest version of kafka-streams was 3.4.1.

There’s a ticket, KAFKA-6995, from June 2018 proposing to make the config public. The ticket is closed as ’Won’t Fix’. Concerns of the core developer team can be found in the discussion.

Looking into the crystal ball: a Kafka Improvement Proposal is in progress to introduce a new group membership and rebalance protocol for the Kafka Consumer and, by extension, Kafka Streams => KIP-848: The Next Generation of the Consumer Rebalance Protocol.

It was also introduced at Current 2022: The Next Generation of the Consumer Rebalance Protocol With David Jacot | UK

The application + docker-compose setup that was put together for this article can be found on the thriving-dev GitHub Organisation:
:icon{name="mdi-github" class="inline -mt-0.5 w-6 h-6"} https://github.com/thriving-dev/kafka-streams-leave-group-on-close

Many thanks to @MatthiasJSax for proofreading the blog post! 🙇

## References and Further Reading

- Kafka Consumer Group Rebalance (1 of 2)
- Kafka Consumer Group Rebalance (2 of 2)
- Kafka-streams delay to kick rebalancing on consumer graceful shutdown - Stack Overflow
- Cooperative Rebalancing in the Kafka Consumer, Streams & ksqlDB
- Apache Kafka Rebalance Protocol, or the magic behind your streams applications | by Florian Hussonnois | StreamThoughts | Medium
- KIP-812: Introduce another form of the KafkaStreams.close() API that forces the member to leave the consumer group - Apache Kafka - Apache Software Foundation

This post was originally published on Thriving.dev.