diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index 489e60206..c4d4f11c3 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -444,7 +444,8 @@ public WorkflowApplication build() { .orElseGet(() -> new DefaultCloudEventPredicateFactory()); } if (allStrategyCorrelationInfoFactory == null) { - allStrategyCorrelationInfoFactory = definition -> new InMemoryAllStrategyCorrelationInfo(); + allStrategyCorrelationInfoFactory = + definition -> InMemoryAllStrategyCorrelationInfo.instance(); } if (defaultCatalogURI == null) { diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java index a1b92d047..11f8f0d8c 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java @@ -16,6 +16,7 @@ package io.serverlessworkflow.impl; import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; public interface WorkflowInstance extends WorkflowInstanceData { CompletableFuture start(); @@ -49,4 +50,6 @@ public interface WorkflowInstance extends WorkflowInstanceData { boolean cancel(); boolean resume(); + + T addMetadataIfAbsent(String key, Supplier supplier); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstanceData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstanceData.java index f283f148f..daa5e4496 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstanceData.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstanceData.java @@ -16,6 +16,7 @@ package io.serverlessworkflow.impl; import java.time.Instant; +import java.util.Optional; public interface WorkflowInstanceData { String id(); @@ -29,4 +30,6 @@ public interface WorkflowInstanceData { WorkflowStatus status(); WorkflowModel context(); + + Optional findMetadata(String key, Class objectClass); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java index 8f67e83c2..027de7e4f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java @@ -350,9 +350,16 @@ public void addCancelable(CompletableFuture cancelable) { } } - public T additionalObject(String key, Supplier supplier) { + @Override + public T addMetadataIfAbsent(String key, Supplier supplier) { return (T) additionalObjects.computeIfAbsent(key, k -> supplier.get()); } + @Override + public Optional findMetadata(String key, Class objectClass) { + Object value = additionalObjects.get(key); + return objectClass.isInstance(value) ? Optional.of(objectClass.cast(value)) : Optional.empty(); + } + public void restoreContext(WorkflowContext workflow, TaskContext context) {} } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractInputBuffer.java b/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractInputBuffer.java index e5f1646fb..c91914edd 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractInputBuffer.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractInputBuffer.java @@ -15,7 +15,10 @@ */ package io.serverlessworkflow.impl.marshaller; +import java.net.URI; import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashMap; @@ -112,6 +115,12 @@ public Object readObject() { case CUSTOM: return readCustomObject(); + case URI: + return URI.create(readString()); + + case OFFSET_DATE_TIME: + return OffsetDateTime.ofInstant(readInstant(), ZoneOffset.of(readString())); + default: throw new IllegalStateException("Unsupported type " + type); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractOutputBuffer.java b/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractOutputBuffer.java index 6816b31fd..75fae0341 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractOutputBuffer.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/AbstractOutputBuffer.java @@ -16,7 +16,9 @@ package io.serverlessworkflow.impl.marshaller; import io.serverlessworkflow.impl.WorkflowModel; +import java.net.URI; import java.time.Instant; +import java.time.OffsetDateTime; import java.util.Collection; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -100,9 +102,16 @@ public WorkflowOutputBuffer writeObject(Object object) { } else if (object instanceof Instant value) { writeType(Type.INSTANT); writeInstant(value); - } else if (object instanceof byte[] bytes) { + } else if (object instanceof OffsetDateTime value) { + writeType(Type.OFFSET_DATE_TIME); + writeInstant(value.toInstant()); + writeString(value.getOffset().toString()); + } else if (object instanceof URI value) { + writeType(Type.URI); + writeString(value.toString()); + } else if (object instanceof byte[] value) { writeType(Type.BYTES); - writeBytes(bytes); + writeBytes(value); } else { internalWriteObject(object); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/MarshallingUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/MarshallingUtils.java index d758eec18..347d682f8 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/MarshallingUtils.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/MarshallingUtils.java @@ -15,14 +15,21 @@ */ package io.serverlessworkflow.impl.marshaller; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; import io.serverlessworkflow.impl.WorkflowModel; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; import java.lang.reflect.Modifier; +import java.net.URI; import java.time.Instant; +import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Set; import java.util.function.BiConsumer; import java.util.function.Function; import org.slf4j.Logger; @@ -71,6 +78,58 @@ public static byte[] writeString(WorkflowBufferFactory factory, String value) { return writeValue(factory, value, (b, v) -> b.writeString(v)); } + public static byte[] writeCloudEventExtensions(WorkflowBufferFactory factory, CloudEvent event) { + try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + WorkflowOutputBuffer out = factory.output(bytesOut)) { + writeCloudEventExtensions(out, event); + return bytesOut.toByteArray(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public static void writeCloudEventExtensions(WorkflowOutputBuffer out, CloudEvent event) { + Set extensionNames = event.getExtensionNames(); + out.writeInt(extensionNames.size()); + for (String extensionName : extensionNames) { + out.writeString(extensionName); + out.writeObject(event.getExtension(extensionName)); + } + } + + public static CloudEventBuilder readCloudEventExtensions( + WorkflowBufferFactory factory, byte[] value, CloudEventBuilder builder) { + try (ByteArrayInputStream bytesInt = new ByteArrayInputStream(value); + WorkflowInputBuffer in = factory.input(bytesInt)) { + return readCloudEventExtenstions(in, value, builder); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public static CloudEventBuilder readCloudEventExtenstions( + WorkflowInputBuffer in, byte[] value, CloudEventBuilder builder) { + int size = in.readInt(); + while (size-- > 0) { + String extensionName = in.readString(); + Object extensionValue = in.readObject(); + if (extensionValue instanceof Number extValue) { + builder.withExtension(extensionName, extValue); + } else if (extensionValue instanceof String extValue) { + builder.withExtension(extensionName, extValue); + } else if (extensionValue instanceof Boolean extValue) { + builder.withExtension(extensionName, extValue); + } else if (extensionValue instanceof byte[] extValue) { + builder.withExtension(extensionName, extValue); + } else if (extensionValue instanceof OffsetDateTime extValue) { + builder.withExtension(extensionName, extValue); + } else if (extensionValue instanceof URI extValue) { + builder.withExtension(extensionName, extValue); + } + } + return builder; + } + public static String readString(WorkflowBufferFactory factory, byte[] value) { return readValue(factory, value, WorkflowInputBuffer::readString); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/Type.java b/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/Type.java index cf52b7abb..b11570a69 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/Type.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/marshaller/Type.java @@ -29,5 +29,7 @@ enum Type { MAP, COLLECTION, NULL, - CUSTOM + CUSTOM, + OFFSET_DATE_TIME, + URI } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/AllStrategyCorrelationInfo.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/AllStrategyCorrelationInfo.java index e9e73f025..5400da5b2 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/AllStrategyCorrelationInfo.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/AllStrategyCorrelationInfo.java @@ -16,15 +16,22 @@ package io.serverlessworkflow.impl.scheduler; import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.WorkflowInstance; import io.serverlessworkflow.impl.events.EventRegistrationBuilder; import java.util.Collection; +import java.util.Map; import java.util.function.Consumer; public interface AllStrategyCorrelationInfo extends AutoCloseable { - void correlate( - EventRegistrationBuilder reg, CloudEvent event, Consumer> starter); - void register(EventRegistrationBuilder reg); + void init( + Collection reg, + Consumer> starter); + + void correlate(EventRegistrationBuilder reg, CloudEvent event); + + default void addMetadata( + WorkflowInstance instance, Map events) {} default void close() {} } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java index 0df825694..f7fcf85a0 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java @@ -16,7 +16,7 @@ package io.serverlessworkflow.impl.scheduler; import io.serverlessworkflow.impl.WorkflowDefinition; -import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowInstance; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -83,10 +83,10 @@ protected CronResolverIntanceRunner(WorkflowDefinition definition) { } @Override - public void accept(WorkflowModel model) { + public void accept(WorkflowInstance instance) { if (!cancelled.get()) { scheduleNext(); - super.accept(model); + super.accept(instance); } } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/InMemoryAllStrategyCorrelationInfo.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/InMemoryAllStrategyCorrelationInfo.java index 4fcf7a9d1..540a94814 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/InMemoryAllStrategyCorrelationInfo.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/InMemoryAllStrategyCorrelationInfo.java @@ -26,38 +26,50 @@ public class InMemoryAllStrategyCorrelationInfo implements AllStrategyCorrelationInfo { + private static class InMemoryAllStrategyCorrelationInfoHolder { + private static InMemoryAllStrategyCorrelationInfo INSTANCE = + new InMemoryAllStrategyCorrelationInfo(); + } + + public static AllStrategyCorrelationInfo instance() { + return InMemoryAllStrategyCorrelationInfoHolder.INSTANCE; + } + + private InMemoryAllStrategyCorrelationInfo() {} + private Map> correlatedEvents; + private Consumer> starter; @Override - public void correlate( - EventRegistrationBuilder reg, CloudEvent event, Consumer> starter) { - Collection collection = new ArrayList<>(); + public void correlate(EventRegistrationBuilder reg, CloudEvent event) { + Map result = new HashMap<>(); // to minimize the critical section, conversion is done later, here we are // performing just collection, if any synchronized (correlatedEvents) { correlatedEvents.get(reg).add(event); - Collection> events = correlatedEvents.values(); - if (satisfyCondition(events)) { - for (List values : events) { - collection.add(values.remove(0)); + if (satisfyCondition(correlatedEvents)) { + for (java.util.Map.Entry> values : + correlatedEvents.entrySet()) { + result.put(values.getKey(), values.getValue().remove(0)); } } } - if (!collection.isEmpty()) { - starter.accept(collection); + if (!result.isEmpty()) { + starter.accept(result); } } @Override - public void register(EventRegistrationBuilder reg) { - if (correlatedEvents == null) { - correlatedEvents = new HashMap<>(); - } - correlatedEvents.put(reg, new ArrayList()); + public void init( + Collection regs, + Consumer> starter) { + correlatedEvents = new HashMap<>(); + this.starter = starter; + regs.forEach(reg -> correlatedEvents.put(reg, new ArrayList())); } - private boolean satisfyCondition(Collection> events) { - for (List values : events) { + private boolean satisfyCondition(Map> events) { + for (List values : events.values()) { if (values.isEmpty()) { return false; } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java index 6443c4667..09c0931b2 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java @@ -19,13 +19,16 @@ import io.cloudevents.CloudEvent; import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowModelCollection; import io.serverlessworkflow.impl.events.EventConsumer; import io.serverlessworkflow.impl.events.EventRegistration; +import io.serverlessworkflow.impl.events.EventRegistrationBuilder; import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo; import java.util.ArrayList; import java.util.Collection; +import java.util.Map; import java.util.function.Function; public class ScheduledEventConsumer implements AutoCloseable { @@ -53,19 +56,15 @@ public ScheduledEventConsumer( && builderInfo.registrations().registrations().size() > 1) { this.allStrategyCorrelationInfo = definition.application().allStrategyCorrelationInfoFactory().apply(definition); - builderInfo - .registrations() - .registrations() - .forEach( - reg -> { - allStrategyCorrelationInfo.register(reg); - registrations.add( - eventConsumer.register( - reg, - ce -> - allStrategyCorrelationInfo.correlate( - reg, (CloudEvent) ce, this::start))); - }); + Collection registrationBuilders = + builderInfo.registrations().registrations(); + allStrategyCorrelationInfo.init(registrationBuilders, this::start); + registrationBuilders.forEach( + reg -> { + registrations.add( + eventConsumer.register( + reg, ce -> allStrategyCorrelationInfo.correlate(reg, (CloudEvent) ce))); + }); } else { builderInfo .registrations() @@ -78,13 +77,15 @@ public ScheduledEventConsumer( protected void start(CloudEvent ce) { WorkflowModelCollection model = definition.application().modelFactory().createCollection(); model.add(converter.apply(ce)); - instanceRunner.accept(model); + instanceRunner.accept(definition.instance(model)); } - protected void start(Collection ces) { + protected void start(Map ces) { WorkflowModelCollection model = definition.application().modelFactory().createCollection(); - ces.forEach(ce -> model.add(converter.apply(ce))); - instanceRunner.accept(model); + ces.values().forEach(ce -> model.add(converter.apply(ce))); + WorkflowInstance instance = definition.instance(model); + allStrategyCorrelationInfo.addMetadata(instance, ces); + instanceRunner.accept(instance); } public void close() { diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledInstanceRunnable.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledInstanceRunnable.java index 9371e52f1..ef75af05d 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledInstanceRunnable.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledInstanceRunnable.java @@ -20,7 +20,7 @@ import io.serverlessworkflow.impl.WorkflowModel; import java.util.function.Consumer; -public class ScheduledInstanceRunnable implements Runnable, Consumer { +public class ScheduledInstanceRunnable implements Runnable, Consumer { protected final WorkflowDefinition definition; @@ -30,16 +30,20 @@ public ScheduledInstanceRunnable(WorkflowDefinition definition) { @Override public void run() { - accept(definition.application().modelFactory().fromNull()); + accept(definition.instance()); } @Override - public void accept(WorkflowModel model) { - runScheduledInstance(definition, model); + public void accept(WorkflowInstance instance) { + runScheduledInstance(definition, instance); } public static void runScheduledInstance(WorkflowDefinition definition, WorkflowModel model) { - WorkflowInstance instance = definition.instance(model); + runScheduledInstance(definition, definition.instance(model)); + } + + private static void runScheduledInstance( + WorkflowDefinition definition, WorkflowInstance instance) { definition.addScheduledInstance(instance); definition.application().executorService().execute(() -> instance.start()); } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAllStrategyCorrelationInfo.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAllStrategyCorrelationInfo.java new file mode 100644 index 000000000..8e915313c --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAllStrategyCorrelationInfo.java @@ -0,0 +1,161 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.events.EventRegistrationBuilder; +import io.serverlessworkflow.impl.scheduler.AllStrategyCorrelationInfo; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractAllStrategyCorrelationInfo implements AllStrategyCorrelationInfo { + + protected final WorkflowDefinition definition; + private final PersistenceExecutor executor; + private final Map reg2IdMapping = new HashMap<>(); + private final Map id2RegMapping = new HashMap<>(); + + private static final Logger logger = + LoggerFactory.getLogger(AbstractAllStrategyCorrelationInfo.class); + + private int counter; + private CompletableFuture>> + completableFuture; + private Consumer> starter; + + public AbstractAllStrategyCorrelationInfo( + WorkflowDefinition definition, PersistenceExecutor executor) { + this.definition = definition; + this.executor = executor; + this.completableFuture = CompletableFuture.completedFuture(List.of()); + } + + @Override + public void correlate(EventRegistrationBuilder reg, CloudEvent event) { + queueCorrelation(operations -> eventAdded(operations, reg2IdMapping.get(reg), event), starter); + } + + @Override + public void init( + Collection regs, + Consumer> starter) { + regs.forEach( + reg -> { + String id = generateIdFromReg(reg); + id2RegMapping.put(id, reg); + reg2IdMapping.put(reg, id); + }); + this.starter = starter; + queueCorrelation(operations -> startupCheck(operations), starter); + } + + private void queueCorrelation( + Function>> + function, + Consumer> starter) { + synchronized (this) { + this.completableFuture = + completableFuture.thenCompose( + v -> executor.execute(() -> doTransaction(function), definition)); + completableFuture.thenAccept(events -> events.forEach(starter)); + } + } + + private Collection> eventAdded( + CorrelationOperations operations, String reg, CloudEvent event) { + logger.debug( + "Received event {} for definition {} and registration {}", event, definition.id(), reg); + operations.storeEvent(reg, event); + return checkCorrelation(operations); + } + + private Collection> startupCheck( + CorrelationOperations operations) { + operations.clearProcessed(); + return checkCorrelation(operations); + } + + private final Collection> checkCorrelation( + CorrelationOperations operations) { + Map> events = operations.retrieveEvents(id2RegMapping.keySet()); + logger.debug("Stored CloudEvents for definition {} are {}", definition.id(), events); + if (events.isEmpty()) { + return List.of(); + } + Collection> result = new ArrayList<>(); + boolean notDone = true; + while (notDone) { + Map row = new HashMap<>(); + for (Entry> item : events.entrySet()) { + List list = item.getValue(); + if (list.isEmpty()) { + notDone = false; + break; + } + CloudEvent retrieved = list.remove(0); + row.put(id2RegMapping.get(item.getKey()), retrieved); + } + if (notDone) { + result.add(row); + } + } + if (!result.isEmpty()) { + Map> processed = new HashMap<>(); + for (Map item : result) { + for (Entry entry : item.entrySet()) { + processed + .computeIfAbsent(reg2IdMapping.get(entry.getKey()), k -> new ArrayList<>()) + .add(entry.getValue().getId()); + } + } + operations.markAsProcessed(processed); + } + return result; + } + + public void addMetadata( + WorkflowInstance instance, Map events) { + logger.debug("Starting instance {} with events {}", instance.id(), events); + instance.addMetadataIfAbsent( + AbstractPersistenceInstanceWriter.CLOUD_EVENT_IDS, + () -> + events.entrySet().stream() + .collect( + Collectors.toMap( + e -> reg2IdMapping.get(e.getKey()), v -> v.getValue().getId()))); + } + + protected abstract Collection> doTransaction( + Function>> + function); + + protected String generateIdFromReg(EventRegistrationBuilder reg) { + final String separator = ":"; + return definition.id().toString(separator) + separator + ++counter; + } +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAsyncPersistenceExecutor.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAsyncPersistenceExecutor.java index 55d44ee43..74ca8ef18 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAsyncPersistenceExecutor.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAsyncPersistenceExecutor.java @@ -15,16 +15,24 @@ */ package io.serverlessworkflow.impl.persistence; -import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowDefinitionData; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; public abstract class AbstractAsyncPersistenceExecutor implements PersistenceExecutor { @Override - public CompletableFuture execute(Runnable runnable, WorkflowContextData context) { + public CompletableFuture execute(Runnable runnable, WorkflowDefinitionData definition) { + return CompletableFuture.runAsync( - runnable, executorService().orElse(context.definition().application().executorService())); + runnable, executorService().orElse(definition.application().executorService())); + } + + @Override + public CompletableFuture execute(Supplier runnable, WorkflowDefinitionData definition) { + return CompletableFuture.supplyAsync( + runnable, executorService().orElse(definition.application().executorService())); } protected abstract Optional executorService(); diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java index 17b760346..dea3e1848 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java @@ -18,14 +18,25 @@ import io.serverlessworkflow.impl.TaskContextData; import io.serverlessworkflow.impl.WorkflowContextData; import io.serverlessworkflow.impl.WorkflowStatus; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; public abstract class AbstractPersistenceInstanceWriter implements PersistenceInstanceWriter { + public static final String CLOUD_EVENT_IDS = "CloudEventIds"; + @Override public CompletableFuture started(WorkflowContextData workflowContext) { - return doStartInstance(t -> t.writeInstanceData(workflowContext), workflowContext); + return doStartInstance( + t -> { + t.writeInstanceData(workflowContext); + workflowContext + .instanceData() + .findMetadata(CLOUD_EVENT_IDS, Map.class) + .ifPresent(c -> t.removeCloudEvents(c)); + }, + workflowContext); } @Override diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CorrelationOperations.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CorrelationOperations.java new file mode 100644 index 000000000..7aeb7a8f9 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/CorrelationOperations.java @@ -0,0 +1,36 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.cloudevents.CloudEvent; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +interface CorrelationOperations { + + default Map> retrieveEvents(Collection targetRegIds) { + return Map.of(); + } + + default void storeEvent(String regId, CloudEvent event) {} + + default void markAsProcessed(Map> regCeIds) {} + + default void clearProcessed() {} + + default void removeCloudEvents(Map ids) {} +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java index 1420b1917..29731f0f7 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java @@ -19,9 +19,14 @@ import io.serverlessworkflow.impl.WorkflowInstance; import java.util.Optional; import java.util.stream.Stream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DefaultPersistenceInstanceReader extends AbstractPersistenceInstanceReader { + private static final Logger logger = + LoggerFactory.getLogger(DefaultPersistenceInstanceReader.class); + private final PersistenceInstanceStore store; protected DefaultPersistenceInstanceReader(PersistenceInstanceStore store) { @@ -36,7 +41,11 @@ public Optional find(WorkflowDefinition definition, String ins transaction.commit(definition); return instance; } catch (Exception ex) { - transaction.rollback(definition); + try { + transaction.rollback(definition); + } catch (Exception rollEx) { + logger.warn("Exception during rollback. Ignoring it", rollEx); + } throw ex; } } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/OperationAllStrategyCorrelationInfo.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/OperationAllStrategyCorrelationInfo.java new file mode 100644 index 000000000..1b5a77b5d --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/OperationAllStrategyCorrelationInfo.java @@ -0,0 +1,43 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.events.EventRegistrationBuilder; +import java.util.Collection; +import java.util.Map; +import java.util.function.Function; + +class OperationAllStrategyCorrelationInfo extends AbstractAllStrategyCorrelationInfo { + + private final PersistenceInstanceOperations operations; + + public OperationAllStrategyCorrelationInfo( + WorkflowDefinition definition, + PersistenceExecutor executor, + PersistenceInstanceOperations operations) { + super(definition, executor); + this.operations = operations; + } + + @Override + protected Collection> doTransaction( + Function>> + function) { + return function.apply(operations); + } +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/OperationAllStrategyCorrelationInfoFactory.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/OperationAllStrategyCorrelationInfoFactory.java new file mode 100644 index 000000000..3b829b07e --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/OperationAllStrategyCorrelationInfoFactory.java @@ -0,0 +1,38 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.scheduler.AllStrategyCorrelationInfo; +import io.serverlessworkflow.impl.scheduler.AllStrategyCorrelationInfoFactory; + +public class OperationAllStrategyCorrelationInfoFactory + implements AllStrategyCorrelationInfoFactory { + + private final PersistenceExecutor executor; + private final PersistenceInstanceOperations operations; + + public OperationAllStrategyCorrelationInfoFactory( + PersistenceExecutor executor, PersistenceInstanceOperations operations) { + this.executor = executor; + this.operations = operations; + } + + @Override + public AllStrategyCorrelationInfo apply(WorkflowDefinition definition) { + return new OperationAllStrategyCorrelationInfo(definition, executor, operations); + } +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceAllStrategyCorrelationInfoFactories.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceAllStrategyCorrelationInfoFactories.java new file mode 100644 index 000000000..7929aabaa --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceAllStrategyCorrelationInfoFactories.java @@ -0,0 +1,33 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.serverlessworkflow.impl.scheduler.AllStrategyCorrelationInfoFactory; + +public class PersistenceAllStrategyCorrelationInfoFactories { + + private PersistenceAllStrategyCorrelationInfoFactories() {} + + public static AllStrategyCorrelationInfoFactory from( + PersistenceExecutor executor, PersistenceInstanceStore store) { + return new StoreAllStrategyCorrelationInfoFactory(executor, store); + } + + public static AllStrategyCorrelationInfoFactory from( + PersistenceExecutor executor, PersistenceInstanceOperations operations) { + return new OperationAllStrategyCorrelationInfoFactory(executor, operations); + } +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceApplicationBuilder.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceApplicationBuilder.java index a0e6c8450..954f38c82 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceApplicationBuilder.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceApplicationBuilder.java @@ -25,6 +25,11 @@ public static PersistenceApplicationBuilder builder( return new PersistenceApplicationBuilder(builder, writer); } + public static PersistenceApplicationBuilder builder( + WorkflowApplication.Builder builder, PersistenceInstanceHandlers handler) { + return new PersistenceApplicationBuilder(builder, handler.writer()); + } + private final WorkflowApplication.Builder appBuilder; protected PersistenceApplicationBuilder(Builder appBuilder, PersistenceInstanceWriter writer) { diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceExecutor.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceExecutor.java index 8d7703764..879bd52bb 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceExecutor.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceExecutor.java @@ -15,11 +15,14 @@ */ package io.serverlessworkflow.impl.persistence; -import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowDefinitionData; import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; public interface PersistenceExecutor extends AutoCloseable { - CompletableFuture execute(Runnable runnable, WorkflowContextData context); + CompletableFuture execute(Supplier runnable, WorkflowDefinitionData definition); + + CompletableFuture execute(Runnable runnable, WorkflowDefinitionData definition); default void close() {} } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceOperations.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceOperations.java index 264831b62..a4aebe2fb 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceOperations.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceOperations.java @@ -22,7 +22,7 @@ import java.util.Optional; import java.util.stream.Stream; -public interface PersistenceInstanceOperations { +public interface PersistenceInstanceOperations extends CorrelationOperations { void writeInstanceData(WorkflowContextData workflowContext); void writeRetryTask(WorkflowContextData workflowContext, TaskContextData taskContext); diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/StoreAllStrategyCorrelationInfo.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/StoreAllStrategyCorrelationInfo.java new file mode 100644 index 000000000..3a148a249 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/StoreAllStrategyCorrelationInfo.java @@ -0,0 +1,59 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.events.EventRegistrationBuilder; +import java.util.Collection; +import java.util.Map; +import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class StoreAllStrategyCorrelationInfo extends AbstractAllStrategyCorrelationInfo { + + private static final Logger logger = + LoggerFactory.getLogger(StoreAllStrategyCorrelationInfo.class); + + private final PersistenceInstanceStore store; + + public StoreAllStrategyCorrelationInfo( + WorkflowDefinition definition, PersistenceExecutor executor, PersistenceInstanceStore store) { + super(definition, executor); + this.store = store; + } + + @Override + protected Collection> doTransaction( + Function>> + function) { + PersistenceInstanceTransaction transaction = store.begin(); + Collection> result; + try { + result = function.apply(transaction); + transaction.commit(definition); + } catch (Exception ex) { + try { + transaction.rollback(definition); + } catch (Exception rollEx) { + logger.warn("Exception during rollback. Ignoring it", rollEx); + } + throw ex; + } + return result; + } +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/StoreAllStrategyCorrelationInfoFactory.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/StoreAllStrategyCorrelationInfoFactory.java new file mode 100644 index 000000000..a0ae93ca7 --- /dev/null +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/StoreAllStrategyCorrelationInfoFactory.java @@ -0,0 +1,37 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence; + +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.scheduler.AllStrategyCorrelationInfo; +import io.serverlessworkflow.impl.scheduler.AllStrategyCorrelationInfoFactory; + +class StoreAllStrategyCorrelationInfoFactory implements AllStrategyCorrelationInfoFactory { + + private final PersistenceExecutor executor; + private final PersistenceInstanceStore store; + + public StoreAllStrategyCorrelationInfoFactory( + PersistenceExecutor executor, PersistenceInstanceStore store) { + this.executor = executor; + this.store = store; + } + + @Override + public AllStrategyCorrelationInfo apply(WorkflowDefinition definition) { + return new StoreAllStrategyCorrelationInfo(definition, executor, store); + } +} diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/TransactedPersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/TransactedPersistenceInstanceWriter.java index 9495aeb41..67650c482 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/TransactedPersistenceInstanceWriter.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/TransactedPersistenceInstanceWriter.java @@ -27,21 +27,21 @@ public abstract class TransactedPersistenceInstanceWriter protected CompletableFuture doTransaction( Consumer operation, WorkflowContextData context) { return persistenceExecutor() - .execute(() -> doTransaction(operation, context.definition()), context); + .execute(() -> doTransaction(operation, context.definition()), context.definition()); } @Override protected CompletableFuture doStartInstance( Consumer operation, WorkflowContextData context) { return persistenceExecutor() - .execute(() -> doTransaction(operation, context.definition()), context); + .execute(() -> doTransaction(operation, context.definition()), context.definition()); } @Override protected CompletableFuture doCompleteInstance( Consumer operation, WorkflowContextData context) { return persistenceExecutor() - .execute(() -> doTransaction(operation, context.definition()), context); + .execute(() -> doTransaction(operation, context.definition()), context.definition()); } protected abstract void doTransaction( diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java index 949f5fe71..e76acaa9b 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BigMapInstanceTransaction.java @@ -15,6 +15,7 @@ */ package io.serverlessworkflow.impl.persistence.bigmap; +import io.cloudevents.CloudEvent; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.TaskContextData; import io.serverlessworkflow.impl.WorkflowContextData; @@ -26,13 +27,17 @@ import io.serverlessworkflow.impl.persistence.PersistenceInstanceTransaction; import io.serverlessworkflow.impl.persistence.PersistenceTaskInfo; import io.serverlessworkflow.impl.persistence.PersistenceWorkflowInfo; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.Stream; -public abstract class BigMapInstanceTransaction +public abstract class BigMapInstanceTransaction implements PersistenceInstanceTransaction { @Override @@ -108,6 +113,42 @@ public void clearStatus(WorkflowContextData workflowContext) { clearStatus(workflowContext.definition(), key(workflowContext)); } + public Map> retrieveEvents(Collection targetRegIds) { + Map> result = new HashMap<>(); + targetRegIds.forEach( + regId -> { + Map processedCes = processedCloudEvents(regId); + Map ces = cloudEvents(regId); + result.put( + regId, + ces.values().stream() + .map(this::unmarshallCloudEvent) + .filter(ce -> !processedCes.containsKey(ce.getId())) + .collect(Collectors.toCollection(ArrayList::new))); + }); + return result; + } + + public void storeEvent(String regId, CloudEvent event) { + cloudEvents(regId).put(event.getId(), marshallCloudEvent(event)); + } + + public void markAsProcessed(Map> regCeIds) { + regCeIds.forEach( + (k, v) -> { + Map processed = processedCloudEvents(k); + v.forEach(id -> processed.put(id, processedValue())); + }); + } + + public void clearProcessed() { + deleteAllProcessedMaps(); + } + + public void removeCloudEvents(Map ids) { + ids.forEach((k, v) -> cloudEvents(k).remove(v)); + } + private void clearStatus(WorkflowDefinitionData definition, String key) { status(definition).remove(key); } @@ -133,10 +174,20 @@ private String key(WorkflowContextData workflowContext) { protected abstract Map instanceData(WorkflowDefinitionData definition); - protected abstract Map status(WorkflowDefinitionData workflowContext); + protected abstract Map status(WorkflowDefinitionData definition); protected abstract Map tasks(String instanceId); + protected abstract Map cloudEvents(String regId); + + protected abstract Map processedCloudEvents(String regId); + + protected abstract C marshallCloudEvent(CloudEvent event); + + protected abstract CloudEvent unmarshallCloudEvent(C eventData); + + protected abstract P processedValue(); + protected abstract V marshallInstance(WorkflowInstanceData instance); protected abstract T marshallTaskCompleted( @@ -158,4 +209,6 @@ protected abstract T marshallTaskRetried( protected abstract String unmarshallApplicationId(A a); protected abstract void removeTasks(String key); + + protected abstract void deleteAllProcessedMaps(); } diff --git a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java index 3435550a8..ae8be0a41 100644 --- a/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java +++ b/impl/persistence/bigmap/src/main/java/io/serverlessworkflow/impl/persistence/bigmap/BytesMapInstanceTransaction.java @@ -15,6 +15,9 @@ */ package io.serverlessworkflow.impl.persistence.bigmap; +import io.cloudevents.CloudEvent; +import io.cloudevents.SpecVersion; +import io.cloudevents.core.builder.CloudEventBuilder; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContextData; import io.serverlessworkflow.impl.WorkflowInstanceData; @@ -33,13 +36,17 @@ import io.serverlessworkflow.impl.persistence.RetriedTaskInfo; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; public abstract class BytesMapInstanceTransaction - extends BigMapInstanceTransaction { + extends BigMapInstanceTransaction { private static final byte VERSION_0 = 0; private static final byte VERSION_1 = 1; private static final byte VERSION_2 = 2; + private static final byte[] PROCESSED_VALUE = new byte[] {1}; private final WorkflowBufferFactory factory; @@ -49,8 +56,9 @@ protected BytesMapInstanceTransaction(WorkflowBufferFactory factory) { @Override protected byte[] marshallTaskCompleted(WorkflowContextData contextData, TaskContext taskContext) { - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - try (WorkflowOutputBuffer writer = factory.output(bytes)) { + + try (ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + WorkflowOutputBuffer writer = factory.output(bytes)) { writer.writeByte(VERSION_2); writer.writeEnum(TaskStatus.COMPLETED); writer.writeInstant(taskContext.completedAt()); @@ -66,29 +74,36 @@ protected byte[] marshallTaskCompleted(WorkflowContextData contextData, TaskCont writer.writeString(next.position().jsonPointer()); } writer.writeInt(taskContext.iteration()); + return bytes.toByteArray(); + } catch (IOException e) { + throw new UncheckedIOException(e); } - return bytes.toByteArray(); } @Override protected byte[] marshallStatus(WorkflowStatus status) { - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - try (WorkflowOutputBuffer writer = factory.output(bytes)) { + try (ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + WorkflowOutputBuffer writer = factory.output(bytes)) { writer.writeByte(VERSION_0); writer.writeEnum(status); + return bytes.toByteArray(); + } catch (IOException e) { + throw new UncheckedIOException(e); } - return bytes.toByteArray(); } @Override protected byte[] marshallInstance(WorkflowInstanceData instance) { - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - try (WorkflowOutputBuffer writer = factory.output(bytes)) { + + try (ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + WorkflowOutputBuffer writer = factory.output(bytes)) { writer.writeByte(VERSION_0); writer.writeInstant(instance.startedAt()); writeModel(writer, instance.input()); + return bytes.toByteArray(); + } catch (IOException e) { + throw new UncheckedIOException(e); } - return bytes.toByteArray(); } protected void writeModel(WorkflowOutputBuffer writer, WorkflowModel model) { @@ -183,4 +198,45 @@ protected WorkflowStatus unmarshallStatus(byte[] statusData) { return buffer.readEnum(WorkflowStatus.class); } } + + protected byte[] marshallCloudEvent(CloudEvent event) { + try (ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + WorkflowOutputBuffer writer = factory.output(bytes)) { + writer.writeEnum(event.getSpecVersion()); + writer.writeString(event.getId()); + writer.writeString(event.getType()); + writer.writeString(event.getSource().toString()); + writer.writeObject(event.getSubject()); + writer.writeObject(event.getDataSchema()); + writer.writeObject(event.getDataContentType()); + writer.writeObject(event.getData() == null ? null : event.getData().toBytes()); + MarshallingUtils.writeCloudEventExtensions(writer, event); + return bytes.toByteArray(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + protected CloudEvent unmarshallCloudEvent(byte[] eventData) { + try (ByteArrayInputStream bytes = new ByteArrayInputStream(eventData); + WorkflowInputBuffer reader = factory.input(bytes)) { + CloudEventBuilder builder = + CloudEventBuilder.fromSpecVersion(reader.readEnum(SpecVersion.class)); + builder.withId(reader.readString()); + builder.withType(reader.readString()); + builder.withSource(URI.create(reader.readString())); + builder.withSubject((String) reader.readObject()); + builder.withDataSchema((URI) reader.readObject()); + builder.withDataContentType((String) reader.readObject()); + builder.withData((byte[]) reader.readObject()); + MarshallingUtils.readCloudEventExtenstions(reader, eventData, builder); + return builder.build(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + protected byte[] processedValue() { + return PROCESSED_VALUE; + } } diff --git a/impl/persistence/mvstore/pom.xml b/impl/persistence/mvstore/pom.xml index ba8ce88c9..b6f45c7f9 100644 --- a/impl/persistence/mvstore/pom.xml +++ b/impl/persistence/mvstore/pom.xml @@ -20,5 +20,10 @@ io.serverlessworkflow serverlessworkflow-persistence-tests + + ch.qos.logback + logback-classic + test + \ No newline at end of file diff --git a/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java index 6add6a994..298115a7c 100644 --- a/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java +++ b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStorePersistenceStore.java @@ -23,7 +23,8 @@ import org.h2.mvstore.tx.TransactionStore; public class MVStorePersistenceStore implements PersistenceInstanceStore { - private final TransactionStore mvStore; + private final TransactionStore transactionStore; + private final MVStore mvStore; private WorkflowBufferFactory factory; public MVStorePersistenceStore(String dbName) { @@ -31,7 +32,8 @@ public MVStorePersistenceStore(String dbName) { } public MVStorePersistenceStore(String dbName, WorkflowBufferFactory factory) { - this.mvStore = new TransactionStore(MVStore.open(dbName)); + this.mvStore = MVStore.open(dbName); + this.transactionStore = new TransactionStore(mvStore); this.factory = factory; } @@ -41,7 +43,7 @@ public void close() { } @Override - public BigMapInstanceTransaction begin() { - return new MVStoreTransaction(mvStore.begin(), factory); + public BigMapInstanceTransaction begin() { + return new MVStoreTransaction(mvStore, transactionStore.begin(), factory); } } diff --git a/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreTransaction.java b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreTransaction.java index c54c80b40..aa2c0c83b 100644 --- a/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreTransaction.java +++ b/impl/persistence/mvstore/src/main/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreTransaction.java @@ -21,18 +21,22 @@ import io.serverlessworkflow.impl.marshaller.WorkflowBufferFactory; import io.serverlessworkflow.impl.persistence.bigmap.BytesMapInstanceTransaction; import java.util.Map; +import org.h2.mvstore.MVStore; import org.h2.mvstore.tx.Transaction; import org.h2.mvstore.tx.TransactionMap; public class MVStoreTransaction extends BytesMapInstanceTransaction { protected static final String ID_SEPARATOR = "-"; + private static final String PROCESSED_PREFIX = "PROCESSED" + ID_SEPARATOR; private final Transaction transaction; + private final MVStore store; - public MVStoreTransaction(Transaction transaction, WorkflowBufferFactory factory) { + public MVStoreTransaction(MVStore store, Transaction transaction, WorkflowBufferFactory factory) { super(factory); this.transaction = transaction; + this.store = store; } protected static String identifier(Workflow workflow, String sep) { @@ -87,4 +91,21 @@ public void rollback(WorkflowDefinitionData definition) { protected Map applicationData() { return transaction.openMap("APPLICATION"); } + + @Override + protected Map cloudEvents(String regId) { + return transaction.openMap("CLOUDEVENTS" + ID_SEPARATOR + regId); + } + + @Override + protected Map processedCloudEvents(String regId) { + return transaction.openMap(PROCESSED_PREFIX + regId); + } + + @Override + protected void deleteAllProcessedMaps() { + store.getMapNames().stream() + .filter(s -> s.startsWith(PROCESSED_PREFIX)) + .forEach(s -> transaction.removeMap(transaction.openMap(s))); + } } diff --git a/impl/persistence/mvstore/src/test/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreCorrelationTest.java b/impl/persistence/mvstore/src/test/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreCorrelationTest.java new file mode 100644 index 000000000..62a28c1b3 --- /dev/null +++ b/impl/persistence/mvstore/src/test/java/io/serverlessworkflow/impl/persistence/mvstore/MVStoreCorrelationTest.java @@ -0,0 +1,38 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence.mvstore; + +import io.serverlessworkflow.impl.persistence.PersistenceInstanceStore; +import io.serverlessworkflow.impl.persistence.test.AbstractStoreCorrelationPersistenceTest; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import org.junit.jupiter.api.AfterEach; + +public class MVStoreCorrelationTest extends AbstractStoreCorrelationPersistenceTest { + + private static final String DB_NAME = "dbtest.db"; + + @Override + protected PersistenceInstanceStore persistenceStore() { + return new MVStorePersistenceStore(DB_NAME); + } + + @AfterEach + void destroy() throws IOException { + Files.delete(Path.of(DB_NAME)); + } +} diff --git a/impl/persistence/tests/pom.xml b/impl/persistence/tests/pom.xml index 5f9141f15..636ee1e0d 100644 --- a/impl/persistence/tests/pom.xml +++ b/impl/persistence/tests/pom.xml @@ -28,6 +28,11 @@ junit-jupiter-params compile + + org.awaitility + awaitility + compile + org.assertj assertj-core diff --git a/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractCorrelationPersistenceTest.java b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractCorrelationPersistenceTest.java new file mode 100644 index 000000000..4b4ea53ed --- /dev/null +++ b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractCorrelationPersistenceTest.java @@ -0,0 +1,103 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence.test; + +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.jackson.JsonCloudEventData; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.events.InMemoryEvents; +import io.serverlessworkflow.impl.jackson.JsonUtils; +import io.serverlessworkflow.impl.persistence.AsyncPersistenceExecutor; +import io.serverlessworkflow.impl.persistence.PersistenceExecutor; +import io.serverlessworkflow.impl.scheduler.AllStrategyCorrelationInfoFactory; +import java.io.IOException; +import java.net.URI; +import java.time.Duration; +import java.util.Collection; +import java.util.Map; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public abstract class AbstractCorrelationPersistenceTest { + + private WorkflowApplication app; + private WorkflowDefinition definition; + + private InMemoryEvents inMemoryEvents; + + @BeforeEach + void setup() throws IOException { + inMemoryEvents = new InMemoryEvents(); + app = + WorkflowApplication.builder() + .withEventConsumer(inMemoryEvents) + .withEventPublisher(inMemoryEvents) + .withAllStrategyCorrelationInfoFactory(getAllStrategyCorrelationInfoFactory()) + .build(); + definition = app.workflowDefinition(readWorkflowFromClasspath("listen-start-all.yaml")); + } + + protected abstract AllStrategyCorrelationInfoFactory getAllStrategyCorrelationInfoFactory(); + + protected PersistenceExecutor persistenceExecutor() { + return new AsyncPersistenceExecutor(); + } + + @Test + void testAllStrategy() { + Collection instances = definition.scheduledInstances(); + inMemoryEvents.publish(buildCloudEvent(Map.of("name", "Javierito"))); + inMemoryEvents.publish(buildCloudEvent(Map.of("name", "Fulanito"))); + await() + .pollDelay(Duration.ofMillis(50)) + .atMost(Duration.ofSeconds(maxSeconds2Wait())) + .until( + () -> + instances.stream().filter(i -> i.status() == WorkflowStatus.COMPLETED).count() + == 1); + assertThat((Collection) assertThat(instances).singleElement().actual().output().asJavaObject()) + .containsExactlyInAnyOrder("Javierito", "Fulanito"); + } + + private static int idCounter; + + private static CloudEvent buildCloudEvent(Object data) { + return CloudEventBuilder.v1() + .withId(Integer.toString(++idCounter)) + .withType("com.example.hospital.events.patients.recover") + .withSource(URI.create("http://www.fakejavieritotest.com")) + .withData(JsonCloudEventData.wrap(JsonUtils.fromValue(data))) + .build(); + } + + protected int maxSeconds2Wait() { + return 2; + } + + @AfterEach + void close() { + app.close(); + } +} diff --git a/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractOperationsCorrelationPersistenceTest.java b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractOperationsCorrelationPersistenceTest.java new file mode 100644 index 000000000..57699082f --- /dev/null +++ b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractOperationsCorrelationPersistenceTest.java @@ -0,0 +1,32 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence.test; + +import io.serverlessworkflow.impl.persistence.PersistenceAllStrategyCorrelationInfoFactories; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceOperations; +import io.serverlessworkflow.impl.scheduler.AllStrategyCorrelationInfoFactory; + +public abstract class AbstractOperationsCorrelationPersistenceTest + extends AbstractCorrelationPersistenceTest { + + @Override + protected AllStrategyCorrelationInfoFactory getAllStrategyCorrelationInfoFactory() { + return PersistenceAllStrategyCorrelationInfoFactories.from( + persistenceExecutor(), persistenceOperations()); + } + + protected abstract PersistenceInstanceOperations persistenceOperations(); +} diff --git a/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractStoreCorrelationPersistenceTest.java b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractStoreCorrelationPersistenceTest.java new file mode 100644 index 000000000..80c479bd3 --- /dev/null +++ b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractStoreCorrelationPersistenceTest.java @@ -0,0 +1,32 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.persistence.test; + +import io.serverlessworkflow.impl.persistence.PersistenceAllStrategyCorrelationInfoFactories; +import io.serverlessworkflow.impl.persistence.PersistenceInstanceStore; +import io.serverlessworkflow.impl.scheduler.AllStrategyCorrelationInfoFactory; + +public abstract class AbstractStoreCorrelationPersistenceTest + extends AbstractCorrelationPersistenceTest { + + @Override + protected AllStrategyCorrelationInfoFactory getAllStrategyCorrelationInfoFactory() { + return PersistenceAllStrategyCorrelationInfoFactories.from( + persistenceExecutor(), persistenceStore()); + } + + protected abstract PersistenceInstanceStore persistenceStore(); +} diff --git a/impl/persistence/tests/src/main/resources/listen-start-all.yaml b/impl/persistence/tests/src/main/resources/listen-start-all.yaml new file mode 100644 index 000000000..77a44548d --- /dev/null +++ b/impl/persistence/tests/src/main/resources/listen-start-all.yaml @@ -0,0 +1,18 @@ +document: + dsl: '1.0.1' + namespace: test + name: event-driven-schedule-all + version: '0.1.0' +schedule: + on: + all: + - with: + type: com.example.hospital.events.patients.recover + data: ${.name == "Javierito"} + - with: + type: com.example.hospital.events.patients.recover + data: ${.name == "Fulanito"} +do: + - recovered: + set: ${[$workflow.input[]|.data.name]} + \ No newline at end of file