Issue
I have a Spring Kafka application and I'm getting an error when the consumer tries to commit an offset. Here is my Kafka consumer configuration:
KafkaConsumer:
@Bean
public ConsumerFactory<String, Item> consumerFactory() {
    log.info("Configuring Kafka Consumer properties - consumerFactory");
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "cpo-executor-groupid");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
            new JsonDeserializer<>(Item.class, false));
}
KafkaAdmin:
@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic processTopic() {
    return TopicBuilder.name(topicName).partitions(2).build();
}
I know that this happens if processing takes longer than max.poll.interval.ms or session.timeout.ms, but that is not my case. My application takes less than 1 second to consume and process a message:
Time: 11:00:32.773 Configuring Kafka Consumer properties - consumerFactory
Time: 11:00:39.433 INFO [cpo-executor,,] 55630 --- [ restartedMain] o.a.k.clients.consumer.ConsumerConfig
Time: 11:00:57.293 ERROR [cpo-executor,,] 55630 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator:
[Consumer clientId=consumer-cpo-executor-groupid-1, groupId=cpo-executor-groupid] Offset commit failed on partition process-topic-1 at offset 95:
The coordinator is not aware of this member.
Time: 11:00:57.299 ERROR [cpo-executor,,] 55630 --- [ntainer#0-0-C-1] c.c.cpoexecutor.config.KafkaErrHandler : Error in process with Exception org.apache.kafka.clients.consumer.CommitFailedException:
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing.
You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. and the records are []
What is happening, given that I didn't change any Kafka configuration and the default value for max.poll.interval.ms is 5 minutes?
kafka: 2.13-2.8.0
spring-kafka: 2.7.6
spring: 2.4.2
Solution
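I changed the groupId so that it differs per environment; that way, no two environments use the same groupId on the same Kafka cluster. Presumably, instances from other environments were joining the same consumer group, and the rebalances they triggered reassigned the partitions before this consumer could commit.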
KafkaConsumer:
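// Group id and offset reset policy now come from the active profile's YAML file.
@Value("${spring.kafka.consumer.group-id}")
private String groupId;

@Value("${spring.kafka.consumer.auto-offset-reset}")
private String offSetReset;

@Bean
public ConsumerFactory<String, Item> consumerFactory() {
    log.info("Configuring Kafka Consumer properties - consumerFactory");
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offSetReset);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    // The remaining settings are unchanged from the configuration in the question.
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
            new JsonDeserializer<>(Item.class, false));
}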
application-local.yml:
spring:
  kafka:
    bootstrap-servers: #####:9092
    consumer:
      auto-offset-reset: earliest
      group-id: cpo-executor-groupid-local
application-dev.yml:
spring:
  kafka:
    bootstrap-servers: #####:9092
    consumer:
      auto-offset-reset: earliest
      group-id: cpo-executor-groupid-dev
application-prd.yml:
spring:
  kafka:
    consumer:
      bootstrap-servers: #####:9092
      auto-offset-reset: earliest
      group-id: cpo-executor-groupid-prd
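If keeping a separate group-id in every profile file becomes tedious, the same naming scheme could also be derived in code. This is only a minimal sketch under that assumption: GroupIdNaming and groupIdFor are hypothetical names, not part of Spring Kafka or of the configuration above, and the environment string would have to come from wherever the application tracks its active profile.

```java
import java.util.Locale;

// Hypothetical helper (not part of Spring Kafka): builds an environment-specific group id.
final class GroupIdNaming {

    private GroupIdNaming() {
    }

    // Appends the trimmed, lower-cased environment name to the base group id,
    // separated by a hyphen, mirroring the names used in the YAML files.
    static String groupIdFor(String baseName, String environment) {
        if (baseName == null || baseName.trim().isEmpty()) {
            throw new IllegalArgumentException("baseName must not be empty");
        }
        if (environment == null || environment.trim().isEmpty()) {
            throw new IllegalArgumentException("environment must not be empty");
        }
        return baseName.trim() + "-" + environment.trim().toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        // Prints one group id for each environment mentioned in the answer.
        for (String env : new String[] {"local", "dev", "prd"}) {
            System.out.println(groupIdFor("cpo-executor-groupid", env));
        }
    }
}
```

The returned value could then be passed to props.put(ConsumerConfig.GROUP_ID_CONFIG, ...) in place of the injected groupId field. Rejecting an empty environment is a deliberate choice here, so that a missing profile fails fast instead of silently falling back to a shared group id.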
Answered By - Aldo Inácio da Silva
Answer Checked By - Terry (JavaFixing Volunteer)