Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AmazonS3-d8d7a87.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "Amazon S3",
"contributor": "",
"description": "Add support for maxInFlightParts to multipart upload (PutObject) in MultipartS3AsyncClient."
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<
private final AtomicReferenceArray<CompletedPart> completedParts;
private final Map<Integer, CompletedPart> existingParts;
private final PublisherListener<Long> progressListener;
private final int maxInFlightParts;
private Subscription subscription;
private volatile boolean isDone;
private volatile boolean isPaused;
Expand All @@ -81,7 +82,8 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<

KnownContentLengthAsyncRequestBodySubscriber(MpuRequestContext mpuRequestContext,
CompletableFuture<PutObjectResponse> returnFuture,
MultipartUploadHelper multipartUploadHelper) {
MultipartUploadHelper multipartUploadHelper,
int maxInFlightParts) {
this.totalSize = mpuRequestContext.contentLength();
this.partSize = mpuRequestContext.partSize();
this.expectedNumParts = mpuRequestContext.expectedNumParts();
Expand All @@ -92,6 +94,7 @@ public class KnownContentLengthAsyncRequestBodySubscriber implements Subscriber<
this.existingNumParts = NumericUtils.saturatedCast(mpuRequestContext.numPartsCompleted());
this.completedParts = new AtomicReferenceArray<>(expectedNumParts);
this.multipartUploadHelper = multipartUploadHelper;
this.maxInFlightParts = maxInFlightParts;
this.progressListener = putObjectRequest.overrideConfiguration().map(c -> c.executionAttributes()
.getAttribute(JAVA_PROGRESS_LISTENER))
.orElseGet(PublisherListener::noOp);
Expand Down Expand Up @@ -159,6 +162,7 @@ public void onNext(CloseableAsyncRequestBody asyncRequestBody) {
asyncRequestBody.subscribe(new CancelledSubscriber<>());
asyncRequestBody.contentLength().ifPresent(progressListener::subscriberOnNext);
asyncRequestBody.close();

subscription.request(1);
return;
}
Expand Down Expand Up @@ -192,10 +196,16 @@ public void onNext(CloseableAsyncRequestBody asyncRequestBody) {
subscription.cancel();
}
} else {
completeMultipartUploadIfFinished(asyncRequestBodyInFlight.decrementAndGet());
int inFlight = asyncRequestBodyInFlight.decrementAndGet();
if (!isDone && inFlight < maxInFlightParts) {
subscription.request(1);
}
completeMultipartUploadIfFinished(inFlight);
}
});
subscription.request(1);
if (asyncRequestBodyInFlight.get() < maxInFlightParts) {
subscription.request(1);
}
}

private Optional<SdkClientException> validatePart(AsyncRequestBody asyncRequestBody, int currentPartNum) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.s3.internal.multipart;

import static software.amazon.awssdk.services.s3.internal.multipart.MultipartUploadHelper.contentLengthMismatchForPart;
import static software.amazon.awssdk.services.s3.internal.multipart.MultipartUploadHelper.contentLengthMissingForPart;
import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.JAVA_PROGRESS_LISTENER;

import java.util.Collection;
import java.util.Comparator;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.CloseableAsyncRequestBody;
import software.amazon.awssdk.core.async.listener.PublisherListener;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.utils.CompletableFutureUtils;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Pair;

