Issue
I've been testing some of the Reactor backpressure stuff, and one common structure seems to be:
package com.example.backpressure;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscription;
import org.springframework.boot.test.context.SpringBootTest;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
@SpringBootTest
class BackPressureApplicationTests {
@Test
public void backPressureApplicationTest1() {
Flux<Integer> request = Flux.range(1, 20).log();
request.subscribe(new BackPressureSubscriber<>());
}
}
class BackPressureSubscriber<T> extends BaseSubscriber<T> {
public void hookOnSubscribe(Subscription subscription) {
request(1);
}
public void hookOnNext(T value) {
System.out.println("Value is: " + value);
request(1);
}
}
The idea here is to apply backpressure by just allowing a new value to be received after having processed the current one (request(1)). It seems to work fine - and I am seeing output like this:
2022-10-03 20:11:51.908 INFO 2380 --- [ main] reactor.Flux.Range.1 : | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
2022-10-03 20:11:51.909 INFO 2380 --- [ main] reactor.Flux.Range.1 : | request(1)
2022-10-03 20:11:51.910 INFO 2380 --- [ main] reactor.Flux.Range.1 : | onNext(1)
Value is: 1
2022-10-03 20:11:51.911 INFO 2380 --- [ main] reactor.Flux.Range.1 : | request(1)
2022-10-03 20:11:51.911 INFO 2380 --- [ main] reactor.Flux.Range.1 : | onNext(2)
Value is: 2
2022-10-03 20:11:51.911 INFO 2380 --- [ main] reactor.Flux.Range.1 : | request(1)
2022-10-03 20:11:51.911 INFO 2380 --- [ main] reactor.Flux.Range.1 : | onNext(3)
Value is: 3
2022-10-03 20:11:51.911 INFO 2380 --- [ main] reactor.Flux.Range.1 : | request(1)
2022-10-03 20:11:51.911 INFO 2380 --- [ main] reactor.Flux.Range.1 : | onNext(4)
Value is: 4
But what would it mean if I e.g. user request(3) in the hookOnNext?
public void hookOnNext(T value) {
System.out.println("Value is: " + value);
request(3);
}
When examining the output - it seems that the structure is identical (expect for 1=>3):
2022-10-03 20:10:37.701 INFO 19484 --- [ main] reactor.Flux.Range.1 : | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
2022-10-03 20:10:37.703 INFO 19484 --- [ main] reactor.Flux.Range.1 : | request(1)
2022-10-03 20:10:37.704 INFO 19484 --- [ main] reactor.Flux.Range.1 : | onNext(1)
Value is: 1
2022-10-03 20:10:37.704 INFO 19484 --- [ main] reactor.Flux.Range.1 : | request(3)
2022-10-03 20:10:37.704 INFO 19484 --- [ main] reactor.Flux.Range.1 : | onNext(2)
Value is: 2
2022-10-03 20:10:37.704 INFO 19484 --- [ main] reactor.Flux.Range.1 : | request(3)
2022-10-03 20:10:37.704 INFO 19484 --- [ main] reactor.Flux.Range.1 : | onNext(3)
Value is: 3
2022-10-03 20:10:37.704 INFO 19484 --- [ main] reactor.Flux.Range.1 : | request(3)
2022-10-03 20:10:37.704 INFO 19484 --- [ main] reactor.Flux.Range.1 : | onNext(4)
Value is: 4
Question 1: If I understand the code correctly, it will - for each value received - tell the producer that it can send 3 more values. But will then this "quota" be aggregated somehow for the producer - since I request more values than I process? Does it really make sense at all to request more than 1 value in the hookOnNext handler?
I have seen people do things like this:
class BackPressureSubscriber<T> extends BaseSubscriber<T> {
int cnt = 0;
int numEachTime=3;
public void hookOnSubscribe(Subscription subscription) {
request(3);
}
public void hookOnNext(T value) {
System.out.println("Value is: " + value);
cnt++;
if(cnt%numEachTime == 0) {
request(3);
}
}
}
Which then gives output:
2022-10-03 20:21:59.681 INFO 11836 --- [ main] reactor.Flux.Range.1 : | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
2022-10-03 20:21:59.683 INFO 11836 --- [ main] reactor.Flux.Range.1 : | request(3)
2022-10-03 20:21:59.683 INFO 11836 --- [ main] reactor.Flux.Range.1 : | onNext(1)
Value is: 1
2022-10-03 20:21:59.683 INFO 11836 --- [ main] reactor.Flux.Range.1 : | onNext(2)
Value is: 2
2022-10-03 20:21:59.683 INFO 11836 --- [ main] reactor.Flux.Range.1 : | onNext(3)
Value is: 3
2022-10-03 20:21:59.683 INFO 11836 --- [ main] reactor.Flux.Range.1 : | request(3)
2022-10-03 20:21:59.683 INFO 11836 --- [ main] reactor.Flux.Range.1 : | onNext(4)
Value is: 4
2022-10-03 20:21:59.683 INFO 11836 --- [ main] reactor.Flux.Range.1 : | onNext(5)
Value is: 5
2022-10-03 20:21:59.683 INFO 11836 --- [ main] reactor.Flux.Range.1 : | onNext(6)
Value is: 6
Question 2: This processes 3 values before it requests 3 new values, but does this really give an advantage over requesting 1-by-1? Maybe some less latency due to less number of request calls?
Solution
Calling request(n)
from within a Subscriber S
tells a Publisher P
that is is allowed to call the onNext
method of S
at most n
times. Calling request(n)
again does not override a previous request: it tells P
that is allowed to produce another n
elements (i.e. 2*n
in total).
If n
is equal to 1, P
will call onNext
at most once and wait until more elements are requested by S
. This results in stop-and-wait which is usually not what you want. Subscribers should, therefore, request the maximum number of elements they are able to process. In fact, if S
can process elements faster than P
can procude them, S
might request a unbounded number of elements by calling request(Long.MAX_VALUE)
(ideally from its onSubscribe
method).
So, to answer your questions:
Yes, it makes sense to request multiple elements in order to reduce delays between S
and P
. While it may not make sense to call request(3)
unconditionally from within your onNext
method, image you have a buffer of, let's say, 100 elements. If you request 100 elements at the beginning (i.e. onSubscribe
) and if S
processes the elements slower than P
can produce them, your buffer might fill up to some number of elements. Then, let's say, that after some processing the remaining elements in the buffer reach 10. At this point you might request another 90: with a strategy like this your S
might never have to wait for P
to generate elements.
Fur further details, you might want to have a look at the specification from which much of this information here is taken from.
Answered By - Alex R
Answer Checked By - Senaida (JavaFixing Volunteer)