Skip to content

Commit 42e89a8

Browse files
Fix #1275 - Improve CloudEvent and CloudEventData handling (#1282)
* Fix workflow test Signed-off-by: Ricardo Zanini <ricardozanini@gmail.com> * Enable any type of collection cast on *ModelCollection Signed-off-by: Ricardo Zanini <ricardozanini@gmail.com> * Add tests for jacksonModelFactory Signed-off-by: Ricardo Zanini <ricardozanini@gmail.com> * Incorporate review comments Signed-off-by: Ricardo Zanini <ricardozanini@gmail.com> * Fix sync block Signed-off-by: Ricardo Zanini <ricardozanini@gmail.com> * Isolating as method on utils; refactor ObjectMapperFactoryProvider to lazy init mapper Signed-off-by: Ricardo Zanini <ricardozanini@gmail.com> * Adding missing check Signed-off-by: Ricardo Zanini <ricardozanini@gmail.com> --------- Signed-off-by: Ricardo Zanini <ricardozanini@gmail.com>
1 parent 20873ad commit 42e89a8

17 files changed

Lines changed: 829 additions & 121 deletions

File tree

experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterPropertiesBuilder.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,11 @@
2020
import io.serverlessworkflow.api.types.func.ContextPredicate;
2121
import io.serverlessworkflow.api.types.func.EventDataPredicate;
2222
import io.serverlessworkflow.api.types.func.FilterPredicate;
23+
import io.serverlessworkflow.api.types.func.TypedContextPredicate;
24+
import io.serverlessworkflow.api.types.func.TypedFilterPredicate;
25+
import io.serverlessworkflow.api.types.func.TypedPredicate;
2326
import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder;
27+
import io.serverlessworkflow.impl.events.DefaultCloudEventPredicate;
2428
import java.util.function.Predicate;
2529

2630
public class FuncEventFilterPropertiesBuilder
@@ -50,20 +54,23 @@ public FuncEventFilterPropertiesBuilder data(FilterPredicate<CloudEventData> pre
5054
}
5155

5256
public FuncEventFilterPropertiesBuilder envelope(Predicate<CloudEvent> predicate) {
53-
this.eventProperties.setData(
54-
new EventDataPredicate().withPredicate(predicate, CloudEvent.class));
57+
this.eventProperties.setAdditionalProperty(
58+
DefaultCloudEventPredicate.ENVELOPE_PREDICATE,
59+
new TypedPredicate<>(predicate, CloudEvent.class));
5560
return this;
5661
}
5762

5863
public FuncEventFilterPropertiesBuilder envelope(ContextPredicate<CloudEvent> predicate) {
59-
this.eventProperties.setData(
60-
new EventDataPredicate().withPredicate(predicate, CloudEvent.class));
64+
this.eventProperties.setAdditionalProperty(
65+
DefaultCloudEventPredicate.ENVELOPE_PREDICATE,
66+
new TypedContextPredicate<>(predicate, CloudEvent.class));
6167
return this;
6268
}
6369

6470
public FuncEventFilterPropertiesBuilder envelope(FilterPredicate<CloudEvent> predicate) {
65-
this.eventProperties.setData(
66-
new EventDataPredicate().withPredicate(predicate, CloudEvent.class));
71+
this.eventProperties.setAdditionalProperty(
72+
DefaultCloudEventPredicate.ENVELOPE_PREDICATE,
73+
new TypedFilterPredicate<>(predicate, CloudEvent.class));
6774
return this;
6875
}
6976
}

