Skip to content

Commit 91d6256

Browse files
committed
[Fix #1136] Serialize DB writes
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent ba24a4f commit 91d6256

File tree

12 files changed

+139
-52
lines changed

12 files changed

+139
-52
lines changed

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,43 @@
1717

1818
import static io.serverlessworkflow.impl.WorkflowUtils.safeClose;
1919

20+
import java.util.Optional;
21+
import java.util.concurrent.ExecutorService;
22+
2023
public class DefaultPersistenceInstanceHandlers extends PersistenceInstanceHandlers {
2124

22-
private final PersistenceInstanceStore store;
25+
public static class Builder {
26+
27+
private final PersistenceInstanceStore store;
28+
private ExecutorService executorService;
2329

24-
public static DefaultPersistenceInstanceHandlers from(PersistenceInstanceStore store) {
25-
return new DefaultPersistenceInstanceHandlers(
26-
new DefaultPersistenceInstanceWriter(store),
27-
new DefaultPersistenceInstanceReader(store),
28-
store);
30+
private Builder(PersistenceInstanceStore store) {
31+
this.store = store;
32+
}
33+
34+
public Builder withExecutorService(ExecutorService executorService) {
35+
this.executorService = executorService;
36+
return this;
37+
}
38+
39+
public PersistenceInstanceHandlers build() {
40+
return new DefaultPersistenceInstanceHandlers(
41+
new DefaultPersistenceInstanceWriter(store, Optional.ofNullable(executorService)),
42+
new DefaultPersistenceInstanceReader(store),
43+
store);
44+
}
2945
}
3046

47+
public static Builder builder(PersistenceInstanceStore store) {
48+
return new Builder(store);
49+
}
50+
51+
public static PersistenceInstanceHandlers from(PersistenceInstanceStore store) {
52+
return new Builder(store).build();
53+
}
54+
55+
private final PersistenceInstanceStore store;
56+
3157
private DefaultPersistenceInstanceHandlers(
3258
PersistenceInstanceWriter writer,
3359
PersistenceInstanceReader reader,
@@ -38,6 +64,7 @@ private DefaultPersistenceInstanceHandlers(
3864

3965
@Override
4066
public void close() {
67+
super.close();
4168
safeClose(store);
4269
}
4370
}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,7 @@ public Stream<WorkflowInstance> scanAll(WorkflowDefinition definition, String ap
5555
.onClose(() -> transaction.commit(definition))
5656
.map(v -> new WorkflowPersistenceInstance(definition, v));
5757
}
58+
59+
@Override
60+
public void close() {}
5861
}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java

Lines changed: 67 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,75 +17,116 @@
1717

1818
import io.serverlessworkflow.impl.TaskContextData;
1919
import io.serverlessworkflow.impl.WorkflowContextData;
20+
import io.serverlessworkflow.impl.WorkflowDefinitionData;
2021
import io.serverlessworkflow.impl.WorkflowStatus;
22+
import java.util.Map;
23+
import java.util.Optional;
24+
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.ConcurrentHashMap;
26+
import java.util.concurrent.ExecutorService;
27+
import java.util.concurrent.TimeUnit;
2128
import java.util.function.Consumer;
2229

2330
public class DefaultPersistenceInstanceWriter implements PersistenceInstanceWriter {
2431

2532
private final PersistenceInstanceStore store;
33+
private final Map<String, CompletableFuture<Void>> futuresMap = new ConcurrentHashMap<>();
34+
private final Optional<ExecutorService> executorService;
2635

27-
protected DefaultPersistenceInstanceWriter(PersistenceInstanceStore store) {
36+
protected DefaultPersistenceInstanceWriter(
37+
PersistenceInstanceStore store, Optional<ExecutorService> executorService) {
2838
this.store = store;
39+
this.executorService = executorService;
2940
}
3041

3142
@Override
32-
public void started(WorkflowContextData workflowContext) {
33-
doTransaction(t -> t.writeInstanceData(workflowContext), workflowContext);
43+
public CompletableFuture<Void> started(WorkflowContextData workflowContext) {
44+
return doTransaction(t -> t.writeInstanceData(workflowContext), workflowContext);
3445
}
3546

3647
@Override
37-
public void completed(WorkflowContextData workflowContext) {
38-
removeProcessInstance(workflowContext);
48+
public CompletableFuture<Void> completed(WorkflowContextData workflowContext) {
49+
return removeProcessInstance(workflowContext);
3950
}
4051

4152
@Override
42-
public void failed(WorkflowContextData workflowContext, Throwable ex) {
43-
removeProcessInstance(workflowContext);
53+
public CompletableFuture<Void> failed(WorkflowContextData workflowContext, Throwable ex) {
54+
return removeProcessInstance(workflowContext);
4455
}
4556

4657
@Override
47-
public void aborted(WorkflowContextData workflowContext) {
48-
removeProcessInstance(workflowContext);
58+
public CompletableFuture<Void> aborted(WorkflowContextData workflowContext) {
59+
return removeProcessInstance(workflowContext);
4960
}
5061

51-
protected void removeProcessInstance(WorkflowContextData workflowContext) {
52-
doTransaction(t -> t.removeProcessInstance(workflowContext), workflowContext);
62+
protected CompletableFuture<Void> removeProcessInstance(WorkflowContextData workflowContext) {
63+
return doTransaction(t -> t.removeProcessInstance(workflowContext), workflowContext);
5364
}
5465

5566
@Override
56-
public void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext) {
57-
// not recording
67+
public CompletableFuture<Void> taskStarted(
68+
WorkflowContextData workflowContext, TaskContextData taskContext) {
69+
return CompletableFuture.completedFuture(null);
5870
}
5971

6072
@Override
61-
public void taskRetried(WorkflowContextData workflowContext, TaskContextData taskContext) {
62-
doTransaction(t -> t.writeRetryTask(workflowContext, taskContext), workflowContext);
73+
public CompletableFuture<Void> taskRetried(
74+
WorkflowContextData workflowContext, TaskContextData taskContext) {
75+
return doTransaction(t -> t.writeRetryTask(workflowContext, taskContext), workflowContext);
6376
}
6477

6578
@Override
66-
public void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext) {
67-
doTransaction(t -> t.writeCompletedTask(workflowContext, taskContext), workflowContext);
79+
public CompletableFuture<Void> taskCompleted(
80+
WorkflowContextData workflowContext, TaskContextData taskContext) {
81+
return doTransaction(t -> t.writeCompletedTask(workflowContext, taskContext), workflowContext);
6882
}
6983

7084
@Override
71-
public void suspended(WorkflowContextData workflowContext) {
72-
doTransaction(t -> t.writeStatus(workflowContext, WorkflowStatus.SUSPENDED), workflowContext);
85+
public CompletableFuture<Void> suspended(WorkflowContextData workflowContext) {
86+
return doTransaction(
87+
t -> t.writeStatus(workflowContext, WorkflowStatus.SUSPENDED), workflowContext);
7388
}
7489

7590
@Override
76-
public void resumed(WorkflowContextData workflowContext) {
77-
doTransaction(t -> t.clearStatus(workflowContext), workflowContext);
91+
public CompletableFuture<Void> resumed(WorkflowContextData workflowContext) {
92+
return doTransaction(t -> t.clearStatus(workflowContext), workflowContext);
7893
}
7994

80-
private void doTransaction(
81-
Consumer<PersistenceInstanceTransaction> operations, WorkflowContextData context) {
95+
private CompletableFuture<Void> doTransaction(
96+
Consumer<PersistenceInstanceTransaction> operation, WorkflowContextData context) {
97+
final ExecutorService service =
98+
this.executorService.orElse(context.definition().application().executorService());
99+
final Runnable runnable = () -> executeTransaction(operation, context.definition());
100+
return futuresMap.compute(
101+
context.instanceData().id(),
102+
(k, v) ->
103+
v == null
104+
? CompletableFuture.runAsync(runnable, service)
105+
: v.thenRunAsync(runnable, service));
106+
}
107+
108+
private void executeTransaction(
109+
Consumer<PersistenceInstanceTransaction> operation, WorkflowDefinitionData definition) {
82110
PersistenceInstanceTransaction transaction = store.begin();
83111
try {
84-
operations.accept(transaction);
85-
transaction.commit(context.definition());
112+
operation.accept(transaction);
113+
transaction.commit(definition);
86114
} catch (Exception ex) {
87-
transaction.rollback(context.definition());
115+
transaction.rollback(definition);
88116
throw ex;
89117
}
90118
}
119+
120+
@Override
121+
public void close() {
122+
executorService.ifPresent(
123+
e -> {
124+
try {
125+
e.shutdown();
126+
e.awaitTermination(1, TimeUnit.SECONDS);
127+
} catch (InterruptedException ex) {
128+
Thread.currentThread().interrupt();
129+
}
130+
});
131+
}
91132
}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package io.serverlessworkflow.impl.persistence;
1717

18+
import static io.serverlessworkflow.impl.WorkflowUtils.safeClose;
19+
1820
public class PersistenceInstanceHandlers implements AutoCloseable {
1921

2022
private final PersistenceInstanceWriter writer;
@@ -35,5 +37,8 @@ public PersistenceInstanceReader reader() {
3537
}
3638

3739
@Override
38-
public void close() {}
40+
public void close() {
41+
safeClose(writer);
42+
safeClose(reader);
43+
}
3944
}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import java.util.Optional;
2121
import java.util.stream.Stream;
2222

23-
public interface PersistenceInstanceReader {
23+
public interface PersistenceInstanceReader extends AutoCloseable {
2424

2525
default Stream<WorkflowInstance> scanAll(WorkflowDefinition definition) {
2626
return scanAll(definition, definition.application().id());

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,28 @@
1717

1818
import io.serverlessworkflow.impl.TaskContextData;
1919
import io.serverlessworkflow.impl.WorkflowContextData;
20+
import java.util.concurrent.CompletableFuture;
2021

21-
public interface PersistenceInstanceWriter {
22+
public interface PersistenceInstanceWriter extends AutoCloseable {
2223

23-
void started(WorkflowContextData workflowContext);
24+
CompletableFuture<Void> started(WorkflowContextData workflowContext);
2425

25-
void completed(WorkflowContextData workflowContext);
26+
CompletableFuture<Void> completed(WorkflowContextData workflowContext);
2627

27-
void failed(WorkflowContextData workflowContext, Throwable ex);
28+
CompletableFuture<Void> failed(WorkflowContextData workflowContext, Throwable ex);
2829

29-
void aborted(WorkflowContextData workflowContext);
30+
CompletableFuture<Void> aborted(WorkflowContextData workflowContext);
3031

31-
void suspended(WorkflowContextData workflowContext);
32+
CompletableFuture<Void> suspended(WorkflowContextData workflowContext);
3233

33-
void resumed(WorkflowContextData workflowContext);
34+
CompletableFuture<Void> resumed(WorkflowContextData workflowContext);
3435

35-
void taskRetried(WorkflowContextData workflowContext, TaskContextData taskContext);
36+
CompletableFuture<Void> taskRetried(
37+
WorkflowContextData workflowContext, TaskContextData taskContext);
3638

37-
void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext);
39+
CompletableFuture<Void> taskStarted(
40+
WorkflowContextData workflowContext, TaskContextData taskContext);
3841

39-
void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext);
42+
CompletableFuture<Void> taskCompleted(
43+
WorkflowContextData workflowContext, TaskContextData taskContext);
4044
}

impl/persistence/mvstore/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ This document explains how to enable persistence using MVStore as underlying per
88
To enable MVStore persistence, users should at least do the following things:
99

1010
- Initialize a MVStorePersistenceStore instance, passing the path of the file containing the persisted information
11-
- Pass this MVStorePersitenceStore as argument of BytesMapPersistenceInstanceHandlers.builder. This will create PersistenceInstanceWriter and PersistenceInstanceReader.
11+
- Pass this MVStorePersitenceStore as argument of DefaultPersistenceInstanceHandlers.from. This will create PersistenceInstanceWriter and PersistenceInstanceReader.
1212
- Use the PersistenceInstanceWriter created in the previous step to decorate the existing WorkflowApplication builder.
1313

1414
The code will look like this
1515

1616
----
1717
try (PersistenceInstanceHandlers handlers =
18-
BytesMapPersistenceInstanceHandlers.builder(new MVStorePersistenceStore("test.db"))
18+
DefaultPersistenceInstanceHandlers.from(new MVStorePersistenceStore("test.db"))
1919
.build();
2020
WorkflowApplication application =
2121
PersistenceApplicationBuilder.builder(
@@ -33,7 +33,7 @@ If user wants to resume execution of all previously existing instances (typicall
3333
Once retrieved, calling `start` method will resume the execution after the latest completed task before the running JVM was stopped.
3434

3535
----
36-
handlers.reader().readAll(definition).values().forEach(WorkflowInstance::start);
36+
handlers.reader().scanAll(definition).forEach(WorkflowInstance::start);
3737
----
3838

3939
---

impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractPersistenceTest.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.time.Instant;
4242
import java.util.Map;
4343
import java.util.Optional;
44+
import java.util.concurrent.Executors;
4445
import org.junit.jupiter.api.AfterAll;
4546
import org.junit.jupiter.api.AfterEach;
4647
import org.junit.jupiter.api.BeforeAll;
@@ -67,7 +68,10 @@ static void init() throws IOException {
6768

6869
@BeforeEach
6970
void setup() {
70-
handlers = DefaultPersistenceInstanceHandlers.from(persistenceStore());
71+
handlers =
72+
DefaultPersistenceInstanceHandlers.builder(persistenceStore())
73+
.withExecutorService(Executors.newSingleThreadExecutor())
74+
.build();
7175
context = app.modelFactory().fromNull();
7276
workflowContext = mock(WorkflowContext.class);
7377
workflowInstance = mock(WorkflowInstance.class);
@@ -117,8 +121,8 @@ void testWorkflowInstance() throws InterruptedException {
117121

118122
final Map<String, Object> completedMap = Map.of("name", "fulanito");
119123

120-
handlers.writer().started(workflowContext);
121-
handlers.writer().taskRetried(workflowContext, retriedTaskContext(position, numRetries));
124+
handlers.writer().started(workflowContext).join();
125+
handlers.writer().taskRetried(workflowContext, retriedTaskContext(position, numRetries)).join();
122126
Optional<WorkflowInstance> optional = handlers.reader().find(definition, workflowInstance.id());
123127
assertThat(optional).isPresent();
124128
WorkflowPersistenceInstance instance = (WorkflowPersistenceInstance) optional.orElseThrow();
@@ -135,7 +139,10 @@ void testWorkflowInstance() throws InterruptedException {
135139
assertThat(retryAttempt.getValue()).isEqualTo(numRetries);
136140

137141
// task completed
138-
handlers.writer().taskCompleted(workflowContext, completedTaskContext(position, completedMap));
142+
handlers
143+
.writer()
144+
.taskCompleted(workflowContext, completedTaskContext(position, completedMap))
145+
.join();
139146
instance =
140147
(WorkflowPersistenceInstance)
141148
handlers.reader().find(definition, workflowInstance.id()).orElseThrow();
@@ -157,7 +164,7 @@ void testWorkflowInstance() throws InterruptedException {
157164
assertThat(transition.getValue().isEndNode()).isTrue();
158165

159166
// workflow completed
160-
handlers.writer().completed(workflowContext);
167+
handlers.writer().completed(workflowContext).join();
161168
assertThat(handlers.reader().find(definition, workflowInstance.id())).isEmpty();
162169
}
163170
}

impl/test/db-samples/running.db

8 KB
Binary file not shown.

impl/test/db-samples/running_v1.db

8 KB
Binary file not shown.

0 commit comments

Comments
 (0)