Issue
I have a simple consumer working in Spring. I have a config class that defines a bunch of factories, etc. When I remove the config class, the consumer still works. I'm wondering what the benefit is of declaring the factory myself, i.e.:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    return factory;
}

public ConsumerFactory<String, GenericRecord> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(retrieveConsumerConfigs());
}
as opposed to just passing the values in via application properties and calling it a day. I have explicit control over the config in the class-based approach, but I was also thinking I could drop the class and supply the values through Spring environment properties such as spring.kafka.bootstrap-servers, for example.
Solution
The container factory is required for @KafkaListener methods.
Spring Boot will auto-configure one (from application.properties/yml) if you don't provide your own bean. See KafkaAutoConfiguration.
Boot will also configure the consumer factory (if you don't).
An application, typically, does not need to declare any infrastructure beans.
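As a rough sketch of what the property-based approach might look like for the factory in the question, the fragment below uses Spring Boot's spring.kafka.* keys. The host names, group id, schema registry URL, and the Confluent Avro deserializer are illustrative assumptions, since the contents of retrieveConsumerConfigs() are not shown in the question.

```properties
# Assumed broker address; replace with your own.
spring.kafka.bootstrap-servers=localhost:9092
# Hypothetical consumer group id.
spring.kafka.consumer.group-id=example-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Assumption: GenericRecord values are read with the Confluent Avro deserializer.
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
# Arbitrary client properties that have no dedicated Boot key go under spring.kafka.properties.
spring.kafka.properties.schema.registry.url=http://localhost:8081
# Property counterpart of factory.setBatchListener(true).
spring.kafka.listener.type=batch
```

With settings like these in place, the auto-configured container factory and consumer factory should cover what the hand-written config class did, provided the class set nothing beyond what these keys express.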
EDIT
I prefer to never declare my own infrastructure beans. If I need some feature that is not exposed as a Boot property, or where I want to override some property for just one container, I simply add a customizer bean.
@Component
class Customizer {

    public Customizer(ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
        factory.setContainerCustomizer(container -> {
            // Constant-first equals avoids a NullPointerException if no group id is set on the container
            if ("slowGroup".equals(container.getContainerProperties().getGroupId())) {
                container.getContainerProperties().setIdleBetweenPolls(60_000);
            }
        });
    }

}
or
@Component
class Customizer {

    Customizer(AbstractKafkaListenerContainerFactory<?, ?, ?> containerFactory,
            ThreadPoolTaskExecutor exec) {

        // Note: in newer Spring for Apache Kafka versions this setter is named setListenerTaskExecutor
        containerFactory.getContainerProperties().setConsumerTaskExecutor(exec);
    }

}
etc.
Answered By - Gary Russell
Answer Checked By - David Marino (JavaFixing Volunteer)