Issue
How can I implement an internal event bus for async operations in a Spring WebFlux stack?
I want a service to emit an event:
@Service
class FeedServiceImpl(/*...dependencies...*/) : FeedService {
    override suspend fun deleteEntry(entryId: Long) {
        entryRepository.deleteById(entryId)
        publishEvent(
            FeedEntryDeletedEvent(
                timestamp = time.utcMillis(),
                entryId = entryId,
            )
        )
    }
}
And a different component, unknown to the publishing service, should be able to decide to react to that event.
@Service
class CommentServiceImpl(/*...dependencies...*/) : CommentService {
    override suspend fun onDeleteEntry(event: FeedEntryDeletedEvent) {
        // do stuff
    }
}
In an MVC application I would use ApplicationEventPublisher to publish the event (publishEvent) and @EventListener + @Async on the handler (onDeleteEntry).
What is the equivalent in a reactive stack?
The other option I have considered is running an embedded message broker, since that would give async semantics. But this feels like a lot of overhead for such a simple scenario.
I found these SO threads:
- Fire and forget with reactor
- Execute Asynchronous call after return statement Reactive Core + Spring Boot
- Spring Flux and the Async annotation
but they don't answer this scenario, because they assume the listener is known to the publisher. I need loose coupling instead.
I also found these Spring issues:
- https://github.com/spring-projects/spring-framework/issues/21025
- https://github.com/spring-projects/spring-framework/issues/21831
And specifically this promising comment, suggesting:
Mono.fromRunnable(() -> context.publishEvent(...))
From what I understand I could then just use @EventListener, since I am totally fine with not propagating the reactive context.
But can someone please explain the implications for thread-boundedness, and whether this is even legitimate in a reactive stack?
UPDATE
From testing it feels fine to do this:
@Service
class FeedServiceImpl(
    val applicationEventPublisher: ApplicationEventPublisher,
) : FeedService {

    @EventListener
    @Async
    override fun handle(e: FeedEntryDeletedEvent) {
        log.info("Handler started")
        runBlocking {
            // do stuff that takes some time
            delay(1000)
        }
        log.info("ThreadId: ${Thread.currentThread().id}")
        log.info("Handler done")
    }

    override suspend fun deleteEntry(entryId: Long) {
        entryRepository.deleteById(entryId)
        applicationEventPublisher.publishEvent(
            FeedEntryDeletedEvent(
                timestamp = time.utcMillis(),
                entryId = entryId,
            )
        )
        log.info("ThreadId: ${Thread.currentThread().id}")
        log.info("Publisher done")
    }
}
Note that handle is not a suspend function: @EventListener methods must have a single argument, and coroutines introduce a continuation parameter behind the scenes. The handler then launches a new blocking coroutine scope, which is fine because @Async runs it on a different thread.
Output is:
2021-05-13 12:15:20.755 INFO 20252 --- [-1 @coroutine#6] ...FeedServiceImpl : ThreadId: 38
2021-05-13 12:15:20.755 INFO 20252 --- [ task-1] ...FeedServiceImpl : Handler started
2021-05-13 12:15:20.755 INFO 20252 --- [-1 @coroutine#6] ...FeedServiceImpl : Publisher done
2021-05-13 12:15:21.758 INFO 20252 --- [ task-1] ...FeedServiceImpl : ThreadId: 54
2021-05-13 12:15:21.759 INFO 20252 --- [ task-1] ...FeedServiceImpl : Handler done
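The decoupling that @Async provides can be sketched in plain JDK terms (a hypothetical sketch, not Spring's actual proxy mechanics): the publisher hands the event to an executor and returns immediately, while the handler blocks on its own thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncHandlerSketch {

    // Publishes one event and returns the ordered log entries.
    static List<String> run() throws InterruptedException {
        // stands in for the task executor that backs @Async
        ExecutorService handlerPool = Executors.newSingleThreadExecutor();
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        // "publishEvent": hand the event to the handler pool and return immediately
        handlerPool.submit(() -> {
            log.add("Handler started");
            try { // blocking work, like the runBlocking { delay(1000) } above
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            log.add("Handler done");
            done.countDown();
        });
        log.add("Publisher done"); // runs without waiting for the handler

        done.await(5, TimeUnit.SECONDS);
        handlerPool.shutdown();
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

As in the log output above, "Publisher done" never waits for "Handler done", because the blocking part runs on the pool's thread, not the caller's.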
UPDATE 2
The other approach without using @Async would be this:
@EventListener
// @Async
override fun handle(e: FeedEntryDeletedEvent) {
    log.info("Handler start")
    log.info("Handler ThreadId: ${Thread.currentThread().id}")
    runBlocking {
        log.info("Handler block start")
        delay(1000)
        log.info("Handler block ThreadId: ${Thread.currentThread().id}")
        log.info("Handler block end")
    }
    log.info("Handler done")
}
override suspend fun deleteEntry(entryId: Long) {
    feedRepository.deleteById(entryId)
    Mono.fromRunnable<Unit> {
        applicationEventPublisher.publishEvent(
            FeedEntryDeletedEvent(
                timestamp = time.utcMillis(),
                entryId = entryId,
            )
        )
    }
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe()
    log.info("Publisher ThreadId: ${Thread.currentThread().id}")
    log.info("Publisher done")
}
2021-05-13 13:06:54.503 INFO 23326 --- [-1 @coroutine#6] ...FeedServiceImpl : Publisher ThreadId: 38
2021-05-13 13:06:54.503 INFO 23326 --- [-1 @coroutine#6] ...FeedServiceImpl : Publisher done
2021-05-13 13:06:54.504 INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl : Handler start
2021-05-13 13:06:54.504 INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl : Handler ThreadId: 53
2021-05-13 13:06:54.505 INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl : Handler block start
2021-05-13 13:06:55.539 INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl : Handler block ThreadId: 53
2021-05-13 13:06:55.539 INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl : Handler block end
2021-05-13 13:06:55.540 INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl : Handler done
However, I still don't understand the implications for the application under load, and it feels wrong to mix reactive operations with handlers that do runBlocking { }.
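The load concern can be made concrete in plain JDK terms (a hypothetical sketch, not Reactor's scheduler internals): a handler that blocks inside a bounded worker pool occupies a thread for its full duration, so once all workers are blocked, further events have to queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingPoolSketch {

    // Submits `tasks` blocking jobs of `blockMillis` each to a pool of
    // `poolSize` threads and returns the wall-clock time until all complete.
    static long runBlockedPool(int poolSize, int tasks, long blockMillis) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize); // stands in for a bounded scheduler
        long start = System.nanoTime();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                try { // a handler doing blocking work, like runBlocking { delay(...) }
                    Thread.sleep(blockMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                done.countDown();
            });
        }
        done.await();
        pool.shutdown();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws InterruptedException {
        // 4 handlers that each block 100 ms on a single worker thread
        // run one after another, roughly 400 ms in total.
        System.out.println("1 worker:  " + runBlockedPool(1, 4, 100) + " ms");
        // With 4 workers they overlap, finishing in roughly 100 ms.
        System.out.println("4 workers: " + runBlockedPool(4, 4, 100) + " ms");
    }
}
```

This is why boundedElastic exists as a dedicated home for blocking work: it grows up to a cap, keeping the event-loop threads free, but blocking handlers still consume one elastic thread each while they run.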
Solution
Reactor offers Sinks, which you can use like an event bus. Have a look at the following example.
@Configuration
public class EventNotificationConfig {

    @Bean
    public Sinks.Many<EventNotification> eventNotifications() {
        return Sinks.many().replay().latest();
    }
}
You create a bean for the sink in a @Configuration class. It can be used to emit new events, and it can be turned into a Flux for subscribers.
@Component
@RequiredArgsConstructor
@Slf4j
public class NotificationUsecase {

    private final @NonNull Sinks.Many<EventNotification> eventNotifications;

    /**
     * Provide a flux with our notifications.
     *
     * @return a Flux of all notifications emitted to the sink
     */
    public Flux<EventNotification> getNotifications() {
        return eventNotifications.asFlux();
    }

    /**
     * Emit a new event to the sink.
     *
     * @param eventId the id of the event
     * @param status  the event status
     * @param payload arbitrary event data
     */
    public void emitNotification(final String eventId, final EventNotification.Status status, final Map<String, Object> payload) {
        eventNotifications.tryEmitNext(EventNotification.builder()
                .eventId(eventId)
                .status(status)
                .payload(payload)
                .build());
    }
}
A single sink instance is enough for the whole application: subscribing to different kinds of events can be achieved with filters that the various subscribers apply to the Flux.
@Component
@RequiredArgsConstructor
@Slf4j
public class EventListener {

    private final @NonNull NotificationUsecase notificationUsecase;

    /**
     * Start listening to events as soon as class EventListener
     * has been constructed.
     *
     * Listening will continue until the Flux emits a 'completed'
     * signal.
     */
    @PostConstruct
    public void init() {
        this.listenToPings()
                .subscribe();
        this.listenToDataFetched()
                .subscribe();
    }

    public Flux<EventNotification> listenToPings() {
        return this.notificationUsecase.getNotifications()
                .subscribeOn(Schedulers.boundedElastic())
                .filter(notification -> notification.getStatus().equals(EventNotification.Status.PING))
                .doOnNext(notification -> log.info("received PING: {}", notification));
    }

    public Flux<EventNotification> listenToDataFetched() {
        return this.notificationUsecase.getNotifications()
                .subscribeOn(Schedulers.boundedElastic())
                .filter(notification -> notification.getStatus().equals(EventNotification.Status.DATA_FETCHED))
                .doOnNext(notification -> log.info("received data: {}", notification));
    }
}
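The same pattern can be sketched with only the JDK, using java.util.concurrent.SubmissionPublisher in place of the Reactor sink. The Event record and status names below are hypothetical stand-ins for EventNotification, not types from the answer.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class JdkEventBusSketch {

    // Hypothetical stand-in for EventNotification.
    record Event(String id, String status) {}

    // A subscriber that only reacts to events with a matching status,
    // mirroring the filter(...) calls on the Flux.
    static Flow.Subscriber<Event> filteringSubscriber(String wantedStatus, List<Event> received) {
        return new Flow.Subscriber<>() {
            private Flow.Subscription subscription;
            public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
            public void onNext(Event e) {
                if (wantedStatus.equals(e.status())) received.add(e);
                subscription.request(1);
            }
            public void onError(Throwable t) { t.printStackTrace(); }
            public void onComplete() {}
        };
    }

    // Publishes one PING and one DATA_FETCHED event to a shared bus and
    // returns how many events each filtered subscriber received.
    static int[] runScenario() throws InterruptedException {
        SubmissionPublisher<Event> bus = new SubmissionPublisher<>(); // one bus for the whole app
        List<Event> pings = new CopyOnWriteArrayList<>();
        List<Event> data = new CopyOnWriteArrayList<>();
        bus.subscribe(filteringSubscriber("PING", pings));
        bus.subscribe(filteringSubscriber("DATA_FETCHED", data));

        bus.submit(new Event("1", "PING"));
        bus.submit(new Event("2", "DATA_FETCHED"));
        bus.close();

        // delivery is async on the common pool; crude wait for the sketch
        for (int i = 0; i < 50 && pings.size() + data.size() < 2; i++) Thread.sleep(10);
        return new int[] { pings.size(), data.size() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] counts = runScenario();
        System.out.println("pings=" + counts[0] + " data=" + counts[1]);
    }
}
```

Like the Reactor version, each subscriber sees the full stream and decides what to react to, so the publisher never needs to know who is listening.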
A subscriber can also do reactive work before reacting, for example persisting every notification first:

public Flux<EventNotification> listenToDataFetchedAndWriteToDatabase() {
    return this.notificationUsecase.getNotifications()
            .subscribeOn(Schedulers.boundedElastic())
            .flatMap(notification -> reactiveMongoRepository
                    .saveAndReturnNewObject(notification)
                    .doOnNext(saved -> log.info("I just saved something and returned an instance of NewObject!"))
                    .zipWith(Mono.just(notification)))
            .map(tuple -> tuple.getT2())
            .filter(notification -> notification.getStatus().equals(EventNotification.Status.DATA_FETCHED))
            .doOnNext(notification -> log.info("received data: {} - saved", notification));
}
Emitting new events is equally simple. Just call the emit method (note that tryEmitNext returns a Sinks.EmitResult, which you can check if delivery failures matter):

notificationUsecase.emitNotification(eventId, EventNotification.Status.PING, payload);
Answered By - Erunafailaro
Answer Checked By - Candace Johnson (JavaFixing Volunteer)