Issue
Example below returns buffer only after first 5 elements have been buffered. How to make it return buffers for the first 5 elements also?
@Test
public void testBuffer() {
Flux.range(0, 10)
.buffer(5,1)
.doOnNext(i -> log.info("tape {}", i))
.blockLast();
}
Returns:
tape [0, 1, 2, 3, 4]
tape [1, 2, 3, 4, 5]
tape [2, 3, 4, 5, 6]
tape [3, 4, 5, 6, 7]
tape [4, 5, 6, 7, 8]
tape [5, 6, 7, 8, 9]
tape [6, 7, 8, 9]
tape [7, 8, 9]
tape [8, 9]
tape [9]
desired result:
tape [0]
tape [0, 1]
tape [0, 1, 2]
tape [0, 1, 2, 3]
tape [0, 1, 2, 3, 4]
tape [1, 2, 3, 4, 5]
tape [2, 3, 4, 5, 6]
tape [3, 4, 5, 6, 7]
tape [4, 5, 6, 7, 8]
tape [5, 6, 7, 8, 9]
tape [6, 7, 8, 9]
tape [7, 8, 9]
tape [8, 9]
tape [9]
Solution
I do not know of any available operator to do such thing.
However, it is possible to combine multiple operators to produce this behaviour :
- use Flux#scan to create a sliding/growing buffer. Scan is very useful to accumulate source values in a very customisable way. Note that, however, it is not possible to trigger more buffering/values once the last value has been received. Therefore:
- We need to create a simple operator that takes the last emitted buffer, and replay it, removing heading values one by one.
- We need to combine 1. and 2. to obtain the entire sliding. Note that, as I have not found any operator to concatenate a flux produced by the last emitted value of another (in essence, a concatWith which would receive previous flux last value), I will use cache here.
WARNING I have absolutely not measured performance impact of such a strategy, it may be bad.
Here is a working prototype :
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.Arrays;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
public class ReactorAccumulator {
public static void main(String[] args) {
Flux<Integer[]> growingSlider = Flux.range(0, 10)
.scan(new SlidingBuffer<>(5, Integer[]::new), SlidingBuffer::accumulate)
.skip(1)
.map(SlidingBuffer::getBuffer);
print("Growing slider", growingSlider);
var decreasingSlider = growingSlider.last()
.flatMapMany(buffer -> slideDecrease(buffer, 1));
print("Decreasing slider", decreasingSlider);
var slider = thenDecrease(growingSlider);
print("Growing then decreasing slider", slider);
}
private static <V> void print(String title, Flux<V[]> dataStream) {
var nl = System.lineSeparator();
var log = dataStream
.map(Arrays::toString)
.collect(Collectors.joining(nl, "-- "+ title + nl, nl + "--"))
.block(Duration.ofSeconds(1));
System.out.println(log);
}
private static <V> Flux<V[]> slideDecrease(V[] buffer, int startIndex) {
if (buffer.length <= startIndex) return Flux.empty();
else return Flux.range(startIndex, buffer.length - startIndex)
.map(from -> Arrays.copyOfRange(buffer, from, buffer.length));
}
private static <V> Flux<V[]> thenDecrease(Flux<V[]> dataStream) {
var cacheLast = dataStream.cache(1);
return cacheLast.concatWith(cacheLast.last()
.flatMapMany(buffer -> slideDecrease(buffer, 1)));
}
static final class SlidingBuffer<V> {
private final int maxSize;
private final V[] buffer;
SlidingBuffer(int maxSize, IntFunction<V[]> creator) {
this(maxSize, creator.apply(0));
}
private SlidingBuffer(int maxSize, V[] buffer) {
this.maxSize = maxSize;
this.buffer = buffer;
}
SlidingBuffer<V> accumulate(V newValue) {
final V[] nextBuffer = (buffer.length < maxSize)
? Arrays.copyOf(buffer, buffer.length + 1)
: Arrays.copyOfRange(buffer, 1, maxSize + 1);
nextBuffer[nextBuffer.length - 1] = newValue;
return new SlidingBuffer<>(maxSize, nextBuffer);
}
V[] getBuffer() {
// Def copy: might not be needed if you trust consumer code not to modify the array directly
return Arrays.copyOf(buffer, buffer.length);
}
}
}
That produces output:
-- Growing slider
[0]
[0, 1]
[0, 1, 2]
[0, 1, 2, 3]
[0, 1, 2, 3, 4]
[1, 2, 3, 4, 5]
[2, 3, 4, 5, 6]
[3, 4, 5, 6, 7]
[4, 5, 6, 7, 8]
[5, 6, 7, 8, 9]
--
-- Decreasing slider
[6, 7, 8, 9]
[7, 8, 9]
[8, 9]
[9]
--
-- Growing then decreasing slider
[0]
[0, 1]
[0, 1, 2]
[0, 1, 2, 3]
[0, 1, 2, 3, 4]
[1, 2, 3, 4, 5]
[2, 3, 4, 5, 6]
[3, 4, 5, 6, 7]
[4, 5, 6, 7, 8]
[5, 6, 7, 8, 9]
[6, 7, 8, 9]
[7, 8, 9]
[8, 9]
[9]
--
Answered By - amanin
Answer Checked By - Cary Denson (JavaFixing Admin)