
Handling Custom Type Mapping in Kafka Listeners for Messages with "TypeId" Header

by Stepanov Dmitrii, July 11th, 2023



Kafka is widely used for building event-driven microservice architectures, allowing services to communicate through messages. However, when you consume from an external producer that adds a "TypeId" header to its messages, you may run into deserialization issues.


This article will guide you through the process of configuring a custom type mapper in Spring Kafka to handle the mapping of types between the producer and consumer services. By implementing this approach, you can successfully consume messages with different package structures and types, avoiding exceptions caused by incompatible types and ensuring smooth message processing.


Understanding the Deserialization Exception with "TypeId" Header


When the DTO classes in the consumer service live in a different package than the DTO classes in the producer service, a deserialization exception can occur because the fully qualified class names no longer match.


Header-based deserialization relies on both the producer and the consumer having access to the same class definitions. When the consumer receives a message whose "TypeId" header references a class in a different package than the consumer's DTO classes, the deserializer cannot resolve that name to an appropriate class. The result is a "ClassNotFoundException" or an "IllegalArgumentException" wrapped in the deserialization exception.


These exceptions can disrupt the smooth consumption of messages. Understanding the root cause of this exception is key to effectively handling the situation.


An example of the deserialization exception is as follows:

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
	at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:198) ~[spring-kafka-3.0.7.jar:3.0.7]
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition consumer-test-18 at offset 1. If needed, please seek past the record to continue consumption.
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1438) ~[kafka-clients-3.4.0.jar:na]
Caused by: java.lang.IllegalArgumentException: The class 'org.example.dto.out.OutMessageImplTwo' is not in the trusted packages: [java.util, java.lang, org.example.dto.in, org.example.dto.in.*]. 
If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
	at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:129) ~[spring-kafka-3.0.7.jar:3.0.7]


The exception message provides insights into the underlying problem. In this case, the exception occurs because the class org.example.dto.out.OutMessageImplTwo is not present in the list of trusted packages. The trusted packages serve as a safeguard to ensure secure deserialization and prevent unauthorized classes from being deserialized.


When deserializing a message, Spring Kafka by default checks whether the class named in the "TypeId" header belongs to one of the trusted packages. If it does not, an exception is thrown. This mechanism protects against security risks, because deserializing untrusted classes can lead to code execution vulnerabilities.
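
To make the check concrete, here is a minimal plain-Java sketch of a trusted-packages test. It is a simplified model for illustration, not Spring Kafka's actual implementation; the class `TrustedPackagesSketch` and its methods are hypothetical names. The package list mirrors the one printed in the exception message above.

```java
import java.util.List;

// Simplified model of a trusted-packages check (illustrative; not Spring Kafka's code).
class TrustedPackagesSketch {
    private final List<String> trustedPackages;

    TrustedPackagesSketch(List<String> trustedPackages) {
        this.trustedPackages = trustedPackages;
    }

    // True if the class's package is listed, matches a "pkg.*" entry, or "*" trusts everything.
    boolean isTrusted(String className) {
        if (trustedPackages.contains("*")) {
            return true;
        }
        int lastDot = className.lastIndexOf('.');
        String pkg = lastDot < 0 ? "" : className.substring(0, lastDot);
        for (String trusted : trustedPackages) {
            if (trusted.equals(pkg)) {
                return true;
            }
            // An entry such as "org.example.dto.in.*" covers sub-packages of org.example.dto.in
            if (trusted.endsWith(".*")
                    && pkg.startsWith(trusted.substring(0, trusted.length() - 1))) {
                return true;
            }
        }
        return false;
    }

    // Rejects an untrusted class name before any attempt to load the class.
    void requireTrusted(String className) {
        if (!isTrusted(className)) {
            throw new IllegalArgumentException(
                    "The class '" + className + "' is not in the trusted packages: " + trustedPackages);
        }
    }

    public static void main(String[] args) {
        TrustedPackagesSketch check = new TrustedPackagesSketch(
                List.of("java.util", "java.lang", "org.example.dto.in", "org.example.dto.in.*"));
        System.out.println(check.isTrusted("org.example.dto.in.InMessageImpl"));
        System.out.println(check.isTrusted("org.example.dto.out.OutMessageImplTwo"));
    }
}
```

Note that the check works on the class name alone. Trusting the producer's package would not help a consumer that does not have the producer's classes on its classpath, which is why the rest of this article maps the type instead.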


To resolve this issue, you need to ensure that the consumer can properly map the incoming message to the corresponding class, even if they have different package structures. One approach is to configure a custom type mapper that explicitly handles the type mapping between the "TypeId" header and the consumer's DTO classes.


By implementing a custom type mapper, you can instruct the consumer to map the "TypeId" header to the appropriate class or even ignore this header. This enables successful deserialization and ensures that the consumer can process the message correctly.
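
The mapping idea can be sketched in plain Java as a lookup from the header value to a local class, with a fallback for a missing or unknown header. This is only a model under stated assumptions: `TypeIdMappingSketch`, `resolve`, and the nested DTO stand-ins are hypothetical names, and a real mapper would read the header from the Kafka record. Spring Kafka's type mappers also offer an id-to-class mapping (`setIdClassMapping`) that serves a similar purpose; check the documentation for your version before relying on it.

```java
import java.util.Map;

// Plain-Java model of resolving a "TypeId" header value to a consumer-side class.
class TypeIdMappingSketch {
    // Hypothetical stand-ins for the consumer's DTO classes.
    static class InMessageImpl {}
    static class InMessageImplTwo {}

    private final Map<String, Class<?>> idToClass;
    private final Class<?> fallback;

    TypeIdMappingSketch(Map<String, Class<?>> idToClass, Class<?> fallback) {
        this.idToClass = idToClass;
        this.fallback = fallback;
    }

    // Returns the mapped local class, or the fallback when the header is absent or unknown.
    Class<?> resolve(String typeIdHeader) {
        if (typeIdHeader == null) {
            return fallback;
        }
        return idToClass.getOrDefault(typeIdHeader, fallback);
    }

    public static void main(String[] args) {
        TypeIdMappingSketch mapper = new TypeIdMappingSketch(
                Map.of(
                        "org.example.dto.out.OutMessageImpl", InMessageImpl.class,
                        "org.example.dto.out.OutMessageImplTwo", InMessageImplTwo.class),
                InMessageImpl.class);
        // The producer's class name is translated to the consumer's own class.
        System.out.println(mapper.resolve("org.example.dto.out.OutMessageImplTwo").getSimpleName());
    }
}
```

Ignoring the header altogether is the degenerate case of this table: an empty map, so every message resolves to the fallback class.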


In the upcoming sections, we will delve into the details of configuring and implementing a custom type mapper in Kafka listeners, allowing seamless message processing and addressing the challenges encountered when the consumer's DTO classes differ from those of the producer.


Implementing a Custom Type Mapper

In our example, we will send an implementation of OutMessageAbstract to Kafka and read it as an implementation of the InMessageAbstract class. The two class hierarchies are structurally identical; they differ only in name and package.


The figure below shows the project structure.



Structure of the project



Our KafkaListener looks like this. It implements the logic for processing each type of message from the topic.


@Component
@Log
@KafkaListener(
        topics = "${spring.cloud.stream.bindings.topic-in-0.destination}",
        groupId = "${spring.cloud.stream.bindings.topic-in-0.group}",
        containerFactory = "kafkaListenerContainerFactory")
public class EventListeners {
    @KafkaHandler
    public void handleInMessageImplEvent(@Payload InMessageImpl event) {
        System.out.println(event);
    }

    @KafkaHandler
    public void handleInMessageImplTwoEvent(@Payload InMessageImplTwo event) {
        System.out.println(event);
    }
}
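
The deserialized type matters here because a class-level listener selects the handler method by the payload's runtime type. The plain-Java sketch below models that selection in a simplified way. It is an assumption-laden illustration: `HandlerDispatchSketch`, `register`, and `dispatch` are hypothetical names, and Spring's real handler resolution is more involved.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of choosing a handler by the payload's runtime type.
class HandlerDispatchSketch {
    // Hypothetical stand-ins for the consumer's DTO classes.
    static class InMessageImpl {}
    static class InMessageImplTwo {}

    private final Map<Class<?>, Function<Object, String>> handlers = new LinkedHashMap<>();

    // Registers a handler for one payload type; the cast is safe because dispatch checks isInstance first.
    <T> void register(Class<T> type, Function<T, String> handler) {
        handlers.put(type, payload -> handler.apply(type.cast(payload)));
    }

    // Invokes the first handler whose type matches the payload, or fails if none does.
    String dispatch(Object payload) {
        for (Map.Entry<Class<?>, Function<Object, String>> entry : handlers.entrySet()) {
            if (entry.getKey().isInstance(payload)) {
                return entry.getValue().apply(payload);
            }
        }
        throw new IllegalStateException(
                "No handler for payload type " + payload.getClass().getName());
    }

    public static void main(String[] args) {
        HandlerDispatchSketch dispatcher = new HandlerDispatchSketch();
        dispatcher.register(InMessageImpl.class, event -> "handled InMessageImpl");
        dispatcher.register(InMessageImplTwo.class, event -> "handled InMessageImplTwo");
        System.out.println(dispatcher.dispatch(new InMessageImplTwo()));
    }
}
```

In this model, a payload deserialized into a producer-side class would match neither registered type, so the consumer has to end up with its own DTO classes before dispatch.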



We use the kafkaListenerContainerFactory bean to configure our listener. This factory is described in KafkaConsumerConfiguration and looks like this:



@Configuration
public class KafkaConsumerConfiguration {
    private final String SERVER;
    private final String SERVER_PORT;

    public KafkaConsumerConfiguration(
            @Value("${spring.cloud.stream.kafka.binder.brokers}") String server,
            @Value("${spring.cloud.stream.kafka.binder.defaultBrokerPort}") String port
    ) {
        this.SERVER = server;
        this.SERVER_PORT = port;
    }

    private <T> ConsumerFactory<String, T> typeConsumerFactory(Class<T> clazz) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.format("%s:%s", SERVER, SERVER_PORT));

        return new DefaultKafkaConsumerFactory<>(
                props,
                new StringDeserializer(),
                new JsonDeserializer<>(clazz));
    }

    private <T> ConcurrentKafkaListenerContainerFactory<String, T> initFactory(Class<T> clazz) {
        ConcurrentKafkaListenerContainerFactory<String, T> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(typeConsumerFactory(clazz));
        return factory;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ?> kafkaListenerContainerFactory() {
        return initFactory(InMessageAbstract.class);
    }
}


Because the producer adds a "TypeId" header to the message, we get the error described above when we try to deserialize it. However, we can configure the JsonDeserializer manually. To do this, we define our own type mapper implementation. The following shows how you can define one.


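import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.type.TypeFactory;
import org.apache.kafka.common.header.Headers;
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;

// Type mapper that ignores the "TypeId" header and always resolves to one fixed class.
class CustomTypeMapper<T> extends DefaultJackson2JavaTypeMapper {
    private final Class<T> clazz;

    public CustomTypeMapper(Class<T> clazz) {
        this.clazz = clazz;
    }

    // Always resolve to the configured class; the message headers are deliberately not consulted.
    @Override
    public JavaType toJavaType(Headers headers) {
        return TypeFactory.defaultInstance().constructType(clazz);
    }
}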


If we set our type mapper on the JsonDeserializer when we create the factory, we can force the JSON to be deserialized to the class we need, ignoring the header. The corrected method looks like this:


private <T> ConsumerFactory<String, T> typeConsumerFactory(Class<T> clazz) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.format("%s:%s", SERVER, SERVER_PORT));

    JsonDeserializer<T> jsonDeserializer = new JsonDeserializer<>(clazz);
    jsonDeserializer.setTypeMapper(new CustomTypeMapper<>(clazz));

    return new DefaultKafkaConsumerFactory<>(
            props,
            new StringDeserializer(),
            jsonDeserializer);
}


Conclusion

By utilizing a custom type mapper in Spring Kafka, you can overcome the challenges associated with consuming messages from foreign providers that include a "TypeId" header. This article has provided a step-by-step guide to configuring and implementing the custom type mapper, enabling smooth message processing and avoiding serialization and deserialization exceptions. With this knowledge, you can confidently integrate with external systems and consume Kafka messages with different package structures.


The full project code is available on GitHub: https://github.com/stepanovD/kafka-templates/tree/master/consumer-custom-type-mapper