Issue
I have supplied a bean for retry topic config:
@Bean
public RetryTopicConfiguration kafkaRetryTopicConfig(...) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(...)
            .maxAttempts(..)
            .useSingleTopicForFixedDelays()
            .doNotRetryOnDltFailure()
            .listenerFactory(factory)
            .create(template);
}
I have also defined a bean for retry topic names provider factory:
// Need this because it is not just retry/DLT topic suffix but the entire name is configurable and specified in the configuration.
@Bean
public RetryTopicNamesProviderFactory retryTopicNamingProviderFactory() {
    return new RetryTopicNamesProviderFactory() {
        @Override
        public RetryTopicNamesProvider createRetryTopicNamesProvider(DestinationTopic.Properties properties) {
            return new SuffixingRetryTopicNamesProvider(properties) {
                @Override
                public String getTopicName(String topic) {
                    // NOTE: The retry and dead letter topic names come from config.
                    // I would like to return different names for retry and dead letter topics.
                    // TODO: But how do I distinguish which name I should return here?
                    return *******;
                }
            };
        }
    };
}
Two-part question:
a. Spring does not seem to correlate the beans. It processes the kafkaRetryTopicConfig bean but does not use retryTopicNamingProviderFactory and the custom topic names. It takes the default naming (.retry and .deadLetter). Is there any bean or component that I should declare so that the custom naming provider is picked up?
b. As indicated (TODO) in the code above, how do I distinguish whether createRetryTopicNamesProvider is called for retry or dead letter topic naming? I see that RetryTopicNamesProviderFactory handles naming for both retry and dead letter topics.
Help much appreciated.
Solution
As you can see in the documentation, the framework provides a properties object which you can inspect to find out whether you are on a main, retry or dlt topic, as well as the configured suffixes for the topic:
public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {

    @Override
    public RetryTopicNamesProvider createRetryTopicNamesProvider(
            DestinationTopic.Properties properties) {
        if (properties.isMainEndpoint()) {
            return new SuffixingRetryTopicNamesProvider(properties);
        }
        else {
            return new SuffixingRetryTopicNamesProvider(properties) {
                @Override
                public String getTopicName(String topic) {
                    return "my-prefix-" + super.getTopicName(topic);
                }
            };
        }
    }
}
You also have access to the original topic name, and the super.getTopicName(topic) method will give you the suffixed topic name.
You should be able to use this information to return the topic name you'd like.
If this isn't enough customization for your use case, please provide more details on how you need to configure the topic names.
EDIT: Also, please let me know which Spring Kafka version you're using. For version 2.9.x and above, configuring the RetryTopicNamesProviderFactory as a bean is no longer supported.
EDIT 2: Addressing your comments, you should be able to identify a retry topic with !properties.isMainEndpoint() && !properties.isDltTopic(). You can open a feature request in the project's GitHub if you'd like an explicit isRetryTopic() method.
From what I understand of your requirements, you should be able to populate a map where the key is the topic name and type and the value is the configured topic name, and use that to return the proper name in the RetryTopicNamesProviderFactory.
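The map-based lookup described above could be sketched as follows. This is only an illustration in plain Java with no Spring dependency: the class ConfiguredTopicNames, its TopicType enum and the register/resolve methods are hypothetical names, not part of Spring Kafka. The two boolean arguments are assumed to be the values of properties.isMainEndpoint() and properties.isDltTopic(), and defaultName is assumed to be the result of super.getTopicName(topic).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not a Spring Kafka class): resolves configured retry and DLT topic names. */
public class ConfiguredTopicNames {

    /** The two kinds of non-main topics the factory is asked to name. */
    public enum TopicType { RETRY, DLT }

    // Key: main topic name + ":" + topic type; value: the configured topic name.
    private final Map<String, String> names = new HashMap<>();

    /** Registers the configured name for one main topic and topic type. */
    public ConfiguredTopicNames register(String mainTopic, TopicType type, String configuredName) {
        names.put(key(mainTopic, type), configuredName);
        return this;
    }

    /**
     * Returns the name to use for a topic.
     * mainEndpoint and dltTopic are assumed to come from
     * properties.isMainEndpoint() and properties.isDltTopic();
     * defaultName is assumed to be super.getTopicName(topic).
     */
    public String resolve(String mainTopic, boolean mainEndpoint, boolean dltTopic, String defaultName) {
        if (mainEndpoint) {
            return mainTopic; // the main topic keeps its own name
        }
        // Anything that is neither main nor DLT is treated as a retry topic.
        TopicType type = dltTopic ? TopicType.DLT : TopicType.RETRY;
        // Fall back to the default suffixed name when nothing is configured.
        return names.getOrDefault(key(mainTopic, type), defaultName);
    }

    private static String key(String mainTopic, TopicType type) {
        return mainTopic + ":" + type;
    }
}
```

Inside getTopicName(String topic) of the anonymous SuffixingRetryTopicNamesProvider, the call would then look like names.resolve(topic, properties.isMainEndpoint(), properties.isDltTopic(), super.getTopicName(topic)), where names is an instance populated from your configuration.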
The components' configuration system for this feature was changed in 2.9, and a decision was made to enforce the Experimental API disclaimer and not support the legacy mode anymore. This doesn't change the RetryTopicConfiguration beans, just the bean-based component configurations. For 2.8 you should keep using the bean-based approach though.
You might want to consider upgrading to 2.9, since it has a better configuration system in place for this feature, besides other improvements overall. Spring Boot's release train does not include this version, so the dependency version must be overridden manually.
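For reference, a minimal sketch of what the 2.9 style of component configuration might look like, assuming Spring Kafka 2.9.x on the classpath and the CustomRetryTopicNamesProviderFactory class shown earlier in the same package. The class name RetryTopicNamingConfig is hypothetical, and the fragment is not runnable on its own; check it against the reference documentation for your exact version.

```java
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.retrytopic.RetryTopicComponentFactory;
import org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport;
import org.springframework.kafka.retrytopic.RetryTopicNamesProviderFactory;

// Hypothetical configuration class name; assumes Spring Kafka 2.9.x.
@EnableKafka
@Configuration
public class RetryTopicNamingConfig extends RetryTopicConfigurationSupport {

    @Override
    protected RetryTopicComponentFactory createComponentFactory() {
        return new RetryTopicComponentFactory() {
            @Override
            public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
                // Plug in the custom naming factory instead of declaring it as a bean.
                return new CustomRetryTopicNamesProviderFactory();
            }
        };
    }
}
```

With this approach the naming factory is supplied through the component factory rather than discovered as a standalone bean, which is the part that changed relative to 2.8.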
The logic in RetryTopicBootstrapper that handles this in 2.8.x is pretty straightforward: if there's a declared bean implementing this interface, it should be fetched. Perhaps you can add a breakpoint there and make sure your bean is being considered.
Please let me know if that works for you, thanks.
Answered By - Tomaz Fernandes
Answer Checked By - David Marino (JavaFixing Volunteer)