Skip to content

Commit bb9bb16

Browse files
committed
[Fix #1136] Run db writes asynchronously
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent ba24a4f commit bb9bb16

13 files changed

Lines changed: 162 additions & 54 deletions

File tree

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,53 @@
1717

1818
import static io.serverlessworkflow.impl.WorkflowUtils.safeClose;
1919

20+
import java.time.Duration;
21+
import java.util.Optional;
22+
import java.util.concurrent.ExecutorService;
23+
2024
public class DefaultPersistenceInstanceHandlers extends PersistenceInstanceHandlers {
2125

22-
private final PersistenceInstanceStore store;
26+
public static class Builder {
27+
28+
private final PersistenceInstanceStore store;
29+
private ExecutorService executorService;
30+
private Duration closeTimeout;
31+
32+
private Builder(PersistenceInstanceStore store) {
33+
this.store = store;
34+
}
35+
36+
public Builder withExecutorService(ExecutorService executorService) {
37+
this.executorService = executorService;
38+
return this;
39+
}
40+
41+
public Builder withCloseTimeout(Duration closeTimeout) {
42+
this.closeTimeout = closeTimeout;
43+
return this;
44+
}
2345

24-
public static DefaultPersistenceInstanceHandlers from(PersistenceInstanceStore store) {
25-
return new DefaultPersistenceInstanceHandlers(
26-
new DefaultPersistenceInstanceWriter(store),
27-
new DefaultPersistenceInstanceReader(store),
28-
store);
46+
public PersistenceInstanceHandlers build() {
47+
return new DefaultPersistenceInstanceHandlers(
48+
new DefaultPersistenceInstanceWriter(
49+
store,
50+
Optional.ofNullable(executorService),
51+
closeTimeout == null ? Duration.ofSeconds(1) : closeTimeout),
52+
new DefaultPersistenceInstanceReader(store),
53+
store);
54+
}
2955
}
3056

57+
public static Builder builder(PersistenceInstanceStore store) {
58+
return new Builder(store);
59+
}
60+
61+
public static PersistenceInstanceHandlers from(PersistenceInstanceStore store) {
62+
return new Builder(store).build();
63+
}
64+
65+
private final PersistenceInstanceStore store;
66+
3167
private DefaultPersistenceInstanceHandlers(
3268
PersistenceInstanceWriter writer,
3369
PersistenceInstanceReader reader,
@@ -38,6 +74,7 @@ private DefaultPersistenceInstanceHandlers(
3874

3975
@Override
4076
public void close() {
77+
super.close();
4178
safeClose(store);
4279
}
4380
}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,7 @@ public Stream<WorkflowInstance> scanAll(WorkflowDefinition definition, String ap
5555
.onClose(() -> transaction.commit(definition))
5656
.map(v -> new WorkflowPersistenceInstance(definition, v));
5757
}
58+
59+
@Override
60+
public void close() {}
5861
}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java

Lines changed: 74 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,75 +17,123 @@
1717

1818
import io.serverlessworkflow.impl.TaskContextData;
1919
import io.serverlessworkflow.impl.WorkflowContextData;
20+
import io.serverlessworkflow.impl.WorkflowDefinitionData;
2021
import io.serverlessworkflow.impl.WorkflowStatus;
22+
import java.time.Duration;
23+
import java.util.Map;
24+
import java.util.Optional;
25+
import java.util.concurrent.CompletableFuture;
26+
import java.util.concurrent.ConcurrentHashMap;
27+
import java.util.concurrent.ExecutorService;
28+
import java.util.concurrent.TimeUnit;
2129
import java.util.function.Consumer;
2230

2331
public class DefaultPersistenceInstanceWriter implements PersistenceInstanceWriter {
2432

2533
private final PersistenceInstanceStore store;
34+
private final Map<String, CompletableFuture<Void>> futuresMap = new ConcurrentHashMap<>();
35+
private final Optional<ExecutorService> executorService;
36+
private final Duration closeTimeout;
2637

27-
protected DefaultPersistenceInstanceWriter(PersistenceInstanceStore store) {
38+
protected DefaultPersistenceInstanceWriter(
39+
PersistenceInstanceStore store,
40+
Optional<ExecutorService> executorService,
41+
Duration closeTimeout) {
2842
this.store = store;
43+
this.executorService = executorService;
44+
this.closeTimeout = closeTimeout;
2945
}
3046

3147
@Override
32-
public void started(WorkflowContextData workflowContext) {
33-
doTransaction(t -> t.writeInstanceData(workflowContext), workflowContext);
48+
public CompletableFuture<Void> started(WorkflowContextData workflowContext) {
49+
return doTransaction(t -> t.writeInstanceData(workflowContext), workflowContext);
3450
}
3551

3652
@Override
37-
public void completed(WorkflowContextData workflowContext) {
38-
removeProcessInstance(workflowContext);
53+
public CompletableFuture<Void> completed(WorkflowContextData workflowContext) {
54+
return removeProcessInstance(workflowContext);
3955
}
4056

4157
@Override
42-
public void failed(WorkflowContextData workflowContext, Throwable ex) {
43-
removeProcessInstance(workflowContext);
58+
public CompletableFuture<Void> failed(WorkflowContextData workflowContext, Throwable ex) {
59+
return removeProcessInstance(workflowContext);
4460
}
4561

4662
@Override
47-
public void aborted(WorkflowContextData workflowContext) {
48-
removeProcessInstance(workflowContext);
63+
public CompletableFuture<Void> aborted(WorkflowContextData workflowContext) {
64+
return removeProcessInstance(workflowContext);
4965
}
5066

51-
protected void removeProcessInstance(WorkflowContextData workflowContext) {
52-
doTransaction(t -> t.removeProcessInstance(workflowContext), workflowContext);
67+
protected CompletableFuture<Void> removeProcessInstance(WorkflowContextData workflowContext) {
68+
return doTransaction(t -> t.removeProcessInstance(workflowContext), workflowContext)
69+
.thenRun(() -> futuresMap.remove(workflowContext.instanceData().id()));
5370
}
5471

5572
@Override
56-
public void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext) {
57-
// not recording
73+
public CompletableFuture<Void> taskStarted(
74+
WorkflowContextData workflowContext, TaskContextData taskContext) {
75+
return CompletableFuture.completedFuture(null);
5876
}
5977

6078
@Override
61-
public void taskRetried(WorkflowContextData workflowContext, TaskContextData taskContext) {
62-
doTransaction(t -> t.writeRetryTask(workflowContext, taskContext), workflowContext);
79+
public CompletableFuture<Void> taskRetried(
80+
WorkflowContextData workflowContext, TaskContextData taskContext) {
81+
return doTransaction(t -> t.writeRetryTask(workflowContext, taskContext), workflowContext);
6382
}
6483

6584
@Override
66-
public void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext) {
67-
doTransaction(t -> t.writeCompletedTask(workflowContext, taskContext), workflowContext);
85+
public CompletableFuture<Void> taskCompleted(
86+
WorkflowContextData workflowContext, TaskContextData taskContext) {
87+
return doTransaction(t -> t.writeCompletedTask(workflowContext, taskContext), workflowContext);
6888
}
6989

7090
@Override
71-
public void suspended(WorkflowContextData workflowContext) {
72-
doTransaction(t -> t.writeStatus(workflowContext, WorkflowStatus.SUSPENDED), workflowContext);
91+
public CompletableFuture<Void> suspended(WorkflowContextData workflowContext) {
92+
return doTransaction(
93+
t -> t.writeStatus(workflowContext, WorkflowStatus.SUSPENDED), workflowContext);
7394
}
7495

7596
@Override
76-
public void resumed(WorkflowContextData workflowContext) {
77-
doTransaction(t -> t.clearStatus(workflowContext), workflowContext);
97+
public CompletableFuture<Void> resumed(WorkflowContextData workflowContext) {
98+
return doTransaction(t -> t.clearStatus(workflowContext), workflowContext);
7899
}
79100

80-
private void doTransaction(
81-
Consumer<PersistenceInstanceTransaction> operations, WorkflowContextData context) {
101+
private CompletableFuture<Void> doTransaction(
102+
Consumer<PersistenceInstanceTransaction> operation, WorkflowContextData context) {
103+
final ExecutorService service =
104+
this.executorService.orElse(context.definition().application().executorService());
105+
final Runnable runnable = () -> executeTransaction(operation, context.definition());
106+
return futuresMap.compute(
107+
context.instanceData().id(),
108+
(k, v) ->
109+
v == null
110+
? CompletableFuture.runAsync(runnable, service)
111+
: v.thenRunAsync(runnable, service));
112+
}
113+
114+
private void executeTransaction(
115+
Consumer<PersistenceInstanceTransaction> operation, WorkflowDefinitionData definition) {
82116
PersistenceInstanceTransaction transaction = store.begin();
83117
try {
84-
operations.accept(transaction);
85-
transaction.commit(context.definition());
118+
operation.accept(transaction);
119+
transaction.commit(definition);
86120
} catch (Exception ex) {
87-
transaction.rollback(context.definition());
121+
transaction.rollback(definition);
88122
throw ex;
89123
}
90124
}
125+
126+
@Override
127+
public void close() {
128+
futuresMap.clear();
129+
executorService.ifPresent(
130+
e -> {
131+
try {
132+
e.awaitTermination(closeTimeout.toMillis(), TimeUnit.MILLISECONDS);
133+
e.shutdown();
134+
} catch (InterruptedException ex) {
135+
Thread.currentThread().interrupt();
136+
}
137+
});
138+
}
91139
}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package io.serverlessworkflow.impl.persistence;
1717

18+
import static io.serverlessworkflow.impl.WorkflowUtils.safeClose;
19+
1820
public class PersistenceInstanceHandlers implements AutoCloseable {
1921

2022
private final PersistenceInstanceWriter writer;
@@ -35,5 +37,8 @@ public PersistenceInstanceReader reader() {
3537
}
3638

3739
@Override
38-
public void close() {}
40+
public void close() {
41+
safeClose(writer);
42+
safeClose(reader);
43+
}
3944
}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import java.util.Optional;
2121
import java.util.stream.Stream;
2222

23-
public interface PersistenceInstanceReader {
23+
public interface PersistenceInstanceReader extends AutoCloseable {
2424

2525
default Stream<WorkflowInstance> scanAll(WorkflowDefinition definition) {
2626
return scanAll(definition, definition.application().id());

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,28 @@
1717

1818
import io.serverlessworkflow.impl.TaskContextData;
1919
import io.serverlessworkflow.impl.WorkflowContextData;
20+
import java.util.concurrent.CompletableFuture;
2021

21-
public interface PersistenceInstanceWriter {
22+
public interface PersistenceInstanceWriter extends AutoCloseable {
2223

23-
void started(WorkflowContextData workflowContext);
24+
CompletableFuture<Void> started(WorkflowContextData workflowContext);
2425

25-
void completed(WorkflowContextData workflowContext);
26+
CompletableFuture<Void> completed(WorkflowContextData workflowContext);
2627

27-
void failed(WorkflowContextData workflowContext, Throwable ex);
28+
CompletableFuture<Void> failed(WorkflowContextData workflowContext, Throwable ex);
2829

29-
void aborted(WorkflowContextData workflowContext);
30+
CompletableFuture<Void> aborted(WorkflowContextData workflowContext);
3031

31-
void suspended(WorkflowContextData workflowContext);
32+
CompletableFuture<Void> suspended(WorkflowContextData workflowContext);
3233

33-
void resumed(WorkflowContextData workflowContext);
34+
CompletableFuture<Void> resumed(WorkflowContextData workflowContext);
3435

35-
void taskRetried(WorkflowContextData workflowContext, TaskContextData taskContext);
36+
CompletableFuture<Void> taskRetried(
37+
WorkflowContextData workflowContext, TaskContextData taskContext);
3638

37-
void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext);
39+
CompletableFuture<Void> taskStarted(
40+
WorkflowContextData workflowContext, TaskContextData taskContext);
3841

39-
void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext);
42+
CompletableFuture<Void> taskCompleted(
43+
WorkflowContextData workflowContext, TaskContextData taskContext);
4044
}

impl/persistence/mvstore/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ This document explains how to enable persistence using MVStore as underlying per
88
To enable MVStore persistence, users should at least do the following things:
99

1010
- Initialize a MVStorePersistenceStore instance, passing the path of the file containing the persisted information
11-
- Pass this MVStorePersitenceStore as argument of BytesMapPersistenceInstanceHandlers.builder. This will create PersistenceInstanceWriter and PersistenceInstanceReader.
11+
- Pass this MVStorePersitenceStore as argument of DefaultPersistenceInstanceHandlers.from. This will create PersistenceInstanceWriter and PersistenceInstanceReader.
1212
- Use the PersistenceInstanceWriter created in the previous step to decorate the existing WorkflowApplication builder.
1313

1414
The code will look like this
1515

1616
----
1717
try (PersistenceInstanceHandlers handlers =
18-
BytesMapPersistenceInstanceHandlers.builder(new MVStorePersistenceStore("test.db"))
18+
DefaultPersistenceInstanceHandlers.from(new MVStorePersistenceStore("test.db"))
1919
.build();
2020
WorkflowApplication application =
2121
PersistenceApplicationBuilder.builder(
@@ -33,7 +33,7 @@ If user wants to resume execution of all previously existing instances (typicall
3333
Once retrieved, calling `start` method will resume the execution after the latest completed task before the running JVM was stopped.
3434

3535
----
36-
handlers.reader().readAll(definition).values().forEach(WorkflowInstance::start);
36+
handlers.reader().scanAll(definition).forEach(WorkflowInstance::start);
3737
----
3838

3939
---

impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractPersistenceTest.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.time.Instant;
4242
import java.util.Map;
4343
import java.util.Optional;
44+
import java.util.concurrent.Executors;
4445
import org.junit.jupiter.api.AfterAll;
4546
import org.junit.jupiter.api.AfterEach;
4647
import org.junit.jupiter.api.BeforeAll;
@@ -67,7 +68,10 @@ static void init() throws IOException {
6768

6869
@BeforeEach
6970
void setup() {
70-
handlers = DefaultPersistenceInstanceHandlers.from(persistenceStore());
71+
handlers =
72+
DefaultPersistenceInstanceHandlers.builder(persistenceStore())
73+
.withExecutorService(Executors.newSingleThreadExecutor())
74+
.build();
7175
context = app.modelFactory().fromNull();
7276
workflowContext = mock(WorkflowContext.class);
7377
workflowInstance = mock(WorkflowInstance.class);
@@ -117,8 +121,8 @@ void testWorkflowInstance() throws InterruptedException {
117121

118122
final Map<String, Object> completedMap = Map.of("name", "fulanito");
119123

120-
handlers.writer().started(workflowContext);
121-
handlers.writer().taskRetried(workflowContext, retriedTaskContext(position, numRetries));
124+
handlers.writer().started(workflowContext).join();
125+
handlers.writer().taskRetried(workflowContext, retriedTaskContext(position, numRetries)).join();
122126
Optional<WorkflowInstance> optional = handlers.reader().find(definition, workflowInstance.id());
123127
assertThat(optional).isPresent();
124128
WorkflowPersistenceInstance instance = (WorkflowPersistenceInstance) optional.orElseThrow();
@@ -135,7 +139,10 @@ void testWorkflowInstance() throws InterruptedException {
135139
assertThat(retryAttempt.getValue()).isEqualTo(numRetries);
136140

137141
// task completed
138-
handlers.writer().taskCompleted(workflowContext, completedTaskContext(position, completedMap));
142+
handlers
143+
.writer()
144+
.taskCompleted(workflowContext, completedTaskContext(position, completedMap))
145+
.join();
139146
instance =
140147
(WorkflowPersistenceInstance)
141148
handlers.reader().find(definition, workflowInstance.id()).orElseThrow();
@@ -157,7 +164,7 @@ void testWorkflowInstance() throws InterruptedException {
157164
assertThat(transition.getValue().isEndNode()).isTrue();
158165

159166
// workflow completed
160-
handlers.writer().completed(workflowContext);
167+
handlers.writer().completed(workflowContext).join();
161168
assertThat(handlers.reader().find(definition, workflowInstance.id())).isEmpty();
162169
}
163170
}

impl/test/db-samples/running.db

8 KB
Binary file not shown.

impl/test/db-samples/running_v1.db

8 KB
Binary file not shown.

0 commit comments

Comments
 (0)