Skip to content

Commit a8dcc2a

Browse files
committed
[Fix #1406] Allow adding data into lifecycle CloudEvents
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent 5c0cf06 commit a8dcc2a

26 files changed

Lines changed: 923 additions & 162 deletions

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionCompletableListener;
4040
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener;
4141
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListenerAdapter;
42+
import io.serverlessworkflow.impl.lifecycle.ce.DefaultLifeCycleCloudEventFactory;
43+
import io.serverlessworkflow.impl.lifecycle.ce.WorkflowLifeCycleCloudEventFactory;
4244
import io.serverlessworkflow.impl.resources.DefaultResourceLoaderFactory;
4345
import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
4446
import io.serverlessworkflow.impl.resources.ResourceLoaderFactory;
@@ -96,6 +98,7 @@ public class WorkflowApplication implements AutoCloseable {
9698
private final Collection<CallableTaskProxyBuilder> callableProxyBuilders;
9799
private final CloudEventPredicateFactory cloudEventPredicateFactory;
98100
private final AllStrategyCorrelationInfoFactory allStrategyCorrelationInfoFactory;
101+
private final WorkflowLifeCycleCloudEventFactory lifeCycleCloudEventFactory;
99102

100103
private WorkflowApplication(Builder builder) {
101104
this.taskFactory = builder.taskFactory;
@@ -126,6 +129,7 @@ private WorkflowApplication(Builder builder) {
126129
this.callableProxyBuilders = builder.callableProxyBuilders;
127130
this.cloudEventPredicateFactory = builder.cloudEventPredicateFactory;
128131
this.allStrategyCorrelationInfoFactory = builder.allStrategyCorrelationInfoFactory;
132+
this.lifeCycleCloudEventFactory = builder.lifeCycleCloudEventFactory;
129133
}
130134

131135
public TaskExecutorFactory taskFactory() {
@@ -245,6 +249,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
245249
private URI defaultCatalogURI;
246250
private CloudEventPredicateFactory cloudEventPredicateFactory;
247251
private AllStrategyCorrelationInfoFactory allStrategyCorrelationInfoFactory;
252+
private WorkflowLifeCycleCloudEventFactory lifeCycleCloudEventFactory;
248253

249254
private Builder() {
250255
ServiceLoader.load(NamedWorkflowAdditionalObject.class)
@@ -383,6 +388,12 @@ public Builder withAllStrategyCorrelationInfoFactory(
383388
return this;
384389
}
385390

391+
public Builder withLifeCycleCloudEventFactory(
392+
WorkflowLifeCycleCloudEventFactory lifeCycleCloudEventFactory) {
393+
this.lifeCycleCloudEventFactory = lifeCycleCloudEventFactory;
394+
return this;
395+
}
396+
386397
public WorkflowApplication build() {
387398

388399
if (modelFactory == null) {
@@ -443,6 +454,9 @@ public WorkflowApplication build() {
443454
loadFirst(CloudEventPredicateFactory.class)
444455
.orElseGet(() -> new DefaultCloudEventPredicateFactory());
445456
}
457+
if (lifeCycleCloudEventFactory == null) {
458+
lifeCycleCloudEventFactory = new DefaultLifeCycleCloudEventFactory();
459+
}
446460
if (allStrategyCorrelationInfoFactory == null) {
447461
allStrategyCorrelationInfoFactory =
448462
definition -> InMemoryAllStrategyCorrelationInfo.instance();
@@ -579,4 +593,8 @@ public Collection<CallableTaskProxyBuilder> callableProxyBuilders() {
579593
public AllStrategyCorrelationInfoFactory allStrategyCorrelationInfoFactory() {
580594
return allStrategyCorrelationInfoFactory;
581595
}
596+
597+
public WorkflowLifeCycleCloudEventFactory lifeCycleCloudEventFactory() {
598+
return lifeCycleCloudEventFactory;
599+
}
582600
}

impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java

Lines changed: 74 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@
2929
import static io.serverlessworkflow.impl.LifecycleEvents.WORKFLOW_STARTED;
3030
import static io.serverlessworkflow.impl.LifecycleEvents.WORKFLOW_STATUS_CHANGED;
3131
import static io.serverlessworkflow.impl.LifecycleEvents.WORKFLOW_SUSPENDED;
32-
import static io.serverlessworkflow.impl.WorkflowError.error;
33-
import static io.serverlessworkflow.impl.lifecycle.ce.WorkflowDefinitionCEData.ref;
3432

3533
import io.cloudevents.CloudEvent;
3634
import io.cloudevents.CloudEventData;
@@ -82,202 +80,177 @@ public static Collection<String> getLifeCycleTypes() {
8280
WORKFLOW_STATUS_CHANGED);
8381
}
8482

83+
private WorkflowLifeCycleCloudEventFactory lifeCycleFactory(WorkflowEvent ev) {
84+
return ev.workflowContext().definition().application().lifeCycleCloudEventFactory();
85+
}
86+
8587
@Override
8688
public void onTaskStarted(TaskStartedEvent event) {
89+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
8790
publish(
8891
event,
8992
ev ->
90-
builder()
91-
.withData(
92-
cloudEventData(
93-
new TaskStartedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()),
94-
this::convert))
95-
.withType(TASK_STARTED)
96-
.build());
93+
factory.build(
94+
builder()
95+
.withData(cloudEventData(factory.build(event), this::convert))
96+
.withType(TASK_STARTED)));
9797
}
9898

9999
@Override
100100
public void onTaskRetried(TaskRetriedEvent event) {
101+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
101102
publish(
102103
event,
103104
ev ->
104-
builder()
105-
.withData(
106-
cloudEventData(
107-
new TaskRetriedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()),
108-
this::convert))
109-
.withType(TASK_STARTED)
110-
.build());
105+
factory.build(
106+
builder()
107+
.withData(cloudEventData(factory.build(event), this::convert))
108+
.withType(TASK_RETRIED)));
111109
}
112110

113111
@Override
114112
public void onTaskCompleted(TaskCompletedEvent event) {
113+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
115114
publish(
116115
event,
117116
ev ->
118-
builder()
119-
.withData(
120-
cloudEventData(
121-
new TaskCompletedCEData(
122-
id(ev), pos(ev), ref(ev), ev.eventDate(), output(ev)),
123-
this::convert))
124-
.withType(TASK_COMPLETED)
125-
.build());
117+
factory.build(
118+
builder()
119+
.withData(cloudEventData(factory.build(event), this::convert))
120+
.withType(TASK_COMPLETED)));
126121
}
127122

128123
@Override
129124
public void onTaskSuspended(TaskSuspendedEvent event) {
125+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
130126
publish(
131127
event,
132128
ev ->
133-
builder()
134-
.withData(
135-
cloudEventData(
136-
new TaskSuspendedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()),
137-
this::convert))
138-
.withType(TASK_SUSPENDED)
139-
.build());
129+
factory.build(
130+
builder()
131+
.withData(cloudEventData(factory.build(event), this::convert))
132+
.withType(TASK_SUSPENDED)));
140133
}
141134

