-
Notifications
You must be signed in to change notification settings - Fork 42
Deploy from url split #1715
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Deploy from url split #1715
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| <?xml version="1.0" encoding="UTF-8"?> | ||
| <databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog" | ||
| xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
| xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog | ||
| http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.6.xsd"> | ||
|
|
||
| <changeSet author="sap.com" id="add_columns_to_async_upload_job"> | ||
| <preConditions> | ||
| <tableExists tableName="async_upload_job"/> | ||
| </preConditions> | ||
| <addColumn tableName="async_upload_job"> | ||
| <column name="bytes_read" type="BIGINT"/> | ||
| </addColumn> | ||
| <addColumn tableName="async_upload_job"> | ||
| <column name="updated_at" type="TIMESTAMP"/> | ||
| </addColumn> | ||
| </changeSet> | ||
|
|
||
| </databaseChangeLog> |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,16 +8,14 @@ | |
| import java.util.concurrent.SynchronousQueue; | ||
| import java.util.concurrent.ThreadPoolExecutor; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| import jakarta.inject.Inject; | ||
| import org.cloudfoundry.multiapps.controller.core.util.ApplicationConfiguration; | ||
| import org.cloudfoundry.multiapps.controller.process.util.PriorityCallable; | ||
| import org.cloudfoundry.multiapps.controller.process.util.PriorityFuture; | ||
| import org.cloudfoundry.multiapps.controller.process.util.PriorityFutureComparator; | ||
| import org.springframework.context.annotation.Bean; | ||
| import org.springframework.context.annotation.Configuration; | ||
|
|
||
| import jakarta.inject.Inject; | ||
|
|
||
| @Configuration | ||
| public class FileUploadThreadPoolConfiguration { | ||
|
|
||
|
|
@@ -75,4 +73,17 @@ public ExecutorService asyncFileUploadExecutor(LinkedBlockingQueue<Runnable> fil | |
| fileUploadFromUrlQueue); | ||
| } | ||
|
|
||
| @Bean(name = "deployFromUrlExecutor") | ||
s-yonkov-yonkov marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| public ExecutorService deployFromUrlExecutor() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why this is needed when we have asyncFileUploadExecutor?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The asyncFileUploadExecutor is used for the actual upload. It has a queue of 20 which means it can store 20 jobs in the queue before starting a new thread. |
||
| return new ThreadPoolExecutor(5, | ||
| // The max thread count should match the maximum capacity of asyncFileUploadExecutor (queue size + max threads). | ||
| // A lower value may cause unnecessary task rejections. | ||
| // A higher value may cause job failures when asyncFileUploadExecutor becomes full. | ||
| applicationConfiguration.getDeployFromUrlExecutorMaxThreads(), | ||
| // As the threads are only updating a row and waiting it is ok to have more threads | ||
| 30, | ||
| TimeUnit.SECONDS, | ||
| new SynchronousQueue<>()); // A synchronous queue is used so deploy from url jobs immediately start | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What will happen if all threads are busy and there is already one task in the queue waiting for execution? What will happen with all other threads which tries to submit a new task?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This queue does not store any jobs, if there is a free thread (up to 50) it will immediately start the job, if there are no available threads (50 are used) it will reject the job. According to statistics there are maximum of around ~320 started operations on eu10-004 per hour (most use normal file upload), so 450 threads seems more than enough.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If backend reject file upload task, does the cli will retry on another instance or it will retry on the same one?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will check in the plugin
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no, usually hit the same instance because of sticky sessions, except the last retry... |
||
| // a new thread that updates the database job entry | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| package org.cloudfoundry.multiapps.controller.process.stream; | ||
|
|
||
| import java.io.InputStream; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
| import org.apache.commons.io.input.ProxyInputStream; | ||
|
|
||
| public class CountingInputStream extends ProxyInputStream { | ||
|
|
||
| private final AtomicLong bytes; | ||
|
|
||
| public CountingInputStream(InputStream proxy, AtomicLong counterRef) { | ||
| super(proxy); | ||
| bytes = counterRef; | ||
| } | ||
|
|
||
| @Override | ||
| protected void afterRead(int n) { | ||
s-yonkov-yonkov marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (n > 0) { | ||
| bytes.addAndGet(n); | ||
| } | ||
| } | ||
|
|
||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.