Issue
I'm coming back to Java after a few years away, and have been excited to see the introduction of non-blocking async support in the new java.net.http.HttpClient and in the AWS SDK for Java 2.0 (https://aws.amazon.com/blogs/developer/aws-sdk-for-java-2-x-released/). I heard about the concepts of Reactive Programming years ago in conference talks, but haven't had much chance to apply those ideas in practice.
I have a problem that seems well suited to playing around with this style of programming: Basically I want to download a bunch of files (say 10,000) over HTTP and write them back out to S3.
I've used failsafe to implement retries for non-blocking async HTTP GETs, and it's straightforward to compose those with uploads via the S3 async client (see the sketch below).
However, I'm not sure how to properly constrain the memory usage of the program: there is no mechanism to apply backpressure and prevent an OutOfMemoryError if files are downloaded faster than they're written back out to S3.
I'm familiar with some traditional blocking solutions to this problem - e.g. use a semaphore to limit the number of concurrent downloads, or have downloads write out to some bounded blocking queue that S3 upload threads will pull from. However, if I'm going to use such a blocking mechanism to apply the backpressure, then it makes me question the advantage of using nonblocking IO in the first place.
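To make the first of those traditional options concrete, here is a minimal sketch of throttling with a plain java.util.concurrent.Semaphore. The class name ThrottledSubmitter and the MAX_IN_FLIGHT value are hypothetical, not from the original code; the submitting thread blocks in acquire() once the limit is reached, which is exactly the blocking behavior the question is trying to avoid.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical sketch: cap the number of in-flight transfers with a
// plain Semaphore. acquire() blocks the submitting thread when the
// pipeline is full; each completed future releases its permit.
public class ThrottledSubmitter {
    private static final int MAX_IN_FLIGHT = 16; // tuning knob, chosen for illustration

    private final Semaphore permits = new Semaphore(MAX_IN_FLIGHT);

    public <T, R> CompletableFuture<R> submit(T item, Function<T, CompletableFuture<R>> task)
            throws InterruptedException {
        permits.acquire(); // blocks when MAX_IN_FLIGHT tasks are outstanding
        return task.apply(item)
                .whenComplete((result, error) -> permits.release()); // free the slot either way
    }
}
```

In the backup scenario, `task` would be the download-then-upload pipeline built from `submitBackup`, so at most MAX_IN_FLIGHT response bodies are held in memory at once.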
Is there a more idiomatic "reactive" way to accomplish the same goal?
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
public class BackupClient {
private static final Logger LOGGER = LoggerFactory.getLogger(BackupClient.class);
private final HttpClient httpClient = HttpClient.newBuilder().build();
private final S3AsyncClient s3AsyncClient = S3AsyncClient.create();
public void runBackup(List<URI> filesToBackup) {
List<CompletableFuture<PutObjectResponse>> futures = filesToBackup.stream()
.map(this::submitBackup)
.collect(Collectors.toList());
futures.forEach(CompletableFuture::join);
}
private CompletableFuture<PutObjectResponse> submitBackup(URI uri) {
return sendAsyncWithRetries(uri, HttpResponse.BodyHandlers.ofString())
.thenCompose(httpResponse -> s3AsyncClient.putObject(PutObjectRequest.builder()
.bucket("my-bucket")
.key(uri.toASCIIString())
.build(), AsyncRequestBody.fromString(httpResponse.body())));
}
private <T> CompletableFuture<HttpResponse<T>> sendAsyncWithRetries(URI uri, HttpResponse.BodyHandler<T> handler) {
final HttpRequest request = HttpRequest.newBuilder()
.uri(uri)
.timeout(Duration.ofMinutes(2))
.GET()
.build();
final var retryPolicy = new RetryPolicy<HttpResponse<T>>()
.withMaxRetries(4)
.withDelay(Duration.ofSeconds(1))
.handleResultIf(response -> 200 != response.statusCode());
return Failsafe.with(retryPolicy)
.getStageAsync(context -> {
if (context.getAttemptCount() > 0) {
LOGGER.error("Retry " + context.getAttemptCount() + " for " + uri);
}
return this.httpClient.sendAsync(request, handler);
});
}
}
Solution
Since you need to control resource (memory) consumption, a Semaphore is the right tool for this goal. And since you want non-blocking computation, what you need is an asynchronous Semaphore. Popular libraries (RxJava, Reactive Streams implementations) use an asynchronous Semaphore internally to construct reactive streams, but do not offer it as a separate class. When a subscriber of a reactive stream calls Flow.Subscription.request(n), that is equivalent to Semaphore.release(n). The analogue of Semaphore.acquire(), however, is hidden: it is called internally by the Publisher.
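An asynchronous Semaphore of the kind described here can be sketched in a few lines of plain JDK code. This AsyncSemaphore class is a hypothetical illustration (not DF4J's actual implementation): acquire() never blocks a thread; instead it returns a CompletableFuture that completes when a permit becomes available.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of an asynchronous Semaphore: acquire() returns
// a future instead of blocking, so callers chain work onto it with
// thenCompose rather than parking a thread.
public class AsyncSemaphore {
    private final Queue<CompletableFuture<Void>> waiters = new ArrayDeque<>();
    private int permits;

    public AsyncSemaphore(int permits) {
        this.permits = permits;
    }

    public synchronized CompletableFuture<Void> acquire() {
        if (permits > 0) {
            permits--;
            return CompletableFuture.completedFuture(null); // permit available now
        }
        CompletableFuture<Void> waiter = new CompletableFuture<>();
        waiters.add(waiter); // completed later by release()
        return waiter;
    }

    public void release() {
        CompletableFuture<Void> next;
        synchronized (this) {
            next = waiters.poll();
            if (next == null) {
                permits++; // nobody waiting; return the permit to the pool
            }
        }
        if (next != null) {
            next.complete(null); // hand the permit directly to a waiter
        }
    }
}
```

Applied to the backup pipeline, each file would be processed as `sem.acquire().thenCompose(v -> submitBackup(uri)).whenComplete((r, e) -> sem.release())`, so no more than the permitted number of downloads are buffered at a time and no thread ever blocks.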
The downside of this design is that the resource feedback can only be established between a producer and its nearest consumer. If there is a chain of producers and consumers, the resource consumption of each link has to be controlled separately, and overall resource consumption becomes N times larger, where N is the number of links.
If you can afford this, then you can use RxJava or any other Reactive Streams implementation. If not, then you have to use the only asynchronous library that gives the user full access to an asynchronous Semaphore implementation: DF4J (yes, I am the author). It does not contain a direct solution to your problem, but it has an example where an asynchronous network server limits the number of simultaneous connections by means of an asynchronous Semaphore; see ConnectionManager.java.
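The request(n)-as-release(n) correspondence described above can also be seen with the JDK's built-in java.util.concurrent.Flow API, with no third-party library. The BoundedSubscriber class below is a hypothetical sketch: it requests an initial window of permits and requests one more only after an item has been processed, which is how a reactive subscriber keeps the number of buffered items bounded. In the real pipeline the processing step would be the async S3 upload; here it is a synchronous callback for brevity.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

// Hypothetical sketch: a Flow.Subscriber that bounds in-flight items by
// requesting a fixed window up front and one more item per completion.
// request(1) here plays the role of Semaphore.release().
public class BoundedSubscriber<T> implements Flow.Subscriber<T> {
    private final int window;
    private final Consumer<T> handler;
    private final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;

    public BoundedSubscriber(int window, Consumer<T> handler) {
        this.window = window;
        this.handler = handler;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        this.subscription = s;
        s.request(window); // initial window of "permits"
    }

    @Override
    public void onNext(T item) {
        handler.accept(item);    // process the item (e.g. upload to S3), then...
        subscription.request(1); // ...release one permit back upstream
    }

    @Override
    public void onError(Throwable t) {
        done.countDown();
    }

    @Override
    public void onComplete() {
        done.countDown();
    }

    public void await() throws InterruptedException {
        done.await();
    }
}
```

A publisher such as java.util.concurrent.SubmissionPublisher honors these requests, so the producer is slowed down to the consumer's pace without any thread blocking in user code.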
Answered By - Alexei Kaigorodov
Answer Checked By - Timothy Miller (JavaFixing Admin)