Issue
While trying to use listener config properties in application.yml, I am facing an issue where the KafkaListener annotated method is not invoked at all if I use the application.yml config(listener.type= batch). It only gets invoked when I explicitly set setBatchListener to true in code. Here is my code and configuration.
- Consumer code:
@KafkaListener(containerFactory = "kafkaListenerContainerFactory",
topics = "${spring.kafka.template.default-topic}",
groupId = "${spring.kafka.consumer.group-id}")
public void receive(List<ConsumerRecord<String,byte[]>> consumerRecords,Acknowledgment acknowledgment){
processor.process(consumerRecords,acknowledgment);
}
- application.yml:
listener:
missing-topics-fatal: false
type: batch
ack-mode: manual
Consumer configuration:
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory( new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties())); factory.setErrorHandler(new SeekToCurrentErrorHandler( new UpdateMessageErrorHandler(),new FixedBackOff(idleEventInterval,maxFailures))); final ContainerProperties properties = factory.getContainerProperties(); properties.setIdleBetweenPolls(idleBetweenPolls); properties.setIdleEventInterval(idleEventInterval); return factory;
}
Solution
If I'm not mistaken, by using the ConcurrentKafkaListenerContainerFactory
builder in your configuration you're essentially overriding a piece of code that is usually executed within ConcurrentKafkaListenerContainerFactoryConfigurer
class within spring autoconfiguration package:
if (properties.getType().equals(Type.BATCH)) {
factory.setBatchListener(true);
factory.setBatchErrorHandler(this.batchErrorHandler);
} else {
factory.setErrorHandler(this.errorHandler);
}
Since it's hardcoded in your application.yaml
file anyway, why is it a bad thing for it to be configured in your @Configuration
file?
Answered By - Desomph
Answer Checked By - Mildred Charles (JavaFixing Admin)