Issue
I have implemented a KafkaProducer and KafkaConsumers using Spring Boot.
Once a message is received by the consumer, I invoke a REST API to save the record to MongoDB.
However, not all of the records posted to the Kafka consumer are saved to MongoDB.
For example, out of 5 records posted to the Kafka consumer, only 3 are saved to MongoDB.
The Kafka consumer code is below:
KafkaConsumerConfiguration
@EnableKafka
@Configuration
public class KafkaConsumerConfiguration {

    @Value("${spring.kafka.hostname}")
    private String kafkaHostname;

    public static final String GROUP_ID_CONFIG = "group_json1";
    public static final String ENABLE_AUTO_COMMIT_CONFIG = "false";
    public static final String AUTO_OFFSET_RESET_CONFIG = "earliest";

    @Bean
    public ConsumerFactory<String, UserKafkaDTO> userConsumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ENABLE_AUTO_COMMIT_CONFIG);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
        JsonDeserializer<UserKafkaDTO> deserializer = new JsonDeserializer<>(UserKafkaDTO.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);
        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, UserKafkaDTO> userKafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, UserKafkaDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(userConsumerFactory());
        factory.setBatchListener(true);
        return factory;
    }

    @Bean
    @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setBatchListener(true);
        return factory;
    }
}
KafkaConsumerListener
@EnableKafka
@Configuration
public class KafkaConsumerListener {

    @KafkaListener(topics = { "topicName" }, containerFactory = "userKafkaListenerFactory", autoStartup = "${listen.auto.start}")
    public void consumeJson(UserKafkaDTO userKafkaDTO) {
        // Called with a single DTO, although the container factory is configured for batches.
        invokeAPItoSaveRecordTOMongoDB(userKafkaDTO);
    }
}
Solution
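In userKafkaListenerFactory, which is the factory your listener uses, try changing
factory.setBatchListener(true);
to
factory.setBatchListener(false);
because your listener method handles a single UserKafkaDTO, not a batch of records.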
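
If you would rather keep factory.setBatchListener(true), the other way to make the two sides agree is to let the listener method accept a List and loop over it, so that every record from a poll is passed to the save call. The following is only a minimal plain-Java sketch of that loop, written without Spring so it can run on its own. The class name BatchListenerSketch, the one-field UserKafkaDTO, and the in-memory saved list (standing in for the REST call to MongoDB) are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchListenerSketch {

    // Hypothetical stand-in for the question's UserKafkaDTO.
    static class UserKafkaDTO {
        final String name;

        UserKafkaDTO(String name) {
            this.name = name;
        }
    }

    // Stand-in for the REST call that saves to MongoDB:
    // it only remembers which records were "saved".
    static final List<UserKafkaDTO> saved = new ArrayList<>();

    static void invokeAPItoSaveRecordTOMongoDB(UserKafkaDTO dto) {
        saved.add(dto);
    }

    // In the real listener class this method would carry
    // @KafkaListener(topics = { "topicName" }, containerFactory = "userKafkaListenerFactory")
    // and, with the batch listener enabled, receive the records from one poll as a List.
    static void consumeJson(List<UserKafkaDTO> batch) {
        for (UserKafkaDTO dto : batch) {
            invokeAPItoSaveRecordTOMongoDB(dto);
        }
    }

    public static void main(String[] args) {
        // Simulate one poll that returns five records.
        List<UserKafkaDTO> batch = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            batch.add(new UserKafkaDTO("user" + i));
        }
        consumeJson(batch);
        System.out.println(saved.size() + " of " + batch.size() + " records saved");
    }
}
```

Whichever option you pick, the point is that the batch setting on the container factory and the parameter type of the listener method should match: a single UserKafkaDTO with setBatchListener(false), or a List<UserKafkaDTO> with setBatchListener(true).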
Answered By - Hong
Answer Checked By - Cary Denson (JavaFixing Admin)