When learning a new technology, sometimes I just want to see it work. It gives me a baseline to extend my ideas, to see what is possible, and to imagine what it can become.
While reading documentation or following tutorials, I am often greeted with numerous options and configurations. After seeing all the dazzling capabilities, I always want to see the core, the fundamentals. Haven't you ever just wanted to experience a simple working copy? Hence, I just want to see it work.
This series aims to minimize the chance of a missing link and encourages you to build your next innovative solution on top of what you learn here.
According to the Kafka documentation, Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters.
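The producer and consumer in this walkthrough are plain C# console apps. Assuming they use the Confluent.Kafka NuGet package (the ProducerBuilder and ConsumerBuilder types shown later come from it), you can add the dependency to each project with:

dotnet add package Confluent.Kafka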
Navigate to where the docker-compose.yml file (shown below) is located. Run the following command in a command prompt to start a Kafka Docker container in the background.
docker-compose up -d
version: "3"
services:
  kafka:
    image: "bitnami/kafka:3.2.3"
    hostname: kafka
    ports:
      - "9092:9092"
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
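Before moving on, you can confirm the container is up with docker-compose ps. If you also want to check the broker itself, the Bitnami image ships the standard Kafka CLI scripts; the script path below is an assumption about the image layout, so adjust it if your image differs.

docker-compose exec kafka /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list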
Navigate to where KafkaStream.Producer.csproj is located. Start a PowerShell session and run the following command.
dotnet run
When you see the message: “Message delivered to input-topic”, that means you did it!
using Confluent.Kafka;

var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    ClientId = "producer-1"
};

using (var producer = new ProducerBuilder<string, string>(config).Build())
{
    var message = new Message<string, string>
    {
        Key = null, // Set the key if you want to partition the messages
        Value = "Hello, Kafka!"
    };

    try
    {
        // Send the message to the topic and wait for the broker's acknowledgement
        var deliveryResult = await producer.ProduceAsync("input-topic", message);
        Console.WriteLine($"Message delivered to {deliveryResult.TopicPartitionOffset}");
    }
    catch (ProduceException<string, string> ex)
    {
        Console.WriteLine($"Delivery failed: {ex.Error.Reason}");
    }
}
BootstrapServers serves as the entry point for a client to connect to a Kafka broker.
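In production you would usually list more than one broker, separated by commas, so the client can still bootstrap if one of them is down. A minimal sketch, assuming two hypothetical brokers named broker-1 and broker-2:

// Hypothetical broker addresses, for illustration only
var config = new ProducerConfig
{
    BootstrapServers = "broker-1:9092,broker-2:9092"
};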
Navigate to where KafkaStream.Consumer.csproj is located. Start a PowerShell session and run the following command.
dotnet run
The initialization will take a few seconds. Then you should see the following messages.
Consuming messages from topic: input-topic
Press any key to exit
Received message: Hello, Kafka! from partition [0] offset 0
There you go!
using Confluent.Kafka;

const string Topic = "input-topic";

// Configure the consumer
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "consumer-group-1",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = false // Disable auto commit to have more control over offsets
};

using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
    // Subscribe to the topic
    consumer.Subscribe(Topic);
    Console.WriteLine($"Consuming messages from topic: {Topic}");

    // Start consuming messages in a background thread
    var cancellationTokenSource = new CancellationTokenSource();
    var cancellationToken = cancellationTokenSource.Token;

    var consumerThread = new Thread(() =>
    {
        try
        {
            while (true)
            {
                try
                {
                    var consumeResult = consumer.Consume(cancellationToken);
                    Console.WriteLine($"Received message: {consumeResult.Message.Value} from partition {consumeResult.Partition} offset {consumeResult.Offset}");

                    // Manually commit the offset to mark the message as consumed
                    consumer.Commit(consumeResult);
                }
                catch (ConsumeException ex)
                {
                    Console.WriteLine($"Error occurred: {ex.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // This exception will be thrown when cancellation is requested.
            Console.WriteLine("Cancellation requested");
        }
        finally
        {
            consumer.Close();
        }
    });

    // Start the consumer thread
    consumerThread.Start();

    // Wait for a key press to exit
    Console.WriteLine("Press any key to exit");
    Console.ReadKey();

    // Request cancellation and wait for the consumer thread to stop
    cancellationTokenSource.Cancel();
    consumerThread.Join();

    Console.WriteLine("End Consumer");
}
In this context, the committed offset tells the consumer where it left off.
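If you want to see where the group stands, the consumer can ask the broker for its committed offsets. A minimal sketch, assuming it runs inside the using block after at least one message has been committed (Committed and Assignment come from the Confluent.Kafka consumer API):

// Ask the broker which offset this consumer group has committed
// for each partition currently assigned to this consumer.
var committed = consumer.Committed(consumer.Assignment, TimeSpan.FromSeconds(5));
foreach (var tpo in committed)
{
    Console.WriteLine($"Committed offset {tpo.Offset} for {tpo.TopicPartition}");
}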
While developing your own producer/consumer, if you get an error message indicating the hostname cannot be resolved, check KAFKA_CFG_ADVERTISED_LISTENERS. If you still run into the same issue, consider updating the hosts file.
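For example, if the broker advertises itself by the container hostname kafka instead of localhost, an entry like the one below lets your client resolve that name to the local machine (the Windows hosts file lives at C:\Windows\System32\drivers\etc\hosts; on Linux and macOS it is /etc/hosts):

127.0.0.1    kafka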
In this article, we successfully ran Kafka in a Docker container, produced a message to a topic, and consumed a message from that topic. The applications of Kafka are limitless. Go ahead and play with the project, and build something interesting of your own!