Hi! Serhii Kalinets from Parimatch Tech is here, and this post is about Kafka.
We have numerous data to process at speed, many microservices with their teams, that is why we chose Kafka as our main data exchange platform. We've faced several situations over the years of using it, and we want to talk more about some of them below.
Typical success stories are usually boring. However, it is always fun to read about failures and challenges. And this post is mostly about issues, misunderstandings, and (sometimes) heroic solutions.
On the go, I will be explaining the basics of how Kafka works and some specifics, just to keep all readers on the same page. Even if you’ve never worked with it, please feel free to read this post – it’s exciting and could make it easier for you to start using Kafka.
Let's start from the beginning, literally the connection. When connecting to Kafka, you need to specify the so-called bootstrap servers. Usually, the addresses of all the brokers in the cluster are provided, but it’s enough to specify just a few of them (even one could work). Why?
To answer this question, let's look at how clients connect to Kafka. When connecting, the client specifies the topic and its partition (more on partitions later on). To start writing to / reading from that partition, you need to connect to its leader, which is always one of the cluster brokers. The authors of Kafka were kind enough to us developers and eliminated the need to search for a leader on our own. The client can connect to any broker, which will re-connect it to the leader of the requested partition.
It means that to connect to the cluster successfully, you only need to know the address of just one broker. So, why indicate the whole list?
The list of addresses allows you to increase the availability of the cluster if some brokers are unavailable. The client connects to the brokers from the list one after another until one replies.
So, for local development and testing, using one address is enough. For production, using three addresses is reasonable – in such cases, two brokers are allowed to be unavailable – a highly exceptional situation. All brokers may be indicated, but it is impractical brokers could be added/removed by cluster operators do keeping the full list of brokers might require additional and unnecessary maintenance.
On the one hand, Kafka seems to be dead simple, but on the other, it’s an incredibly complicated thing. Yes, it’s just a service that allows you to write and read bytes, but there are hundreds of different settings that control the transmission and storage of these bytes.
For example, you can set the duration of how long messages should be retained in a topic. Unlike typical message brokers that only transmit data, Kafka stores it. In essence, Kafka is a commit log – a structure where data is appended only to the end, and once Kafka has received the message, it is stored as long as required.
The retention settings, which provide different options, define the “as long as required” part. You can set up messages to be deleted after a certain amount of time or when their total size hits some threshold.
The deletion does not happen immediately, rather when Kafka decides. Due to specific technical aspects, even after the cleanup job is run, eligible messages might be left intact. Why?
Kafka stores data in the files called “segments”. There could be many segments for a particular partition and one of them where data is written to is called “active”. When the active segment reaches a certain size or age, it becomes inactive, and a new active segment is created instead. Cleanup jobs delete data only in inactive segments, so it is possible that when retention is set to one day (for example) but the segment settings are default (to become inactive the segment should live for a week or reach a gigabyte in size) then people start wondering why the old data is still there.
We faced similar unpleasant situations when we configured retention of topics for two weeks and then – during some recovery process – re-read the topics from the beginning and finally were shocked to find the old data re-processed.
The general rule of thumb is to never rely on Kafka's data retention settings in your business logic. It just does not provide any guarantees.
Besides straightforward deletion, there is also so-called compaction– when Kafka deletes not just old messages but all previous messages with the same key (more on keys later on). In fact, messages in the middle of the topic are getting deleted. Why do we need this?
Compaction saves space for storing data by removing data we don't need. If we make changes to an entity in snapshots (the current state after the change), we no longer need previous versions. The latest snapshot is enough. Compaction is all about removing the previous versions.
Topics with compaction can be considered as tables in a relational database, where one key always has one value. Cool, right? Developers read the documentation up to this point, then develop services expecting no more than one message per unique key, and then… they cry.
Again, the real deletion of data occurs in inactive segments and under certain circumstances. Many configuration parameters control the process, but the point is that the data won't be deleted for a long time, and this should be considered in your service design.
And finally, another exciting thing about compaction. Publishing a message with an existing key to a topic with compaction is basically an UPDATE operation. And if we can change things, we also should be able to delete them. To delete, you need to send a message with the key and an empty body (pass NULL instead of a body). And this combination is called a “tombstone”, it is like a null terminator in a record’s history.
These tombstones are stored in the topic so that consumers (services that read from Kafka), once they reach them, would understand that a record with such a key has been deleted, and process this fact accordingly. Besides that, tombstones also get deleted after some time, leaving no trace of the original record inside the topic. The deletion time is configured separately, and it should not be set too short, especially if you do not know your topic consumers. After all, if the tombstone is deleted before some slow-moving consumers have managed to read it, then the record will not be deleted for them at all. It will remain forever.
The process seems to be really thoughtful and clearly described, with no signs of trouble. We came up with a service that reads the list of current events from a topic and stores it in memory. There are many sports events, but at some point, they all end, and then they could be deleted. We used the previously mentioned tombstone method in the topic with configured compaction.
However, after a while, we noticed that the starting time of our service instances started to increase. Long story short, it turned out that the tombstones were never removed, despite the correct configuration parameters.
There is KIP-534, which should have been fixed already, but we have not updated our Kafka yet, so we live with this bug for now. Our solution was to add another delete policy to delete records after a certain period. And to avoid losing the events of the distant future, which have no updates, we made periodic fake updates.
As mentioned above, Kafka is not a typical message broker. Yes, producers publish messages, and consumers read them and even commit something, but there is an important difference that can confuse some newcomers easily. I often hear questions asking how to re-read a message, how to delete it after reading it, how the consumer can notify the producer about successful processing, or why the client receives the next message if it hasn't committed the previous one. One reason for this is that practices used with standard message brokers do not necessarily apply to Kafka. The point is that these are not the commits that the developers are used to.
To get that, we can imagine a Kafka topic as a stream, like a file or a buffer in memory. Working with such a stream consists of opening it, indicating the position for reading the data, and then reading items one by one in a loop. We do not need to tell Kafka that we have succeeded in reading something. If a consumer returns a message during the current call, the call will return the next message.
The position of each message is called “offset”. Consumers should provide the starting offset they want to read from. It can be set in absolute and relative values, from the beginning to the end. Since most scenarios imply that when the service restarts it should resume reading from the position where it stopped last time, and to achieve that, the last read offset should be provided during connection. Kafka has a feature out of the box to make things simpler.
That feature is provided by consumer groups. This is basically a group of consumers reading some set of topics when all these topics partitions are distributed across all consumers so that at any point of time only one consumer is reading from any partition. Their primary purpose is horizontal scaling, but that is beyond this article's scope. What we will focus on, is the ability of consumer groups to store offsets. Once the offset of a given topic/partition combination is stored, when this partition is reassigned to another consumer (that might happen because of restart or scaling or any other reason), that consumer picks up the reading from the offset following the stored one.
The process of storing offsets is calledcommit. It has nothing to do with a confirmation that a message has been read or processed. It is merely providing a starting position for future consumers of that partition. And yes, a consumer can commit any offset, not just the one it just read.
Having that, it turns out that the frequency of commits may not coincide with the frequency of receiving messages. When the rate of messages is high, sending a commit to each one could significantly impact the performance negatively. For high load systems, offsets are usually being committed less often – for example, every few seconds.
This approach may result in some messages being processed more than once. For example, the service restarted after processing message 10 but only managed to commit message 5. As a result, after restarting, it will re-read messages 6-10.
Developers should always consider that and make their services idempotent (a complicated word that means the repeated execution of an operation should not change anything). Some developers try to achieve exactly-once semantics (when a message can only be read once) by playing with the frequency of commits and different Kafka settings – for example, explicitly sending a commit for every message.
However, this approach significantly reduces efficiency and still does not guarantee exactly once. The message may be processed, but if the service or infrastructure fails while the commit is being sent, this will result in re-reading the same message after restarting the service.
One day, we noticed an invasion of strange consumer groups to our cluster. Consumer groups are usually given meaningful names, indicating the service, product, or team. Such naming helps to identify consumers of a topic. But these strange groups were empty (did not store any offsets) and did not have any details in the name – just opaque GUIDs. So, where did they come from?
In fact, consumer groups are a great mechanism for scaling reading, where Kafka spares the developers the complexity of redistributing partitions between consumers when adding to or removing from the group. But for those who like to keep everything under control, manual control is provided.
When consumers connect using the Assign()
method, rather than Subscribe()
, they get full control over the situation and specify which partitions they want to read from.
In this case, consumer groups are not needed, but for some reason, one still needs to be specified when creating a consumer, and it will be created in the cluster.
And our mysterious new groups turned out to be consumer groups, created by a service that used Assign()
. But why were there so many of them, and where did the GUID come from?
It turned out that in the case of a .NET client from the official repository, a GUID is used to name the group. In the vast majority of scenarios that include a GUID, we require a unique identifier. Any .NET developer has a muscle memory – when they see Guid, they use Guid.NewGuid()
that produces a unique identifier. So did we. As a result, during every start of the service, a new consumer group was created, and the old one did not get removed. This was very strange and did not look like “by design” behavior.
However, during yet another study of examples of consumers with Assign
, we suddenly realized that they don’t useGuid.NewGuid()
, but rathernew Guid()
, which is an entirely different beast. It results not in a unique GUID but in a default value consisting of zeros. So in library samples, basically the Guid
type was used to get a constant that was used as a consumer group name. You can see these samples
Please don't hesitate to use constants for consumer groups in all scenarios, both Subscribe() and Assign(), don’t be confused by library samples.
If you start your journey with Kafka with a book (which is one of the best ways), the work of clients will, most likely, be described using Java as an example — with a lot of exciting and correct things. For example, you’ll discover that the consumer implements a rather complicated protocol, which hides many details like working with consumer groups, balancing, and so on.
Perhaps, this is why there are only a few client libraries. There are basically two of them – a native one, running under the JVM; and librdkafka, written in C, which serves as the foundation for the libraries of all other languages. There is one significant difference in how they commit offsets. Java clients do everything in a single thread, and all communication with the broker happens during the poll() call. This method, which reads messages from Kafka, also does other work, such as committing offsets, transactions, etc. Since everything happens in one thread, developers know for sure that if they get the message, then commit an offset. The service fails before calling the poll method, the offset will not be saved in Kafka, and when the service is restarted, this message will be read once again.
Librdkafka, in turn, works differently. Their offsets are committed in the background by a dedicated thread. After calling the Commit method, the commit may or may not reach Kafka. What is worse, with default settings, the offset could be committed, but the message may not be processed (
Our main stack is .NET, but at some point, we decided to add colors to our boring life by mixing a bit (as it seemed to us back then) of JVM to it, namely Scala. Why? Well, Kafka is written in Java-Scala, the JVM has a higher-level API – Kafka Streams. In their difference, these APIs are similar to C and Python for reading files.
For the former, you need to mess with an opening (and closing) a file, allocating (and freeing) a buffer, loops, and all other benefits of low-level byte handling. Tens of lines of code. And for the latter, well, it all could be done with a simple one-liner.
In Kafka streams, topics are introduced in the form of streams, from which you can read, join with other streams (topics) by a key, or write a predicate and filter the message by criteria.
So, we wrote some code, ran it, but it did not work as expected. No errors and no results. We started to dig deeper and found some interesting things.
To understand and appreciate the value of our findings, let's take a closer look at the Kafka concepts of keys and partitions. Messages in Kafka are stored in topics, and topics are split into partitions. Each partition is a kind of shard. Data from each topic is split into parts, stored on different brokers, thus increasing the number of producers and consumers of that topic.
Novices often confuse partitions (shards) with replicas (copies). The difference is that a partition keeps only a part of a topic data, while a replica stores all topic data. These two things are not mutually exclusive, and in most cases, topics have several partitions and several replicas.
Partitions improve efficiency, while replicas are used to improve reliability and availability. The increase in efficiency is achieved through the horizontal scaling of consumers. When using the recommended approach with consumer groups, only one consumer may be reading the partition at a time. Therefore, the scaling limit for reads is the number of partitions. The consumer group still could have more consumers than a number of partitions, but extra ones will just be idle.
The logic of partitioning is that data is being assigned to one or another partition according to certain criteria. Of course, it is possible to write to each partition in a round-robin manner, similar to a regular load balancer. Still, this approach is unsuitable for many typical scenarios where messages related to one entity (for example, order modifications) must be processed by the same consumer. The more practical way is a hash function that is applied to the message key and determines what partition that message should be sent to.
Things are getting complicated here, and once again thanks to the Kafka developers for making it easier. When sending a message, you need to specify the partition, but clients also have a built-in helper mechanism that calculates the appropriate partition.
This choice is made using the so-called partitioner. In fact, this is the implementation of some hashing function. And there is even a default partitioner that just works, so developers might not even know that it exists.
But let's get back to our problem. It turned out that Scala and .NET clients have different default partitioners. In our case, there were two services based on different technologies, both writing to the same topic. And because of this difference, messages with the same keys ended up in different partitions.
You need to check the default partitions if several services write to the same topic. And even better, avoid system design where more than one service writes to the same topic.
Every message in Kafka has a timestamp field. It would be logical to assume that the broker fills it when the message is added. But… don't be too confident about that.
Firstly, there is an option to set the time explicitly, and there are also two options for what to do if the time is not set. You can use the time on the producer's side (when the message was sent), or the broker's side (when the message was added to the topic).
Therefore, be careful about relying on the timestamp of the messages in Kafka, especially when the topic producer is not under your control. In this case, it is better to transfer the messages to your topic and then set the time as you wish.
Kafka is a fairly old, mature product (since 2011); some APIs have changed during its development path and some have been deprecated.
For example, in the beginning, for the connection, the Zookeeper address (which is a necessary component of Kafka versions before 2.8.0) was used. Later they switched to the addresses of Kafka brokers (the bootstrap servers mentioned above). Now the recommended way is to use bootstrap servers, but connection via Zookeeper also works and is still supported in some utilities.
We had an interesting case when the consumer group was removed, but its metrics continued to be published. It turned out that the group was being deleted using a tool configured with a Zookeeper connection, while the metrics were collected by the exporter, which was connected via bootstrap servers. So, eventually, the group remained untouched despite the fact that the tool returned success on the removal request.
Conclusion: Do not use obsolete protocols, or at least do not mix them with the new ones.
So, that was a selection of facts and misconceptions about Kafka. We hope that our experience will help you avoid the difficulties we encountered.
🚀See you soon!