From 79895267b170663099609d4c0f4fb2fad47d7d5e Mon Sep 17 00:00:00 2001 From: IvanBorislavovDimitrov Date: Mon, 29 Sep 2025 14:38:43 +0300 Subject: [PATCH 1/2] Remove ecplipse link shared cache LMCROSSITXSADEPLOY-3285 --- .../src/main/resources/META-INF/persistence.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/multiapps-controller-persistence/src/main/resources/META-INF/persistence.xml b/multiapps-controller-persistence/src/main/resources/META-INF/persistence.xml index 363dce3249..684a213264 100644 --- a/multiapps-controller-persistence/src/main/resources/META-INF/persistence.xml +++ b/multiapps-controller-persistence/src/main/resources/META-INF/persistence.xml @@ -19,6 +19,7 @@ + From 0ae139b8821e332f78c374436f9dd9681b258a87 Mon Sep 17 00:00:00 2001 From: IvanBorislavovDimitrov Date: Mon, 29 Sep 2025 14:38:23 +0300 Subject: [PATCH 2/2] Refactor deploy from URL API so it does not depend on app instance LMCROSSITXSADEPLOY-3285 --- .../multiapps/controller/core/Messages.java | 1 + .../core/util/ApplicationConfiguration.java | 19 +- .../persistence/dto/AsyncUploadJobDto.java | 60 +- .../model/AsyncUploadJobEntry.java | 7 +- .../model/PersistenceMetadata.java | 2 + .../services/AsyncUploadJobService.java | 7 +- .../db-changelog-2.35.0-persistence.xml | 19 + .../persistence/db/changelog/db-changelog.xml | 3 + .../FileUploadThreadPoolConfiguration.java | 17 +- .../process/stream/CountingInputStream.java | 23 + .../multiapps/controller/web/Messages.java | 9 +- .../web/api/impl/FilesApiServiceImpl.java | 342 ++------ .../upload/AsyncUploadJobOrchestrator.java | 275 +++++++ .../web/upload/UploadFromUrlContext.java | 27 + .../client/DeployFromUrlRemoteClient.java | 147 ++++ .../web/upload/client/FileFromUrlData.java | 7 + .../RejectedAsyncUploadJobException.java | 17 + .../web/api/impl/FilesApiServiceImplTest.java | 758 ++++++++---------- .../impl/OperationsApiServiceImplTest.java | 2 - .../AsyncUploadJobOrchestratorTest.java | 457 +++++++++++ .../client/DeployFromUrlRemoteClientTest.java | 272 +++++++ 21 files changed, 1723 insertions(+), 748 deletions(-) create mode 100644 multiapps-controller-persistence/src/main/resources/org/cloudfoundry/multiapps/controller/persistence/db/changelog/db-changelog-2.35.0-persistence.xml create mode 100644 multiapps-controller-process/src/main/java/org/cloudfoundry/multiapps/controller/process/stream/CountingInputStream.java create mode 100644 multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/AsyncUploadJobOrchestrator.java create mode 100644 multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/UploadFromUrlContext.java create mode 100644 multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/client/DeployFromUrlRemoteClient.java create mode 100644 multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/client/FileFromUrlData.java create mode 100644 multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/exception/RejectedAsyncUploadJobException.java create mode 100644 multiapps-controller-web/src/test/java/org/cloudfoundry/multiapps/controller/web/upload/AsyncUploadJobOrchestratorTest.java create mode 100644 multiapps-controller-web/src/test/java/org/cloudfoundry/multiapps/controller/web/upload/client/DeployFromUrlRemoteClientTest.java diff --git a/multiapps-controller-core/src/main/java/org/cloudfoundry/multiapps/controller/core/Messages.java b/multiapps-controller-core/src/main/java/org/cloudfoundry/multiapps/controller/core/Messages.java index 5d99deb953..9d684eb48d 100644 --- a/multiapps-controller-core/src/main/java/org/cloudfoundry/multiapps/controller/core/Messages.java +++ b/multiapps-controller-core/src/main/java/org/cloudfoundry/multiapps/controller/core/Messages.java @@ -193,6 +193,7 @@ public final class Messages { public static final String ABORTED_OPERATIONS_TTL_IN_SECONDS = "Aborted operations TTL in seconds: {0}"; public static final String SPRING_SCHEDULER_TASK_EXECUTOR_THREADS = "Spring scheduler task executor threads: {0}"; public static final String FILES_ASYNC_UPLOAD_EXECUTOR_MAX_THREADS = "Files async executor max threads: {0}"; + public static final String DEPLOY_FROM_URL_EXECUTOR_MAX_THREADS = "Deploy from URL executor max threads: {0}"; public static final String ON_START_FILES_CLEANER_WITHOUT_CONTENT_ENABLED_0 = "On start files cleaner without content enabled: {0}"; public static final String THREADS_FOR_FILE_UPLOAD_TO_CONTROLLER_0 = "Threads for file upload to controller: {0}"; public static final String THREADS_FOR_FILE_STORAGE_UPLOAD_0 = "Threads for file storage upload: {0}"; diff --git a/multiapps-controller-core/src/main/java/org/cloudfoundry/multiapps/controller/core/util/ApplicationConfiguration.java b/multiapps-controller-core/src/main/java/org/cloudfoundry/multiapps/controller/core/util/ApplicationConfiguration.java index 7eba04496b..4d856b479f 100644 --- a/multiapps-controller-core/src/main/java/org/cloudfoundry/multiapps/controller/core/util/ApplicationConfiguration.java +++ b/multiapps-controller-core/src/main/java/org/cloudfoundry/multiapps/controller/core/util/ApplicationConfiguration.java @@ -9,7 +9,6 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; - import jakarta.inject.Inject; import jakarta.inject.Named; import org.apache.commons.collections4.CollectionUtils; @@ -27,7 +26,6 @@ import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Lazy; import org.springframework.scheduling.support.CronExpression; - import static java.text.MessageFormat.format; @Named @@ -96,6 +94,7 @@ public class ApplicationConfiguration { static final String CFG_ABORTED_OPERATIONS_TTL_IN_MINUTES = "ABORTED_OPERATIONS_TTL_IN_SECONDS"; static final String CFG_SPRING_SCHEDULER_TASK_EXECUTOR_THREADS = "SPRING_SCHEDULER_TASK_EXECUTOR_THREADS"; static final String CFG_FILES_ASYNC_UPLOAD_EXECUTOR_MAX_THREADS = "FILES_ASYNC_UPLOAD_EXECUTOR_THREADS"; + static final String CFG_DEPLOY_FROM_URL_EXECUTOR_MAX_THREADS = "DEPLOY_FROM_URL_EXECUTOR_MAX_THREADS"; static final String CFG_ENABLE_ON_START_FILES_WITHOUT_CONTENT_CLEANER = "ENABLE_ON_START_FILES_WITHOUT_CONTENT_CLEANER"; static final String CFG_THREADS_FOR_FILE_UPLOAD_TO_CONTROLLER = "THREADS_FOR_FILE_UPLOAD_TO_CONTROLLER"; static final String CFG_THREADS_FOR_FILE_STORAGE_UPLOAD = "THREADS_FOR_FILE_STORAGE_UPLOAD"; @@ -153,6 +152,7 @@ public class ApplicationConfiguration { public static final String DEFAULT_GLOBAL_AUDITOR_ORIGIN = "uaa"; public static final int DEFAULT_SPRING_SCHEDULER_TASK_EXECUTOR_THREADS = 3; public static final int DEFAULT_FILES_ASYNC_UPLOAD_EXECUTOR_MAX_THREADS = 6; + public static final int DEFAULT_DEPLOY_FROM_URL_EXECUTOR_MAX_THREADS = 26; public static final boolean DEFAULT_ENABLE_ON_START_FILES_WITHOUT_CONTENT_CLEANER = false; public static final int DEFAULT_THREADS_FOR_FILE_UPLOAD_TO_CONTROLLER = 6; public static final int DEFAULT_THREADS_FOR_FILE_STORAGE_UPLOAD = 7; @@ -211,6 +211,7 @@ public class ApplicationConfiguration { private Integer abortedOperationsTtlInSeconds; private Integer springSchedulerTaskExecutorThreads; private Integer filesAsyncUploadExecutorThreads; + private Integer deployFromUrlExecutorMaxThreads; private Boolean isOnStartFilesWithoutContentCleanerEnabledThroughEnvironment; private Integer threadsForFileUploadToController; private Integer threadsForFileStorageUpload; @@ -264,6 +265,7 @@ public void load() { getServiceHandlingMaxParallelThreads(); getAbortedOperationsTtlInSeconds(); getFilesAsyncUploadExecutorMaxThreads(); + getDeployFromUrlExecutorMaxThreads(); getObjectStoreRegions(); getIsReadinessHealthCheckEnabled(); } @@ -407,6 +409,13 @@ public Integer getFilesAsyncUploadExecutorMaxThreads() { return filesAsyncUploadExecutorThreads; } + public Integer getDeployFromUrlExecutorMaxThreads() { + if (deployFromUrlExecutorMaxThreads == null) { + deployFromUrlExecutorMaxThreads = getDeployFromUrlExecutorMaxThreadsFromEnvironment(); + } + return deployFromUrlExecutorMaxThreads; + } + public String getGlobalAuditorUser() { if (globalAuditorUser == null) { globalAuditorUser = getGlobalAuditorUserFromEnvironment(); @@ -832,6 +841,12 @@ private Integer getFilesAsyncUploadExecutorMaxThreadsFromEnvironment() { return value; } + private Integer getDeployFromUrlExecutorMaxThreadsFromEnvironment() { + Integer value = environment.getInteger(CFG_DEPLOY_FROM_URL_EXECUTOR_MAX_THREADS, DEFAULT_DEPLOY_FROM_URL_EXECUTOR_MAX_THREADS); + logEnvironmentVariable(CFG_DEPLOY_FROM_URL_EXECUTOR_MAX_THREADS, Messages.DEPLOY_FROM_URL_EXECUTOR_MAX_THREADS, value); + return value; + } + private String getGlobalAuditorUserFromEnvironment() { String value = environment.getString(CFG_GLOBAL_AUDITOR_USER); return value; diff --git a/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/dto/AsyncUploadJobDto.java b/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/dto/AsyncUploadJobDto.java index a33ce80ccd..8249c40e56 100644 --- a/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/dto/AsyncUploadJobDto.java +++ b/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/dto/AsyncUploadJobDto.java @@ -1,12 +1,10 @@ package org.cloudfoundry.multiapps.controller.persistence.dto; import java.time.LocalDateTime; - import jakarta.persistence.Column; import jakarta.persistence.Entity; import jakarta.persistence.Id; import jakarta.persistence.Table; - import org.cloudfoundry.multiapps.controller.persistence.model.PersistenceMetadata; @Entity @@ -31,6 +29,8 @@ private AttributeNames() { public static final String FILE_ID = "fileId"; public static final String ERROR = "error"; public static final String INSTANCE_INDEX = "instanceIndex"; + public static final String BYTES_READ = "bytesRead"; + public static final String UPDATED_AT = "updatedAt"; } @Id @@ -73,13 +73,19 @@ private AttributeNames() { @Column(name = PersistenceMetadata.TableColumnNames.ASYNC_UPLOAD_JOB_INSTANCE_INDEX, nullable = false) private Integer instanceIndex; + @Column(name = PersistenceMetadata.TableColumnNames.ASYNC_UPLOAD_JOB_BYTES_READ) + private Long bytesRead; + + @Column(name = PersistenceMetadata.TableColumnNames.ASYNC_UPLOAD_JOB_UPDATED_AT) + private LocalDateTime updatedAt; + public AsyncUploadJobDto() { // Required by JPA } public AsyncUploadJobDto(String id, String mtaUser, String state, String url, LocalDateTime addedAt, LocalDateTime startedAt, LocalDateTime finishedAt, String namespace, String spaceGuid, String mtaId, String fileId, String error, - Integer instanceIndex) { + Integer instanceIndex, Long bytesRead, LocalDateTime updatedAt) { this.id = id; this.mtaUser = mtaUser; this.state = state; @@ -93,6 +99,8 @@ public AsyncUploadJobDto(String id, String mtaUser, String state, String url, Lo this.fileId = fileId; this.error = error; this.instanceIndex = instanceIndex; + this.bytesRead = bytesRead; + this.updatedAt = updatedAt; } @Override @@ -201,22 +209,40 @@ public void setInstanceIndex(Integer instanceIndex) { this.instanceIndex = instanceIndex; } + public Long getBytesRead() { + return bytesRead; + } + + public void setBytesRead(Long bytesRead) { + this.bytesRead = bytesRead; + } + + public LocalDateTime getUpdatedAt() { + return updatedAt; + } + + public void setUpdatedAt(LocalDateTime updatedAt) { + this.updatedAt = updatedAt; + } + @Override public String toString() { return "AsyncUploadJobDto{" + - "id='" + id + '\'' + - ", mtaUser='" + mtaUser + '\'' + - ", state='" + state + '\'' + - ", url='" + url + '\'' + - ", addedAt=" + addedAt + - ", startedAt=" + startedAt + - ", finishedAt=" + finishedAt + - ", namespace='" + namespace + '\'' + - ", spaceGuid='" + spaceGuid + '\'' + - ", mtaId='" + mtaId + '\'' + - ", fileId='" + fileId + '\'' + - ", error='" + error + '\'' + - ", instanceIndex=" + instanceIndex + - '}'; + "id='" + id + '\'' + + ", mtaUser='" + mtaUser + '\'' + + ", state='" + state + '\'' + + ", url='" + url + '\'' + + ", addedAt=" + addedAt + + ", startedAt=" + startedAt + + ", finishedAt=" + finishedAt + + ", namespace='" + namespace + '\'' + + ", spaceGuid='" + spaceGuid + '\'' + + ", mtaId='" + mtaId + '\'' + + ", fileId='" + fileId + '\'' + + ", error='" + error + '\'' + + ", instanceIndex=" + instanceIndex + + ", bytesRead=" + bytesRead + + ", updatedAt=" + updatedAt + + '}'; } } diff --git a/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/model/AsyncUploadJobEntry.java b/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/model/AsyncUploadJobEntry.java index cdd69325fa..16bd46b4f9 100644 --- a/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/model/AsyncUploadJobEntry.java +++ b/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/model/AsyncUploadJobEntry.java @@ -1,7 +1,6 @@ package org.cloudfoundry.multiapps.controller.persistence.model; import java.time.LocalDateTime; - import org.cloudfoundry.multiapps.common.Nullable; import org.immutables.value.Value; @@ -44,4 +43,10 @@ enum State { String getMtaId(); Integer getInstanceIndex(); + + @Nullable + Long getBytesRead(); + + @Nullable + LocalDateTime getUpdatedAt(); } diff --git a/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/model/PersistenceMetadata.java b/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/model/PersistenceMetadata.java index 0a4232c509..e5c17112a1 100644 --- a/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/model/PersistenceMetadata.java +++ b/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/model/PersistenceMetadata.java @@ -102,6 +102,8 @@ private TableColumnNames() { public static final String ASYNC_UPLOAD_JOB_FILE_ID = "file_id"; public static final String ASYNC_UPLOAD_JOB_ERROR = "error"; public static final String ASYNC_UPLOAD_JOB_INSTANCE_INDEX = "instance_index"; + public static final String ASYNC_UPLOAD_JOB_BYTES_READ = "bytes_read"; + public static final String ASYNC_UPLOAD_JOB_UPDATED_AT = "updated_at"; public static final String BACKUP_DESCRIPTOR_ID = "id"; public static final String BACKUP_DESCRIPTOR_DESCRIPTOR = "descriptor"; diff --git a/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/services/AsyncUploadJobService.java b/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/services/AsyncUploadJobService.java index ca22f51bca..2aadf0b9ef 100644 --- a/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/services/AsyncUploadJobService.java +++ b/multiapps-controller-persistence/src/main/java/org/cloudfoundry/multiapps/controller/persistence/services/AsyncUploadJobService.java @@ -3,7 +3,6 @@ import jakarta.inject.Inject; import jakarta.inject.Named; import jakarta.persistence.EntityManagerFactory; - import org.cloudfoundry.multiapps.common.ConflictException; import org.cloudfoundry.multiapps.common.NotFoundException; import org.cloudfoundry.multiapps.controller.persistence.Messages; @@ -58,6 +57,8 @@ public AsyncUploadJobEntry fromDto(AsyncUploadJobDto dto) { .fileId(dto.getFileId()) .error(dto.getError()) .instanceIndex(dto.getInstanceIndex()) + .bytesRead(dto.getBytesRead()) + .updatedAt(dto.getUpdatedAt()) .build(); } @@ -76,7 +77,9 @@ public AsyncUploadJobDto toDto(AsyncUploadJobEntry entry) { entry.getMtaId(), entry.getFileId(), entry.getError(), - entry.getInstanceIndex()); + entry.getInstanceIndex(), + entry.getBytesRead(), + entry.getUpdatedAt()); } } diff --git a/multiapps-controller-persistence/src/main/resources/org/cloudfoundry/multiapps/controller/persistence/db/changelog/db-changelog-2.35.0-persistence.xml b/multiapps-controller-persistence/src/main/resources/org/cloudfoundry/multiapps/controller/persistence/db/changelog/db-changelog-2.35.0-persistence.xml new file mode 100644 index 0000000000..d60f048aca --- /dev/null +++ b/multiapps-controller-persistence/src/main/resources/org/cloudfoundry/multiapps/controller/persistence/db/changelog/db-changelog-2.35.0-persistence.xml @@ -0,0 +1,19 @@ + + + + + + + + + + + + + + + + diff --git a/multiapps-controller-persistence/src/main/resources/org/cloudfoundry/multiapps/controller/persistence/db/changelog/db-changelog.xml b/multiapps-controller-persistence/src/main/resources/org/cloudfoundry/multiapps/controller/persistence/db/changelog/db-changelog.xml index 08541fc2a2..a5da6591b7 100644 --- a/multiapps-controller-persistence/src/main/resources/org/cloudfoundry/multiapps/controller/persistence/db/changelog/db-changelog.xml +++ b/multiapps-controller-persistence/src/main/resources/org/cloudfoundry/multiapps/controller/persistence/db/changelog/db-changelog.xml @@ -37,4 +37,7 @@ + + + diff --git a/multiapps-controller-process/src/main/java/org/cloudfoundry/multiapps/controller/process/configuration/FileUploadThreadPoolConfiguration.java b/multiapps-controller-process/src/main/java/org/cloudfoundry/multiapps/controller/process/configuration/FileUploadThreadPoolConfiguration.java index 380f72b4e7..a66a473101 100644 --- a/multiapps-controller-process/src/main/java/org/cloudfoundry/multiapps/controller/process/configuration/FileUploadThreadPoolConfiguration.java +++ b/multiapps-controller-process/src/main/java/org/cloudfoundry/multiapps/controller/process/configuration/FileUploadThreadPoolConfiguration.java @@ -8,7 +8,7 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; - +import jakarta.inject.Inject; import org.cloudfoundry.multiapps.controller.core.util.ApplicationConfiguration; import org.cloudfoundry.multiapps.controller.process.util.PriorityCallable; import org.cloudfoundry.multiapps.controller.process.util.PriorityFuture; @@ -16,8 +16,6 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import jakarta.inject.Inject; - @Configuration public class FileUploadThreadPoolConfiguration { @@ -75,4 +73,17 @@ public ExecutorService asyncFileUploadExecutor(LinkedBlockingQueue fil fileUploadFromUrlQueue); } + @Bean(name = "deployFromUrlExecutor") + public ExecutorService deployFromUrlExecutor() { + return new ThreadPoolExecutor(5, + // The max thread count should match the maximum capacity of asyncFileUploadExecutor (queue size + max threads). + // A lower value may cause unnecessary task rejections. + // A higher value may cause job failures when asyncFileUploadExecutor becomes full. + applicationConfiguration.getDeployFromUrlExecutorMaxThreads(), + // As the threads are only updating a row and waiting it is ok to have more threads + 30, + TimeUnit.SECONDS, + new SynchronousQueue<>()); // A synchronous queue is used so deploy from url jobs immediately start + // a new thread that updates the database job entry + } } diff --git a/multiapps-controller-process/src/main/java/org/cloudfoundry/multiapps/controller/process/stream/CountingInputStream.java b/multiapps-controller-process/src/main/java/org/cloudfoundry/multiapps/controller/process/stream/CountingInputStream.java new file mode 100644 index 0000000000..10fd1a9095 --- /dev/null +++ b/multiapps-controller-process/src/main/java/org/cloudfoundry/multiapps/controller/process/stream/CountingInputStream.java @@ -0,0 +1,23 @@ +package org.cloudfoundry.multiapps.controller.process.stream; + +import java.io.InputStream; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.io.input.ProxyInputStream; + +public class CountingInputStream extends ProxyInputStream { + + private final AtomicLong bytes; + + public CountingInputStream(InputStream proxy, AtomicLong counterRef) { + super(proxy); + bytes = counterRef; + } + + @Override + protected void afterRead(int n) { + if (n > 0) { + bytes.addAndGet(n); + } + } + +} diff --git a/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/Messages.java b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/Messages.java index 748a134eaa..4dbc9507c9 100644 --- a/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/Messages.java +++ b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/Messages.java @@ -24,6 +24,7 @@ public final class Messages { public static final String NO_VALID_OBJECT_STORE_CONFIGURATION_FOUND = "No valid Object Store configuration found!"; public static final String MISSING_PROPERTIES_FOR_CREATING_THE_SPECIFIC_PROVIDER = "Missing properties for creating the specific provider!"; public static final String DEPLOY_FROM_URL_WRONG_CREDENTIALS = "Credentials to {0} are wrong. Make sure that they are correct."; + public static final String JOB_NOT_UPDATED_FOR_0_SECONDS = "Job not updated for {0} seconds"; public static final String FAILED_TO_CREATE_BLOB_STORE_CONTEXT = "Failed to create BlobStoreContext"; @@ -54,13 +55,8 @@ public final class Messages { public static final String CLEARING_FLOWABLE_LOCK_OWNER_THREW_AN_EXCEPTION_0 = "Clearing Flowable lock owner on JVM shutdown threw an exception: {0}"; public static final String FETCHING_FILE_FAILED = "Fetching file {0} in space {1} failed with: {2}"; public static final String ASYNC_UPLOAD_JOB_FAILED = "Async upload job {0} failed with: {1}"; - public static final String JOB_0_WAS_NOT_FOUND_IN_THE_RUNNING_TASKS = "Job \"{0}\" was not found in the running tasks"; - public static final String JOB_IS_NOT_BEING_EXECUTED = "Job is not being executed"; - public static final String JOB_0_EXISTS_IN_STATE_1_BUT_DOES_NOT_EXISTS_IN_THE_RUNNING_TASKS = "Job \"{0}\" exists in state \"{1}\" but does not exists in the running tasks"; - public static final String JOB_THREAD_IS_NOT_RUNNING_BUT_STATE_IS_STILL_IN_PROGRESS_UPLOAD_FAILED = "Job thread is not running but state is still in progress. Upload failed"; // WARN log messages - public static final String THE_JOB_EXISTS_BUT_IT_IS_NOT_RUNNING_DELETING = "The job exists but it is not running. Deleting"; // INFO log messages public static final String ALM_SERVICE_ENV_INITIALIZED = "Deploy service environment initialized"; @@ -72,6 +68,7 @@ public final class Messages { public static final String CLEARING_LOCK_OWNER = "Clearing lock owner {0}..."; public static final String CLEARED_LOCK_OWNER = "Cleared lock owner {0}"; public static final String OBJECT_STORE_WITH_PROVIDER_0_CREATED = "Object store with provider: {0} created"; + public static final String JOB_WITH_ID_WAS_NOT_UPDATED_WITHIN_SECONDS = "Job with ID: {} was not updated within: {} seconds"; // DEBUG log messages public static final String RECEIVED_UPLOAD_REQUEST = "Received upload request on URI: {}"; @@ -79,7 +76,7 @@ public final class Messages { public static final String UPLOADED_FILE = "Uploaded file \"{}\" with name {}, size {}, space {}, and digest {} (algorithm {}) for {} ms."; public static final String ASYNC_UPLOAD_JOB_EXISTS = "Async upload job for URL {} exists: {}"; public static final String CREATING_ASYNC_UPLOAD_JOB = "Creating async upload job for URL {} with ID: {}"; - public static final String ASYNC_UPLOAD_JOB_REJECTED = "Async upload job {} rejected. Deleting entry"; + public static final String ASYNC_UPLOAD_JOB_REJECTED = "Async upload job with space guid: {}, namespace: {}, URL: {} rejected."; public static final String STARTING_DOWNLOAD_OF_MTAR = "Starting download of MTAR from remote endpoint: {}"; public static final String UPLOADED_MTAR_FROM_REMOTE_ENDPOINT_AND_JOB_ID = "Uploaded MTAR from remote endpoint {} with job id: {} in {} ms"; public static final String ASYNC_UPLOAD_JOB_FINISHED = "Async upload job {} finished"; diff --git a/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/api/impl/FilesApiServiceImpl.java b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/api/impl/FilesApiServiceImpl.java index ef93af4f52..42a7eeb070 100644 --- a/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/api/impl/FilesApiServiceImpl.java +++ b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/api/impl/FilesApiServiceImpl.java @@ -1,33 +1,17 @@ package org.cloudfoundry.multiapps.controller.web.api.impl; import java.io.BufferedInputStream; -import java.io.IOException; import java.io.InputStream; import java.math.BigInteger; -import java.net.URI; -import java.net.URLDecoder; -import java.net.http.HttpClient; -import java.net.http.HttpClient.Redirect; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; -import java.net.http.HttpResponse.BodyHandlers; -import java.nio.charset.StandardCharsets; import java.text.MessageFormat; -import java.time.Duration; import java.time.LocalDateTime; import java.time.temporal.ChronoUnit; -import java.util.Arrays; import java.util.Base64; import java.util.List; -import java.util.UUID; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.io.input.ProxyInputStream; +import jakarta.inject.Inject; +import jakarta.inject.Named; import org.cloudfoundry.multiapps.common.SLException; import org.cloudfoundry.multiapps.controller.api.FilesApiService; import org.cloudfoundry.multiapps.controller.api.model.AsyncUploadResult; @@ -36,18 +20,13 @@ import org.cloudfoundry.multiapps.controller.api.model.ImmutableAsyncUploadResult; import org.cloudfoundry.multiapps.controller.api.model.ImmutableFileMetadata; import org.cloudfoundry.multiapps.controller.api.model.UserCredentials; -import org.cloudfoundry.multiapps.controller.client.util.CheckedSupplier; -import org.cloudfoundry.multiapps.controller.client.util.ResilientOperationExecutor; import org.cloudfoundry.multiapps.controller.core.auditlogging.FilesApiServiceAuditLog; import org.cloudfoundry.multiapps.controller.core.helpers.DescriptorParserFacadeFactory; -import org.cloudfoundry.multiapps.controller.core.model.CachedMap; import org.cloudfoundry.multiapps.controller.core.util.ApplicationConfiguration; -import org.cloudfoundry.multiapps.controller.core.util.FileUtils; import org.cloudfoundry.multiapps.controller.core.util.UriUtil; import org.cloudfoundry.multiapps.controller.persistence.model.AsyncUploadJobEntry; import org.cloudfoundry.multiapps.controller.persistence.model.AsyncUploadJobEntry.State; import org.cloudfoundry.multiapps.controller.persistence.model.FileEntry; -import org.cloudfoundry.multiapps.controller.persistence.model.ImmutableAsyncUploadJobEntry; import org.cloudfoundry.multiapps.controller.persistence.model.ImmutableFileEntry; import org.cloudfoundry.multiapps.controller.persistence.services.AsyncUploadJobService; import org.cloudfoundry.multiapps.controller.persistence.services.FileService; @@ -56,11 +35,10 @@ import org.cloudfoundry.multiapps.controller.process.util.PriorityFuture; import org.cloudfoundry.multiapps.controller.web.Constants; import org.cloudfoundry.multiapps.controller.web.Messages; +import org.cloudfoundry.multiapps.controller.web.upload.AsyncUploadJobOrchestrator; +import org.cloudfoundry.multiapps.controller.web.upload.exception.RejectedAsyncUploadJobException; import org.cloudfoundry.multiapps.controller.web.util.SecurityContextUtil; import org.cloudfoundry.multiapps.controller.web.util.ServletUtil; -import org.cloudfoundry.multiapps.mta.handlers.ArchiveHandler; -import org.cloudfoundry.multiapps.mta.handlers.DescriptorParserFacade; -import org.cloudfoundry.multiapps.mta.model.DeploymentDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpHeaders; @@ -69,25 +47,18 @@ import org.springframework.web.multipart.MultipartFile; import org.springframework.web.multipart.MultipartHttpServletRequest; -import jakarta.inject.Inject; -import jakarta.inject.Named; - @Named public class FilesApiServiceImpl implements FilesApiService { private static final Logger LOGGER = LoggerFactory.getLogger(FilesApiServiceImpl.class); - private static final int ERROR_RESPONSE_BODY_MAX_LENGTH = 4 * 1024; - private static final int INPUT_STREAM_BUFFER_SIZE = 16 * 1024; - private static final Duration HTTP_CONNECT_TIMEOUT = Duration.ofMinutes(10); private static final String RETRY_AFTER_SECONDS = "30"; - private static final String USERNAME_PASSWORD_URL_FORMAT = "{0}:{1}"; + private static final int INPUT_STREAM_BUFFER_SIZE = 16 * 1024; + private static final int UPDATE_JOB_TIMEOUT = 30; + static { System.setProperty(Constants.RETRY_LIMIT_PROPERTY, "0"); } - private final CachedMap jobCounters = new CachedMap<>(Duration.ofHours(1)); - private final CachedMap> runningTasks = new CachedMap<>(Duration.ofHours(1)); - private final ResilientOperationExecutor resilientOperationExecutor = getResilientOperationExecutor(); @Inject @Named("fileService") private FileService fileService; @@ -98,11 +69,10 @@ public class FilesApiServiceImpl implements FilesApiService { @Inject private AsyncUploadJobService uploadJobService; @Inject - @Named("asyncFileUploadExecutor") - private ExecutorService deployFromUrlExecutor; - @Inject private FilesApiServiceAuditLog filesApiServiceAuditLog; @Inject + private AsyncUploadJobOrchestrator asyncUploadJobOrchestrator; + @Inject private ExecutorService fileStorageThreadPool; @Override @@ -148,19 +118,23 @@ public ResponseEntity startUploadFromUrl(String spaceGuid, String namespac LOGGER.trace(Messages.RECEIVED_UPLOAD_FROM_URL_REQUEST, urlWithoutUserInfo); filesApiServiceAuditLog.logStartUploadFromUrl(SecurityContextUtil.getUsername(), spaceGuid, decodedUrl); var existingJob = getExistingJob(spaceGuid, namespace, urlWithoutUserInfo); - if (existingJob != null) { - if (runningTasks.get(existingJob.getId()) != null) { - LOGGER.info(Messages.ASYNC_UPLOAD_JOB_EXISTS, urlWithoutUserInfo, existingJob); - return ResponseEntity.status(HttpStatus.SEE_OTHER) - .header(HttpHeaders.LOCATION, getLocationHeader(spaceGuid, existingJob.getId())) - .header("x-cf-app-instance", configuration.getApplicationGuid() + ":" + existingJob.getInstanceIndex()) - .build(); - } else { - LOGGER.warn(Messages.THE_JOB_EXISTS_BUT_IT_IS_NOT_RUNNING_DELETING); - deleteAsyncJobEntry(existingJob); - } + if (existingJob == null) { + return triggerUploadFromUrl(spaceGuid, namespace, urlWithoutUserInfo, decodedUrl, fileUrl.getUserCredentials()); + } + if (hasJobStuck(existingJob)) { + deleteAsyncJobEntry(existingJob); + return triggerUploadFromUrl(spaceGuid, namespace, urlWithoutUserInfo, decodedUrl, fileUrl.getUserCredentials()); } - return triggerUploadFromUrl(spaceGuid, namespace, urlWithoutUserInfo, decodedUrl, fileUrl.getUserCredentials()); + LOGGER.info(Messages.ASYNC_UPLOAD_JOB_EXISTS, urlWithoutUserInfo, existingJob); + return ResponseEntity.status(HttpStatus.SEE_OTHER) + .header(HttpHeaders.LOCATION, getLocationHeader(spaceGuid, existingJob.getId())) + .build(); + } + + private boolean hasJobStuck(AsyncUploadJobEntry existingJob) { + return existingJob.getUpdatedAt() + .isBefore(LocalDateTime.now() + .minusSeconds(UPDATE_JOB_TIMEOUT)); } private String getLocationHeader(String spaceGuid, String jobId) { @@ -175,51 +149,37 @@ public ResponseEntity getUploadFromUrlJob(String spaceGuid, S return ResponseEntity.notFound() .build(); } - return getAsyncUploadResult(spaceGuid, namespace, job); + return getAsyncUploadResult(job); } - private ResponseEntity getAsyncUploadResult(String spaceGuid, String namespace, AsyncUploadJobEntry job) { + private ResponseEntity getAsyncUploadResult(AsyncUploadJobEntry job) { if (job.getState() == State.RUNNING || job.getState() == State.INITIAL) { - Future futureTask = runningTasks.get(job.getId()); - if (futureTask == null) { - LOGGER.error(MessageFormat.format(Messages.JOB_0_WAS_NOT_FOUND_IN_THE_RUNNING_TASKS, job.getId())); - return ResponseEntity.ok(createErrorResult(Messages.JOB_IS_NOT_BEING_EXECUTED, - AsyncUploadResult.ClientAction.RETRY_UPLOAD)); - } - if (!futureTask.isDone()) { - var count = jobCounters.getOrDefault(job.getId(), new AtomicLong(-1)); - return ResponseEntity.ok(ImmutableAsyncUploadResult.builder() - .status(AsyncUploadResult.JobStatus.RUNNING) - .bytes(count.get()) - .build()); - } - var jobWithLatestState = getJob(job.getId(), spaceGuid, namespace); - if (jobWithLatestState.getState() == State.RUNNING || jobWithLatestState.getState() == State.INITIAL) { - LOGGER.error(MessageFormat.format(Messages.JOB_0_EXISTS_IN_STATE_1_BUT_DOES_NOT_EXISTS_IN_THE_RUNNING_TASKS, job.getId(), - jobWithLatestState.getState())); - return ResponseEntity.ok(createErrorResult(Messages.JOB_THREAD_IS_NOT_RUNNING_BUT_STATE_IS_STILL_IN_PROGRESS_UPLOAD_FAILED, - AsyncUploadResult.ClientAction.RETRY_UPLOAD)); + if (hasJobStuck(job)) { + LOGGER.info(Messages.JOB_WITH_ID_WAS_NOT_UPDATED_WITHIN_SECONDS, job.getId(), UPDATE_JOB_TIMEOUT); + return ResponseEntity.ok( + createErrorResult(MessageFormat.format(Messages.JOB_NOT_UPDATED_FOR_0_SECONDS, UPDATE_JOB_TIMEOUT), + AsyncUploadResult.ClientAction.RETRY_UPLOAD)); } + return ResponseEntity.ok(ImmutableAsyncUploadResult.builder() + .status(AsyncUploadResult.JobStatus.RUNNING) + .bytes(job.getBytesRead()) + .build()); } if (job.getState() == State.ERROR) { - jobCounters.remove(job.getId()); - runningTasks.remove(job.getId()); return ResponseEntity.ok(createErrorResult(job.getError())); } - return addFileEntryToAsyncUploadResult(spaceGuid, job); + return addFileEntryToAsyncUploadResult(job); } - private ResponseEntity addFileEntryToAsyncUploadResult(String spaceGuid, AsyncUploadJobEntry job) { + private ResponseEntity addFileEntryToAsyncUploadResult(AsyncUploadJobEntry job) { FileEntry fileEntry; try { - fileEntry = fileService.getFile(spaceGuid, job.getFileId()); + fileEntry = fileService.getFile(job.getSpaceGuid(), job.getFileId()); } catch (FileStorageException e) { - LOGGER.error(MessageFormat.format(Messages.FETCHING_FILE_FAILED, job.getFileId(), spaceGuid, e.getMessage()), e); + LOGGER.error(MessageFormat.format(Messages.FETCHING_FILE_FAILED, job.getFileId(), job.getSpaceGuid(), e.getMessage()), e); return ResponseEntity.ok(createErrorResult(e.getMessage())); } FileMetadata file = parseFileEntry(fileEntry); - jobCounters.remove(job.getId()); - runningTasks.remove(job.getId()); return ResponseEntity.status(HttpStatus.CREATED) .body(ImmutableAsyncUploadResult.builder() .status(AsyncUploadResult.JobStatus.FINISHED) @@ -254,10 +214,6 @@ private FileEntry doUploadFile(String spaceGuid, String namespace, MultipartFile in); } - protected ResilientOperationExecutor getResilientOperationExecutor() { - return new ResilientOperationExecutor(); - } - private FileMetadata parseFileEntry(FileEntry fileEntry) { return ImmutableFileMetadata.builder() .id(fileEntry.getId()) @@ -276,7 +232,6 @@ private AsyncUploadJobEntry getExistingJob(String spaceGuid, String namespace, S .user(SecurityContextUtil.getUsername()) .namespace(namespace) .url(url) - .instanceIndex(configuration.getApplicationInstanceIndex()) .withoutFinishedAt() .withStateAnyOf(State.INITIAL, State.RUNNING) .list(); @@ -295,39 +250,21 @@ private void deleteAsyncJobEntry(AsyncUploadJobEntry entry) { private ResponseEntity triggerUploadFromUrl(String spaceGuid, String namespace, String urlWithoutUserInfo, String decodedUrl, UserCredentials userCredentials) { - var entry = createJobEntry(spaceGuid, namespace, urlWithoutUserInfo); - LOGGER.debug(Messages.CREATING_ASYNC_UPLOAD_JOB, urlWithoutUserInfo, entry.getId()); - uploadJobService.add(entry); try { - Future runningTask = deployFromUrlExecutor.submit(() -> uploadFileFromUrl(entry, spaceGuid, namespace, decodedUrl, - userCredentials)); - runningTasks.put(entry.getId(), runningTask); - } catch (RejectedExecutionException ignored) { - LOGGER.debug(Messages.ASYNC_UPLOAD_JOB_REJECTED, entry.getId()); - deleteAsyncJobEntry(entry); + AsyncUploadJobEntry entry = asyncUploadJobOrchestrator.executeUploadFromUrl(spaceGuid, namespace, urlWithoutUserInfo, + decodedUrl, userCredentials); + return ResponseEntity.accepted() + .header(HttpHeaders.LOCATION, getLocationHeader(spaceGuid, entry.getId())) + .build(); + } catch (RejectedAsyncUploadJobException rejectedJobException) { + LOGGER.debug(Messages.ASYNC_UPLOAD_JOB_REJECTED, spaceGuid, namespace, urlWithoutUserInfo); + if (rejectedJobException.getAsyncUploadJobEntry() != null) { + deleteAsyncJobEntry(rejectedJobException.getAsyncUploadJobEntry()); + } return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS) .header(HttpHeaders.RETRY_AFTER, RETRY_AFTER_SECONDS) .build(); } - return ResponseEntity.accepted() - .header("x-cf-app-instance", - configuration.getApplicationGuid() + ":" + configuration.getApplicationInstanceIndex()) - .header(HttpHeaders.LOCATION, getLocationHeader(spaceGuid, entry.getId())) - .build(); - } - - private AsyncUploadJobEntry createJobEntry(String spaceGuid, String namespace, String url) { - return ImmutableAsyncUploadJobEntry.builder() - .id(UUID.randomUUID() - .toString()) - .user(SecurityContextUtil.getUsername()) - .addedAt(LocalDateTime.now()) - .spaceGuid(spaceGuid) - .namespace(namespace) - .instanceIndex(configuration.getApplicationInstanceIndex()) - .url(url) - .state(State.INITIAL) - .build(); } private AsyncUploadJobEntry getJob(String id, String spaceGuid, String namespace) { @@ -338,7 +275,6 @@ private AsyncUploadJobEntry getJob(String id, String spaceGuid, String namespace .spaceGuid(spaceGuid) .user(SecurityContextUtil.getUsername()) .namespace(namespace) - .instanceIndex(configuration.getApplicationInstanceIndex()) .list(); return jobs.isEmpty() ? null : jobs.get(0); } @@ -351,182 +287,4 @@ private AsyncUploadResult createErrorResult(String error, AsyncUploadResult.Clie .build(); } - private void uploadFileFromUrl(AsyncUploadJobEntry jobEntry, String spaceGuid, String namespace, String fileUrl, - UserCredentials userCredentials) { - var counter = new AtomicLong(0); - jobCounters.put(jobEntry.getId(), counter); - LOGGER.info(Messages.STARTING_DOWNLOAD_OF_MTAR, jobEntry.getUrl()); - var startTime = LocalDateTime.now(); - AsyncUploadJobEntry jobEntryWithStartTime = ImmutableAsyncUploadJobEntry.copyOf(jobEntry) - .withState(State.RUNNING) - .withStartedAt(startTime); - try { - jobEntryWithStartTime = uploadJobService.update(jobEntry, jobEntryWithStartTime); - FileEntry fileEntry = resilientOperationExecutor.execute((CheckedSupplier) () -> doUploadFileFromUrl(spaceGuid, - namespace, - fileUrl, - counter, - userCredentials)); - LOGGER.trace(Messages.UPLOADED_MTAR_FROM_REMOTE_ENDPOINT_AND_JOB_ID, jobEntry.getUrl(), jobEntry.getId(), - ChronoUnit.MILLIS.between(startTime, LocalDateTime.now())); - var descriptor = fileService.processFileContent(spaceGuid, fileEntry.getId(), this::extractDeploymentDescriptor); - LOGGER.debug(Messages.ASYNC_UPLOAD_JOB_FINISHED, jobEntry.getId()); - uploadJobService.update(jobEntryWithStartTime, ImmutableAsyncUploadJobEntry.copyOf(jobEntryWithStartTime) - .withFileId(fileEntry.getId()) - .withMtaId(descriptor.getId()) - .withFinishedAt(LocalDateTime.now()) - .withState(State.FINISHED)); - } catch (Exception e) { - LOGGER.error(MessageFormat.format(Messages.ASYNC_UPLOAD_JOB_FAILED, jobEntry.getId(), e.getMessage()), e); - uploadJobService.update(jobEntryWithStartTime, ImmutableAsyncUploadJobEntry.copyOf(jobEntryWithStartTime) - .withError(e.getMessage()) - .withState(State.ERROR)); - } - } - - private FileEntry doUploadFileFromUrl(String spaceGuid, String namespace, String fileUrl, AtomicLong counter, - UserCredentials userCredentials) - throws Exception { - if (!UriUtil.isUrlSecure(fileUrl)) { - throw new SLException(Messages.MTAR_ENDPOINT_NOT_SECURE); - } - UriUtil.validateUrl(fileUrl); - HttpClient client = buildHttpClient(fileUrl); - - HttpResponse response = callRemoteEndpointWithRetry(client, fileUrl, userCredentials); - long fileSize = response.headers() - .firstValueAsLong(Constants.CONTENT_LENGTH) - .orElseThrow(() -> new SLException(Messages.FILE_URL_RESPONSE_DID_NOT_RETURN_CONTENT_LENGTH)); - - long maxUploadSize = configuration.getMaxUploadSize(); - if (fileSize > maxUploadSize) { - throw new SLException(MessageFormat.format(Messages.MAX_UPLOAD_SIZE_EXCEEDED, maxUploadSize)); - } - - String fileName = extractFileName(fileUrl); - FileUtils.validateFileHasExtension(fileName); - resetCounterOnRetry(counter); - // Normal stream returned from the http response always returns 0 when InputStream::available() is executed which seems to break - // JClods library: https://issues.apache.org/jira/browse/JCLOUDS-1623 - try (CountingInputStream source = new CountingInputStream(response.body(), counter); - BufferedInputStream bufferedContent = new BufferedInputStream(source, INPUT_STREAM_BUFFER_SIZE)) { - LOGGER.debug(Messages.UPLOADING_MTAR_STREAM_FROM_REMOTE_ENDPOINT, response.uri()); - return fileService.addFile(ImmutableFileEntry.builder() - .space(spaceGuid) - .namespace(namespace) - .name(fileName) - .size(BigInteger.valueOf(fileSize)) - .build(), - bufferedContent); - } - } - - public HttpResponse callRemoteEndpointWithRetry(HttpClient client, String decodedUrl, UserCredentials userCredentials) - throws Exception { - return resilientOperationExecutor.execute((CheckedSupplier>) () -> { - var request = buildFetchFileRequest(decodedUrl, userCredentials); - LOGGER.debug(Messages.CALLING_REMOTE_MTAR_ENDPOINT, getMaskedUri(urlDecodeUrl(decodedUrl))); - var response = client.send(request, BodyHandlers.ofInputStream()); - if (response.statusCode() / 100 != 2) { - String error = readErrorBodyFromResponse(response); - LOGGER.error(error); - if (response.statusCode() == HttpStatus.UNAUTHORIZED.value()) { - String errorMessage = MessageFormat.format(Messages.DEPLOY_FROM_URL_WRONG_CREDENTIALS, - UriUtil.stripUserInfo(decodedUrl)); - throw new SLException(errorMessage); - } - throw new SLException(MessageFormat.format(Messages.ERROR_FROM_REMOTE_MTAR_ENDPOINT, getMaskedUri(urlDecodeUrl(decodedUrl)), - response.statusCode(), error)); - } - return response; - }); - } - - private String getMaskedUri(String url) { - if (url.contains("@")) { - return url.substring(url.lastIndexOf("@")) - .replace("@", "..."); - } else { - return url; - } - } - - private String urlDecodeUrl(String url) { - return URLDecoder.decode(url, StandardCharsets.UTF_8); - } - - private void resetCounterOnRetry(AtomicLong counter) { - counter.set(0); - } - - protected HttpClient buildHttpClient(String decodedUrl) { - return HttpClient.newBuilder() - .version(HttpClient.Version.HTTP_2) - .connectTimeout(HTTP_CONNECT_TIMEOUT) - .followRedirects(Redirect.NORMAL) - .build(); - } - - private HttpRequest buildFetchFileRequest(String decodedUrl, UserCredentials userCredentials) { - var builder = HttpRequest.newBuilder() - .GET() - .timeout(Duration.ofMinutes(15)); - var uri = URI.create(decodedUrl); - var userInfo = uri.getUserInfo(); - if (userCredentials != null) { - builder.uri(uri); - String userCredentialsUrlFormat = MessageFormat.format(USERNAME_PASSWORD_URL_FORMAT, userCredentials.getUsername(), - userCredentials.getPassword()); - String encodedAuth = Base64.getEncoder() - .encodeToString(userCredentialsUrlFormat.getBytes()); - builder.header(HttpHeaders.AUTHORIZATION, "Basic " + encodedAuth); - } else if (userInfo != null) { - builder.uri(URI.create(decodedUrl.replace(uri.getRawUserInfo() + "@", ""))); - String encodedAuth = Base64.getEncoder() - .encodeToString(userInfo.getBytes()); - builder.header(HttpHeaders.AUTHORIZATION, "Basic " + encodedAuth); - } else { - builder.uri(uri); - } - return builder.build(); - } - - private String readErrorBodyFromResponse(HttpResponse response) throws IOException { - try (InputStream is = response.body()) { - byte[] buffer = new byte[ERROR_RESPONSE_BODY_MAX_LENGTH]; - int read = IOUtils.read(is, buffer); - return new String(Arrays.copyOf(buffer, read)); - } - } - - private String extractFileName(String url) { - String path = URI.create(url) - .getPath(); - if (path.indexOf('/') == -1) { - return path; - } - String[] pathFragments = path.split("/"); - return pathFragments[pathFragments.length - 1]; - } - - private DeploymentDescriptor extractDeploymentDescriptor(InputStream appArchiveStream) { - String descriptorString = ArchiveHandler.getDescriptor(appArchiveStream, configuration.getMaxMtaDescriptorSize()); - DescriptorParserFacade descriptorParserFacade = descriptorParserFactory.getInstance(); - return descriptorParserFacade.parseDeploymentDescriptor(descriptorString); - } - - private static class CountingInputStream extends ProxyInputStream { - private final AtomicLong bytes; - - public CountingInputStream(InputStream proxy, AtomicLong counterRef) { - super(proxy); - bytes = counterRef; - } - - @Override - protected void afterRead(int n) { - bytes.addAndGet(n); - } - } - } diff --git a/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/AsyncUploadJobOrchestrator.java b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/AsyncUploadJobOrchestrator.java new file mode 100644 index 0000000000..22dd65834f --- /dev/null +++ b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/AsyncUploadJobOrchestrator.java @@ -0,0 +1,275 @@ +package org.cloudfoundry.multiapps.controller.web.upload; + +import java.io.BufferedInputStream; +import java.io.InputStream; +import java.math.BigInteger; +import java.net.URI; +import java.text.MessageFormat; +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import jakarta.inject.Inject; +import jakarta.inject.Named; +import org.cloudfoundry.multiapps.common.SLException; +import org.cloudfoundry.multiapps.common.util.MiscUtil; +import org.cloudfoundry.multiapps.controller.api.model.UserCredentials; +import org.cloudfoundry.multiapps.controller.client.util.CheckedSupplier; +import org.cloudfoundry.multiapps.controller.client.util.ResilientOperationExecutor; +import org.cloudfoundry.multiapps.controller.core.helpers.DescriptorParserFacadeFactory; +import org.cloudfoundry.multiapps.controller.core.util.ApplicationConfiguration; +import org.cloudfoundry.multiapps.controller.core.util.FileUtils; +import org.cloudfoundry.multiapps.controller.persistence.model.AsyncUploadJobEntry; +import org.cloudfoundry.multiapps.controller.persistence.model.FileEntry; +import org.cloudfoundry.multiapps.controller.persistence.model.ImmutableAsyncUploadJobEntry; +import org.cloudfoundry.multiapps.controller.persistence.model.ImmutableFileEntry; +import org.cloudfoundry.multiapps.controller.persistence.services.AsyncUploadJobService; +import org.cloudfoundry.multiapps.controller.persistence.services.FileService; +import org.cloudfoundry.multiapps.controller.process.stream.CountingInputStream; +import org.cloudfoundry.multiapps.controller.web.Messages; +import org.cloudfoundry.multiapps.controller.web.upload.client.DeployFromUrlRemoteClient; +import org.cloudfoundry.multiapps.controller.web.upload.client.FileFromUrlData; +import org.cloudfoundry.multiapps.controller.web.upload.exception.RejectedAsyncUploadJobException; +import org.cloudfoundry.multiapps.controller.web.util.SecurityContextUtil; +import org.cloudfoundry.multiapps.mta.handlers.ArchiveHandler; +import org.cloudfoundry.multiapps.mta.handlers.DescriptorParserFacade; +import org.cloudfoundry.multiapps.mta.model.DeploymentDescriptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Named +public class AsyncUploadJobOrchestrator { + + private static final int INPUT_STREAM_BUFFER_SIZE = 16 * 1024; + + private static final long WAIT_TIME_BETWEEN_ASYNC_JOB_UPDATES_IN_MILLIS = Duration.ofSeconds(3) + .toMillis(); + + private static final Logger LOGGER = LoggerFactory.getLogger(AsyncUploadJobOrchestrator.class); + + private final ResilientOperationExecutor resilientOperationExecutor = getResilientOperationExecutor(); + + private final ExecutorService asyncFileUploadExecutor; + private final ExecutorService deployFromUrlExecutor; + private final ApplicationConfiguration applicationConfiguration; + private final AsyncUploadJobService asyncUploadJobService; + private final FileService fileService; + private final DescriptorParserFacadeFactory descriptorParserFactory; + private final DeployFromUrlRemoteClient deployFromUrlRemoteClient; + + @Inject + public AsyncUploadJobOrchestrator(ExecutorService asyncFileUploadExecutor, ExecutorService deployFromUrlExecutor, + ApplicationConfiguration applicationConfiguration, AsyncUploadJobService asyncUploadJobService, + FileService fileService, DescriptorParserFacadeFactory descriptorParserFactory, + DeployFromUrlRemoteClient deployFromUrlRemoteClient) { + this.asyncFileUploadExecutor = asyncFileUploadExecutor; + this.deployFromUrlExecutor = deployFromUrlExecutor; + this.applicationConfiguration = applicationConfiguration; + this.asyncUploadJobService = asyncUploadJobService; + this.fileService = fileService; + this.descriptorParserFactory = descriptorParserFactory; + this.deployFromUrlRemoteClient = deployFromUrlRemoteClient; + } + + public AsyncUploadJobEntry executeUploadFromUrl(String spaceGuid, String namespace, String urlWithoutUserInfo, String decodedUrl, + UserCredentials userCredentials) { + var entry = createJobEntry(spaceGuid, namespace, urlWithoutUserInfo); + LOGGER.info(Messages.CREATING_ASYNC_UPLOAD_JOB, urlWithoutUserInfo, entry.getId()); + asyncUploadJobService.add(entry); + try { + deployFromUrlExecutor.submit(() -> deployFromUrl(entry, decodedUrl, userCredentials)); + } catch (RejectedExecutionException ignored) { + throw new RejectedAsyncUploadJobException(entry); + } + return entry; + } + + private AsyncUploadJobEntry createJobEntry(String spaceGuid, String namespace, String url) { + return ImmutableAsyncUploadJobEntry.builder() + .id(UUID.randomUUID() + .toString()) + .user(SecurityContextUtil.getUsername()) + .addedAt(LocalDateTime.now()) + .spaceGuid(spaceGuid) + .namespace(namespace) + .instanceIndex(applicationConfiguration.getApplicationInstanceIndex()) + .url(url) + .state(AsyncUploadJobEntry.State.INITIAL) + .updatedAt(LocalDateTime.now()) + .bytesRead(0L) + .build(); + } + + private void deployFromUrl(AsyncUploadJobEntry jobEntry, String fileUrl, UserCredentials userCredentials) { + LOGGER.info(Messages.STARTING_DOWNLOAD_OF_MTAR, jobEntry.getUrl()); + var startTime = LocalDateTime.now(); + Lock lock = new ReentrantLock(); + AtomicLong counterRef = new AtomicLong(); + try { + var updatedJobEntry = asyncUploadJobService.update(jobEntry, ImmutableAsyncUploadJobEntry.copyOf(jobEntry) + .withState( + AsyncUploadJobEntry.State.RUNNING) + .withUpdatedAt(LocalDateTime.now()) + .withStartedAt(startTime)); + startAsyncUploadFromUrlUpload(ImmutableUploadFromUrlContext.builder() + .jobEntry(updatedJobEntry) + .fileUrl(fileUrl) + .userCredentials(userCredentials) + .counterRef(counterRef) + .startTime(startTime) + .build(), lock); + updatedJobEntry = asyncUploadJobService.createQuery() + .id(updatedJobEntry.getId()) + .singleResult(); + monitorAsyncUploadJob(updatedJobEntry, lock, counterRef); + } catch (Exception e) { + LOGGER.error(MessageFormat.format(Messages.ASYNC_UPLOAD_JOB_FAILED, jobEntry.getId(), e.getMessage()), e); + updateFailedAsyncUploadJob(jobEntry, e, lock); + } + } + + private void startAsyncUploadFromUrlUpload(UploadFromUrlContext uploadFromUrlContext, Lock lock) { + asyncFileUploadExecutor.submit(() -> { + try { + startSyncUploadFromUrlUpload(uploadFromUrlContext, lock); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + updateFailedAsyncUploadJob(uploadFromUrlContext.getJobEntry(), e, lock); + throw new SLException(e, e.getMessage()); + } + }); + } + + private void startSyncUploadFromUrlUpload(UploadFromUrlContext uploadFromUrlContext, Lock lock) throws Exception { + FileEntry fileEntry = resilientOperationExecutor.execute( + (CheckedSupplier) () -> doUploadMtarFromUrl(uploadFromUrlContext, lock)); + LOGGER.trace(Messages.UPLOADED_MTAR_FROM_REMOTE_ENDPOINT_AND_JOB_ID, uploadFromUrlContext.getJobEntry() + .getUrl(), + uploadFromUrlContext.getJobEntry() + .getId(), ChronoUnit.MILLIS.between(uploadFromUrlContext.getStartTime(), LocalDateTime.now())); + var descriptor = fileService.processFileContent(uploadFromUrlContext.getJobEntry() + .getSpaceGuid(), fileEntry.getId(), + this::extractDeploymentDescriptor); + LOGGER.debug(Messages.ASYNC_UPLOAD_JOB_FINISHED, uploadFromUrlContext.getJobEntry() + .getId()); + try { + lock.lock(); + var uploadedEntry = asyncUploadJobService.createQuery() + .id(uploadFromUrlContext.getJobEntry() + .getId()) + .singleResult(); + asyncUploadJobService.update(uploadedEntry, ImmutableAsyncUploadJobEntry.copyOf(uploadedEntry) + .withFileId(fileEntry.getId()) + .withMtaId(descriptor.getId()) + .withUpdatedAt(LocalDateTime.now()) + .withFinishedAt(LocalDateTime.now()) + .withBytesRead(uploadFromUrlContext.getCounterRef() + .get()) + .withState(AsyncUploadJobEntry.State.FINISHED)); + } finally { + lock.unlock(); + } + } + + private FileEntry doUploadMtarFromUrl(UploadFromUrlContext uploadFromUrlContext, Lock lock) throws Exception { + FileFromUrlData fileFromUrlData = deployFromUrlRemoteClient.downloadFileFromUrl(uploadFromUrlContext); + String fileName = extractFileName(uploadFromUrlContext.getFileUrl()); + FileUtils.validateFileHasExtension(fileName); + resetCounterOnRetry(uploadFromUrlContext, lock); + // Normal stream returned from the http response always returns 0 when InputStream::available() is executed which seems to break + // JClods library: https://issues.apache.org/jira/browse/JCLOUDS-1623 + try (CountingInputStream source = new CountingInputStream(fileFromUrlData.fileInputStream(), uploadFromUrlContext.getCounterRef()); + BufferedInputStream bufferedContent = new BufferedInputStream(source, INPUT_STREAM_BUFFER_SIZE)) { + LOGGER.debug(Messages.UPLOADING_MTAR_STREAM_FROM_REMOTE_ENDPOINT, fileFromUrlData.uri()); + return fileService.addFile(ImmutableFileEntry.builder() + .space(uploadFromUrlContext.getJobEntry() + .getSpaceGuid()) + .namespace(uploadFromUrlContext.getJobEntry() + .getNamespace()) + .name(fileName) + .size(BigInteger.valueOf(fileFromUrlData.fileSize())) + .build(), bufferedContent); + } + } + + private void resetCounterOnRetry(UploadFromUrlContext upload, Lock lock) { + try { + lock.lock(); + upload.getCounterRef() + .set(0); + AsyncUploadJobEntry asyncUploadJobEntry = asyncUploadJobService.createQuery() + .id(upload.getJobEntry() + .getId()) + .singleResult(); + asyncUploadJobService.update(asyncUploadJobEntry, ImmutableAsyncUploadJobEntry.copyOf(asyncUploadJobEntry) + .withUpdatedAt(LocalDateTime.now()) + .withBytesRead(0L)); + } finally { + lock.unlock(); + } + } + + private String extractFileName(String url) { + String path = URI.create(url) + .getPath(); + if (path.indexOf('/') == -1) { + return path; + } + String[] pathFragments = path.split("/"); + return pathFragments[pathFragments.length - 1]; + } + + private DeploymentDescriptor extractDeploymentDescriptor(InputStream appArchiveStream) { + String descriptorString = ArchiveHandler.getDescriptor(appArchiveStream, applicationConfiguration.getMaxMtaDescriptorSize()); + DescriptorParserFacade descriptorParserFacade = descriptorParserFactory.getInstance(); + return descriptorParserFacade.parseDeploymentDescriptor(descriptorString); + } + + private void monitorAsyncUploadJob(AsyncUploadJobEntry updatedJobEntry, Lock lock, AtomicLong counterRef) { + while (updatedJobEntry.getState() == AsyncUploadJobEntry.State.RUNNING) { + try { + lock.lock(); + updatedJobEntry = asyncUploadJobService.createQuery() + .id(updatedJobEntry.getId()) + .singleResult(); + updatedJobEntry = asyncUploadJobService.update(updatedJobEntry, ImmutableAsyncUploadJobEntry.copyOf(updatedJobEntry) + .withBytesRead(counterRef.get()) + .withUpdatedAt( + LocalDateTime.now())); + } finally { + lock.unlock(); + } + waitBetweenUpdates(); + } + } + + protected void waitBetweenUpdates() { + MiscUtil.sleep(WAIT_TIME_BETWEEN_ASYNC_JOB_UPDATES_IN_MILLIS); + } + + private void updateFailedAsyncUploadJob(AsyncUploadJobEntry jobEntry, Exception e, Lock lock) { + try { + lock.lock(); + var failedEntry = asyncUploadJobService.createQuery() + .id(jobEntry.getId()) + .singleResult(); + asyncUploadJobService.update(failedEntry, ImmutableAsyncUploadJobEntry.copyOf(failedEntry) + .withUpdatedAt(LocalDateTime.now()) + .withFinishedAt(LocalDateTime.now()) + .withError(e.getMessage()) + .withState(AsyncUploadJobEntry.State.ERROR)); + } finally { + lock.unlock(); + } + } + + protected ResilientOperationExecutor getResilientOperationExecutor() { + return new ResilientOperationExecutor(); + } + +} diff --git a/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/UploadFromUrlContext.java b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/UploadFromUrlContext.java new file mode 100644 index 0000000000..c1ef04215d --- /dev/null +++ b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/UploadFromUrlContext.java @@ -0,0 +1,27 @@ +package org.cloudfoundry.multiapps.controller.web.upload; + +import java.time.LocalDateTime; +import java.util.concurrent.atomic.AtomicLong; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.cloudfoundry.Nullable; +import org.cloudfoundry.multiapps.controller.api.model.UserCredentials; +import org.cloudfoundry.multiapps.controller.persistence.model.AsyncUploadJobEntry; +import org.immutables.value.Value; + +@Value.Immutable +@JsonSerialize(as = ImmutableUploadFromUrlContext.class) +@JsonDeserialize(as = ImmutableUploadFromUrlContext.class) +public interface UploadFromUrlContext { + + AsyncUploadJobEntry getJobEntry(); + + String getFileUrl(); + + @Nullable + UserCredentials getUserCredentials(); + + AtomicLong getCounterRef(); + + LocalDateTime getStartTime(); +} diff --git a/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/client/DeployFromUrlRemoteClient.java b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/client/DeployFromUrlRemoteClient.java new file mode 100644 index 0000000000..868ea8611c --- /dev/null +++ b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/client/DeployFromUrlRemoteClient.java @@ -0,0 +1,147 @@ +package org.cloudfoundry.multiapps.controller.web.upload.client; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URLDecoder; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.charset.StandardCharsets; +import java.text.MessageFormat; +import java.time.Duration; +import java.util.Arrays; +import java.util.Base64; +import jakarta.inject.Inject; +import jakarta.inject.Named; +import org.apache.commons.io.IOUtils; +import org.cloudfoundry.multiapps.common.SLException; +import org.cloudfoundry.multiapps.controller.api.model.UserCredentials; +import org.cloudfoundry.multiapps.controller.client.util.CheckedSupplier; +import org.cloudfoundry.multiapps.controller.client.util.ResilientOperationExecutor; +import org.cloudfoundry.multiapps.controller.core.util.ApplicationConfiguration; +import org.cloudfoundry.multiapps.controller.core.util.UriUtil; +import org.cloudfoundry.multiapps.controller.web.Constants; +import org.cloudfoundry.multiapps.controller.web.Messages; +import org.cloudfoundry.multiapps.controller.web.upload.UploadFromUrlContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; + +@Named +public class DeployFromUrlRemoteClient { + + private static final Duration HTTP_CONNECT_TIMEOUT = Duration.ofMinutes(10); + private static final String USERNAME_PASSWORD_URL_FORMAT = "{0}:{1}"; + private static final int ERROR_RESPONSE_BODY_MAX_LENGTH = 4 * 1024; + + private static final Logger LOGGER = LoggerFactory.getLogger(DeployFromUrlRemoteClient.class); + + private final HttpClient httpClient = buildHttpClient(); + private final ResilientOperationExecutor resilientOperationExecutor = getResilientOperationExecutor(); + + private final ApplicationConfiguration applicationConfiguration; + + @Inject + public DeployFromUrlRemoteClient(ApplicationConfiguration applicationConfiguration) { + this.applicationConfiguration = applicationConfiguration; + } + + public FileFromUrlData downloadFileFromUrl(UploadFromUrlContext uploadFromUrlContext) throws Exception { + if (!UriUtil.isUrlSecure(uploadFromUrlContext.getFileUrl())) { + throw new SLException(Messages.MTAR_ENDPOINT_NOT_SECURE); + } + UriUtil.validateUrl(uploadFromUrlContext.getFileUrl()); + + HttpResponse response = callRemoteEndpointWithRetry(uploadFromUrlContext.getFileUrl(), + uploadFromUrlContext.getUserCredentials()); + long fileSize = response.headers() + .firstValueAsLong(Constants.CONTENT_LENGTH) + .orElseThrow(() -> new SLException(Messages.FILE_URL_RESPONSE_DID_NOT_RETURN_CONTENT_LENGTH)); + + long maxUploadSize = applicationConfiguration.getMaxUploadSize(); + if (fileSize > maxUploadSize) { + throw new SLException(MessageFormat.format(Messages.MAX_UPLOAD_SIZE_EXCEEDED, maxUploadSize)); + } + return new FileFromUrlData(response.body(), response.uri(), fileSize); + } + + private HttpResponse callRemoteEndpointWithRetry(String decodedUrl, UserCredentials userCredentials) + throws Exception { + return resilientOperationExecutor.execute((CheckedSupplier>) () -> { + var request = buildFetchFileRequest(decodedUrl, userCredentials); + LOGGER.debug(Messages.CALLING_REMOTE_MTAR_ENDPOINT, getMaskedUri(urlDecodeUrl(decodedUrl))); + var response = httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream()); + if (response.statusCode() / 100 != 2) { + String error = readErrorBodyFromResponse(response); + LOGGER.error(error); + if (response.statusCode() == HttpStatus.UNAUTHORIZED.value()) { + String errorMessage = MessageFormat.format(Messages.DEPLOY_FROM_URL_WRONG_CREDENTIALS, + UriUtil.stripUserInfo(decodedUrl)); + throw new SLException(errorMessage); + } + throw new SLException(MessageFormat.format(Messages.ERROR_FROM_REMOTE_MTAR_ENDPOINT, getMaskedUri(urlDecodeUrl(decodedUrl)), + response.statusCode(), error)); + } + return response; + }); + } + + private String getMaskedUri(String url) { + if (url.contains("@")) { + return url.substring(url.lastIndexOf("@")) + .replace("@", "..."); + } else { + return url; + } + } + + private String urlDecodeUrl(String url) { + return URLDecoder.decode(url, StandardCharsets.UTF_8); + } + + private HttpRequest buildFetchFileRequest(String decodedUrl, UserCredentials userCredentials) { + var builder = HttpRequest.newBuilder() + .GET() + .timeout(Duration.ofMinutes(15)); + var uri = URI.create(decodedUrl); + var userInfo = uri.getUserInfo(); + if (userCredentials != null) { + builder.uri(uri); + String userCredentialsUrlFormat = MessageFormat.format(USERNAME_PASSWORD_URL_FORMAT, userCredentials.getUsername(), + userCredentials.getPassword()); + String encodedAuth = Base64.getEncoder() + .encodeToString(userCredentialsUrlFormat.getBytes()); + builder.header(HttpHeaders.AUTHORIZATION, "Basic " + encodedAuth); + } else if (userInfo != null) { + builder.uri(URI.create(decodedUrl.replace(uri.getRawUserInfo() + "@", ""))); + String encodedAuth = Base64.getEncoder() + .encodeToString(userInfo.getBytes()); + builder.header(HttpHeaders.AUTHORIZATION, "Basic " + encodedAuth); + } else { + builder.uri(uri); + } + return builder.build(); + } + + private String readErrorBodyFromResponse(HttpResponse response) throws IOException { + try (InputStream is = response.body()) { + byte[] buffer = new byte[ERROR_RESPONSE_BODY_MAX_LENGTH]; + int read = IOUtils.read(is, buffer); + return new String(Arrays.copyOf(buffer, read)); + } + } + + protected HttpClient buildHttpClient() { + return HttpClient.newBuilder() + .version(HttpClient.Version.HTTP_2) + .connectTimeout(HTTP_CONNECT_TIMEOUT) + .followRedirects(HttpClient.Redirect.NORMAL) + .build(); + } + + protected ResilientOperationExecutor getResilientOperationExecutor() { + return new ResilientOperationExecutor(); + } +} diff --git a/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/client/FileFromUrlData.java b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/client/FileFromUrlData.java new file mode 100644 index 0000000000..585f3e9755 --- /dev/null +++ b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/client/FileFromUrlData.java @@ -0,0 +1,7 @@ +package org.cloudfoundry.multiapps.controller.web.upload.client; + +import java.io.InputStream; +import java.net.URI; + +public record FileFromUrlData(InputStream fileInputStream, URI uri, long fileSize) { +} diff --git a/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/exception/RejectedAsyncUploadJobException.java b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/exception/RejectedAsyncUploadJobException.java new file mode 100644 index 0000000000..3fa4fb09bd --- /dev/null +++ b/multiapps-controller-web/src/main/java/org/cloudfoundry/multiapps/controller/web/upload/exception/RejectedAsyncUploadJobException.java @@ -0,0 +1,17 @@ +package org.cloudfoundry.multiapps.controller.web.upload.exception; + +import java.util.concurrent.RejectedExecutionException; +import org.cloudfoundry.multiapps.controller.persistence.model.AsyncUploadJobEntry; + +public class RejectedAsyncUploadJobException extends RejectedExecutionException { + + private final AsyncUploadJobEntry asyncUploadJobEntry; + + public RejectedAsyncUploadJobException(AsyncUploadJobEntry asyncUploadJobEntry) { + this.asyncUploadJobEntry = asyncUploadJobEntry; + } + + public AsyncUploadJobEntry getAsyncUploadJobEntry() { + return asyncUploadJobEntry; + } +} diff --git a/multiapps-controller-web/src/test/java/org/cloudfoundry/multiapps/controller/web/api/impl/FilesApiServiceImplTest.java b/multiapps-controller-web/src/test/java/org/cloudfoundry/multiapps/controller/web/api/impl/FilesApiServiceImplTest.java index 94197f8d70..77cad5d5ee 100644 --- a/multiapps-controller-web/src/test/java/org/cloudfoundry/multiapps/controller/web/api/impl/FilesApiServiceImplTest.java +++ b/multiapps-controller-web/src/test/java/org/cloudfoundry/multiapps/controller/web/api/impl/FilesApiServiceImplTest.java @@ -1,19 +1,7 @@ package org.cloudfoundry.multiapps.controller.web.api.impl; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.ByteArrayInputStream; import java.io.InputStream; import java.math.BigInteger; -import java.net.http.HttpClient; -import java.net.http.HttpHeaders; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; -import java.net.http.HttpResponse.BodyHandlers; import java.nio.charset.StandardCharsets; import java.time.LocalDateTime; import java.util.Base64; @@ -24,39 +12,33 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.FutureTask; - import org.apache.commons.lang3.RandomStringUtils; import org.cloudfoundry.multiapps.common.SLException; import org.cloudfoundry.multiapps.controller.api.model.AsyncUploadResult; import org.cloudfoundry.multiapps.controller.api.model.FileMetadata; import org.cloudfoundry.multiapps.controller.api.model.ImmutableFileUrl; -import org.cloudfoundry.multiapps.controller.api.model.ImmutableUserCredentials; -import org.cloudfoundry.multiapps.controller.client.util.ResilientOperationExecutor; import org.cloudfoundry.multiapps.controller.core.auditlogging.FilesApiServiceAuditLog; -import org.cloudfoundry.multiapps.controller.core.helpers.DescriptorParserFacadeFactory; -import org.cloudfoundry.multiapps.controller.core.util.ApplicationConfiguration; import org.cloudfoundry.multiapps.controller.core.util.UserInfo; import org.cloudfoundry.multiapps.controller.persistence.Constants; import org.cloudfoundry.multiapps.controller.persistence.model.AsyncUploadJobEntry; +import org.cloudfoundry.multiapps.controller.persistence.model.AsyncUploadJobEntry.State; import org.cloudfoundry.multiapps.controller.persistence.model.FileEntry; +import org.cloudfoundry.multiapps.controller.persistence.model.ImmutableAsyncUploadJobEntry; import org.cloudfoundry.multiapps.controller.persistence.model.ImmutableFileEntry; import org.cloudfoundry.multiapps.controller.persistence.query.AsyncUploadJobsQuery; import org.cloudfoundry.multiapps.controller.persistence.services.AsyncUploadJobService; import org.cloudfoundry.multiapps.controller.persistence.services.FileService; import org.cloudfoundry.multiapps.controller.persistence.services.FileStorageException; -import org.junit.jupiter.api.BeforeAll; +import org.cloudfoundry.multiapps.controller.web.upload.AsyncUploadJobOrchestrator; +import org.cloudfoundry.multiapps.controller.web.upload.exception.RejectedAsyncUploadJobException; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; -import org.mockito.Answers; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; -import org.mockito.Spy; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.security.core.context.SecurityContextHolder; @@ -64,78 +46,75 @@ import org.springframework.security.oauth2.core.user.DefaultOAuth2User; import org.springframework.web.multipart.MultipartFile; import org.springframework.web.multipart.MultipartHttpServletRequest; - -import jakarta.persistence.NoResultException; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.RETURNS_SELF; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; class FilesApiServiceImplTest { - private static final String MTA_ID = "anatz"; - private static final String FILE_URL = Base64.getUrlEncoder() - .encodeToString("https://host.domain/test.mtar?query=true".getBytes(StandardCharsets.UTF_8)); private static final String SPACE_GUID = "896e6be9-8217-4a1c-b938-09b30966157a"; private static final String NAMESPACE = "custom-namespace"; private static final String DIGEST_CHARACTER_TABLE = "123456789ABCDEF"; - private static final String DECODED_URL_WITH_CREDENTIALS_IN_THE_UTL = "https://abc:abv@google.com"; - private static final String DECODED_URL_WITH_CREDENTIALS_AND_AT_IN_THE_UTL = "https://abc:ab%40v@google.com"; - private static final String ENCODED_URL_WITH_CREDENTIALS_AND_AT_IN_THE_UTL_AND_IN_PARAM = "https://abc:ab%40v@google.com/%40user"; - private static final String DECODED_URL_WITH_CREDENTIALS_AND_AT_IN_THE_UTL_AND_IN_PARAM = "https://abc:ab@v@google.com/@user"; + + private static final String FILE_URL = Base64.getUrlEncoder() + .encodeToString( + "https://host.domain/test.mtar?query=true".getBytes(StandardCharsets.UTF_8)); + private static final String DECODED_URL_WITH_CREDENTIALS_IN_THE_URL = "https://abc:abv@google.com"; + @Mock private FileService fileService; @Mock private MultipartHttpServletRequest request; @Mock private MultipartFile file; - @Mock - private HttpClient httpClient; @InjectMocks - private final FilesApiServiceImpl testedClass = new FilesApiServiceImpl() { - @Override - protected HttpClient buildHttpClient(String url) { - return httpClient; - } - - @Override - protected ResilientOperationExecutor getResilientOperationExecutor() { - return new ResilientOperationExecutor().withRetryCount(0) - .withWaitTimeBetweenRetriesInMillis(0); - } - }; + private final FilesApiServiceImpl testedClass = new FilesApiServiceImpl(); @Mock private FilesApiServiceAuditLog filesApiServiceAuditLog; - @Mock - private HttpResponse fileUrlResponse; - @Mock(name = "deployFromUrlExecutor") - private ExecutorService asyncFileUploadExecutor; @Mock(name = "fileStorageThreadPool") private ExecutorService fileStorageThreadPool; @Mock - private ApplicationConfiguration configuration; - @Spy - private DescriptorParserFacadeFactory descriptorParserFactory = new DescriptorParserFacadeFactory(configuration); + private AsyncUploadJobOrchestrator asyncUploadJobOrchestrator; @Mock private AsyncUploadJobService uploadJobService; - @BeforeAll - public static void setUser() { - var user = new UserInfo("user1", "user1", null); - var token = new DefaultOAuth2User(Collections.emptyList(), Map.of("user_info", user), "user_info"); - SecurityContextHolder.getContext() - .setAuthentication(new OAuth2AuthenticationToken(token, Collections.emptyList(), "id")); - } - @BeforeEach public void initialize() throws Exception { MockitoAnnotations.openMocks(this) .close(); + SecurityContextHolder.clearContext(); + var user = new UserInfo("user1", "user1", null); + var token = new DefaultOAuth2User(Collections.emptyList(), Map.of("user_info", user), "user_info"); + SecurityContextHolder.getContext() + .setAuthentication(new OAuth2AuthenticationToken(token, Collections.emptyList(), "id")); Mockito.when(request.getRequestURI()) .thenReturn(""); prepareFileStorageThreadPool(); } + @AfterEach + public void cleanup() { + SecurityContextHolder.clearContext(); + } + + private void prepareFileStorageThreadPool() { + when(fileStorageThreadPool.submit(any(Callable.class))).thenAnswer(invocation -> { + Callable callable = invocation.getArgument(0); + FutureTask futureTask = new FutureTask<>(callable); + futureTask.run(); + return futureTask; + }); + } + @Test void testGetMtaFiles() throws Exception { FileEntry entryOne = createFileEntry("test.mtar"); - FileEntry entryTwo = createFileEntry("extension.mtaet"); + FileEntry entryTwo = createFileEntry("extension.mtaext"); Mockito.when(fileService.listFiles(Mockito.eq(SPACE_GUID), Mockito.eq(NAMESPACE))) .thenReturn(List.of(entryOne, entryTwo)); ResponseEntity> response = testedClass.getFiles(SPACE_GUID, NAMESPACE); @@ -144,7 +123,28 @@ void testGetMtaFiles() throws Exception { assertEquals(2, files.size()); assertMetadataMatches(entryOne, files.get(0)); assertMetadataMatches(entryTwo, files.get(1)); + } + private FileEntry createFileEntry(String name) { + return ImmutableFileEntry.builder() + .id(UUID.randomUUID() + .toString()) + .digest(RandomStringUtils.random(32, DIGEST_CHARACTER_TABLE)) + .digestAlgorithm(Constants.DIGEST_ALGORITHM) + .name(name) + .namespace(NAMESPACE) + .size(BigInteger.valueOf(new Random().nextInt(1024 * 1024 * 10))) + .space(SPACE_GUID) + .build(); + } + + private void assertMetadataMatches(FileEntry expected, FileMetadata actual) { + assertEquals(expected.getId(), actual.getId()); + assertEquals(expected.getName(), actual.getName()); + assertEquals(expected.getSpace(), actual.getSpace()); + assertEquals(expected.getSize(), actual.getSize()); + assertEquals(expected.getDigest(), actual.getDigest()); + assertEquals(expected.getDigestAlgorithm(), actual.getDigestAlgorithm()); } @Test @@ -175,8 +175,7 @@ void testUploadMtaFile() throws Exception { .namespace(NAMESPACE) .name(fileName) .size(BigInteger.valueOf(fileSize)) - .build()), - Mockito.any(InputStream.class))) + .build()), any(InputStream.class))) .thenReturn(fileEntry); ResponseEntity response = testedClass.uploadFile(request, SPACE_GUID, NAMESPACE); @@ -189,402 +188,315 @@ void testUploadMtaFile() throws Exception { .namespace(NAMESPACE) .name(fileName) .size(BigInteger.valueOf(fileSize)) - .build()), - Mockito.any(InputStream.class)); + .build()), any(InputStream.class)); FileMetadata fileMetadata = response.getBody(); assertMetadataMatches(fileEntry, fileMetadata); } @Test - void testUploadFileFromUrl() throws Exception { - String fileName = "test.mtar"; - FileEntry fileEntry = createFileEntry(fileName); - HttpHeaders headers = HttpHeaders.of(Map.of("Content-Length", List.of("20")), (a, b) -> true); - InputStream mockStream = Mockito.mock(InputStream.class); - AsyncUploadJobsQuery query = Mockito.mock(AsyncUploadJobsQuery.class, Answers.RETURNS_SELF); - var jobEntry = mockUploadJobEntry(fileEntry.getId(), AsyncUploadJobEntry.State.FINISHED, null); - - Mockito.when(fileUrlResponse.headers()) - .thenReturn(headers); - Mockito.when(fileUrlResponse.statusCode()) - .thenReturn(200); - Mockito.when(fileUrlResponse.body()) - .thenReturn(mockStream); - - Mockito.when(httpClient.send(Mockito.any(), Mockito.eq(BodyHandlers.ofInputStream()))) - .thenReturn(fileUrlResponse); - AsyncUploadJobsQuery queryReturningNoJobs = mock(AsyncUploadJobsQuery.class); - when(query.withStateAnyOf(AsyncUploadJobEntry.State.INITIAL, AsyncUploadJobEntry.State.RUNNING)).thenReturn(queryReturningNoJobs); - when(query.list()).thenReturn(List.of(jobEntry)); - Mockito.when(uploadJobService.createQuery()) - .thenReturn(query); - Mockito.when(uploadJobService.update(any(), any())) - .thenReturn(jobEntry); - - Mockito.when(fileService.addFile(Mockito.eq(ImmutableFileEntry.builder() - .name(SPACE_GUID) - .namespace(NAMESPACE) - .name(fileName) - .size(BigInteger.valueOf(20L)) - .build()), - Mockito.any(InputStream.class))) - .thenReturn(fileEntry); - Mockito.when(fileService.getFile(Mockito.eq(SPACE_GUID), Mockito.eq(fileEntry.getId()))) - .thenReturn(fileEntry); - Mockito.when(configuration.getMaxUploadSize()) - .thenReturn(ApplicationConfiguration.DEFAULT_MAX_UPLOAD_SIZE); - Future future = Mockito.mock(Future.class); - when(future.isDone()).thenReturn(true); - prepareAsyncExecutor(future); - - ResponseEntity startUploadResponse = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE, - ImmutableFileUrl.of(FILE_URL, null)); - - assertEquals(startUploadResponse.getStatusCode(), HttpStatus.ACCEPTED); - - String jobUrl = startUploadResponse.getHeaders() - .getFirst("Location"); - String jobGuid = jobUrl.substring(jobUrl.lastIndexOf('/')); - - ResponseEntity uploadJobResponse = testedClass.getUploadFromUrlJob(SPACE_GUID, NAMESPACE, jobGuid); - - assertEquals(uploadJobResponse.getStatusCode(), HttpStatus.CREATED); - - Mockito.verify(fileService) - .addFile(Mockito.eq(ImmutableFileEntry.builder() - .name(fileName) - .namespace(NAMESPACE) - .space(SPACE_GUID) - .size(BigInteger.valueOf(20L)) - .build()), - Mockito.any(InputStream.class)); - - var responseBody = uploadJobResponse.getBody(); - var fileMetadata = responseBody.getFile(); - assertMetadataMatches(fileEntry, fileMetadata); - - var mtaId = responseBody.getMtaId(); - var status = responseBody.getStatus(); - assertEquals(mtaId, MTA_ID); - assertEquals(status, AsyncUploadResult.JobStatus.FINISHED); - } - - private void prepareFileStorageThreadPool() { - when(fileStorageThreadPool.submit(any(Callable.class))).thenAnswer(invocation -> { - Callable callable = invocation.getArgument(0); - FutureTask futureTask = new FutureTask<>(callable); - futureTask.run(); - return futureTask; - }); - } - - private void prepareAsyncExecutor(Future future) { - Mockito.doAnswer(invocationOnMock -> { - Runnable r = invocationOnMock.getArgument(0); - r.run(); - return future; - }) - .when(asyncFileUploadExecutor) - .submit((Runnable) Mockito.any()); - } - - @Test - void testUploadFileFromUrlWithInvalidJobId() { - AsyncUploadJobsQuery query = Mockito.mock(AsyncUploadJobsQuery.class, Answers.RETURNS_SELF); - Mockito.doThrow(NoResultException.class) - .when(query) - .singleResult(); - - Mockito.when(uploadJobService.createQuery()) - .thenReturn(query); - - ResponseEntity response = testedClass.getUploadFromUrlJob(SPACE_GUID, NAMESPACE, "invalid"); - - assertEquals(response.getStatusCode(), HttpStatus.NOT_FOUND); - } - - @Test - void testFileUrlDoesntReturnContentLength() throws Exception { - HttpHeaders headers = HttpHeaders.of(Collections.emptyMap(), (a, b) -> true); - AsyncUploadJobsQuery query = Mockito.mock(AsyncUploadJobsQuery.class, Answers.RETURNS_SELF); - String error = "no content length"; - var jobEntry = mockUploadJobEntry(null, AsyncUploadJobEntry.State.ERROR, error); - - Mockito.when(fileUrlResponse.headers()) - .thenReturn(headers); - Mockito.when(fileUrlResponse.statusCode()) - .thenReturn(200); - - Mockito.when(httpClient.send(Mockito.any(), Mockito.eq(BodyHandlers.ofInputStream()))) - .thenReturn(fileUrlResponse); - - AsyncUploadJobsQuery queryReturningNoJobs = mock(AsyncUploadJobsQuery.class); - when(query.withStateAnyOf(AsyncUploadJobEntry.State.INITIAL, AsyncUploadJobEntry.State.RUNNING)).thenReturn(queryReturningNoJobs); - when(query.list()).thenReturn(List.of(jobEntry)); - Mockito.when(uploadJobService.createQuery()) - .thenReturn(query); - Mockito.when(uploadJobService.update(any(), any())) - .thenReturn(jobEntry); - Future future = Mockito.mock(Future.class); - when(future.isDone()).thenReturn(true); - prepareAsyncExecutor(future); - - ResponseEntity startUploadResponse = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE, - ImmutableFileUrl.of(FILE_URL, null)); - - assertEquals(startUploadResponse.getStatusCode(), HttpStatus.ACCEPTED); - - String jobUrl = startUploadResponse.getHeaders() - .getFirst("Location"); - String jobGuid = jobUrl.substring(jobUrl.lastIndexOf('/')); - - ResponseEntity uploadJobResponse = testedClass.getUploadFromUrlJob(SPACE_GUID, NAMESPACE, jobGuid); - - assertEquals(uploadJobResponse.getStatusCode(), HttpStatus.OK); - - var responseBody = uploadJobResponse.getBody(); - assertEquals(responseBody.getStatus(), AsyncUploadResult.JobStatus.ERROR); - assertEquals(responseBody.getError(), error); + void testStartDeployFromUrl() { + AsyncUploadJobsQuery asyncUploadJobsQuery = mock(AsyncUploadJobsQuery.class, RETURNS_SELF); + when(uploadJobService.createQuery()).thenReturn(asyncUploadJobsQuery); + when(asyncUploadJobsQuery.list()).thenReturn(Collections.emptyList()); + + String expectedJobId = UUID.randomUUID() + .toString(); + when(asyncUploadJobOrchestrator.executeUploadFromUrl(any(), any(), any(), any(), any())).thenReturn( + ImmutableAsyncUploadJobEntry.builder() + .id(expectedJobId) + .url(DECODED_URL_WITH_CREDENTIALS_IN_THE_URL) + .startedAt(LocalDateTime.now()) + .state(State.INITIAL) + .user("user1") + .spaceGuid(SPACE_GUID) + .instanceIndex(0) + .build()); + ResponseEntity response = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE, ImmutableFileUrl.builder() + .fileUrl(FILE_URL) + .build()); + assertEquals(HttpStatus.ACCEPTED, response.getStatusCode()); + assertEquals("spaces/" + SPACE_GUID + "/files/jobs/" + expectedJobId, response.getHeaders() + .getLocation() + .toString()); + Mockito.verify(asyncUploadJobOrchestrator) + .executeUploadFromUrl(eq(SPACE_GUID), eq(NAMESPACE), any(String.class), any(String.class), eq(null)); } @Test - void testUploadFromUrlWhenThereIsValidExistingJob() { - AsyncUploadJobsQuery query = Mockito.mock(AsyncUploadJobsQuery.class, Answers.RETURNS_SELF); - when(uploadJobService.createQuery()).thenReturn(query); - var jobEntry = mockUploadJobEntry(null, AsyncUploadJobEntry.State.ERROR, null); - when(query.list()).thenReturn(List.of(jobEntry)); - Future runningTask = mock(Future.class); - prepareAsyncExecutor(runningTask); - when(uploadJobService.update(any(), any())).thenReturn(jobEntry); - ResponseEntity firstUpload = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE, ImmutableFileUrl.of(FILE_URL, null)); - String locationHeader = firstUpload.getHeaders() - .getFirst(org.springframework.http.HttpHeaders.LOCATION); - String createdJobId = locationHeader.substring(locationHeader.lastIndexOf("/") + 1); - when(jobEntry.getId()).thenReturn(createdJobId); - ResponseEntity secondUpload = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE, ImmutableFileUrl.of(FILE_URL, null)); - assertEquals(HttpStatus.SEE_OTHER, secondUpload.getStatusCode()); - } - - @Test - void testFileUrlReturnsContentLengthAboveMaxUploadSize() throws Exception { - long invalidFileSize = ApplicationConfiguration.DEFAULT_MAX_UPLOAD_SIZE + 1024; - String fileSize = Long.toString(invalidFileSize); - AsyncUploadJobsQuery query = Mockito.mock(AsyncUploadJobsQuery.class, Answers.RETURNS_SELF); - String error = "content length exceeds max permitted size of 4GB"; - HttpHeaders headers = HttpHeaders.of(Map.of("Content-Length", List.of(fileSize)), (a, b) -> true); - var jobEntry = mockUploadJobEntry(null, AsyncUploadJobEntry.State.ERROR, error); - - Mockito.when(fileUrlResponse.headers()) - .thenReturn(headers); - Mockito.when(fileUrlResponse.statusCode()) - .thenReturn(200); - - Mockito.when(httpClient.send(Mockito.any(), Mockito.eq(BodyHandlers.ofInputStream()))) - .thenReturn(fileUrlResponse); - - AsyncUploadJobsQuery queryReturningNoJobs = mock(AsyncUploadJobsQuery.class); - when(query.withStateAnyOf(AsyncUploadJobEntry.State.INITIAL, AsyncUploadJobEntry.State.RUNNING)).thenReturn(queryReturningNoJobs); - when(query.list()).thenReturn(List.of(jobEntry)); - Mockito.when(uploadJobService.createQuery()) - .thenReturn(query); - Mockito.when(uploadJobService.update(any(), any())) - .thenReturn(jobEntry); - Future future = Mockito.mock(Future.class); - when(future.isDone()).thenReturn(true); - prepareAsyncExecutor(future); - - ResponseEntity startUploadResponse = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE, - ImmutableFileUrl.of(FILE_URL, null)); - - assertEquals(startUploadResponse.getStatusCode(), HttpStatus.ACCEPTED); - - String jobUrl = startUploadResponse.getHeaders() - .getFirst("Location"); - String jobGuid = jobUrl.substring(jobUrl.lastIndexOf('/')); - - ResponseEntity uploadJobResponse = testedClass.getUploadFromUrlJob(SPACE_GUID, NAMESPACE, jobGuid); - - assertEquals(uploadJobResponse.getStatusCode(), HttpStatus.OK); - - var responseBody = uploadJobResponse.getBody(); - assertEquals(responseBody.getStatus(), AsyncUploadResult.JobStatus.ERROR); - assertEquals(responseBody.getError(), error); - } - - @ParameterizedTest - @ValueSource(strings = { "https://host.domain/path/file?query=true", "http://host.domain/path/file.mtar?query=true" }) - void testUploadFileWithInvalidUrl(String url) throws Exception { - AsyncUploadJobsQuery query = Mockito.mock(AsyncUploadJobsQuery.class, Answers.RETURNS_SELF); - HttpHeaders headers = HttpHeaders.of(Map.of("Content-Length", List.of("20")), (a, b) -> true); - var jobEntry = mockUploadJobEntry(null, AsyncUploadJobEntry.State.ERROR, "error"); - - Mockito.when(fileUrlResponse.statusCode()) - .thenReturn(200); - Mockito.when(fileUrlResponse.headers()) - .thenReturn(headers); - - Mockito.when(httpClient.send(Mockito.any(), Mockito.eq(BodyHandlers.ofInputStream()))) - .thenReturn(fileUrlResponse); - - AsyncUploadJobsQuery queryReturningNoJobs = mock(AsyncUploadJobsQuery.class); - when(query.withStateAnyOf(AsyncUploadJobEntry.State.INITIAL, AsyncUploadJobEntry.State.RUNNING)).thenReturn(queryReturningNoJobs); - when(query.list()).thenReturn(List.of(jobEntry)); - Mockito.when(uploadJobService.createQuery()) - .thenReturn(query); - Mockito.when(uploadJobService.update(any(), any())) - .thenReturn(jobEntry); - Future future = Mockito.mock(Future.class); - when(future.isDone()).thenReturn(true); - prepareAsyncExecutor(future); + void testStartDeployFromUrlWhenExecutorRejectsJob() { + AsyncUploadJobsQuery asyncUploadJobsQuery = mock(AsyncUploadJobsQuery.class, RETURNS_SELF); + when(uploadJobService.createQuery()).thenReturn(asyncUploadJobsQuery); + when(asyncUploadJobsQuery.list()).thenReturn(Collections.emptyList()); - String invalidFileUrl = Base64.getUrlEncoder() - .encodeToString(url.getBytes(StandardCharsets.UTF_8)); + AsyncUploadJobEntry rejectedEntry = mock(AsyncUploadJobEntry.class); + when(rejectedEntry.getId()).thenReturn("rejected-job-id"); - ResponseEntity startUploadResponse = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE, - ImmutableFileUrl.of(invalidFileUrl, null)); + when(asyncUploadJobOrchestrator.executeUploadFromUrl(any(), any(), any(), any(), any())).thenThrow( + new RejectedAsyncUploadJobException(rejectedEntry)); - assertEquals(startUploadResponse.getStatusCode(), HttpStatus.ACCEPTED); + ResponseEntity response = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE, ImmutableFileUrl.builder() + .fileUrl(FILE_URL) + .build()); - String jobUrl = startUploadResponse.getHeaders() - .getFirst("Location"); - String jobGuid = jobUrl.substring(jobUrl.lastIndexOf('/')); + assertEquals(HttpStatus.TOO_MANY_REQUESTS, response.getStatusCode()); + assertEquals("30", response.getHeaders() + .getFirst("Retry-After")); - ResponseEntity uploadJobResponse = testedClass.getUploadFromUrlJob(SPACE_GUID, NAMESPACE, jobGuid); + Mockito.verify(asyncUploadJobOrchestrator) + .executeUploadFromUrl(eq(SPACE_GUID), eq(NAMESPACE), any(String.class), any(String.class), eq(null)); - assertEquals(uploadJobResponse.getStatusCode(), HttpStatus.OK); - - var responseBody = uploadJobResponse.getBody(); - assertEquals(AsyncUploadResult.JobStatus.ERROR, responseBody.getStatus()); + Mockito.verify(asyncUploadJobsQuery) + .delete(); } @Test - void testGetUploadFromUrlOnDifferentInstance() { - AsyncUploadJobsQuery query = Mockito.mock(AsyncUploadJobsQuery.class, Answers.RETURNS_SELF); - when(query.withStateAnyOf(AsyncUploadJobEntry.State.INITIAL, AsyncUploadJobEntry.State.RUNNING)).thenReturn(query); - var jobEntry = mockUploadJobEntry(null, AsyncUploadJobEntry.State.RUNNING, null); - when(query.list()).thenReturn(List.of(jobEntry)); - Mockito.when(uploadJobService.createQuery()) - .thenReturn(query); - when(configuration.getApplicationInstanceIndex()).thenReturn(3); - ResponseEntity uploadJobResponse = testedClass.getUploadFromUrlJob(SPACE_GUID, NAMESPACE, jobEntry.getId()); - assertEquals(AsyncUploadResult.JobStatus.ERROR, uploadJobResponse.getBody() - .getStatus()); + void testStartDeployFromUrlWhenStuckJobExists() { + AsyncUploadJobsQuery asyncUploadJobsQuery = mock(AsyncUploadJobsQuery.class, RETURNS_SELF); + when(uploadJobService.createQuery()).thenReturn(asyncUploadJobsQuery); + + String existingJobId = UUID.randomUUID() + .toString(); + AsyncUploadJobEntry stuckJob = ImmutableAsyncUploadJobEntry.builder() + .id(existingJobId) + .url(DECODED_URL_WITH_CREDENTIALS_IN_THE_URL) + .startedAt(LocalDateTime.now() + .minusMinutes(5)) + .updatedAt(LocalDateTime.now() + .minusMinutes(2)) + .state(State.RUNNING) + .user("user1") + .spaceGuid(SPACE_GUID) + .instanceIndex(0) + .build(); + + when(asyncUploadJobsQuery.list()).thenReturn(List.of(stuckJob)) + .thenReturn(Collections.emptyList()); + + String newJobId = UUID.randomUUID() + .toString(); + when(asyncUploadJobOrchestrator.executeUploadFromUrl(any(), any(), any(), any(), any())).thenReturn( + ImmutableAsyncUploadJobEntry.builder() + .id(newJobId) + .url(DECODED_URL_WITH_CREDENTIALS_IN_THE_URL) + .startedAt(LocalDateTime.now()) + .state(State.INITIAL) + .user("user1") + .spaceGuid(SPACE_GUID) + .instanceIndex(0) + .build()); + + ResponseEntity response = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE, ImmutableFileUrl.builder() + .fileUrl(FILE_URL) + .build()); + + assertEquals(HttpStatus.ACCEPTED, response.getStatusCode()); + assertEquals("spaces/" + SPACE_GUID + "/files/jobs/" + newJobId, response.getHeaders() + .getLocation() + .toString()); + + Mockito.verify(asyncUploadJobsQuery) + .id(existingJobId); + Mockito.verify(asyncUploadJobsQuery) + .delete(); + Mockito.verify(asyncUploadJobOrchestrator) + .executeUploadFromUrl(eq(SPACE_GUID), eq(NAMESPACE), any(String.class), any(String.class), eq(null)); } @Test - void testCallRemoteEndpointWithRetry_withSuccessfulCall() throws Exception { - HttpResponse response = Mockito.mock(HttpResponse.class); - - when(httpClient.send(any(HttpRequest.class), any())).thenReturn(response); - when(response.statusCode()).thenReturn(HttpStatus.OK.value()); - - HttpResponse testResponce = testedClass.callRemoteEndpointWithRetry(httpClient, DECODED_URL_WITH_CREDENTIALS_IN_THE_UTL, - ImmutableUserCredentials.builder() - .build()); - assertEquals(200, testResponce.statusCode()); + void testStartDeployFromUrlWhenActiveJobExists() { + AsyncUploadJobsQuery asyncUploadJobsQuery = mock(AsyncUploadJobsQuery.class, RETURNS_SELF); + when(uploadJobService.createQuery()).thenReturn(asyncUploadJobsQuery); + + String existingJobId = UUID.randomUUID() + .toString(); + AsyncUploadJobEntry activeJob = ImmutableAsyncUploadJobEntry.builder() + .id(existingJobId) + .url(DECODED_URL_WITH_CREDENTIALS_IN_THE_URL) + .startedAt(LocalDateTime.now() + .minusMinutes(1)) + .updatedAt(LocalDateTime.now() + .minusSeconds(10)) + .state(State.RUNNING) + .user("user1") + .spaceGuid(SPACE_GUID) + .instanceIndex(0) + .build(); + + when(asyncUploadJobsQuery.list()).thenReturn(List.of(activeJob)); + + ResponseEntity response = testedClass.startUploadFromUrl(SPACE_GUID, NAMESPACE, ImmutableFileUrl.builder() + .fileUrl(FILE_URL) + .build()); + + assertEquals(HttpStatus.SEE_OTHER, response.getStatusCode()); + assertEquals("spaces/" + SPACE_GUID + "/files/jobs/" + existingJobId, response.getHeaders() + .getLocation() + .toString()); + + Mockito.verify(asyncUploadJobsQuery, Mockito.never()) + .id(any()); + Mockito.verify(asyncUploadJobsQuery, Mockito.never()) + .delete(); + Mockito.verify(asyncUploadJobOrchestrator, Mockito.never()) + .executeUploadFromUrl(any(), any(), any(), any(), any()); } @Test - void testCallRemoteEndpointWithRetry_withUnauthorizedResponce() throws Exception { - HttpResponse response = Mockito.mock(HttpResponse.class); - - when(httpClient.send(any(HttpRequest.class), any())).thenReturn(response); - when(response.statusCode()).thenReturn(HttpStatus.UNAUTHORIZED.value()); - when(response.body()).thenReturn(new ByteArrayInputStream("test".getBytes())); - - SLException thrownException = assertThrows(SLException.class, - () -> testedClass.callRemoteEndpointWithRetry(httpClient, - DECODED_URL_WITH_CREDENTIALS_IN_THE_UTL, - ImmutableUserCredentials.builder() - .build())); - - assertEquals("Credentials to https://google.com are wrong. Make sure that they are correct.", thrownException.getMessage()); + void testGetUploadFromUrlJobWhenJobNotFound() { + AsyncUploadJobsQuery asyncUploadJobsQuery = mock(AsyncUploadJobsQuery.class, RETURNS_SELF); + when(uploadJobService.createQuery()).thenReturn(asyncUploadJobsQuery); + when(asyncUploadJobsQuery.list()).thenReturn(Collections.emptyList()); + + ResponseEntity response = testedClass.getUploadFromUrlJob(SPACE_GUID, NAMESPACE, "non-existent-job-id"); + + assertEquals(HttpStatus.NOT_FOUND, response.getStatusCode()); + + Mockito.verify(asyncUploadJobsQuery) + .id("non-existent-job-id"); + Mockito.verify(asyncUploadJobsQuery) + .spaceGuid(SPACE_GUID); + Mockito.verify(asyncUploadJobsQuery) + .user("user1"); + Mockito.verify(asyncUploadJobsQuery) + .namespace(NAMESPACE); } @Test - void testCallRemoteEndpointWithRetry_withBadRequest() throws Exception { - HttpResponse response = Mockito.mock(HttpResponse.class); - - when(httpClient.send(any(HttpRequest.class), any())).thenReturn(response); - when(response.statusCode()).thenReturn(HttpStatus.BAD_REQUEST.value()); - when(response.body()).thenReturn(new ByteArrayInputStream("test".getBytes())); - - SLException thrownException = assertThrows(SLException.class, - () -> testedClass.callRemoteEndpointWithRetry(httpClient, - DECODED_URL_WITH_CREDENTIALS_AND_AT_IN_THE_UTL, - ImmutableUserCredentials.builder() - .build())); - - assertEquals("Error from remote MTAR endpoint ...google.com with status code 400, message: test", thrownException.getMessage()); + void testGetUploadFromUrlJobWhenJobIsSuccessful() throws FileStorageException { + String jobId = UUID.randomUUID() + .toString(); + String fileId = UUID.randomUUID() + .toString(); + String mtaId = "test-mta"; + + AsyncUploadJobEntry successfulJob = ImmutableAsyncUploadJobEntry.builder() + .id(jobId) + .url(DECODED_URL_WITH_CREDENTIALS_IN_THE_URL) + .startedAt(LocalDateTime.now() + .minusMinutes(5)) + .updatedAt(LocalDateTime.now() + .minusMinutes(1)) + .state(State.FINISHED) + .user("user1") + .spaceGuid(SPACE_GUID) + .instanceIndex(0) + .fileId(fileId) + .mtaId(mtaId) + .build(); + + FileEntry fileEntry = createFileEntry("test.mtar"); + + AsyncUploadJobsQuery asyncUploadJobsQuery = mock(AsyncUploadJobsQuery.class, RETURNS_SELF); + when(uploadJobService.createQuery()).thenReturn(asyncUploadJobsQuery); + when(asyncUploadJobsQuery.list()).thenReturn(List.of(successfulJob)); + when(fileService.getFile(SPACE_GUID, fileId)).thenReturn(fileEntry); + + ResponseEntity response = testedClass.getUploadFromUrlJob(SPACE_GUID, NAMESPACE, jobId); + + assertEquals(HttpStatus.CREATED, response.getStatusCode()); + AsyncUploadResult result = response.getBody(); + assertEquals(AsyncUploadResult.JobStatus.FINISHED, result.getStatus()); + assertEquals(mtaId, result.getMtaId()); + assertEquals(fileEntry.getId(), result.getFile() + .getId()); + assertEquals(fileEntry.getName(), result.getFile() + .getName()); } @Test - void testCallRemoteEndpointWithRetry_withBadRequestAndAtInParam() throws Exception { - HttpResponse response = Mockito.mock(HttpResponse.class); - - when(httpClient.send(any(HttpRequest.class), any())).thenReturn(response); - when(response.statusCode()).thenReturn(HttpStatus.BAD_REQUEST.value()); - when(response.body()).thenReturn(new ByteArrayInputStream("test".getBytes())); + void testGetUploadFromUrlJobWhenJobIsStuck() { + String jobId = UUID.randomUUID() + .toString(); + + AsyncUploadJobEntry stuckJob = ImmutableAsyncUploadJobEntry.builder() + .id(jobId) + .url(DECODED_URL_WITH_CREDENTIALS_IN_THE_URL) + .startedAt(LocalDateTime.now() + .minusMinutes(5)) + .updatedAt(LocalDateTime.now() + .minusMinutes(2)) + .state(State.RUNNING) + .user("user1") + .spaceGuid(SPACE_GUID) + .instanceIndex(0) + .build(); + + AsyncUploadJobsQuery asyncUploadJobsQuery = mock(AsyncUploadJobsQuery.class, RETURNS_SELF); + when(uploadJobService.createQuery()).thenReturn(asyncUploadJobsQuery); + when(asyncUploadJobsQuery.list()).thenReturn(List.of(stuckJob)); + + ResponseEntity response = testedClass.getUploadFromUrlJob(SPACE_GUID, NAMESPACE, jobId); - SLException thrownException = assertThrows(SLException.class, - () -> testedClass.callRemoteEndpointWithRetry(httpClient, - ENCODED_URL_WITH_CREDENTIALS_AND_AT_IN_THE_UTL_AND_IN_PARAM, - ImmutableUserCredentials.builder() - .build())); - - assertEquals("Error from remote MTAR endpoint ...user with status code 400, message: test", thrownException.getMessage()); + assertEquals(HttpStatus.OK, response.getStatusCode()); + AsyncUploadResult result = response.getBody(); + assertEquals(AsyncUploadResult.JobStatus.ERROR, result.getStatus()); + assertEquals(List.of(AsyncUploadResult.ClientAction.RETRY_UPLOAD), result.getClientActions()); + assertTrue(result.getError() + .contains("30")); } @Test - void testCallRemoteEndpointWithRetry_withDecodedUrl() throws Exception { - HttpResponse response = Mockito.mock(HttpResponse.class); - - when(httpClient.send(any(HttpRequest.class), any())).thenReturn(response); - when(response.statusCode()).thenReturn(HttpStatus.BAD_REQUEST.value()); - when(response.body()).thenReturn(new ByteArrayInputStream("test".getBytes())); - - assertThrows(IllegalArgumentException.class, - () -> testedClass.callRemoteEndpointWithRetry(httpClient, DECODED_URL_WITH_CREDENTIALS_AND_AT_IN_THE_UTL_AND_IN_PARAM, - null)); - } + void testGetUploadFromUrlJobWhenJobIsRunning() { + String jobId = UUID.randomUUID() + .toString(); + Long bytesRead = 1024L; + + AsyncUploadJobEntry runningJob = ImmutableAsyncUploadJobEntry.builder() + .id(jobId) + .url(DECODED_URL_WITH_CREDENTIALS_IN_THE_URL) + .startedAt(LocalDateTime.now() + .minusMinutes(1)) + .updatedAt(LocalDateTime.now() + .minusSeconds(10)) + .state(State.RUNNING) + .user("user1") + .spaceGuid(SPACE_GUID) + .instanceIndex(0) + .bytesRead(bytesRead) + .build(); + + AsyncUploadJobsQuery asyncUploadJobsQuery = mock(AsyncUploadJobsQuery.class, RETURNS_SELF); + when(uploadJobService.createQuery()).thenReturn(asyncUploadJobsQuery); + when(asyncUploadJobsQuery.list()).thenReturn(List.of(runningJob)); + + ResponseEntity response = testedClass.getUploadFromUrlJob(SPACE_GUID, NAMESPACE, jobId); - private void assertMetadataMatches(FileEntry expected, FileMetadata actual) { - assertEquals(expected.getId(), actual.getId()); - assertEquals(expected.getName(), actual.getName()); - assertEquals(expected.getSpace(), actual.getSpace()); - assertEquals(expected.getSize(), actual.getSize()); - assertEquals(expected.getDigest(), actual.getDigest()); - assertEquals(expected.getDigestAlgorithm(), actual.getDigestAlgorithm()); + assertEquals(HttpStatus.OK, response.getStatusCode()); + AsyncUploadResult result = response.getBody(); + assertEquals(AsyncUploadResult.JobStatus.RUNNING, result.getStatus()); + assertEquals(bytesRead, result.getBytes()); } - private FileEntry createFileEntry(String name) { - return ImmutableFileEntry.builder() - .id(UUID.randomUUID() - .toString()) - .digest(RandomStringUtils.random(32, DIGEST_CHARACTER_TABLE)) - .digestAlgorithm(Constants.DIGEST_ALGORITHM) - .name(name) - .namespace(NAMESPACE) - .size(BigInteger.valueOf(new Random().nextInt(1024 * 1024 * 10))) - .space(SPACE_GUID) - .build(); - } + @Test + void testGetUploadFromUrlJobWhenJobIsInErrorState() { + String jobId = UUID.randomUUID() + .toString(); + String errorMessage = "Download failed due to network error"; + + AsyncUploadJobEntry errorJob = ImmutableAsyncUploadJobEntry.builder() + .id(jobId) + .url(DECODED_URL_WITH_CREDENTIALS_IN_THE_URL) + .startedAt(LocalDateTime.now() + .minusMinutes(5)) + .updatedAt(LocalDateTime.now() + .minusMinutes(1)) + .state(State.ERROR) + .user("user1") + .spaceGuid(SPACE_GUID) + .instanceIndex(0) + .error(errorMessage) + .build(); + + AsyncUploadJobsQuery asyncUploadJobsQuery = mock(AsyncUploadJobsQuery.class, RETURNS_SELF); + when(uploadJobService.createQuery()).thenReturn(asyncUploadJobsQuery); + when(asyncUploadJobsQuery.list()).thenReturn(List.of(errorJob)); + + ResponseEntity response = testedClass.getUploadFromUrlJob(SPACE_GUID, NAMESPACE, jobId); - private AsyncUploadJobEntry mockUploadJobEntry(String fileId, AsyncUploadJobEntry.State jobState, String error) { - AsyncUploadJobEntry jobEntry = Mockito.mock(AsyncUploadJobEntry.class); - when(jobEntry.getId()).thenReturn(MTA_ID); - when(jobEntry.getMtaId()).thenReturn(MTA_ID); - when(jobEntry.getUser()).thenReturn("user1"); - when(jobEntry.getSpaceGuid()).thenReturn(SPACE_GUID); - when(jobEntry.getStartedAt()).thenReturn(LocalDateTime.MIN); - when(jobEntry.getFileId()).thenReturn(fileId); - when(jobEntry.getState()).thenReturn(jobState); - when(jobEntry.getUrl()).thenReturn("https://artifactory.sap/mta"); - when(jobEntry.getInstanceIndex()).thenReturn(0); - if (jobState == AsyncUploadJobEntry.State.ERROR) { - when(jobEntry.getError()).thenReturn(error); - } - return jobEntry; + assertEquals(HttpStatus.OK, response.getStatusCode()); + AsyncUploadResult result = response.getBody(); + assertEquals(AsyncUploadResult.JobStatus.ERROR, result.getStatus()); + assertEquals(errorMessage, result.getError()); } } diff --git a/multiapps-controller-web/src/test/java/org/cloudfoundry/multiapps/controller/web/api/impl/OperationsApiServiceImplTest.java b/multiapps-controller-web/src/test/java/org/cloudfoundry/multiapps/controller/web/api/impl/OperationsApiServiceImplTest.java index 26554e57fe..a6ee011b3d 100644 --- a/multiapps-controller-web/src/test/java/org/cloudfoundry/multiapps/controller/web/api/impl/OperationsApiServiceImplTest.java +++ b/multiapps-controller-web/src/test/java/org/cloudfoundry/multiapps/controller/web/api/impl/OperationsApiServiceImplTest.java @@ -8,7 +8,6 @@ import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; - import jakarta.persistence.NoResultException; import jakarta.servlet.http.HttpServletRequest; import org.cloudfoundry.multiapps.common.ContentException; @@ -53,7 +52,6 @@ import org.springframework.security.oauth2.client.authentication.OAuth2AuthenticationToken; import org.springframework.security.oauth2.core.user.OAuth2User; import org.springframework.web.server.ResponseStatusException; - import static org.cloudfoundry.multiapps.controller.core.util.SecurityUtil.USER_INFO; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; diff --git a/multiapps-controller-web/src/test/java/org/cloudfoundry/multiapps/controller/web/upload/AsyncUploadJobOrchestratorTest.java b/multiapps-controller-web/src/test/java/org/cloudfoundry/multiapps/controller/web/upload/AsyncUploadJobOrchestratorTest.java new file mode 100644 index 0000000000..c0533e5ac2 --- /dev/null +++ b/multiapps-controller-web/src/test/java/org/cloudfoundry/multiapps/controller/web/upload/AsyncUploadJobOrchestratorTest.java @@ -0,0 +1,457 @@ +package org.cloudfoundry.multiapps.controller.web.upload; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.math.BigInteger; +import java.net.URI; +import java.time.LocalDateTime; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import org.cloudfoundry.multiapps.common.SLException; +import org.cloudfoundry.multiapps.controller.api.model.UserCredentials; +import org.cloudfoundry.multiapps.controller.client.util.CheckedSupplier; +import org.cloudfoundry.multiapps.controller.client.util.ResilientOperationExecutor; +import org.cloudfoundry.multiapps.controller.core.helpers.DescriptorParserFacadeFactory; +import org.cloudfoundry.multiapps.controller.core.util.ApplicationConfiguration; +import org.cloudfoundry.multiapps.controller.persistence.model.AsyncUploadJobEntry; +import org.cloudfoundry.multiapps.controller.persistence.model.FileEntry; +import org.cloudfoundry.multiapps.controller.persistence.model.ImmutableAsyncUploadJobEntry; +import org.cloudfoundry.multiapps.controller.persistence.model.ImmutableFileEntry; +import org.cloudfoundry.multiapps.controller.persistence.query.AsyncUploadJobsQuery; +import org.cloudfoundry.multiapps.controller.persistence.services.AsyncUploadJobService; +import org.cloudfoundry.multiapps.controller.persistence.services.FileService; +import org.cloudfoundry.multiapps.controller.web.upload.client.DeployFromUrlRemoteClient; +import org.cloudfoundry.multiapps.controller.web.upload.client.FileFromUrlData; +import org.cloudfoundry.multiapps.controller.web.util.SecurityContextUtil; +import org.cloudfoundry.multiapps.mta.handlers.DescriptorParserFacade; +import org.cloudfoundry.multiapps.mta.model.DeploymentDescriptor; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.MockitoAnnotations; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class AsyncUploadJobOrchestratorTest { + + private static final String SPACE_GUID = "space-123"; + private static final String NAMESPACE = "test-namespace"; + private static final String FILE_URL = "https://example.com/file.mtar"; + private static final String DECODED_URL = "https://example.com/file.mtar"; + private static final String JOB_ID = "job-123"; + private static final String MTA_ID = "test-mta"; + private static final String FILE_ID = "file-123"; + private static final String USERNAME = "test-user"; + private static final String ERROR_MESSAGE = "Test error message"; + private static final long NORMAL_FILE_SIZE = 1024L; + + @Mock + private ExecutorService asyncFileUploadExecutor; + + @Mock + private ExecutorService deployFromUrlExecutor; + + @Mock + private ApplicationConfiguration applicationConfiguration; + + @Mock + private AsyncUploadJobService asyncUploadJobService; + + @Mock + private FileService fileService; + + @Mock + private DescriptorParserFacadeFactory descriptorParserFactory; + + @Mock + private DeployFromUrlRemoteClient deployFromUrlRemoteClient; + + @Mock + private DescriptorParserFacade descriptorParserFacade; + + @Mock + private DeploymentDescriptor deploymentDescriptor; + + @Mock + private UserCredentials userCredentials; + + @Mock + private AsyncUploadJobsQuery asyncUploadJobsQuery; + + private TestableAsyncUploadJobOrchestrator asyncUploadJobExecutor; + private AtomicBoolean asyncFileExecutorCalled; + private AtomicBoolean deployFromUrlExecutorCalled; + + @BeforeEach + void setUp() throws Exception { + MockitoAnnotations.openMocks(this) + .close(); + asyncFileExecutorCalled = new AtomicBoolean(false); + deployFromUrlExecutorCalled = new AtomicBoolean(false); + + doAnswer(invocation -> { + deployFromUrlExecutorCalled.set(true); + Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }).when(deployFromUrlExecutor) + .submit(any(Runnable.class)); + + doAnswer(invocation -> { + asyncFileExecutorCalled.set(true); + Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }).when(asyncFileUploadExecutor) + .submit(any(Runnable.class)); + + asyncUploadJobExecutor = new TestableAsyncUploadJobOrchestrator( + asyncFileUploadExecutor, + deployFromUrlExecutor, + applicationConfiguration, + asyncUploadJobService, + fileService, + descriptorParserFactory, + deployFromUrlRemoteClient + ); + + when(applicationConfiguration.getApplicationInstanceIndex()).thenReturn(0); + when(applicationConfiguration.getMaxMtaDescriptorSize()).thenReturn(1024L * 1024); + + when(asyncUploadJobService.createQuery()).thenReturn(asyncUploadJobsQuery); + when(asyncUploadJobsQuery.id(anyString())).thenReturn(asyncUploadJobsQuery); + + when(descriptorParserFactory.getInstance()).thenReturn(descriptorParserFacade); + when(deploymentDescriptor.getId()).thenReturn(MTA_ID); + when(userCredentials.getUsername()).thenReturn("testuser"); + when(userCredentials.getPassword()).thenReturn("testpass"); + } + + @Test + void testSuccessfulJobExecution() throws Exception { + AsyncUploadJobEntry initialEntry = createInitialJobEntry(); + AsyncUploadJobEntry runningEntry = ImmutableAsyncUploadJobEntry.copyOf(initialEntry) + .withState(AsyncUploadJobEntry.State.RUNNING) + .withStartedAt(LocalDateTime.now()); + AsyncUploadJobEntry finishedEntry = ImmutableAsyncUploadJobEntry.copyOf(runningEntry) + .withState(AsyncUploadJobEntry.State.FINISHED) + .withFileId(FILE_ID) + .withMtaId(MTA_ID) + .withFinishedAt(LocalDateTime.now()); + + FileEntry fileEntry = createFileEntry(); + + when(asyncUploadJobService.add(any(AsyncUploadJobEntry.class))).thenReturn(initialEntry); + when(asyncUploadJobService.update(any(AsyncUploadJobEntry.class), any(AsyncUploadJobEntry.class))) + .thenReturn(runningEntry) + .thenReturn(finishedEntry); + + when(asyncUploadJobsQuery.singleResult()) + .thenReturn(runningEntry) + .thenReturn(finishedEntry); + + setupSuccessfulRemoteClientMocks(); + + when(fileService.addFile(any(FileEntry.class), any(InputStream.class))).thenReturn(fileEntry); + when(fileService.processFileContent(anyString(), anyString(), any())).thenReturn(deploymentDescriptor); + when(descriptorParserFacade.parseDeploymentDescriptor(anyString())).thenReturn(deploymentDescriptor); + + try (MockedStatic mockedSecurityContext = mockStatic(SecurityContextUtil.class)) { + mockedSecurityContext.when(SecurityContextUtil::getUsername) + .thenReturn(USERNAME); + + AsyncUploadJobEntry result = asyncUploadJobExecutor.executeUploadFromUrl( + SPACE_GUID, NAMESPACE, FILE_URL, DECODED_URL, userCredentials); + + assertEquals(SPACE_GUID, result.getSpaceGuid()); + assertEquals(NAMESPACE, result.getNamespace()); + assertEquals(FILE_URL, result.getUrl()); + assertEquals(AsyncUploadJobEntry.State.INITIAL, result.getState()); + + verify(asyncUploadJobService).add(any(AsyncUploadJobEntry.class)); + verify(deployFromUrlExecutor).submit(any(Runnable.class)); + verify(deployFromUrlRemoteClient).downloadFileFromUrl(any()); + verify(fileService).addFile(any(FileEntry.class), any(InputStream.class)); + verify(fileService).processFileContent(anyString(), anyString(), any()); + } + } + + @Test + void testJobExecutionWithRemoteClientException() throws Exception { + AsyncUploadJobEntry initialEntry = createInitialJobEntry(); + AsyncUploadJobEntry runningEntry = ImmutableAsyncUploadJobEntry.copyOf(initialEntry) + .withState(AsyncUploadJobEntry.State.RUNNING) + .withStartedAt(LocalDateTime.now()); + AsyncUploadJobEntry errorEntry = ImmutableAsyncUploadJobEntry.copyOf(runningEntry) + .withState(AsyncUploadJobEntry.State.ERROR); + + when(asyncUploadJobService.add(any(AsyncUploadJobEntry.class))).thenReturn(initialEntry); + when(asyncUploadJobService.update(any(AsyncUploadJobEntry.class), any(AsyncUploadJobEntry.class))) + .thenReturn(runningEntry) + .thenReturn(errorEntry); + when(asyncUploadJobsQuery.singleResult()).thenReturn(runningEntry); + + when(deployFromUrlRemoteClient.downloadFileFromUrl(any())) + .thenThrow(new SLException("Remote client error")); + + try (MockedStatic mockedSecurityContext = mockStatic(SecurityContextUtil.class)) { + mockedSecurityContext.when(SecurityContextUtil::getUsername) + .thenReturn(USERNAME); + + AsyncUploadJobEntry result = asyncUploadJobExecutor.executeUploadFromUrl( + SPACE_GUID, NAMESPACE, FILE_URL, DECODED_URL, userCredentials); + + verify(asyncUploadJobService).add(any(AsyncUploadJobEntry.class)); + verify(deployFromUrlExecutor).submit(any(Runnable.class)); + verify(asyncUploadJobService, times(3)).update(any(AsyncUploadJobEntry.class), any(AsyncUploadJobEntry.class)); + } + } + + @Test + void testJobExecutionWithFileServiceException() throws Exception { + AsyncUploadJobEntry initialEntry = createInitialJobEntry(); + AsyncUploadJobEntry runningEntry = ImmutableAsyncUploadJobEntry.copyOf(initialEntry) + .withState(AsyncUploadJobEntry.State.RUNNING) + .withStartedAt(LocalDateTime.now()); + AsyncUploadJobEntry errorEntry = ImmutableAsyncUploadJobEntry.copyOf(runningEntry) + .withState(AsyncUploadJobEntry.State.ERROR); + + when(asyncUploadJobService.add(any(AsyncUploadJobEntry.class))).thenReturn(initialEntry); + when(asyncUploadJobService.update(any(AsyncUploadJobEntry.class), any(AsyncUploadJobEntry.class))) + .thenReturn(runningEntry) + .thenReturn(errorEntry); + when(asyncUploadJobsQuery.singleResult()).thenReturn(runningEntry); + + setupSuccessfulRemoteClientMocks(); + + when(fileService.addFile(any(FileEntry.class), any(InputStream.class))) + .thenThrow(new RuntimeException(ERROR_MESSAGE)); + + try (MockedStatic mockedSecurityContext = mockStatic(SecurityContextUtil.class)) { + mockedSecurityContext.when(SecurityContextUtil::getUsername) + .thenReturn(USERNAME); + + AsyncUploadJobEntry result = asyncUploadJobExecutor.executeUploadFromUrl( + SPACE_GUID, NAMESPACE, FILE_URL, DECODED_URL, userCredentials); + + verify(asyncUploadJobService).add(any(AsyncUploadJobEntry.class)); + verify(deployFromUrlExecutor).submit(any(Runnable.class)); + verify(asyncUploadJobService, times(4)).update(any(AsyncUploadJobEntry.class), any(AsyncUploadJobEntry.class)); + } + } + + @Test + void testJobExecutionWithDescriptorParsingException() throws Exception { + AsyncUploadJobEntry initialEntry = createInitialJobEntry(); + AsyncUploadJobEntry runningEntry = ImmutableAsyncUploadJobEntry.copyOf(initialEntry) + .withState(AsyncUploadJobEntry.State.RUNNING) + .withStartedAt(LocalDateTime.now()); + AsyncUploadJobEntry errorEntry = ImmutableAsyncUploadJobEntry.copyOf(runningEntry) + .withState(AsyncUploadJobEntry.State.ERROR); + + FileEntry fileEntry = createFileEntry(); + + when(asyncUploadJobService.add(any(AsyncUploadJobEntry.class))).thenReturn(initialEntry); + when(asyncUploadJobService.update(any(AsyncUploadJobEntry.class), any(AsyncUploadJobEntry.class))) + .thenReturn(runningEntry) + .thenReturn(errorEntry); + when(asyncUploadJobsQuery.singleResult()).thenReturn(runningEntry); + + setupSuccessfulRemoteClientMocks(); + when(fileService.addFile(any(FileEntry.class), any(InputStream.class))).thenReturn(fileEntry); + + when(fileService.processFileContent(anyString(), anyString(), any())) + .thenThrow(new RuntimeException(ERROR_MESSAGE)); + + try (MockedStatic mockedSecurityContext = mockStatic(SecurityContextUtil.class)) { + mockedSecurityContext.when(SecurityContextUtil::getUsername) + .thenReturn(USERNAME); + + AsyncUploadJobEntry result = asyncUploadJobExecutor.executeUploadFromUrl( + SPACE_GUID, NAMESPACE, FILE_URL, DECODED_URL, userCredentials); + + verify(asyncUploadJobService).add(any(AsyncUploadJobEntry.class)); + verify(deployFromUrlExecutor).submit(any(Runnable.class)); + verify(asyncUploadJobService, times(4)).update(any(AsyncUploadJobEntry.class), any(AsyncUploadJobEntry.class)); + } + } + + @Test + void testJobMonitoringWithMultipleUpdates() throws Exception { + AsyncUploadJobEntry initialEntry = createInitialJobEntry(); + AsyncUploadJobEntry runningEntry1 = ImmutableAsyncUploadJobEntry.copyOf(initialEntry) + .withState(AsyncUploadJobEntry.State.RUNNING) + .withStartedAt(LocalDateTime.now()) + .withBytesRead(100L); + AsyncUploadJobEntry runningEntry2 = ImmutableAsyncUploadJobEntry.copyOf(runningEntry1) + .withBytesRead(500L) + .withUpdatedAt(LocalDateTime.now()); + AsyncUploadJobEntry runningEntry3 = ImmutableAsyncUploadJobEntry.copyOf(runningEntry2) + .withBytesRead(800L) + .withUpdatedAt(LocalDateTime.now()); + AsyncUploadJobEntry finishedEntry = ImmutableAsyncUploadJobEntry.copyOf(runningEntry3) + .withState(AsyncUploadJobEntry.State.FINISHED) + .withFileId(FILE_ID) + .withMtaId(MTA_ID) + .withFinishedAt(LocalDateTime.now()); + + FileEntry fileEntry = createFileEntry(); + + when(asyncUploadJobService.add(any(AsyncUploadJobEntry.class))).thenReturn(initialEntry); + when(asyncUploadJobService.update(any(AsyncUploadJobEntry.class), any(AsyncUploadJobEntry.class))) + .thenReturn(runningEntry1) + .thenReturn(runningEntry2) + .thenReturn(runningEntry3) + .thenReturn(finishedEntry); + + when(asyncUploadJobsQuery.singleResult()) + .thenReturn(runningEntry1) + .thenReturn(runningEntry2) + .thenReturn(runningEntry3) + .thenReturn(finishedEntry); + + setupSuccessfulRemoteClientMocks(); + when(fileService.addFile(any(FileEntry.class), any(InputStream.class))).thenReturn(fileEntry); + when(fileService.processFileContent(anyString(), anyString(), any())).thenReturn(deploymentDescriptor); + when(descriptorParserFacade.parseDeploymentDescriptor(anyString())).thenReturn(deploymentDescriptor); + + try (MockedStatic mockedSecurityContext = mockStatic(SecurityContextUtil.class)) { + mockedSecurityContext.when(SecurityContextUtil::getUsername) + .thenReturn(USERNAME); + + AsyncUploadJobEntry result = asyncUploadJobExecutor.executeUploadFromUrl( + SPACE_GUID, NAMESPACE, FILE_URL, DECODED_URL, userCredentials); + + verify(asyncUploadJobService, times(4)).update(any(AsyncUploadJobEntry.class), any(AsyncUploadJobEntry.class)); + verify(asyncUploadJobsQuery, times(4)).singleResult(); + } + } + + @Test + void testJobCreationWithCorrectProperties() { + AsyncUploadJobEntry initialEntry = createInitialJobEntry(); + AsyncUploadJobEntry runningEntry = ImmutableAsyncUploadJobEntry.copyOf(initialEntry) + .withState(AsyncUploadJobEntry.State.RUNNING) + .withStartedAt(LocalDateTime.now()); + + when(asyncUploadJobService.add(any(AsyncUploadJobEntry.class))).thenReturn(initialEntry); + when(asyncUploadJobService.update(any(AsyncUploadJobEntry.class), any(AsyncUploadJobEntry.class))) + .thenReturn(runningEntry); + when(asyncUploadJobsQuery.singleResult()).thenReturn(runningEntry); + + // Mock the executor to NOT run the task to avoid the full workflow + doAnswer(invocation -> { + deployFromUrlExecutorCalled.set(true); + // Don't run the runnable - just mark that it was called + return null; + }).when(deployFromUrlExecutor) + .submit(any(Runnable.class)); + + try (MockedStatic mockedSecurityContext = mockStatic(SecurityContextUtil.class)) { + mockedSecurityContext.when(SecurityContextUtil::getUsername) + .thenReturn(USERNAME); + + AsyncUploadJobEntry result = asyncUploadJobExecutor.executeUploadFromUrl( + SPACE_GUID, NAMESPACE, FILE_URL, DECODED_URL, userCredentials); + + assertEquals(SPACE_GUID, result.getSpaceGuid()); + assertEquals(NAMESPACE, result.getNamespace()); + assertEquals(FILE_URL, result.getUrl()); + assertEquals(USERNAME, result.getUser()); + assertEquals(AsyncUploadJobEntry.State.INITIAL, result.getState()); + assertEquals(0, result.getInstanceIndex()); + assertEquals(0L, result.getBytesRead()); + + verify(asyncUploadJobService).add(any(AsyncUploadJobEntry.class)); + verify(deployFromUrlExecutor).submit(any(Runnable.class)); + } + } + + @Test + void testAsyncExecutorSubmission() { + AsyncUploadJobEntry initialEntry = createInitialJobEntry(); + when(asyncUploadJobService.add(any(AsyncUploadJobEntry.class))).thenReturn(initialEntry); + + // Mock the executor to NOT run the task to avoid the full workflow + doAnswer(invocation -> { + deployFromUrlExecutorCalled.set(true); + // Don't run the runnable - just mark that it was called + return null; + }).when(deployFromUrlExecutor) + .submit(any(Runnable.class)); + + try (MockedStatic mockedSecurityContext = mockStatic(SecurityContextUtil.class)) { + mockedSecurityContext.when(SecurityContextUtil::getUsername) + .thenReturn(USERNAME); + + asyncUploadJobExecutor.executeUploadFromUrl(SPACE_GUID, NAMESPACE, FILE_URL, DECODED_URL, userCredentials); + + verify(deployFromUrlExecutor).submit(any(Runnable.class)); + } + } + + private AsyncUploadJobEntry createInitialJobEntry() { + return ImmutableAsyncUploadJobEntry.builder() + .id(JOB_ID) + .user(USERNAME) + .addedAt(LocalDateTime.now()) + .spaceGuid(SPACE_GUID) + .namespace(NAMESPACE) + .instanceIndex(0) + .url(FILE_URL) + .state(AsyncUploadJobEntry.State.INITIAL) + .updatedAt(LocalDateTime.now()) + .bytesRead(0L) + .build(); + } + + private FileEntry createFileEntry() { + return ImmutableFileEntry.builder() + .id(FILE_ID) + .space(SPACE_GUID) + .namespace(NAMESPACE) + .name("file.mtar") + .size(BigInteger.valueOf(NORMAL_FILE_SIZE)) + .build(); + } + + private void setupSuccessfulRemoteClientMocks() throws Exception { + ByteArrayInputStream inputStream = new ByteArrayInputStream("test content".getBytes()); + FileFromUrlData fileFromUrlData = new FileFromUrlData(inputStream, URI.create(FILE_URL), NORMAL_FILE_SIZE); + when(deployFromUrlRemoteClient.downloadFileFromUrl(any())).thenReturn(fileFromUrlData); + } + + private static class TestableAsyncUploadJobOrchestrator extends AsyncUploadJobOrchestrator { + + public TestableAsyncUploadJobOrchestrator(ExecutorService asyncFileUploadExecutor, ExecutorService deployFromUrlExecutor, + ApplicationConfiguration applicationConfiguration, + AsyncUploadJobService asyncUploadJobService, + FileService fileService, DescriptorParserFacadeFactory descriptorParserFactory, + DeployFromUrlRemoteClient deployFromUrlRemoteClient) { + super(asyncFileUploadExecutor, deployFromUrlExecutor, applicationConfiguration, + asyncUploadJobService, fileService, descriptorParserFactory, deployFromUrlRemoteClient); + } + + @Override + protected void waitBetweenUpdates() { + // do nothing + } + + @Override + protected ResilientOperationExecutor getResilientOperationExecutor() { + return new ResilientOperationExecutor() { + @Override + public T execute(CheckedSupplier operation) throws Exception { + return operation.get(); + } + }; + } + } +} diff --git a/multiapps-controller-web/src/test/java/org/cloudfoundry/multiapps/controller/web/upload/client/DeployFromUrlRemoteClientTest.java b/multiapps-controller-web/src/test/java/org/cloudfoundry/multiapps/controller/web/upload/client/DeployFromUrlRemoteClientTest.java new file mode 100644 index 0000000000..1611dc5976 --- /dev/null +++ b/multiapps-controller-web/src/test/java/org/cloudfoundry/multiapps/controller/web/upload/client/DeployFromUrlRemoteClientTest.java @@ -0,0 +1,272 @@ +package org.cloudfoundry.multiapps.controller.web.upload.client; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpHeaders; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.text.MessageFormat; +import java.util.OptionalLong; +import org.cloudfoundry.multiapps.common.SLException; +import org.cloudfoundry.multiapps.controller.api.model.UserCredentials; +import org.cloudfoundry.multiapps.controller.client.util.CheckedSupplier; +import org.cloudfoundry.multiapps.controller.client.util.ResilientOperationExecutor; +import org.cloudfoundry.multiapps.controller.core.util.ApplicationConfiguration; +import org.cloudfoundry.multiapps.controller.web.Constants; +import org.cloudfoundry.multiapps.controller.web.Messages; +import org.cloudfoundry.multiapps.controller.web.upload.UploadFromUrlContext; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class DeployFromUrlRemoteClientTest { + + private static final String SECURE_URL = "https://example.com/file.zip"; + private static final String INSECURE_URL = "http://example.com/file.zip"; + private static final String URL_WITH_CREDENTIALS = "https://user:pass@example.com/file.zip"; + private static final String URL_WITH_ENCODED_CHARS = "https://example.com/file%20with%20spaces.zip"; + private static final long FILE_SIZE = 1024L; + private static final long MAX_UPLOAD_SIZE = 2048L; + + @Mock + private ApplicationConfiguration applicationConfiguration; + + @Mock + private HttpClient httpClient; + + @Mock + private HttpResponse httpResponse; + + @Mock + private HttpHeaders httpHeaders; + + @Mock + private UserCredentials userCredentials; + + @Mock + private UploadFromUrlContext uploadContext; + + private TestableDeployFromUrlRemoteClient client; + + private class TestableDeployFromUrlRemoteClient extends DeployFromUrlRemoteClient { + + public TestableDeployFromUrlRemoteClient(ApplicationConfiguration applicationConfiguration) { + super(applicationConfiguration); + } + + @Override + protected HttpClient buildHttpClient() { + return httpClient; + } + + @Override + protected ResilientOperationExecutor getResilientOperationExecutor() { + return new ResilientOperationExecutor() { + @Override + public T execute(CheckedSupplier operation) throws Exception { + return operation.get(); + } + }; + } + } + + @BeforeEach + void setUp() throws Exception { + MockitoAnnotations.openMocks(this) + .close(); + client = new TestableDeployFromUrlRemoteClient(applicationConfiguration); + when(applicationConfiguration.getMaxUploadSize()).thenReturn(MAX_UPLOAD_SIZE); + when(uploadContext.getFileUrl()).thenReturn(SECURE_URL); + when(uploadContext.getUserCredentials()).thenReturn(userCredentials); + when(userCredentials.getUsername()).thenReturn("testUser"); + when(userCredentials.getPassword()).thenReturn("testPassword"); + } + + @Test + void downloadFileFromUrlSuccess() throws Exception { + InputStream inputStream = new ByteArrayInputStream("test content".getBytes()); + when(httpResponse.body()).thenReturn(inputStream); + when(httpResponse.headers()).thenReturn(httpHeaders); + when(httpResponse.uri()).thenReturn(URI.create(SECURE_URL)); + when(httpResponse.statusCode()).thenReturn(200); + when(httpHeaders.firstValueAsLong(Constants.CONTENT_LENGTH)).thenReturn(OptionalLong.of(FILE_SIZE)); + when(httpClient.send(any(HttpRequest.class), eq(HttpResponse.BodyHandlers.ofInputStream()))).thenReturn(httpResponse); + + FileFromUrlData result = client.downloadFileFromUrl(uploadContext); + + assertEquals(inputStream, result.fileInputStream()); + assertEquals(URI.create(SECURE_URL), result.uri()); + assertEquals(FILE_SIZE, result.fileSize()); + } + + @Test + void downloadFileFromUrlInsecureUrl() { + when(uploadContext.getFileUrl()).thenReturn(INSECURE_URL); + Exception exception = assertThrows(SLException.class, + () -> client.downloadFileFromUrl(uploadContext)); + assertEquals(Messages.MTAR_ENDPOINT_NOT_SECURE, exception.getMessage()); + } + + @Test + void downloadFileFromUrlNoContentLength() throws Exception { + when(httpResponse.headers()).thenReturn(httpHeaders); + when(httpResponse.statusCode()).thenReturn(200); + when(httpHeaders.firstValueAsLong(Constants.CONTENT_LENGTH)).thenReturn(OptionalLong.empty()); + when(httpClient.send(any(HttpRequest.class), eq(HttpResponse.BodyHandlers.ofInputStream()))).thenReturn(httpResponse); + + SLException exception = assertThrows(SLException.class, + () -> client.downloadFileFromUrl(uploadContext)); + assertEquals(Messages.FILE_URL_RESPONSE_DID_NOT_RETURN_CONTENT_LENGTH, exception.getMessage()); + + } + + @Test + void downloadFileFromUrlFileSizeExceedsLimit() throws Exception { + long oversizedFile = MAX_UPLOAD_SIZE + 1; + when(httpResponse.headers()).thenReturn(httpHeaders); + when(httpResponse.statusCode()).thenReturn(200); + when(httpHeaders.firstValueAsLong(Constants.CONTENT_LENGTH)).thenReturn(OptionalLong.of(oversizedFile)); + when(httpClient.send(any(HttpRequest.class), eq(HttpResponse.BodyHandlers.ofInputStream()))).thenReturn(httpResponse); + + SLException exception = assertThrows(SLException.class, + () -> client.downloadFileFromUrl(uploadContext)); + assertEquals(MessageFormat.format(Messages.MAX_UPLOAD_SIZE_EXCEEDED, MAX_UPLOAD_SIZE), + exception.getMessage()); + + } + + @Test + void callRemoteEndpointWithRetrySuccess() throws Exception { + InputStream inputStream = new ByteArrayInputStream("test content".getBytes()); + when(httpResponse.statusCode()).thenReturn(200); + when(httpResponse.body()).thenReturn(inputStream); + when(httpClient.send(any(HttpRequest.class), eq(HttpResponse.BodyHandlers.ofInputStream()))).thenReturn(httpResponse); + + when(httpResponse.headers()).thenReturn(httpHeaders); + when(httpHeaders.firstValueAsLong(Constants.CONTENT_LENGTH)).thenReturn(OptionalLong.of(FILE_SIZE)); + + FileFromUrlData result = client.downloadFileFromUrl(uploadContext); + verify(httpClient).send(any(HttpRequest.class), eq(HttpResponse.BodyHandlers.ofInputStream())); + assertEquals(1024, result.fileSize()); + } + + @Test + void callRemoteEndpointWithRetryUnauthorizedError() throws Exception { + InputStream errorStream = new ByteArrayInputStream("Unauthorized".getBytes()); + when(httpResponse.statusCode()).thenReturn(401); + when(httpResponse.body()).thenReturn(errorStream); + when(httpClient.send(any(HttpRequest.class), eq(HttpResponse.BodyHandlers.ofInputStream()))).thenReturn(httpResponse); + + SLException exception = assertThrows(SLException.class, + () -> client.downloadFileFromUrl(uploadContext)); + assertEquals("Credentials to https://example.com/file.zip are wrong. Make sure that they are correct.", exception.getMessage()); + } + + @Test + void callRemoteEndpointWithRetryServerError() throws Exception { + InputStream errorStream = new ByteArrayInputStream("Internal Server Error".getBytes()); + when(httpResponse.statusCode()).thenReturn(500); + when(httpResponse.body()).thenReturn(errorStream); + when(httpClient.send(any(HttpRequest.class), eq(HttpResponse.BodyHandlers.ofInputStream()))).thenReturn(httpResponse); + + SLException exception = assertThrows(SLException.class, + () -> client.downloadFileFromUrl(uploadContext)); + assertTrue(exception.getMessage() + .contains("500")); + + } + + @Test + void downloadFileFromUrlWithNullUserCredentials() throws Exception { + when(uploadContext.getUserCredentials()).thenReturn(null); + InputStream inputStream = new ByteArrayInputStream("test content".getBytes()); + when(httpResponse.body()).thenReturn(inputStream); + when(httpResponse.headers()).thenReturn(httpHeaders); + when(httpResponse.uri()).thenReturn(URI.create(SECURE_URL)); + when(httpResponse.statusCode()).thenReturn(200); + when(httpHeaders.firstValueAsLong(Constants.CONTENT_LENGTH)).thenReturn(OptionalLong.of(FILE_SIZE)); + when(httpClient.send(any(HttpRequest.class), eq(HttpResponse.BodyHandlers.ofInputStream()))).thenReturn(httpResponse); + + FileFromUrlData result = client.downloadFileFromUrl(uploadContext); + + assertEquals(inputStream, result.fileInputStream()); + assertEquals(URI.create(SECURE_URL), result.uri()); + assertEquals(FILE_SIZE, result.fileSize()); + + } + + @Test + void resilientOperationExecutorExceptionHandling() throws Exception { + Exception testException = new RuntimeException("Test exception"); + when(httpClient.send(any(HttpRequest.class), eq(HttpResponse.BodyHandlers.ofInputStream()))).thenThrow(testException); + + RuntimeException exception = assertThrows(RuntimeException.class, + () -> client.downloadFileFromUrl(uploadContext)); + assertEquals("Test exception", exception.getMessage()); + + } + + @Test + void downloadFileFromUrlWithCredentialsInUrl() throws Exception { + when(uploadContext.getFileUrl()).thenReturn(URL_WITH_CREDENTIALS); + when(uploadContext.getUserCredentials()).thenReturn(null); + InputStream inputStream = new ByteArrayInputStream("test content".getBytes()); + when(httpResponse.body()).thenReturn(inputStream); + when(httpResponse.headers()).thenReturn(httpHeaders); + when(httpResponse.uri()).thenReturn(URI.create("https://example.com/file.zip")); + when(httpResponse.statusCode()).thenReturn(200); + when(httpHeaders.firstValueAsLong(Constants.CONTENT_LENGTH)).thenReturn(OptionalLong.of(FILE_SIZE)); + when(httpClient.send(any(HttpRequest.class), eq(HttpResponse.BodyHandlers.ofInputStream()))).thenReturn(httpResponse); + + FileFromUrlData result = client.downloadFileFromUrl(uploadContext); + + assertEquals(inputStream, result.fileInputStream()); + assertEquals(URI.create("https://example.com/file.zip"), result.uri()); + assertEquals(FILE_SIZE, result.fileSize()); + + verify(httpClient).send(argThat(request -> + !request.uri() + .toString() + .contains("user:pass@") && + request.headers() + .firstValue("Authorization") + .isPresent() && + request.headers() + .firstValue("Authorization") + .get() + .startsWith("Basic ") + ), eq(HttpResponse.BodyHandlers.ofInputStream())); + + } + + @Test + void downloadFileFromUrlWithEncodedCharacters() throws Exception { + when(uploadContext.getFileUrl()).thenReturn(URL_WITH_ENCODED_CHARS); + InputStream inputStream = new ByteArrayInputStream("test content".getBytes()); + when(httpResponse.body()).thenReturn(inputStream); + when(httpResponse.headers()).thenReturn(httpHeaders); + when(httpResponse.uri()).thenReturn(URI.create("https://example.com/file%20with%20spaces.zip")); + when(httpResponse.statusCode()).thenReturn(200); + when(httpHeaders.firstValueAsLong(Constants.CONTENT_LENGTH)).thenReturn(OptionalLong.of(FILE_SIZE)); + when(httpClient.send(any(HttpRequest.class), eq(HttpResponse.BodyHandlers.ofInputStream()))).thenReturn(httpResponse); + + FileFromUrlData result = client.downloadFileFromUrl(uploadContext); + + assertEquals(inputStream, result.fileInputStream()); + assertEquals(URI.create("https://example.com/file%20with%20spaces.zip"), result.uri()); + assertEquals(FILE_SIZE, result.fileSize()); + + } + +}