Issue
I have a CompletableFuture defined as follows:
CompletableFuture<Person> personFutures = personService.getPersons();
Now, based on a particular condition, I need to keep calling getPersons until either the condition is met or the retries (5 retries, 5 seconds apart) are exhausted.
The condition will be
if (personFutures.get().size() != totalPersonsInOrg) {
    retry(personService.getPersons(), 5, 5)
} else {
    return persons
}
I want to use thenApply and thenCompose to chain these after the first CompletableFuture.
personFutures.thenApply(persons -> {
    if (persons.size() != totalPersonsInOrg) {
        retry(personService, 5, 5)
    }
})
This is the code that needs to be changed:
private boolean allPersonsFound(String id, int retry, int count)
{
    if (retry > maxRetries) {
        return false;
    }
    CompletableFuture<List<Persons>> personsFuture = personaService.getPersons();
    List<Persons> persons = personsFuture.get();
    if (persons.size() != count) {
        //add delay of 50ms
        // retry + 1 rather than retry++: the post-increment would pass the old value and never terminate
        return allPersonsFound(id, retry + 1, count);
    }
    return true;
}
Solution
Assuming your PersonsService is:
interface PersonsService {
CompletableFuture<Persons> getPersons();
}
You probably want to have a proxy implementation with additional validation and retry logic.
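One possibility is to use asynchronous recursion. Note that CompletableFuture.delayedExecutor and CompletableFuture.failedFuture require Java 9 or later. Something like this (I have not tried to run it!):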
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

final class ValidatedPersonsService implements PersonsService {
    private final PersonsService upstreamService;
    private final Predicate<Persons> validationPredicate;
    private final int numberOfAttempts;
    private final long backoffDurationMs;
    private final Executor executor;
    private final Executor delayedExecutor;

    ValidatedPersonsService(final PersonsService upstreamService,
                            final Predicate<Persons> validationPredicate,
                            final int numberOfAttempts,
                            final long backoffDurationMs,
                            final Executor executor) {
        this.upstreamService = upstreamService;
        this.validationPredicate = validationPredicate;
        this.numberOfAttempts = numberOfAttempts;
        this.backoffDurationMs = backoffDurationMs;
        this.executor = executor;
        this.delayedExecutor = CompletableFuture.delayedExecutor(backoffDurationMs, TimeUnit.MILLISECONDS, executor);
    }

    // this one is needed to track number of passed attempts through the async recursion steps
    private static final class PersonsResponse {
        final Persons persons;
        final int attempt;

        private PersonsResponse(final Persons persons, final int attempt) {
            this.persons = persons;
            this.attempt = attempt;
        }
    }

    @Override
    public CompletableFuture<Persons> getPersons() {
        return submitRequest(1, executor)
                .thenApply(response -> response.persons);
    }

    private CompletableFuture<PersonsResponse> submitRequest(int currentAttempt, Executor executor) {
        if (currentAttempt > numberOfAttempts) {
            return CompletableFuture.failedFuture(new RuntimeException("max number of attempts exceeded"));
        } else {
            return upstreamService
                    .getPersons()
                    // break out into our executor, to not rely on concurrency model of the upstream service
                    .thenApplyAsync(persons -> new PersonsResponse(persons, currentAttempt), executor)
                    .thenCompose(this::validateResponse);
        }
    }

    private CompletableFuture<PersonsResponse> validateResponse(PersonsResponse response) {
        if (validationPredicate.test(response.persons)) {
            return CompletableFuture.completedFuture(response);
        } else {
            // retry: the same pipeline again, this time routed through the delayed executor
            return submitRequest(response.attempt + 1, delayedExecutor);
        }
    }
}
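One thing to be aware of in the class above: because the delayed executor is passed to thenApplyAsync, the delay is applied to the handling of each retried response, while the retry request itself is sent immediately. If you would rather wait before sending the next request, a variation along the following lines might work. This is only a minimal sketch: RetrySketch, retryUntilValid and the fake upstream in main are hypothetical names made up for illustration, it uses the default executor of delayedExecutor, and it retries only on a failed validation (an exceptionally completed request is passed through without retrying).

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical helper, not part of the original answer.
final class RetrySketch {

    // Calls request until isValid accepts the result or attemptsLeft runs out,
    // waiting delayMs BEFORE each retry request is sent.
    static <T> CompletableFuture<T> retryUntilValid(Supplier<CompletableFuture<T>> request,
                                                    Predicate<T> isValid,
                                                    int attemptsLeft,
                                                    long delayMs) {
        if (attemptsLeft <= 0) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("max number of attempts exceeded"));
        }
        return request.get().thenCompose(result -> {
            if (isValid.test(result)) {
                return CompletableFuture.completedFuture(result);
            }
            // An empty task on a delayed executor acts as a non-blocking pause.
            Executor delayed = CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS);
            CompletableFuture<Void> pause = CompletableFuture.runAsync(() -> { }, delayed);
            // thenCompose flattens the future returned by the recursive call.
            CompletableFuture<T> next =
                    pause.thenCompose(ignored -> retryUntilValid(request, isValid, attemptsLeft - 1, delayMs));
            return next;
        });
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Fake upstream for the demo: returns one more name on every call.
        Supplier<CompletableFuture<List<String>>> upstream = () -> {
            int n = calls.incrementAndGet();
            return CompletableFuture.completedFuture(List.of("a", "b", "c").subList(0, Math.min(n, 3)));
        };
        List<String> persons = retryUntilValid(upstream, list -> list.size() == 3, 5, 50).join();
        System.out.println(persons + " after " + calls.get() + " calls");
    }
}
```

With the fake upstream in main, the third call is the first one to return three entries, so the demo prints "[a, b, c] after 3 calls". For the original question you would pass personService::getPersons as the request, a predicate comparing the size against the expected count, 5 attempts and a delay of 5000 ms.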
Answered By - bobah
Answer Checked By - Clifford M. (JavaFixing Volunteer)