In this post, we’ll learn how Kafka Streams’ Consumers behave differently from regular Kafka Consumers, the consequences for the application, as well as steps to minimize downtimes in event processing when consumer group members change.
With the default configuration, a containerised stateless Streams app pauses processing for >45s when one app instance (group member) is removed or restarted. For real-time data streaming workloads with a low e2e latency as an NFR (non-functional requirement), such a long ‘rebalance downtime’ is often unacceptable.
Fortunately, there’s a simple yet efficient solution to address this problem.
As a bonus, we will look under the hood of Kafka Consumer Groups, the Group Coordinator & Rebalance Protocol, and measure, analyze, and evaluate a simulation of a group member (replica) re-creation running on Kubernetes.
TL;DR? Here's a spoiler:
props.put("internal.leave.group.on.close", true);
Let’s briefly recap on Kafka Consumers and Consumer groups.
An Apache Kafka® Consumer is a client application that subscribes to (reads and processes) events.
A consumer group is a set of consumers which cooperate to consume data from some topics. The partitions of all the topics are divided among the consumers in the group. As new group members arrive and old members leave, the partitions are re-assigned so that each member receives a proportional share of the partitions. This is known as rebalancing the group.
(…) One of the brokers is designated as the group’s coordinator and is responsible for managing the members of the group as well as their partition assignments.
(…) When the consumer starts up, it finds the coordinator for its group and sends a request to join the group. The coordinator then begins a group rebalance so that the new member is assigned its fair share of the group’s partitions. Every rebalance results in a new generation of the group.
Each member in the group must send heartbeats to the coordinator in order to remain a member of the group. If no heartbeat is received before expiration of the configured session timeout, then the coordinator will kick the member out of the group and reassign its partitions to another member.
(Source: Kafka Consumer | Confluent Documentation)
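To make the heartbeat and session timeout mechanics more tangible, here is a minimal toy model in plain Java. It is not the broker-side implementation; the class `ToyGroupCoordinator` and its methods are made up for illustration, and it only captures the rule quoted above: a member that misses heartbeats for longer than the session timeout is evicted, and every membership change starts a new generation.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of session-timeout eviction (hypothetical, NOT the real group coordinator). */
final class ToyGroupCoordinator {
    private final long sessionTimeoutMs;
    private final Map<String, Long> lastHeartbeatMs = new HashMap<>();
    private int generation = 0;

    ToyGroupCoordinator(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    /** A member joins: remember it and start a new generation (a rebalance). */
    void join(String memberId, long nowMs) {
        lastHeartbeatMs.put(memberId, nowMs);
        generation++;
    }

    /** A heartbeat refreshes the session of a member that is still in the group. */
    void heartbeat(String memberId, long nowMs) {
        lastHeartbeatMs.computeIfPresent(memberId, (id, previous) -> nowMs);
    }

    /** Evicts members whose session expired; returns true if that triggered a rebalance. */
    boolean expireSessions(long nowMs) {
        boolean evicted = lastHeartbeatMs.values().removeIf(last -> nowMs - last > sessionTimeoutMs);
        if (evicted) {
            generation++;
        }
        return evicted;
    }

    boolean isMember(String memberId) {
        return lastHeartbeatMs.containsKey(memberId);
    }

    int generation() {
        return generation;
    }

    public static void main(String[] args) {
        ToyGroupCoordinator coordinator = new ToyGroupCoordinator(45_000);
        coordinator.join("member-a", 0);
        coordinator.join("member-b", 0);

        // member-b stops silently at t=0; member-a keeps sending heartbeats.
        coordinator.heartbeat("member-a", 30_000);
        System.out.println("evicted at 30s? " + coordinator.expireSessions(30_000));

        coordinator.heartbeat("member-a", 45_001);
        System.out.println("evicted after 45s? " + coordinator.expireSessions(45_001));
        System.out.println("member-b still in group? " + coordinator.isMember("member-b"));
    }
}
```

The point of the sketch: without an explicit leave request, the only way the coordinator learns that a member is gone is the timeout running out.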
When a consumer leaves a group due to a controlled shutdown or a crash, its partitions are reassigned automatically to other consumers. Similarly, when a consumer (re)joins an existing group, all partitions are rebalanced among the group members. This dynamic group cooperation is facilitated by the Kafka Rebalance Protocol.
For a rebalance scenario where one instance is stopped, the Consumer sends a `LeaveGroup` request to the coordinator before stopping (as part of a graceful shutdown, `Consumer#close()`), which triggers a rebalance.
During the entire rebalancing process, i.e. as long as the partitions are not reassigned, consumers no longer process any data. Fortunately, rebalancing is very fast, typically taking anywhere from 50ms to a few seconds. It may vary depending on different factors, such as the load on your Kafka cluster or the complexity of your Streams topology (no. of input topics, streams tasks := partitions, and state stores, … -> total no. of consumers).
For Kafka Streams, some config properties are overridden (via `StreamsConfig.CONSUMER_DEFAULT_OVERRIDES`). One of those properties is `"internal.leave.group.on.close"`, which is set to `false` (it is enabled by default for regular Consumers).
Please note it’s a non-public config, which may change without prior notice with new releases. Reference: ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG.
This means Consumers will not send `LeaveGroup` requests when stopped but will be removed by the coordinator only when the Consumer session times out (ref. `session.timeout.ms`). The default Consumer session timeout is 45s (note: it was 10s before the Kafka 3.0.0 release, ref. KIP-735). Consequently, no data is processed for more than 45 seconds for tasks assigned to the Consumer that had been stopped.
It gets even worse if a new Consumer (re)joins the group while the old one is only suspected dead (no more heartbeats received): all consumers stop processing, and task assignment is blocked until the session timeout is exceeded and the coordinator evicts the stopped Consumer from the group. Until then, processing comes to a complete halt for all tasks, also known as ‘stop-the-world’ rebalancing. While the ‘incremental cooperative rebalancing protocol’ introduced with Kafka 2.4 (KIP-429) avoids ‘stop-the-world’ rebalancing for regular Consumers, the mentioned Kafka Streams overrides nullify some aspects.
Running your apps on Kubernetes goes a long way towards achieving a robust, highly-available deployment. Kubernetes monitors your containers’ health, allows you to scale, and ensures all desired replicas are up and running according to your spec.
But still, to be truly elastic and minimize downtime of your data stream processing, your application must gracefully handle Pods (or containers) being restarted, evicted, and re-created. There are many potential causes, e.g. application upgrades (CI/CD), k8s cluster security patching, (auto-)scaling, resource shortage, or k8s nodes running on Spot instances being interrupted.
Next, we look at a simple yet common example.
Example infrastructure setup: Stateless Kafka Streams app, 6 streams tasks, running on Kubernetes as Deployment, with 3 replicas.
Scenario: One Pod is terminated and subsequently replaced.
I would like to share a screenshot depicting the consumer lag metrics, rendered in Grafana, for a simulation of our scenario.
Let’s walk through the results and explain the behavior:
18:34:00: the Pod (P1.1) is terminated and stops processing. The consumer lag of partitions `[1,4]` starts to build up
18:34:20: the replacement Pod (P1.2) has come up; the Streams app sends a `JoinGroup` request to the group coordinator
18:34:21: rebalancing is triggered and assignments are revoked, but the rebalance stalls while the coordinator waits for the stopped Pod (P1.1), which no longer sends heartbeats
18:34:21: all consumers pause processing, waiting for assignment; lag starts to build up for all partitions
18:34:45: the session of (P1.1) times out, rebalancing continues with a new assignment, and processing resumes
18:34:48: all consumers have caught up; consumer lags are back to healthy jitter
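As a quick sanity check, the timeline lines up with the default session timeout. The following sketch redoes the arithmetic, assuming the last heartbeat of (P1.1) was sent right around the moment the Pod was terminated. The class and method names are hypothetical, and since the timestamps above are rounded to whole seconds, the results are approximate.

```java
import java.time.Duration;
import java.time.LocalTime;

/** Back-of-the-envelope check of the timeline (hypothetical helper, approximate values). */
final class DowntimeEstimate {

    /** Earliest point in time at which the coordinator evicts a member that stopped silently. */
    static LocalTime expectedEviction(LocalTime lastHeartbeat, Duration sessionTimeout) {
        return lastHeartbeat.plus(sessionTimeout);
    }

    public static void main(String[] args) {
        Duration sessionTimeout = Duration.ofSeconds(45);      // default session.timeout.ms
        LocalTime podTerminated = LocalTime.of(18, 34, 0);     // P1.1 stops (assumed last heartbeat)
        LocalTime rebalanceStarted = LocalTime.of(18, 34, 21); // P1.2 joined, all consumers pause

        LocalTime eviction = expectedEviction(podTerminated, sessionTimeout);
        System.out.println("Expected eviction of P1.1: " + eviction);
        System.out.println("Pause for the remaining consumers: "
                + Duration.between(rebalanceStarted, eviction).getSeconds() + "s");
    }
}
```

Under these assumptions the eviction lands at 18:34:45, which is exactly when processing resumes in the timeline.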
To better illustrate everything that is happening over time, here’s a time bar diagram highlighting all important steps:
Here are the corresponding application logs for the rebalancing, which occurred at `16:34:44.988` and took 92ms.
2023-06-17 16:34:44,988 INFO State transition from RUNNING to REBALANCING
2023-06-17 16:34:45,080 INFO State transition from REBALANCING to RUNNING
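If you want to extract such durations from your own logs, a small helper like the one below does the job. It is only a sketch: it assumes every log line starts with a timestamp in the exact `yyyy-MM-dd HH:mm:ss,SSS` layout shown above, and the class name `RebalanceDuration` is made up for this example.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Computes the time between two log lines (assumes the timestamp layout shown above). */
final class RebalanceDuration {
    private static final DateTimeFormatter LOG_TIMESTAMP =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");
    private static final int TIMESTAMP_LENGTH = "2023-06-17 16:34:44,988".length();

    /** Milliseconds between the timestamps of the two given log lines. */
    static long millisBetween(String startLine, String endLine) {
        return Duration.between(timestampOf(startLine), timestampOf(endLine)).toMillis();
    }

    /** Parses the leading timestamp of a log line. */
    private static LocalDateTime timestampOf(String logLine) {
        return LocalDateTime.parse(logLine.substring(0, TIMESTAMP_LENGTH), LOG_TIMESTAMP);
    }

    public static void main(String[] args) {
        long millis = millisBetween(
                "2023-06-17 16:34:44,988 INFO State transition from RUNNING to REBALANCING",
                "2023-06-17 16:34:45,080 INFO State transition from REBALANCING to RUNNING");
        System.out.println("Rebalance took " + millis + "ms"); // Rebalance took 92ms
    }
}
```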
So, we can conclude the following downtimes:
partitions `[1,4]`: 48s
partitions `[0,2,3,5]`: 25s
While the actual rebalancing took only 92ms.
Depending on your stream processing use case, 45s+ downtime might be no big deal, but for real-time low-latency data streams, it’s a massive breach of the NFR.
So let’s see what options we’ve got to mitigate:
Since the session timeout determines the downtime, one way to mitigate is to reduce `session.timeout.ms`. Don’t forget to decrease the value of `heartbeat.interval.ms` as well, so that three heartbeats plus a buffer fit within the timeout period.
session.timeout.ms=6000
heartbeat.interval.ms=1500
Read the config here: Kafka Consumer Configurations
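In Java, the two settings could be wired up as in the following sketch. It uses plain `java.util.Properties` with the config keys as strings, so it runs without any Kafka dependency. The helper `consumerTimeouts` and its sanity check (three heartbeat intervals must fit within the session timeout) are my own illustration of the rule of thumb above, not something Kafka enforces in this form.

```java
import java.util.Properties;

/** Builds the timeout overrides discussed above (hypothetical helper). */
final class SessionTimeoutConfig {

    /**
     * Returns the two timeout properties, rejecting combinations where
     * three heartbeats do not fit into the session timeout with room to spare.
     */
    static Properties consumerTimeouts(int sessionTimeoutMs, int heartbeatIntervalMs) {
        if (heartbeatIntervalMs <= 0 || 3L * heartbeatIntervalMs >= sessionTimeoutMs) {
            throw new IllegalArgumentException(
                    "heartbeat.interval.ms must be positive and below a third of session.timeout.ms");
        }
        Properties props = new Properties();
        props.put("session.timeout.ms", sessionTimeoutMs);
        props.put("heartbeat.interval.ms", heartbeatIntervalMs);
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerTimeouts(6000, 1500);
        System.out.println(props);
    }
}
```

Keep in mind that the broker also has a say: it rejects session timeouts below its `group.min.session.timeout.ms` setting, so check that value on your cluster before going lower.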
…but why work with timeouts when it’s perfectly valid to have your stateless Streams Consumers notify the coordinator when closing down?!?
To enable ‘leaveGroupOnClose’ (overriding the override 😜), configure your Kafka Streams app with the following property:
internal.leave.group.on.close=true
Please note it’s a non-public config, which may change without prior notice with new releases. Reference: ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG.
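For completeness, here is a slightly fuller sketch of how the property might sit next to the usual Streams settings. It deliberately uses plain string keys and `java.util.Properties` so that it compiles without the Kafka libraries; the class name, application id, and bootstrap address are placeholders I made up.

```java
import java.util.Properties;

/** Assembles Streams properties including the leave-group override (hypothetical helper). */
final class StreamsLeaveGroupConfig {

    static Properties streamsProperties(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // Internal, non-public consumer config: send a LeaveGroup request on close.
        // Kafka Streams overrides this to false by default; here we flip it back.
        props.put("internal.leave.group.on.close", true);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        Properties props = streamsProperties("my-stateless-app", "localhost:9092");
        System.out.println(props.get("internal.leave.group.on.close"));
    }
}
```

In a real application you would pass these properties to `new KafkaStreams(topology, props)`. The setting only helps on a graceful shutdown, so make sure `KafkaStreams#close()` is actually called when the container receives its termination signal, for example from a JVM shutdown hook.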
Drum roll 🥁 … and here, without further ado, are the results:
As we can (not) see - the two rebalancings complete so fast that there's not even the slightest consumer lag increase visible in the metrics.
Here’s the visual explanation:
Finally, here are the application logs showing the timings of the rebalancing, which happened twice: once at `17:46:00.332`, taking 92ms, and once at `17:46:21.361`, taking 97ms.
2023-06-17 17:46:00,332 INFO State transition from RUNNING to REBALANCING
2023-06-17 17:46:00,424 INFO State transition from REBALANCING to RUNNING
2023-06-17 17:46:21,361 INFO State transition from RUNNING to REBALANCING
2023-06-17 17:46:21,458 INFO State transition from REBALANCING to RUNNING
This post recommends setting `internal.leave.group.on.close=true` for stateless (!) Kafka Streams applications.
Before implementing `internal.leave.group.on.close=true` for stateful applications, it is crucial to understand all potential consequences.
Unfortunately, my evaluation using `internal.leave.group.on.close=true` in combination with standby replicas was not very promising.
The expected smooth re-assignment of tasks to a hot standby while one replica "restarts", followed by a re-distribution of tasks, does not work.
The Kafka Streams specific `HighAvailabilityTaskAssignor` has known issues such as uneven task assignment, frozen warmup tasks ('task movement'), and not recognising caught-up standby tasks when the consumer group changes.
Please note there are plans to address those issues with the next version of the Consumer Rebalance Protocol (see [footnotes](#footnotes)).
Often the best plan to keep downtimes low during rebalance for stateful apps is to stick with RocksDB + StatefulSet + PersistentVolumes + restart within (!) the session timeout => re-join with previous assignment, re-use RocksDB state, and avoid rebalancing entirely...
Alternatively, take a look at kafka-streams-cassandra-state-store, introduced in an earlier blog post.
Frequent rebalancing within a short timeframe can cause consumer delays and strain the Kafka cluster.
If your application/container has a quick restart time, such as when running as a GraalVM native executable, it’s worth considering the use of .spec.minReadySeconds to maintain control and ensure upgrades occur in a controlled manner. This will help prevent frequent rebalancing within a short timeframe.
By configuring your Kafka Streams app with `internal.leave.group.on.close=true`, a graceful shutdown immediately triggers a rebalance, and tasks are re-assigned to other active members within the group. The processing downtime is significantly reduced while also improving elasticity and resilience. As a result, your application supports interruption-free CI/CD and can be auto-scaled.
Please note that this recommendation only applies to stateless Streams applications! Tread carefully for stateful topologies, and do your homework! Remember that `internal.leave.group.on.close` is a non-public config, which may change without prior notice with new releases. Always check the source code for changes when upgrading the Kafka Streams dependency.
`KafkaStreams.close()` API that forces the member to leave the consumer group - Apache Kafka - Apache Software Foundation
This post was originally published on Thriving.dev.