Issue
I'm running spring boot with KafkaListener as my client. The question is how can we recover from a failed kafka configuration and avoid that the application stops with Process finished with exit code 0
.
An example of an incorrect config would e.g. an incorrect endpoint url. Same scenario would apply if the Kafka server would not be reachable. So in any case the KafkaListner process should never kill the server.
@Bean
open fun consumerFactory(): ConsumerFactory<String, String> {
val deserializer = JsonDeserializer<Thing>()
deserializer.addTrustedPackages("de.data.Thing")
val props: MutableMap<String, Any> = HashMap()
val serverUrl = server.substringBefore(":")
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = server
props[ConsumerConfig.GROUP_ID_CONFIG] = "group"
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = "SASL_SSL"
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false
props[SaslConfigs.SASL_MECHANISM] = "PLAIN"
props[SaslConfigs.SASL_JAAS_CONFIG] = "org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"\$ConnectionString\" " +
"password=\"Endpoint=sb://$serverUrl/;" +
"SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=$sharedSecret\";"
return DefaultKafkaConsumerFactory(props,
StringDeserializer(), StringDeserializer())
}
@Bean
open fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String>? {
val factory: ConcurrentKafkaListenerContainerFactory<String, String> = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.consumerFactory = consumerFactory()
factory.setMessageConverter(BytesJsonMessageConverter())
return factory
}
@KafkaListener(topics = ["topic"],
groupId = "group",
containerFactory = "kafkaListenerContainerFactory",
)
fun listenThingsChanged(@Payload thing: Thing,
record: ConsumerRecord<String, String>,
@Headers headers: MessageHeaders) {
....
}
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:895) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554) at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143) at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) at de.x.ServerAppKt.main(ServerApp.kt:11) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:825) at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:631) at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:340) at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:308) at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:293) at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:267) at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:241) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.(KafkaMessageListenerContainer.java:606) at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:302) at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338) at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:204) at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338) at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312) at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257) at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ... 19 common frames omitted Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:89) at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48) at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:737) ... 33 common frames omitted
Solution
If the broker is just down, the application will start fine (with versions earlier than 2.3.4 you had to set missingTopicsFatal
to false on the container properties (it has been false by default since then).
No resolvable bootstrap urls given in...
This is fatal - it is irrecoverable.
However, you can set autoStartup=false
- either on the @KafkaListener
or on the container factory.
This will prevent Spring from trying to start the containers during application initialization.
You can then start the containers yourself in a try/catch block.
Answered By - Gary Russell