Keycloak and Kafka are two popular technologies widely used in modern microservice architectures. Keycloak provides a centralized solution for authentication and authorization, while Kafka is a scalable, high-throughput distributed event-streaming platform.
In this article, we will look at how to execute a scheduled task in Keycloak on startup, using a Kafka consumer as an example. By combining these two technologies, you can build a secure and scalable microservice architecture that can handle large volumes of data.
One of Keycloak's capabilities is running scheduled tasks, which can perform various actions such as data synchronization, maintenance, and more. Keycloak allows you to create custom providers to extend its functionality. In our case, we will create a custom provider that runs a Kafka consumer as a scheduled task.
The Keycloak API provides events that can be used to trigger a scheduled task.
This class will just be a stub, because an EventListenerProvider cannot subscribe to ProviderEvents; the real work happens in the factory.
import java.util.Properties;

import org.jboss.logging.Logger;
import org.keycloak.events.Event;
import org.keycloak.events.EventListenerProvider;
import org.keycloak.events.admin.AdminEvent;

public class KeycloakConsumerEventListener implements EventListenerProvider {

    private static final Logger log = Logger.getLogger(KeycloakConsumerEventListener.class);

    public KeycloakConsumerEventListener(String topicKafka, Properties props) {
        log.info("init custom event listener consumer");
    }

    @Override
    public void onEvent(Event event) {
        // no-op
    }

    @Override
    public void onEvent(AdminEvent adminEvent, boolean includeRepresentation) {
        // no-op
    }

    @Override
    public void close() {
        // no-op
    }
}
This class will be responsible for creating instances of the EventListenerProvider and starting the scheduled task. In the init method, we define the configuration for our consumer.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.jboss.logging.Logger;
import org.keycloak.Config;
import org.keycloak.events.EventListenerProvider;
import org.keycloak.events.EventListenerProviderFactory;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.utils.PostMigrationEvent;
import org.keycloak.timer.TimerProvider;
import org.keycloak.timer.TimerProviderFactory;

public class KeycloakConsumerEventListenerFactory implements EventListenerProviderFactory {

    private static final Logger log = Logger.getLogger(KeycloakConsumerEventListenerFactory.class);

    private KeycloakConsumerEventListener keycloakConsumerEventListener;
    private String topicKafka;
    private String bootstrapServers;
    private Properties properties;
    private Consumer<String, String> consumer;

    @Override
    public EventListenerProvider create(KeycloakSession keycloakSession) {
        if (keycloakConsumerEventListener == null) {
            keycloakConsumerEventListener = new KeycloakConsumerEventListener(topicKafka, properties);
        }
        return keycloakConsumerEventListener;
    }

    @Override
    public void init(Config.Scope scope) {
        log.info("Init kafka consumer");
        bootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS");
        topicKafka = System.getenv("KAFKA_TOPIC");
        if (topicKafka == null || topicKafka.isEmpty()) {
            throw new IllegalStateException("KAFKA_TOPIC is required.");
        }
        if (bootstrapServers == null || bootstrapServers.isEmpty()) {
            throw new IllegalStateException("KAFKA_BOOTSTRAP_SERVERS is required.");
        }
        properties = getProperties();
    }

    @Override
    public void postInit(KeycloakSessionFactory keycloakSessionFactory) {
        // defined later
    }

    @Override
    public void close() {
        // release the consumer when the factory shuts down
        if (consumer != null) {
            consumer.close();
        }
    }

    @Override
    public String getId() {
        return "kafka-event-consumer";
    }

    private Properties getProperties() {
        // a consumer only needs deserializers and a group id
        Properties propsKafka = new Properties();
        propsKafka.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        propsKafka.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        propsKafka.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        propsKafka.put(ConsumerConfig.GROUP_ID_CONFIG, "KeycloakKafkaExampleConsumer");
        return propsKafka;
    }
}
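For Keycloak to discover the factory, the provider jar must register it through the standard Java ServiceLoader mechanism. Assuming the factory lives in a package such as `com.example.keycloak` (the package name here is illustrative), the jar would include a file named `META-INF/services/org.keycloak.events.EventListenerProviderFactory` containing the fully qualified class name:

```
com.example.keycloak.KeycloakConsumerEventListenerFactory
```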
In the postInit method, initialize the Kafka consumer and start the scheduled task by using the PostMigrationEvent. This will ensure that the scheduled task is executed on startup after Keycloak has completed its database migration.
@Override
public void postInit(KeycloakSessionFactory keycloakSessionFactory) {
    keycloakSessionFactory.register(event -> {
        if (event instanceof PostMigrationEvent) {
            log.info("Init kafka consumer task");
            if (consumer == null) {
                consumer = new KafkaConsumer<>(properties);
                consumer.subscribe(Collections.singletonList(topicKafka));
            }
            KeycloakSession keycloakSession = keycloakSessionFactory.create();
            TimerProviderFactory timerProviderFactory =
                    (TimerProviderFactory) keycloakSessionFactory.getProviderFactory(TimerProvider.class);
            // execute the task every 10 seconds
            timerProviderFactory.create(keycloakSession)
                    .scheduleTask(this::processRecords, 10000, "kafka-consumer");
        }
    });
}
private void processRecords(KeycloakSession session) {
    log.debug("task execute");
    // poll(long) is deprecated; use the Duration overload
    final ConsumerRecords<String, String> consumerRecords =
            this.consumer.poll(Duration.ofMillis(1000));
    try {
        consumerRecords.forEach(record ->
                log.infof("Consumer Record:(%s, %s, %d, %d)",
                        record.key(), record.value(),
                        record.partition(), record.offset()));
    } finally {
        consumer.commitAsync();
    }
}
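Since the record-handling code is hard to exercise without a running broker, the formatting logic can be pulled out into a small pure helper that is trivial to unit-test. A minimal sketch (the class and method names are illustrative, not part of the provider above):

```java
// Illustrative helper: builds the same log line that processRecords emits,
// but without depending on Kafka classes, so it can be tested in isolation.
public class RecordFormatter {

    public static String format(String key, String value, int partition, long offset) {
        return String.format("Consumer Record:(%s, %s, %d, %d)", key, value, partition, offset);
    }
}
```

processRecords could then log `RecordFormatter.format(record.key(), record.value(), record.partition(), record.offset())` instead of formatting inline.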
Keycloak provides a powerful and flexible way to run scheduled tasks that can perform actions such as data synchronization and maintenance. By combining it with Kafka, you can build a secure, scalable microservice architecture that handles large volumes of data. Following the steps outlined in this article, you can execute a scheduled task in Keycloak on startup, using a Kafka consumer as an example. This is just one way to extend Keycloak's functionality; the same steps can be applied to implement other scheduled tasks as well.