Issue
I am running on spring-kafka:2.6.7
and I am looking for a way to set a custom task executor for my listener. Below is my Kafka configuration.
@Bean
ProducerFactory<Integer, BaseEventTemplate> eventProducerFactory() {
Map<String, Object> producerProps = new HashMap<>()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer)
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class)
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, BaseEventTemplateSerializer.class)
producerProps.put(ProducerConfig.ACKS_CONFIG, "all")
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 256)
return new DefaultKafkaProducerFactory<>(producerProps)
}
@Bean
KafkaTemplate<Integer, BaseEventTemplate> baseEventKafkaTemplate() {
return new KafkaTemplate<>(eventProducerFactory())
}
@Bean
ConsumerFactory<Integer, BaseEventTemplate> baseEventConsumerFactory() {
Map<String, Object> consumerProps = new HashMap<>()
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer)
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "kafkaeventconsumer")
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class)
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BaseEventTemplateDeserializer.class)
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, Collections.singletonList(RoundRobinAssignor.class))
return new DefaultKafkaConsumerFactory<>(consumerProps)
}
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, BaseEventTemplate> baseEventKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, BaseEventTemplate> factory =
new ConcurrentKafkaListenerContainerFactory<>()
factory.setConsumerFactory(baseEventConsumerFactory())
factory.setConcurrency(3)
factory.getContainerProperties().setPollTimeout(3000)
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE)
factory.getContainerProperties().setSyncCommits(true)
return factory
}
I have a way to set consumer task executor via factory.getContainerProperties().setConsumerTaskExecutor()
but not sure how to set task executor for listener.
Solution
2.6.x is out of OSS support https://spring.io/projects/spring-kafka#support
The same thread used to poll the consumer is used to call the listener.
In very early versions (before 1.3), there were two threads due to limitations in the kafka-clients, but there is only one now (for the last 5 years).
Answered By - Gary Russell
Answer Checked By - Senaida (JavaFixing Volunteer)