experimental/lambda/pom.xml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<groupId>io.serverlessworkflow</groupId>
2222
<artifactId>serverlessworkflow-impl-core</artifactId>
2323
</dependency>
24-
<dependency>
24+
<dependency>
2525
<groupId>io.serverlessworkflow</groupId>
2626
<artifactId>serverlessworkflow-impl-jq</artifactId>
2727
</dependency>
@@ -50,6 +50,11 @@
5050
<artifactId>assertj-core</artifactId>
5151
<scope>test</scope>
5252
</dependency>
53+
<dependency>
54+
<groupId>org.awaitility</groupId>
55+
<artifactId>awaitility</artifactId>
56+
<scope>test</scope>
57+
</dependency>
5358
<dependency>
5459
<groupId>ch.qos.logback</groupId>
5560
<artifactId>logback-classic</artifactId>
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverless.workflow.impl.executors.func;
17+
18+
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.consume;
19+
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.consumed;
20+
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.emitJson;
21+
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function;
22+
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.listen;
23+
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.switchWhenOrElse;
24+
import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.to;
25+
import static org.awaitility.Awaitility.await;
26+
27+
import com.fasterxml.jackson.databind.ObjectMapper;
28+
import io.cloudevents.CloudEvent;
29+
import io.cloudevents.CloudEventData;
30+
import io.cloudevents.core.builder.CloudEventBuilder;
31+
import io.serverlessworkflow.api.types.Workflow;
32+
import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder;
33+
import io.serverlessworkflow.impl.WorkflowApplication;
34+
import io.serverlessworkflow.impl.WorkflowDefinition;
35+
import io.serverlessworkflow.impl.WorkflowInstance;
36+
import io.serverlessworkflow.impl.WorkflowModel;
37+
import io.serverlessworkflow.impl.WorkflowStatus;
38+
import io.serverlessworkflow.impl.events.EventPublisher;
39+
import java.net.URI;
40+
import java.nio.charset.StandardCharsets;
41+
import java.time.Duration;
42+
import java.util.List;
43+
import java.util.concurrent.CompletableFuture;
44+
import org.junit.jupiter.api.Test;
45+
46+
public class EventFilteringTest {
47+
48+
private static final ObjectMapper MAPPER = new ObjectMapper();
49+
50+
// --- Mock Service Methods (replacing Quarkus Agents) ---
51+
public NewsletterDraft writeDraft(NewsletterRequest req) {
52+
return new NewsletterDraft("Draft: " + req.topic(), "Initial body...");
53+
}
54+
55+
public NewsletterDraft editDraft(HumanReview review) {
56+
return new NewsletterDraft("Edited Draft", "Fixed based on: " + review.notes());
57+
}
58+
59+
public void sendEmail(NewsletterDraft draft) {
60+
// Simulates MailService.send
61+
}
62+
63+
@Test
64+
public void testIntelligentNewsletterApprovalPath() throws Exception {
65+
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
66+
67+
Workflow workflow =
68+
FuncWorkflowBuilder.workflow("intelligent-newsletter")
69+
.tasks(
70+
function("draftAgent", this::writeDraft).exportAsTaskOutput(),
71+
emitJson("draftReady", "org.acme.email.review.required", NewsletterDraft.class),
72+
listen(
73+
"waitHumanReview",
74+
to().one(
75+
consumed("org.acme.newsletter.review.done")
76+
.extensionByInstanceId("instanceid")))
77+
.outputAs(
78+
(List<CloudEventData> events) -> {
79+
try {
80+
return MAPPER.readValue(events.get(0).toBytes(), HumanReview.class);
81+
} catch (Exception e) {
82+
throw new RuntimeException("Failed to deserialize HumanReview", e);
83+
}
84+
}),
85+
switchWhenOrElse(
86+
h -> HumanReview.NEEDS_REVISION.equals(h.status()),
87+
"humanEditorAgent",
88+
"sendNewsletter",
89+
HumanReview.class),
90+
function("humanEditorAgent", this::editDraft)
91+
.exportAsTaskOutput()
92+
.then("draftReady"),
93+
consume("sendNewsletter", this::sendEmail)
94+
.inputFrom(
95+
(payload, wfc, tfc) ->
96+
payload instanceof HumanReview ? wfc.context() : payload))
97+
.build();
98+
99+
WorkflowDefinition definition = app.workflowDefinition(workflow);
100+
WorkflowInstance instance = definition.instance(new NewsletterRequest("Tech Stocks"));
101+
CompletableFuture<WorkflowModel> future = instance.start();
102+
103+
// Wait for it to hit the listen state
104+
await()
105+
.atMost(Duration.ofSeconds(5))
106+
.until(() -> instance.status() == WorkflowStatus.WAITING);
107+
108+
EventPublisher publisher = app.eventPublishers().iterator().next();
109+
110+
// --- THE NEGATIVE TEST: Fire an event with the WRONG instance id ---
111+
CloudEvent maliciousEvent =
112+
CloudEventBuilder.v1()
113+
.withId("event-wrong")
114+
.withSource(URI.create("test:/human-editor"))
115+
.withType("org.acme.newsletter.review.done")
116+
.withExtension("instanceid", "SOME-OTHER-ID-12345") // Does not match instance.id()
117+
.withData(
118+
"application/json",
119+
"{\"status\":\"APPROVED\", \"notes\":\"Malicious approval\"}"
120+
.getBytes(StandardCharsets.UTF_8))
121+
.build();
122+
123+
publisher.publish(maliciousEvent).toCompletableFuture().join();
124+
125+
await()
126+
.pollDelay(Duration.ofMillis(250))
127+
.atMost(Duration.ofMillis(500))
128+
.until(() -> instance.status() == WorkflowStatus.WAITING && !future.isDone());
129+
130+
// --- THE POSITIVE TEST: Fire the CORRECT event ---
131+
CloudEvent humanReviewEvent =
132+
CloudEventBuilder.v1()
133+
.withId("event-123")
134+
.withSource(URI.create("test:/human-editor"))
135+
.withType("org.acme.newsletter.review.done")
136+
.withExtension("instanceid", instance.id()) // Matches exactly
137+
.withData(
138+
"application/json",
139+
"{\"status\":\"APPROVED\", \"notes\":\"Looks good\"}"
140+
.getBytes(StandardCharsets.UTF_8))
141+
.build();
142+
143+
publisher.publish(humanReviewEvent).toCompletableFuture().join();
144+
145+
// Assert successful completion
146+
await()
147+
.atMost(Duration.ofSeconds(5))
148+
.until(() -> instance.status() == WorkflowStatus.COMPLETED);
149+
}
150+
}
151+
152+
// --- Mock Domain Models ---
153+
public record NewsletterRequest(String topic) {}
154+
155+
public record NewsletterDraft(String title, String body) {}
156+
157+
public record HumanReview(String status, String notes) {
158+
public static final String NEEDS_REVISION = "NEEDS_REVISION";
159+
public static final String APPROVED = "APPROVED";
160+
}
161+
}

