Issue
I am using Spring Cloud Stream version 3.0.6.RELEASE
.
I have an existing exchange called my.queue.exchange
. My application contains one consumer, I want to create a queue called MY_QUEUE
and bind that queue to my.queue.exchange
exchange. Additionally, I want to republish failed messages to a DLQ called MY_QUEUE_DLQ
.
My problem is that the dead letter queue called MY_QUEUE_DLQ
is bound to my.queue.exchange.dlx
exchange with two routing keys instead of one, the first one with routing key my.queue.rkey.dlx
and the second one with routing key MY_QUEUE
My consumer bean:
@Bean
public Consumer<Dto> consumeFunction() {
return dto -> {
// do stuff
};
}
my application.yml
:
spring:
cloud:
stream:
function:
definition: consumeFunction
rabbit:
bindings:
consumeFunction-in-0:
consumer:
autoBindDlq: true
deadLetterQueueName: MY_QUEUE_DLQ
deadLetterExchange: my.queue.exchange.dlx
deadLetterRoutingKey: my.queue.rkey.dlx
deadLetterExchangeType: topic
declareExchange: false
bindQueue: true
queueNameGroupOnly: true
bindingRoutingKey: 'my.queue.rkey'
bindings:
consumeFunction-in-0:
destination: my.queue.exchange
group: MY_QUEUE
Solution
You need to set republishToDlq: false
on the rabbit consumer properties.
In the RabbitExchangeQueueProvisioner
you can see the following code (see the comment_
if (properties instanceof RabbitConsumerProperties
&& ((RabbitConsumerProperties) properties).isRepublishToDlq()) {
/*
* Also bind with the base queue name when republishToDlq is used, which
* does not know about partitioning
*/
declareBinding(dlqName, new Binding(dlq.getName(), DestinationType.QUEUE,
dlxName, baseQueueName, arguments));
}
You can also get more details here as to why.
Also, I see you are using
spring:
cloud:
stream:
function:
definition: consumeFunction
Please change it to
spring:
cloud:
function:
definition: consumeFunction
as the other property is deprecated and is already removed in 3.2 version.
Answered By - Oleg Zhurakousky