142135
@Override
143136
public void onTaskResumed(TaskResumedEvent event) {
137+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
144138
publish(
145139
event,
146140
ev ->
147-
builder()
148-
.withData(
149-
cloudEventData(
150-
new TaskResumedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()),
151-
this::convert))
152-
.withType(TASK_RESUMED)
153-
.build());
141+
factory.build(
142+
builder()
143+
.withData(cloudEventData(factory.build(event), this::convert))
144+
.withType(TASK_RESUMED)));
154145
}
155146

156147
@Override
157148
public void onTaskCancelled(TaskCancelledEvent event) {
149+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
158150
publish(
159151
event,
160152
ev ->
161-
builder()
162-
.withData(
163-
cloudEventData(
164-
new TaskCancelledCEData(id(ev), pos(ev), ref(ev), ev.eventDate()),
165-
this::convert))
166-
.withType(TASK_CANCELLED)
167-
.build());
153+
factory.build(
154+
builder()
155+
.withData(cloudEventData(factory.build(event), this::convert))
156+
.withType(TASK_CANCELLED)));
168157
}
169158

170159
@Override
171160
public void onTaskFailed(TaskFailedEvent event) {
161+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
172162
publish(
173163
event,
174164
ev ->
175-
builder()
176-
.withData(
177-
cloudEventData(
178-
new TaskFailedCEData(id(ev), pos(ev), ref(ev), ev.eventDate(), error(ev)),
179-
this::convert))
180-
.withType(TASK_FAULTED)
181-
.build());
165+
factory.build(
166+
builder()
167+
.withData(cloudEventData(factory.build(event), this::convert))
168+
.withType(TASK_FAULTED)));
182169
}
183170

184171
@Override
185172
public void onWorkflowStarted(WorkflowStartedEvent event) {
173+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
186174
publish(
187175
event,
188176
ev ->
189-
builder()
190-
.withData(
191-
cloudEventData(
192-
new WorkflowStartedCEData(id(ev), ref(ev), ev.eventDate()), this::convert))
193-
.withType(WORKFLOW_STARTED)
194-
.build());
177+
factory.build(
178+
builder()
179+
.withData(cloudEventData(factory.build(event), this::convert))
180+
.withType(WORKFLOW_STARTED)));
195181
}
196182

