Issue
I'm having trouble with my Kafka producer. For some reason, value.serializer is being set to StringSerializer even though I explicitly set it to JsonSerializer in both producerConfig and application.yml, as you can see below:
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    public ProducerFactory<String, CustomType> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, CustomType> kafkaTemplate(ProducerFactory<String, CustomType> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}
And once again in the application.yml file, just for good measure:
spring:
  kafka:
    producer:
      value.serializer: org.springframework.kafka.support.serializer.JsonSerializer
The problem shows up in the last line of the logged ProducerConfig values below: somehow value.serializer is now StringSerializer.
ProducerConfig values:
acks = -1
batch.size = 65536
bootstrap.servers = [***********,***********]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = lz4
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 52428800
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
Unsurprisingly, this causes an org.apache.kafka.common.errors.SerializationException: Can't convert value of class CustomType to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
I can confirm that the correct profile is active from the startup log line: The following 1 profile is active: "test"
I also pinged both bootstrap servers, and both are up and running.
I also searched all files in my project, and the only place where StringSerializer is used at all is in producerConfig, when setting KEY_SERIALIZER_CLASS_CONFIG.
For a bit more context, below is the class I use to send the Kafka message:
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaTelemetryRawSend {

    @Autowired
    private KafkaTemplate<String, CustomType> kafkaTemplate;

    public void send(CustomType customType) {
        System.out.println("kafka sending");
        kafkaTemplate.send("topic", customType);
    }
}
Any help or suggestions to point me in the right direction would be much appreciated.
Solution
I think the mistake is that your producerConfig() is not being used (try debugging it). Your producerFactory() method has no @Bean annotation, so Spring Boot creates its own ProducerFactory bean in the org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration class:
@Bean
@ConditionalOnMissingBean({ProducerFactory.class})
public ProducerFactory<?, ?> kafkaProducerFactory(ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {...}
The KafkaAutoConfiguration class has a field of type org.springframework.boot.autoconfigure.kafka.KafkaProperties, which is bound to the spring.kafka... properties in your application.yml and is used to create the ProducerFactory bean. But your application.yml has an invalid property, value.serializer, instead of value-serializer, so that key is ignored and the auto-configured factory falls back to the default StringSerializer.
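To illustrate why the value.serializer key is not picked up, here is a minimal sketch in plain Java. It is a simplified model of property-name matching, not Spring Boot's actual binder code: the class PropertyNameSketch and its methods are hypothetical names made up for this example. The idea is that a dot always starts a new nested name, while dashes and letter case are ignored when names are compared.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Simplified model (an assumption, not Spring Boot's real implementation).
class PropertyNameSketch {

    // Flattens a nested map, as parsed from YAML, into dotted property names.
    static Map<String, Object> flatten(String prefix, Map<String, Object> node) {
        Map<String, Object> flat = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : node.entrySet()) {
            String name = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            if (entry.getValue() instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) entry.getValue();
                flat.putAll(flatten(name, child));
            } else {
                flat.put(name, entry.getValue());
            }
        }
        return flat;
    }

    // Relaxed form: ignores case and dashes, but keeps dots as separators.
    static String canonical(String name) {
        return name.toLowerCase(Locale.ROOT).replace("-", "");
    }

    // Two names refer to the same property if their relaxed forms are equal.
    static boolean sameProperty(String a, String b) {
        return canonical(a).equals(canonical(b));
    }
}
```

In this model, a value.serializer key nested under spring, kafka, producer flattens to spring.kafka.producer.value.serializer. sameProperty does not match that name against spring.kafka.producer.value-serializer, whereas spring.kafka.producer.valueSerializer and spring.kafka.producer.value-serializer do match each other.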
Solution
Either add the @Bean annotation to your producerFactory() method, or rename the value.serializer property to value-serializer.
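A minimal sketch of the first option, based on the class from the question. The @Configuration annotation is an assumption, since the snippet in the question does not show how the class is registered, and CustomType is the asker's own type. With producerFactory() declared as a bean, the @ConditionalOnMissingBean condition on Boot's kafkaProducerFactory should make the auto-configured factory back off, so the JsonSerializer set here is the one that gets used:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration // assumption: not shown in the question's snippet
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // Plain helper method; it does not need to be a bean itself.
    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    // The added @Bean is the actual fix: this factory now replaces the auto-configured one.
    @Bean
    public ProducerFactory<String, CustomType> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    // Receives the ProducerFactory bean defined above.
    @Bean
    public KafkaTemplate<String, CustomType> kafkaTemplate(ProducerFactory<String, CustomType> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}
```

After this change, the value.serializer line in the logged ProducerConfig values is the place to check that JsonSerializer is actually in effect.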
However, I recommend using Spring Boot's KafkaProperties in your @Configuration class and defining the Kafka properties in the application.yml file:
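import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    private final KafkaProperties kafkaProperties;

    // Spring injects the auto-configured KafkaProperties through the constructor.
    public KafkaProducerConfig(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    // Builds the factory from the spring.kafka.* properties in application.yml.
    @Bean
    public ProducerFactory<String, CustomType> producerFactory() {
        return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
    }

    @Bean
    public KafkaTemplate<String, CustomType> kafkaTemplate(ProducerFactory<String, CustomType> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}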
with application.yml:
spring:
  kafka:
    producer:
      batch-size: ...
      ...
      properties:
        ...
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
    bootstrap-servers:
      - localhost:9092
Answered By - Alexander Bobryakov
Answer Checked By - Timothy Miller (JavaFixing Admin)