Issue
Help: I am trying to connect to IBM MQ with Spring Integration. This is what I am doing:
    public Publisher<Message<String>> jmsReactiveSource() {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(ibmConnectionFactory)
                        .destination("DEV.QUEUE.1"))
                .channel(MessageChannels.queue())
                .log(org.springframework.integration.handler.LoggingHandler.Level.DEBUG)
                .log()
                .toReactivePublisher();
    }

    @GetMapping("/event")
    public Mono<String> getEvent() {
        return Mono.from(jmsReactiveSource())
                .log()
                .map(Message::getPayload);
    }

    @GetMapping("/pub")
    public void produce() {
        jmsTemplateIbm.convertAndSend("DEV.QUEUE.1", "MESSAGE");
    }
When I call /event after calling /pub, nothing happens: the message is not consumed. What am I doing wrong? I need to make IBM MQ access non-blocking, because a large amount of resources is spent waiting for responses from IBM MQ.
Solution
See the Java DSL chapter of the Spring Integration documentation: https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl. Everywhere in our samples we declare an IntegrationFlow as a @Bean. That is the only way an IntegrationFlowBeanPostProcessor can understand the DSL definition and register the respective beans in the application context. Your jmsReactiveSource() is a plain method, so none of the flow's components are registered in the application context.
Either mark jmsReactiveSource() with @Bean and make the whole class with those @GetMapping methods a @Configuration, so that the jmsReactiveSource() method call becomes a bean request to the application context, or, if you don't want to turn this @RestController into a configuration class, move the flow bean into a separate @Configuration class. In the latter case you need to autowire that Publisher<Message<String>> bean into the controller class.
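A minimal sketch of the second option, assuming a ConnectionFactory bean named ibmConnectionFactory as in the question. The class names JmsFlowConfiguration and EventController are made up for illustration, and IntegrationFlows is the Spring Integration 5.x entry point (newer versions expose the same factory methods on IntegrationFlow):

```java
import javax.jms.ConnectionFactory; // assumption: Spring Integration 5.x; newer versions use jakarta.jms

import org.reactivestreams.Publisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.jms.dsl.Jms;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

// Hypothetical class name: holds the flow so that it is declared as a @Bean.
@Configuration
class JmsFlowConfiguration {

    // Declared as a bean, so the IntegrationFlowBeanPostProcessor can
    // register the adapter and channels behind this Publisher.
    @Bean
    public Publisher<Message<String>> jmsReactiveSource(ConnectionFactory ibmConnectionFactory) {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(ibmConnectionFactory)
                        .destination("DEV.QUEUE.1"))
                .channel(MessageChannels.queue())
                .log()
                .toReactivePublisher();
    }
}

// Hypothetical class name: the controller receives the Publisher bean
// through constructor injection instead of calling the factory method itself.
@RestController
class EventController {

    private final Publisher<Message<String>> jmsReactiveSource;

    EventController(Publisher<Message<String>> jmsReactiveSource) {
        this.jmsReactiveSource = jmsReactiveSource;
    }

    @GetMapping("/event")
    public Mono<String> getEvent() {
        // Takes the first message from the flow and returns its payload.
        return Mono.from(this.jmsReactiveSource)
                .map(Message::getPayload);
    }
}
```

The important difference from the code in the question is that the controller no longer builds the flow on every request; it only subscribes to the single Publisher managed by the application context.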
UPDATE
For your second concern about early consumption: mark that Jms.messageDrivenChannelAdapter() with autoStartup(false) and start it from your Mono in getEvent() using its doOnRequest(). The adapter has an id() option that lets you autowire that endpoint properly. You are right: even though the flow is reactive at the end, its beginning is not, so it has to be treated accordingly and must not consume until requested.
You may also consider returning a Flux instead of a Mono, since the whole process is unbounded. Such a REST endpoint should then also be changed to SSE: https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#webflux-codecs-streaming
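As an illustration, a streaming variant of the endpoint might look like the sketch below. The controller name is hypothetical, and the Publisher is assumed to be the jmsReactiveSource bean autowired from a @Configuration class:

```java
import org.reactivestreams.Publisher;
import org.springframework.http.MediaType;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

// Hypothetical class name: streams every message from the flow to the client.
@RestController
class EventStreamController {

    private final Publisher<Message<String>> jmsReactiveSource;

    EventStreamController(Publisher<Message<String>> jmsReactiveSource) {
        this.jmsReactiveSource = jmsReactiveSource;
    }

    // text/event-stream keeps the HTTP response open and emits one
    // server-sent event per JMS message payload.
    @GetMapping(path = "/event", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> getEvents() {
        return Flux.from(this.jmsReactiveSource)
                .map(Message::getPayload);
    }
}
```

Unlike the Mono version, this response does not complete after the first message; the client stays subscribed until it disconnects.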
Answered By - Artem Bilan