@SdkInternalApi
public class UnknownContentLengthAsyncRequestBodySubscriber implements Subscriber<CloseableAsyncRequestBody> {
private static final Logger log = Logger.loggerFor(UnknownContentLengthAsyncRequestBodySubscriber.class);

/**
* Indicates whether this is the first async request body or not.
*/
private final AtomicBoolean firstAsyncRequestBodyReceived = new AtomicBoolean(false);

/**
* Indicates whether CreateMultipartUpload has been initiated or not
*/
private final AtomicBoolean createMultipartUploadInitiated = new AtomicBoolean(false);

/**
* Indicates whether CompleteMultipart has been initiated or not.
*/
private final AtomicBoolean completedMultipartInitiated = new AtomicBoolean(false);

/**
* The number of AsyncRequestBody has been received but yet to be processed
*/
private final AtomicInteger asyncRequestBodyInFlight = new AtomicInteger(0);

private final AtomicBoolean failureActionInitiated = new AtomicBoolean(false);

private final AtomicInteger partNumber = new AtomicInteger(0);
private final AtomicLong contentLength = new AtomicLong(0);

private final Queue<CompletedPart> completedParts = new ConcurrentLinkedQueue<>();
private final Collection<CompletableFuture<CompletedPart>> futures = new ConcurrentLinkedQueue<>();

private final CompletableFuture<String> uploadIdFuture = new CompletableFuture<>();

private final long partSizeInBytes;
private final PutObjectRequest putObjectRequest;
private final CompletableFuture<PutObjectResponse> returnFuture;
private final PublisherListener<Long> progressListener;
private final MultipartUploadHelper multipartUploadHelper;
private final GenericMultipartHelper<PutObjectRequest, PutObjectResponse> genericMultipartHelper;
private final int maxInFlightParts;

private Subscription subscription;
private CloseableAsyncRequestBody firstRequestBody;
private String uploadId;
private volatile boolean isDone;

UnknownContentLengthAsyncRequestBodySubscriber(
long partSizeInBytes,
PutObjectRequest putObjectRequest,
CompletableFuture<PutObjectResponse> returnFuture,
MultipartUploadHelper multipartUploadHelper,
GenericMultipartHelper<PutObjectRequest, PutObjectResponse> genericMultipartHelper,
int maxInFlightParts) {
this.partSizeInBytes = partSizeInBytes;
this.putObjectRequest = putObjectRequest;
this.returnFuture = returnFuture;
this.multipartUploadHelper = multipartUploadHelper;
this.genericMultipartHelper = genericMultipartHelper;
this.maxInFlightParts = maxInFlightParts;
this.progressListener = putObjectRequest.overrideConfiguration()
.map(c -> c.executionAttributes().getAttribute(JAVA_PROGRESS_LISTENER))
.orElseGet(PublisherListener::noOp);
}

@Override
public void onSubscribe(Subscription s) {
if (this.subscription != null) {
log.warn(() -> "The subscriber has already been subscribed. Cancelling the incoming subscription");
subscription.cancel();
return;
}
this.subscription = s;
s.request(1);
returnFuture.whenComplete((r, t) -> {
if (t != null) {
s.cancel();
MultipartUploadHelper.cancelingOtherOngoingRequests(futures, t);
}
});
}

@Override
public void onNext(CloseableAsyncRequestBody asyncRequestBody) {
if (asyncRequestBody == null) {
NullPointerException exception = new NullPointerException("asyncRequestBody passed to onNext MUST NOT be null.");
multipartUploadHelper.failRequestsElegantly(futures,
exception, uploadId, returnFuture, putObjectRequest);
throw exception;
}

if (isDone) {
return;
}

int currentPartNum = partNumber.incrementAndGet();
log.trace(() -> "Received asyncRequestBody " + asyncRequestBody.contentLength());
asyncRequestBodyInFlight.incrementAndGet();

Optional<SdkClientException> sdkClientException = validatePart(asyncRequestBody, currentPartNum);
if (sdkClientException.isPresent()) {
multipartUploadHelper.failRequestsElegantly(futures, sdkClientException.get(), uploadId, returnFuture,
putObjectRequest);
subscription.cancel();
return;
}

if (firstAsyncRequestBodyReceived.compareAndSet(false, true)) {
log.trace(() -> "Received first async request body");
// If this is the first AsyncRequestBody received, request another one because we don't know if there is more
firstRequestBody = asyncRequestBody;
subscription.request(1);
return;
}

// If there are more than 1 AsyncRequestBodies, then we know we need to upload this
// object using MPU
if (createMultipartUploadInitiated.compareAndSet(false, true)) {
log.debug(() -> "Starting the upload as multipart upload request");
CompletableFuture<CreateMultipartUploadResponse> createMultipartUploadFuture =
multipartUploadHelper.createMultipartUpload(putObjectRequest, returnFuture);

createMultipartUploadFuture.whenComplete((createMultipartUploadResponse, throwable) -> {
if (throwable != null) {
genericMultipartHelper.handleException(returnFuture, () -> "Failed to initiate multipart upload",
throwable);
subscription.cancel();
} else {
uploadId = createMultipartUploadResponse.uploadId();
log.debug(() -> "Initiated a new multipart upload, uploadId: " + uploadId);

sendUploadPartRequest(uploadId, firstRequestBody, 1);
sendUploadPartRequest(uploadId, asyncRequestBody, 2);

// We need to complete the uploadIdFuture *after* the first two requests have been sent
uploadIdFuture.complete(uploadId);
}
});
CompletableFutureUtils.forwardExceptionTo(returnFuture, createMultipartUploadFuture);
} else {
uploadIdFuture.whenComplete((r, t) -> {

Check warning on line 187 in services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UnknownContentLengthAsyncRequestBodySubscriber.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove useless curly braces around statement

See more on https://sonarcloud.io/project/issues?id=aws_aws-sdk-java-v2&issues=AZ0CO9r5XYukZyFkjsiS&open=AZ0CO9r5XYukZyFkjsiS&pullRequest=6801
sendUploadPartRequest(uploadId, asyncRequestBody, currentPartNum);
});
}
}

private Optional<SdkClientException> validatePart(AsyncRequestBody asyncRequestBody, int currentPartNum) {
Optional<Long> contentLength = asyncRequestBody.contentLength();

Check warning on line 194 in services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UnknownContentLengthAsyncRequestBodySubscriber.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename "contentLength" which hides the field declared at line 74.

See more on https://sonarcloud.io/project/issues?id=aws_aws-sdk-java-v2&issues=AZ0CO9r5XYukZyFkjsiT&open=AZ0CO9r5XYukZyFkjsiT&pullRequest=6801
if (!contentLength.isPresent()) {
return Optional.of(contentLengthMissingForPart(currentPartNum));
}

Long contentLengthCurrentPart = contentLength.get();
if (contentLengthCurrentPart > partSizeInBytes) {
return Optional.of(contentLengthMismatchForPart(partSizeInBytes, contentLengthCurrentPart, currentPartNum));
}
return Optional.empty();
}

private void sendUploadPartRequest(String uploadId,
CloseableAsyncRequestBody asyncRequestBody,
int currentPartNum) {
Long contentLengthCurrentPart = asyncRequestBody.contentLength().get();

Check warning on line 209 in services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UnknownContentLengthAsyncRequestBodySubscriber.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Call "Optional#isPresent()" before accessing the value.

See more on https://sonarcloud.io/project/issues?id=aws_aws-sdk-java-v2&issues=AZ0CO9r5XYukZyFkjsiR&open=AZ0CO9r5XYukZyFkjsiR&pullRequest=6801
this.contentLength.getAndAdd(contentLengthCurrentPart);

multipartUploadHelper
.sendIndividualUploadPartRequest(uploadId, completedParts::add, futures,
uploadPart(asyncRequestBody, currentPartNum), progressListener)
.whenComplete((r, t) -> {
asyncRequestBody.close();
if (t != null) {
if (failureActionInitiated.compareAndSet(false, true)) {
multipartUploadHelper.failRequestsElegantly(futures, t, uploadId, returnFuture, putObjectRequest);
}
} else {
int inFlight = asyncRequestBodyInFlight.decrementAndGet();
if (!isDone && inFlight < maxInFlightParts) {
synchronized (this) {
subscription.request(1);
}
}
completeMultipartUploadIfFinish(inFlight);
}
});
if (asyncRequestBodyInFlight.get() < maxInFlightParts) {
synchronized (this) {
subscription.request(1);
}
}
}

private Pair<UploadPartRequest, AsyncRequestBody> uploadPart(AsyncRequestBody asyncRequestBody, int partNum) {
UploadPartRequest uploadRequest =
SdkPojoConversionUtils.toUploadPartRequest(putObjectRequest,
partNum,
uploadId);

return Pair.of(uploadRequest, asyncRequestBody);
}

@Override
public void onError(Throwable t) {
log.debug(() -> "Received onError() ", t);
if (failureActionInitiated.compareAndSet(false, true)) {
isDone = true;
multipartUploadHelper.failRequestsElegantly(futures, t, uploadId, returnFuture, putObjectRequest);
}
}

@Override
public void onComplete() {
log.debug(() -> "Received onComplete()");
// If CreateMultipartUpload has not been initiated at this point, we know this is a single object upload, and if no
// async request body has been received, it's an empty stream
if (createMultipartUploadInitiated.get() == false) {

Check warning on line 261 in services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UnknownContentLengthAsyncRequestBodySubscriber.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the unnecessary boolean literal.

See more on https://sonarcloud.io/project/issues?id=aws_aws-sdk-java-v2&issues=AZ0CO9r5XYukZyFkjsiU&open=AZ0CO9r5XYukZyFkjsiU&pullRequest=6801
log.debug(() -> "Starting the upload as a single object upload request");
AsyncRequestBody entireRequestBody = firstAsyncRequestBodyReceived.get() ? firstRequestBody :
AsyncRequestBody.empty();
multipartUploadHelper.uploadInOneChunk(putObjectRequest, entireRequestBody, returnFuture);
} else {
isDone = true;
completeMultipartUploadIfFinish(asyncRequestBodyInFlight.get());
}
}

private void completeMultipartUploadIfFinish(int requestsInFlight) {
if (isDone && requestsInFlight == 0 && completedMultipartInitiated.compareAndSet(false, true)) {
CompletedPart[] parts = completedParts.stream()
.sorted(Comparator.comparingInt(CompletedPart::partNumber))
.toArray(CompletedPart[]::new);

long totalLength = contentLength.get();
int expectedNumParts = genericMultipartHelper.determinePartCount(totalLength, partSizeInBytes);
if (parts.length != expectedNumParts) {
SdkClientException exception = SdkClientException.create(
String.format("The number of UploadParts requests is not equal to the expected number of parts. "
+ "Expected: %d, Actual: %d", expectedNumParts, parts.length));
multipartUploadHelper.failRequestsElegantly(futures, exception, uploadId, returnFuture, putObjectRequest);
return;
}

multipartUploadHelper.completeMultipartUpload(returnFuture, uploadId, parts, putObjectRequest,
totalLength);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,17 @@ public UploadObjectHelper(S3AsyncClient s3AsyncClient,
SdkPojoConversionUtils::toPutObjectResponse);
this.apiCallBufferSize = resolver.apiCallBufferSize();
this.multipartUploadThresholdInBytes = resolver.thresholdInBytes();
int maxInFlightParts = resolver.maxInFlightParts();
this.uploadWithKnownContentLength = new UploadWithKnownContentLengthHelper(s3AsyncClient,
partSizeInBytes,
multipartUploadThresholdInBytes,
apiCallBufferSize);
apiCallBufferSize,
maxInFlightParts);
this.uploadWithUnknownContentLength = new UploadWithUnknownContentLengthHelper(s3AsyncClient,
partSizeInBytes,
multipartUploadThresholdInBytes,
apiCallBufferSize);
apiCallBufferSize,
maxInFlightParts);
}

public CompletableFuture<PutObjectResponse> uploadObject(PutObjectRequest putObjectRequest,
Expand Down
Loading
Loading