Skip to content

Commit da9c2a2

Browse files
committed
[Fix #1395] persistence helper classes for allcorrelation
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent ad1eb92 commit da9c2a2

30 files changed

Lines changed: 663 additions & 38 deletions

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.serverlessworkflow.impl;
1717

1818
import java.util.concurrent.CompletableFuture;
19+
import java.util.function.Supplier;
1920

2021
public interface WorkflowInstance extends WorkflowInstanceData {
2122
CompletableFuture<WorkflowModel> start();
@@ -49,4 +50,6 @@ public interface WorkflowInstance extends WorkflowInstanceData {
4950
boolean cancel();
5051

5152
boolean resume();
53+
54+
<T> T addMetadataIfAbsent(String key, Supplier<T> supplier);
5255
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstanceData.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.serverlessworkflow.impl;
1717

1818
import java.time.Instant;
19+
import java.util.Optional;
1920

2021
public interface WorkflowInstanceData {
2122
String id();
@@ -29,4 +30,6 @@ public interface WorkflowInstanceData {
2930
WorkflowStatus status();
3031

3132
WorkflowModel context();
33+
34+
<T> Optional<T> findMetadata(String key, Class<T> objectClass);
3235
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,9 +350,16 @@ public void addCancelable(CompletableFuture<?> cancelable) {
350350
}
351351
}
352352

353-
public <T> T additionalObject(String key, Supplier<T> supplier) {
353+
@Override
354+
public <T> T addMetadataIfAbsent(String key, Supplier<T> supplier) {
354355
return (T) additionalObjects.computeIfAbsent(key, k -> supplier.get());
355356
}
356357

358+
@Override
359+
public <T> Optional<T> findMetadata(String key, Class<T> objectClass) {
360+
Object value = additionalObjects.get(key);
361+
return objectClass.isInstance(value) ? Optional.of(objectClass.cast(value)) : Optional.empty();
362+
}
363+
357364
public void restoreContext(WorkflowContext workflow, TaskContext context) {}
358365
}

impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractInputBuffer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
*/
1616
package io.serverlessworkflow.impl.marshaller;
1717

18+
import java.net.URI;
1819
import java.time.Instant;
20+
import java.time.OffsetDateTime;
1921
import java.util.ArrayList;
2022
import java.util.Collection;
2123
import java.util.LinkedHashMap;
@@ -112,6 +114,12 @@ public Object readObject() {
112114
case CUSTOM:
113115
return readCustomObject();
114116

117+
case URI:
118+
return URI.create(readString());
119+
120+
case OFFSET_DATE_TIME:
121+
return OffsetDateTime.from(readInstant());
122+
115123
default:
116124
throw new IllegalStateException("Unsupported type " + type);
117125
}

impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractOutputBuffer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
package io.serverlessworkflow.impl.marshaller;
1717

1818
import io.serverlessworkflow.impl.WorkflowModel;
19+
import java.net.URI;
1920
import java.time.Instant;
21+
import java.time.OffsetDateTime;
2022
import java.util.Collection;
2123
import java.util.Map;
2224
import java.util.concurrent.ConcurrentHashMap;
@@ -100,6 +102,12 @@ public WorkflowOutputBuffer writeObject(Object object) {
100102
} else if (object instanceof Instant value) {
101103
writeType(Type.INSTANT);
102104
writeInstant(value);
105+
} else if (object instanceof OffsetDateTime time) {
106+
writeType(Type.OFFSET_DATE_TIME);
107+
writeInstant(time.toInstant());
108+
} else if (object instanceof URI uri) {
109+
writeType(Type.URI);
110+
writeString(uri.toString());
103111
} else if (object instanceof byte[] bytes) {
104112
writeType(Type.BYTES);
105113
writeBytes(bytes);

impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/Type.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,7 @@ enum Type {
2929
MAP,
3030
COLLECTION,
3131
NULL,
32-
CUSTOM
32+
CUSTOM,
33+
OFFSET_DATE_TIME,
34+
URI
3335
}

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/AllStrategyCorrelationInfo.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,21 @@
1616
package io.serverlessworkflow.impl.scheduler;
1717

1818
import io.cloudevents.CloudEvent;
19+
import io.serverlessworkflow.impl.WorkflowInstance;
1920
import io.serverlessworkflow.impl.events.EventRegistrationBuilder;
20-
import java.util.Collection;
21+
import java.util.Map;
2122
import java.util.function.Consumer;
2223

2324
public interface AllStrategyCorrelationInfo extends AutoCloseable {
2425
void correlate(
25-
EventRegistrationBuilder reg, CloudEvent event, Consumer<Collection<CloudEvent>> starter);
26+
EventRegistrationBuilder reg,
27+
CloudEvent event,
28+
Consumer<Map<EventRegistrationBuilder, CloudEvent>> starter);
2629

2730
void register(EventRegistrationBuilder reg);
2831

32+
default void addMetadata(
33+
WorkflowInstance instance, Map<EventRegistrationBuilder, CloudEvent> events) {}
34+
2935
default void close() {}
3036
}

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
package io.serverlessworkflow.impl.scheduler;
1717

1818
import io.serverlessworkflow.impl.WorkflowDefinition;
19-
import io.serverlessworkflow.impl.WorkflowModel;
19+
import io.serverlessworkflow.impl.WorkflowInstance;
2020
import java.util.concurrent.Executors;
2121
import java.util.concurrent.ScheduledExecutorService;
2222
import java.util.concurrent.ScheduledFuture;
@@ -83,10 +83,10 @@ protected CronResolverIntanceRunner(WorkflowDefinition definition) {
8383
}
8484

8585
@Override
86-
public void accept(WorkflowModel model) {
86+
public void accept(WorkflowInstance instance) {
8787
if (!cancelled.get()) {
8888
scheduleNext();
89-
super.accept(model);
89+
super.accept(instance);
9090
}
9191
}
9292
}

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/InMemoryAllStrategyCorrelationInfo.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import io.cloudevents.CloudEvent;
1919
import io.serverlessworkflow.impl.events.EventRegistrationBuilder;
2020
import java.util.ArrayList;
21-
import java.util.Collection;
2221
import java.util.HashMap;
2322
import java.util.List;
2423
import java.util.Map;
@@ -30,21 +29,23 @@ public class InMemoryAllStrategyCorrelationInfo implements AllStrategyCorrelatio
3029

3130
@Override
3231
public void correlate(
33-
EventRegistrationBuilder reg, CloudEvent event, Consumer<Collection<CloudEvent>> starter) {
34-
Collection<CloudEvent> collection = new ArrayList<>();
32+
EventRegistrationBuilder reg,
33+
CloudEvent event,
34+
Consumer<Map<EventRegistrationBuilder, CloudEvent>> starter) {
35+
Map<EventRegistrationBuilder, CloudEvent> result = new HashMap<>();
3536
// to minimize the critical section, conversion is done later, here we are
3637
// performing just collection, if any
3738
synchronized (correlatedEvents) {
3839
correlatedEvents.get(reg).add(event);
39-
Collection<List<CloudEvent>> events = correlatedEvents.values();
40-
if (satisfyCondition(events)) {
41-
for (List<CloudEvent> values : events) {
42-
collection.add(values.remove(0));
40+
if (satisfyCondition(correlatedEvents)) {
41+
for (java.util.Map.Entry<EventRegistrationBuilder, List<CloudEvent>> values :
42+
correlatedEvents.entrySet()) {
43+
result.put(values.getKey(), values.getValue().remove(0));
4344
}
4445
}
4546
}
46-
if (!collection.isEmpty()) {
47-
starter.accept(collection);
47+
if (!result.isEmpty()) {
48+
starter.accept(result);
4849
}
4950
}
5051

@@ -56,8 +57,8 @@ public void register(EventRegistrationBuilder reg) {
5657
correlatedEvents.put(reg, new ArrayList<CloudEvent>());
5758
}
5859

59-
private boolean satisfyCondition(Collection<List<CloudEvent>> events) {
60-
for (List<CloudEvent> values : events) {
60+
private boolean satisfyCondition(Map<EventRegistrationBuilder, List<CloudEvent>> events) {
61+
for (List<CloudEvent> values : events.values()) {
6162
if (values.isEmpty()) {
6263
return false;
6364
}

impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,16 @@
1919

2020
import io.cloudevents.CloudEvent;
2121
import io.serverlessworkflow.impl.WorkflowDefinition;
22+
import io.serverlessworkflow.impl.WorkflowInstance;
2223
import io.serverlessworkflow.impl.WorkflowModel;
2324
import io.serverlessworkflow.impl.WorkflowModelCollection;
2425
import io.serverlessworkflow.impl.events.EventConsumer;
2526
import io.serverlessworkflow.impl.events.EventRegistration;
27+
import io.serverlessworkflow.impl.events.EventRegistrationBuilder;
2628
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
2729
import java.util.ArrayList;
2830
import java.util.Collection;
31+
import java.util.Map;
2932
import java.util.function.Function;
3033

3134
public class ScheduledEventConsumer implements AutoCloseable {
@@ -78,13 +81,15 @@ public ScheduledEventConsumer(
7881
protected void start(CloudEvent ce) {
7982
WorkflowModelCollection model = definition.application().modelFactory().createCollection();
8083
model.add(converter.apply(ce));
81-
instanceRunner.accept(model);
84+
instanceRunner.accept(definition.instance(model));
8285
}
8386

84-
protected void start(Collection<CloudEvent> ces) {
87+
protected void start(Map<EventRegistrationBuilder, CloudEvent> ces) {
8588
WorkflowModelCollection model = definition.application().modelFactory().createCollection();
86-
ces.forEach(ce -> model.add(converter.apply(ce)));
87-
instanceRunner.accept(model);
89+
ces.values().forEach(ce -> model.add(converter.apply(ce)));
90+
WorkflowInstance instance = definition.instance(model);
91+
allStrategyCorrelationInfo.addMetadata(instance, ces);
92+
instanceRunner.accept(instance);
8893
}
8994

9095
public void close() {

0 commit comments

Comments
 (0)