Issue
I have a collection of streams that I am trying to zip together into one stream, using Guava's Streams.zip method. It works fine when the collection holds fewer than 8000 streams; above 8000 it starts throwing a StackOverflowError. Debugging locally, I found that the overflow happens inside the zip method: it zips successfully up to 8000 streams and throws after that. I have not been able to find a workaround or work out why it happens, and would appreciate some help. The Guava zip code is here: https://github.com/google/guava/blame/6d7e326b2cbfba5f19fc67859c0b3d4c45fab63f/guava/src/com/google/common/collect/Streams.java#L318
While debugging locally, I converted all my lambda calls to vanilla for loops to confirm that nothing of mine is being called recursively. In the end I pinpointed the zip function as the cause.
Source code:
merge method which uses zip.
private static <T> Stream<T> merge(Stream<T> firstList, Stream<T> secondList) {
    return Streams.zip(firstList, secondList, (first, second) -> {
        if (first == null) {
            return second;
        }
        return first.merge(second);
    });
}
I call the merge method like this:
Collections.singletonList(inlineList.stream()
.reduce(merge)
where inlineList is a list of streams.
Exception:
java.lang.StackOverflowError
    at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
    at com.google.common.collect.Streams$1.tryAdvance(Streams.java:322)
    at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
    at com.google.common.collect.Streams$1.tryAdvance(Streams.java:322)
    at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
    at com.google.common.collect.Streams$1.tryAdvance(Streams.java:322)
    at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
    at com.google.common.collect.Streams$1.tryAdvance(Streams.java:322)
    at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
    at com.google.common.collect.Streams$1.tryAdvance(Streams.java:322)
    at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
    at com.google.common.collect.Streams$1.tryAdvance(Streams.java:322)
    at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
    at com.google.common.collect.Streams$1.tryAdvance(Streams.java:322)
    at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
    at com.google.common.collect.Streams$1.tryAdvance(Streams.java:322)
    at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
    at com.google.common.collect.Streams$1.tryAdvance(Streams.java:322)
    at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
Solution
It's important to remember that streams are data pipelines, not containers. That means when you call zip(s1, s2, f), it's not consuming the input streams; it's just returning a wrapper stream that reads from each input on demand and merges the results. Another key to keep in mind is that reduce() only processes two elements at a time. Say you have 4 streams and want to reduce them by hand; it would look something like this:
Stream<T> m1 = merge(s1, s2);
Stream<T> m2 = merge(m1, s3);
Stream<T> m3 = merge(m2, s4);
Now, consider what happens when you want to read a single element from the final merged stream. m3 has to request an element from s4 and one from m2, which in turn requires elements from s3 and m1, which in turn requires elements from s2 and s1, all in a single call stack. That nesting is the alternating hasNext/tryAdvance pattern in your stack trace. Stretch it out too far and you'll hit the maximum number of nested calls, resulting in a stack overflow.
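To make the nesting visible, here is a rough sketch that measures the stack depth at the moment the innermost stream is asked for its element. It does not use Guava: lazyZip is a hypothetical, simplified stand-in for a lazy two-way zip, and NestedZipDepth, depthOfChain and observedDepth are names made up for this illustration. The exact numbers depend on the JDK, so none are quoted here; the point is only that the depth grows with the number of streams folded together.

```java
import java.util.Iterator;
import java.util.Spliterators;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class NestedZipDepth {
    // Stack depth seen the last time the innermost stream produced an element.
    static int observedDepth;

    // Hypothetical stand-in for a lazy two-way zip (NOT Guava's implementation):
    // it wraps both inputs and pulls from them only when asked for an element.
    static <T> Stream<T> lazyZip(Stream<T> a, Stream<T> b, BinaryOperator<T> f) {
        Iterator<T> ia = a.iterator();
        Iterator<T> ib = b.iterator();
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, 0) {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (ia.hasNext() && ib.hasNext()) {
                    action.accept(f.apply(ia.next(), ib.next()));
                    return true;
                }
                return false;
            }
        }, false);
    }

    // Left-folds `count` one-element streams, the way reduce() would, and reports
    // how deep the call stack is when the first (innermost) stream is read.
    static int depthOfChain(int count) {
        Stream<Integer> merged = Stream.of(1).map(x -> {
            observedDepth = new Throwable().getStackTrace().length;
            return x;
        });
        for (int i = 1; i < count; i++) {
            merged = lazyZip(merged, Stream.of(1), Integer::sum);
        }
        merged.findFirst(); // pulls one element through every wrapper
        return observedDepth;
    }

    public static void main(String[] args) {
        System.out.println("10 streams:  depth " + depthOfChain(10));
        System.out.println("100 streams: depth " + depthOfChain(100));
    }
}
```

Every extra stream in the fold adds another wrapper, and every wrapper adds frames to the call that reads one element, so a few thousand streams can be enough to exhaust the default thread stack.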
I suspect this is an XY Problem and there's a cleaner way to achieve what you want that doesn't involve lazy merging of streams. But within the parameters of the question, one solution is to write a zipper that can handle n streams side by side instead of trying to stack them:
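import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Spliterators;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

static <T> Stream<T> merge(List<Stream<T>> streams, BinaryOperator<T> mergeFunction) {
    // One iterator per input stream, all held side by side in a flat list.
    List<Iterator<T>> iters = streams.stream()
            .map(Stream::iterator)
            .collect(Collectors.toList());
    return StreamSupport.stream(new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, 0) {
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            // Take the next element from every input that still has one and merge them.
            Optional<T> next = iters.stream()
                    .filter(Iterator::hasNext)
                    .map(Iterator::next)
                    .reduce(mergeFunction);
            next.ifPresent(action);
            // Empty only when every input is exhausted, which ends the merged stream.
            return next.isPresent();
        }
    }, false);
}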
Note that this is a little different from zip() in that the resulting stream is as long as the longest input stream instead of the shortest.
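As a quick check of both points, here is a small usage sketch. The merge method is the one from this answer, copied in so the example compiles on its own; the class name SideBySideMerge, the helpers unevenLengths and manyStreams, and the use of Integer::sum as the merge function are assumptions made for illustration, not part of the question's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Spliterators;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class SideBySideMerge {
    // The n-way merge from the answer, reproduced so the example is self-contained.
    static <T> Stream<T> merge(List<Stream<T>> streams, BinaryOperator<T> mergeFunction) {
        List<Iterator<T>> iters = streams.stream()
                .map(Stream::iterator)
                .collect(Collectors.toList());
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, 0) {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                Optional<T> next = iters.stream()
                        .filter(Iterator::hasNext)
                        .map(Iterator::next)
                        .reduce(mergeFunction);
                next.ifPresent(action);
                return next.isPresent();
            }
        }, false);
    }

    // Inputs of length 3, 2 and 1, summed position by position.
    static List<Integer> unevenLengths() {
        List<Stream<Integer>> inputs = Arrays.asList(
                Stream.of(1, 2, 3), Stream.of(10, 20), Stream.of(100));
        return merge(inputs, Integer::sum).collect(Collectors.toList());
    }

    // Many one-element streams merged in a single flat pass.
    static List<Integer> manyStreams(int count) {
        List<Stream<Integer>> inputs = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inputs.add(Stream.of(1));
        }
        return merge(inputs, Integer::sum).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(unevenLengths());     // prints [111, 22, 3]
        System.out.println(manyStreams(10_000)); // prints [10000]
    }
}
```

The first result has three elements because the merged stream keeps going until the longest input runs out. The second call merges 10,000 streams without deep nesting, since each element is produced by one flat pass over the list of iterators.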
Answered By - shmosel
Answer Checked By - Candace Johnson (JavaFixing Volunteer)