Issue
I subscribe to a Kafka topic pattern such as "topic.*". My goal is to create a dead-letter queue for each Kafka topic I listen to.
For example, when I listen to a topic named "topic.1", I would like a dead-letter queue named "topic.1_deadletter" to be created automatically.
What I have tried so far is shown below.
My consumer:
@Component
@Slf4j
public class LibraryEventsConsumer {

    @Autowired
    LibraryEventConsumerConfig libraryEventConsumerConfig;

    @KafkaListener(topicPattern = "kafka.*")
    public void onMessage(String consumerRecord, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) throws Exception {
        log.info("ConsumerRecord : {}", consumerRecord);
        String deadlettertopic = String.format("%s_deadletter", topic);
        System.out.println(deadlettertopic);
        System.out.println(KafkaHeaders.RECEIVED_TOPIC);
        libraryEventConsumerConfig.getTopic(topic);
    }
}
Here, with the getTopic method, I am trying to create the Kafka topic automatically. Below is the LibraryEventConsumerConfig class:
@Configuration
@EnableKafka
public class LibraryEventConsumerConfig {

    @Bean
    public void getTopic(String topic) {
        NewTopic deadlettertopic = TopicBuilder.name(String.format("%s_deadletter", topic))
                .partitions(1)
                .replicas(1)
                .build();
    }
}
Unfortunately, that approach did not work, and I got the error message below:
Parameter 0 of method getTopic in com.kafkalibrary.Config.LibraryEventConsumerConfig required a bean of type 'java.lang.String' that could not be found.
Any idea how to proceed?
Solution Example Code:
For those with the same goal, here is my example code, thanks to Gary Russell for the inspiration:
private static void createTopic(String topicName, int numPartitions) throws Exception {
    Properties config = new Properties();
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:5052,localhost:5053,localhost:5054");
    // try-with-resources closes the AdminClient once the work is done
    try (AdminClient admin = AdminClient.create(config)) {
        // check whether the topic already exists
        boolean alreadyExists = admin.listTopics().names().get().contains(topicName);
        if (alreadyExists) {
            System.out.printf("topic already exists: %s%n", topicName);
        } else {
            // create the new topic with a replication factor of 1
            System.out.printf("creating topic: %s%n", topicName);
            NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
            admin.createTopics(Collections.singleton(newTopic)).all().get();
        }
    }
}
Solution
Add a rebalance listener, or extend AbstractConsumerSeekAware (or just implement ConsumerSeekAware).
public class LibraryEventsConsumer extends AbstractConsumerSeekAware {
Then, in onPartitionsAssigned(), use an AdminClient to check if the DLT topic exists and, if not, create it.
Answered By - Gary Russell
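The "check if the DLT topic exists and, if not, create it" step from the answer above can be sketched as a small helper. This is only an illustration and not part of the original answer: the class DeadLetterTopics and its methods nameFor and missingFor are hypothetical names, and the helper deliberately uses plain Java collections, so the Kafka calls themselves are left out. It assumes the caller collects the topic names from the partitions passed to onPartitionsAssigned() and obtains the existing topic names from the broker, for example via AdminClient.listTopics().

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration; not part of Spring Kafka or the original answer.
final class DeadLetterTopics {

    // Suffix taken from the naming scheme in the question ("topic.1_deadletter").
    private static final String SUFFIX = "_deadletter";

    private DeadLetterTopics() {
    }

    // Maps a source topic name to its dead-letter topic name.
    static String nameFor(String topic) {
        return topic + SUFFIX;
    }

    // Returns the dead-letter topic names that still have to be created.
    // assignedTopics: topic names of the partitions assigned to this consumer.
    // existingTopics: topic names the broker already knows.
    static Set<String> missingFor(Collection<String> assignedTopics,
                                  Collection<String> existingTopics) {
        Set<String> missing = new TreeSet<>();
        for (String topic : assignedTopics) {
            // A pattern such as "topic.*" also matches "topic.1_deadletter",
            // so skip dead-letter topics to avoid "topic.1_deadletter_deadletter".
            if (topic.endsWith(SUFFIX)) {
                continue;
            }
            String deadLetter = nameFor(topic);
            if (!existingTopics.contains(deadLetter)) {
                missing.add(deadLetter);
            }
        }
        return missing;
    }
}
```

Each name returned by missingFor could then be wrapped in a NewTopic and passed to AdminClient.createTopics(), as in the createTopic example earlier in this post. Filtering out names that already end in "_deadletter" matters with pattern subscriptions, because the consumer may be assigned the dead-letter topics it created itself.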