Issue
I'm trying to set up a Spring SseEmitter to send a sequence of status updates for a running job. It seems to be working, but whenever I call emitter.complete() in my Java server code, the JavaScript EventSource client calls the registered onerror function and then calls my Java endpoint again with a new connection. This happens in both Firefox and Chrome.
I can probably send an explicit "end-of-data" message from Java, detect it on the client, and call eventSource.close() there, but is there a better way?
What is the purpose of emitter.complete() in that case?
Also, if I always have to terminate the connection on the client end, then I guess every connection on the server side will be terminated by either a timeout or a write error, in which case I probably want to manually send back a heartbeat of some kind every few seconds?
It feels like I'm missing something if I'm having to do all this.
Solution
I added the following to my Spring Boot application so that the client calls close() on the SSE connection once the stream is finished.
Server Side:
- Create a simple controller which returns SseEmitter.
- Wrap the backend logic in an executor service (the code uses a cached thread pool) so it runs off the request thread.
- Send your events to the SseEmitter.
- On completion, send an event of type COMPLETE via the SseEmitter, then call emitter.complete().
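// Imports for both classes; in a real project each class lives in its own file.
// SearchCriteriaDto, SearchResponse and SearchService are application classes not shown here.
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.validation.Valid; // jakarta.validation.Valid on Spring Boot 3+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ModelAttribute;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController
public class SearchController {

    @Autowired
    private SearchDelegate searchDelegate;

    @GetMapping(value = "/{customerId}/search")
    @ResponseStatus(HttpStatus.OK)
    @ApiOperation(value = "Search Sources", notes = "Search Sources")
    @ApiResponses(value = {
            @ApiResponse(code = 200, message = "OK"),
            @ApiResponse(code = 401, message = "Unauthorized")
    })
    @ResponseBody
    public SseEmitter search(
            @ApiParam(name = "searchCriteria", value = "searchCriteria", required = true)
            @ModelAttribute @Valid final SearchCriteriaDto searchCriteriaDto) throws Exception {
        return searchDelegate.route(searchCriteriaDto);
    }
}

@Service
public class SearchDelegate {

    public static final String SEARCH_EVENT_NAME = "SEARCH";
    public static final String COMPLETE_EVENT_NAME = "COMPLETE";
    public static final String COMPLETE_EVENT_DATA = "{\"name\": \"COMPLETED_STREAM\"}";

    private static final Logger log = LoggerFactory.getLogger(SearchDelegate.class);

    @Autowired
    private SearchService searchService;

    private ExecutorService executor = Executors.newCachedThreadPool();

    public SseEmitter route(SearchCriteriaDto searchCriteriaDto) throws Exception {
        SseEmitter emitter = new SseEmitter();
        // Run the search off the request thread so the emitter can be returned immediately.
        executor.execute(() -> {
            try {
                if (!searchCriteriaDto.getCustomerSources().isEmpty()) {
                    searchCriteriaDto.getCustomerSources().forEach(customerSource -> {
                        try {
                            SearchResponse searchResponse = searchService.search(searchCriteriaDto);
                            // One named SEARCH event per customer source.
                            emitter.send(SseEmitter.event()
                                    .id(customerSource.getSourceId())
                                    .name(SEARCH_EVENT_NAME)
                                    .data(searchResponse));
                        } catch (Exception e) {
                            log.error("Error while executing query for source {}, caused by {}",
                                    customerSource.getSourceId(), e.getMessage());
                        }
                    });
                } else {
                    log.debug("No available customerSources for the specified customer");
                }
                // Tell the client the stream is finished so it can close the EventSource.
                emitter.send(SseEmitter.event()
                        .id(String.valueOf(System.currentTimeMillis()))
                        .name(COMPLETE_EVENT_NAME)
                        .data(COMPLETE_EVENT_DATA));
                emitter.complete();
            } catch (Exception ex) {
                emitter.completeWithError(ex);
            }
        });
        return emitter;
    }
}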
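For illustration, here is a rough sketch of what the named events above look like on the wire, following the text/event-stream format. SseFrame and format are hypothetical names, not part of Spring; SseEmitter produces equivalent frames for you, though its exact spacing may differ.

```java
/** Formats one Server-Sent Events frame as plain text (hypothetical helper, not a Spring API). */
final class SseFrame {

    private SseFrame() {
    }

    static String format(String id, String eventName, String data) {
        StringBuilder frame = new StringBuilder();
        if (id != null) {
            frame.append("id: ").append(id).append('\n');
        }
        // Without an "event:" field the browser treats the frame as a plain "message" event.
        if (eventName != null) {
            frame.append("event: ").append(eventName).append('\n');
        }
        // Every line of the payload needs its own "data:" field.
        for (String line : data.split("\n", -1)) {
            frame.append("data: ").append(line).append('\n');
        }
        // A blank line terminates the frame and triggers dispatch on the client.
        frame.append('\n');
        return frame.toString();
    }
}
```

A call such as SseFrame.format("1", "SEARCH", "{}") yields the three lines "id: 1", "event: SEARCH" and "data: {}" followed by a blank line. The "event:" field is what lets the browser route the frame to a listener registered for that name.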
Client Side:
- Since we specified the name of the event on our SseEmitter, an event will be dispatched on the browser to the listener for the specified event name; the website source code should use addEventListener() to listen for named events. (Note: the onmessage handler is called only if no event name is specified for a message.)
- Call close() on the EventSource in the COMPLETE event handler to release the client connection.
https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
var sse = new EventSource('http://localhost:8080/federation/api/customers/5d96348feb061d13f46aa6ce/search?nativeQuery=true&queryString=*&size=10&customerSources=1,2,3&start=0');

// Named events are delivered to listeners registered for that name.
sse.addEventListener("SEARCH", function(evt) {
    var data = JSON.parse(evt.data);
    console.log(data);
});

// Close the connection on COMPLETE so the browser does not reconnect.
sse.addEventListener("COMPLETE", function(evt) {
    console.log(evt);
    sse.close();
});
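If the consumer is not a browser, the same "stop on COMPLETE" logic can be sketched in Java. This is a simplified parser, not a full EventSource implementation: it handles only the event and data fields and comment lines, and SseReader and readUntil are hypothetical names introduced for this example.

```java
import java.io.BufferedReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Minimal SSE reader sketch for a non-browser client (hypothetical helper). */
final class SseReader {

    private SseReader() {
    }

    /**
     * Reads frames until one named completeEvent arrives, then stops reading,
     * which plays the role of sse.close() in the browser code.
     * Returns "eventName|data" for every frame dispatched before that point.
     */
    static List<String> readUntil(BufferedReader in, String completeEvent) {
        List<String> received = new ArrayList<>();
        List<String> dataLines = new ArrayList<>();
        String eventName = "message"; // default type when no "event:" field is sent
        Iterator<String> lines = in.lines().iterator();
        while (lines.hasNext()) {
            String line = lines.next();
            if (line.isEmpty()) {
                // Blank line: dispatch the buffered frame, if it carried any data.
                if (!dataLines.isEmpty()) {
                    if (eventName.equals(completeEvent)) {
                        return received; // stop here instead of waiting for a reconnect
                    }
                    received.add(eventName + "|" + String.join("\n", dataLines));
                }
                eventName = "message";
                dataLines.clear();
                continue;
            }
            int colon = line.indexOf(':');
            if (colon == 0) {
                continue; // lines starting with ':' are comments, e.g. heartbeats
            }
            String field = colon < 0 ? line : line.substring(0, colon);
            String value = colon < 0 ? "" : line.substring(colon + 1);
            if (value.startsWith(" ")) {
                value = value.substring(1); // one optional space after the colon
            }
            if (field.equals("event")) {
                eventName = value.isEmpty() ? "message" : value;
            } else if (field.equals("data")) {
                dataLines.add(value);
            }
        }
        return received; // stream ended without a COMPLETE event
    }
}
```

For a stream like the one the server above produces, readUntil(reader, "COMPLETE") collects the SEARCH frames and returns as soon as the COMPLETE frame is dispatched, leaving the caller free to close the underlying connection.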
Answered By - Ashraf Sarhan
Answer Checked By - Terry (JavaFixing Volunteer)