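import org.junit.Before;
import org.testcontainers.containers.KafkaContainer;

public class JavaKafkaTest {

    // Started manually in setUp(), so no @Rule is needed on this field.
    public KafkaContainer kafka;

    @Before
    public void setUp() throws Exception {
        // "5.4.2" is the image tag passed to the container.
        kafka = new KafkaContainer("5.4.2");
        kafka.start();
    }
}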
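The snippet above creates a `KafkaContainer` with image version `5.4.2` (Kafka version `2.5.0`) and starts the Docker container, all in a few lines of code. Now that we have a `KafkaContainer` instance, I will demonstrate the full class.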
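import java.util.Arrays;
import java.util.Properties;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.testcontainers.containers.KafkaContainer;

public class JavaKafkaTest {

    // Started and stopped manually in setUp()/tearDown(), so no @Rule is needed.
    public KafkaContainer kafka;

    @Before
    public void setUp() throws Exception {
        kafka = new KafkaContainer("5.4.2");
        kafka.start();

        // Create the input topic (1 partition, replication factor 1) and wait for it.
        Properties adminProperties = new Properties();
        adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        try (AdminClient adminClient = AdminClient.create(adminProperties)) {
            adminClient.createTopics(
                    Stream.of("plaintext-input")
                            .map(n -> new NewTopic(n, 1, (short) 1))
                            .collect(Collectors.toList())
            ).all().get();
        }
    }

    @Test
    public void testKafkaTestcontainer() throws InterruptedException {
        prepareSeedMessages();
        final Topology topology = prepareKafkaTopology();
        final KafkaStreams streams = new KafkaStreams(topology, consumerProps());
        streams.start();
        try {
            // Give the topology time to consume and log the seed messages.
            Thread.sleep(5000);
        } finally {
            streams.close();
        }
    }

    private void prepareSeedMessages() {
        // try-with-resources closes the producer, which also flushes pending sends.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps())) {
            producer.send(new ProducerRecord<>("plaintext-input", "this is sparta"));
            producer.send(new ProducerRecord<>("plaintext-input", "this is sparta"));
            producer.send(new ProducerRecord<>("plaintext-input", "this is sparta"));
        }
    }

    private Topology prepareKafkaTopology() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder
                .<String, String>stream("plaintext-input")
                .peek((k, v) -> {
                    Logger log = Logger.getLogger(Thread.currentThread().getName());
                    log.info(String.format("receive message from plaintext-input : %s", v));
                })
                // Split each message into words on runs of non-word characters.
                .flatMapValues(v -> Arrays.asList(v.split("\\W+")))
                .peek((k, v) -> {
                    Logger log = Logger.getLogger(Thread.currentThread().getName());
                    log.info(String.format("receive message from count : %s", v));
                })
                // Re-key each record by the word itself, then count per word.
                .groupBy((k, v) -> v)
                .count();
        return streamsBuilder.build();
    }

    private Properties consumerProps() {
        Properties consumerProps = new Properties();
        consumerProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        consumerProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        consumerProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "sample-app");
        consumerProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        return consumerProps;
    }

    private Properties producerProps() {
        Properties producerProperties = new Properties();
        // Always read the broker address from the running container.
        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return producerProperties;
    }

    @After
    public void tearDown() {
        kafka.stop();
    }
}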
In the `setUp` method, all the necessary components are prepared. `KafkaContainer` must be started before any other calls. Remember that `@Before` is executed before every test method, so each test gets a fresh container and the tests stay independent of each other.
`testKafkaTestcontainer` is the runnable part. This is where you exercise the code and assert everything you need. This example demonstrates a complete publish and subscribe cycle in a single method.
The `prepareSeedMessages` procedure populates the Kafka topic with three strings. I set the standard, required attributes: key and value serializers for the producer, application id for the Streams application, and the bootstrap server for both. Importantly, when setting the producer's properties, I take the bootstrap server from the Kafka container instance instead of hard-coding it, because the container exposes the broker on a randomly mapped port.
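To make the point about not hard-coding the broker address concrete, here is a minimal sketch in plain Java. The class `ProducerPropsSketch` and its method are hypothetical names for illustration; the string keys are the literal values behind the `ProducerConfig` constants used in the full class, and the address in `main` is a made-up placeholder standing in for whatever `kafka.getBootstrapServers()` would return.

```java
import java.util.Properties;

// Hypothetical helper: the broker address is a parameter, never a literal.
final class ProducerPropsSketch {

    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        // String forms of the ProducerConfig constants used in the test class.
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // In the real test this value would come from kafka.getBootstrapServers();
        // the address below is a placeholder assumption.
        System.out.println(producerProps("PLAINTEXT://localhost:32768"));
    }
}
```

Passing the address in keeps the properties valid no matter which host port the container ends up mapped to.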
`flatMapValues` splits each message into words on runs of non-word characters (the regex `\W+`), which for these messages means whitespace. I can then group by each word and count.
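The split, group, and count steps can be tried without a broker. The following is a plain-Java sketch of the same logic using `java.util.stream`, not Kafka Streams; `WordCountSketch` and `countWords` are hypothetical names introduced only for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java stand-in for the flatMapValues -> groupBy -> count steps.
final class WordCountSketch {

    static Map<String, Long> countWords(List<String> messages) {
        return messages.stream()
                // Same split as the topology: break on runs of non-word characters.
                .flatMap(message -> Arrays.stream(message.split("\\W+")))
                // Group identical words and count the members of each group.
                // TreeMap keeps the words sorted so the printed order is stable.
                .collect(Collectors.groupingBy(word -> word, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> seed = Arrays.asList("this is sparta", "this is sparta", "this is sparta");
        System.out.println(countWords(seed)); // prints {is=3, sparta=3, this=3}
    }
}
```

With the three seed messages from the test, each word ends up with a count of 3, which is what the Streams topology should converge to as well.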
Finally, the `tearDown` method stops the Kafka container. This destroys the container and deletes all the data inside it.
This can be improved with `@BeforeClass` and `@AfterClass`. Using `@Before` and `@After` as in this example creates and destroys a container for every single test method, which makes the test class very slow to run. Instead, we can create the container once per test class.
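To see the difference in numbers, here is a small simulation in plain Java. It does not use JUnit or Testcontainers: `FakeContainer` is a hypothetical stand-in that only counts how often it is started, and the two methods mimic the per-method and per-class lifecycles.

```java
// Simulation only: counts container starts under the two lifecycles.
final class LifecycleSketch {

    // Hypothetical stand-in for a container; start() just bumps a counter.
    static final class FakeContainer {
        static int starts = 0;
        void start() { starts++; }
        void stop() { }
    }

    // Mirrors @Before/@After: a fresh container around every test method.
    static int startsPerMethod(int testMethods) {
        FakeContainer.starts = 0;
        for (int i = 0; i < testMethods; i++) {
            FakeContainer container = new FakeContainer();
            container.start();
            // ... the test body would run here ...
            container.stop();
        }
        return FakeContainer.starts;
    }

    // Mirrors @BeforeClass/@AfterClass: one container shared by all test methods.
    static int startsPerClass(int testMethods) {
        FakeContainer.starts = 0;
        FakeContainer container = new FakeContainer();
        container.start();
        for (int i = 0; i < testMethods; i++) {
            // ... the test body would run here ...
        }
        container.stop();
        return FakeContainer.starts;
    }

    public static void main(String[] args) {
        System.out.println("per method: " + startsPerMethod(5)); // prints "per method: 5"
        System.out.println("per class: " + startsPerClass(5));   // prints "per class: 1"
    }
}
```

In JUnit 4, methods annotated with `@BeforeClass` and `@AfterClass` must be `public static`, so the container field has to be `static` too. The trade-off is that all test methods then share one broker and its topics, so tests should not rely on starting from an empty topic.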