Skip to content

Commit ba24a4f

Browse files
authored
[Fix #1137] Adding Callable Task Proxy builder (#1138)
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent a93c52b commit ba24a4f

File tree

5 files changed

+91
-11
lines changed

5 files changed

+91
-11
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.serverlessworkflow.impl.events.EventConsumer;
3030
import io.serverlessworkflow.impl.events.EventPublisher;
3131
import io.serverlessworkflow.impl.events.InMemoryEvents;
32+
import io.serverlessworkflow.impl.executors.CallableTaskProxyBuilder;
3233
import io.serverlessworkflow.impl.executors.DefaultTaskExecutorFactory;
3334
import io.serverlessworkflow.impl.executors.TaskExecutorFactory;
3435
import io.serverlessworkflow.impl.expressions.ExpressionFactory;
@@ -48,6 +49,7 @@
4849
import java.util.Collections;
4950
import java.util.HashMap;
5051
import java.util.HashSet;
52+
import java.util.List;
5153
import java.util.Map;
5254
import java.util.Optional;
5355
import java.util.ServiceLoader;
@@ -83,6 +85,7 @@ public class WorkflowApplication implements AutoCloseable {
8385
private final Optional<URITemplateResolver> templateResolver;
8486
private final Optional<FunctionReader> functionReader;
8587
private final URI defaultCatalogURI;
88+
private final Collection<CallableTaskProxyBuilder> callableProxyBuilders;
8689

8790
private WorkflowApplication(Builder builder) {
8891
this.taskFactory = builder.taskFactory;
@@ -110,6 +113,7 @@ private WorkflowApplication(Builder builder) {
110113
this.functionReader = builder.functionReader;
111114
this.defaultCatalogURI = builder.defaultCatalogURI;
112115
this.id = builder.id;
116+
this.callableProxyBuilders = builder.callableProxyBuilders;
113117
}
114118

115119
public TaskExecutorFactory taskFactory() {
@@ -170,10 +174,10 @@ public SchemaValidator getValidator(SchemaInline inline) {
170174
private String id;
171175
private TaskExecutorFactory taskFactory;
172176
private Collection<ExpressionFactory> exprFactories = new HashSet<>();
173-
private Collection<WorkflowExecutionListener> listeners =
174-
ServiceLoader.load(WorkflowExecutionListener.class).stream()
175-
.map(Provider::get)
176-
.collect(Collectors.toList());
177+
private List<WorkflowExecutionListener> listeners =
178+
loadFromServiceLoader(WorkflowExecutionListener.class);
179+
private List<CallableTaskProxyBuilder> callableProxyBuilders =
180+
loadFromServiceLoader(CallableTaskProxyBuilder.class);
177181
private ResourceLoaderFactory resourceLoaderFactory = DefaultResourceLoaderFactory.get();
178182
private SchemaValidatorFactory schemaValidatorFactory;
179183
private WorkflowPositionFactory positionFactory = () -> new QueueWorkflowPosition();
@@ -211,6 +215,11 @@ public Builder withListener(WorkflowExecutionListener listener) {
211215
return this;
212216
}
213217

218+
public Builder withCallableProxy(CallableTaskProxyBuilder builder) {
219+
callableProxyBuilders.add(builder);
220+
return this;
221+
}
222+
214223
public Builder withTaskExecutorFactory(TaskExecutorFactory factory) {
215224
this.taskFactory = factory;
216225
return this;
@@ -369,11 +378,17 @@ public WorkflowApplication build() {
369378
if (defaultCatalogURI == null) {
370379
defaultCatalogURI = URI.create("https://github.com/serverlessworkflow/catalog");
371380
}
381+
Collections.sort(listeners);
382+
Collections.sort(callableProxyBuilders);
372383
if (id == null) {
373384
id = idFactory.get();
374385
}
375386
return new WorkflowApplication(this);
376387
}
388+
389+
private <T> List<T> loadFromServiceLoader(Class<T> clazz) {
390+
return ServiceLoader.load(clazz).stream().map(Provider::get).collect(Collectors.toList());
391+
}
377392
}
378393

379394
public Map<WorkflowDefinitionId, WorkflowDefinition> workflowDefinitions() {
@@ -474,4 +489,8 @@ public <T> Optional<T> additionalObject(
474489
return Optional.ofNullable(additionalObjects.get(name))
475490
.map(v -> (T) v.apply(workflowContext, taskContext));
476491
}
492+
493+
public Collection<CallableTaskProxyBuilder> callableProxyBuilders() {
494+
return callableProxyBuilders;
495+
}
477496
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallTaskExecutor.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.serverlessworkflow.impl.WorkflowDefinition;
2222
import io.serverlessworkflow.impl.WorkflowModel;
2323
import io.serverlessworkflow.impl.WorkflowMutablePosition;
24+
import java.util.List;
2425
import java.util.concurrent.CompletableFuture;
2526

2627
public class CallTaskExecutor<T extends TaskBase> extends RegularTaskExecutor<T> {
@@ -29,27 +30,37 @@ public class CallTaskExecutor<T extends TaskBase> extends RegularTaskExecutor<T>
2930

3031
public static class CallTaskExecutorBuilder<T extends TaskBase>
3132
extends RegularTaskExecutorBuilder<T> {
32-
private final CallableTaskBuilder<T> callable;
33+
private CallableTaskBuilder<T> callableBuilder;
34+
private List<CallableTaskProxyBuilder> callableProxyBuilders;
35+
private CallableTask callable;
3336

3437
protected CallTaskExecutorBuilder(
3538
WorkflowMutablePosition position,
3639
T task,
3740
WorkflowDefinition definition,
38-
CallableTaskBuilder<T> callable) {
41+
CallableTaskBuilder<T> callableBuilder) {
3942
super(position, task, definition);
40-
this.callable = callable;
41-
callable.init(task, definition, position);
43+
this.callableProxyBuilders =
44+
definition.application().callableProxyBuilders().stream()
45+
.filter(t -> t.accept(task))
46+
.toList();
47+
callableBuilder.init(task, definition, position);
48+
this.callableBuilder = callableBuilder;
4249
}
4350

4451
@Override
4552
public CallTaskExecutor<T> buildInstance() {
53+
this.callable = callableBuilder.build();
54+
for (CallableTaskProxyBuilder callableBuilder : callableProxyBuilders) {
55+
this.callable = callableBuilder.build(callable);
56+
}
4657
return new CallTaskExecutor<>(this);
4758
}
4859
}
4960

5061
protected CallTaskExecutor(CallTaskExecutorBuilder<T> builder) {
5162
super(builder);
52-
this.callable = builder.callable.build();
63+
this.callable = builder.callable;
5364
}
5465

5566
@Override
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors;
17+
18+
import io.serverlessworkflow.api.types.TaskBase;
19+
import io.serverlessworkflow.impl.ServicePriority;
20+
21+
public interface CallableTaskProxyBuilder extends ServicePriority {
22+
23+
default boolean accept(TaskBase taskBase) {
24+
return true;
25+
}
26+
27+
CallableTask build(CallableTask delegate);
28+
}

impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionListener.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
*/
1616
package io.serverlessworkflow.impl.lifecycle;
1717

18-
public interface WorkflowExecutionListener extends AutoCloseable {
18+
import io.serverlessworkflow.impl.ServicePriority;
19+
20+
public interface WorkflowExecutionListener extends AutoCloseable, ServicePriority {
1921

2022
default void onWorkflowStarted(WorkflowStartedEvent ev) {}
2123

impl/test/src/test/java/io/serverlessworkflow/impl/test/HTTPWorkflowDefinitionTest.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.io.IOException;
2626
import java.nio.charset.StandardCharsets;
2727
import java.util.Map;
28+
import java.util.concurrent.CompletableFuture;
2829
import java.util.concurrent.CompletionException;
2930
import mockwebserver3.MockResponse;
3031
import mockwebserver3.MockWebServer;
@@ -37,15 +38,34 @@
3738
import org.junit.jupiter.api.BeforeEach;
3839
import org.junit.jupiter.api.Disabled;
3940
import org.junit.jupiter.api.Test;
41+
import org.slf4j.Logger;
42+
import org.slf4j.LoggerFactory;
4043

4144
public class HTTPWorkflowDefinitionTest {
4245

4346
private static WorkflowApplication appl;
4447
private static MockWebServer mockServer;
48+
private static final Logger logger = LoggerFactory.getLogger(HTTPWorkflowDefinitionTest.class);
4549

4650
@BeforeAll
4751
static void init() {
48-
appl = WorkflowApplication.builder().build();
52+
appl =
53+
WorkflowApplication.builder()
54+
.withCallableProxy(
55+
delegate ->
56+
(w, t, n) -> {
57+
long init = System.currentTimeMillis();
58+
CompletableFuture<WorkflowModel> result = delegate.apply(w, t, n);
59+
if (logger.isDebugEnabled()) {
60+
result.thenAccept(
61+
x ->
62+
logger.debug(
63+
"Http calls takes {} milliseconds",
64+
System.currentTimeMillis() - init));
65+
}
66+
return result;
67+
})
68+
.build();
4969
}
5070

5171
@AfterAll

0 commit comments

Comments
 (0)