Issue
I'm unable to publish to the DLQ topic when using ErrorHandlingDeserializer in combination with Avro to handle deserialization errors. Publishing fails with the error below.
Topic TOPIC_DLT not present in metadata after 60000 ms. ERROR KafkaConsumerDestination{consumerDestinationName='TOPIC', partitions=6, dlqName='TOPIC_DLT'}.container-0-C-1 o.s.i.h.LoggingHandler:250 - org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@49abe531]; nested exception is java.lang.RuntimeException: failed, failedMessage=GenericMessage
Here is my application.yml:
spring:
  cloud:
    stream:
      bindings:
        process-in-0:
          destination: TOPIC
          group: groupID
      kafka:
        binder:
          brokers:
            - xxx:9092
          configuration:
            security.protocol: SASL_SSL
            sasl.mechanism: PLAIN
          jaas:
            loginModule: org.apache.kafka.common.security.plain.PlainLoginModule
            options:
              username: username
              password: pwd
          consumer-properties:
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        bindings:
          process-in-0:
            consumer:
              configuration:
                basic.auth.credentials.source: USER_INFO
                schema.registry.url: registryUrl
                schema.registry.basic.auth.user.info: user:pwd
                security.protocol: SASL_SSL
                sasl.mechanism: PLAIN
              max-attempts: 1
              dlqProducerProperties:
                configuration:
                  basic.auth.credentials.source: USER_INFO
                  schema.registry.url: registryUrl
                  schema.registry.basic.auth.user.info: user:pwd
                  key.serializer: org.apache.kafka.common.serialization.StringSerializer
                  value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
              deserializationExceptionHandler: sendToDlq
              ackEachRecord: true
              enableDlq: true
              dlqName: TOPIC_DLT
              autoCommitOnError: true
              autoCommitOffset: true
I'm using the following dependencies:
spring-cloud-dependencies - 2021.0.1
spring-boot-starter-parent - 2.6.3
spring-cloud-stream-binder-kafka
kafka-schema-registry-client - 5.3.0
kafka-avro-serializer - 5.3.0
I'm not sure what exactly I'm missing.
Solution
After going through a lot of documentation, I found that for Spring to publish to the DLQ, the original topic and the DLT must have the same number of partitions, because by default a dead-letter record is sent to the same partition number as the original record. If the partition counts can't be matched, either set dlqPartitions to 1 or provide a DlqPartitionFunction bean yourself. With dlqPartitions: 1 and no DlqPartitionFunction bean, all dead-letter messages go to partition 0.
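As a rough sketch of the dlqPartitions option, the fragment below shows where the property would sit for the binding from the question. The binding name process-in-0 and the topic name TOPIC_DLT are taken from the question. The nesting under the Kafka consumer binding is my assumption about the intended layout, so adjust it to match your own file.

```yaml
spring:
  cloud:
    stream:
      kafka:
        bindings:
          process-in-0:
            consumer:
              enableDlq: true
              dlqName: TOPIC_DLT
              # Assumption: TOPIC_DLT has fewer partitions than TOPIC.
              # With 1 here and no DlqPartitionFunction bean, every
              # dead-letter record is written to partition 0.
              dlqPartitions: 1
```

The trade-off is that all failed records end up in a single partition of the DLT, which is usually acceptable for low-volume error traffic. If you need a different distribution, a DlqPartitionFunction bean is the other route mentioned above.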
Answered By - Srikanth G
Answer Checked By - Robin (JavaFixing Admin)