197183
@Override
198184
public void onWorkflowSuspended(WorkflowSuspendedEvent event) {
185+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
199186
publish(
200187
event,
201188
ev ->
202-
builder()
203-
.withData(
204-
cloudEventData(
205-
new WorkflowSuspendedCEData(id(ev), ref(ev), ev.eventDate()),
206-
this::convert))
207-
.withType(WORKFLOW_SUSPENDED)
208-
.build());
189+
factory.build(
190+
builder()
191+
.withData(cloudEventData(factory.build(event), this::convert))
192+
.withType(WORKFLOW_SUSPENDED)));
209193
}
210194

211195
@Override
212196
public void onWorkflowCancelled(WorkflowCancelledEvent event) {
197+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
213198
publish(
214199
event,
215200
ev ->
216-
builder()
217-
.withData(
218-
cloudEventData(
219-
new WorkflowCancelledCEData(id(ev), ref(ev), ev.eventDate()),
220-
this::convert))
221-
.withType(WORKFLOW_CANCELLED)
222-
.build());
201+
factory.build(
202+
builder()
203+
.withData(cloudEventData(factory.build(event), this::convert))
204+
.withType(WORKFLOW_CANCELLED)));
223205
}
224206

225207
@Override
226208
public void onWorkflowResumed(WorkflowResumedEvent event) {
209+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
227210
publish(
228211
event,
229212
ev ->
230-
builder()
231-
.withData(
232-
cloudEventData(
233-
new WorkflowResumedCEData(id(ev), ref(ev), ev.eventDate()), this::convert))
234-
.withType(WORKFLOW_RESUMED)
235-
.build());
213+
factory.build(
214+
builder()
215+
.withData(cloudEventData(factory.build(event), this::convert))
216+
.withType(WORKFLOW_RESUMED)));
236217
}
237218

238219
@Override
239220
public void onWorkflowCompleted(WorkflowCompletedEvent event) {
221+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
240222
publish(
241223
event,
242224
ev ->
243-
builder()
244-
.withData(
245-
cloudEventData(
246-
new WorkflowCompletedCEData(
247-
id(ev), ref(ev), ev.eventDate(), from(event.output())),
248-
this::convert))
249-
.withType(WORKFLOW_COMPLETED)
250-
.build());
225+
factory.build(
226+
builder()
227+
.withData(cloudEventData(factory.build(event), this::convert))
228+
.withType(WORKFLOW_COMPLETED)));
251229
}
252230

253231
@Override
254232
public void onWorkflowFailed(WorkflowFailedEvent event) {
233+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
255234
publish(
256235
event,
257236
ev ->
258-
builder()
259-
.withData(
260-
cloudEventData(
261-
new WorkflowFailedCEData(id(ev), ref(ev), ev.eventDate(), error(ev)),
262-
this::convert))
263-
.withType(WORKFLOW_FAULTED)
264-
.build());
237+
factory.build(
238+
builder()
239+
.withData(cloudEventData(factory.build(event), this::convert))
240+
.withType(WORKFLOW_FAULTED)));
265241
}
266242

267243
@Override
268244
public void onWorkflowStatusChanged(WorkflowStatusEvent event) {
269245
if (appl(event).isStatusChangePublishingEnabled()) {
246+
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
270247
publish(
271248
event,
272249
ev ->
273-
builder()
274-
.withData(
275-
cloudEventData(
276-
new WorkflowStatusCEDataEvent(
277-
id(ev), ref(ev), ev.eventDate(), ev.status().toString()),
278-
this::convert))
279-
.withType(WORKFLOW_STATUS_CHANGED)
280-
.build());
250+
factory.build(
251+
builder()
252+
.withData(cloudEventData(factory.build(event), this::convert))
253+
.withType(WORKFLOW_STATUS_CHANGED)));
281254
}
282255
}
283256

@@ -368,14 +341,6 @@ private static WorkflowApplication appl(WorkflowEvent ev) {
368341
return ev.workflowContext().definition().application();
369342
}
370343

371-
private static String id(WorkflowEvent ev) {
372-
return ev.workflowContext().instanceData().id();
373-
}
374-
375-
private static String pos(TaskEvent ev) {
376-
return ev.taskContext().position().jsonPointer();
377-
}
378-
379344
private static Object output(TaskEvent ev) {
380345
return from(ev.taskContext().output());
381346
}

0 commit comments

Comments
 (0)