Apache Kafka® is an event streaming platform used by more than 30% of the Fortune 500 today. There are numerous features of Kafka that make it the de facto standard for an event streaming platform, and in this blog post, I explain what I think are the top five things every Kafka developer should know. Some items in our top five are performance related, while others are about the key architectural concepts that make Kafka tick. I hope that at the end of this blog post, you'll walk away with a deeper understanding of how Kafka works, as well as with a new trick or two up your sleeve.

Tip #1: Understand message delivery and durability guarantees

For data durability, the KafkaProducer has the configuration setting acks. The acks configuration specifies how many acknowledgments the producer must receive before it considers a record delivered to the broker. The options to choose from are:

- none: The producer considers the records successfully delivered once it sends them to the broker. This is basically "fire and forget."
- one: The producer waits for the lead broker to acknowledge that it has written the record to its log.
- all: The producer waits for an acknowledgment from the lead broker and from the following brokers that they have successfully written the record to their logs.

As you can see, there is a trade-off to make here, and that's by design, because different applications have different requirements. You can opt for higher throughput with a chance of data loss, or you may prefer a very high data durability guarantee at the expense of lower throughput.

Now let's take a second to talk about the acks=all scenario. If you produce records with acks set to all to a cluster of three Kafka brokers, then under ideal conditions Kafka contains three replicas of your data: one on the lead broker and one on each of the two followers. When the logs of each of these replicas all have the same record offsets, they are considered to be in sync. In other words, these in-sync replicas have the same content for a given topic partition.

But there's some subtlety to using the acks=all configuration. What it doesn't specify is how many replicas need to be in sync. The lead broker will always be in sync with itself. But you could have a situation where the two following brokers can't keep up due to network partitions, record load, etc. So when a producer has a successful send, the actual number of acknowledgments could have come from only one broker! If the two followers are not in sync, the producer still receives the required number of acks, but in this case it's only the leader that has the record.

By setting acks=all, you are placing a premium on the durability of your data. So if the replicas aren't keeping up, it stands to reason that you want to raise an exception for new records until the replicas have caught up.

In a nutshell, having only one in-sync replica follows the "letter of the law" but not the "spirit of the law." What we need is a guarantee that, when using the acks=all setting, a successful send involves at least a majority of the available in-sync brokers.

There just so happens to be one such configuration: min.insync.replicas. The min.insync.replicas configuration enforces the number of replicas that must be in sync for the write to proceed. Note that min.insync.replicas is set at the broker or topic level and is not a producer configuration. The default value for min.insync.replicas is one, so to avoid the scenario described above in a three-broker cluster, you'd want to increase it to two.

Let's revisit our previous example with that change in place: if the number of replicas that are in sync is below the configured amount, the lead broker won't attempt to append the record to its log. The leader throws either a NotEnoughReplicasException or a NotEnoughReplicasAfterAppendException, forcing the producer to retry the write. Having replicas out of sync with the leader is considered a retryable error, so the producer will continue to retry and send the records up to the configured delivery timeout.

So by setting the min.insync.replicas and producer acks configurations to work together in this way, you've increased the durability of your data.
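To make the relationship concrete, here is a minimal sketch, not from the original post, that creates a topic with min.insync.replicas=2 and then produces to it with acks=all. The broker address, topic name, and partition count are placeholders, and the replication factor of 3 assumes the three-broker cluster from the example above:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class DurabilityExample {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-host:9092"); // placeholder

    // min.insync.replicas is a topic (or broker) setting, not a producer setting
    try (Admin admin = Admin.create(props)) {
      NewTopic topic = new NewTopic("durable-topic", 6, (short) 3) // assumes 3 brokers
          .configs(Collections.singletonMap(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"));
      admin.createTopics(Collections.singletonList(topic)).all().get();
    }

    // acks is a producer setting; "all" waits for acknowledgment from the in-sync replicas
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      producer.send(new ProducerRecord<>("durable-topic", "key", "value"));
    }
  }
}
```

With this combination, a send only succeeds once at least two replicas have the record; otherwise the producer sees a retryable NotEnoughReplicas error, as described above.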
Now let's move on to the next items in our list: improvements to the Kafka clients. Over the past year, the Kafka producer and Kafka consumer APIs have added some new features that every Kafka developer should know.

Tip #2: Learn about the new sticky partitioner in the producer API

Kafka uses partitions to increase throughput and spread the load of messages across all brokers in a cluster. Kafka records are in a key/value format, where the keys can be null. Kafka producers don't immediately send records; instead, they place them into partition-specific batches to be sent later. Batches are an effective means of increasing network utilization.

There are three ways the partitioner determines into which partition the records should be written:

- The partition can be explicitly provided in the ProducerRecord object via the overloaded ProducerRecord constructor. In this case, the producer always uses this partition.
- If no partition is provided and the ProducerRecord has a key, the producer takes the hash of the key modulo the number of partitions. The resulting number is the partition that the producer will use.
- If there is no key and no partition present in the ProducerRecord, then previously Kafka used a round robin approach to assign messages across partitions. The producer would assign the first record in the batch to partition zero, the second to partition one, and so on, until the end of the partitions. The producer would then start over with partition zero and repeat the entire process for all remaining records.

The round robin approach works well for evenly distributing records across partitions, but it has one drawback: due to this "fair" round robin assignment, you can end up sending multiple sparsely populated batches. It's more efficient to send fewer batches with more records in each batch. Fewer batches mean less queuing of produce requests, and hence less load on the brokers.

Let's look at a simplified example with a topic that has three partitions. For the sake of simplicity, assume that your application produces nine records with no key, all arriving at the same time. With round robin assignment, those nine incoming records result in three batches of three records each. But it would be better if we could send one batch of nine records; as stated before, fewer batches result in less network traffic and less load on the brokers.

Apache Kafka 2.4.0 added the sticky partitioner approach, which now makes this possible. Instead of using a round robin approach per record, the sticky partitioner assigns records to the same partition until the batch is sent. Then, after sending a batch, the sticky partitioner increments the partition to use for the next batch. In the example above, the nine records would instead fill a single batch for one partition.

By using the same partition until a batch is full or otherwise completed, we send fewer produce requests, which reduces the load on the request queue and reduces the latency of the system as well. It's worth noting that the sticky partitioner still results in an even distribution of records. The even distribution occurs over time, as the partitioner sends a batch to each partition in turn. You can think of it as a "per-batch" round robin or "eventually even" approach.

To learn more about the sticky partitioner, you can read the Apache Kafka Producer Improvements with the Sticky Partitioner blog post and the related design document, KIP-480.
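Here is a minimal sketch of the keyless case where sticky partitioning kicks in, assuming a Kafka 2.4+ client, a placeholder broker address and topic name, and arbitrary linger.ms and batch.size values (these simply determine when a batch is considered complete):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class StickyPartitionerExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-host:9092"); // placeholder
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // Batching knobs: a batch is sent when it fills up (batch.size) or after linger.ms
    props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16_384);

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      for (int i = 0; i < 9; i++) {
        // No key and no explicit partition: on a 2.4+ client, these records
        // stick to one partition until the current batch is completed.
        producer.send(new ProducerRecord<>("example", "record-" + i));
      }
    }
  }
}
```

No special configuration is needed here: on clients from 2.4.0 onward, the default partitioner applies the sticky behavior automatically for records without keys.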
Now let's move on to the consumer changes.

Tip #3: Avoid "stop-the-world" consumer group rebalances by using cooperative rebalancing

Kafka is a distributed system, and one of the key things distributed systems need to do is deal with failures and disruptions, not just anticipate them but fully embrace them. A great example of how Kafka handles this expected disruption is the consumer group protocol, which manages multiple instances of a consumer for a single logical application. If an instance of a consumer stops, by design or otherwise, Kafka will rebalance and make sure another instance of the consumer takes over the work.

As of version 2.4, Kafka introduced a new rebalance protocol: cooperative rebalancing. But before we dive into the new protocol, let's look at the consumer group basics in a bit more detail.

Let's assume you have a distributed application with several consumers subscribed to a topic. Any set of consumers configured with the same group.id forms one logical consumer called a consumer group. Each consumer in the group is responsible for consuming from one or more partitions of the subscribed topic(s); these partitions are assigned by the leader of the consumer group. Imagine, for example, three consumers in a group sharing a six-partition topic: under optimal conditions, each consumer processes records from two partitions.

But what happens if one of the applications suffers an error or can't connect to the network anymore? Does processing for those topic partitions stop until you can restore the application in question? Fortunately, the answer is no, thanks to the consumer rebalancing protocol.

Suppose Consumer 2 fails for some reason and either misses a poll or triggers a session timeout. The group coordinator removes it from the group and triggers what is known as a rebalance. A rebalance is a mechanism that attempts to evenly distribute (balance) the workload across all available members of a consumer group. In this case, since Consumer 2 left the group, the rebalance assigns its previously owned partitions to the other active members of the group. So losing a consumer application for a particular group ID doesn't result in a loss of processing on those topic partitions.

There is, however, a drawback to the default rebalancing approach. Each consumer gives up its entire assignment of topic partitions, and no processing takes place until the topic partitions are reassigned, which is sometimes referred to as a "stop-the-world" rebalance. To compound the issue, depending on the ConsumerPartitionAssignor used, consumers may simply be reassigned the same topic partitions that they owned prior to the rebalance, the net effect being that there was no need to pause work on those partitions at all.

This implementation of the rebalance protocol is called eager rebalancing because it prioritizes ensuring that no two consumers in the same group claim ownership of the same topic partitions. Ownership of the same topic partition by two consumers in the same group would result in undefined behavior.

While it is critical to keep any two consumers from claiming ownership over the same topic partition, it turns out that there is a better approach that provides safety without compromising on time spent not processing: incremental cooperative rebalancing. First introduced to Kafka Connect in Apache Kafka 2.3, this has now been implemented for the consumer group protocol too. With the cooperative approach, consumers don't automatically give up ownership of all topic partitions at the start of the rebalance. Instead, all members encode their current assignment and forward the information to the group leader. The group leader then determines which partitions need to change ownership, instead of producing an entirely new assignment from scratch.

A second rebalance is then issued, but this time only the topic partitions that need to change ownership are involved, whether that's revoking topic partitions that are no longer assigned or adding new topic partitions. For the topic partitions that appear in both the new and the old assignment, nothing has to change, which means continued processing for topic partitions that aren't moving.

The bottom line is that eliminating the "stop-the-world" approach to rebalancing and only stopping the topic partitions involved means less costly rebalances, thus reducing the total time to complete them. Even long rebalances are less painful now that processing can continue throughout them. This positive change is made possible by the CooperativeStickyAssignor, which makes the trade-off of a second rebalance in exchange for a faster return to normal operations.

To enable this new rebalance protocol, you need to set the partition.assignment.strategy configuration to use the new CooperativeStickyAssignor. Also, note that this change is entirely on the client side: to take advantage of the new rebalance protocol, you only need to update your client version. If you're a Kafka Streams user, there is even better news: Kafka Streams enables the cooperative rebalance protocol by default, so there is nothing else to do.
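For reference, here is a minimal sketch, not from the original post, of a plain consumer that opts in to the cooperative protocol; the broker address, group ID, and topic name are placeholders:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CooperativeRebalancingExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-host:9092"); // placeholder
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");          // placeholder
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // Opt in to incremental cooperative rebalancing (requires a 2.4+ client)
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
              CooperativeStickyAssignor.class.getName());

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("example"));
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
          System.out.printf("partition %d, offset %d: %s%n",
              record.partition(), record.offset(), record.value());
        }
      }
    }
  }
}
```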
Tip #4: Master the command line tools

The Apache Kafka binary installation includes several tools located in the bin directory. While you'll find several tools in that directory, I want to show you the four tools that I think will have the most impact on your day-to-day work. I'm referring to console-consumer, console-producer, dump-log, and delete-records.

Kafka console producer

The console producer allows you to produce records to a topic directly from the command line. Producing from the command line is a great way to quickly test new consumer applications when you aren't producing data to the topics yet.
To start the console producer, run this command:

```
kafka-console-producer --topic <topic-name> \
  --broker-list <broker-host:port>
```

After you execute the command, there's an empty prompt waiting for your input; just type in some characters and hit enter to produce a message.

Using the command line producer in this way does not send any keys, only values. Luckily, there is a way to send keys as well. You just have to update the command to include the necessary flags:

```
kafka-console-producer --topic <topic-name> \
  --broker-list <broker-host:port> \
  --property parse.key=true \
  --property key.separator=":"
```

The choice of the key.separator property is arbitrary; you can use any character. And now you can send full key/value pairs from the command line! If you are using Confluent Schema Registry, there are command line producers available to send records in Avro, Protobuf, and JSON Schema formats.

Now let's take a look at the other side of the coin: consuming records from the command line.

Kafka console consumer

The console consumer gives you the ability to consume records from a Kafka topic directly from the command line. Being able to quickly start a consumer can be an invaluable tool in prototyping or debugging. Consider building a new microservice: to quickly confirm that your producer application is sending messages, you can simply run this command:

```
kafka-console-consumer --topic <topic-name> \
  --bootstrap-server <broker-host:port>
```

After you run this command, you'll start seeing records scrolling across your screen (as long as data is currently being produced to the topic). If you want to see all the records from the start, you can add a --from-beginning flag to the command, and you'll see all records produced to that topic:

```
kafka-console-consumer --topic <topic-name> \
  --bootstrap-server <broker-host:port> \
  --from-beginning
```

If you are using Confluent Schema Registry, there are command line consumers available for Avro, Protobuf, and JSON Schema encoded records. The Schema Registry command line consumers are intended for working with records in the Avro, Protobuf, or JSON Schema formats, while the plain consumers work with records of primitive Java types: String, Long, Double, Integer, etc. The default format expected for keys and values by the plain console consumer is the String type. If the keys or values are not strings, you'll need to provide the deserializers via the --key-deserializer and --value-deserializer command line flags, with the fully qualified class names of the respective deserializers.

You may well have noticed that, by default, the console consumer only prints the value component of the messages to the screen. If you want to see the keys as well, include the necessary flags:

```
kafka-console-consumer --topic <topic-name> \
  --bootstrap-server <broker-host:port> \
  --property print.key=true \
  --property key.separator=":"
```

As with the producer, the value used for the key separator is arbitrary, so you can choose any character you want to use.

Dump log

Sometimes when you're working with Kafka, you may find yourself needing to manually inspect the underlying logs of a topic. Whether you're just curious about Kafka internals or you need to debug an issue and verify the content, the kafka-dump-log command is your friend. Here's a command used to view the log of an example topic aptly named example:

```
kafka-dump-log \
  --print-data-log \
  --files ./var/lib/kafka/data/example-0/00000000000000000000.log
```

The --print-data-log flag specifies that the data in the log should be printed. The --files flag is required and could also be a comma-separated list of files.
For a full list of options and a description of what each option does, run kafka-dump-log with the --help flag.

Running the command above yields something like this:

```
Dumping ./var/lib/kafka/data/example-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1599775774460 size: 81 magic: 2 compresscodec: NONE crc: 3162584294 isvalid: true
| offset: 0 CreateTime: 1599775774460 keysize: 3 valuesize: 10 sequence: -1 headerKeys: [] key: 887 payload: -2.1510235
baseOffset: 1 lastOffset: 9 count: 9 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 81 CreateTime: 1599775774468 size: 252 magic: 2 compresscodec: NONE crc: 2796351311 isvalid: true
| offset: 1 CreateTime: 1599775774463 keysize: 1 valuesize: 9 sequence: -1 headerKeys: [] key: 5 payload: 33.440664
| offset: 2 CreateTime: 1599775774463 keysize: 8 valuesize: 9 sequence: -1 headerKeys: [] key: 60024247 payload: 9.1408728
| offset: 3 CreateTime: 1599775774463 keysize: 1 valuesize: 9 sequence: -1 headerKeys: [] key: 1 payload: 45.348946
| offset: 4 CreateTime: 1599775774464 keysize: 6 valuesize: 10 sequence: -1 headerKeys: [] key: 241795 payload: -63.786373
| offset: 5 CreateTime: 1599775774465 keysize: 8 valuesize: 9 sequence: -1 headerKeys: [] key: 53596698 payload: 69.431393
| offset: 6 CreateTime: 1599775774465 keysize: 8 valuesize: 9 sequence: -1 headerKeys: [] key: 33219463 payload: 88.307875
| offset: 7 CreateTime: 1599775774466 keysize: 1 valuesize: 9 sequence: -1 headerKeys: [] key: 0 payload: 39.940350
| offset: 8 CreateTime: 1599775774467 keysize: 5 valuesize: 9 sequence: -1 headerKeys: [] key: 78496 payload: 74.180098
| offset: 9 CreateTime: 1599775774468 keysize: 8 valuesize: 9 sequence: -1 headerKeys: [] key: 89866187 payload: 79.459314
```

There's lots of information available from the dump-log command. You can clearly see the key, payload (value), offset, and timestamp for each record. Keep in mind that this data is from a demo topic that contains only 10 messages, so with a real topic there will be substantially more data. Also note that in this example, the keys and values for the topic are strings. To run the tool with key or value types other than strings, you'll need to use either the --key-decoder-class or the --value-decoder-class flag.

Delete records

Kafka stores records for topics on disk and retains that data even once consumers have read it. However, records aren't stored in one big file but are broken up into segments by partition, and the offset order is continuous across segments for the same topic partition. Because servers do not have infinite amounts of storage, Kafka provides settings to control how much data is retained, based on time and size:

- The time configuration controlling data retention is log.retention.hours, which defaults to 168 hours (one week).
- The size configuration, log.retention.bytes, controls how large the log for a partition can grow before its oldest segments become eligible for deletion. However, the default setting for log.retention.bytes is -1, which allows the retained log size to be unlimited.

If you're not careful and haven't configured the retention size as well as the retention time, you could have a situation where you run out of disk space.
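Those two settings are broker-level defaults; individual topics can override them with the topic-level retention.ms and retention.bytes configurations. As a minimal sketch, assuming a placeholder broker address, the example topic from above, and arbitrary retention values, here is one way to set those overrides from Java with the Admin client (incrementalAlterConfigs is available since Apache Kafka 2.3):

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class TopicRetentionExample {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker-host:9092"); // placeholder

    try (Admin admin = Admin.create(props)) {
      ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "example");
      // Keep data for 3 days or until the partition log reaches ~1 GiB, whichever comes first
      AlterConfigOp retentionMs =
          new AlterConfigOp(new ConfigEntry("retention.ms", "259200000"), AlterConfigOp.OpType.SET);
      AlterConfigOp retentionBytes =
          new AlterConfigOp(new ConfigEntry("retention.bytes", "1073741824"), AlterConfigOp.OpType.SET);

      Map<ConfigResource, Collection<AlterConfigOp>> updates =
          Collections.singletonMap(topic, Arrays.asList(retentionMs, retentionBytes));
      admin.incrementalAlterConfigs(updates).all().get();
    }
  }
}
```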
Remember, you never want to go into the filesystem and manually delete files. Instead, we want a controlled and supported way to delete records from a topic in order to free up space. Fortunately, Kafka ships with a tool that deletes data as required.

The kafka-delete-records command has two mandatory parameters:

- --bootstrap-server: the broker(s) to connect to for bootstrapping
- --offset-json-file: a JSON file containing the deletion settings

Here's an example of the JSON file:

```
{
  "partitions": [
    { "topic": "example", "partition": 0, "offset": -1 }
  ],
  "version": 1
}
```

As you can see, the format of the JSON is simple. It's an array of JSON objects, and each object has three properties:

- topic: the topic to delete from
- partition: the partition to delete from
- offset: the offset you want the delete to start from, moving backward to lower offsets

I want to discuss how to choose the offset in the JSON config file. Because the example topic contains only 10 records, you could easily calculate the starting offset for the deletion. But in practice, you most likely won't know off the top of your head what offset to use. Also bear in mind that offset != message number, so you can't just delete from "message 42." If you supply a -1, then the offset of the high watermark is used, which means you will delete all the data currently in the topic. The high watermark is the highest available offset for consumption (the offset of the last successfully replicated message, plus one).

Now to run the command, just enter this on the command line:

```
kafka-delete-records --bootstrap-server <broker-host:port> \
  --offset-json-file offsets.json
```

After running this command, you should see something like this on the console:

```
Executing records delete operation
Records delete operation completed:
partition: example-0 low_watermark: 10
```

The results of the command show that Kafka deleted all records from the topic partition example-0. The low_watermark value of 10 indicates the lowest offset available to consumers. Because there were only 10 records in the example topic, we know that the offsets ranged from 0 to 9, and no consumer can read those records again. For more background on how deletes are implemented, you can read KIP-107 and KIP-204.
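If you'd rather trigger the same operation from application code, it is also exposed through the Java Admin client's deleteRecords method. Here's a minimal sketch, not from the original post, that uses a placeholder broker address and removes everything before offset 5 in partition 0 of the example topic:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class DeleteRecordsExample {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker-host:9092"); // placeholder

    try (Admin admin = Admin.create(props)) {
      // Delete every record in example-0 with an offset lower than 5
      Map<TopicPartition, RecordsToDelete> request = Collections.singletonMap(
          new TopicPartition("example", 0), RecordsToDelete.beforeOffset(5L));

      admin.deleteRecords(request).all().get();
      System.out.println("Records before offset 5 deleted from example-0");
    }
  }
}
```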
Tip #5: Use the power of record headers

Apache Kafka 0.11 introduced the concept of record headers. Record headers give you the ability to add some metadata about the Kafka record without adding any extra information to the key/value pair of the record itself. Consider if you wanted to embed some information in a message, such as an identifier for the system from which the data originated. Perhaps you want this for lineage and audit purposes, and to facilitate routing of the data downstream.

Why not just append this information to the key? Then you could extract the part needed and route the data accordingly. But adding artificial data to the key poses two potential problems. First, if you are using a compacted topic, adding information to the key would make the record incorrectly appear as unique, so compaction would not function as intended. Second, consider the effect if one particular system identifier dominates in the records sent. You now have a situation where you could have significant key skew. Depending on how you are consuming from the partitions, the uneven distribution of keys could have an impact on processing by increasing latency.

These are two situations where you might want to use headers instead. The original KIP proposing headers provides some additional cases as well:

- Automated routing of messages based on header information between clusters
- Enterprise APM tools (e.g., AppDynamics or Dynatrace) need to stitch in "magic" transaction IDs for them to provide end-to-end transaction flow monitoring
- Audit metadata is recorded with the message, for example, the client-id that produced the record
- Business payloads need to be encrypted end to end and signed without being tampered with, but ecosystem components need access to metadata to achieve tasks

Now that I've made a case for using headers, let's walk through how you can add headers to your Kafka records.

Adding headers to Kafka records

Here's the Java code to add headers to a ProducerRecord:

```java
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("bizops", "value");
producerRecord.headers().add("client-id", "2334".getBytes(StandardCharsets.UTF_8));
producerRecord.headers().add("data-file", "incoming-data.txt".getBytes(StandardCharsets.UTF_8));

// Details left out for clarity
producer.send(producerRecord);
```

In this snippet, you create an instance of the ProducerRecord class, call the ProducerRecord.headers() method to add a key and value for a header, and then add another header in the same way.

There are a few things to point out with this code example. The Header interface expects a String key and the value as a byte array. Even though you provide a key, you can add as many headers with the same key as needed; duplicate keys will not overwrite previous entries with the same key. Also, there are overloaded ProducerRecord constructors that accept an Iterable<Header>. You could create your own concrete class that implements the Header interface and pass in a collection that implements the Iterable interface, but in practice, the simple method shown here should suffice.

Now that you know how to add headers, let's take a look at how you can access headers from the consumer side of things.

Retrieving headers

This is how you can access headers when consuming records:

```java
// Details left out for clarity
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
    for (Header header : consumerRecord.headers()) {
        System.out.println("header key " + header.key() + " header value " + new String(header.value()));
    }
}
```

The code iterates over the ConsumerRecords, and for each ConsumerRecord, it iterates over the headers and processes them. From the code above, you can see that to process the headers, you simply use the ConsumerRecord.headers() method to return the headers. In our example, we're printing the headers out to the console for demonstration purposes; once you have access to the headers, you can process them as needed.

For reading headers from the command line, KIP-431 adds support for optionally printing headers from the ConsoleConsumer, which will be available in the Apache Kafka 2.7.0 release. You can also use kafkacat to view headers from the command line. Here's an example command:

```
kafkacat -b kafka-broker:9092 -t my_topic_name -C \
  -f '\nKey (%K bytes): %k
  Value (%S bytes): %s
  Timestamp: %T
  Partition: %p
  Offset: %o
  Headers: %h\n'
```
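Back in application code, if you only need one particular header, for example the client-id header from the producer snippet above, you don't have to iterate over all of them; the Headers collection supports direct lookup. A minimal sketch, my own addition in the same "details left out" style as the snippets above:

```java
// Details left out for clarity: assumes consumerRecord comes from a poll() loop as above
Header clientIdHeader = consumerRecord.headers().lastHeader("client-id");
if (clientIdHeader != null) {
    String clientId = new String(clientIdHeader.value(), StandardCharsets.UTF_8);
    // Route or audit based on the producing system's ID
    System.out.println("Record produced by client " + clientId);
}
```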
Recap

You have now read my top five tips for working with Apache Kafka. To recap, we covered:

- Message durability and its relationship with delivery guarantees
- The sticky partitioner in the producer API
- Cooperative rebalancing for consumer groups
- The command line tools
- The power of record headers

And yet, there is still so much more to learn! Head over to Confluent Developer and Kafka Tutorials to see what's going on.