Issue
I need to be able to customise the id for each method annotated with @KafkaListener
based on the value of one of its attributes as well as a value defined in the application.yaml.
For example, given a class with a method annotated like so:
@KafkaListener(info="myInfo")
public void listen(String msg){
}
and a custom property in my application.yaml:
myapp:
  myProperty: myProp
At runtime, I would like the id of the registered endpoint consumer to be myInfo_myProp
rather than the autogenerated one that is produced when I do not explicitly provide an id in the attributes of the annotation.
What would be the best way to achieve this? I was thinking of extending the KafkaListenerEndpointRegistrar
or the BeanPostProcessor.
Thanks
Solution
Override the endpoint registry bean and manipulate the endpoint properties there, before calling the superclass method.
Here's an example:
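import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.AbstractKafkaListenerEndpoint;
import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.TopicBuilder;

@SpringBootApplication
public class So72719215Application {

    public static void main(String[] args) {
        SpringApplication.run(So72719215Application.class, args);
    }

    // Replaces the default registry bean so each endpoint can be adjusted before it is registered.
    @Bean(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
    KafkaListenerEndpointRegistry registry(@Value("${myApp.myProperty}") String myProperty) {
        return new KafkaListenerEndpointRegistry() {
            @Override
            public void registerListenerContainer(KafkaListenerEndpoint endpoint,
                    KafkaListenerContainerFactory<?> factory) {
                AbstractKafkaListenerEndpoint<?, ?> akle = (AbstractKafkaListenerEndpoint<?, ?>) endpoint;
                // The info attribute is exposed as bytes; decode it with an explicit charset.
                akle.setId(new String(akle.getListenerInfo(), StandardCharsets.UTF_8) + "_" + myProperty);
                akle.setGroupId("group_" + myProperty);
                super.registerListenerContainer(endpoint, factory);
            }
        };
    }

    @KafkaListener(topics = "so72719215", info = "foo")
    void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so72719215").partitions(1).replicas(1).build();
    }

    // Prints the ids of the registered listener containers at startup.
    @Bean
    ApplicationRunner runner(KafkaListenerEndpointRegistry reg) {
        return args -> {
            System.out.println(reg.getListenerContainerIds());
        };
    }

}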
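Note that the override runs for every registered endpoint, so a listener declared without an info attribute would most likely hand a null byte array to the String constructor. A minimal sketch of a null-safe id derivation follows; ListenerIds and derive are hypothetical names for illustration only and are not part of Spring Kafka, and falling back to the existing id is an assumption about the desired behaviour.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper (not part of Spring Kafka) that derives a listener id. */
final class ListenerIds {

    private ListenerIds() {
    }

    /**
     * Builds "<info>_<suffix>" from the raw info bytes, or returns fallbackId
     * unchanged when the listener has no info attribute.
     */
    static String derive(byte[] info, String suffix, String fallbackId) {
        if (info == null || info.length == 0) {
            return fallbackId;
        }
        return new String(info, StandardCharsets.UTF_8) + "_" + suffix;
    }

    public static void main(String[] args) {
        byte[] info = "myInfo".getBytes(StandardCharsets.UTF_8);
        System.out.println(derive(info, "myProp", "generated-id")); // prints "myInfo_myProp"
        System.out.println(derive(null, "myProp", "generated-id")); // prints "generated-id"
    }
}
```

Inside the overridden registerListenerContainer, this could be used as akle.setId(ListenerIds.derive(akle.getListenerInfo(), myProperty, akle.getId())), assuming the endpoint already carries its autogenerated id at that point.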
Answered By - Gary Russell
Answer Checked By - Senaida (JavaFixing Volunteer)