Project Lead, Senior Software Engineer
When learning a new technology, sometimes I just want to see it work. It gives me a baseline to extend my ideas, to see what is possible, and to imagine what it can become.
While reading through documentation or following tutorials, I am often greeted with numerous options and configurations. After seeing all the dazzling capabilities, I always want to see the core, the fundamentals. Haven’t you ever just wanted to try a simple working copy? Hence, I just want to see it work.
This series aims to minimize the chance of a missing link between steps, and encourages you to build your next innovative solution on what you learn here.
According to the Apache Kafka documentation, Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters.
Navigate to the directory that contains docker-compose.yml. Run the command below in cmd to start a Kafka Docker container in the background.
docker-compose up -d
version: "3"
services:
  kafka:
    image: "bitnami/kafka:3.2.3"
    hostname: kafka
    ports:
      - "9092:9092"
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
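Before moving on, I sometimes like to confirm that something is actually listening on the mapped port. Here is a minimal sketch of such a check. The helper name IsPortOpen and the 2-second timeout are my own assumptions, and an open port only shows that the container is accepting TCP connections, not that Kafka has finished starting.

```csharp
using System;
using System.Net;
using System.Net.Sockets;

// Hypothetical helper: returns true if a TCP connection to host:port
// succeeds within the timeout. An open port means something is listening;
// it does not prove that Kafka is fully initialized.
static bool IsPortOpen(string host, int port, TimeSpan timeout)
{
    try
    {
        using var client = new TcpClient();
        // Wait(timeout) returns false if the connection attempt times out.
        return client.ConnectAsync(host, port).Wait(timeout);
    }
    catch (Exception)
    {
        // Connection refused, unknown host, and similar failures.
        return false;
    }
}

// 9092 is the port mapped in docker-compose.yml.
bool brokerPortOpen = IsPortOpen("localhost", 9092, TimeSpan.FromSeconds(2));
Console.WriteLine(brokerPortOpen
    ? "Port 9092 is accepting connections"
    : "Port 9092 is not reachable yet");
```

If the port is not reachable, give the container a few more seconds and try again.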
Navigate to the directory that contains KafkaStream.Producer.csproj. Open PowerShell and run the following command.
dotnet run
When you see a message starting with “Message delivered to input-topic”, that means you did it!
using System;
using Confluent.Kafka;

var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    ClientId = "producer-1"
};

using (var producer = new ProducerBuilder<string, string>(config).Build())
{
    var message = new Message<string, string>
    {
        Key = null, // Set the key if you want to control how messages are partitioned
        Value = "Hello, Kafka!"
    };

    try
    {
        // Send the message and wait for the delivery report
        var deliveryResult = await producer.ProduceAsync("input-topic", message);
        Console.WriteLine($"Message delivered to {deliveryResult.TopicPartitionOffset}");
    }
    catch (ProduceException<string, string> ex)
    {
        Console.WriteLine($"Delivery failed: {ex.Error.Reason}");
    }
}
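BootstrapServers serves as the entry point for a client to connect to a Kafka broker. The client uses this address for its initial connection and then learns about the rest of the cluster from the broker.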
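The comment on Key in the producer code above mentions partitioning. The idea is that a keyed message gets its partition from a hash of the key, so messages with the same key land on the same partition. Below is a toy sketch of that idea only. PickPartition and the FNV-1a hash are assumptions I picked for illustration; the real client uses its own partitioner, and with a null key it chooses the partition by itself.

```csharp
using System;
using System.Text;

// Hypothetical helper: maps a key to one of `partitionCount` partitions.
// FNV-1a is used purely for illustration; it is NOT the hash the Kafka
// client uses. The point is only that equal keys give equal partitions.
static int PickPartition(string key, int partitionCount)
{
    unchecked
    {
        uint hash = 2166136261;          // FNV-1a 32-bit offset basis
        foreach (byte b in Encoding.UTF8.GetBytes(key))
        {
            hash ^= b;
            hash *= 16777619;            // FNV-1a 32-bit prime
        }
        return (int)(hash % (uint)partitionCount);
    }
}

// "order-1" appears twice and is assigned the same partition both times.
foreach (var orderKey in new[] { "order-1", "order-2", "order-1" })
{
    Console.WriteLine($"{orderKey} -> partition {PickPartition(orderKey, 3)}");
}
```

Note that a topic created automatically on first use typically has a single partition, so in this walkthrough every message ends up in partition 0 regardless of the key.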
Navigate to the directory that contains KafkaStream.Consumer.csproj. Open PowerShell and run the following command.
dotnet run
The initialization will take a few seconds. Then you should see the following messages.
Consuming messages from topic: input-topic
Press any key to exit
Received message: Hello, Kafka! from partition [0] offset 0
There you go!
using System;
using System.Threading;
using Confluent.Kafka;

const string Topic = "input-topic";

// Configure the consumer
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "consumer-group-1",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = false // Disable auto commit to have more control over offsets
};

using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
    // Subscribe to the topic
    consumer.Subscribe(Topic);
    Console.WriteLine($"Consuming messages from topic: {Topic}");

    // Start consuming messages in a background thread
    var cancellationTokenSource = new CancellationTokenSource();
    var cancellationToken = cancellationTokenSource.Token;
    var consumerThread = new Thread(() =>
    {
        try
        {
            while (true)
            {
                try
                {
                    // Blocks until a message arrives or cancellation is requested
                    var consumeResult = consumer.Consume(cancellationToken);
                    Console.WriteLine($"Received message: {consumeResult.Message.Value} from partition {consumeResult.Partition} offset {consumeResult.Offset}");

                    // Manually commit the offset to mark the message as consumed
                    consumer.Commit(consumeResult);
                }
                catch (ConsumeException ex)
                {
                    Console.WriteLine($"Error occurred: {ex.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // This exception will be thrown when cancellation is requested.
            Console.WriteLine("Cancellation requested");
        }
        finally
        {
            // Leave the consumer group cleanly
            consumer.Close();
        }
    });

    // Start the consumer thread
    consumerThread.Start();

    // Wait for a key press to exit
    Console.WriteLine("Press any key to exit");
    Console.ReadKey();

    // Request cancellation and wait for the consumer thread to stop
    cancellationTokenSource.Cancel();
    consumerThread.Join();
    Console.WriteLine("End Consumer");
}
In this context, the committed offset tells the consumer where it left off.
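To see why that matters, here is a toy in-memory model of committing. It is not the Kafka API: log, committed, and ReadBatch are hypothetical names of my own, and a real consumer stores its commits in Kafka through consumer.Commit(...), as in the code above. The sketch only shows the bookkeeping: each group remembers the next offset to read, so a restarted consumer picks up where it stopped, while a brand new group starts from the beginning (like AutoOffsetReset.Earliest).

```csharp
using System;
using System.Collections.Generic;

// Toy model: a single partition as a list, plus the committed offset per group.
var log = new List<string> { "m0", "m1", "m2", "m3" };
var committed = new Dictionary<string, int>();

// Hypothetical helper: reads up to `max` messages for a group, starting at
// its committed offset (or 0 if the group has never committed), then commits.
List<string> ReadBatch(string groupId, int max)
{
    int start = committed.TryGetValue(groupId, out var saved) ? saved : 0;
    int end = Math.Min(start + max, log.Count);
    var batch = log.GetRange(start, end - start);
    committed[groupId] = end;   // store the NEXT offset to read
    return batch;
}

var first = ReadBatch("consumer-group-1", 2);    // m0, m1
// Simulated restart: nothing is remembered except `committed`.
var second = ReadBatch("consumer-group-1", 2);   // m2, m3
var other = ReadBatch("consumer-group-2", 1);    // new group starts at m0

Console.WriteLine(string.Join(",", first));      // prints "m0,m1"
Console.WriteLine(string.Join(",", second));     // prints "m2,m3"
Console.WriteLine(string.Join(",", other));      // prints "m0"
```

This is also why running the consumer from this article a second time does not print “Hello, Kafka!” again unless a new message is produced or the GroupId is changed.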
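While developing your own producer/consumer, if you get an error message indicating the hostname cannot be resolved, check KAFKA_CFG_ADVERTISED_LISTENERS. The hostname advertised there is the one clients connect to, so it must be resolvable from the machine running the client. If you still run into the same issue, consider updating the hosts file.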
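A quick way to narrow this down is to check whether the advertised hostname resolves from the client machine at all. The sketch below is a minimal check using the standard Dns class; CanResolve is a hypothetical helper name, and the two hostnames are simply the ones that appear in the docker-compose.yml above.

```csharp
using System;
using System.Net;
using System.Net.Sockets;

// Hypothetical helper: returns true if the hostname resolves to at least
// one IP address from the machine running this code.
static bool CanResolve(string hostName)
{
    try
    {
        return Dns.GetHostAddresses(hostName).Length > 0;
    }
    catch (SocketException)
    {
        // Thrown when the name cannot be resolved.
        return false;
    }
}

// "localhost" is what the broker advertises in this article's setup.
// "kafka" is the container hostname; it usually resolves only inside the
// Docker network unless the hosts file has an entry for it.
foreach (var candidate in new[] { "localhost", "kafka" })
{
    string status = CanResolve(candidate) ? "resolves" : "does NOT resolve";
    Console.WriteLine($"{candidate}: {status}");
}
```

If the advertised name does not resolve, either change KAFKA_CFG_ADVERTISED_LISTENERS to a name that does, or add a hosts file entry that maps the name to the broker's address (127.0.0.1 for a single-machine setup like this one, which is my assumption here).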
In this article, we successfully ran Kafka in a Docker container, produced a message to a topic, and consumed that message from the topic. Kafka has countless applications. Go ahead and play with the project, and build something interesting of your own!