Issue
I'm getting up an application consuming kafka messages.
I followed Spring-docs about Deserialization Error Handling in order to catch deserialization exception. I've tried the failedDeserializationFunction method.
This is my Consumer Configuration Class
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
/* Error Handling */
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
consumerProps.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, FailedNTCMessageBodyProvider.class);
return consumerProps;
}
@Bean
public ConsumerFactory<String, NTCMessageBody> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new JsonDeserializer<>(NTCMessageBody.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
This is the BiFunction Provider
public class FailedNTCMessageBodyProvider implements BiFunction<byte[], Headers, NTCMessageBody> {
@Override
public NTCMessageBody apply(byte[] t, Headers u) {
return new NTCBadMessageBody(t);
}
}
public class NTCBadMessageBody extends NTCMessageBody{
private final byte[] failedDecode;
public NTCBadMessageBody(byte[] failedDecode) {
this.failedDecode = failedDecode;
}
public byte[] getFailedDecode() {
return this.failedDecode;
}
}
When I send just one corrupted message on the topic I got this error (in loop):
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value
I understood that the ErrorHandlingDeserializer2 should delegate the NTCBadMessageBody type and continue the consumption. I also saw (in debug mode) it didn't never go in the constructor of the NTCBadMessageBody class.
Solution
When a deserializer fails to deserialize a message, Spring has no way to handle the problem because it occurs before the poll() returns. To solve this problem, version 2.2 introduced the ErrorHandlingDeserializer. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer returns a DeserializationException instead, containing the cause and raw bytes. When using a record-level MessageListener, if either the key or value contains a DeserializationException, the container’s ErrorHandler is called with the failed ConsumerRecord. When using a BatchMessageListener, the failed record is passed to the application along with the remaining records in the batch, so it is the responsibility of the application listener to check whether the key or value in a particular record is a DeserializationException.
So according to your code you are using record-level MessageListener
then just add ErrorHandler
to Container
If your error handler implements this interface you can, for example, adjust the offsets accordingly. For example, to reset the offset to replay the failed message, you could do something like the following; note however, these are simplistic implementations and you would probably want more checking in the error handler.
@Bean
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
return (m, e, c) -> {
this.listen3Exception = e;
MessageHeaders headers = m.getHeaders();
c.seek(new org.apache.kafka.common.TopicPartition(
headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
headers.get(KafkaHeaders.OFFSET, Long.class));
return null;
};
}
Or you can do custom implementation like in this example
@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setErrorHandler(new ErrorHandler() {
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
String topics = s.split("-")[0];
int offset = Integer.valueOf(s.split("offset ")[1]);
int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);
TopicPartition topicPartition = new TopicPartition(topics, partition);
//log.info("Skipping " + topic + "-" + partition + " offset " + offset);
consumer.seek(topicPartition, offset + 1);
System.out.println("OKKKKK");
}
@Override
public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {
}
@Override
public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord, Consumer<?,?> consumer) {
String s = e.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
String topics = s.split("-")[0];
int offset = Integer.valueOf(s.split("offset ")[1]);
int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);
TopicPartition topicPartition = new TopicPartition(topics, partition);
//log.info("Skipping " + topic + "-" + partition + " offset " + offset);
consumer.seek(topicPartition, offset + 1);
System.out.println("OKKKKK");
}
});
return factory;
}
Answered By - Deadpool
Answer Checked By - Mary Flores (JavaFixing Volunteer)