Issue
Sometimes messages can be filtered out before the deserialization based on header values . Are there any existing patterns for this scenario using spring kafka. I am thinking implementing similar to ErrorHandlingDeserializer in addition to delegate take filter predicate also as property. Any suggestions? thanks.
Solution
Yes, you can use the same technique used by the ErrorHandlingDeserializer
to return a "marker" object instead of doing the deserialization, then add a RecordFilterStrategy
, that filters records with such objects, to the listener (container factory when using @KafkaListener
or use a filtering adapter for an explicit listener).
EDIT
Spring Boot and adding a filter...
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
kafkaConsumerFactory.setRecordFilterStrategy(myFilter());
return factory;
}
Answered By - Gary Russell