Issue
I have an endpoint which returns a ResponseBodyEmitter:
@GetMapping("/foo/stream")
public ResponseEntity<ResponseBodyEmitter> getFoos() {
    ResponseBodyEmitter rbe = new ResponseBodyEmitter();
    executor.execute(() -> {
        try {
            rbe.send(foo1);
            Thread.sleep(2000);
            rbe.send(foo2);
            Thread.sleep(2000);
            rbe.send(foo3);
            Thread.sleep(2000);
            rbe.complete();
        } catch (Exception ex) {
            rbe.completeWithError(ex);
        }
    });
    return ResponseEntity.ok(rbe);
}
A custom filter (FooValidationFilter) is created to perform checks on the Foo objects when this endpoint is called:
@Slf4j
@AllArgsConstructor
public class FooValidationFilter extends OncePerRequestFilter {
    @Override
    protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
            throws ServletException, IOException {
        ContentCachingRequestWrapper requestToCache = new ContentCachingRequestWrapper(request);
        ContentCachingResponseWrapper responseToUse = new ContentCachingResponseWrapper(response);
        filterChain.doFilter(request, responseToUse);
        if (!responseToUse.isCommitted() &&
                responseToUse.getStatus() >= 200 && responseToUse.getStatus() < 300 &&
                HttpMethod.GET.matches(request.getMethod())) {
            Scanner scanner = new Scanner(responseToUse.getContentInputStream());
            String fooField;
            do {
                fooField = scanner.findWithinHorizon(REGEX, 0);
                // perform some checks
            } while (fooField != null);
        }
        if (requestToCache.isAsyncStarted()) {
            requestToCache.getAsyncContext().addListener(new AsyncListener() {
                public void onComplete(AsyncEvent asyncEvent) throws IOException {
                    responseToUse.copyBodyToResponse();
                }
                public void onTimeout(AsyncEvent asyncEvent) throws IOException {
                }
                public void onError(AsyncEvent asyncEvent) throws IOException {
                }
                public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
                }
            });
        } else {
            responseToUse.copyBodyToResponse();
        }
    }
    @Override
    protected boolean shouldNotFilterAsyncDispatch() {
        return false;
    }
}
Note: In debug mode, the FooValidationFilter is called after each ResponseBodyEmitter.send(). In normal mode, the FooValidationFilter is invoked only once, so the checks on some Foo objects are bypassed.
Am I missing something that is causing this issue?
Solution
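As per the Spring docs for OncePerRequestFilter:
This filter base class aims to guarantee a single execution per request dispatch, on any servlet container.
Note the wording "per request dispatch": a single HTTP request can go through more than one dispatch (for example the initial REQUEST dispatch and a later ASYNC dispatch), so the filter can still run more than once for the same request. That applies here, because shouldNotFilterAsyncDispatch() returns false.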
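To make the "per request dispatch" wording concrete, here is a minimal plain-Java model of the idea. It is only a sketch and not Spring's implementation: `OncePerDispatchModel`, `DispatchType`, and `invocationsFor` are hypothetical names, and the model assumes a request that goes through one REQUEST dispatch followed by one ASYNC dispatch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OncePerDispatchModel {

    // Hypothetical stand-in for the two dispatch types discussed here.
    enum DispatchType { REQUEST, ASYNC }

    /**
     * Returns the dispatches for which the filter body would run.
     * skipAsync plays the role of shouldNotFilterAsyncDispatch().
     */
    static List<DispatchType> invocationsFor(List<DispatchType> dispatches, boolean skipAsync) {
        List<DispatchType> invoked = new ArrayList<>();
        for (DispatchType dispatch : dispatches) {
            if (dispatch == DispatchType.ASYNC && skipAsync) {
                continue; // the filter body is skipped for the async dispatch
            }
            invoked.add(dispatch);
        }
        return invoked;
    }

    public static void main(String[] args) {
        List<DispatchType> dispatches = Arrays.asList(DispatchType.REQUEST, DispatchType.ASYNC);
        System.out.println(invocationsFor(dispatches, true));  // [REQUEST]
        System.out.println(invocationsFor(dispatches, false)); // [REQUEST, ASYNC]
    }
}
```

With the flag set to false, as in the filter from the question, the model runs the filter body once per dispatch, which gives two runs for one request.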
Now, coming to your issue: I suspect that the way you are debugging the ExecutorService is what makes the FooValidationFilter appear to be called multiple times in debug mode and not in normal mode.
Debugging an ExecutorService is sometimes difficult because, in debug mode, the order in which its threads execute can't be predicted. So don't assume that because the filter is called multiple times in debug mode, it will also be called multiple times in normal mode.
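The timing point can be illustrated with plain java.util.concurrent and no Spring. In the sketch below, the thread that calls execute() moves on without waiting for the submitted task. The class name `HandlerReturnsFirst`, the event strings, and the latch are all assumptions made for illustration; the latch pins one possible interleaving so the result is repeatable, whereas in a real run (or under a debugger) the interleaving depends on thread scheduling and breakpoints.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class HandlerReturnsFirst {

    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch handlerReturned = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // Plays the role of the controller method: submit the work, then carry on.
        executor.execute(() -> {
            try {
                // Hold the worker back until the submitting thread has moved on.
                handlerReturned.await();
                events.add("send a");
                events.add("send b");
                events.add("send c");
                events.add("complete");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        events.add("handler returned");
        handlerReturned.countDown();

        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        // prints [handler returned, send a, send b, send c, complete]
        System.out.println(run());
    }
}
```

A breakpoint on the submitting thread can change this interleaving, which is one reason behaviour observed while stepping through the code may differ from a normal run.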
I tried to reproduce this with the code that you provided, using the STS Eclipse IDE to debug the scenario. The filter is called twice in normal mode.
- Normal Mode :
Controller logic :
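import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;

@RestController
public class Resource {
    private ExecutorService executor = Executors.newCachedThreadPool();

    @GetMapping("/foo/stream")
    public ResponseEntity<ResponseBodyEmitter> handleRbe() {
        ResponseBodyEmitter emitter = new ResponseBodyEmitter();
        // Send three chunks from a background thread, two seconds apart.
        executor.execute(() -> {
            try {
                emitter.send("a");
                Thread.sleep(2000);
                emitter.send("b");
                Thread.sleep(2000);
                emitter.send("c");
                Thread.sleep(2000);
                emitter.complete();
            } catch (Exception ex) {
                emitter.completeWithError(ex);
            }
        });
        return new ResponseEntity<ResponseBodyEmitter>(emitter, HttpStatus.OK);
    }
}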
I added logging to your existing filter:
@Slf4j
@AllArgsConstructor
@Component
public class ValidationFilter extends OncePerRequestFilter {
    @Override
    protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
            throws ServletException, IOException {
        log.info("Inside Filter");
        ContentCachingRequestWrapper requestToCache = new ContentCachingRequestWrapper(request);
        ContentCachingResponseWrapper responseToUse = new ContentCachingResponseWrapper(response);
        filterChain.doFilter(request, responseToUse);
        if (!responseToUse.isCommitted() &&
                responseToUse.getStatus() >= 200 && responseToUse.getStatus() < 300 &&
                HttpMethod.GET.matches(request.getMethod())) {
            Scanner scanner = new Scanner(responseToUse.getContentInputStream());
            // String fooField;
            // do {
            //     fooField = scanner.findWithinHorizon("", 0);
            //     // perform some checks
            // } while (fooField != null);
        }
        if (requestToCache.isAsyncStarted()) {
            requestToCache.getAsyncContext().addListener(new AsyncListener() {
                public void onComplete(AsyncEvent asyncEvent) throws IOException {
                    responseToUse.copyBodyToResponse();
                }
                public void onTimeout(AsyncEvent asyncEvent) throws IOException {
                }
                public void onError(AsyncEvent asyncEvent) throws IOException {
                }
                public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
                }
            });
        } else {
            responseToUse.copyBodyToResponse();
        }
        log.info("End Filter");
    }
    @Override
    protected boolean shouldNotFilterAsyncDispatch() {
        return false;
    }
}
Note: You also need to add @Component to your filter, which was missing.
[Screenshot: Postman request]
[Screenshot: server logs in normal mode, showing that the filter is called more than once]
- Debug mode, steps showing that the filter is called twice:
The request reaches the filter first.
The request then reaches the controller.
Control returns from the controller to the filter, which checks whether isAsyncStarted() is true. In this case it returns true, which means the async context has started.
Execution then reaches the same line again -> filterChain.doFilter(request, responseToUse);
[Screenshot: server log at this point in debug mode]
Control returns to the filter a second time, and it checks isAsyncStarted() again. This time it returns false, so the async branch is skipped.
[Screenshot: server log at the end]
[Screenshot: Postman response]
Update: Spring's ResponseBodyEmitter docs say the following about send():
Write the given object to the response.
If any exception occurs a dispatch is made back to the app server where Spring MVC will pass the exception through its exception handling mechanism.
Note: if the send fails with an IOException, you do not need to call completeWithError(Throwable) in order to clean up. Instead the Servlet container creates a notification that results in a dispatch where Spring MVC invokes exception resolvers and completes processing.
In simple words: the send() method only writes to the response. Calling send() does not trigger a dispatch back through the filter/interceptor. Each send() call just appends its data to the response.
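The "append now, copy once later" behaviour can be sketched without Spring. The model below assumes that each send() ends up in the caching wrapper's buffer and that nothing reaches the client until copyBodyToResponse() runs. `CachingResponseModel` and its methods are hypothetical stand-ins that only borrow the names used in the filter; this is not how ContentCachingResponseWrapper is actually implemented.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class CachingResponseModel {

    // Hypothetical stand-ins: "cache" models the wrapper's buffer,
    // "client" models the bytes that have actually reached the real response.
    private final ByteArrayOutputStream cache = new ByteArrayOutputStream();
    private final ByteArrayOutputStream client = new ByteArrayOutputStream();

    /** Models a send(): the data is only appended to the buffer. */
    void send(String chunk) {
        byte[] bytes = chunk.getBytes(StandardCharsets.UTF_8);
        cache.write(bytes, 0, bytes.length);
    }

    /** Models copyBodyToResponse(): copy the buffered body to the client, then clear the buffer. */
    void copyBodyToResponse() {
        byte[] body = cache.toByteArray();
        client.write(body, 0, body.length);
        cache.reset();
    }

    /** Returns what the client has received so far. */
    String clientBody() {
        return new String(client.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        CachingResponseModel response = new CachingResponseModel();
        response.send("a");
        response.send("b");
        response.send("c");
        System.out.println("[" + response.clientBody() + "]"); // prints []
        response.copyBodyToResponse();
        System.out.println("[" + response.clientBody() + "]"); // prints [abc]
    }
}
```

In this model no filter logic runs between the individual sends; the accumulated body only becomes visible at the single copy step.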
Answered By - Anish B.
Answer Checked By - Mary Flores (JavaFixing Volunteer)