Issue
I'm creating beans dynamically with ConfigurableListableBeanFactory and its registerSingleton method, and afterwards I want to listen for events with @EventListener. In the example below, the Kafka listener receives messages correctly, but the @EventListener is never fired. I suspect this is because the BeanFactory has no support for ApplicationEvents. How can I add an EventListener for beans registered through the BeanFactory?
Regards!
@PostConstruct
public void setup() {
    final ConfigurableListableBeanFactory beanFactory =
            ((ConfigurableApplicationContext) applicationContext).getBeanFactory();
    kafkaConfiguration.getTopics().keySet().forEach(key -> {
        Topic topic = kafkaConfiguration.getTopics().get(key);
        // One consumer factory per topic, deserializing values into the topic's class.
        DefaultKafkaConsumerFactory<String, KafkaEntity> defaultKafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(
                consumerConfigs(),
                new StringDeserializer(),
                new JsonDeserializer<>(topic.getClazz()));
        beanFactory.registerSingleton(key, defaultKafkaConsumerFactory);
        // One listener container factory per topic, registered as a ready-made instance.
        ConcurrentKafkaListenerContainerFactory<String, KafkaEntity> concurrentKafkaListenerContainerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        concurrentKafkaListenerContainerFactory.setConsumerFactory(defaultKafkaConsumerFactory);
        concurrentKafkaListenerContainerFactory.setConcurrency(topic.getConcurrency());
        concurrentKafkaListenerContainerFactory.setAutoStartup(topic.isAutoStart());
        concurrentKafkaListenerContainerFactory.getContainerProperties().setPollTimeout(kafkaConfiguration.getPollTimeout());
        concurrentKafkaListenerContainerFactory.getContainerProperties().setIdleEventInterval(1000L);
        beanFactory.registerSingleton(CONTAINER_FACTORY_NAME + key, concurrentKafkaListenerContainerFactory);
    });
}
@KafkaListener(id = "kfktest", topics = "data_common-apibridge-service.board", idIsGroup = false,
        containerFactory = KafkaConsumerConfiguration.CONTAINER_FACTORY_NAME + "BOARD")
public void listen(@Payload(required = false) BoardMessage message,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
    // TODO message handler
}

@EventListener(condition = "event.listenerId.startsWith('kfktest-')")
public void eventHandler(ListenerContainerIdleEvent event) {
    log.info(event.getListenerId());
}
Solution
After some reading I found a way to do it. I use a BeanFactoryPostProcessor (the @Bean method further down) that dynamically registers bean definitions for all the topics I have in the config server. After that, I complete the beans in another @Configuration class with @PostConstruct, so that some of their dependencies are already instantiated.
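A likely explanation for why this helps: registerSingleton stores a finished instance as-is, and Spring does not run initialization or Aware callbacks on it, so the container factory probably never receives an ApplicationEventPublisher and its containers have nothing to publish idle events with. A bean created from a bean definition goes through the normal lifecycle and does get those callbacks. The following is only a toy model of that difference in plain Java; ToyRegistry, ToyContainerFactory and PublisherAware are invented names, not Spring classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Toy model only: none of these types are Spring classes.
class LifecycleSketch {

    /** Stand-in for an "Aware" callback interface. */
    interface PublisherAware {
        void setPublisher(Consumer<String> publisher);
    }

    /** Stand-in for a container factory that can only emit events if it was given a publisher. */
    static class ToyContainerFactory implements PublisherAware {
        private Consumer<String> publisher; // stays null unless the callback runs

        @Override
        public void setPublisher(Consumer<String> publisher) {
            this.publisher = publisher;
        }

        void fireIdleEvent(String listenerId) {
            if (publisher != null) {
                publisher.accept("idle:" + listenerId);
            }
        }
    }

    /** Stand-in for a bean factory with two registration paths. */
    static class ToyRegistry {
        final List<String> publishedEvents = new ArrayList<>();
        private final Map<String, Object> singletons = new HashMap<>();
        private final Map<String, Supplier<?>> definitions = new HashMap<>();

        // Path 1: a ready-made instance is stored untouched, no callbacks are run.
        void registerSingleton(String name, Object instance) {
            singletons.put(name, instance);
        }

        // Path 2: only a recipe is stored; the registry creates the bean itself.
        void registerDefinition(String name, Supplier<?> recipe) {
            definitions.put(name, recipe);
        }

        Object getBean(String name) {
            return singletons.computeIfAbsent(name, n -> {
                Object bean = definitions.get(n).get();
                // Lifecycle callback, applied only to beans the registry creates.
                if (bean instanceof PublisherAware) {
                    ((PublisherAware) bean).setPublisher(publishedEvents::add);
                }
                return bean;
            });
        }
    }

    public static void main(String[] args) {
        ToyRegistry registry = new ToyRegistry();
        registry.registerSingleton("manual", new ToyContainerFactory());
        registry.registerDefinition("defined", ToyContainerFactory::new);

        ((ToyContainerFactory) registry.getBean("manual")).fireIdleEvent("kfktest-0");
        ((ToyContainerFactory) registry.getBean("defined")).fireIdleEvent("kfktest-1");

        // Only the definition-based bean had a publisher to send its event to.
        System.out.println(registry.publishedEvents); // prints [idle:kfktest-1]
    }
}
```

If that explanation is right, the event from the manually registered instance is silently dropped, which would match the symptom in the question: messages arrive, but no idle event ever reaches the @EventListener.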
@Bean
public static BeanFactoryPostProcessor beanFactoryPostProcessor(@Value("${kafka.topicNames}") String topicNames) {
    // BeanFactoryPostProcessor is a functional interface; the lambda receives the bean factory.
    return beanFactory -> {
        // The default bean factory (DefaultListableBeanFactory) also implements BeanDefinitionRegistry.
        BeanDefinitionRegistry beanDefinitionRegistry = (BeanDefinitionRegistry) beanFactory;
        List<String> topics = Arrays.asList(topicNames.split(Pattern.quote(",")));
        topics.forEach(key -> {
            // One consumer factory definition per topic.
            beanDefinitionRegistry.registerBeanDefinition(
                    KafkaConsumerConfiguration.DEFAULT_KAFKA_CONSUMER_FACTORY_NAME + key,
                    BeanDefinitionBuilder.genericBeanDefinition(KafkaConsumerFactory.class)
                            .getBeanDefinition());
            // One listener container factory definition per topic.
            beanDefinitionRegistry.registerBeanDefinition(
                    KafkaConsumerConfiguration.CONTAINER_FACTORY_NAME + key,
                    BeanDefinitionBuilder.genericBeanDefinition(ConcurrentKafkaListenerContainerFactory.class)
                            .getBeanDefinition());
        });
        log.info("Topics bean defined {}", topics.size());
    };
}
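To make the naming concrete, here is a small plain-Java sketch (no Spring involved) of which bean names the post-processor above would register for a given kafka.topicNames value. The two prefix values are made up for illustration; the real constants in KafkaConsumerConfiguration are not shown in this post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

class TopicBeanNames {

    // Hypothetical values: the real constants are defined in KafkaConsumerConfiguration.
    static final String DEFAULT_KAFKA_CONSUMER_FACTORY_NAME = "consumerFactory";
    static final String CONTAINER_FACTORY_NAME = "containerFactory";

    /** Mirrors the split-and-concatenate logic of the post-processor. */
    static List<String> beanNamesFor(String topicNames) {
        List<String> names = new ArrayList<>();
        for (String key : topicNames.split(Pattern.quote(","))) {
            names.add(DEFAULT_KAFKA_CONSUMER_FACTORY_NAME + key);
            names.add(CONTAINER_FACTORY_NAME + key);
        }
        return names;
    }

    public static void main(String[] args) {
        // prints [consumerFactoryBOARD, containerFactoryBOARD, consumerFactoryUSER, containerFactoryUSER]
        System.out.println(beanNamesFor("BOARD,USER"));
    }
}
```

The container factory name generated for BOARD is the one the @KafkaListener in the question points at with CONTAINER_FACTORY_NAME + "BOARD". Note that split does not trim whitespace, so a property value such as "BOARD, USER" would produce a key with a leading space and a bean name that no listener refers to.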
Regards!
Answered By - lexfaraday