Apache Kafka is a distributed event streaming platform built with an emphasis on reliability, performance, and customization. Kafka can send and receive messages in a publish-subscribe fashion. To achieve this, the ecosystem relies on a few but strong basic concepts, which enable the community to build many features solving numerous use cases, for instance:
- Processing messages as an Enterprise Service Bus.
- Tracking activity, metrics, and telemetry.
- Processing streams.
- Supporting event sourcing.
- Storing logs.

This article presents the concepts backing Kafka and the different tools available to handle data streams.

Architecture

The behavior of Kafka is pretty simple: Producers push Messages into a particular Topic, and Consumers subscribe to this Topic to fetch and process the Messages. Let's see how this technology achieves it.

Infrastructure side

Independently of the use case, the following components will be deployed:
- One or more Producers sending messages to the brokers.
- One or more Kafka Brokers, the actual messaging servers handling communication between producers and consumers.
- One or more Consumers fetching and processing messages, grouped in clusters named Consumer Groups.
- One or more Zookeeper instances managing the brokers.
- (Optionally) One or more Registry instances enforcing a uniform message schema.

As a scalable distributed system, Kafka relies heavily on the concept of clusters. As a result, a typical production deployment will likely run multiple instances of each component.

A Consumer Group is a cluster of instances of the same consumer application. Kafka relies heavily on this concept to balance the load on the application side.

Note: The dependency on Zookeeper will be removed soon (cf. KIP-500).

Application side

A Message in Kafka is a key-value pair. Those elements can be anything from an integer to a Protobuf message, provided the right serializer and deserializer.

The message is sent to a Topic, which will store it as a Log.
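Since keys and values only travel as bytes, the producer's serializer and the consumer's deserializer must mirror each other exactly. The sketch below illustrates that contract in plain Java, without any Kafka dependency; the Codec interface and the LongCodec, Utf8StringCodec, and WireMessage classes are hypothetical stand-ins for the serializer and deserializer classes of the real client library.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical interface: both directions of the conversion between a typed value and its bytes.
interface Codec<T> {
    byte[] serialize(T value);
    T deserialize(byte[] bytes);
}

// Encodes a long as 8 big-endian bytes.
final class LongCodec implements Codec<Long> {
    @Override
    public byte[] serialize(Long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    @Override
    public Long deserialize(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }
}

// Encodes a string as UTF-8 bytes.
final class Utf8StringCodec implements Codec<String> {
    @Override
    public byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

// What actually travels to the broker: a key and a value, both as raw bytes.
final class WireMessage {
    final byte[] key;
    final byte[] value;

    WireMessage(byte[] key, byte[] value) {
        this.key = key;
        this.value = value;
    }

    static <K, V> WireMessage encode(K key, V value, Codec<K> keyCodec, Codec<V> valueCodec) {
        return new WireMessage(keyCodec.serialize(key), valueCodec.serialize(value));
    }
}
```

Decoding with a codec that does not match the one used for encoding either throws (e.g., reading a short string's bytes as a long) or silently yields garbage, which is the kind of mismatch a shared schema is meant to prevent.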
The topic should be a collection of semantically related logs, without any particular structure imposed. A topic can either keep every message as a new log entry or only keep the last value for each key (a.k.a. a compacted log).

To take advantage of the multiple brokers, topics are sharded into Partitions by default. Kafka assigns any received message to one partition depending on its key, or using a partitioner algorithm otherwise, which results in a random assignment from the developer's point of view. Each partition has a Leader responsible for all I/O operations, and Followers replicating the data. A follower will take over the leader role in case of an issue with the current one.

The partition holds the received data in order, increasing an integer offset for each message. However, there is no order guarantee between two partitions. So for order-dependent data, one must ensure that the messages end up in the same partition by using the same key.

Within a consumer group, each partition is assigned to a single consumer, which is the only member of the group fetching messages from this partition. If one consumer shuts down, the brokers will reassign its partitions among the remaining consumers.

Because Kafka is an asynchronous system, having every message delivered exactly once to the consumer is hard and can hurt performance. To mitigate this, Kafka provides different levels of guarantee on the number of times a message will be processed (i.e., at most once, at least once, exactly once).

Schema and Registry

Messages are serialized when leaving a producer and deserialized when handled by the consumer. To ensure compatibility, both must use the same data definition, which can be hard to guarantee as the applications evolve. As a result, when dealing with a production system, it is recommended to use a schema to make the contract on the data structure explicit.

To do this, the Kafka ecosystem provides a Registry server (Confluent Schema Registry), storing schemas and binding them to topics.
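The value of a registry shows when a schema evolves: a new version should only be accepted if readers can still decode the data written with the previous one. The following sketch models that check in plain Java. ToySchemaRegistry and its Field class are hypothetical, and the compatibility rule is a simplified take on backward compatibility (a new field needs a default, and type changes are rejected outright), not the exact algorithm of any real registry.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified registry: one subject per topic, schemas as field name -> Field.
final class ToySchemaRegistry {
    // A field definition: its type name and whether the schema provides a default value.
    static final class Field {
        final String type;
        final boolean hasDefault;

        Field(String type, boolean hasDefault) {
            this.type = type;
            this.hasDefault = hasDefault;
        }
    }

    private final Map<String, List<Map<String, Field>>> versionsBySubject = new HashMap<>();

    // Simplified backward rule: a reader using `next` must be able to decode data written with `previous`.
    static boolean isBackwardCompatible(Map<String, Field> previous, Map<String, Field> next) {
        for (Map.Entry<String, Field> entry : next.entrySet()) {
            Field old = previous.get(entry.getKey());
            Field field = entry.getValue();
            if (old == null) {
                if (!field.hasDefault) {
                    return false;  // new field: old data lacks it, so the reader needs a default
                }
            } else if (!old.type.equals(field.type)) {
                return false;      // type changes are rejected (no promotion rules in this sketch)
            }
        }
        return true;               // removed fields are fine: the reader simply ignores them
    }

    // Registers a new version and returns its 1-based number, or throws if it is incompatible.
    int register(String subject, Map<String, Field> schema) {
        List<Map<String, Field>> versions = versionsBySubject.computeIfAbsent(subject, s -> new ArrayList<>());
        if (!versions.isEmpty() && !isBackwardCompatible(versions.get(versions.size() - 1), schema)) {
            throw new IllegalArgumentException("Schema is not backward compatible for subject " + subject);
        }
        versions.add(Map.copyOf(schema));
        return versions.size();
    }
}
```

Rejecting the schema at registration time moves the failure from the consumers, at runtime, to the producer's deployment.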
Historically, only Avro was available, but the registry is now modular and can also handle JSON and Protobuf out of the box.

Once a producer has sent the schema describing the data of its topic to the registry, the other parties (i.e., consumers, and brokers when broker-side schema validation, a Confluent Server feature, is enabled) fetch this schema from the registry to validate and deserialize the data.

Integrations

Kafka provides multiple ways of connecting to the brokers, and each can be more useful than the others depending on the needs. As a result, even if a library is an abstraction layer above another, it is not necessarily better for every use case.

Kafka client library

Client libraries are available in numerous languages, which makes developing a producer and a consumer easy. We will use Java for the examples below, but the concepts remain identical in other languages.

A producer can publish messages at any moment, so its code is pretty simple.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Main {
    public static void main(String[] args) throws Exception {
        // Configure your producer
        Properties producerProperties = new Properties();
        producerProperties.put("bootstrap.servers", "localhost:29092");
        producerProperties.put("acks", "all");
        producerProperties.put("retries", 0);
        producerProperties.put("linger.ms", 1);
        producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
        producerProperties.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        producerProperties.put("schema.registry.url", "http://localhost:8081");

        // Initialize a producer; try-with-resources closes it, which flushes pending messages
        try (Producer<Long, AvroHelloMessage> producer = new KafkaProducer<>(producerProperties)) {
            // Use it whenever you need. AvroHelloMessage is the class generated from the Avro schema.
            // send() takes a ProducerRecord (topic, key, value) rather than the bare value.
            producer.send(new ProducerRecord<>("hello_topic_avro", 1L,
                    new AvroHelloMessage(1L, "this is a message", 2.4f, 1)));
        }
    }
}
```

The consumer code is a bit more complex since the consumption loop must be written manually. On the other hand, this gives more control over its behavior.
The consumer state is automatically handled by the Kafka library. As a result, restarting the worker resumes consumption from the last offset committed for its consumer group.

```java
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;

import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Main {
    public static Properties configureConsumer() {
        Properties consumerProperties = new Properties();
        consumerProperties.put("bootstrap.servers", "localhost:29092");
        consumerProperties.put("group.id", "HelloConsumer");
        consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
        consumerProperties.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        consumerProperties.put("schema.registry.url", "http://localhost:8081");
        // Configure Avro deserializer to convert the received data to a SpecificRecord (i.e. AvroHelloMessage)
        // instead of a GenericRecord (i.e. schema + array of deserialized data).
        consumerProperties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        return consumerProperties;
    }

    public static void main(String[] args) throws Exception {
        // Initialize a consumer
        final Consumer<Long, AvroHelloMessage> consumer = new KafkaConsumer<>(configureConsumer());

        // Choose the topics you will be polling from.
        // You can subscribe to all topics matching a Regex.
        consumer.subscribe(Pattern.compile("hello_topic_avro"));

        final AtomicBoolean shouldStop = new AtomicBoolean(false);
        // Once started, only this thread uses the consumer (KafkaConsumer is not thread-safe)
        Thread consumerThread = new Thread(() -> {
            final Duration timeout = Duration.ofSeconds(5);
            while (!shouldStop.get()) {
                // Poll returns the next batch of records after the current consumer offset
                for (ConsumerRecord<Long, AvroHelloMessage> record : consumer.poll(timeout)) {
                    // Use your record
                    AvroHelloMessage value = record.value();
                }
                // Be kind to the broker while polling
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            consumer.close(timeout);
        });

        // Start consuming && do other things
        consumerThread.start();
        // [...]

        // Stop consuming
        shouldStop.set(true);
        consumerThread.join();
    }
}
```
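The delivery guarantees listed earlier largely come down to when a consumer commits its offset relative to processing its records. The plain-Java sketch below simulates one partition and a crash in the middle of a batch; CommitTimingDemo and its parameters are hypothetical. With the real client, commit timing is driven by enable.auto.commit or explicit calls such as commitSync(). Exactly-once processing, which Kafka builds on idempotent producers and transactions, is out of scope here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation: one partition, one consumer, and a committed offset that survives a crash.
final class CommitTimingDemo {
    enum Mode { AT_MOST_ONCE, AT_LEAST_ONCE }

    // Reads the whole partition in batches of batchSize and simulates a single crash right
    // before the (crashAfter + 1)-th record is processed. After the crash, reading resumes
    // from the committed offset. Returns every processed record in order, duplicates included.
    static List<String> run(List<String> partition, int batchSize, int crashAfter, Mode mode) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<String> processed = new ArrayList<>();
        int committed = 0;        // next offset to read, as remembered for the consumer group
        boolean crashed = false;  // the crash happens only once

        while (committed < partition.size()) {
            int start = committed;
            int end = Math.min(start + batchSize, partition.size());
            if (mode == Mode.AT_MOST_ONCE) {
                committed = end;  // commit before processing: a crash loses the rest of the batch
            }
            boolean crashedNow = false;
            for (int offset = start; offset < end; offset++) {
                if (!crashed && processed.size() == crashAfter) {
                    crashed = true;
                    crashedNow = true;
                    break;        // the worker dies; on restart it resumes from `committed`
                }
                processed.add(partition.get(offset));
            }
            if (!crashedNow && mode == Mode.AT_LEAST_ONCE) {
                committed = end;  // commit after processing: a crash replays the whole batch
            }
        }
        return processed;
    }
}
```

With records a, b, c, d, batches of 2, and a crash after the first record, at-least-once processes a twice but loses nothing (a, a, b, c, d), while at-most-once never duplicates but skips b (a, c, d).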
Kafka Streams

Kafka Streams is built on top of the consumer library. It continuously reads from a topic and processes the messages with code declared with a functional DSL.

During the processing, transitional data can be kept in structures called KStream and KTable, which are stored into topics. The former is equivalent to a standard topic, and the latter to a compacted topic. Using these data stores lets Kafka track the worker state automatically, helping it get back on track after a restart.

The following code sample is extracted from the tutorial provided by Apache. The code connects to a topic named streams-plaintext-input containing string values, without necessarily providing keys. The few lines configuring the StreamsBuilder will:
- Transform each message to lowercase.
- Split the result using non-word characters (the regex \W+) as delimiters.
- Group the resulting tokens by value.
- Count the number of tokens for each group and save the changes to a KTable named counts-store.
- Stream the changes in this KTable to send the values to a topic named streams-wordcount-output.
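Before reading the Kafka Streams code, the data flow of these steps can be imitated in plain Java. In this hypothetical WordCountSketch, a HashMap stands in for the counts-store KTable, and each call to process() returns the updates that toStream() would forward downstream. Unlike the tutorial, the sketch drops the empty token that split() yields when a message starts with a delimiter. The real library may also coalesce consecutive updates of the same key in its record cache, so the output topic can contain fewer intermediate counts.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical plain-Java imitation of the word-count topology (no Kafka involved).
final class WordCountSketch {
    // Stands in for the "counts-store" KTable: the latest count for each word.
    private final Map<String, Long> counts = new HashMap<>();

    // Processes one input message and returns the (word, newCount) updates that
    // toStream() would forward to the output topic, one per token.
    List<Map.Entry<String, Long>> process(String message) {
        List<Map.Entry<String, Long>> updates = new ArrayList<>();
        for (String token : message.toLowerCase(Locale.ROOT).split("\\W+")) {
            if (token.isEmpty()) {
                continue;  // split() yields an empty first token when a message starts with a delimiter
            }
            long updated = counts.merge(token, 1L, Long::sum);
            updates.add(Map.entry(token, updated));
        }
        return updates;
    }

    long countOf(String word) {
        return counts.getOrDefault(word, 0L);
    }
}
```

Feeding "Hello Kafka hello" produces three updates, (hello, 1), (kafka, 1), then (hello, 2): the output is a changelog of the table, not a single final result.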
```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class Main {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("streams-plaintext-input")
               .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
               .groupBy((key, value) -> value)
               .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
               .toStream()
               .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        // The consumer loop is handled by the library
        streams.start();
        latch.await();
    }
}
```

Kafka Connect

Kafka Connect provides a way of transforming and synchronizing data between almost any technology with the use of Connectors. Confluent hosts a Hub, on which users can share connectors for various technologies. This means that integrating a Kafka Connect pipeline is most of the time only a matter of configuration, with no code required.

A single connector library can even handle both sides of the connection:
- Populate a topic with data from any system, i.e., a Source.
- Send data from a topic to any system, i.e., a Sink.

In the example pipeline, the source reads data from CSV files and publishes them into a topic.
Concurrently, the sink polls from the topic and inserts the messages into a MongoDB database. Each connector can run in the same or a distinct worker, and workers can be grouped into a cluster for scalability.

The connector instance is created through a configuration specific to the library. The file below is a configuration of the MongoDB connector. It fetches all messages from the topic mongo-source and inserts them into the collection sink of the database kafka_connect. The credentials are read from an external file through the ${file:...} placeholders, a Kafka Connect feature that keeps secrets out of the configuration (it requires the FileConfigProvider to be enabled in the worker configuration).

```json
{
  "name": "mongo-sink",
  "config": {
    "topics": "mongo-source",
    "tasks.max": "1",
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "connection.uri": "mongodb://${file:/auth.properties:username}:${file:/auth.properties:password}@mongo:27017",
    "database": "kafka_connect",
    "collection": "sink",
    "max.num.retries": "1",
    "retries.defer.timeout": "5000",
    "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy",
    "post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder",
    "delete.on.null.values": "false",
    "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy"
  }
}
```

Once the configuration is complete, registering the connector only takes an HTTP call to the running Kafka Connect instance. Afterward, the service automatically watches the data without further work required.

```shell
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
    http://localhost:8083/connectors -d @sink-conf.json
```

KSQL Database

ksqlDB is somewhat equivalent to Kafka Streams, except that every transformation is declared in an SQL-like language. The server is connected to the brokers and can create Streams or Tables from topics. Those two concepts behave in the same way as a KStream or KTable from Kafka Streams (i.e., respectively, a topic and a compacted topic).
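The stream/table duality can be made concrete by replaying the three hello_topic_json records that the print command displays later in this section. A stream keeps every record, while a table folds them into the latest value per key, and a record with a null value (a tombstone) deletes its key. StreamVsTable is a hypothetical helper written for illustration; it is not part of ksqlDB.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper contrasting stream and table semantics (no ksqlDB involved).
final class StreamVsTable {
    // One record of a topic: a key and a (possibly null) value.
    static final class Event {
        final long key;
        final String message;

        Event(long key, String message) {
            this.key = key;
            this.message = message;
        }
    }

    // Stream view: every record is kept, in order, duplicates included.
    static List<Event> asStream(List<Event> topic) {
        return new ArrayList<>(topic);
    }

    // Table view: the latest value wins for each key; a null value (tombstone) deletes the key.
    static Map<Long, String> asTable(List<Event> topic) {
        Map<Long, String> table = new LinkedHashMap<>();
        for (Event event : topic) {
            if (event.message == null) {
                table.remove(event.key);
            } else {
                table.put(event.key, event.message);
            }
        }
        return table;
    }
}
```

The resulting table holds one row per user, both with "this is another message", which is what the push query on the MESSAGES table returns further down.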
There are three types of queries in the language definition:
- Persistent Query (e.g., CREATE TABLE <name> AS SELECT ...): Creates a new stream or table that will be automatically updated.
- Pull Query (e.g., SELECT * FROM <table|stream> WHERE ID = 1): Behaves similarly to a standard DBMS. Fetches data as an instant snapshot and closes the connection.
- Push Query (e.g., SELECT * FROM <table|stream> EMIT CHANGES): Requests a persistent connection to the server, asynchronously pushing updated values.

The database can be used to browse the brokers' content. Topics can be discovered through the command list topics, and their content displayed using print <name>.

```
ksql> list topics;

 Kafka Topic      | Partitions | Partition Replicas
----------------------------------------------------
 hello_topic_json | 1          | 1
----------------------------------------------------

ksql> print 'hello_topic_json' from beginning;
Key format: KAFKA_BIGINT or KAFKA_DOUBLE or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/05/25 08:44:20.922 Z, key: 1, value: {"user_id":1,"message":"this is a message","value":2.4,"version":1}
rowtime: 2021/05/25 08:44:20.967 Z, key: 1, value: {"user_id":1,"message":"this is another message","value":2.4,"version":2}
rowtime: 2021/05/25 08:44:20.970 Z, key: 2, value: {"user_id":2,"message":"this is another message","value":2.6,"version":1}
```

The syntax to create and query a stream or a table is very close to SQL.
```
-- Let's create a table from the previous topic
ksql> CREATE TABLE messages (user_id BIGINT PRIMARY KEY, message VARCHAR)
>     WITH (KAFKA_TOPIC = 'hello_topic_json', VALUE_FORMAT='JSON');

-- We can see the list and details of each table
ksql> list tables;

 Table Name | Kafka Topic      | Key Format | Value Format | Windowed
----------------------------------------------------------------------
 MESSAGES   | hello_topic_json | KAFKA      | JSON         | false
----------------------------------------------------------------------

ksql> describe messages;

Name                 : MESSAGES
 Field   | Type
------------------------------------------
 USER_ID | BIGINT           (primary key)
 MESSAGE | VARCHAR(STRING)
------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

-- Apart from some additions to the language, the queries are declared in almost standard SQL.
ksql> select * from messages EMIT CHANGES;
+--------+------------------------+
|USER_ID |MESSAGE                 |
+--------+------------------------+
|1       |this is another message |
|2       |this is another message |
```

For production, a headless ksqlDB server is recommended, with a file declaring all streams and tables to create. This prevents any interactive modification of the definitions.

Note: ksqlDB servers can be grouped in a cluster like any other consumer.

Conclusion

This article gives a broad view of the Kafka ecosystem and its numerous possibilities, but it only scratches the surface of each subject. Worry not, as they are all well documented by Apache, Confluent, and fellow developers. Here are a few supplementary resources to dig further into Kafka:
- (YouTube) Kafka Tutorials - Confluent
- Kafka Tutorials in Practice
- Top 5 Things Every Apache Kafka Developer Should Know, by Bill Bejeck
- kafkacat User Guide
- Troubleshooting KSQL Part 2: What's Happening Under the Covers?, by Robin Moffatt
- Apache Kafka Internals, by sudan

The complete experimental code is available on my GitHub repository.
Also published on GitHub.