Issue
Context
We are using a spring integration flow to poll for new files in the input directory. The file name is passed to an existing spring batch job that fetches the file and handles the business processing.
return IntegrationFlows.from(
Files.inboundAdapter(new File(properties.getInputDir())).filter(new AcceptOnceFileListFilter<>()),
c -> c.poller(Pollers.fixedRate(300, TimeUnit.SECONDS).maxMessagesPerPoll(50)))
.log(LoggingHandler.Level.INFO, "Inb_GW_Msg", Message::getPayload)
.channel(c -> c.executor(jobsTaskExecutor))
.transform(fileMessageToJobRequest)
.handle(jobLaunchingGateway, e -> e.advice(jobExecutionAdvice()))
.log(LoggingHandler.Level.INFO, "Inb_GW_Msg_Processing_Result")
.get();
the transformer sets the input file path as a job parameter
@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
jobParametersBuilder.addString(fileParameterName, message.getPayload().getAbsolutePath());
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
We are trying to modify the flow to use a remote SFTP server as the file source. The spring batch job uses an outbound spring integration gateway to GET the remote file using the path based on the recommendations provided here Spring Integration - Concurrent access to SFTP outbound gateway GET w/ STREAM and accessing the response from Queue Channel and it works well.
Issue
While trying to modify the inbound poller to fetch file names from the remote sftp server, we are having issues trying to find the appropriate inbound adapter.
The use of Sftp.inboundAdapter(SessionFactory<ChannelSftp.LsEntry> sessionFactory)
sets up a SftpInboundFileSynchronizingMessageSource
that would synchronize a remote file system to the local directory. This behavior is not desirable for our needs and we are looking for a remote poller that would merely fetch the file name similar to the local file system poller.
I understand that the SFTP outbound gateway supports the `ls' operation and one way to solve the polling needs is to implement a custom message source/poller that connects to the outbound gateway periodically to fetch the directory listing.
Are there any out-of-the-box adapters that support this requirement?
Seeking recommendations on simpler ways to achieve our polling needs without writing a lot of custom code and/or modifying the existing spring batch job.
Update
Looks like SftpStreamingMessageSource
set up by Sftp.inboundStreamingAdapter
would return the remote file metadata along with the input stream. I can simply discard/close the stream right away and use the metadata to kick off the spring batch job.
I will try this out and post an update. Appreciate feedback on this design idea.
Solution
Use the outbound gateway with the list (LS) command.
https://docs.spring.io/spring-integration/docs/current/reference/html/sftp.html#using-the-ls-command
EDIT
@Bean
IntegrationFlow flow(DefaultSftpSessionFactory sf) {
return IntegrationFlows.fromSupplier(() -> "dir", e -> e.poller(Pollers.fixedDelay(5000)))
.handle(Sftp.outboundGateway(sf, Command.LS, "payload")
.options(Option.NAME_ONLY))
.split()
.log()
.get();
}
Answered By - Gary Russell
Answer Checked By - David Marino (JavaFixing Volunteer)