Issue
I'm trying to handle a poison pill scenario with spring-kafka.
Currently I handle it with the approach below, where failed messages are pushed to a separate topic named <original-topic>.DLT.
@Bean
public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
    return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
}

@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate bytesTemplate) {
    return new DeadLetterPublishingRecoverer(bytesTemplate);
}
Instead of pushing the failed message to the <original-topic>.DLT topic, I want to get hold of it and save it directly to a database.
I tried to retrieve the failed message but had no success. Can anybody help here? TIA.
Solution
Simply implement your own ConsumerRecordRecoverer and use it in the error handler instead of the DeadLetterPublishingRecoverer. The interface is defined as:
/**
 * A {@link BiConsumer} extension for recovering consumer records.
 *
 * @author Gary Russell
 * @since 2.3
 *
 */
@FunctionalInterface
public interface ConsumerRecordRecoverer extends BiConsumer<ConsumerRecord<?, ?>, Exception> {
}
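Since ConsumerRecordRecoverer is just a BiConsumer of the failed record and the exception, the recovery logic can be sketched with plain JDK types. The following is a minimal sketch under stated assumptions, not a definitive implementation: FailedRecord is a hypothetical stand-in for Kafka's ConsumerRecord so that the code compiles without Kafka on the classpath, and FAILED_MESSAGES is a hypothetical in-memory stand-in for the database table. All names here are illustrative and do not come from spring-kafka.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

final class DbRecovererSketch {

    // Hypothetical stand-in for Kafka's ConsumerRecord<?, ?>, so the sketch
    // compiles without Kafka on the classpath.
    static final class FailedRecord {
        final String topic;
        final int partition;
        final long offset;
        final byte[] value;

        FailedRecord(String topic, int partition, long offset, byte[] value) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.value = value;
        }
    }

    // Hypothetical stand-in for a database table; a real recoverer would
    // issue an INSERT through a DAO or repository instead.
    static final List<String> FAILED_MESSAGES = new ArrayList<>();

    // Same shape as ConsumerRecordRecoverer: it receives the failed record
    // and the exception, and stores both instead of publishing to a topic.
    static final BiConsumer<FailedRecord, Exception> DB_RECOVERER = (rec, ex) ->
            FAILED_MESSAGES.add(String.format("%s-%d@%d: %s (%s)",
                    rec.topic, rec.partition, rec.offset,
                    new String(rec.value, StandardCharsets.UTF_8),
                    ex.getMessage()));

    public static void main(String[] args) {
        // Simulate the error handler giving up on a poison pill record.
        FailedRecord poisonPill = new FailedRecord(
                "orders", 0, 42L, "bad".getBytes(StandardCharsets.UTF_8));
        DB_RECOVERER.accept(poisonPill, new IllegalStateException("boom"));
        System.out.println(FAILED_MESSAGES.get(0)); // prints "orders-0@42: bad (boom)"
    }
}
```

In a spring-kafka application, the same lambda body would be declared as a ConsumerRecordRecoverer bean operating on ConsumerRecord<?, ?>, and that bean would be passed to the SeekToCurrentErrorHandler constructor in place of the DeadLetterPublishingRecoverer shown in the question.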
Answered By - Gary Russell
Answer Checked By - Willingham (JavaFixing Volunteer)