Issue
I'm trying to read messages from a Kafka topic, but for some reason I get the following error:
2022-06-28 14:17:52.044 INFO 1 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-api1-1, groupId=api1] Seeking to offset 1957 for partition ActiveProxySources-0
2022-06-28T14:17:52.688451744Z 2022-06-28 14:17:52.687 ERROR 1 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Backoff none exhausted for ActiveProxySources-0@1957
2022-06-28T14:17:52.688499949Z
2022-06-28T14:17:52.688511943Z org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.freeproxy.parser.model.kafka.KafkaMessage]; nested exception is java.lang.ClassNotFoundException: com.freeproxy.parser.model.kafka.KafkaMessage
2022-06-28T14:17:52.688544511Z at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2691) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688555996Z at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.checkDeser(KafkaMessageListenerContainer.java:2738) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688564633Z at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2612) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688573552Z at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2544) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688582961Z at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2429) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688591538Z at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2307) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688600362Z at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1981) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688610882Z at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1365) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688620353Z at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1356) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688629357Z at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1251) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688637662Z at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
2022-06-28T14:17:52.688646009Z at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
2022-06-28T14:17:52.688655783Z at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
2022-06-28T14:17:52.688664349Z Caused by: org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.freeproxy.parser.model.kafka.KafkaMessage]; nested exception is java.lang.ClassNotFoundException: com.freeproxy.parser.model.kafka.KafkaMessage
2022-06-28T14:17:52.688674537Z at org.springframework.kafka.support.serializer.SerializationUtils.deserializationException(SerializationUtils.java:150) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688683348Z at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer.deserialize(ErrorHandlingDeserializer.java:204) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688699174Z at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1420) ~[kafka-clients-3.0.1.jar!/:na]
2022-06-28T14:17:52.688707618Z at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:134) ~[kafka-clients-3.0.1.jar!/:na]
2022-06-28T14:17:52.688718316Z at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1652) ~[kafka-clients-3.0.1.jar!/:na]
2022-06-28T14:17:52.688728359Z at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1800(Fetcher.java:1488) ~[kafka-clients-3.0.1.jar!/:na]
2022-06-28T14:17:52.688736716Z at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:721) ~[kafka-clients-3.0.1.jar!/:na]
2022-06-28T14:17:52.688748228Z at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:672) ~[kafka-clients-3.0.1.jar!/:na]
2022-06-28T14:17:52.688758573Z at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1304) ~[kafka-clients-3.0.1.jar!/:na]
2022-06-28T14:17:52.688768278Z at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) ~[kafka-clients-3.0.1.jar!/:na]
2022-06-28T14:17:52.688776576Z at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.0.1.jar!/:na]
2022-06-28T14:17:52.688785598Z at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1521) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688793960Z at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1511) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688802367Z at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1339) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688811023Z ... 4 common frames omitted
2022-06-28T14:17:52.688819230Z Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.freeproxy.parser.model.kafka.KafkaMessage]; nested exception is java.lang.ClassNotFoundException: com.freeproxy.parser.model.kafka.KafkaMessage
2022-06-28T14:17:52.688828306Z at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:142) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688837754Z at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688846335Z at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:572) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688854685Z at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer.deserialize(ErrorHandlingDeserializer.java:201) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688862907Z ... 16 common frames omitted
2022-06-28T14:17:52.688870692Z Caused by: java.lang.ClassNotFoundException: com.freeproxy.parser.model.kafka.KafkaMessage
2022-06-28T14:17:52.688888550Z at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476) ~[na:na]
2022-06-28T14:17:52.688898662Z at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[na:na]
2022-06-28T14:17:52.688907289Z at org.springframework.boot.loader.LaunchedURLClassLoader.loadClass(LaunchedURLClassLoader.java:151) ~[java.jar:na]
2022-06-28T14:17:52.688915418Z at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[na:na]
2022-06-28T14:17:52.688923583Z at java.base/java.lang.Class.forName0(Native Method) ~[na:na]
2022-06-28T14:17:52.688931577Z at java.base/java.lang.Class.forName(Class.java:398) ~[na:na]
2022-06-28T14:17:52.688939753Z at org.springframework.util.ClassUtils.forName(ClassUtils.java:284) ~[spring-core-5.3.19.jar!/:5.3.19]
2022-06-28T14:17:52.688948555Z at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:138) ~[spring-kafka-2.8.5.jar!/:2.8.5]
2022-06-28T14:17:52.688957079Z ... 19 common frames omitted
2022-06-28T14:17:52.688964715Z
I have other applications that send and read messages on Kafka topics with the same settings, and they all work fine, but this application does not. Ideally, I want to read messages from two Kafka topics (messages in both topics look the same and contain the same objects), but even when I try to read messages from one topic, I get the error shown above. The settings are as follows:
class KafkaMessage {
    String id
    IdStatus status
}

@Service
@Slf4j
class ConsumerService {
    Set<String> activeProxies = []

    int getActiveProxiesNumber() {
        activeProxies.size()
    }

    Set<String> activeProxySources = []

    int getActiveProxySourcesNumber() {
        activeProxySources.size()
    }

    @KafkaListener(topics = "ActiveProxies"/*, containerFactory = "KafkaListenerContainerFactoryActiveProxies"*/)
    public void consumeProxyId(KafkaMessage message) {
        log.info("Consuming ${message.id}: ${message.status}")
        if (message.status == IdStatus.ADD) {
            activeProxies.add(message.id)
        }
        if (message.status == IdStatus.DELETE) {
            activeProxies.remove(message.id)
        }
    }

    @KafkaListener(topics = "ActiveProxySources"/*, containerFactory = "KafkaListenerContainerFactoryActiveProxySources"*/)
    public void consumeProxySourceId(KafkaMessage message) {
        log.info("Consuming ${message.id}: ${message.status}")
        if (message.status == IdStatus.ADD) {
            activeProxySources.add(message.id)
        }
        if (message.status == IdStatus.DELETE) {
            activeProxySources.remove(message.id)
        }
    }
}
TopicConfig:
@Configuration
public class TopicConfig {
    @Value(value = "kafka:9092")
    private String bootstrapAddress

    @Value(value = "ActiveProxies")
    private String activeProxies

    @Value(value = "ActiveProxySources")
    private String activeProxySources

    // @Bean
    // public KafkaAdmin kafkaAdmin() {
    //     Map<String, Object> configs = new HashMap<>();
    //     configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    //     return new KafkaAdmin(configs);
    // }

    @Bean
    public NewTopic ActiveProxiesTopic() {
        return TopicBuilder.name(activeProxies)
                .partitions(1)
                .replicas(1)
                .config(org.apache.kafka.common.config.TopicConfig.RETENTION_MS_CONFIG, "60000")
                .build()
    }

    @Bean
    public NewTopic ActiveProxySourcesTopic() {
        return TopicBuilder.name(activeProxySources)
                .partitions(1)
                .replicas(1)
                .config(org.apache.kafka.common.config.TopicConfig.RETENTION_MS_CONFIG, "60000")
                .build()
    }
}
application.properties file:
server.port=30329
spring.data.mongodb.database=free-proxy-engine
spring.kafka.bootstrap-servers=kafka:9092
spring.kafka.consumer.group-id=consumer-Api1
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
I use Docker Compose to run Kafka and all the other applications.
docker-compose.yaml:
version: '2'
services:
  mongodb:
    image: mongo:5.0.9
    restart: unless-stopped
  api:
    image: openjdk:11
    depends_on:
      - mongodb
      - kafka
    restart: unless-stopped
    volumes:
      - ./libs/api-0.0.1-SNAPSHOT.jar:/gjava/java.jar
    environment:
      spring_data_mongodb_host: mongodb
      spring_kafka_consumer_group-id: api1
    command: /bin/bash -c "cd /gjava && chmod +x /gjava/*.jar && java -jar /gjava/java.jar"
    ports:
      - 30329:30329
  zookeeper:
    image: confluentinc/cp-zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka
    restart: always
    hostname: kafka
    depends_on:
      - zookeeper
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
I also created my own consumer configuration class for Kafka, but the error persisted whether I read from both topics or from just one.
@EnableKafka
@Configuration
class KafkaConsumerConfig {
    @Value(value = "kafka:9092")
    private String bootstrapAddress

    @Bean
    public ConsumerFactory<String, KafkaMessage> ConsumerFactoryActiveProxies() {
        Map<String, Object> props = new HashMap<>()
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress)
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "Api-1")
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*")
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class)
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class)
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class)
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName())
        return new DefaultKafkaConsumerFactory<>(props/*,
                new StringDeserializer(),
                new JsonDeserializer<>(KafkaMessage.class)*/)
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaMessage>
    KafkaListenerContainerFactoryActiveProxies() {
        ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> factory
                = new ConcurrentKafkaListenerContainerFactory<>()
        factory.setConsumerFactory(ConsumerFactoryActiveProxies())
        factory.setMessageConverter(new StringJsonMessageConverter())
        return factory
    }

    @Bean
    public ConsumerFactory<String, KafkaMessage> ConsumerFactoryActiveProxySources() {
        Map<String, Object> props = new HashMap<>()
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress)
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "Api-2")
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*")
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class)
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class)
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class)
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName())
        return new DefaultKafkaConsumerFactory<>(props/*,
                new StringDeserializer(),
                new JsonDeserializer<>(KafkaMessage.class)*/)
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaMessage>
    KafkaListenerContainerFactoryActiveProxySources() {
        ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> factory
                = new ConcurrentKafkaListenerContainerFactory<>()
        factory.setConsumerFactory(ConsumerFactoryActiveProxySources())
        factory.setMessageConverter(new StringJsonMessageConverter())
        return factory
    }
}
I will be grateful for your help.
Solution
By default, the deserializer will use type information in headers to determine which type to create.
Caused by: java.lang.ClassNotFoundException: com.freeproxy.parser.model.kafka.KafkaMessage
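Most likely, KafkaMessage is in a different package on the sending side: the producer writes its own fully qualified class name (com.freeproxy.parser.model.kafka.KafkaMessage) into the type header, and no class with that name exists on the consumer's classpath.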
There are a couple of solutions:
https://docs.spring.io/spring-kafka/docs/current/reference/html/#serdes-json-config
1. Set JsonDeserializer.USE_TYPE_INFO_HEADERS to false and JsonDeserializer.VALUE_DEFAULT_TYPE to com.new.package.kafka.KafkaMessage (the fully qualified name of KafkaMessage on the receiving side).
2. Use type mapping: https://docs.spring.io/spring-kafka/docs/current/reference/html/#serdes-mapping-types
I suggest you read this whole section https://docs.spring.io/spring-kafka/docs/current/reference/html/#json-serde
Answered By - Gary Russell
Answer Checked By - Marilyn (JavaFixing Volunteer)