Issue
I'm trying to create a custom BodyPublisher
that would deserialize my JSON object. I could just deserialize the JSON when I'm creating the request and use the ofByteArray
method of BodyPublishers
but I would rather use a custom publisher.
public class CustomPublisher implements HttpRequest.BodyPublisher {
private byte[] bytes;
public CustomPublisher(ObjectNode jsonData) {
...
// Deserialize jsonData to bytes
...
}
@Override
public long contentLength() {
if(bytes == null) return 0;
return bytes.length
}
@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
CustomSubscription subscription = new CustomSubscription(subscriber, bytes);
subscriber.onSubscribe(subscription);
}
private CustomSubscription implements Flow.Subscription {
private final Flow.Subscriber<? super ByteBuffer> subscriber;
private boolean cancelled;
private Iterator<Byte> byterator;
private CustomSubscription(Flow.Subscriber<? super ByteBuffer> subscriber, byte[] bytes) {
this.subscriber = subscriber;
this.cancelled = false;
List<Byte> bytelist = new ArrayList<>();
for(byte b : bytes) {
bytelist.add(b);
}
this.byterator = bytelist.iterator();
}
@Override
public void request(long n) {
if(cancelled) return;
if(n < 0) {
subscriber.onError(new IllegalArgumentException());
} else if(byterator.hasNext()) {
subscriber.onNext(ByteBuffer.wrap(new byte[]{byterator.next()));
} else {
subscriber.onComplete();
}
}
@Override
public void cancel() {
this.cancelled = true;
}
}
}
This implementation works, but only if subscriptions request
method gets called with 1 as a parameter. But that's what happens when I am using it with the HttpRequest.
I'm pretty sure this is not any way preferred or optimal way of creating the custom subscription but I have yet to found better way to make it work.
I would greatly appreciate if anyone can lead me to a better path.
Solution
You are right to avoid making a byte array out of it, as that would create memory issues for large objects.
I wouldn’t try to write a custom publisher. Rather, just take advantage of the factory method HttpRequest.BodyPublishers.ofInputStream.
HttpRequest.BodyPublisher publisher =
HttpRequest.BodyPublishers.ofInputStream(() -> {
PipedInputStream in = new PipedInputStream();
ForkJoinPool.commonPool().submit(() -> {
try (PipedOutputStream out = new PipedOutputStream(in)) {
objectMapper.writeTree(
objectMapper.getFactory().createGenerator(out),
jsonData);
}
return null;
});
return in;
});
As you have noted, you can use HttpRequest.BodyPublishers.ofByteArray
. That is fine for relatively small objects, but I program for scalability out of habit. The problem with assuming code won’t need to scale is that other developers will assume it is safe to pass large objects, without realizing the impact on performance.
Writing your own body publisher will be a lot of work. Its subscribe
method is inherited from Flow.Publisher.
The documentation for the subscribe
method starts with this:
Adds the given Subscriber if possible.
Each time your subscribe
method is called, you need to add the Subscriber to some sort of colllection, you need to create an implementation of Flow.Subscription, and you need to immediately pass it to the subscriber’s onSubscribe
method. Your Subscription implementation object needs to send back one or more ByteBuffers, only when the Subscription’s request
method is called, by invoking the corresponding Subscriber’s (not just any Subscriber’s) onNext method, and once you’ve sent all of the data, you must call the same Subscriber’s onComplete()
method. On top of that, the Subscription implementation object needs to handle cancel
requests.
You can make a lot of this easier by extending SubmissionPublisher, which is a default implementation of Flow.Publisher, and then adding a contentLength()
method to it. But as the SubmissionPublisher documentation shows, you still have a fair amount of work to do, for even a minimal working implementation.
The HttpRequest.BodyPublishers.of… methods will do all of this for you. ofByteArray
is okay for small objects, but ofInputStream
will work for any object you could ever pass in.
Answered By - VGR
Answer Checked By - Dawn Plyler (JavaFixing Volunteer)