Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ public final class Messages {
public static final String ABORTED_OPERATIONS_TTL_IN_SECONDS = "Aborted operations TTL in seconds: {0}";
public static final String SPRING_SCHEDULER_TASK_EXECUTOR_THREADS = "Spring scheduler task executor threads: {0}";
public static final String FILES_ASYNC_UPLOAD_EXECUTOR_MAX_THREADS = "Files async executor max threads: {0}";
public static final String DEPLOY_FROM_URL_EXECUTOR_MAX_THREADS = "Deploy from URL executor max threads: {0}";
public static final String ON_START_FILES_CLEANER_WITHOUT_CONTENT_ENABLED_0 = "On start files cleaner without content enabled: {0}";
public static final String THREADS_FOR_FILE_UPLOAD_TO_CONTROLLER_0 = "Threads for file upload to controller: {0}";
public static final String THREADS_FOR_FILE_STORAGE_UPLOAD_0 = "Threads for file storage upload: {0}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.apache.commons.collections4.CollectionUtils;
Expand All @@ -27,7 +26,6 @@
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.support.CronExpression;

import static java.text.MessageFormat.format;

@Named
Expand Down Expand Up @@ -96,6 +94,7 @@ public class ApplicationConfiguration {
static final String CFG_ABORTED_OPERATIONS_TTL_IN_MINUTES = "ABORTED_OPERATIONS_TTL_IN_SECONDS";
static final String CFG_SPRING_SCHEDULER_TASK_EXECUTOR_THREADS = "SPRING_SCHEDULER_TASK_EXECUTOR_THREADS";
static final String CFG_FILES_ASYNC_UPLOAD_EXECUTOR_MAX_THREADS = "FILES_ASYNC_UPLOAD_EXECUTOR_THREADS";
static final String CFG_DEPLOY_FROM_URL_EXECUTOR_MAX_THREADS = "DEPLOY_FROM_URL_EXECUTOR_MAX_THREADS";
static final String CFG_ENABLE_ON_START_FILES_WITHOUT_CONTENT_CLEANER = "ENABLE_ON_START_FILES_WITHOUT_CONTENT_CLEANER";
static final String CFG_THREADS_FOR_FILE_UPLOAD_TO_CONTROLLER = "THREADS_FOR_FILE_UPLOAD_TO_CONTROLLER";
static final String CFG_THREADS_FOR_FILE_STORAGE_UPLOAD = "THREADS_FOR_FILE_STORAGE_UPLOAD";
Expand Down Expand Up @@ -153,6 +152,7 @@ public class ApplicationConfiguration {
public static final String DEFAULT_GLOBAL_AUDITOR_ORIGIN = "uaa";
public static final int DEFAULT_SPRING_SCHEDULER_TASK_EXECUTOR_THREADS = 3;
public static final int DEFAULT_FILES_ASYNC_UPLOAD_EXECUTOR_MAX_THREADS = 6;
public static final int DEFAULT_DEPLOY_FROM_URL_EXECUTOR_MAX_THREADS = 26;
public static final boolean DEFAULT_ENABLE_ON_START_FILES_WITHOUT_CONTENT_CLEANER = false;
public static final int DEFAULT_THREADS_FOR_FILE_UPLOAD_TO_CONTROLLER = 6;
public static final int DEFAULT_THREADS_FOR_FILE_STORAGE_UPLOAD = 7;
Expand Down Expand Up @@ -211,6 +211,7 @@ public class ApplicationConfiguration {
private Integer abortedOperationsTtlInSeconds;
private Integer springSchedulerTaskExecutorThreads;
private Integer filesAsyncUploadExecutorThreads;
private Integer deployFromUrlExecutorMaxThreads;
private Boolean isOnStartFilesWithoutContentCleanerEnabledThroughEnvironment;
private Integer threadsForFileUploadToController;
private Integer threadsForFileStorageUpload;
Expand Down Expand Up @@ -264,6 +265,7 @@ public void load() {
getServiceHandlingMaxParallelThreads();
getAbortedOperationsTtlInSeconds();
getFilesAsyncUploadExecutorMaxThreads();
getDeployFromUrlExecutorMaxThreads();
getObjectStoreRegions();
getIsReadinessHealthCheckEnabled();
}
Expand Down Expand Up @@ -407,6 +409,13 @@ public Integer getFilesAsyncUploadExecutorMaxThreads() {
return filesAsyncUploadExecutorThreads;
}

public Integer getDeployFromUrlExecutorMaxThreads() {
if (deployFromUrlExecutorMaxThreads == null) {
deployFromUrlExecutorMaxThreads = getDeployFromUrlExecutorMaxThreadsFromEnvironment();
}
return deployFromUrlExecutorMaxThreads;
}

public String getGlobalAuditorUser() {
if (globalAuditorUser == null) {
globalAuditorUser = getGlobalAuditorUserFromEnvironment();
Expand Down Expand Up @@ -832,6 +841,12 @@ private Integer getFilesAsyncUploadExecutorMaxThreadsFromEnvironment() {
return value;
}

private Integer getDeployFromUrlExecutorMaxThreadsFromEnvironment() {
Integer value = environment.getInteger(CFG_DEPLOY_FROM_URL_EXECUTOR_MAX_THREADS, DEFAULT_DEPLOY_FROM_URL_EXECUTOR_MAX_THREADS);
logEnvironmentVariable(CFG_DEPLOY_FROM_URL_EXECUTOR_MAX_THREADS, Messages.DEPLOY_FROM_URL_EXECUTOR_MAX_THREADS, value);
return value;
}

private String getGlobalAuditorUserFromEnvironment() {
String value = environment.getString(CFG_GLOBAL_AUDITOR_USER);
return value;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package org.cloudfoundry.multiapps.controller.persistence.dto;

import java.time.LocalDateTime;

import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import jakarta.persistence.Table;

import org.cloudfoundry.multiapps.controller.persistence.model.PersistenceMetadata;

@Entity
Expand All @@ -31,6 +29,8 @@ private AttributeNames() {
public static final String FILE_ID = "fileId";
public static final String ERROR = "error";
public static final String INSTANCE_INDEX = "instanceIndex";
public static final String BYTES_READ = "bytesRead";
public static final String UPDATED_AT = "updatedAt";
}

@Id
Expand Down Expand Up @@ -73,13 +73,19 @@ private AttributeNames() {
@Column(name = PersistenceMetadata.TableColumnNames.ASYNC_UPLOAD_JOB_INSTANCE_INDEX, nullable = false)
private Integer instanceIndex;

@Column(name = PersistenceMetadata.TableColumnNames.ASYNC_UPLOAD_JOB_BYTES_READ)
private Long bytesRead;

@Column(name = PersistenceMetadata.TableColumnNames.ASYNC_UPLOAD_JOB_UPDATED_AT)
private LocalDateTime updatedAt;

public AsyncUploadJobDto() {
// Required by JPA
}

public AsyncUploadJobDto(String id, String mtaUser, String state, String url, LocalDateTime addedAt, LocalDateTime startedAt,
LocalDateTime finishedAt, String namespace, String spaceGuid, String mtaId, String fileId, String error,
Integer instanceIndex) {
Integer instanceIndex, Long bytesRead, LocalDateTime updatedAt) {
this.id = id;
this.mtaUser = mtaUser;
this.state = state;
Expand All @@ -93,6 +99,8 @@ public AsyncUploadJobDto(String id, String mtaUser, String state, String url, Lo
this.fileId = fileId;
this.error = error;
this.instanceIndex = instanceIndex;
this.bytesRead = bytesRead;
this.updatedAt = updatedAt;
}

@Override
Expand Down Expand Up @@ -201,22 +209,40 @@ public void setInstanceIndex(Integer instanceIndex) {
this.instanceIndex = instanceIndex;
}

public Long getBytesRead() {
return bytesRead;
}

public void setBytesRead(Long bytesRead) {
this.bytesRead = bytesRead;
}

public LocalDateTime getUpdatedAt() {
return updatedAt;
}

public void setUpdatedAt(LocalDateTime updatedAt) {
this.updatedAt = updatedAt;
}

@Override
public String toString() {
return "AsyncUploadJobDto{" +
"id='" + id + '\'' +
", mtaUser='" + mtaUser + '\'' +
", state='" + state + '\'' +
", url='" + url + '\'' +
", addedAt=" + addedAt +
", startedAt=" + startedAt +
", finishedAt=" + finishedAt +
", namespace='" + namespace + '\'' +
", spaceGuid='" + spaceGuid + '\'' +
", mtaId='" + mtaId + '\'' +
", fileId='" + fileId + '\'' +
", error='" + error + '\'' +
", instanceIndex=" + instanceIndex +
'}';
"id='" + id + '\'' +
", mtaUser='" + mtaUser + '\'' +
", state='" + state + '\'' +
", url='" + url + '\'' +
", addedAt=" + addedAt +
", startedAt=" + startedAt +
", finishedAt=" + finishedAt +
", namespace='" + namespace + '\'' +
", spaceGuid='" + spaceGuid + '\'' +
", mtaId='" + mtaId + '\'' +
", fileId='" + fileId + '\'' +
", error='" + error + '\'' +
", instanceIndex=" + instanceIndex +
", bytesRead=" + bytesRead +
", updatedAt=" + updatedAt +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.cloudfoundry.multiapps.controller.persistence.model;

import java.time.LocalDateTime;

import org.cloudfoundry.multiapps.common.Nullable;
import org.immutables.value.Value;

Expand Down Expand Up @@ -44,4 +43,10 @@ enum State {
String getMtaId();

Integer getInstanceIndex();

@Nullable
Long getBytesRead();

@Nullable
LocalDateTime getUpdatedAt();
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ private TableColumnNames() {
public static final String ASYNC_UPLOAD_JOB_FILE_ID = "file_id";
public static final String ASYNC_UPLOAD_JOB_ERROR = "error";
public static final String ASYNC_UPLOAD_JOB_INSTANCE_INDEX = "instance_index";
public static final String ASYNC_UPLOAD_JOB_BYTES_READ = "bytes_read";
public static final String ASYNC_UPLOAD_JOB_UPDATED_AT = "updated_at";

public static final String BACKUP_DESCRIPTOR_ID = "id";
public static final String BACKUP_DESCRIPTOR_DESCRIPTOR = "descriptor";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.persistence.EntityManagerFactory;

import org.cloudfoundry.multiapps.common.ConflictException;
import org.cloudfoundry.multiapps.common.NotFoundException;
import org.cloudfoundry.multiapps.controller.persistence.Messages;
Expand Down Expand Up @@ -58,6 +57,8 @@ public AsyncUploadJobEntry fromDto(AsyncUploadJobDto dto) {
.fileId(dto.getFileId())
.error(dto.getError())
.instanceIndex(dto.getInstanceIndex())
.bytesRead(dto.getBytesRead())
.updatedAt(dto.getUpdatedAt())
.build();
}

Expand All @@ -76,7 +77,9 @@ public AsyncUploadJobDto toDto(AsyncUploadJobEntry entry) {
entry.getMtaId(),
entry.getFileId(),
entry.getError(),
entry.getInstanceIndex());
entry.getInstanceIndex(),
entry.getBytesRead(),
entry.getUpdatedAt());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<property name="eclipselink.weaving" value="static" />
<property name="eclipselink.logging.logger"
value="org.eclipse.persistence.logging.slf4j.SLF4JLogger" />
<property name="eclipselink.cache.shared.default" value="false"/>
</properties>
</persistence-unit>
</persistence>
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.6.xsd">

<changeSet author="sap.com" id="add_columns_to_async_upload_job">
<preConditions>
<tableExists tableName="async_upload_job"/>
</preConditions>
<addColumn tableName="async_upload_job">
<column name="bytes_read" type="BIGINT"/>
</addColumn>
<addColumn tableName="async_upload_job">
<column name="updated_at" type="TIMESTAMP"/>
</addColumn>
</changeSet>

</databaseChangeLog>
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,7 @@
<include file="/org/cloudfoundry/multiapps/controller/persistence/db/changelog/db-changelog-1.183.0-persistence.xml"/>

<include file="/org/cloudfoundry/multiapps/controller/persistence/db/changelog/db-changelog-1.192.0-persistence.xml"/>

<include file="/org/cloudfoundry/multiapps/controller/persistence/db/changelog/db-changelog-2.35.0-persistence.xml"/>

</databaseChangeLog>
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,14 @@
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import jakarta.inject.Inject;
import org.cloudfoundry.multiapps.controller.core.util.ApplicationConfiguration;
import org.cloudfoundry.multiapps.controller.process.util.PriorityCallable;
import org.cloudfoundry.multiapps.controller.process.util.PriorityFuture;
import org.cloudfoundry.multiapps.controller.process.util.PriorityFutureComparator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import jakarta.inject.Inject;

@Configuration
public class FileUploadThreadPoolConfiguration {

Expand Down Expand Up @@ -75,4 +73,17 @@ public ExecutorService asyncFileUploadExecutor(LinkedBlockingQueue<Runnable> fil
fileUploadFromUrlQueue);
}

@Bean(name = "deployFromUrlExecutor")
public ExecutorService deployFromUrlExecutor() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this is needed when we have asyncFileUploadExecutor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The asyncFileUploadExecutor is used for the actual upload. It has a queue of 20 which means it can store 20 jobs in the queue before starting a new thread.
Such executor will not scale well under load it might not update the update_at column in the database and the job might be consider as stale.
The goal of deployFromUrlExecutor is to update regularly updated_at value in the table and trigger the other executor to schedule the upload.

return new ThreadPoolExecutor(5,
// The max thread count should match the maximum capacity of asyncFileUploadExecutor (queue size + max threads).
// A lower value may cause unnecessary task rejections.
// A higher value may cause job failures when asyncFileUploadExecutor becomes full.
applicationConfiguration.getDeployFromUrlExecutorMaxThreads(),
// As the threads are only updating a row and waiting it is ok to have more threads
30,
TimeUnit.SECONDS,
new SynchronousQueue<>()); // A synchronous queue is used so deploy from url jobs immediately start
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen if all threads are busy and there is already one task in the queue waiting for execution? What will happen with all other threads which tries to submit a new task?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This queue does not store any jobs, if there is a free thread (up to 50) it will immediately start the job, if there are no available threads (50 are used) it will reject the job. According to statistics there are maximum of around ~320 started operations on eu10-004 per hour (most use normal file upload), so 450 threads seems more than enough.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If backend reject file upload task, does the cli will retry on another instance or it will retry on the same one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will check in the plugin

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, usually hit the same instance because of sticky sessions, except the last retry...
described in the backlog for the session handling

// a new thread that updates the database job entry
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.cloudfoundry.multiapps.controller.process.stream;

import java.io.InputStream;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.input.ProxyInputStream;

public class CountingInputStream extends ProxyInputStream {

private final AtomicLong bytes;

public CountingInputStream(InputStream proxy, AtomicLong counterRef) {
super(proxy);
bytes = counterRef;
}

@Override
protected void afterRead(int n) {
if (n > 0) {
bytes.addAndGet(n);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public final class Messages {
public static final String NO_VALID_OBJECT_STORE_CONFIGURATION_FOUND = "No valid Object Store configuration found!";
public static final String MISSING_PROPERTIES_FOR_CREATING_THE_SPECIFIC_PROVIDER = "Missing properties for creating the specific provider!";
public static final String DEPLOY_FROM_URL_WRONG_CREDENTIALS = "Credentials to {0} are wrong. Make sure that they are correct.";
public static final String JOB_NOT_UPDATED_FOR_0_SECONDS = "Job not updated for {0} seconds";

public static final String FAILED_TO_CREATE_BLOB_STORE_CONTEXT = "Failed to create BlobStoreContext";

Expand Down Expand Up @@ -54,13 +55,8 @@ public final class Messages {
public static final String CLEARING_FLOWABLE_LOCK_OWNER_THREW_AN_EXCEPTION_0 = "Clearing Flowable lock owner on JVM shutdown threw an exception: {0}";
public static final String FETCHING_FILE_FAILED = "Fetching file {0} in space {1} failed with: {2}";
public static final String ASYNC_UPLOAD_JOB_FAILED = "Async upload job {0} failed with: {1}";
public static final String JOB_0_WAS_NOT_FOUND_IN_THE_RUNNING_TASKS = "Job \"{0}\" was not found in the running tasks";
public static final String JOB_IS_NOT_BEING_EXECUTED = "Job is not being executed";
public static final String JOB_0_EXISTS_IN_STATE_1_BUT_DOES_NOT_EXISTS_IN_THE_RUNNING_TASKS = "Job \"{0}\" exists in state \"{1}\" but does not exists in the running tasks";
public static final String JOB_THREAD_IS_NOT_RUNNING_BUT_STATE_IS_STILL_IN_PROGRESS_UPLOAD_FAILED = "Job thread is not running but state is still in progress. Upload failed";

// WARN log messages
public static final String THE_JOB_EXISTS_BUT_IT_IS_NOT_RUNNING_DELETING = "The job exists but it is not running. Deleting";

// INFO log messages
public static final String ALM_SERVICE_ENV_INITIALIZED = "Deploy service environment initialized";
Expand All @@ -72,14 +68,15 @@ public final class Messages {
public static final String CLEARING_LOCK_OWNER = "Clearing lock owner {0}...";
public static final String CLEARED_LOCK_OWNER = "Cleared lock owner {0}";
public static final String OBJECT_STORE_WITH_PROVIDER_0_CREATED = "Object store with provider: {0} created";
public static final String JOB_WITH_ID_WAS_NOT_UPDATED_WITHIN_SECONDS = "Job with ID: {} was not updated within: {} seconds";

// DEBUG log messages
public static final String RECEIVED_UPLOAD_REQUEST = "Received upload request on URI: {}";
public static final String RECEIVED_UPLOAD_FROM_URL_REQUEST = "Received upload from URL {} request";
public static final String UPLOADED_FILE = "Uploaded file \"{}\" with name {}, size {}, space {}, and digest {} (algorithm {}) for {} ms.";
public static final String ASYNC_UPLOAD_JOB_EXISTS = "Async upload job for URL {} exists: {}";
public static final String CREATING_ASYNC_UPLOAD_JOB = "Creating async upload job for URL {} with ID: {}";
public static final String ASYNC_UPLOAD_JOB_REJECTED = "Async upload job {} rejected. Deleting entry";
public static final String ASYNC_UPLOAD_JOB_REJECTED = "Async upload job with space guid: {}, namespace: {}, URL: {} rejected.";
public static final String STARTING_DOWNLOAD_OF_MTAR = "Starting download of MTAR from remote endpoint: {}";
public static final String UPLOADED_MTAR_FROM_REMOTE_ENDPOINT_AND_JOB_ID = "Uploaded MTAR from remote endpoint {} with job id: {} in {} ms";
public static final String ASYNC_UPLOAD_JOB_FINISHED = "Async upload job {} finished";
Expand Down
Loading