diff --git a/multiapps-controller-core/src/main/java/org/cloudfoundry/multiapps/controller/core/Messages.java b/multiapps-controller-core/src/main/java/org/cloudfoundry/multiapps/controller/core/Messages.java
index 5d99deb953..9d684eb48d 100644
--- a/multiapps-controller-core/src/main/java/org/cloudfoundry/multiapps/controller/core/Messages.java
+++ b/multiapps-controller-core/src/main/java/org/cloudfoundry/multiapps/controller/core/Messages.java
@@ -193,6 +193,7 @@ public final class Messages {
public static final String ABORTED_OPERATIONS_TTL_IN_SECONDS = "Aborted operations TTL in seconds: {0}";
public static final String SPRING_SCHEDULER_TASK_EXECUTOR_THREADS = "Spring scheduler task executor threads: {0}";
public static final String FILES_ASYNC_UPLOAD_EXECUTOR_MAX_THREADS = "Files async executor max threads: {0}";
+ public static final String DEPLOY_FROM_URL_EXECUTOR_MAX_THREADS = "Deploy from URL executor max threads: {0}";
public static final String ON_START_FILES_CLEANER_WITHOUT_CONTENT_ENABLED_0 = "On start files cleaner without content enabled: {0}";
public static final String THREADS_FOR_FILE_UPLOAD_TO_CONTROLLER_0 = "Threads for file upload to controller: {0}";
public static final String THREADS_FOR_FILE_STORAGE_UPLOAD_0 = "Threads for file storage upload: {0}";
diff --git a/multiapps-controller-core/src/main/java/org/cloudfoundry/multiapps/controller/core/util/ApplicationConfiguration.java b/multiapps-controller-core/src/main/java/org/cloudfoundry/multiapps/controller/core/util/ApplicationConfiguration.java
index 7eba04496b..4d856b479f 100644
--- a/multiapps-controller-core/src/main/java/org/cloudfoundry/multiapps/controller/core/util/ApplicationConfiguration.java
+++ b/multiapps-controller-core/src/main/java/org/cloudfoundry/multiapps/controller/core/util/ApplicationConfiguration.java
@@ -9,7 +9,6 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.apache.commons.collections4.CollectionUtils;
@@ -27,7 +26,6 @@
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.support.CronExpression;
-
import static java.text.MessageFormat.format;
@Named
@@ -96,6 +94,7 @@ public class ApplicationConfiguration {
static final String CFG_ABORTED_OPERATIONS_TTL_IN_MINUTES = "ABORTED_OPERATIONS_TTL_IN_SECONDS";
static final String CFG_SPRING_SCHEDULER_TASK_EXECUTOR_THREADS = "SPRING_SCHEDULER_TASK_EXECUTOR_THREADS";
static final String CFG_FILES_ASYNC_UPLOAD_EXECUTOR_MAX_THREADS = "FILES_ASYNC_UPLOAD_EXECUTOR_THREADS";
+ static final String CFG_DEPLOY_FROM_URL_EXECUTOR_MAX_THREADS = "DEPLOY_FROM_URL_EXECUTOR_MAX_THREADS";
static final String CFG_ENABLE_ON_START_FILES_WITHOUT_CONTENT_CLEANER = "ENABLE_ON_START_FILES_WITHOUT_CONTENT_CLEANER";
static final String CFG_THREADS_FOR_FILE_UPLOAD_TO_CONTROLLER = "THREADS_FOR_FILE_UPLOAD_TO_CONTROLLER";
static final String CFG_THREADS_FOR_FILE_STORAGE_UPLOAD = "THREADS_FOR_FILE_STORAGE_UPLOAD";
@@ -153,6 +152,7 @@ public class ApplicationConfiguration {
public static final String DEFAULT_GLOBAL_AUDITOR_ORIGIN = "uaa";
public static final int DEFAULT_SPRING_SCHEDULER_TASK_EXECUTOR_THREADS = 3;
public static final int DEFAULT_FILES_ASYNC_UPLOAD_EXECUTOR_MAX_THREADS = 6;
+ public static final int DEFAULT_DEPLOY_FROM_URL_EXECUTOR_MAX_THREADS = 26;
public static final boolean DEFAULT_ENABLE_ON_START_FILES_WITHOUT_CONTENT_CLEANER = false;
public static final int DEFAULT_THREADS_FOR_FILE_UPLOAD_TO_CONTROLLER = 6;
public static final int DEFAULT_THREADS_FOR_FILE_STORAGE_UPLOAD = 7;
@@ -211,6 +211,7 @@ public class ApplicationConfiguration {
private Integer abortedOperationsTtlInSeconds;
private Integer springSchedulerTaskExecutorThreads;
private Integer filesAsyncUploadExecutorThreads;
+ private Integer deployFromUrlExecutorMaxThreads;
private Boolean isOnStartFilesWithoutContentCleanerEnabledThroughEnvironment;
private Integer threadsForFileUploadToController;
private Integer threadsForFileStorageUpload;
@@ -264,6 +265,7 @@ public void load() {
getServiceHandlingMaxParallelThreads();
getAbortedOperationsTtlInSeconds();
getFilesAsyncUploadExecutorMaxThreads();
+ getDeployFromUrlExecutorMaxThreads();
getObjectStoreRegions();
getIsReadinessHealthCheckEnabled();
}
@@ -407,6 +409,13 @@ public Integer getFilesAsyncUploadExecutorMaxThreads() {
return filesAsyncUploadExecutorThreads;
}
+ public Integer getDeployFromUrlExecutorMaxThreads() {
+ if (deployFromUrlExecutorMaxThreads == null) {
+ deployFromUrlExecutorMaxThreads = getDeployFromUrlExecutorMaxThreadsFromEnvironment();
+ }
+ return deployFromUrlExecutorMaxThreads;
+ }
+
public String getGlobalAuditorUser() {
if (globalAuditorUser == null) {
globalAuditorUser = getGlobalAuditorUserFromEnvironment();
@@ -832,6 +841,12 @@ private Integer getFilesAsyncUploadExecutorMaxThreadsFromEnvironment() {
return value;
}
+ private Integer getDeployFromUrlExecutorMaxThreadsFromEnvironment() {
+ Integer value = environment.getInteger(CFG_DEPLOY_FROM_URL_EXECUTOR_MAX_THREADS, DEFAULT_DEPLOY_FROM_URL_EXECUTOR_MAX_THREADS);
+ logEnvironmentVariable(CFG_DEPLOY_FROM_URL_EXECUTOR_MAX_THREADS, Messages.DEPLOY_FROM_URL_EXECUTOR_MAX_THREADS, value);
+ return value;
+ }
+
private String getGlobalAuditorUserFromEnvironment() {
String value = environment.getString(CFG_GLOBAL_AUDITOR_USER);
return value;
diff --git a/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/dto/AsyncUploadJobDto.java b/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/dto/AsyncUploadJobDto.java
index a33ce80ccd..8249c40e56 100644
--- a/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/dto/AsyncUploadJobDto.java
+++ b/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/dto/AsyncUploadJobDto.java
@@ -1,12 +1,10 @@
package org.cloudfoundry.multiapps.controller.persistence.dto;
import java.time.LocalDateTime;
-
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
-
import org.cloudfoundry.multiapps.controller.persistence.model.PersistenceMetadata;
@Entity
@@ -31,6 +29,8 @@ private AttributeNames() {
public static final String FILE_ID = "fileId";
public static final String ERROR = "error";
public static final String INSTANCE_INDEX = "instanceIndex";
+ public static final String BYTES_READ = "bytesRead";
+ public static final String UPDATED_AT = "updatedAt";
}
@Id
@@ -73,13 +73,19 @@ private AttributeNames() {
@Column(name = PersistenceMetadata.TableColumnNames.ASYNC_UPLOAD_JOB_INSTANCE_INDEX, nullable = false)
private Integer instanceIndex;
+ @Column(name = PersistenceMetadata.TableColumnNames.ASYNC_UPLOAD_JOB_BYTES_READ)
+ private Long bytesRead;
+
+ @Column(name = PersistenceMetadata.TableColumnNames.ASYNC_UPLOAD_JOB_UPDATED_AT)
+ private LocalDateTime updatedAt;
+
public AsyncUploadJobDto() {
// Required by JPA
}
public AsyncUploadJobDto(String id, String mtaUser, String state, String url, LocalDateTime addedAt, LocalDateTime startedAt,
LocalDateTime finishedAt, String namespace, String spaceGuid, String mtaId, String fileId, String error,
- Integer instanceIndex) {
+ Integer instanceIndex, Long bytesRead, LocalDateTime updatedAt) {
this.id = id;
this.mtaUser = mtaUser;
this.state = state;
@@ -93,6 +99,8 @@ public AsyncUploadJobDto(String id, String mtaUser, String state, String url, Lo
this.fileId = fileId;
this.error = error;
this.instanceIndex = instanceIndex;
+ this.bytesRead = bytesRead;
+ this.updatedAt = updatedAt;
}
@Override
@@ -201,22 +209,40 @@ public void setInstanceIndex(Integer instanceIndex) {
this.instanceIndex = instanceIndex;
}
+ public Long getBytesRead() {
+ return bytesRead;
+ }
+
+ public void setBytesRead(Long bytesRead) {
+ this.bytesRead = bytesRead;
+ }
+
+ public LocalDateTime getUpdatedAt() {
+ return updatedAt;
+ }
+
+ public void setUpdatedAt(LocalDateTime updatedAt) {
+ this.updatedAt = updatedAt;
+ }
+
@Override
public String toString() {
return "AsyncUploadJobDto{" +
- "id='" + id + '\'' +
- ", mtaUser='" + mtaUser + '\'' +
- ", state='" + state + '\'' +
- ", url='" + url + '\'' +
- ", addedAt=" + addedAt +
- ", startedAt=" + startedAt +
- ", finishedAt=" + finishedAt +
- ", namespace='" + namespace + '\'' +
- ", spaceGuid='" + spaceGuid + '\'' +
- ", mtaId='" + mtaId + '\'' +
- ", fileId='" + fileId + '\'' +
- ", error='" + error + '\'' +
- ", instanceIndex=" + instanceIndex +
- '}';
+ "id='" + id + '\'' +
+ ", mtaUser='" + mtaUser + '\'' +
+ ", state='" + state + '\'' +
+ ", url='" + url + '\'' +
+ ", addedAt=" + addedAt +
+ ", startedAt=" + startedAt +
+ ", finishedAt=" + finishedAt +
+ ", namespace='" + namespace + '\'' +
+ ", spaceGuid='" + spaceGuid + '\'' +
+ ", mtaId='" + mtaId + '\'' +
+ ", fileId='" + fileId + '\'' +
+ ", error='" + error + '\'' +
+ ", instanceIndex=" + instanceIndex +
+ ", bytesRead=" + bytesRead +
+ ", updatedAt=" + updatedAt +
+ '}';
}
}
diff --git a/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/model/AsyncUploadJobEntry.java b/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/model/AsyncUploadJobEntry.java
index cdd69325fa..16bd46b4f9 100644
--- a/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/model/AsyncUploadJobEntry.java
+++ b/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/model/AsyncUploadJobEntry.java
@@ -1,7 +1,6 @@
package org.cloudfoundry.multiapps.controller.persistence.model;
import java.time.LocalDateTime;
-
import org.cloudfoundry.multiapps.common.Nullable;
import org.immutables.value.Value;
@@ -44,4 +43,10 @@ enum State {
String getMtaId();
Integer getInstanceIndex();
+
+ @Nullable
+ Long getBytesRead();
+
+ @Nullable
+ LocalDateTime getUpdatedAt();
}
diff --git a/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/model/PersistenceMetadata.java b/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/model/PersistenceMetadata.java
index 0a4232c509..e5c17112a1 100644
--- a/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/model/PersistenceMetadata.java
+++ b/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/model/PersistenceMetadata.java
@@ -102,6 +102,8 @@ private TableColumnNames() {
public static final String ASYNC_UPLOAD_JOB_FILE_ID = "file_id";
public static final String ASYNC_UPLOAD_JOB_ERROR = "error";
public static final String ASYNC_UPLOAD_JOB_INSTANCE_INDEX = "instance_index";
+ public static final String ASYNC_UPLOAD_JOB_BYTES_READ = "bytes_read";
+ public static final String ASYNC_UPLOAD_JOB_UPDATED_AT = "updated_at";
public static final String BACKUP_DESCRIPTOR_ID = "id";
public static final String BACKUP_DESCRIPTOR_DESCRIPTOR = "descriptor";
diff --git a/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/services/AsyncUploadJobService.java b/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/services/AsyncUploadJobService.java
index ca22f51bca..2aadf0b9ef 100644
--- a/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/services/AsyncUploadJobService.java
+++ b/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/services/AsyncUploadJobService.java
@@ -3,7 +3,6 @@
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.persistence.EntityManagerFactory;
-
import org.cloudfoundry.multiapps.common.ConflictException;
import org.cloudfoundry.multiapps.common.NotFoundException;
import org.cloudfoundry.multiapps.controller.persistence.Messages;
@@ -58,6 +57,8 @@ public AsyncUploadJobEntry fromDto(AsyncUploadJobDto dto) {
.fileId(dto.getFileId())
.error(dto.getError())
.instanceIndex(dto.getInstanceIndex())
+ .bytesRead(dto.getBytesRead())
+ .updatedAt(dto.getUpdatedAt())
.build();
}
@@ -76,7 +77,9 @@ public AsyncUploadJobDto toDto(AsyncUploadJobEntry entry) {
entry.getMtaId(),
entry.getFileId(),
entry.getError(),
- entry.getInstanceIndex());
+ entry.getInstanceIndex(),
+ entry.getBytesRead(),
+ entry.getUpdatedAt());
}
}
diff --git a/multiapps-controller-persistence/src/main/resources/META-INF/persistence.xml b/multiapps-controller-persistence/src/main/resources/META-INF/persistence.xml
index 363dce3249..684a213264 100644
--- a/multiapps-controller-persistence/src/main/resources/META-INF/persistence.xml
+++ b/multiapps-controller-persistence/src/main/resources/META-INF/persistence.xml
@@ -19,6 +19,7 @@
+
diff --git a/multiapps-controller-persistence/src/main/resources/org/cloudfoundry/multiapps/controller/persistence/db/changelog/db-changelog-2.35.0-persistence.xml b/multiapps-controller-persistence/src/main/resources/org/cloudfoundry/multiapps/controller/persistence/db/changelog/db-changelog-2.35.0-persistence.xml
new file mode 100644
index 0000000000..d60f048aca
--- /dev/null
+++ b/multiapps-controller-persistence/src/main/resources/org/cloudfoundry/multiapps/controller/persistence/db/changelog/db-changelog-2.35.0-persistence.xml
@@ -0,0 +1,19 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/multiapps-controller-persistence/src/main/resources/org/cloudfoundry/multiapps/controller/persistence/db/changelog/db-changelog.xml b/multiapps-controller-persistence/src/main/resources/org/cloudfoundry/multiapps/controller/persistence/db/changelog/db-changelog.xml
index 08541fc2a2..a5da6591b7 100644
--- a/multiapps-controller-persistence/src/main/resources/org/cloudfoundry/multiapps/controller/persistence/db/changelog/db-changelog.xml
+++ b/multiapps-controller-persistence/src/main/resources/org/cloudfoundry/multiapps/controller/persistence/db/changelog/db-changelog.xml
@@ -37,4 +37,7 @@
+
+
+
diff --git a/multiapps-controller-process/src/main/java/org/cloudfoundry/multiapps/controller/process/configuration/FileUploadThreadPoolConfiguration.java b/multiapps-controller-process/src/main/java/org/cloudfoundry/multiapps/controller/process/configuration/FileUploadThreadPoolConfiguration.java
index 380f72b4e7..a66a473101 100644
--- a/multiapps-controller-process/src/main/java/org/cloudfoundry/multiapps/controller/process/configuration/FileUploadThreadPoolConfiguration.java
+++ b/multiapps-controller-process/src/main/java/org/cloudfoundry/multiapps/controller/process/configuration/FileUploadThreadPoolConfiguration.java
@@ -8,7 +8,7 @@
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
+import jakarta.inject.Inject;
import org.cloudfoundry.multiapps.controller.core.util.ApplicationConfiguration;
import org.cloudfoundry.multiapps.controller.process.util.PriorityCallable;
import org.cloudfoundry.multiapps.controller.process.util.PriorityFuture;
@@ -16,8 +16,6 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import jakarta.inject.Inject;
-
@Configuration
public class FileUploadThreadPoolConfiguration {
@@ -75,4 +73,17 @@ public ExecutorService asyncFileUploadExecutor(LinkedBlockingQueue fil
fileUploadFromUrlQueue);
}
+ @Bean(name = "deployFromUrlExecutor")
+ public ExecutorService deployFromUrlExecutor() {
+ return new ThreadPoolExecutor(5,
+ // The max thread count should match the maximum capacity of asyncFileUploadExecutor (queue size + max threads).
+ // A lower value may cause unnecessary task rejections.
+ // A higher value may cause job failures when asyncFileUploadExecutor becomes full.
+ applicationConfiguration.getDeployFromUrlExecutorMaxThreads(),
+ // As the threads are only updating a row and waiting it is ok to have more threads
+ 30,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>()); // A synchronous queue is used so deploy from url jobs immediately start
+ // a new thread that updates the database job entry
+ }
}
diff --git a/multiapps-controller-process/src/main/java/org/cloudfoundry/multiapps/controller/process/stream/CountingInputStream.java b/multiapps-controller-process/src/main/java/org/cloudfoundry/multiapps/controller/process/stream/CountingInputStream.java
new file mode 100644
index 0000000000..10fd1a9095
--- /dev/null
+++ b/multiapps-controller-process/src/main/java/org/cloudfoundry/multiapps/controller/process/stream/CountingInputStream.java
@@ -0,0 +1,23 @@
+package org.cloudfoundry.multiapps.controller.process.stream;
+
+import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.io.input.ProxyInputStream;
+
+public class CountingInputStream extends ProxyInputStream {
+
+ private final AtomicLong bytes;
+
+ public CountingInputStream(InputStream proxy, AtomicLong counterRef) {
+ super(proxy);
+ bytes = counterRef;
+ }
+
+ @Override
+ protected void afterRead(int n) {
+ if (n > 0) {
+ bytes.addAndGet(n);
+ }
+ }
+
+}
diff --git a/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/Messages.java b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/Messages.java
index 748a134eaa..4dbc9507c9 100644
--- a/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/Messages.java
+++ b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/Messages.java
@@ -24,6 +24,7 @@ public final class Messages {
public static final String NO_VALID_OBJECT_STORE_CONFIGURATION_FOUND = "No valid Object Store configuration found!";
public static final String MISSING_PROPERTIES_FOR_CREATING_THE_SPECIFIC_PROVIDER = "Missing properties for creating the specific provider!";
public static final String DEPLOY_FROM_URL_WRONG_CREDENTIALS = "Credentials to {0} are wrong. Make sure that they are correct.";
+ public static final String JOB_NOT_UPDATED_FOR_0_SECONDS = "Job not updated for {0} seconds";
public static final String FAILED_TO_CREATE_BLOB_STORE_CONTEXT = "Failed to create BlobStoreContext";
@@ -54,13 +55,8 @@ public final class Messages {
public static final String CLEARING_FLOWABLE_LOCK_OWNER_THREW_AN_EXCEPTION_0 = "Clearing Flowable lock owner on JVM shutdown threw an exception: {0}";
public static final String FETCHING_FILE_FAILED = "Fetching file {0} in space {1} failed with: {2}";
public static final String ASYNC_UPLOAD_JOB_FAILED = "Async upload job {0} failed with: {1}";
- public static final String JOB_0_WAS_NOT_FOUND_IN_THE_RUNNING_TASKS = "Job \"{0}\" was not found in the running tasks";
- public static final String JOB_IS_NOT_BEING_EXECUTED = "Job is not being executed";
- public static final String JOB_0_EXISTS_IN_STATE_1_BUT_DOES_NOT_EXISTS_IN_THE_RUNNING_TASKS = "Job \"{0}\" exists in state \"{1}\" but does not exists in the running tasks";
- public static final String JOB_THREAD_IS_NOT_RUNNING_BUT_STATE_IS_STILL_IN_PROGRESS_UPLOAD_FAILED = "Job thread is not running but state is still in progress. Upload failed";
// WARN log messages
- public static final String THE_JOB_EXISTS_BUT_IT_IS_NOT_RUNNING_DELETING = "The job exists but it is not running. Deleting";
// INFO log messages
public static final String ALM_SERVICE_ENV_INITIALIZED = "Deploy service environment initialized";
@@ -72,6 +68,7 @@ public final class Messages {
public static final String CLEARING_LOCK_OWNER = "Clearing lock owner {0}...";
public static final String CLEARED_LOCK_OWNER = "Cleared lock owner {0}";
public static final String OBJECT_STORE_WITH_PROVIDER_0_CREATED = "Object store with provider: {0} created";
+ public static final String JOB_WITH_ID_WAS_NOT_UPDATED_WITHIN_SECONDS = "Job with ID: {} was not updated within: {} seconds";
// DEBUG log messages
public static final String RECEIVED_UPLOAD_REQUEST = "Received upload request on URI: {}";
@@ -79,7 +76,7 @@ public final class Messages {
public static final String UPLOADED_FILE = "Uploaded file \"{}\" with name {}, size {}, space {}, and digest {} (algorithm {}) for {} ms.";
public static final String ASYNC_UPLOAD_JOB_EXISTS = "Async upload job for URL {} exists: {}";
public static final String CREATING_ASYNC_UPLOAD_JOB = "Creating async upload job for URL {} with ID: {}";
- public static final String ASYNC_UPLOAD_JOB_REJECTED = "Async upload job {} rejected. Deleting entry";
+ public static final String ASYNC_UPLOAD_JOB_REJECTED = "Async upload job with space guid: {}, namespace: {}, URL: {} rejected.";
public static final String STARTING_DOWNLOAD_OF_MTAR = "Starting download of MTAR from remote endpoint: {}";
public static final String UPLOADED_MTAR_FROM_REMOTE_ENDPOINT_AND_JOB_ID = "Uploaded MTAR from remote endpoint {} with job id: {} in {} ms";
public static final String ASYNC_UPLOAD_JOB_FINISHED = "Async upload job {} finished";
diff --git a/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/api/impl/FilesApiServiceImpl.java b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/api/impl/FilesApiServiceImpl.java
index ef93af4f52..42a7eeb070 100644
--- a/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/api/impl/FilesApiServiceImpl.java
+++ b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/api/impl/FilesApiServiceImpl.java
@@ -1,33 +1,17 @@
package org.cloudfoundry.multiapps.controller.web.api.impl;
import java.io.BufferedInputStream;
-import java.io.IOException;
import java.io.InputStream;
import java.math.BigInteger;
-import java.net.URI;
-import java.net.URLDecoder;
-import java.net.http.HttpClient;
-import java.net.http.HttpClient.Redirect;
-import java.net.http.HttpRequest;
-import java.net.http.HttpResponse;
-import java.net.http.HttpResponse.BodyHandlers;
-import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
-import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
-import java.util.Arrays;
import java.util.Base64;
import java.util.List;
-import java.util.UUID;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.input.ProxyInputStream;
+import jakarta.inject.Inject;
+import jakarta.inject.Named;
import org.cloudfoundry.multiapps.common.SLException;
import org.cloudfoundry.multiapps.controller.api.FilesApiService;
import org.cloudfoundry.multiapps.controller.api.model.AsyncUploadResult;
@@ -36,18 +20,13 @@
import org.cloudfoundry.multiapps.controller.api.model.ImmutableAsyncUploadResult;
import org.cloudfoundry.multiapps.controller.api.model.ImmutableFileMetadata;
import org.cloudfoundry.multiapps.controller.api.model.UserCredentials;
-import org.cloudfoundry.multiapps.controller.client.util.CheckedSupplier;
-import org.cloudfoundry.multiapps.controller.client.util.ResilientOperationExecutor;
import org.cloudfoundry.multiapps.controller.core.auditlogging.FilesApiServiceAuditLog;
import org.cloudfoundry.multiapps.controller.core.helpers.DescriptorParserFacadeFactory;
-import org.cloudfoundry.multiapps.controller.core.model.CachedMap;
import org.cloudfoundry.multiapps.controller.core.util.ApplicationConfiguration;
-import org.cloudfoundry.multiapps.controller.core.util.FileUtils;
import org.cloudfoundry.multiapps.controller.core.util.UriUtil;
import org.cloudfoundry.multiapps.controller.persistence.model.AsyncUploadJobEntry;
import org.cloudfoundry.multiapps.controller.persistence.model.AsyncUploadJobEntry.State;
import org.cloudfoundry.multiapps.controller.persistence.model.FileEntry;
-import org.cloudfoundry.multiapps.controller.persistence.model.ImmutableAsyncUploadJobEntry;
import org.cloudfoundry.multiapps.controller.persistence.model.ImmutableFileEntry;
import org.cloudfoundry.multiapps.controller.persistence.services.AsyncUploadJobService;
import org.cloudfoundry.multiapps.controller.persistence.services.FileService;
@@ -56,11 +35,10 @@
import org.cloudfoundry.multiapps.controller.process.util.PriorityFuture;
import org.cloudfoundry.multiapps.controller.web.Constants;
import org.cloudfoundry.multiapps.controller.web.Messages;
+import org.cloudfoundry.multiapps.controller.web.upload.AsyncUploadJobOrchestrator;
+import org.cloudfoundry.multiapps.controller.web.upload.exception.RejectedAsyncUploadJobException;
import org.cloudfoundry.multiapps.controller.web.util.SecurityContextUtil;
import org.cloudfoundry.multiapps.controller.web.util.ServletUtil;
-import org.cloudfoundry.multiapps.mta.handlers.ArchiveHandler;
-import org.cloudfoundry.multiapps.mta.handlers.DescriptorParserFacade;
-import org.cloudfoundry.multiapps.mta.model.DeploymentDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
@@ -69,25 +47,18 @@
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.multipart.MultipartHttpServletRequest;
-import jakarta.inject.Inject;
-import jakarta.inject.Named;
-
@Named
public class FilesApiServiceImpl implements FilesApiService {
private static final Logger LOGGER = LoggerFactory.getLogger(FilesApiServiceImpl.class);
- private static final int ERROR_RESPONSE_BODY_MAX_LENGTH = 4 * 1024;
- private static final int INPUT_STREAM_BUFFER_SIZE = 16 * 1024;
- private static final Duration HTTP_CONNECT_TIMEOUT = Duration.ofMinutes(10);
private static final String RETRY_AFTER_SECONDS = "30";
- private static final String USERNAME_PASSWORD_URL_FORMAT = "{0}:{1}";
+ private static final int INPUT_STREAM_BUFFER_SIZE = 16 * 1024;
+ private static final int UPDATE_JOB_TIMEOUT = 30;
+
static {
System.setProperty(Constants.RETRY_LIMIT_PROPERTY, "0");
}
- private final CachedMap jobCounters = new CachedMap<>(Duration.ofHours(1));
- private final CachedMap> runningTasks = new CachedMap<>(Duration.ofHours(1));
- private final ResilientOperationExecutor resilientOperationExecutor = getResilientOperationExecutor();
@Inject
@Named("fileService")
private FileService fileService;
@@ -98,11 +69,10 @@ public class FilesApiServiceImpl implements FilesApiService {
@Inject
private AsyncUploadJobService uploadJobService;
@Inject
- @Named("asyncFileUploadExecutor")
- private ExecutorService deployFromUrlExecutor;
- @Inject
private FilesApiServiceAuditLog filesApiServiceAuditLog;
@Inject
+ private AsyncUploadJobOrchestrator asyncUploadJobOrchestrator;
+ @Inject
private ExecutorService fileStorageThreadPool;
@Override
@@ -148,19 +118,23 @@ public ResponseEntity startUploadFromUrl(String spaceGuid, String namespac
LOGGER.trace(Messages.RECEIVED_UPLOAD_FROM_URL_REQUEST, urlWithoutUserInfo);
filesApiServiceAuditLog.logStartUploadFromUrl(SecurityContextUtil.getUsername(), spaceGuid, decodedUrl);
var existingJob = getExistingJob(spaceGuid, namespace, urlWithoutUserInfo);
- if (existingJob != null) {
- if (runningTasks.get(existingJob.getId()) != null) {
- LOGGER.info(Messages.ASYNC_UPLOAD_JOB_EXISTS, urlWithoutUserInfo, existingJob);
- return ResponseEntity.status(HttpStatus.SEE_OTHER)
- .header(HttpHeaders.LOCATION, getLocationHeader(spaceGuid, existingJob.getId()))
- .header("x-cf-app-instance", configuration.getApplicationGuid() + ":" + existingJob.getInstanceIndex())
- .build();
- } else {
- LOGGER.warn(Messages.THE_JOB_EXISTS_BUT_IT_IS_NOT_RUNNING_DELETING);
- deleteAsyncJobEntry(existingJob);
- }
+ if (existingJob == null) {
+ return triggerUploadFromUrl(spaceGuid, namespace, urlWithoutUserInfo, decodedUrl, fileUrl.getUserCredentials());
+ }
+ if (hasJobStuck(existingJob)) {
+ deleteAsyncJobEntry(existingJob);
+ return triggerUploadFromUrl(spaceGuid, namespace, urlWithoutUserInfo, decodedUrl, fileUrl.getUserCredentials());
}
- return triggerUploadFromUrl(spaceGuid, namespace, urlWithoutUserInfo, decodedUrl, fileUrl.getUserCredentials());
+ LOGGER.info(Messages.ASYNC_UPLOAD_JOB_EXISTS, urlWithoutUserInfo, existingJob);
+ return ResponseEntity.status(HttpStatus.SEE_OTHER)
+ .header(HttpHeaders.LOCATION, getLocationHeader(spaceGuid, existingJob.getId()))
+ .build();
+ }
+
+ private boolean hasJobStuck(AsyncUploadJobEntry existingJob) {
+ return existingJob.getUpdatedAt()
+ .isBefore(LocalDateTime.now()
+ .minusSeconds(UPDATE_JOB_TIMEOUT));
}
private String getLocationHeader(String spaceGuid, String jobId) {
@@ -175,51 +149,37 @@ public ResponseEntity getUploadFromUrlJob(String spaceGuid, S
return ResponseEntity.notFound()
.build();
}
- return getAsyncUploadResult(spaceGuid, namespace, job);
+ return getAsyncUploadResult(job);
}
- private ResponseEntity getAsyncUploadResult(String spaceGuid, String namespace, AsyncUploadJobEntry job) {
+ private ResponseEntity getAsyncUploadResult(AsyncUploadJobEntry job) {
if (job.getState() == State.RUNNING || job.getState() == State.INITIAL) {
- Future> futureTask = runningTasks.get(job.getId());
- if (futureTask == null) {
- LOGGER.error(MessageFormat.format(Messages.JOB_0_WAS_NOT_FOUND_IN_THE_RUNNING_TASKS, job.getId()));
- return ResponseEntity.ok(createErrorResult(Messages.JOB_IS_NOT_BEING_EXECUTED,
- AsyncUploadResult.ClientAction.RETRY_UPLOAD));
- }
- if (!futureTask.isDone()) {
- var count = jobCounters.getOrDefault(job.getId(), new AtomicLong(-1));
- return ResponseEntity.ok(ImmutableAsyncUploadResult.builder()
- .status(AsyncUploadResult.JobStatus.RUNNING)
- .bytes(count.get())
- .build());
- }
- var jobWithLatestState = getJob(job.getId(), spaceGuid, namespace);
- if (jobWithLatestState.getState() == State.RUNNING || jobWithLatestState.getState() == State.INITIAL) {
- LOGGER.error(MessageFormat.format(Messages.JOB_0_EXISTS_IN_STATE_1_BUT_DOES_NOT_EXISTS_IN_THE_RUNNING_TASKS, job.getId(),
- jobWithLatestState.getState()));
- return ResponseEntity.ok(createErrorResult(Messages.JOB_THREAD_IS_NOT_RUNNING_BUT_STATE_IS_STILL_IN_PROGRESS_UPLOAD_FAILED,
- AsyncUploadResult.ClientAction.RETRY_UPLOAD));
+ if (hasJobStuck(job)) {
+ LOGGER.info(Messages.JOB_WITH_ID_WAS_NOT_UPDATED_WITHIN_SECONDS, job.getId(), UPDATE_JOB_TIMEOUT);
+ return ResponseEntity.ok(
+ createErrorResult(MessageFormat.format(Messages.JOB_NOT_UPDATED_FOR_0_SECONDS, UPDATE_JOB_TIMEOUT),
+ AsyncUploadResult.ClientAction.RETRY_UPLOAD));
}
+ return ResponseEntity.ok(ImmutableAsyncUploadResult.builder()
+ .status(AsyncUploadResult.JobStatus.RUNNING)
+ .bytes(job.getBytesRead())
+ .build());
}
if (job.getState() == State.ERROR) {
- jobCounters.remove(job.getId());
- runningTasks.remove(job.getId());
return ResponseEntity.ok(createErrorResult(job.getError()));
}
- return addFileEntryToAsyncUploadResult(spaceGuid, job);
+ return addFileEntryToAsyncUploadResult(job);
}
- private ResponseEntity addFileEntryToAsyncUploadResult(String spaceGuid, AsyncUploadJobEntry job) {
+ private ResponseEntity addFileEntryToAsyncUploadResult(AsyncUploadJobEntry job) {
FileEntry fileEntry;
try {
- fileEntry = fileService.getFile(spaceGuid, job.getFileId());
+ fileEntry = fileService.getFile(job.getSpaceGuid(), job.getFileId());
} catch (FileStorageException e) {
- LOGGER.error(MessageFormat.format(Messages.FETCHING_FILE_FAILED, job.getFileId(), spaceGuid, e.getMessage()), e);
+ LOGGER.error(MessageFormat.format(Messages.FETCHING_FILE_FAILED, job.getFileId(), job.getSpaceGuid(), e.getMessage()), e);
return ResponseEntity.ok(createErrorResult(e.getMessage()));
}
FileMetadata file = parseFileEntry(fileEntry);
- jobCounters.remove(job.getId());
- runningTasks.remove(job.getId());
return ResponseEntity.status(HttpStatus.CREATED)
.body(ImmutableAsyncUploadResult.builder()
.status(AsyncUploadResult.JobStatus.FINISHED)
@@ -254,10 +214,6 @@ private FileEntry doUploadFile(String spaceGuid, String namespace, MultipartFile
in);
}
- protected ResilientOperationExecutor getResilientOperationExecutor() {
- return new ResilientOperationExecutor();
- }
-
private FileMetadata parseFileEntry(FileEntry fileEntry) {
return ImmutableFileMetadata.builder()
.id(fileEntry.getId())
@@ -276,7 +232,6 @@ private AsyncUploadJobEntry getExistingJob(String spaceGuid, String namespace, S
.user(SecurityContextUtil.getUsername())
.namespace(namespace)
.url(url)
- .instanceIndex(configuration.getApplicationInstanceIndex())
.withoutFinishedAt()
.withStateAnyOf(State.INITIAL, State.RUNNING)
.list();
@@ -295,39 +250,21 @@ private void deleteAsyncJobEntry(AsyncUploadJobEntry entry) {
private ResponseEntity triggerUploadFromUrl(String spaceGuid, String namespace, String urlWithoutUserInfo, String decodedUrl,
UserCredentials userCredentials) {
- var entry = createJobEntry(spaceGuid, namespace, urlWithoutUserInfo);
- LOGGER.debug(Messages.CREATING_ASYNC_UPLOAD_JOB, urlWithoutUserInfo, entry.getId());
- uploadJobService.add(entry);
try {
- Future> runningTask = deployFromUrlExecutor.submit(() -> uploadFileFromUrl(entry, spaceGuid, namespace, decodedUrl,
- userCredentials));
- runningTasks.put(entry.getId(), runningTask);
- } catch (RejectedExecutionException ignored) {
- LOGGER.debug(Messages.ASYNC_UPLOAD_JOB_REJECTED, entry.getId());
- deleteAsyncJobEntry(entry);
+ AsyncUploadJobEntry entry = asyncUploadJobOrchestrator.executeUploadFromUrl(spaceGuid, namespace, urlWithoutUserInfo,
+ decodedUrl, userCredentials);
+ return ResponseEntity.accepted()
+ .header(HttpHeaders.LOCATION, getLocationHeader(spaceGuid, entry.getId()))
+ .build();
+ } catch (RejectedAsyncUploadJobException rejectedJobException) {
+ LOGGER.debug(Messages.ASYNC_UPLOAD_JOB_REJECTED, spaceGuid, namespace, urlWithoutUserInfo);
+ if (rejectedJobException.getAsyncUploadJobEntry() != null) {
+ deleteAsyncJobEntry(rejectedJobException.getAsyncUploadJobEntry());
+ }
return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS)
.header(HttpHeaders.RETRY_AFTER, RETRY_AFTER_SECONDS)
.build();
}
- return ResponseEntity.accepted()
- .header("x-cf-app-instance",
- configuration.getApplicationGuid() + ":" + configuration.getApplicationInstanceIndex())
- .header(HttpHeaders.LOCATION, getLocationHeader(spaceGuid, entry.getId()))
- .build();
- }
-
- private AsyncUploadJobEntry createJobEntry(String spaceGuid, String namespace, String url) {
- return ImmutableAsyncUploadJobEntry.builder()
- .id(UUID.randomUUID()
- .toString())
- .user(SecurityContextUtil.getUsername())
- .addedAt(LocalDateTime.now())
- .spaceGuid(spaceGuid)
- .namespace(namespace)
- .instanceIndex(configuration.getApplicationInstanceIndex())
- .url(url)
- .state(State.INITIAL)
- .build();
}
private AsyncUploadJobEntry getJob(String id, String spaceGuid, String namespace) {
@@ -338,7 +275,6 @@ private AsyncUploadJobEntry getJob(String id, String spaceGuid, String namespace
.spaceGuid(spaceGuid)
.user(SecurityContextUtil.getUsername())
.namespace(namespace)
- .instanceIndex(configuration.getApplicationInstanceIndex())
.list();
return jobs.isEmpty() ? null : jobs.get(0);
}
@@ -351,182 +287,4 @@ private AsyncUploadResult createErrorResult(String error, AsyncUploadResult.Clie
.build();
}
- private void uploadFileFromUrl(AsyncUploadJobEntry jobEntry, String spaceGuid, String namespace, String fileUrl,
- UserCredentials userCredentials) {
- var counter = new AtomicLong(0);
- jobCounters.put(jobEntry.getId(), counter);
- LOGGER.info(Messages.STARTING_DOWNLOAD_OF_MTAR, jobEntry.getUrl());
- var startTime = LocalDateTime.now();
- AsyncUploadJobEntry jobEntryWithStartTime = ImmutableAsyncUploadJobEntry.copyOf(jobEntry)
- .withState(State.RUNNING)
- .withStartedAt(startTime);
- try {
- jobEntryWithStartTime = uploadJobService.update(jobEntry, jobEntryWithStartTime);
- FileEntry fileEntry = resilientOperationExecutor.execute((CheckedSupplier) () -> doUploadFileFromUrl(spaceGuid,
- namespace,
- fileUrl,
- counter,
- userCredentials));
- LOGGER.trace(Messages.UPLOADED_MTAR_FROM_REMOTE_ENDPOINT_AND_JOB_ID, jobEntry.getUrl(), jobEntry.getId(),
- ChronoUnit.MILLIS.between(startTime, LocalDateTime.now()));
- var descriptor = fileService.processFileContent(spaceGuid, fileEntry.getId(), this::extractDeploymentDescriptor);
- LOGGER.debug(Messages.ASYNC_UPLOAD_JOB_FINISHED, jobEntry.getId());
- uploadJobService.update(jobEntryWithStartTime, ImmutableAsyncUploadJobEntry.copyOf(jobEntryWithStartTime)
- .withFileId(fileEntry.getId())
- .withMtaId(descriptor.getId())
- .withFinishedAt(LocalDateTime.now())
- .withState(State.FINISHED));
- } catch (Exception e) {
- LOGGER.error(MessageFormat.format(Messages.ASYNC_UPLOAD_JOB_FAILED, jobEntry.getId(), e.getMessage()), e);
- uploadJobService.update(jobEntryWithStartTime, ImmutableAsyncUploadJobEntry.copyOf(jobEntryWithStartTime)
- .withError(e.getMessage())
- .withState(State.ERROR));
- }
- }
-
- private FileEntry doUploadFileFromUrl(String spaceGuid, String namespace, String fileUrl, AtomicLong counter,
- UserCredentials userCredentials)
- throws Exception {
- if (!UriUtil.isUrlSecure(fileUrl)) {
- throw new SLException(Messages.MTAR_ENDPOINT_NOT_SECURE);
- }
- UriUtil.validateUrl(fileUrl);
- HttpClient client = buildHttpClient(fileUrl);
-
- HttpResponse response = callRemoteEndpointWithRetry(client, fileUrl, userCredentials);
- long fileSize = response.headers()
- .firstValueAsLong(Constants.CONTENT_LENGTH)
- .orElseThrow(() -> new SLException(Messages.FILE_URL_RESPONSE_DID_NOT_RETURN_CONTENT_LENGTH));
-
- long maxUploadSize = configuration.getMaxUploadSize();
- if (fileSize > maxUploadSize) {
- throw new SLException(MessageFormat.format(Messages.MAX_UPLOAD_SIZE_EXCEEDED, maxUploadSize));
- }
-
- String fileName = extractFileName(fileUrl);
- FileUtils.validateFileHasExtension(fileName);
- resetCounterOnRetry(counter);
- // Normal stream returned from the http response always returns 0 when InputStream::available() is executed which seems to break
- // JClods library: https://issues.apache.org/jira/browse/JCLOUDS-1623
- try (CountingInputStream source = new CountingInputStream(response.body(), counter);
- BufferedInputStream bufferedContent = new BufferedInputStream(source, INPUT_STREAM_BUFFER_SIZE)) {
- LOGGER.debug(Messages.UPLOADING_MTAR_STREAM_FROM_REMOTE_ENDPOINT, response.uri());
- return fileService.addFile(ImmutableFileEntry.builder()
- .space(spaceGuid)
- .namespace(namespace)
- .name(fileName)
- .size(BigInteger.valueOf(fileSize))
- .build(),
- bufferedContent);
- }
- }
-
- public HttpResponse callRemoteEndpointWithRetry(HttpClient client, String decodedUrl, UserCredentials userCredentials)
- throws Exception {
- return resilientOperationExecutor.execute((CheckedSupplier>) () -> {
- var request = buildFetchFileRequest(decodedUrl, userCredentials);
- LOGGER.debug(Messages.CALLING_REMOTE_MTAR_ENDPOINT, getMaskedUri(urlDecodeUrl(decodedUrl)));
- var response = client.send(request, BodyHandlers.ofInputStream());
- if (response.statusCode() / 100 != 2) {
- String error = readErrorBodyFromResponse(response);
- LOGGER.error(error);
- if (response.statusCode() == HttpStatus.UNAUTHORIZED.value()) {
- String errorMessage = MessageFormat.format(Messages.DEPLOY_FROM_URL_WRONG_CREDENTIALS,
- UriUtil.stripUserInfo(decodedUrl));
- throw new SLException(errorMessage);
- }
- throw new SLException(MessageFormat.format(Messages.ERROR_FROM_REMOTE_MTAR_ENDPOINT, getMaskedUri(urlDecodeUrl(decodedUrl)),
- response.statusCode(), error));
- }
- return response;
- });
- }
-
- private String getMaskedUri(String url) {
- if (url.contains("@")) {
- return url.substring(url.lastIndexOf("@"))
- .replace("@", "...");
- } else {
- return url;
- }
- }
-
- private String urlDecodeUrl(String url) {
- return URLDecoder.decode(url, StandardCharsets.UTF_8);
- }
-
- private void resetCounterOnRetry(AtomicLong counter) {
- counter.set(0);
- }
-
- protected HttpClient buildHttpClient(String decodedUrl) {
- return HttpClient.newBuilder()
- .version(HttpClient.Version.HTTP_2)
- .connectTimeout(HTTP_CONNECT_TIMEOUT)
- .followRedirects(Redirect.NORMAL)
- .build();
- }
-
- private HttpRequest buildFetchFileRequest(String decodedUrl, UserCredentials userCredentials) {
- var builder = HttpRequest.newBuilder()
- .GET()
- .timeout(Duration.ofMinutes(15));
- var uri = URI.create(decodedUrl);
- var userInfo = uri.getUserInfo();
- if (userCredentials != null) {
- builder.uri(uri);
- String userCredentialsUrlFormat = MessageFormat.format(USERNAME_PASSWORD_URL_FORMAT, userCredentials.getUsername(),
- userCredentials.getPassword());
- String encodedAuth = Base64.getEncoder()
- .encodeToString(userCredentialsUrlFormat.getBytes());
- builder.header(HttpHeaders.AUTHORIZATION, "Basic " + encodedAuth);
- } else if (userInfo != null) {
- builder.uri(URI.create(decodedUrl.replace(uri.getRawUserInfo() + "@", "")));
- String encodedAuth = Base64.getEncoder()
- .encodeToString(userInfo.getBytes());
- builder.header(HttpHeaders.AUTHORIZATION, "Basic " + encodedAuth);
- } else {
- builder.uri(uri);
- }
- return builder.build();
- }
-
- private String readErrorBodyFromResponse(HttpResponse response) throws IOException {
- try (InputStream is = response.body()) {
- byte[] buffer = new byte[ERROR_RESPONSE_BODY_MAX_LENGTH];
- int read = IOUtils.read(is, buffer);
- return new String(Arrays.copyOf(buffer, read));
- }
- }
-
- private String extractFileName(String url) {
- String path = URI.create(url)
- .getPath();
- if (path.indexOf('/') == -1) {
- return path;
- }
- String[] pathFragments = path.split("/");
- return pathFragments[pathFragments.length - 1];
- }
-
- private DeploymentDescriptor extractDeploymentDescriptor(InputStream appArchiveStream) {
- String descriptorString = ArchiveHandler.getDescriptor(appArchiveStream, configuration.getMaxMtaDescriptorSize());
- DescriptorParserFacade descriptorParserFacade = descriptorParserFactory.getInstance();
- return descriptorParserFacade.parseDeploymentDescriptor(descriptorString);
- }
-
- private static class CountingInputStream extends ProxyInputStream {
- private final AtomicLong bytes;
-
- public CountingInputStream(InputStream proxy, AtomicLong counterRef) {
- super(proxy);
- bytes = counterRef;
- }
-
- @Override
- protected void afterRead(int n) {
- bytes.addAndGet(n);
- }
- }
-
}
diff --git a/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/AsyncUploadJobOrchestrator.java b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/AsyncUploadJobOrchestrator.java
new file mode 100644
index 0000000000..22dd65834f
--- /dev/null
+++ b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/AsyncUploadJobOrchestrator.java
@@ -0,0 +1,275 @@
+package org.cloudfoundry.multiapps.controller.web.upload;
+
+import java.io.BufferedInputStream;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.net.URI;
+import java.text.MessageFormat;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import jakarta.inject.Inject;
+import jakarta.inject.Named;
+import org.cloudfoundry.multiapps.common.SLException;
+import org.cloudfoundry.multiapps.common.util.MiscUtil;
+import org.cloudfoundry.multiapps.controller.api.model.UserCredentials;
+import org.cloudfoundry.multiapps.controller.client.util.CheckedSupplier;
+import org.cloudfoundry.multiapps.controller.client.util.ResilientOperationExecutor;
+import org.cloudfoundry.multiapps.controller.core.helpers.DescriptorParserFacadeFactory;
+import org.cloudfoundry.multiapps.controller.core.util.ApplicationConfiguration;
+import org.cloudfoundry.multiapps.controller.core.util.FileUtils;
+import org.cloudfoundry.multiapps.controller.persistence.model.AsyncUploadJobEntry;
+import org.cloudfoundry.multiapps.controller.persistence.model.FileEntry;
+import org.cloudfoundry.multiapps.controller.persistence.model.ImmutableAsyncUploadJobEntry;
+import org.cloudfoundry.multiapps.controller.persistence.model.ImmutableFileEntry;
+import org.cloudfoundry.multiapps.controller.persistence.services.AsyncUploadJobService;
+import org.cloudfoundry.multiapps.controller.persistence.services.FileService;
+import org.cloudfoundry.multiapps.controller.process.stream.CountingInputStream;
+import org.cloudfoundry.multiapps.controller.web.Messages;
+import org.cloudfoundry.multiapps.controller.web.upload.client.DeployFromUrlRemoteClient;
+import org.cloudfoundry.multiapps.controller.web.upload.client.FileFromUrlData;
+import org.cloudfoundry.multiapps.controller.web.upload.exception.RejectedAsyncUploadJobException;
+import org.cloudfoundry.multiapps.controller.web.util.SecurityContextUtil;
+import org.cloudfoundry.multiapps.mta.handlers.ArchiveHandler;
+import org.cloudfoundry.multiapps.mta.handlers.DescriptorParserFacade;
+import org.cloudfoundry.multiapps.mta.model.DeploymentDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Named
+public class AsyncUploadJobOrchestrator {
+
+ private static final int INPUT_STREAM_BUFFER_SIZE = 16 * 1024;
+
+ private static final long WAIT_TIME_BETWEEN_ASYNC_JOB_UPDATES_IN_MILLIS = Duration.ofSeconds(3)
+ .toMillis();
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AsyncUploadJobOrchestrator.class);
+
+ private final ResilientOperationExecutor resilientOperationExecutor = getResilientOperationExecutor();
+
+ private final ExecutorService asyncFileUploadExecutor;
+ private final ExecutorService deployFromUrlExecutor;
+ private final ApplicationConfiguration applicationConfiguration;
+ private final AsyncUploadJobService asyncUploadJobService;
+ private final FileService fileService;
+ private final DescriptorParserFacadeFactory descriptorParserFactory;
+ private final DeployFromUrlRemoteClient deployFromUrlRemoteClient;
+
+ @Inject
+ public AsyncUploadJobOrchestrator(ExecutorService asyncFileUploadExecutor, ExecutorService deployFromUrlExecutor,
+ ApplicationConfiguration applicationConfiguration, AsyncUploadJobService asyncUploadJobService,
+ FileService fileService, DescriptorParserFacadeFactory descriptorParserFactory,
+ DeployFromUrlRemoteClient deployFromUrlRemoteClient) {
+ this.asyncFileUploadExecutor = asyncFileUploadExecutor;
+ this.deployFromUrlExecutor = deployFromUrlExecutor;
+ this.applicationConfiguration = applicationConfiguration;
+ this.asyncUploadJobService = asyncUploadJobService;
+ this.fileService = fileService;
+ this.descriptorParserFactory = descriptorParserFactory;
+ this.deployFromUrlRemoteClient = deployFromUrlRemoteClient;
+ }
+
+ public AsyncUploadJobEntry executeUploadFromUrl(String spaceGuid, String namespace, String urlWithoutUserInfo, String decodedUrl,
+ UserCredentials userCredentials) {
+ var entry = createJobEntry(spaceGuid, namespace, urlWithoutUserInfo);
+ LOGGER.info(Messages.CREATING_ASYNC_UPLOAD_JOB, urlWithoutUserInfo, entry.getId());
+ asyncUploadJobService.add(entry);
+ try {
+ deployFromUrlExecutor.submit(() -> deployFromUrl(entry, decodedUrl, userCredentials));
+ } catch (RejectedExecutionException ignored) {
+ throw new RejectedAsyncUploadJobException(entry);
+ }
+ return entry;
+ }
+
+ private AsyncUploadJobEntry createJobEntry(String spaceGuid, String namespace, String url) {
+ return ImmutableAsyncUploadJobEntry.builder()
+ .id(UUID.randomUUID()
+ .toString())
+ .user(SecurityContextUtil.getUsername())
+ .addedAt(LocalDateTime.now())
+ .spaceGuid(spaceGuid)
+ .namespace(namespace)
+ .instanceIndex(applicationConfiguration.getApplicationInstanceIndex())
+ .url(url)
+ .state(AsyncUploadJobEntry.State.INITIAL)
+ .updatedAt(LocalDateTime.now())
+ .bytesRead(0L)
+ .build();
+ }
+
+ private void deployFromUrl(AsyncUploadJobEntry jobEntry, String fileUrl, UserCredentials userCredentials) {
+ LOGGER.info(Messages.STARTING_DOWNLOAD_OF_MTAR, jobEntry.getUrl());
+ var startTime = LocalDateTime.now();
+ Lock lock = new ReentrantLock();
+ AtomicLong counterRef = new AtomicLong();
+ try {
+ var updatedJobEntry = asyncUploadJobService.update(jobEntry, ImmutableAsyncUploadJobEntry.copyOf(jobEntry)
+ .withState(
+ AsyncUploadJobEntry.State.RUNNING)
+ .withUpdatedAt(LocalDateTime.now())
+ .withStartedAt(startTime));
+ startAsyncUploadFromUrlUpload(ImmutableUploadFromUrlContext.builder()
+ .jobEntry(updatedJobEntry)
+ .fileUrl(fileUrl)
+ .userCredentials(userCredentials)
+ .counterRef(counterRef)
+ .startTime(startTime)
+ .build(), lock);
+ updatedJobEntry = asyncUploadJobService.createQuery()
+ .id(updatedJobEntry.getId())
+ .singleResult();
+ monitorAsyncUploadJob(updatedJobEntry, lock, counterRef);
+ } catch (Exception e) {
+ LOGGER.error(MessageFormat.format(Messages.ASYNC_UPLOAD_JOB_FAILED, jobEntry.getId(), e.getMessage()), e);
+ updateFailedAsyncUploadJob(jobEntry, e, lock);
+ }
+ }
+
+ private void startAsyncUploadFromUrlUpload(UploadFromUrlContext uploadFromUrlContext, Lock lock) {
+ asyncFileUploadExecutor.submit(() -> {
+ try {
+ startSyncUploadFromUrlUpload(uploadFromUrlContext, lock);
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ updateFailedAsyncUploadJob(uploadFromUrlContext.getJobEntry(), e, lock);
+ throw new SLException(e, e.getMessage());
+ }
+ });
+ }
+
+ private void startSyncUploadFromUrlUpload(UploadFromUrlContext uploadFromUrlContext, Lock lock) throws Exception {
+ FileEntry fileEntry = resilientOperationExecutor.execute(
+ (CheckedSupplier) () -> doUploadMtarFromUrl(uploadFromUrlContext, lock));
+ LOGGER.trace(Messages.UPLOADED_MTAR_FROM_REMOTE_ENDPOINT_AND_JOB_ID, uploadFromUrlContext.getJobEntry()
+ .getUrl(),
+ uploadFromUrlContext.getJobEntry()
+ .getId(), ChronoUnit.MILLIS.between(uploadFromUrlContext.getStartTime(), LocalDateTime.now()));
+ var descriptor = fileService.processFileContent(uploadFromUrlContext.getJobEntry()
+ .getSpaceGuid(), fileEntry.getId(),
+ this::extractDeploymentDescriptor);
+ LOGGER.debug(Messages.ASYNC_UPLOAD_JOB_FINISHED, uploadFromUrlContext.getJobEntry()
+ .getId());
+ try {
+ lock.lock();
+ var uploadedEntry = asyncUploadJobService.createQuery()
+ .id(uploadFromUrlContext.getJobEntry()
+ .getId())
+ .singleResult();
+ asyncUploadJobService.update(uploadedEntry, ImmutableAsyncUploadJobEntry.copyOf(uploadedEntry)
+ .withFileId(fileEntry.getId())
+ .withMtaId(descriptor.getId())
+ .withUpdatedAt(LocalDateTime.now())
+ .withFinishedAt(LocalDateTime.now())
+ .withBytesRead(uploadFromUrlContext.getCounterRef()
+ .get())
+ .withState(AsyncUploadJobEntry.State.FINISHED));
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private FileEntry doUploadMtarFromUrl(UploadFromUrlContext uploadFromUrlContext, Lock lock) throws Exception {
+ FileFromUrlData fileFromUrlData = deployFromUrlRemoteClient.downloadFileFromUrl(uploadFromUrlContext);
+ String fileName = extractFileName(uploadFromUrlContext.getFileUrl());
+ FileUtils.validateFileHasExtension(fileName);
+ resetCounterOnRetry(uploadFromUrlContext, lock);
+ // Normal stream returned from the http response always returns 0 when InputStream::available() is executed which seems to break
+ // JClods library: https://issues.apache.org/jira/browse/JCLOUDS-1623
+ try (CountingInputStream source = new CountingInputStream(fileFromUrlData.fileInputStream(), uploadFromUrlContext.getCounterRef());
+ BufferedInputStream bufferedContent = new BufferedInputStream(source, INPUT_STREAM_BUFFER_SIZE)) {
+ LOGGER.debug(Messages.UPLOADING_MTAR_STREAM_FROM_REMOTE_ENDPOINT, fileFromUrlData.uri());
+ return fileService.addFile(ImmutableFileEntry.builder()
+ .space(uploadFromUrlContext.getJobEntry()
+ .getSpaceGuid())
+ .namespace(uploadFromUrlContext.getJobEntry()
+ .getNamespace())
+ .name(fileName)
+ .size(BigInteger.valueOf(fileFromUrlData.fileSize()))
+ .build(), bufferedContent);
+ }
+ }
+
+ private void resetCounterOnRetry(UploadFromUrlContext upload, Lock lock) {
+ try {
+ lock.lock();
+ upload.getCounterRef()
+ .set(0);
+ AsyncUploadJobEntry asyncUploadJobEntry = asyncUploadJobService.createQuery()
+ .id(upload.getJobEntry()
+ .getId())
+ .singleResult();
+ asyncUploadJobService.update(asyncUploadJobEntry, ImmutableAsyncUploadJobEntry.copyOf(asyncUploadJobEntry)
+ .withUpdatedAt(LocalDateTime.now())
+ .withBytesRead(0L));
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private String extractFileName(String url) {
+ String path = URI.create(url)
+ .getPath();
+ if (path.indexOf('/') == -1) {
+ return path;
+ }
+ String[] pathFragments = path.split("/");
+ return pathFragments[pathFragments.length - 1];
+ }
+
+ private DeploymentDescriptor extractDeploymentDescriptor(InputStream appArchiveStream) {
+ String descriptorString = ArchiveHandler.getDescriptor(appArchiveStream, applicationConfiguration.getMaxMtaDescriptorSize());
+ DescriptorParserFacade descriptorParserFacade = descriptorParserFactory.getInstance();
+ return descriptorParserFacade.parseDeploymentDescriptor(descriptorString);
+ }
+
+ private void monitorAsyncUploadJob(AsyncUploadJobEntry updatedJobEntry, Lock lock, AtomicLong counterRef) {
+ while (updatedJobEntry.getState() == AsyncUploadJobEntry.State.RUNNING) {
+ try {
+ lock.lock();
+ updatedJobEntry = asyncUploadJobService.createQuery()
+ .id(updatedJobEntry.getId())
+ .singleResult();
+ updatedJobEntry = asyncUploadJobService.update(updatedJobEntry, ImmutableAsyncUploadJobEntry.copyOf(updatedJobEntry)
+ .withBytesRead(counterRef.get())
+ .withUpdatedAt(
+ LocalDateTime.now()));
+ } finally {
+ lock.unlock();
+ }
+ waitBetweenUpdates();
+ }
+ }
+
+ protected void waitBetweenUpdates() {
+ MiscUtil.sleep(WAIT_TIME_BETWEEN_ASYNC_JOB_UPDATES_IN_MILLIS);
+ }
+
+ private void updateFailedAsyncUploadJob(AsyncUploadJobEntry jobEntry, Exception e, Lock lock) {
+ try {
+ lock.lock();
+ var failedEntry = asyncUploadJobService.createQuery()
+ .id(jobEntry.getId())
+ .singleResult();
+ asyncUploadJobService.update(failedEntry, ImmutableAsyncUploadJobEntry.copyOf(failedEntry)
+ .withUpdatedAt(LocalDateTime.now())
+ .withFinishedAt(LocalDateTime.now())
+ .withError(e.getMessage())
+ .withState(AsyncUploadJobEntry.State.ERROR));
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ protected ResilientOperationExecutor getResilientOperationExecutor() {
+ return new ResilientOperationExecutor();
+ }
+
+}
diff --git a/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/UploadFromUrlContext.java b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/UploadFromUrlContext.java
new file mode 100644
index 0000000000..c1ef04215d
--- /dev/null
+++ b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/UploadFromUrlContext.java
@@ -0,0 +1,27 @@
+package org.cloudfoundry.multiapps.controller.web.upload;
+
+import java.time.LocalDateTime;
+import java.util.concurrent.atomic.AtomicLong;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.cloudfoundry.Nullable;
+import org.cloudfoundry.multiapps.controller.api.model.UserCredentials;
+import org.cloudfoundry.multiapps.controller.persistence.model.AsyncUploadJobEntry;
+import org.immutables.value.Value;
+
+@Value.Immutable
+@JsonSerialize(as = ImmutableUploadFromUrlContext.class)
+@JsonDeserialize(as = ImmutableUploadFromUrlContext.class)
+public interface UploadFromUrlContext {
+
+ AsyncUploadJobEntry getJobEntry();
+
+ String getFileUrl();
+
+ @Nullable
+ UserCredentials getUserCredentials();
+
+ AtomicLong getCounterRef();
+
+ LocalDateTime getStartTime();
+}
diff --git a/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/client/DeployFromUrlRemoteClient.java b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/client/DeployFromUrlRemoteClient.java
new file mode 100644
index 0000000000..868ea8611c
--- /dev/null
+++ b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/client/DeployFromUrlRemoteClient.java
@@ -0,0 +1,147 @@
+package org.cloudfoundry.multiapps.controller.web.upload.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URLDecoder;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.nio.charset.StandardCharsets;
+import java.text.MessageFormat;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Base64;
+import jakarta.inject.Inject;
+import jakarta.inject.Named;
+import org.apache.commons.io.IOUtils;
+import org.cloudfoundry.multiapps.common.SLException;
+import org.cloudfoundry.multiapps.controller.api.model.UserCredentials;
+import org.cloudfoundry.multiapps.controller.client.util.CheckedSupplier;
+import org.cloudfoundry.multiapps.controller.client.util.ResilientOperationExecutor;
+import org.cloudfoundry.multiapps.controller.core.util.ApplicationConfiguration;
+import org.cloudfoundry.multiapps.controller.core.util.UriUtil;
+import org.cloudfoundry.multiapps.controller.web.Constants;
+import org.cloudfoundry.multiapps.controller.web.Messages;
+import org.cloudfoundry.multiapps.controller.web.upload.UploadFromUrlContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+
+@Named
+public class DeployFromUrlRemoteClient {
+
+ private static final Duration HTTP_CONNECT_TIMEOUT = Duration.ofMinutes(10);
+ private static final String USERNAME_PASSWORD_URL_FORMAT = "{0}:{1}";
+ private static final int ERROR_RESPONSE_BODY_MAX_LENGTH = 4 * 1024;
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DeployFromUrlRemoteClient.class);
+
+ private final HttpClient httpClient = buildHttpClient();
+ private final ResilientOperationExecutor resilientOperationExecutor = getResilientOperationExecutor();
+
+ private final ApplicationConfiguration applicationConfiguration;
+
+ @Inject
+ public DeployFromUrlRemoteClient(ApplicationConfiguration applicationConfiguration) {
+ this.applicationConfiguration = applicationConfiguration;
+ }
+
+ public FileFromUrlData downloadFileFromUrl(UploadFromUrlContext uploadFromUrlContext) throws Exception {
+ if (!UriUtil.isUrlSecure(uploadFromUrlContext.getFileUrl())) {
+ throw new SLException(Messages.MTAR_ENDPOINT_NOT_SECURE);
+ }
+ UriUtil.validateUrl(uploadFromUrlContext.getFileUrl());
+
+ HttpResponse response = callRemoteEndpointWithRetry(uploadFromUrlContext.getFileUrl(),
+ uploadFromUrlContext.getUserCredentials());
+ long fileSize = response.headers()
+ .firstValueAsLong(Constants.CONTENT_LENGTH)
+ .orElseThrow(() -> new SLException(Messages.FILE_URL_RESPONSE_DID_NOT_RETURN_CONTENT_LENGTH));
+
+ long maxUploadSize = applicationConfiguration.getMaxUploadSize();
+ if (fileSize > maxUploadSize) {
+ throw new SLException(MessageFormat.format(Messages.MAX_UPLOAD_SIZE_EXCEEDED, maxUploadSize));
+ }
+ return new FileFromUrlData(response.body(), response.uri(), fileSize);
+ }
+
+ private HttpResponse callRemoteEndpointWithRetry(String decodedUrl, UserCredentials userCredentials)
+ throws Exception {
+ return resilientOperationExecutor.execute((CheckedSupplier>) () -> {
+ var request = buildFetchFileRequest(decodedUrl, userCredentials);
+ LOGGER.debug(Messages.CALLING_REMOTE_MTAR_ENDPOINT, getMaskedUri(urlDecodeUrl(decodedUrl)));
+ var response = httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream());
+ if (response.statusCode() / 100 != 2) {
+ String error = readErrorBodyFromResponse(response);
+ LOGGER.error(error);
+ if (response.statusCode() == HttpStatus.UNAUTHORIZED.value()) {
+ String errorMessage = MessageFormat.format(Messages.DEPLOY_FROM_URL_WRONG_CREDENTIALS,
+ UriUtil.stripUserInfo(decodedUrl));
+ throw new SLException(errorMessage);
+ }
+ throw new SLException(MessageFormat.format(Messages.ERROR_FROM_REMOTE_MTAR_ENDPOINT, getMaskedUri(urlDecodeUrl(decodedUrl)),
+ response.statusCode(), error));
+ }
+ return response;
+ });
+ }
+
+ private String getMaskedUri(String url) {
+ if (url.contains("@")) {
+ return url.substring(url.lastIndexOf("@"))
+ .replace("@", "...");
+ } else {
+ return url;
+ }
+ }
+
+ private String urlDecodeUrl(String url) {
+ return URLDecoder.decode(url, StandardCharsets.UTF_8);
+ }
+
+ private HttpRequest buildFetchFileRequest(String decodedUrl, UserCredentials userCredentials) {
+ var builder = HttpRequest.newBuilder()
+ .GET()
+ .timeout(Duration.ofMinutes(15));
+ var uri = URI.create(decodedUrl);
+ var userInfo = uri.getUserInfo();
+ if (userCredentials != null) {
+ builder.uri(uri);
+ String userCredentialsUrlFormat = MessageFormat.format(USERNAME_PASSWORD_URL_FORMAT, userCredentials.getUsername(),
+ userCredentials.getPassword());
+ String encodedAuth = Base64.getEncoder()
+ .encodeToString(userCredentialsUrlFormat.getBytes());
+ builder.header(HttpHeaders.AUTHORIZATION, "Basic " + encodedAuth);
+ } else if (userInfo != null) {
+ builder.uri(URI.create(decodedUrl.replace(uri.getRawUserInfo() + "@", "")));
+ String encodedAuth = Base64.getEncoder()
+ .encodeToString(userInfo.getBytes());
+ builder.header(HttpHeaders.AUTHORIZATION, "Basic " + encodedAuth);
+ } else {
+ builder.uri(uri);
+ }
+ return builder.build();
+ }
+
+ private String readErrorBodyFromResponse(HttpResponse response) throws IOException {
+ try (InputStream is = response.body()) {
+ byte[] buffer = new byte[ERROR_RESPONSE_BODY_MAX_LENGTH];
+ int read = IOUtils.read(is, buffer);
+ return new String(Arrays.copyOf(buffer, read));
+ }
+ }
+
+ protected HttpClient buildHttpClient() {
+ return HttpClient.newBuilder()
+ .version(HttpClient.Version.HTTP_2)
+ .connectTimeout(HTTP_CONNECT_TIMEOUT)
+ .followRedirects(HttpClient.Redirect.NORMAL)
+ .build();
+ }
+
+ protected ResilientOperationExecutor getResilientOperationExecutor() {
+ return new ResilientOperationExecutor();
+ }
+}
diff --git a/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/client/FileFromUrlData.java b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/client/FileFromUrlData.java
new file mode 100644
index 0000000000..585f3e9755
--- /dev/null
+++ b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/client/FileFromUrlData.java
@@ -0,0 +1,7 @@
+package org.cloudfoundry.multiapps.controller.web.upload.client;
+
+import java.io.InputStream;
+import java.net.URI;
+
+public record FileFromUrlData(InputStream fileInputStream, URI uri, long fileSize) {
+}
diff --git a/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/exception/RejectedAsyncUploadJobException.java b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/exception/RejectedAsyncUploadJobException.java
new file mode 100644
index 0000000000..3fa4fb09bd
--- /dev/null
+++ b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/exception/RejectedAsyncUploadJobException.java
@@ -0,0 +1,17 @@
+package org.cloudfoundry.multiapps.controller.web.upload.exception;
+
+import java.util.concurrent.RejectedExecutionException;
+import org.cloudfoundry.multiapps.controller.persistence.model.AsyncUploadJobEntry;
+
+public class RejectedAsyncUploadJobException extends RejectedExecutionException {
+
+ private final AsyncUploadJobEntry asyncUploadJobEntry;
+
+ public RejectedAsyncUploadJobException(AsyncUploadJobEntry asyncUploadJobEntry) {
+ this.asyncUploadJobEntry = asyncUploadJobEntry;
+ }
+
+ public AsyncUploadJobEntry getAsyncUploadJobEntry() {
+ return asyncUploadJobEntry;
+ }
+}
diff --git a/multiapps-controller-web/src/test/java/org/cloudfoundry/multiapps/controller/web/api/impl/FilesApiServiceImplTest.java b/multiapps-controller-web/src/test/java/org/cloudfoundry/multiapps/controller/web/api/impl/FilesApiServiceImplTest.java
index 94197f8d70..77cad5d5ee 100644
--- a/multiapps-controller-web/src/test/java/org/cloudfoundry/multiapps/controller/web/api/impl/FilesApiServiceImplTest.java
+++ b/multiapps-controller-web/src/test/java/org/cloudfoundry/multiapps/controller/web/api/impl/FilesApiServiceImplTest.java
@@ -1,19 +1,7 @@
package org.cloudfoundry.multiapps.controller.web.api.impl;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.math.BigInteger;
-import java.net.http.HttpClient;
-import java.net.http.HttpHeaders;
-import java.net.http.HttpRequest;
-import java.net.http.HttpResponse;
-import java.net.http.HttpResponse.BodyHandlers;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.Base64;
@@ -24,39 +12,33 @@
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
-
import org.apache.commons.lang3.RandomStringUtils;
import org.cloudfoundry.multiapps.common.SLException;
import org.cloudfoundry.multiapps.controller.api.model.AsyncUploadResult;
import org.cloudfoundry.multiapps.controller.api.model.FileMetadata;
import org.cloudfoundry.multiapps.controller.api.model.ImmutableFileUrl;
-import org.cloudfoundry.multiapps.controller.api.model.ImmutableUserCredentials;
-import org.cloudfoundry.multiapps.controller.client.util.ResilientOperationExecutor;
import org.cloudfoundry.multiapps.controller.core.auditlogging.FilesApiServiceAuditLog;
-import org.cloudfoundry.multiapps.controller.core.helpers.DescriptorParserFacadeFactory;
-import org.cloudfoundry.multiapps.controller.core.util.ApplicationConfiguration;
import org.cloudfoundry.multiapps.controller.core.util.UserInfo;
import org.cloudfoundry.multiapps.controller.persistence.Constants;
import org.cloudfoundry.multiapps.controller.persistence.model.AsyncUploadJobEntry;
+import org.cloudfoundry.multiapps.controller.persistence.model.AsyncUploadJobEntry.State;
import org.cloudfoundry.multiapps.controller.persistence.model.FileEntry;
+import org.cloudfoundry.multiapps.controller.persistence.model.ImmutableAsyncUploadJobEntry;
import org.cloudfoundry.multiapps.controller.persistence.model.ImmutableFileEntry;
import org.cloudfoundry.multiapps.controller.persistence.query.AsyncUploadJobsQuery;
import org.cloudfoundry.multiapps.controller.persistence.services.AsyncUploadJobService;
import org.cloudfoundry.multiapps.controller.persistence.services.FileService;
import org.cloudfoundry.multiapps.controller.persistence.services.FileStorageException;
-import org.junit.jupiter.api.BeforeAll;
+import org.cloudfoundry.multiapps.controller.web.upload.AsyncUploadJobOrchestrator;
+import org.cloudfoundry.multiapps.controller.web.upload.exception.RejectedAsyncUploadJobException;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
-import org.mockito.Answers;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
-import org.mockito.Spy;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.security.core.context.SecurityContextHolder;
@@ -64,78 +46,75 @@
import org.springframework.security.oauth2.core.user.DefaultOAuth2User;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.multipart.MultipartHttpServletRequest;
-
-import jakarta.persistence.NoResultException;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_SELF;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
class FilesApiServiceImplTest {
- private static final String MTA_ID = "anatz";
- private static final String FILE_URL = Base64.getUrlEncoder()
- .encodeToString("https://host.domain/test.mtar?query=true".getBytes(StandardCharsets.UTF_8));
private static final String SPACE_GUID = "896e6be9-8217-4a1c-b938-09b30966157a";
private static final String NAMESPACE = "custom-namespace";
private static final String DIGEST_CHARACTER_TABLE = "123456789ABCDEF";
- private static final String DECODED_URL_WITH_CREDENTIALS_IN_THE_UTL = "https://abc:abv@google.com";
- private static final String DECODED_URL_WITH_CREDENTIALS_AND_AT_IN_THE_UTL = "https://abc:ab%40v@google.com";
- private static final String ENCODED_URL_WITH_CREDENTIALS_AND_AT_IN_THE_UTL_AND_IN_PARAM = "https://abc:ab%40v@google.com/%40user";
- private static final String DECODED_URL_WITH_CREDENTIALS_AND_AT_IN_THE_UTL_AND_IN_PARAM = "https://abc:ab@v@google.com/@user";
+
+ private static final String FILE_URL = Base64.getUrlEncoder()
+ .encodeToString(
+ "https://host.domain/test.mtar?query=true".getBytes(StandardCharsets.UTF_8));
+ private static final String DECODED_URL_WITH_CREDENTIALS_IN_THE_URL = "https://abc:abv@google.com";
+
@Mock
private FileService fileService;
@Mock
private MultipartHttpServletRequest request;
@Mock
private MultipartFile file;
- @Mock
- private HttpClient httpClient;
@InjectMocks
- private final FilesApiServiceImpl testedClass = new FilesApiServiceImpl() {
- @Override
- protected HttpClient buildHttpClient(String url) {
- return httpClient;
- }
-
- @Override
- protected ResilientOperationExecutor getResilientOperationExecutor() {
- return new ResilientOperationExecutor().withRetryCount(0)
- .withWaitTimeBetweenRetriesInMillis(0);
- }
- };
+ private final FilesApiServiceImpl testedClass = new FilesApiServiceImpl();
@Mock
private FilesApiServiceAuditLog filesApiServiceAuditLog;
- @Mock
- private HttpResponse fileUrlResponse;
- @Mock(name = "deployFromUrlExecutor")
- private ExecutorService asyncFileUploadExecutor;
@Mock(name = "fileStorageThreadPool")
private ExecutorService fileStorageThreadPool;
@Mock
- private ApplicationConfiguration configuration;
- @Spy
- private DescriptorParserFacadeFactory descriptorParserFactory = new DescriptorParserFacadeFactory(configuration);
+ private AsyncUploadJobOrchestrator asyncUploadJobOrchestrator;
@Mock
private AsyncUploadJobService uploadJobService;
- @BeforeAll
- public static void setUser() {
- var user = new UserInfo("user1", "user1", null);
- var token = new DefaultOAuth2User(Collections.emptyList(), Map.of("user_info", user), "user_info");
- SecurityContextHolder.getContext()
- .setAuthentication(new OAuth2AuthenticationToken(token, Collections.emptyList(), "id"));
- }
-
@BeforeEach
public void initialize() throws Exception {
MockitoAnnotations.openMocks(this)
.close();
+ SecurityContextHolder.clearContext();
+ var user = new UserInfo("user1", "user1", null);
+ var token = new DefaultOAuth2User(Collections.emptyList(), Map.of("user_info", user), "user_info");
+ SecurityContextHolder.getContext()
+ .setAuthentication(new OAuth2AuthenticationToken(token, Collections.emptyList(), "id"));
Mockito.when(request.getRequestURI())
.thenReturn("");
prepareFileStorageThreadPool();
}
+ @AfterEach
+ public void cleanup() {
+ SecurityContextHolder.clearContext();
+ }
+
+ private void prepareFileStorageThreadPool() {
+ when(fileStorageThreadPool.submit(any(Callable.class))).thenAnswer(invocation -> {
+ Callable> callable = invocation.getArgument(0);
+ FutureTask> futureTask = new FutureTask<>(callable);
+ futureTask.run();
+ return futureTask;
+ });
+ }
+
@Test
void testGetMtaFiles() throws Exception {
FileEntry entryOne = createFileEntry("test.mtar");
- FileEntry entryTwo = createFileEntry("extension.mtaet");
+ FileEntry entryTwo = createFileEntry("extension.mtaext");
Mockito.when(fileService.listFiles(Mockito.eq(SPACE_GUID), Mockito.eq(NAMESPACE)))
.thenReturn(List.of(entryOne, entryTwo));
ResponseEntity> response = testedClass.getFiles(SPACE_GUID, NAMESPACE);
@@ -144,7 +123,28 @@ void testGetMtaFiles() throws Exception {
assertEquals(2, files.size());
assertMetadataMatches(entryOne, files.get(0));
assertMetadataMatches(entryTwo, files.get(1));
+ }
+ private FileEntry createFileEntry(String name) {
+ return ImmutableFileEntry.builder()
+ .id(UUID.randomUUID()
+ .toString())
+ .digest(RandomStringUtils.random(32, DIGEST_CHARACTER_TABLE))
+ .digestAlgorithm(Constants.DIGEST_ALGORITHM)
+ .name(name)
+ .namespace(NAMESPACE)
+ .size(BigInteger.valueOf(new Random().nextInt(1024 * 1024 * 10)))
+ .space(SPACE_GUID)
+ .build();
+ }
+
+ private void assertMetadataMatches(FileEntry expected, FileMetadata actual) {
+ assertEquals(expected.getId(), actual.getId());
+ assertEquals(expected.getName(), actual.getName());
+ assertEquals(expected.getSpace(), actual.getSpace());
+ assertEquals(expected.getSize(), actual.getSize());
+ assertEquals(expected.getDigest(), actual.getDigest());
+ assertEquals(expected.getDigestAlgorithm(), actual.getDigestAlgorithm());
}
@Test
@@ -175,8 +175,7 @@ void testUploadMtaFile() throws Exception {
.namespace(NAMESPACE)
.name(fileName)
.size(BigInteger.valueOf(fileSize))
- .build()),
- Mockito.any(InputStream.class)))
+ .build()), any(InputStream.class)))
.thenReturn(fileEntry);
ResponseEntity response = testedClass.uploadFile(request, SPACE_GUID, NAMESPACE);
@@ -189,402 +188,315 @@ void testUploadMtaFile() throws Exception {
.namespace(NAMESPACE)
.name(fileName)
.size(BigInteger.valueOf(fileSize))
- .build()),
- Mockito.any(InputStream.class));
+ .build()), any(InputStream.class));
FileMetadata fileMetadata = response.getBody();
assertMetadataMatches(fileEntry, fileMetadata);
}
@Test
- void testUploadFileFromUrl() throws Exception {
- String fileName = "test.mtar";
- FileEntry fileEntry = createFileEntry(fileName);
- HttpHeaders headers = HttpHeaders.of(Map.of("Content-Length", List.of("20")), (a, b) -> true);
- InputStream mockStream = Mockito.mock(InputStream.class);
- AsyncUploadJobsQuery query = Mockito.mock(AsyncUploadJobsQuery.class, Answers.RETURNS_SELF);
- var jobEntry = mockUploadJobEntry(fileEntry.getId(), AsyncUploadJobEntry.State.FINISHED, null);
-
- Mockito.when(fileUrlResponse.headers())
- .thenReturn(headers);
- Mockito.when(fileUrlResponse.statusCode())
- .thenReturn(200);
- Mockito.when(fileUrlResponse.body())
- .thenReturn(mockStream);
-
- Mockito.when(httpClient.send(Mockito.any(), Mockito.eq(BodyHandlers.ofInputStream())))
- .thenReturn(fileUrlResponse);
- AsyncUploadJobsQuery queryReturningNoJobs = mock(AsyncUploadJobsQuery.class);
- when(query.withStateAnyOf(AsyncUploadJobEntry.State.INITIAL, AsyncUploadJobEntry.State.RUNNING)).thenReturn(queryReturningNoJobs);
- when(query.list()).thenReturn(List.of(jobEntry));
- Mockito.when(uploadJobService.createQuery())
- .thenReturn(query);
- Mockito.when(uploadJobService.update(any(), any()))
- .thenReturn(jobEntry);
-
- Mockito.when(fileService.addFile(Mockito.eq(ImmutableFileEntry.builder()
- .name(SPACE_GUID)
- .namespace(NAMESPACE)
- .name(fileName)
- .size(BigInteger.valueOf(20L))
- .build()),
- Mockito.any(InputStream.class)))
- .thenReturn(fileEntry);
- Mockito.when(fileService.getFile(Mockito.eq(SPACE_GUID), Mockito.eq(fileEntry.getId())))
- .thenReturn(fileEntry);
- Mockito.when(configuration.getMaxUploadSize())
- .thenReturn(ApplicationConfiguration.DEFAULT_MAX_UPLOAD_SIZE);
- Future> future = Mockito.mock(Future.class);
- when(future.isDone()).thenReturn(true);
- prepareAsyncExecutor(future);
-
- ResponseEntity startUploadResponse = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE,
- ImmutableFileUrl.of(FILE_URL, null));
-
- assertEquals(startUploadResponse.getStatusCode(), HttpStatus.ACCEPTED);
-
- String jobUrl = startUploadResponse.getHeaders()
- .getFirst("Location");
- String jobGuid = jobUrl.substring(jobUrl.lastIndexOf('/'));
-
- ResponseEntity uploadJobResponse = testedClass.getUploadFromUrlJob(SPACE_GUID, NAMESPACE, jobGuid);
-
- assertEquals(uploadJobResponse.getStatusCode(), HttpStatus.CREATED);
-
- Mockito.verify(fileService)
- .addFile(Mockito.eq(ImmutableFileEntry.builder()
- .name(fileName)
- .namespace(NAMESPACE)
- .space(SPACE_GUID)
- .size(BigInteger.valueOf(20L))
- .build()),
- Mockito.any(InputStream.class));
-
- var responseBody = uploadJobResponse.getBody();
- var fileMetadata = responseBody.getFile();
- assertMetadataMatches(fileEntry, fileMetadata);
-
- var mtaId = responseBody.getMtaId();
- var status = responseBody.getStatus();
- assertEquals(mtaId, MTA_ID);
- assertEquals(status, AsyncUploadResult.JobStatus.FINISHED);
- }
-
- private void prepareFileStorageThreadPool() {
- when(fileStorageThreadPool.submit(any(Callable.class))).thenAnswer(invocation -> {
- Callable> callable = invocation.getArgument(0);
- FutureTask> futureTask = new FutureTask<>(callable);
- futureTask.run();
- return futureTask;
- });
- }
-
- private void prepareAsyncExecutor(Future> future) {
- Mockito.doAnswer(invocationOnMock -> {
- Runnable r = invocationOnMock.getArgument(0);
- r.run();
- return future;
- })
- .when(asyncFileUploadExecutor)
- .submit((Runnable) Mockito.any());
- }
-
- @Test
- void testUploadFileFromUrlWithInvalidJobId() {
- AsyncUploadJobsQuery query = Mockito.mock(AsyncUploadJobsQuery.class, Answers.RETURNS_SELF);
- Mockito.doThrow(NoResultException.class)
- .when(query)
- .singleResult();
-
- Mockito.when(uploadJobService.createQuery())
- .thenReturn(query);
-
- ResponseEntity response = testedClass.getUploadFromUrlJob(SPACE_GUID, NAMESPACE, "invalid");
-
- assertEquals(response.getStatusCode(), HttpStatus.NOT_FOUND);
- }
-
- @Test
- void testFileUrlDoesntReturnContentLength() throws Exception {
- HttpHeaders headers = HttpHeaders.of(Collections.emptyMap(), (a, b) -> true);
- AsyncUploadJobsQuery query = Mockito.mock(AsyncUploadJobsQuery.class, Answers.RETURNS_SELF);
- String error = "no content length";
- var jobEntry = mockUploadJobEntry(null, AsyncUploadJobEntry.State.ERROR, error);
-
- Mockito.when(fileUrlResponse.headers())
- .thenReturn(headers);
- Mockito.when(fileUrlResponse.statusCode())
- .thenReturn(200);
-
- Mockito.when(httpClient.send(Mockito.any(), Mockito.eq(BodyHandlers.ofInputStream())))
- .thenReturn(fileUrlResponse);
-
- AsyncUploadJobsQuery queryReturningNoJobs = mock(AsyncUploadJobsQuery.class);
- when(query.withStateAnyOf(AsyncUploadJobEntry.State.INITIAL, AsyncUploadJobEntry.State.RUNNING)).thenReturn(queryReturningNoJobs);
- when(query.list()).thenReturn(List.of(jobEntry));
- Mockito.when(uploadJobService.createQuery())
- .thenReturn(query);
- Mockito.when(uploadJobService.update(any(), any()))
- .thenReturn(jobEntry);
- Future> future = Mockito.mock(Future.class);
- when(future.isDone()).thenReturn(true);
- prepareAsyncExecutor(future);
-
- ResponseEntity startUploadResponse = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE,
- ImmutableFileUrl.of(FILE_URL, null));
-
- assertEquals(startUploadResponse.getStatusCode(), HttpStatus.ACCEPTED);
-
- String jobUrl = startUploadResponse.getHeaders()
- .getFirst("Location");
- String jobGuid = jobUrl.substring(jobUrl.lastIndexOf('/'));
-
- ResponseEntity uploadJobResponse = testedClass.getUploadFromUrlJob(SPACE_GUID, NAMESPACE, jobGuid);
-
- assertEquals(uploadJobResponse.getStatusCode(), HttpStatus.OK);
-
- var responseBody = uploadJobResponse.getBody();
- assertEquals(responseBody.getStatus(), AsyncUploadResult.JobStatus.ERROR);
- assertEquals(responseBody.getError(), error);
+ void testStartDeployFromUrl() {
+ AsyncUploadJobsQuery asyncUploadJobsQuery = mock(AsyncUploadJobsQuery.class, RETURNS_SELF);
+ when(uploadJobService.createQuery()).thenReturn(asyncUploadJobsQuery);
+ when(asyncUploadJobsQuery.list()).thenReturn(Collections.emptyList());
+
+ String expectedJobId = UUID.randomUUID()
+ .toString();
+ when(asyncUploadJobOrchestrator.executeUploadFromUrl(any(), any(), any(), any(), any())).thenReturn(
+ ImmutableAsyncUploadJobEntry.builder()
+ .id(expectedJobId)
+ .url(DECODED_URL_WITH_CREDENTIALS_IN_THE_URL)
+ .startedAt(LocalDateTime.now())
+ .state(State.INITIAL)
+ .user("user1")
+ .spaceGuid(SPACE_GUID)
+ .instanceIndex(0)
+ .build());
+ ResponseEntity response = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE, ImmutableFileUrl.builder()
+ .fileUrl(FILE_URL)
+ .build());
+ assertEquals(HttpStatus.ACCEPTED, response.getStatusCode());
+ assertEquals("spaces/" + SPACE_GUID + "/files/jobs/" + expectedJobId, response.getHeaders()
+ .getLocation()
+ .toString());
+ Mockito.verify(asyncUploadJobOrchestrator)
+ .executeUploadFromUrl(eq(SPACE_GUID), eq(NAMESPACE), any(String.class), any(String.class), eq(null));
}
@Test
- void testUploadFromUrlWhenThereIsValidExistingJob() {
- AsyncUploadJobsQuery query = Mockito.mock(AsyncUploadJobsQuery.class, Answers.RETURNS_SELF);
- when(uploadJobService.createQuery()).thenReturn(query);
- var jobEntry = mockUploadJobEntry(null, AsyncUploadJobEntry.State.ERROR, null);
- when(query.list()).thenReturn(List.of(jobEntry));
- Future> runningTask = mock(Future.class);
- prepareAsyncExecutor(runningTask);
- when(uploadJobService.update(any(), any())).thenReturn(jobEntry);
- ResponseEntity firstUpload = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE, ImmutableFileUrl.of(FILE_URL, null));
- String locationHeader = firstUpload.getHeaders()
- .getFirst(org.springframework.http.HttpHeaders.LOCATION);
- String createdJobId = locationHeader.substring(locationHeader.lastIndexOf("/") + 1);
- when(jobEntry.getId()).thenReturn(createdJobId);
- ResponseEntity secondUpload = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE, ImmutableFileUrl.of(FILE_URL, null));
- assertEquals(HttpStatus.SEE_OTHER, secondUpload.getStatusCode());
- }
-
- @Test
- void testFileUrlReturnsContentLengthAboveMaxUploadSize() throws Exception {
- long invalidFileSize = ApplicationConfiguration.DEFAULT_MAX_UPLOAD_SIZE + 1024;
- String fileSize = Long.toString(invalidFileSize);
- AsyncUploadJobsQuery query = Mockito.mock(AsyncUploadJobsQuery.class, Answers.RETURNS_SELF);
- String error = "content length exceeds max permitted size of 4GB";
- HttpHeaders headers = HttpHeaders.of(Map.of("Content-Length", List.of(fileSize)), (a, b) -> true);
- var jobEntry = mockUploadJobEntry(null, AsyncUploadJobEntry.State.ERROR, error);
-
- Mockito.when(fileUrlResponse.headers())
- .thenReturn(headers);
- Mockito.when(fileUrlResponse.statusCode())
- .thenReturn(200);
-
- Mockito.when(httpClient.send(Mockito.any(), Mockito.eq(BodyHandlers.ofInputStream())))
- .thenReturn(fileUrlResponse);
-
- AsyncUploadJobsQuery queryReturningNoJobs = mock(AsyncUploadJobsQuery.class);
- when(query.withStateAnyOf(AsyncUploadJobEntry.State.INITIAL, AsyncUploadJobEntry.State.RUNNING)).thenReturn(queryReturningNoJobs);
- when(query.list()).thenReturn(List.of(jobEntry));
- Mockito.when(uploadJobService.createQuery())
- .thenReturn(query);
- Mockito.when(uploadJobService.update(any(), any()))
- .thenReturn(jobEntry);
- Future> future = Mockito.mock(Future.class);
- when(future.isDone()).thenReturn(true);
- prepareAsyncExecutor(future);
-
- ResponseEntity startUploadResponse = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE,
- ImmutableFileUrl.of(FILE_URL, null));
-
- assertEquals(startUploadResponse.getStatusCode(), HttpStatus.ACCEPTED);
-
- String jobUrl = startUploadResponse.getHeaders()
- .getFirst("Location");
- String jobGuid = jobUrl.substring(jobUrl.lastIndexOf('/'));
-
- ResponseEntity uploadJobResponse = testedClass.getUploadFromUrlJob(SPACE_GUID, NAMESPACE, jobGuid);
-
- assertEquals(uploadJobResponse.getStatusCode(), HttpStatus.OK);
-
- var responseBody = uploadJobResponse.getBody();
- assertEquals(responseBody.getStatus(), AsyncUploadResult.JobStatus.ERROR);
- assertEquals(responseBody.getError(), error);
- }
-
- @ParameterizedTest
- @ValueSource(strings = { "https://host.domain/path/file?query=true", "http://host.domain/path/file.mtar?query=true" })
- void testUploadFileWithInvalidUrl(String url) throws Exception {
- AsyncUploadJobsQuery query = Mockito.mock(AsyncUploadJobsQuery.class, Answers.RETURNS_SELF);
- HttpHeaders headers = HttpHeaders.of(Map.of("Content-Length", List.of("20")), (a, b) -> true);
- var jobEntry = mockUploadJobEntry(null, AsyncUploadJobEntry.State.ERROR, "error");
-
- Mockito.when(fileUrlResponse.statusCode())
- .thenReturn(200);
- Mockito.when(fileUrlResponse.headers())
- .thenReturn(headers);
-
- Mockito.when(httpClient.send(Mockito.any(), Mockito.eq(BodyHandlers.ofInputStream())))
- .thenReturn(fileUrlResponse);
-
- AsyncUploadJobsQuery queryReturningNoJobs = mock(AsyncUploadJobsQuery.class);
- when(query.withStateAnyOf(AsyncUploadJobEntry.State.INITIAL, AsyncUploadJobEntry.State.RUNNING)).thenReturn(queryReturningNoJobs);
- when(query.list()).thenReturn(List.of(jobEntry));
- Mockito.when(uploadJobService.createQuery())
- .thenReturn(query);
- Mockito.when(uploadJobService.update(any(), any()))
- .thenReturn(jobEntry);
- Future> future = Mockito.mock(Future.class);
- when(future.isDone()).thenReturn(true);
- prepareAsyncExecutor(future);
+ void testStartDeployFromUrlWhenExecutorRejectsJob() {
+ AsyncUploadJobsQuery asyncUploadJobsQuery = mock(AsyncUploadJobsQuery.class, RETURNS_SELF);
+ when(uploadJobService.createQuery()).thenReturn(asyncUploadJobsQuery);
+ when(asyncUploadJobsQuery.list()).thenReturn(Collections.emptyList());
- String invalidFileUrl = Base64.getUrlEncoder()
- .encodeToString(url.getBytes(StandardCharsets.UTF_8));
+ AsyncUploadJobEntry rejectedEntry = mock(AsyncUploadJobEntry.class);
+ when(rejectedEntry.getId()).thenReturn("rejected-job-id");
- ResponseEntity startUploadResponse = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE,
- ImmutableFileUrl.of(invalidFileUrl, null));
+ when(asyncUploadJobOrchestrator.executeUploadFromUrl(any(), any(), any(), any(), any())).thenThrow(
+ new RejectedAsyncUploadJobException(rejectedEntry));
- assertEquals(startUploadResponse.getStatusCode(), HttpStatus.ACCEPTED);
+ ResponseEntity response = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE, ImmutableFileUrl.builder()
+ .fileUrl(FILE_URL)
+ .build());
- String jobUrl = startUploadResponse.getHeaders()
- .getFirst("Location");
- String jobGuid = jobUrl.substring(jobUrl.lastIndexOf('/'));
+ assertEquals(HttpStatus.TOO_MANY_REQUESTS, response.getStatusCode());
+ assertEquals("30", response.getHeaders()
+ .getFirst("Retry-After"));
- ResponseEntity uploadJobResponse = testedClass.getUploadFromUrlJob(SPACE_GUID, NAMESPACE, jobGuid);
+ Mockito.verify(asyncUploadJobOrchestrator)
+ .executeUploadFromUrl(eq(SPACE_GUID), eq(NAMESPACE), any(String.class), any(String.class), eq(null));
- assertEquals(uploadJobResponse.getStatusCode(), HttpStatus.OK);
-
- var responseBody = uploadJobResponse.getBody();
- assertEquals(AsyncUploadResult.JobStatus.ERROR, responseBody.getStatus());
+ Mockito.verify(asyncUploadJobsQuery)
+ .delete();
}
@Test
- void testGetUploadFromUrlOnDifferentInstance() {
- AsyncUploadJobsQuery query = Mockito.mock(AsyncUploadJobsQuery.class, Answers.RETURNS_SELF);
- when(query.withStateAnyOf(AsyncUploadJobEntry.State.INITIAL, AsyncUploadJobEntry.State.RUNNING)).thenReturn(query);
- var jobEntry = mockUploadJobEntry(null, AsyncUploadJobEntry.State.RUNNING, null);
- when(query.list()).thenReturn(List.of(jobEntry));
- Mockito.when(uploadJobService.createQuery())
- .thenReturn(query);
- when(configuration.getApplicationInstanceIndex()).thenReturn(3);
- ResponseEntity uploadJobResponse = testedClass.getUploadFromUrlJob(SPACE_GUID, NAMESPACE, jobEntry.getId());
- assertEquals(AsyncUploadResult.JobStatus.ERROR, uploadJobResponse.getBody()
- .getStatus());
+ void testStartDeployFromUrlWhenStuckJobExists() {
+ AsyncUploadJobsQuery asyncUploadJobsQuery = mock(AsyncUploadJobsQuery.class, RETURNS_SELF);
+ when(uploadJobService.createQuery()).thenReturn(asyncUploadJobsQuery);
+
+ String existingJobId = UUID.randomUUID()
+ .toString();
+ AsyncUploadJobEntry stuckJob = ImmutableAsyncUploadJobEntry.builder()
+ .id(existingJobId)
+ .url(DECODED_URL_WITH_CREDENTIALS_IN_THE_URL)
+ .startedAt(LocalDateTime.now()
+ .minusMinutes(5))
+ .updatedAt(LocalDateTime.now()
+ .minusMinutes(2))
+ .state(State.RUNNING)
+ .user("user1")
+ .spaceGuid(SPACE_GUID)
+ .instanceIndex(0)
+ .build();
+
+ when(asyncUploadJobsQuery.list()).thenReturn(List.of(stuckJob))
+ .thenReturn(Collections.emptyList());
+
+ String newJobId = UUID.randomUUID()
+ .toString();
+ when(asyncUploadJobOrchestrator.executeUploadFromUrl(any(), any(), any(), any(), any())).thenReturn(
+ ImmutableAsyncUploadJobEntry.builder()
+ .id(newJobId)
+ .url(DECODED_URL_WITH_CREDENTIALS_IN_THE_URL)
+ .startedAt(LocalDateTime.now())
+ .state(State.INITIAL)
+ .user("user1")
+ .spaceGuid(SPACE_GUID)
+ .instanceIndex(0)
+ .build());
+
+ ResponseEntity response = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE, ImmutableFileUrl.builder()
+ .fileUrl(FILE_URL)
+ .build());
+
+ assertEquals(HttpStatus.ACCEPTED, response.getStatusCode());
+ assertEquals("spaces/" + SPACE_GUID + "/files/jobs/" + newJobId, response.getHeaders()
+ .getLocation()
+ .toString());
+
+ Mockito.verify(asyncUploadJobsQuery)
+ .id(existingJobId);
+ Mockito.verify(asyncUploadJobsQuery)
+ .delete();
+ Mockito.verify(asyncUploadJobOrchestrator)
+ .executeUploadFromUrl(eq(SPACE_GUID), eq(NAMESPACE), any(String.class), any(String.class), eq(null));
}
@Test
- void testCallRemoteEndpointWithRetry_withSuccessfulCall() throws Exception {
- HttpResponse