Issue
I have the following Spring configuration for Kafka:
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
@RefreshScope
@ConfigurationProperties(prefix = "kafka.status.producer")
public class StatusKafkaProducerConfig {
private String keySerializer;
private String valueSerializer;
private String bootstrapServers;
public void setKeySerializer(String keySerializer) {
this.keySerializer = keySerializer;
}
public void setValueSerializer(String valueSerializer) {
this.valueSerializer = valueSerializer;
}
public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return properties;
}
@Bean
@RefreshScope
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
}
I migrated the project to latest Spring cloud 2021.0.1 and Spring Boot 2.6.6. But I get the following error stack during boot time:
2022-04-30 19:14:08.350 ERROR 23684 --- [ main] o.s.boot.SpringApplication : Application run failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'producerFactory' defined in class path resource [com/test/StatusUpdateKafkaProducerConfig.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.kafka.core.ProducerFactory]: Factory method 'producerFactory' threw exception; nested exception is java.lang.NullPointerException
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:658) ~[spring-beans-5.3.19.jar:5.3.19]
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:486) ~[spring-beans-5.3.19.jar:5.3.19]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1352) ~[spring-beans-5.3.19.jar:5.3.19]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1195) ~[spring-beans-5.3.19.jar:5.3.19]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:582) ~[spring-beans-5.3.19.jar:5.3.19]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542) ~[spring-beans-5.3.19.jar:5.3.19]
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335) ~[spring-beans-5.3.19.jar:5.3.19]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-5.3.19.jar:5.3.19]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333) ~[spring-beans-5.3.19.jar:5.3.19]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208) ~[spring-beans-5.3.19.jar:5.3.19]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:953) ~[spring-beans-5.3.19.jar:5.3.19]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:918) ~[spring-context-5.3.19.jar:5.3.19]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:583) ~[spring-context-5.3.19.jar:5.3.19]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145) ~[spring-boot-2.6.7.jar:2.6.7]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:740) ~[spring-boot-2.6.7.jar:2.6.7]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:415) ~[spring-boot-2.6.7.jar:2.6.7]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) ~[spring-boot-2.6.7.jar:2.6.7]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1312) ~[spring-boot-2.6.7.jar:2.6.7]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301) ~[spring-boot-2.6.7.jar:2.6.7]
at com.test.Application.main(Application.java:82) ~[main/:na]
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.kafka.core.ProducerFactory]: Factory method 'producerFactory' threw exception; nested exception is java.lang.NullPointerException
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185) ~[spring-beans-5.3.19.jar:5.3.19]
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:653) ~[spring-beans-5.3.19.jar:5.3.19]
... 19 common frames omitted
Caused by: java.lang.NullPointerException: null
at java.base/java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1011) ~[na:na]
at java.base/java.util.concurrent.ConcurrentHashMap.putAll(ConcurrentHashMap.java:1089) ~[na:na]
at java.base/java.util.concurrent.ConcurrentHashMap.<init>(ConcurrentHashMap.java:852) ~[na:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory.<init>(DefaultKafkaProducerFactory.java:205) ~[spring-kafka-2.8.5.jar:2.8.5]
at org.springframework.kafka.core.DefaultKafkaProducerFactory.<init>(DefaultKafkaProducerFactory.java:168) ~[spring-kafka-2.8.5.jar:2.8.5]
at com.test.StatusUpdateKafkaProducerConfig.producerFactory(TransactionStatusUpdateKafkaProducerConfig.java:53) ~[commons-0.0.1-plain.jar:na]
at com.test.StatusUpdateKafkaProducerConfig$$EnhancerBySpringCGLIB$$e907de88.CGLIB$producerFactory$0(<generated>) ~[commons-0.0.1-plain.jar:na]
at com.test.StatusUpdateKafkaProducerConfig$$EnhancerBySpringCGLIB$$e907de88$$FastClassBySpringCGLIB$$db10662a.invoke(<generated>) ~[commons-0.0.1-plain.jar:na]
at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244) ~[spring-core-5.3.19.jar:5.3.19]
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:331) ~[spring-context-5.3.19.jar:5.3.19]
at com.StatusUpdateKafkaProducerConfig$$EnhancerBySpringCGLIB$$e907de88.producerFactory(<generated>) ~[commons-0.0.1-plain.jar:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:282) ~[spring-core-5.3.19.jar:5.3.19]
at org.springframework.cloud.context.scope.GenericScope$LockedScopedProxyFactoryBean.invoke(GenericScope.java:485) ~[spring-cloud-context-3.1.2.jar:3.1.2]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.19.jar:5.3.19]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763) ~[spring-aop-5.3.19.jar:5.3.19]
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:708) ~[spring-aop-5.3.19.jar:5.3.19]
at com.StatusUpdateKafkaProducerConfig$$EnhancerBySpringCGLIB$$6800bd04.producerFactory(<generated>) ~[commons-0.0.1-plain.jar:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154) ~[spring-beans-5.3.19.jar:5.3.19]
... 20 common frames omitted
Do you know how I can fix this issue?
Solution
The error stack trace is indicating that a NullPointerException
is happening when you are creating a new instance of DefaultKafkaProducerFactory
in the producerFactory()
method.
Internally, DefaultKafkaProducerFactory
creates a ConcurrentHashMap
with the provided information to maintain its configuration.
As you can see in the ConcurrentHashMap
javadocs, the put
method will raise NullPointerException
if any of the provided keys or values are null
.
As a consequence, probably one of or several of the variables keySerializer
, valueSerializer
or bootstrapServers
are null.
I assume these values are filled by dependency injection according to your @ConfigurationProperties
kafka.status.producer
. Please, be sure these properties are correctly defined in your Spring configuration.
For testing purposes, try removing the @RefreshScope
annotation: it may be a cause of the problem.
The contrary can be a valid solution as well. As indicated in the Spring Cloud documentation when talking about @RefreshScope
:
@RefreshScope
works (technically) on a@Configuration
class, but it might lead to surprising behavior. For example, it does not mean that all the@Beans
defined in that class are themselves in@RefreshScope
. Specifically, anything that depends on those beans cannot rely on them being updated when a refresh is initiated, unless it is itself in@RefreshScope
. In that case, it is rebuilt on a refresh and its dependencies are re-injected. At that point, they are re-initialized from the refreshed@Configuration
).
From that point of view if make perfect sense to annotate with @RefreshScope
the producerFactory
bean as well. I would try:
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
@RefreshScope
@ConfigurationProperties(prefix = "kafka.status.producer")
public class StatusKafkaProducerConfig {
private String keySerializer;
private String valueSerializer;
private String bootstrapServers;
public void setKeySerializer(String keySerializer) {
this.keySerializer = keySerializer;
}
public void setValueSerializer(String valueSerializer) {
this.valueSerializer = valueSerializer;
}
public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
@Bean
@RefreshScope
public Map<String, Object> producerConfigs() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return properties;
}
@Bean
@RefreshScope
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
@RefreshScope
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
}
Please, note the inclusion of the additional @RefreshScope
annotations.
Finally, consider review the Spring Kafka project documentation, it provides several examples about how to configure your project, whether you are using Spring Boot or not, and gives you further background about DefaultKafkaProducerFactory
.
As you can see in the aforementioned docs, if you are using Spring Boot, like in you case, the code can be greatly simplified. Consider review the Spring Boot
related documentation about the subject and related properties configuration.
Answered By - jccampanero
Answer Checked By - Willingham (JavaFixing Volunteer)