experimental/model/src/main/java/io/serverlessworkflow/impl/model/func/JavaModelCollection.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@
1515
*/
1616
package io.serverlessworkflow.impl.model.func;
1717

18+
import io.serverlessworkflow.impl.CollectionConversionUtils;
1819
import io.serverlessworkflow.impl.WorkflowModel;
1920
import io.serverlessworkflow.impl.WorkflowModelCollection;
2021
import java.util.ArrayList;
2122
import java.util.Collection;
2223
import java.util.Iterator;
24+
import java.util.List;
2325
import java.util.Optional;
2426

2527
public class JavaModelCollection implements Collection<WorkflowModel>, WorkflowModelCollection {
@@ -78,14 +80,22 @@ public Iterator<WorkflowModel> iterator() {
7880
return new ModelIterator(object.iterator());
7981
}
8082

83+
private List<WorkflowModel> toModelList() {
84+
List<WorkflowModel> models = new ArrayList<>(object.size());
85+
for (Object obj : object)
86+
models.add(obj instanceof WorkflowModel value ? value : nextItem(obj));
87+
88+
return models;
89+
}
90+
8191
@Override
8292
public Object[] toArray() {
83-
throw new UnsupportedOperationException("toArray is not supported yet");
93+
return toModelList().toArray();
8494
}
8595

8696
@Override
8797
public <T> T[] toArray(T[] a) {
88-
throw new UnsupportedOperationException("toArray is not supported yet");
98+
return toModelList().toArray(a);
8999
}
90100

91101
@Override
@@ -139,8 +149,11 @@ public Class<?> objectClass() {
139149

140150
@Override
141151
public <T> Optional<T> as(Class<T> clazz) {
142-
return object.getClass().isAssignableFrom(clazz)
143-
? Optional.of(clazz.cast(object))
144-
: Optional.empty();
152+
if (object == null) return Optional.empty();
153+
154+
if (clazz.isInstance(this)) return Optional.of(clazz.cast(this));
155+
if (clazz.isInstance(object)) return Optional.of(clazz.cast(object));
156+
157+
return CollectionConversionUtils.as(object, clazz);
145158
}
146159
}

experimental/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
<artifactId>serverlessworkflow-impl-core</artifactId>
1616
<version>${project.version}</version>
1717
</dependency>
18-
<dependency>
18+
<dependency>
1919
<groupId>io.serverlessworkflow</groupId>
2020
<artifactId>serverlessworkflow-impl-jq</artifactId>
2121
<version>${project.version}</version>

experimental/test/pom.xml

Lines changed: 38 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,54 @@
1-
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
2-
<modelVersion>4.0.0</modelVersion>
3-
<parent>
4-
<groupId>io.serverlessworkflow</groupId>
5-
<artifactId>serverlessworkflow-experimental</artifactId>
6-
<version>8.0.0-SNAPSHOT</version>
7-
</parent>
8-
<artifactId>serverlessworkflow-experimental-test</artifactId>
9-
<name>Serverless Workflow :: Experimental :: Test</name>
10-
<dependencies>
11-
<dependency>
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
3+
<modelVersion>4.0.0</modelVersion>
4+
<parent>
5+
<groupId>io.serverlessworkflow</groupId>
6+
<artifactId>serverlessworkflow-experimental</artifactId>
7+
<version>8.0.0-SNAPSHOT</version>
8+
</parent>
9+
<artifactId>serverlessworkflow-experimental-test</artifactId>
10+
<name>Serverless Workflow :: Experimental :: Test</name>
11+
<dependencies>
12+
<dependency>
1213
<groupId>org.junit.jupiter</groupId>
1314
<artifactId>junit-jupiter-engine</artifactId>
1415
<scope>test</scope>
1516
</dependency>
1617
<dependency>
1718
<groupId>org.mockito</groupId>
1819
<artifactId>mockito-core</artifactId>
19-
<scope>test</scope>
20+
<scope>test</scope>
2021
</dependency>
21-
<dependency>
22+
<dependency>
2223
<groupId>org.assertj</groupId>
2324
<artifactId>assertj-core</artifactId>
24-
<scope>test</scope>
25+
<scope>test</scope>
2526
</dependency>
2627
<dependency>
27-
<groupId>io.serverlessworkflow</groupId>
28-
<artifactId>serverlessworkflow-experimental-fluent-func</artifactId>
29-
<scope>test</scope>
28+
<groupId>org.awaitility</groupId>
29+
<artifactId>awaitility</artifactId>
30+
<scope>test</scope>
3031
</dependency>
31-
<dependency>
32-
<groupId>io.serverlessworkflow</groupId>
33-
<artifactId>serverlessworkflow-experimental-lambda</artifactId>
34-
<scope>test</scope>
32+
<dependency>
33+
<groupId>ch.qos.logback</groupId>
34+
<artifactId>logback-classic</artifactId>
35+
<scope>test</scope>
3536
</dependency>
36-
<dependency>
37-
<groupId>io.serverlessworkflow</groupId>
38-
<artifactId>serverlessworkflow-impl-model</artifactId>
39-
<version>${project.version}</version>
40-
<scope>test</scope>
37+
<dependency>
38+
<groupId>io.serverlessworkflow</groupId>
39+
<artifactId>serverlessworkflow-experimental-fluent-func</artifactId>
40+
<scope>test</scope>
41+
</dependency>
42+
<dependency>
43+
<groupId>io.serverlessworkflow</groupId>
44+
<artifactId>serverlessworkflow-experimental-lambda</artifactId>
45+
<scope>test</scope>
46+
</dependency>
47+
<dependency>
48+
<groupId>io.serverlessworkflow</groupId>
49+
<artifactId>serverlessworkflow-impl-model</artifactId>
50+
<version>${project.version}</version>
51+
<scope>test</scope>
4152
</dependency>
42-
</dependencies>
53+
</dependencies>
4354
</project>

0 commit comments

Comments
 (0)