How to Execute a Scheduled Task in Keycloak on Startup

Written by dstepanov | Published 2023/02/09

TL;DR: Keycloak and Kafka are two popular technologies that are widely used in modern microservice architectures. By combining these two technologies, you can build a secure and scalable microservice architecture that can handle a large volume of data. In this article, we will look at how to execute a scheduled task in Keycloak on startup using a Kafka consumer as an example.

Keycloak and Kafka are widely used in modern microservice architectures. Keycloak provides a centralized solution for authentication and authorization, while Kafka is a scalable, high-throughput distributed event streaming platform.

In this article, we will look at how to execute a scheduled task in Keycloak on startup using a Kafka consumer as an example. By combining these two technologies, you can build a secure and scalable microservice architecture that can handle a large volume of data.

One of Keycloak's capabilities is running scheduled tasks, which can perform various actions such as synchronization of data, maintenance tasks, and many others. Keycloak allows you to create custom providers to extend its functionality. In our case, we will create a custom provider to run the Kafka consumer as a scheduled task.

ProviderEvents and PostMigrationEvent

The Keycloak API provides events that can be used to trigger a scheduled task.

  • ProviderEvents (the ProviderEvent interface in the Keycloak API) are general-purpose events that Keycloak publishes at various points in its lifecycle, such as on startup or when a realm or user is removed. Note that they are not delivered to an EventListenerProvider, the Keycloak component that receives user and admin events; listeners for them are registered on the KeycloakSessionFactory instead.

  • PostMigrationEvent is an implementation of ProviderEvents. It is published during startup, once Keycloak has completed its database migration step, and can be used to perform post-migration tasks such as cleaning up old data or updating indexes.

By subscribing to either of these events, you can run tasks automatically at specific points in the Keycloak lifecycle.
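
The subscription pattern described above can be sketched in plain Java without Keycloak on the classpath. Everything in this block (EventFilterSketch, LifecycleEvent, MigrationFinished, RealmRemoved, EventHub, StartupTask) is a hypothetical stand-in invented for illustration and is not part of the Keycloak API; it only shows the idea of registering one listener and reacting to a single event type with an instanceof check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class EventFilterSketch {
    // Hypothetical stand-ins for illustration only; none of these are Keycloak types.
    interface LifecycleEvent {}
    static final class MigrationFinished implements LifecycleEvent {}
    static final class RealmRemoved implements LifecycleEvent {}

    /** Minimal publish/subscribe hub, standing in for the object listeners register on. */
    static final class EventHub {
        private final List<Consumer<LifecycleEvent>> listeners = new ArrayList<>();

        void register(Consumer<LifecycleEvent> listener) {
            listeners.add(listener);
        }

        void publish(LifecycleEvent event) {
            listeners.forEach(listener -> listener.accept(event));
        }
    }

    /** Counts how often the startup work was triggered. */
    static final class StartupTask {
        private int runs = 0;

        void attachTo(EventHub hub) {
            hub.register(event -> {
                // React only to the event type we care about and ignore the rest.
                if (event instanceof MigrationFinished) {
                    runs++;
                }
            });
        }

        int runs() {
            return runs;
        }
    }

    public static void main(String[] args) {
        EventHub hub = new EventHub();
        StartupTask task = new StartupTask();
        task.attachTo(hub);
        hub.publish(new RealmRemoved());      // ignored by the listener
        hub.publish(new MigrationFinished()); // triggers the startup work
        System.out.println("startup task runs: " + task.runs());
    }
}
```

The listener receives every published event, so the type check is what restricts the work to the startup signal.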

Steps to Execute a Scheduled Task in Keycloak on Startup

Create a mock EventListenerProvider

This class is only a stub: Keycloak requires the factory to produce an EventListenerProvider, but the EventListenerProvider itself cannot subscribe to ProviderEvents, so its methods are left empty.

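import java.util.Properties;

import org.jboss.logging.Logger;
import org.keycloak.events.Event;
import org.keycloak.events.EventListenerProvider;
import org.keycloak.events.admin.AdminEvent;

public class KeycloakConsumerEventListener implements EventListenerProvider {

    private static final Logger log = Logger.getLogger(KeycloakConsumerEventListener.class);

    // The arguments are not used by the stub; the factory owns the Kafka consumer.
    public KeycloakConsumerEventListener(String topicKafka, Properties props) {
        log.info("init custom event listener consumer");
    }

    @Override
    public void onEvent(Event event) {
        // User events are intentionally ignored.
    }

    @Override
    public void onEvent(AdminEvent adminEvent, boolean includeRepresentation) {
        // Admin events are intentionally ignored.
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}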

Create an EventListenerProviderFactory

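This class is responsible for creating instances of the EventListenerProvider and starting the scheduled task. In the init method, we read the configuration for our consumer from environment variables. Keycloak discovers provider factories through the Java service loader, so the fully qualified class name of the factory must also be listed in a META-INF/services/org.keycloak.events.EventListenerProviderFactory file inside the provider JAR.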

import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.jboss.logging.Logger;
import org.keycloak.Config;
import org.keycloak.events.EventListenerProvider;
import org.keycloak.events.EventListenerProviderFactory;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;

public class KeycloakConsumerEventListenerFactory implements EventListenerProviderFactory {

    private static final Logger log = Logger.getLogger(KeycloakConsumerEventListenerFactory.class);

    private KeycloakConsumerEventListener keycloakConsumerEventListener;

    private String topicKafka;
    private String bootstrapServers;

    private Properties properties;

    // Created in postInit, once Keycloak has finished starting up.
    private Consumer<String, String> consumer;

    @Override
    public EventListenerProvider create(KeycloakSession keycloakSession) {
        // The listener is a stateless stub, so one shared instance is enough.
        if (keycloakConsumerEventListener == null) {
            keycloakConsumerEventListener = new KeycloakConsumerEventListener(topicKafka, properties);
        }
        return keycloakConsumerEventListener;
    }

    @Override
    public void init(Config.Scope scope) {
        log.info("Init kafka consumer");

        bootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS");
        topicKafka = System.getenv("KAFKA_TOPIC");

        // Fail fast if the required configuration is missing.
        if (topicKafka == null || topicKafka.isEmpty()) {
            throw new IllegalStateException("KAFKA_TOPIC is required");
        }
        if (bootstrapServers == null || bootstrapServers.isEmpty()) {
            throw new IllegalStateException("KAFKA_BOOTSTRAP_SERVERS is required");
        }

        properties = getProperties();
    }

    @Override
    public void postInit(KeycloakSessionFactory keycloakSessionFactory) {
        // defined later
    }

    @Override
    public void close() {

    }

    @Override
    public String getId() {
        return "kafka-event-consumer";
    }

    // Only consumer settings are needed; the producer serializers were unused.
    private Properties getProperties() {
        Properties propsKafka = new Properties();
        propsKafka.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        propsKafka.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        propsKafka.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        propsKafka.put(ConsumerConfig.GROUP_ID_CONFIG, "KeycloakKafkaExampleConsumer");

        return propsKafka;
    }
}
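
The fail-fast configuration check in init can be exercised on its own. The sketch below is a minimal, hedged illustration that needs neither Keycloak nor the Kafka client library: ConsumerSettingsSketch, require, and consumerProperties are hypothetical names, the environment is passed in as a Map so it can be tested, and the property keys are the plain string forms of the standard Kafka consumer settings (bootstrap.servers, group.id, key.deserializer, value.deserializer).

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

final class ConsumerSettingsSketch {

    /** Returns the trimmed value of a required setting or fails with a clear message. */
    static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required setting: " + name);
        }
        return value.trim();
    }

    /** Builds consumer properties using the string keys understood by the Kafka client. */
    static Properties consumerProperties(Map<String, String> env) {
        String deserializer = "org.apache.kafka.common.serialization.StringDeserializer";
        Properties props = new Properties();
        props.put("bootstrap.servers", require(env, "KAFKA_BOOTSTRAP_SERVERS"));
        props.put("group.id", "KeycloakKafkaExampleConsumer");
        props.put("key.deserializer", deserializer);
        props.put("value.deserializer", deserializer);
        return props;
    }

    public static void main(String[] args) {
        // Sample value for the demo; a real deployment would pass System.getenv().
        Map<String, String> env =
                Collections.singletonMap("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092");
        System.out.println(consumerProperties(env));
    }
}
```

Throwing IllegalStateException rather than NullPointerException makes it clearer in the startup log that the problem is configuration and not a programming error.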

Override the postInit method of the Factory

In the postInit method, initialize the Kafka consumer and start the scheduled task by using the PostMigrationEvent. This will ensure that the scheduled task is executed on startup after Keycloak has completed its database migration.

    // Additional imports needed in the factory for this step:
    // java.time.Duration, java.util.Collections,
    // org.apache.kafka.clients.consumer.ConsumerRecords,
    // org.apache.kafka.clients.consumer.KafkaConsumer,
    // org.keycloak.models.utils.PostMigrationEvent, org.keycloak.timer.TimerProvider

    @Override
    public void postInit(KeycloakSessionFactory keycloakSessionFactory) {
        keycloakSessionFactory.register(event -> {
            if (event instanceof PostMigrationEvent) {
                log.info("Init kafka consumer task");

                if (consumer == null) {
                    consumer = new KafkaConsumer<>(properties);
                    consumer.subscribe(Collections.singletonList(topicKafka));
                }

                // The session is only needed to look up the timer, so close it afterwards.
                KeycloakSession keycloakSession = keycloakSessionFactory.create();
                try {
                    TimerProvider timer = keycloakSession.getProvider(TimerProvider.class);
                    // Execute the task every 10 seconds.
                    timer.scheduleTask(this::processRecords, 10000, "kafka-consumer");
                } finally {
                    keycloakSession.close();
                }
            }
        });
    }

    private void processRecords(KeycloakSession session) {
        log.debug("task execute");
        // poll(Duration) replaces the deprecated poll(long) overload.
        final ConsumerRecords<String, String> consumerRecords =
                consumer.poll(Duration.ofMillis(1000));

        try {
            consumerRecords.forEach(record ->
                    log.infof("Consumer Record:(%s, %s, %d, %d)",
                            record.key(), record.value(),
                            record.partition(), record.offset()));
        } finally {
            consumer.commitAsync();
        }
    }
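
The code above commits offsets in a finally block, so a batch counts as consumed even when processing throws. If records must not be skipped on failure, one alternative is to commit only after the whole batch has been handled. The sketch below illustrates that variation with a hypothetical in-memory stand-in (PollCommitSketch, InMemoryLog, and runOnce are invented for illustration and are not Kafka or Keycloak classes); it is a simplified model of the poll, process, commit cycle and not a drop-in replacement for the consumer code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class PollCommitSketch {

    /** Hypothetical in-memory stand-in for a topic partition; not a Kafka class. */
    static final class InMemoryLog {
        private final List<String> records;
        private int committed = 0; // index of the next record to deliver

        InMemoryLog(List<String> records) {
            this.records = new ArrayList<>(records);
        }

        /** Returns up to max records starting at the committed position. */
        List<String> poll(int max) {
            int end = Math.min(records.size(), committed + max);
            return new ArrayList<>(records.subList(committed, end));
        }

        void commit(int count) {
            committed += count;
        }

        int committed() {
            return committed;
        }
    }

    /** One run of the scheduled task; returns true if the batch was committed. */
    static boolean runOnce(InMemoryLog log, int max, Consumer<String> handler) {
        List<String> batch = log.poll(max);
        try {
            batch.forEach(handler);
        } catch (RuntimeException e) {
            // Leave the position untouched so the next run sees the same batch again.
            return false;
        }
        log.commit(batch.size());
        return true;
    }

    public static void main(String[] args) {
        InMemoryLog log = new InMemoryLog(Arrays.asList("a", "b", "c"));
        runOnce(log, 2, record -> System.out.println("processed " + record));
        System.out.println("committed position: " + log.committed());
    }
}
```

The trade-off is that a failing batch is redelivered in full, so the handler has to tolerate seeing the same record more than once.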

Conclusion

Keycloak provides a flexible way to run scheduled tasks such as data synchronization and maintenance jobs, and combining it with Kafka lets you build a secure and scalable microservice architecture. By following the steps outlined in this article, you can start such a task on Keycloak startup, illustrated here with a Kafka consumer that is polled every 10 seconds. This is just one example of how you can extend Keycloak's functionality, and the same steps can be applied to implement other scheduled tasks as well.


Written by dstepanov | 10+ years of experience in software development including architecture, design, implementation, and maintenance.
Published by HackerNoon on 2023/02/09