Issue
I have the following adapter:
@Component
class MyCustomFlow : IntegrationFlowAdapter() {

    fun singleThreadTaskExecutor(): TaskExecutor {
        val executor = ThreadPoolTaskExecutor()
        executor.maxPoolSize = 1
        executor.initialize()
        return executor
    }

    @Filter
    fun filter(data: SomeData): Boolean = ...

    @Transformer
    fun transform(customer: Data): Message<SomeData> {
        ....
    }

    @ServiceActivator
    fun handle(data: Data): SomeData {
        ....
    }

    @Bean(name = [PollerMetadata.DEFAULT_POLLER])
    fun poller(): PollerSpec? {
        return Pollers.fixedRate(500)
    }

    @Bean
    override fun buildFlow(): IntegrationFlowDefinition<*> {
        return from(MessageChannels.queue("updateCustomersLocation"))
            .channel(MessageChannels.executor(singleThreadTaskExecutor()))
            .split()
            .filter(this)
            .transform(this)
            .handle(this)
            .channel("customerLocationFetched")
    }
}
And if I understand the Java DSL correctly, handle maps to a ServiceActivator:
handle → ServiceActivator
Given a ServiceActivator outside of the IntegrationFlowAdapter, I have the option to define a poller. Within that poller I could add a delay before the next message is processed.
@ServiceActivator(
poller = [Poller(fixedDelay = "3000", maxMessagesPerPoll = "1", fixedRate = "3000")]
)
Would it be possible, within the adapter, to add a delay for the ServiceActivator (the method named handle), similar to the behavior I get if I add another channel?
@Bean
override fun buildFlow(): IntegrationFlowDefinition<*> {
    return from(MessageChannels.queue("updateCustomersLocation"))
        .channel(MessageChannels.executor(singleThreadTaskExecutor()))
        .split()
        .filter(this)
        .transform(this)
        .channel("myNewChannel")
        //.handle(this)
        //.channel("customerLocationFetched")
}
And then, from outside the adapter, I could just define a new ServiceActivator:
@ServiceActivator(
    inputChannel = "myNewChannel",
    poller = [Poller(fixedDelay = "3000", maxMessagesPerPoll = "1", fixedRate = "3000")]
)
The reason I want a delay between messages in the handle/service activator is that the method sends requests, and I want to control the rate at which those requests are sent.
Solution
First of all, the buildFlow() method must not be marked with @Bean: the IntegrationFlowAdapter itself takes care of registering everything.
You can add a poller to the endpoint spec: see the additional endpoint-configurer argument that handle() accepts. The only requirement is that the input channel for this polling endpoint must be pollable. A QueueChannel is available out of the box, and you can place one just before your handle().
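The advice above could be sketched roughly as follows. This is a fragment rather than a complete program: it assumes the Spring Integration 5.x Java DSL, reuses filter(), transform(), handle() and singleThreadTaskExecutor() unchanged from the question, and the channel name "throttledHandleInput" is a hypothetical name chosen for illustration. The 3000 ms delay and the one-message-per-poll limit are taken from the question's @Poller example.

```kotlin
import org.springframework.integration.dsl.IntegrationFlowAdapter
import org.springframework.integration.dsl.IntegrationFlowDefinition
import org.springframework.integration.dsl.MessageChannels
import org.springframework.integration.dsl.Pollers
import org.springframework.stereotype.Component

@Component
class MyCustomFlow : IntegrationFlowAdapter() {

    // filter(), transform(), handle() and singleThreadTaskExecutor()
    // stay exactly as in the question and are omitted here.

    // No @Bean: the adapter is already a bean and registers the flow itself.
    override fun buildFlow(): IntegrationFlowDefinition<*> {
        return from(MessageChannels.queue("updateCustomersLocation"))
            .channel(MessageChannels.executor(singleThreadTaskExecutor()))
            .split()
            .filter(this)
            .transform(this)
            // Pollable channel placed just before handle();
            // "throttledHandleInput" is a hypothetical name.
            .channel(MessageChannels.queue("throttledHandleInput"))
            // Endpoint configurer: poll one message at a time,
            // waiting 3000 ms after each poll completes.
            .handle(this, "handle") { e ->
                e.poller(Pollers.fixedDelay(3000).maxMessagesPerPoll(1))
            }
            .channel("customerLocationFetched")
    }
}
```

With the queue channel in place, the service activator pulls messages on its own poller schedule, so the rate of outgoing requests is governed by the poller settings rather than by how quickly upstream components produce messages.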
Answered By - Artem Bilan
Answer Checked By - Timothy Miller (JavaFixing Admin)