Skip to content

Commit 36f5e15

Browse files
committed
[Fix #1286] Non blocking persistence
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent 42e89a8 commit 36f5e15

17 files changed

+322
-183
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/SchedulerListener.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,15 @@
1616
package io.serverlessworkflow.impl;
1717

1818
import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent;
19-
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener;
19+
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionCompletableListener;
2020
import io.serverlessworkflow.impl.scheduler.Cancellable;
2121
import io.serverlessworkflow.impl.scheduler.WorkflowScheduler;
2222
import java.time.Duration;
2323
import java.util.Map;
24+
import java.util.concurrent.CompletableFuture;
2425
import java.util.concurrent.ConcurrentHashMap;
2526

26-
class SchedulerListener implements WorkflowExecutionListener, AutoCloseable {
27+
class SchedulerListener implements WorkflowExecutionCompletableListener, AutoCloseable {
2728

2829
private final WorkflowScheduler scheduler;
2930
private final Map<WorkflowDefinition, WorkflowValueResolver<Duration>> afterMap =
@@ -39,7 +40,7 @@ public void addAfter(WorkflowDefinition definition, WorkflowValueResolver<Durati
3940
}
4041

4142
@Override
42-
public void onWorkflowCompleted(WorkflowCompletedEvent ev) {
43+
public CompletableFuture<?> onWorkflowCompleted(WorkflowCompletedEvent ev) {
4344
WorkflowDefinition workflowDefinition = (WorkflowDefinition) ev.workflowContext().definition();
4445
WorkflowValueResolver<Duration> after = afterMap.get(workflowDefinition);
4546
if (after != null) {
@@ -49,6 +50,7 @@ public void onWorkflowCompleted(WorkflowCompletedEvent ev) {
4950
workflowDefinition,
5051
after.apply((WorkflowContext) ev.workflowContext(), null, ev.output())));
5152
}
53+
return CompletableFuture.completedFuture(null);
5254
}
5355

5456
public void removeAfter(WorkflowDefinition definition) {

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@
3434
import io.serverlessworkflow.impl.executors.TaskExecutorFactory;
3535
import io.serverlessworkflow.impl.expressions.ExpressionFactory;
3636
import io.serverlessworkflow.impl.expressions.RuntimeDescriptor;
37+
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionCompletableListener;
3738
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener;
39+
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListenerAdapter;
3840
import io.serverlessworkflow.impl.resources.DefaultResourceLoaderFactory;
3941
import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
4042
import io.serverlessworkflow.impl.resources.ResourceLoaderFactory;
@@ -67,7 +69,7 @@ public class WorkflowApplication implements AutoCloseable {
6769
private final ResourceLoaderFactory resourceLoaderFactory;
6870
private final SchemaValidatorFactory schemaValidatorFactory;
6971
private final WorkflowInstanceIdFactory idFactory;
70-
private final Collection<WorkflowExecutionListener> listeners;
72+
private final Collection<WorkflowExecutionCompletableListener> listeners;
7173
private final Map<WorkflowDefinitionId, WorkflowDefinition> definitions;
7274
private final WorkflowPositionFactory positionFactory;
7375
private final ExecutorServiceFactory executorFactory;
@@ -137,7 +139,7 @@ public ResourceLoaderFactory resourceLoaderFactory() {
137139
return resourceLoaderFactory;
138140
}
139141

140-
public Collection<WorkflowExecutionListener> listeners() {
142+
public Collection<WorkflowExecutionCompletableListener> listeners() {
141143
return listeners;
142144
}
143145

@@ -175,8 +177,10 @@ public SchemaValidator getValidator(SchemaInline inline) {
175177
private String id;
176178
private TaskExecutorFactory taskFactory;
177179
private Collection<ExpressionFactory> exprFactories = new HashSet<>();
178-
private List<WorkflowExecutionListener> listeners =
179-
loadFromServiceLoader(WorkflowExecutionListener.class);
180+
private List<WorkflowExecutionCompletableListener> listeners =
181+
ServiceLoader.load(WorkflowExecutionListener.class).stream()
182+
.map(v -> new WorkflowExecutionListenerAdapter(v.get()))
183+
.collect(Collectors.toList());
180184
private List<CallableTaskProxyBuilder> callableProxyBuilders =
181185
loadFromServiceLoader(CallableTaskProxyBuilder.class);
182186
private ResourceLoaderFactory resourceLoaderFactory = DefaultResourceLoaderFactory.get();
@@ -212,6 +216,11 @@ public Builder withId(String id) {
212216
}
213217

214218
public Builder withListener(WorkflowExecutionListener listener) {
219+
listeners.add(new WorkflowExecutionListenerAdapter(listener));
220+
return this;
221+
}
222+
223+
public Builder withListener(WorkflowExecutionCompletableListener listener) {
215224
listeners.add(listener);
216225
return this;
217226
}
@@ -414,7 +423,7 @@ public void close() {
414423
}
415424
definitions.clear();
416425

417-
for (WorkflowExecutionListener listener : listeners) {
426+
for (WorkflowExecutionCompletableListener listener : listeners) {
418427
safeClose(listener);
419428
}
420429
listeners.clear();

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -70,30 +70,43 @@ public CompletableFuture<WorkflowModel> start() {
7070
return startExecution(
7171
() -> {
7272
startedAt = Instant.now();
73-
publishEvent(
73+
return publishEvent(
7474
workflowContext, l -> l.onWorkflowStarted(new WorkflowStartedEvent(workflowContext)));
7575
});
7676
}
7777

78-
protected final CompletableFuture<WorkflowModel> startExecution(Runnable runnable) {
78+
protected final CompletableFuture<WorkflowModel> startExecution(
79+
Supplier<CompletableFuture<?>> runnable) {
7980
CompletableFuture<WorkflowModel> future = futureRef.get();
8081
if (future != null) {
8182
return future;
8283
}
8384
status(WorkflowStatus.RUNNING);
84-
runnable.run();
85+
8586
future =
86-
TaskExecutorHelper.processTaskList(
87-
workflowContext.definition().startTask(),
88-
workflowContext,
89-
Optional.empty(),
90-
workflowContext
91-
.definition()
92-
.inputFilter()
93-
.map(f -> f.apply(workflowContext, null, input))
94-
.orElse(input))
95-
.whenComplete(this::whenCompleted)
96-
.thenApply(this::whenSuccess);
87+
runnable
88+
.get()
89+
.thenCompose(
90+
v ->
91+
TaskExecutorHelper.processTaskList(
92+
workflowContext.definition().startTask(),
93+
workflowContext,
94+
Optional.empty(),
95+
workflowContext
96+
.definition()
97+
.inputFilter()
98+
.map(f -> f.apply(workflowContext, null, input))
99+
.orElse(input))
100+
.whenComplete(this::whenCompleted)
101+
.thenApply(this::whenSuccess)
102+
.thenCompose(
103+
model ->
104+
publishEvent(
105+
workflowContext,
106+
l ->
107+
l.onWorkflowCompleted(
108+
new WorkflowCompletedEvent(workflowContext, model)))
109+
.thenApply(__ -> model)));
97110
futureRef.set(future);
98111
return future;
99112
}
@@ -126,9 +139,6 @@ private WorkflowModel whenSuccess(WorkflowModel node) {
126139
.orElse(node);
127140
workflowContext.definition().outputSchemaValidator().ifPresent(v -> v.validate(output));
128141
status(WorkflowStatus.COMPLETED);
129-
publishEvent(
130-
workflowContext,
131-
l -> l.onWorkflowCompleted(new WorkflowCompletedEvent(workflowContext, output)));
132142
return output;
133143
}
134144

impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -213,17 +213,24 @@ public CompletableFuture<TaskContext> apply(
213213
completable =
214214
completable
215215
.thenCompose(workflowContext.instance()::suspendedCheck)
216+
.thenCompose(
217+
t -> {
218+
CompletableFuture<?> events =
219+
t.isRetrying()
220+
? publishEvent(
221+
workflowContext,
222+
l ->
223+
l.onTaskRetried(
224+
new TaskRetriedEvent(workflowContext, taskContext)))
225+
: publishEvent(
226+
workflowContext,
227+
l ->
228+
l.onTaskStarted(
229+
new TaskStartedEvent(workflowContext, taskContext)));
230+
return events.thenApply(v -> t);
231+
})
216232
.thenApply(
217233
t -> {
218-
if (t.isRetrying()) {
219-
publishEvent(
220-
workflowContext,
221-
l -> l.onTaskRetried(new TaskRetriedEvent(workflowContext, taskContext)));
222-
} else {
223-
publishEvent(
224-
workflowContext,
225-
l -> l.onTaskStarted(new TaskStartedEvent(workflowContext, taskContext)));
226-
}
227234
inputSchemaValidator.ifPresent(s -> s.validate(t.rawInput()));
228235
inputProcessor.ifPresent(
229236
p -> taskContext.input(p.apply(workflowContext, t, t.rawInput())));
@@ -251,13 +258,16 @@ public CompletableFuture<TaskContext> apply(
251258
p.apply(workflowContext, t, workflowContext.context())));
252259
contextSchemaValidator.ifPresent(s -> s.validate(workflowContext.context()));
253260
t.completedAt(Instant.now());
254-
publishEvent(
255-
workflowContext,
256-
l ->
257-
l.onTaskCompleted(
258-
new TaskCompletedEvent(workflowContext, taskContext)));
259261
return t;
260-
});
262+
})
263+
.thenCompose(
264+
t ->
265+
publishEvent(
266+
workflowContext,
267+
l ->
268+
l.onTaskCompleted(
269+
new TaskCompletedEvent(workflowContext, taskContext)))
270+
.thenApply(__ -> t));
261271
if (timeout.isPresent()) {
262272
completable =
263273
completable

impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/LifecycleEventsUtils.java

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,29 +16,19 @@
1616
package io.serverlessworkflow.impl.lifecycle;
1717

1818
import io.serverlessworkflow.impl.WorkflowContext;
19-
import java.util.function.Consumer;
20-
import org.slf4j.Logger;
21-
import org.slf4j.LoggerFactory;
19+
import java.util.concurrent.CompletableFuture;
20+
import java.util.function.Function;
2221

2322
public class LifecycleEventsUtils {
2423

2524
private LifecycleEventsUtils() {}
2625

27-
private static final Logger logger = LoggerFactory.getLogger(LifecycleEventsUtils.class);
28-
29-
public static <T extends TaskEvent> void publishEvent(
30-
WorkflowContext workflowContext, Consumer<WorkflowExecutionListener> consumer) {
31-
workflowContext
32-
.definition()
33-
.application()
34-
.listeners()
35-
.forEach(
36-
v -> {
37-
try {
38-
consumer.accept(v);
39-
} catch (Exception ex) {
40-
logger.error("Error processing listener. Ignoring and going on", ex);
41-
}
42-
});
26+
public static CompletableFuture<?> publishEvent(
27+
WorkflowContext workflowContext,
28+
Function<WorkflowExecutionCompletableListener, CompletableFuture<?>> function) {
29+
return CompletableFuture.allOf(
30+
workflowContext.definition().application().listeners().stream()
31+
.map(v -> function.apply(v))
32+
.toArray(CompletableFuture[]::new));
4333
}
4434
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.lifecycle;
17+
18+
import io.serverlessworkflow.impl.ServicePriority;
19+
import java.util.concurrent.CompletableFuture;
20+
21+
public interface WorkflowExecutionCompletableListener extends AutoCloseable, ServicePriority {
22+
23+
default CompletableFuture<?> onWorkflowStarted(WorkflowStartedEvent ev) {
24+
return CompletableFuture.completedFuture(null);
25+
}
26+
27+
default CompletableFuture<?> onWorkflowSuspended(WorkflowSuspendedEvent ev) {
28+
return CompletableFuture.completedFuture(null);
29+
}
30+
31+
default CompletableFuture<?> onWorkflowResumed(WorkflowResumedEvent ev) {
32+
return CompletableFuture.completedFuture(null);
33+
}
34+
35+
default CompletableFuture<?> onWorkflowCompleted(WorkflowCompletedEvent ev) {
36+
return CompletableFuture.completedFuture(null);
37+
}
38+
39+
default CompletableFuture<?> onWorkflowFailed(WorkflowFailedEvent ev) {
40+
return CompletableFuture.completedFuture(null);
41+
}
42+
43+
default CompletableFuture<?> onWorkflowCancelled(WorkflowCancelledEvent ev) {
44+
return CompletableFuture.completedFuture(null);
45+
}
46+
47+
default CompletableFuture<?> onTaskStarted(TaskStartedEvent ev) {
48+
return CompletableFuture.completedFuture(null);
49+
}
50+
51+
default CompletableFuture<?> onTaskCompleted(TaskCompletedEvent ev) {
52+
return CompletableFuture.completedFuture(null);
53+
}
54+
55+
default CompletableFuture<?> onTaskFailed(TaskFailedEvent ev) {
56+
return CompletableFuture.completedFuture(null);
57+
}
58+
59+
default CompletableFuture<?> onTaskCancelled(TaskCancelledEvent ev) {
60+
return CompletableFuture.completedFuture(null);
61+
}
62+
63+
default CompletableFuture<?> onTaskSuspended(TaskSuspendedEvent ev) {
64+
return CompletableFuture.completedFuture(null);
65+
}
66+
67+
default CompletableFuture<?> onTaskResumed(TaskResumedEvent ev) {
68+
return CompletableFuture.completedFuture(null);
69+
}
70+
71+
default CompletableFuture<?> onTaskRetried(TaskRetriedEvent ev) {
72+
return CompletableFuture.completedFuture(null);
73+
}
74+
75+
default CompletableFuture<?> onWorkflowStatusChanged(WorkflowStatusEvent ev) {
76+
return CompletableFuture.completedFuture(null);
77+
}
78+
79+
@Override
80+
default void close() {}
81+
}

0 commit comments

Comments
 (0)