Issue
Context: I want to save asynchronously data to MongoDb. When Front (eg. Angular/Mobile) call the Controller EndPoint it will call a Service method aimed to firstly save data to MongoDb and, if well-successed, post the same data a Kafka topic and call another endpoint passing forward the same data. In order to reach this behavior I want use CompletableFuture with "two steps": first save on MongoDb and after that (thenAcceptAsync) to post the data to Kafka Topic and thenAcceptAsync post to another rest service.
Issue: I am completely stuck to get the first "step" working: saving the data to MongoDb. I can runAssyncronously a simple system.out.print but never get ReactiveCrudRepository.save() persisting the data. See code below.
I am pretty the rest is working since if I try save without CompletableFuture everything goes fine with this method:
//properly working without CompletableFuture
public void SimpleSaveMehtod(Extrato e) {
extratoRepository.save(e); // .subscribe();
}
Here is the service method code which "runPrintAsync.get();" works perfectly (prints the message) but "runSaveAsync.get();" didn't save at all. I gave a try by "ForkJoinPool.commonPool().awaitQuiescence(3, TimeUnit.SECONDS)" and it didn't work also (honestly I don't think made sense await if I blocked with get btw I tried it).
@Async("threadPoolTaskExecutor")
public void transferirVoidReturned(Extrato e) {
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
CompletableFuture<Void> runSaveAsync = CompletableFuture.runAsync(() -> extratoRepository.save(e),newFixedThreadPool);
CompletableFuture<Void> runPrintAsync = CompletableFuture.runAsync(() -> System.out.print("I am printed"), newFixedThreadPool);
try {
// ForkJoinPool.commonPool().awaitQuiescence(3, TimeUnit.SECONDS);
runSaveAsync.get();
runPrintAsync.get();
} catch (InterruptedException | ExecutionException e1) {
e1.printStackTrace();
}
}
In case it is relevant, here is the repository:
import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import com.noblockingcase.demo.model.Extrato;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.data.domain.Pageable;
public interface ExtratoRepository extends ReactiveCrudRepository<Extrato, String> {
@Query("{ id: { $exists: true }}")
Flux<Extrato> retrieveAllExtratosPaged(final Pageable page);
}
Solution
tl;dr: You need to subscribe to the Mono
:
public void SimpleSaveMehtod(Extrato e) {
extratoRepository.save(e).subscribe();
}
Background: one of many rationales behind using reactive types over Future
and similar is that they allow performing computations lazily. This means, that when you create a Mono
, Flux
or any other structure and add some computation, it will not be executed until it's needed.
The way it is written now, as soon as the Mono
is created, the thread is returned to the pool because there is no more work to be done. Since no subscription was made, the computation in Mono
is never executed.
Answered By - Andronicus