Skip to content

Commit dfdcf6a

Browse files
committed
[Fix #1395] AllStrategyCorrelation persistence
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent ad1eb92 commit dfdcf6a

16 files changed

Lines changed: 246 additions & 23 deletions

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.serverlessworkflow.impl;
1717

1818
import java.util.concurrent.CompletableFuture;
19+
import java.util.function.Supplier;
1920

2021
public interface WorkflowInstance extends WorkflowInstanceData {
2122
CompletableFuture<WorkflowModel> start();
@@ -49,4 +50,6 @@ public interface WorkflowInstance extends WorkflowInstanceData {
4950
boolean cancel();
5051

5152
boolean resume();
53+
54+
<T> T addMetadataIfAbsent(String key, Supplier<T> supplier);
5255
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstanceData.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.serverlessworkflow.impl;
1717

1818
import java.time.Instant;
19+
import java.util.Optional;
1920

2021
public interface WorkflowInstanceData {
2122
String id();
@@ -29,4 +30,6 @@ public interface WorkflowInstanceData {
2930
WorkflowStatus status();
3031

3132
WorkflowModel context();
33+
34+
<T> Optional<T> findMetadata(String key, Class<T> objectClass);
3235
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141

4242
public class WorkflowMutableInstance implements WorkflowInstance {
4343

44+
public static final String CLOUD_EVENT_IDS = "CloudEventIds";
45+
4446
private final AtomicReference<WorkflowStatus> status;
4547
protected final String id;
4648
protected final WorkflowModel input;
@@ -350,9 +352,16 @@ public void addCancelable(CompletableFuture<?> cancelable) {
350352
}
351353
}
352354

353-
public <T> T additionalObject(String key, Supplier<T> supplier) {
355+
@Override
356+
public <T> T addMetadataIfAbsent(String key, Supplier<T> supplier) {
354357
return (T) additionalObjects.computeIfAbsent(key, k -> supplier.get());
355358
}
356359

360+
@Override
361+
public <T> Optional<T> findMetadata(String key, Class<T> objectClass) {
362+
Object value = additionalObjects.get(key);
363+
return objectClass.isInstance(value) ? Optional.of(objectClass.cast(value)) : Optional.empty();
364+
}
365+
357366
public void restoreContext(WorkflowContext workflow, TaskContext context) {}
358367
}

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
package io.serverlessworkflow.impl.scheduler;
1717

1818
import io.serverlessworkflow.impl.WorkflowDefinition;
19-
import io.serverlessworkflow.impl.WorkflowModel;
19+
import io.serverlessworkflow.impl.WorkflowInstance;
2020
import java.util.concurrent.Executors;
2121
import java.util.concurrent.ScheduledExecutorService;
2222
import java.util.concurrent.ScheduledFuture;
@@ -83,10 +83,10 @@ protected CronResolverIntanceRunner(WorkflowDefinition definition) {
8383
}
8484

8585
@Override
86-
public void accept(WorkflowModel model) {
86+
public void accept(WorkflowInstance instance) {
8787
if (!cancelled.get()) {
8888
scheduleNext();
89-
super.accept(model);
89+
super.accept(instance);
9090
}
9191
}
9292
}

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919

2020
import io.cloudevents.CloudEvent;
2121
import io.serverlessworkflow.impl.WorkflowDefinition;
22+
import io.serverlessworkflow.impl.WorkflowInstance;
2223
import io.serverlessworkflow.impl.WorkflowModel;
2324
import io.serverlessworkflow.impl.WorkflowModelCollection;
25+
import io.serverlessworkflow.impl.WorkflowMutableInstance;
2426
import io.serverlessworkflow.impl.events.EventConsumer;
2527
import io.serverlessworkflow.impl.events.EventRegistration;
2628
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
@@ -78,13 +80,17 @@ public ScheduledEventConsumer(
7880
protected void start(CloudEvent ce) {
7981
WorkflowModelCollection model = definition.application().modelFactory().createCollection();
8082
model.add(converter.apply(ce));
81-
instanceRunner.accept(model);
83+
instanceRunner.accept(definition.instance(model));
8284
}
8385

8486
protected void start(Collection<CloudEvent> ces) {
8587
WorkflowModelCollection model = definition.application().modelFactory().createCollection();
8688
ces.forEach(ce -> model.add(converter.apply(ce)));
87-
instanceRunner.accept(model);
89+
WorkflowInstance instance = definition.instance(model);
90+
instance.addMetadataIfAbsent(
91+
WorkflowMutableInstance.CLOUD_EVENT_IDS,
92+
() -> ces.stream().map(CloudEvent::getId).toList());
93+
instanceRunner.accept(instance);
8894
}
8995

9096
public void close() {

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledInstanceRunnable.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import io.serverlessworkflow.impl.WorkflowModel;
2121
import java.util.function.Consumer;
2222

23-
public class ScheduledInstanceRunnable implements Runnable, Consumer<WorkflowModel> {
23+
public class ScheduledInstanceRunnable implements Runnable, Consumer<WorkflowInstance> {
2424

2525
protected final WorkflowDefinition definition;
2626

@@ -30,16 +30,20 @@ public ScheduledInstanceRunnable(WorkflowDefinition definition) {
3030

3131
@Override
3232
public void run() {
33-
accept(definition.application().modelFactory().fromNull());
33+
accept(definition.instance());
3434
}
3535

3636
@Override
37-
public void accept(WorkflowModel model) {
38-
runScheduledInstance(definition, model);
37+
public void accept(WorkflowInstance instance) {
38+
runScheduledInstance(definition, instance);
3939
}
4040

4141
public static void runScheduledInstance(WorkflowDefinition definition, WorkflowModel model) {
42-
WorkflowInstance instance = definition.instance(model);
42+
runScheduledInstance(definition, definition.instance(model));
43+
}
44+
45+
private static void runScheduledInstance(
46+
WorkflowDefinition definition, WorkflowInstance instance) {
4347
definition.addScheduledInstance(instance);
4448
definition.application().executorService().execute(() -> instance.start());
4549
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence;
17+
18+
import io.cloudevents.CloudEvent;
19+
import io.serverlessworkflow.impl.WorkflowDefinition;
20+
import io.serverlessworkflow.impl.events.EventRegistrationBuilder;
21+
import io.serverlessworkflow.impl.scheduler.AllStrategyCorrelationInfo;
22+
import java.util.Collection;
23+
import java.util.HashMap;
24+
import java.util.Map;
25+
import java.util.function.Consumer;
26+
27+
public abstract class AbstractAllStrategyCorrelationInfo implements AllStrategyCorrelationInfo {
28+
29+
protected final WorkflowDefinition definition;
30+
private final PersistenceExecutor executor;
31+
private final Map<EventRegistrationBuilder, String> id2RegMapping = new HashMap<>();
32+
33+
private int counter;
34+
35+
public AbstractAllStrategyCorrelationInfo(
36+
WorkflowDefinition definition, PersistenceExecutor executor) {
37+
this.definition = definition;
38+
this.executor = executor;
39+
}
40+
41+
@Override
42+
public void correlate(
43+
EventRegistrationBuilder reg, CloudEvent event, Consumer<Collection<CloudEvent>> starter) {
44+
executor
45+
.execute(() -> operation(id2RegMapping.get(reg), event), definition)
46+
.thenAccept(events -> events.forEach(starter));
47+
}
48+
49+
@Override
50+
public void register(EventRegistrationBuilder reg) {
51+
id2RegMapping.put(reg, generateIdFromReg(reg));
52+
}
53+
54+
protected Collection<Collection<CloudEvent>> operation(
55+
PersistenceInstanceOperations operations, String reg, CloudEvent event) {
56+
return operations.correlateEvent(reg, event, id2RegMapping.values());
57+
}
58+
59+
protected abstract Collection<Collection<CloudEvent>> operation(String reg, CloudEvent event);
60+
61+
protected String generateIdFromReg(EventRegistrationBuilder reg) {
62+
final String separator = ":";
63+
return definition.id().toString(separator) + separator + ++counter;
64+
}
65+
}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAsyncPersistenceExecutor.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,24 @@
1515
*/
1616
package io.serverlessworkflow.impl.persistence;
1717

18-
import io.serverlessworkflow.impl.WorkflowContextData;
18+
import io.serverlessworkflow.impl.WorkflowDefinitionData;
1919
import java.util.Optional;
2020
import java.util.concurrent.CompletableFuture;
2121
import java.util.concurrent.ExecutorService;
22+
import java.util.function.Supplier;
2223

2324
public abstract class AbstractAsyncPersistenceExecutor implements PersistenceExecutor {
2425
@Override
25-
public CompletableFuture<Void> execute(Runnable runnable, WorkflowContextData context) {
26+
public CompletableFuture<Void> execute(Runnable runnable, WorkflowDefinitionData definition) {
27+
2628
return CompletableFuture.runAsync(
27-
runnable, executorService().orElse(context.definition().application().executorService()));
29+
runnable, executorService().orElse(definition.application().executorService()));
30+
}
31+
32+
@Override
33+
public <T> CompletableFuture<T> execute(Supplier<T> runnable, WorkflowDefinitionData definition) {
34+
return CompletableFuture.supplyAsync(
35+
runnable, executorService().orElse(definition.application().executorService()));
2836
}
2937

3038
protected abstract Optional<ExecutorService> executorService();

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,25 @@
1717

1818
import io.serverlessworkflow.impl.TaskContextData;
1919
import io.serverlessworkflow.impl.WorkflowContextData;
20+
import io.serverlessworkflow.impl.WorkflowMutableInstance;
2021
import io.serverlessworkflow.impl.WorkflowStatus;
22+
import java.util.Collection;
2123
import java.util.concurrent.CompletableFuture;
2224
import java.util.function.Consumer;
2325

2426
public abstract class AbstractPersistenceInstanceWriter implements PersistenceInstanceWriter {
2527

2628
@Override
2729
public CompletableFuture<Void> started(WorkflowContextData workflowContext) {
28-
return doStartInstance(t -> t.writeInstanceData(workflowContext), workflowContext);
30+
return doStartInstance(
31+
t -> {
32+
t.writeInstanceData(workflowContext);
33+
workflowContext
34+
.instanceData()
35+
.findMetadata(WorkflowMutableInstance.CLOUD_EVENT_IDS, Collection.class)
36+
.ifPresent(c -> t.removeCloudEvents(c));
37+
},
38+
workflowContext);
2939
}
3040

3141
@Override

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,14 @@
1919
import io.serverlessworkflow.impl.WorkflowInstance;
2020
import java.util.Optional;
2121
import java.util.stream.Stream;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
2224

2325
public class DefaultPersistenceInstanceReader extends AbstractPersistenceInstanceReader {
2426

27+
private static final Logger logger =
28+
LoggerFactory.getLogger(DefaultPersistenceInstanceReader.class);
29+
2530
private final PersistenceInstanceStore store;
2631

2732
protected DefaultPersistenceInstanceReader(PersistenceInstanceStore store) {
@@ -36,7 +41,11 @@ public Optional<WorkflowInstance> find(WorkflowDefinition definition, String ins
3641
transaction.commit(definition);
3742
return instance;
3843
} catch (Exception ex) {
39-
transaction.rollback(definition);
44+
try {
45+
transaction.rollback(definition);
46+
} catch (Exception rollEx) {
47+
logger.warn("Exception during rollback. Ignoring it", rollEx);
48+
}
4049
throw ex;
4150
}
4251
}

0 commit comments

Comments
 (0)