Issue
I have written an integration test for my Kafka consumer using Spring Boot and the spring-kafka libraries. The test uses EmbeddedKafka with a topic that has one partition, and a KafkaMessageListenerContainer to consume from it. The test fails on this line:
ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic())
The error that I am getting is:
java.lang.IllegalStateException: Expected 1 but got 0 partitions.
The code which I referred to is: https://blog.mimacom.com/testing-apache-kafka-with-spring-boot-junit5/
@EmbeddedKafka(partitions = 1, ports = 9092)
@SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
@RunWith(SpringRunner.class)
@DirtiesContext
@Profile("test")
@TestPropertySource({"classpath:application.yaml"})
public class OnboardingConsumerListenerTest {

    BlockingQueue<ConsumerRecord<String, String>> records;
    KafkaMessageListenerContainer<String, String> container;

    @Autowired
    protected EmbeddedKafkaBroker embeddedKafkaBroker;

    public ConsumerFactory<String, String> consumerFactory;

    @Value("${spring.kafka.client_topic}")
    private String topicName;

    @Value("${spring.kafka.group_id}")
    private String groupId;

    @Before
    public void setUp() {
        consumerFactory = getKafkaConsumer(embeddedKafkaBroker, groupId, topicName);
        ContainerProperties containerProperties = new ContainerProperties(topicName);
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        records = new LinkedBlockingQueue<>();
        container.setupMessageListener(new MessageListener<String, String>() {
            @Override
            public void onMessage(ConsumerRecord<String, String> record) {
                records.add(record);
            }
        });
        container.start();
        // Fails here with: Expected 1 but got 0 partitions
        ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }
}
And the getKafkaConsumer() function is:
public ConsumerFactory<String, String> getKafkaConsumer(EmbeddedKafkaBroker embeddedKafkaBroker,
                                                        String group,
                                                        String topic) {
    Map<String, Object> consumerProps =
            KafkaTestUtils.consumerProps(group, "false", embeddedKafkaBroker);
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            embeddedKafkaBroker.getBrokersAsString());
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    consumerProps.put("schema.registry.url", "bogus");
    consumerProps.put("specific.avro.reader", true);
    return new DefaultKafkaConsumerFactory<String, String>(consumerProps);
}
Solution
It is hard to say what is happening without more information.
An alternative is to manually assign the partitions instead of waiting for group management to assign them:
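ContainerProperties containerProperties =
        new ContainerProperties(new TopicPartitionOffset(topicName, 0),
                new TopicPartitionOffset(topicName, 1));
Partition numbers start at 0, so for the single-partition topic in the question only the first TopicPartitionOffset (partition 0) applies.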
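For context on the error message itself: it is what you see when a wait for assignment gives up because the container's assigned-partition count never reached the expected value within the timeout. The plain-Java sketch below illustrates that kind of polling check. It is a simplified stand-in and not spring-kafka's actual implementation; the class name `AssignmentWait`, the method `waitForCount`, and the 50 ms poll interval are all assumptions made for illustration.

```java
import java.util.function.IntSupplier;

// Simplified stand-in for an "await partition assignment" check.
// NOT spring-kafka's implementation; all names here are hypothetical.
class AssignmentWait {

    // Polls assignedCount until it reaches expected or timeoutMs elapses,
    // then throws if the expected count was never reached.
    static void waitForCount(IntSupplier assignedCount, int expected, long timeoutMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        int count = assignedCount.getAsInt();
        while (count < expected && deadline - System.nanoTime() > 0) {
            try {
                Thread.sleep(50); // assumed poll interval
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            count = assignedCount.getAsInt();
        }
        if (count != expected) {
            throw new IllegalStateException(
                    "Expected " + expected + " but got " + count + " partitions");
        }
    }

    public static void main(String[] args) {
        try {
            // A consumer that never receives an assignment always reports 0.
            waitForCount(() -> 0, 1, 200);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Seen this way, "got 0 partitions" means the consumer never joined the group and received an assignment in time. Manual assignment avoids depending on that group rebalance.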
Answered By - Gary Russell
Answer Checked By - David Marino (JavaFixing Volunteer)