Issue
In the flow below, file splitting and aggregation work fine with new (different) files. When an already processed file is placed again and picked up by the Poller, it goes into the FileSplitterSpec() but never reaches the FileAggregator.
Are there any markers placed on the processed file that keep it from reaching the aggregator the next time?
return IntegrationFlows.from(txnChannel())
        .split(fileSplitterSpec())
        .filter(payload -> !(payload instanceof FileMarker), e -> e.discardChannel("aggregatorChannel"))
        .<String> filter(StringUtils::isNotBlank, e -> e.discardChannel("aggregatorChannel"))
        .transform(transformer, "transform")
        .handle((payload, headers) -> {
            headers.get("indHeader", DTO.class).getTxn().add(payload);
            return payload;
        })
        .channel("aggregatorChannel")
        .aggregate(new FileAggregator())

@Bean
public FileSplitterSpec fileSplitterSpec() {
    return Files.splitter()
            .markers()
            .firstLineAsHeader("someHeader");
}
Solution
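Yes, the correlation key for group aggregation is based on the file name. By default a completed group stays in the aggregator's message store, so messages that arrive later with the same file name are treated as late arrivals for an already completed group and are discarded. You need to use a different file name or configure the aggregator with expireGroupsUponCompletion(true). To set that option, the new FileAggregator() should go to the processor() option of the aggregator spec rather than directly into aggregate().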
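A minimal sketch of that configuration, assuming Spring Integration 5.5 or later (where FileAggregator is available) and an application context with the integration infrastructure enabled, for example through Spring Boot or @EnableIntegration. The class name, the bean name and the "resultChannel" queue are hypothetical, the input channel is referenced by name instead of through the txnChannel() bean method, and the transform and handle steps from the question are left out for brevity.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.file.aggregator.FileAggregator;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.file.splitter.FileSplitter;

@Configuration
public class ReprocessableFileFlowConfig { // hypothetical class name

    @Bean
    public IntegrationFlow reprocessableFileFlow() { // hypothetical bean name
        return IntegrationFlows.from("txnChannel")
                // Emit START/END markers and carry the first line as a header,
                // as in the question's fileSplitterSpec().
                .split(Files.splitter()
                        .markers()
                        .firstLineAsHeader("someHeader"))
                // Markers bypass the line processing and go straight to the aggregator.
                .filter(payload -> !(payload instanceof FileSplitter.FileMarker),
                        e -> e.discardChannel("aggregatorChannel"))
                .channel("aggregatorChannel")
                .aggregate(a -> a
                        // FileAggregator goes into processor(), as the answer suggests.
                        .processor(new FileAggregator())
                        // Remove the completed group so that the same file name
                        // can start a new group on the next run.
                        .expireGroupsUponCompletion(true))
                // Hypothetical output channel, used here only to terminate the flow.
                .channel(c -> c.queue("resultChannel"))
                .get();
    }
}
```

With the group expired on completion, placing the same file again should start a fresh group under the same file name instead of being treated as a late arrival for the finished one.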
Answered By - Artem Bilan
Answer Checked By - Katrina (JavaFixing Volunteer)