Skip to content

Commit bb52a64

Browse files
fjtiradoCopilot
andauthored
[Fix #1283] Async Java Function Call executor (#1285)
[Fix #1283] Async Java Function Call executor Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 42e89a8 commit bb52a64

34 files changed

+533
-291
lines changed

experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncCallTaskBuilder.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,12 @@ public <T, V> FuncCallTaskBuilder function(Function<T, V> function) {
4646
}
4747

4848
public <T, V> FuncCallTaskBuilder function(Function<T, V> function, Class<T> argClass) {
49-
this.callTaskJava = new CallTaskJava(CallJava.function(function, argClass));
49+
return function(function, argClass, null);
50+
}
51+
52+
public <T, V> FuncCallTaskBuilder function(
53+
Function<T, V> function, Class<T> argClass, Class<V> returnClass) {
54+
this.callTaskJava = new CallTaskJava(CallJava.function(function, argClass, returnClass));
5055
super.setTask(this.callTaskJava.getCallJava());
5156
return this;
5257
}
@@ -56,7 +61,12 @@ public <T, V> FuncCallTaskBuilder function(ContextFunction<T, V> function) {
5661
}
5762

5863
public <T, V> FuncCallTaskBuilder function(ContextFunction<T, V> function, Class<T> argClass) {
59-
this.callTaskJava = new CallTaskJava(CallJava.function(function, argClass));
64+
return function(function, argClass, null);
65+
}
66+
67+
public <T, V> FuncCallTaskBuilder function(
68+
ContextFunction<T, V> function, Class<T> argClass, Class<V> returnClass) {
69+
this.callTaskJava = new CallTaskJava(CallJava.function(function, argClass, returnClass));
6070
super.setTask(this.callTaskJava.getCallJava());
6171
return this;
6272
}
@@ -66,7 +76,12 @@ public <T, V> FuncCallTaskBuilder function(FilterFunction<T, V> function) {
6676
}
6777

6878
public <T, V> FuncCallTaskBuilder function(FilterFunction<T, V> function, Class<T> argClass) {
69-
this.callTaskJava = new CallTaskJava(CallJava.function(function, argClass));
79+
return function(function, argClass, null);
80+
}
81+
82+
public <T, V> FuncCallTaskBuilder function(
83+
FilterFunction<T, V> function, Class<T> argClass, Class<V> outputClass) {
84+
this.callTaskJava = new CallTaskJava(CallJava.function(function, argClass, outputClass));
7085
super.setTask(this.callTaskJava.getCallJava());
7186
return this;
7287
}

experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitEventPropertiesBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@
1616
package io.serverlessworkflow.fluent.func;
1717

1818
import io.cloudevents.CloudEventData;
19+
import io.serverlessworkflow.api.reflection.func.SerializableFunction;
1920
import io.serverlessworkflow.api.types.func.ContextFunction;
2021
import io.serverlessworkflow.api.types.func.EventDataFunction;
2122
import io.serverlessworkflow.api.types.func.FilterFunction;
22-
import io.serverlessworkflow.fluent.func.dsl.SerializableFunction;
2323
import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder;
2424
import java.util.function.Function;
2525

experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForkTaskBuilder.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,20 @@ public <T, V> FuncForkTaskBuilder branch(String name, Function<T, V> function) {
5656

5757
public <T, V> FuncForkTaskBuilder branch(
5858
String name, Function<T, V> function, Class<T> argParam) {
59+
return branch(name, function, argParam, null);
60+
}
61+
62+
public <T, V> FuncForkTaskBuilder branch(
63+
String name, Function<T, V> function, Class<T> argParam, Class<V> returnClass) {
5964
if (name == null || name.isBlank()) {
6065
name = "branch-" + this.items.size();
6166
}
6267
this.items.add(
6368
new TaskItem(
6469
name,
65-
new Task().withCallTask(new CallTaskJava(CallJava.function(function, argParam)))));
70+
new Task()
71+
.withCallTask(
72+
new CallTaskJava(CallJava.function(function, argParam, returnClass)))));
6673
return this;
6774
}
6875

experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/CommonFuncOps.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
*/
1616
package io.serverlessworkflow.fluent.func.dsl;
1717

18+
import io.serverlessworkflow.api.reflection.func.ReflectionUtils;
19+
import io.serverlessworkflow.api.reflection.func.SerializableFunction;
20+
import io.serverlessworkflow.api.reflection.func.SerializablePredicate;
1821
import io.serverlessworkflow.api.types.FlowDirectiveEnum;
1922
import io.serverlessworkflow.fluent.func.FuncCallTaskBuilder;
2023
import io.serverlessworkflow.fluent.func.FuncSwitchTaskBuilder;

experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncCallStep.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,59 +29,64 @@ public final class FuncCallStep<T, R> extends Step<FuncCallStep<T, R>, FuncCallT
2929
private final ContextFunction<T, R> ctxFn;
3030
private final FilterFunction<T, R> filterFn;
3131
private final Class<T> argClass;
32+
private final Class<R> returnClass;
3233

3334
/** Function<T,R> variant (unnamed). */
34-
FuncCallStep(Function<T, R> fn, Class<T> argClass) {
35-
this(null, fn, argClass);
35+
FuncCallStep(Function<T, R> fn, Class<T> argClass, Class<R> returnClass) {
36+
this(null, fn, argClass, returnClass);
3637
}
3738

3839
/** Function<T,R> variant (named). */
39-
FuncCallStep(String name, Function<T, R> fn, Class<T> argClass) {
40+
FuncCallStep(String name, Function<T, R> fn, Class<T> argClass, Class<R> returnClass) {
4041
this.name = name;
4142
this.fn = fn;
4243
this.ctxFn = null;
4344
this.filterFn = null;
4445
this.argClass = argClass;
46+
this.returnClass = returnClass;
4547
}
4648

4749
/** ContextFunction<T,R> variant (unnamed). */
48-
FuncCallStep(ContextFunction<T, R> ctxFn, Class<T> argClass) {
49-
this(null, ctxFn, argClass);
50+
FuncCallStep(ContextFunction<T, R> ctxFn, Class<T> argClass, Class<R> returnClass) {
51+
this(null, ctxFn, argClass, returnClass);
5052
}
5153

5254
/** ContextFunction<T,R> variant (named). */
53-
FuncCallStep(String name, ContextFunction<T, R> ctxFn, Class<T> argClass) {
55+
FuncCallStep(String name, ContextFunction<T, R> ctxFn, Class<T> argClass, Class<R> returnClass) {
5456
this.name = name;
5557
this.fn = null;
5658
this.ctxFn = ctxFn;
5759
this.filterFn = null;
5860
this.argClass = argClass;
61+
this.returnClass = returnClass;
5962
}
6063

6164
/** FilterFunction<T,R> variant (unnamed). */
62-
FuncCallStep(FilterFunction<T, R> filterFn, Class<T> argClass) {
63-
this(null, filterFn, argClass);
65+
FuncCallStep(FilterFunction<T, R> filterFn, Class<T> argClass, Class<R> returnClass) {
66+
this(null, filterFn, argClass, returnClass);
6467
}
6568

6669
/** FilterFunction<T,R> variant (named). */
67-
FuncCallStep(String name, FilterFunction<T, R> filterFn, Class<T> argClass) {
70+
FuncCallStep(
71+
String name, FilterFunction<T, R> filterFn, Class<T> argClass, Class<R> returnClass) {
6872
this.name = name;
6973
this.fn = null;
7074
this.ctxFn = null;
7175
this.filterFn = filterFn;
7276
this.argClass = argClass;
77+
this.returnClass = returnClass;
7378
}
7479

7580
@Override
7681
protected void configure(FuncTaskItemListBuilder list, Consumer<FuncCallTaskBuilder> post) {
7782
final Consumer<FuncCallTaskBuilder> apply =
7883
cb -> {
7984
if (ctxFn != null) {
80-
cb.function(ctxFn, argClass);
85+
cb.function(ctxFn, argClass, returnClass);
8186
} else if (filterFn != null) {
82-
cb.function(filterFn, argClass);
87+
cb.function(filterFn, argClass, returnClass);
8388
} else {
84-
cb.function(fn, argClass);
89+
cb.function(fn, argClass, returnClass);
8590
}
8691
post.accept(cb);
8792
};

experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java

Lines changed: 74 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@
1616
package io.serverlessworkflow.fluent.func.dsl;
1717

1818
import io.cloudevents.CloudEventData;
19+
import io.serverlessworkflow.api.reflection.func.InstanceIdFunction;
20+
import io.serverlessworkflow.api.reflection.func.ReflectionUtils;
21+
import io.serverlessworkflow.api.reflection.func.SerializableConsumer;
22+
import io.serverlessworkflow.api.reflection.func.SerializableFunction;
23+
import io.serverlessworkflow.api.reflection.func.SerializablePredicate;
24+
import io.serverlessworkflow.api.reflection.func.UniqueIdBiFunction;
1925
import io.serverlessworkflow.api.types.FlowDirectiveEnum;
2026
import io.serverlessworkflow.api.types.OAuth2AuthenticationData;
2127
import io.serverlessworkflow.api.types.func.ContextFunction;
@@ -103,7 +109,11 @@ public static <T, V> Consumer<FuncCallTaskBuilder> fn(
103109
* @return a consumer that configures a {@code FuncCallTaskBuilder}
104110
*/
105111
public static <T, V> Consumer<FuncCallTaskBuilder> fn(SerializableFunction<T, V> function) {
106-
return f -> f.function(function, ReflectionUtils.inferInputType(function));
112+
return f ->
113+
f.function(
114+
function,
115+
ReflectionUtils.inferInputType(function),
116+
ReflectionUtils.inferResultType(function));
107117
}
108118

109119
/**
@@ -348,11 +358,16 @@ public static <T, R> FuncCallStep<T, R> withContext(ContextFunction<T, R> fn) {
348358
*/
349359
public static <T, R> FuncCallStep<T, R> withContext(
350360
String name, ContextFunction<T, R> fn, Class<T> in) {
351-
return new FuncCallStep<>(name, fn, in);
361+
return withContext(name, fn, in, ReflectionUtils.inferResultType(fn));
362+
}
363+
364+
public static <T, R> FuncCallStep<T, R> withContext(
365+
String name, ContextFunction<T, R> fn, Class<T> in, Class<R> out) {
366+
return new FuncCallStep<>(name, fn, in, out);
352367
}
353368

354369
public static <T, R> FuncCallStep<T, R> withContext(String name, ContextFunction<T, R> fn) {
355-
return new FuncCallStep<>(name, fn, ReflectionUtils.inferInputType(fn));
370+
return withContext(name, fn, ReflectionUtils.inferInputType(fn));
356371
}
357372

358373
/**
@@ -384,7 +399,12 @@ public static <T, R> FuncCallStep<T, R> withFilter(FilterFunction<T, R> fn, Clas
384399
*/
385400
public static <T, R> FuncCallStep<T, R> withFilter(
386401
String name, FilterFunction<T, R> fn, Class<T> in) {
387-
return new FuncCallStep<>(name, fn, in);
402+
return withFilter(name, fn, in, ReflectionUtils.inferResultType(fn));
403+
}
404+
405+
public static <T, R> FuncCallStep<T, R> withFilter(
406+
String name, FilterFunction<T, R> fn, Class<T> in, Class<R> out) {
407+
return new FuncCallStep<>(name, fn, in, out);
388408
}
389409

390410
public static <T, R> FuncCallStep<T, R> withFilter(FilterFunction<T, R> fn) {
@@ -407,8 +427,13 @@ public static <T, R> FuncCallStep<T, R> withFilter(String name, FilterFunction<T
407427
*/
408428
public static <T, R> FuncCallStep<T, R> withInstanceId(
409429
String name, InstanceIdFunction<T, R> fn, Class<T> in) {
430+
return withInstanceId(name, fn, in, ReflectionUtils.inferResultType(fn));
431+
}
432+
433+
public static <T, R> FuncCallStep<T, R> withInstanceId(
434+
String name, InstanceIdFunction<T, R> fn, Class<T> in, Class<R> out) {
410435
ContextFunction<T, R> jcf = (payload, wctx) -> fn.apply(wctx.instanceData().id(), payload);
411-
return new FuncCallStep<>(name, jcf, in);
436+
return new FuncCallStep<>(name, jcf, in, out);
412437
}
413438

414439
/**
@@ -463,9 +488,14 @@ static String defaultUniqueId(WorkflowContextData wctx, TaskContextData tctx) {
463488
*/
464489
public static <T, R> FuncCallStep<T, R> withUniqueId(
465490
String name, UniqueIdBiFunction<T, R> fn, Class<T> in) {
491+
return withUniqueId(name, fn, in, ReflectionUtils.inferResultType(fn));
492+
}
493+
494+
public static <T, R> FuncCallStep<T, R> withUniqueId(
495+
String name, UniqueIdBiFunction<T, R> fn, Class<T> in, Class<R> out) {
466496
FilterFunction<T, R> jff =
467497
(payload, wctx, tctx) -> fn.apply(defaultUniqueId(wctx, tctx), payload);
468-
return new FuncCallStep<>(name, jff, in);
498+
return new FuncCallStep<>(name, jff, in, out);
469499
}
470500

471501
public static <T, R> FuncCallStep<T, R> withUniqueId(String name, UniqueIdBiFunction<T, R> fn) {
@@ -577,7 +607,23 @@ public static <T, R> FuncCallStep<T, R> agent(String name, UniqueIdBiFunction<T,
577607
* @return a call step which supports chaining (e.g., {@code .exportAs(...).when(...)})
578608
*/
579609
public static <T, R> FuncCallStep<T, R> function(Function<T, R> fn, Class<T> inputClass) {
580-
return new FuncCallStep<>(fn, inputClass);
610+
return function(fn, inputClass, null);
611+
}
612+
613+
/**
614+
* Create a {@link FuncCallStep} that calls a simple Java {@link Function} with explicit input
615+
* type.
616+
*
617+
* @param fn the function to execute at runtime
618+
* @param inputClass expected input class for model conversion
619+
* @param outputClass expected outputClass class for model conversion
620+
* @param <T> input type
621+
* @param <R> result type
622+
* @return a call step which supports chaining (e.g., {@code .exportAs(...).when(...)})
623+
*/
624+
public static <T, R> FuncCallStep<T, R> function(
625+
Function<T, R> fn, Class<T> inputClass, Class<R> outputClass) {
626+
return new FuncCallStep<>(fn, inputClass, outputClass);
581627
}
582628

583629
/**
@@ -590,8 +636,8 @@ public static <T, R> FuncCallStep<T, R> function(Function<T, R> fn, Class<T> inp
590636
* @return a call step
591637
*/
592638
public static <T, R> FuncCallStep<T, R> function(SerializableFunction<T, R> fn) {
593-
Class<T> inputClass = ReflectionUtils.inferInputType(fn);
594-
return new FuncCallStep<>(fn, inputClass);
639+
return new FuncCallStep<>(
640+
fn, ReflectionUtils.inferInputType(fn), ReflectionUtils.inferResultType(fn));
595641
}
596642

597643
/**
@@ -604,8 +650,8 @@ public static <T, R> FuncCallStep<T, R> function(SerializableFunction<T, R> fn)
604650
* @return a named call step
605651
*/
606652
public static <T, R> FuncCallStep<T, R> function(String name, SerializableFunction<T, R> fn) {
607-
Class<T> inputClass = ReflectionUtils.inferInputType(fn);
608-
return new FuncCallStep<>(name, fn, inputClass);
653+
return new FuncCallStep<>(
654+
name, fn, ReflectionUtils.inferInputType(fn), ReflectionUtils.inferResultType(fn));
609655
}
610656

611657
/**
@@ -620,7 +666,23 @@ public static <T, R> FuncCallStep<T, R> function(String name, SerializableFuncti
620666
*/
621667
public static <T, R> FuncCallStep<T, R> function(
622668
String name, Function<T, R> fn, Class<T> inputClass) {
623-
return new FuncCallStep<>(name, fn, inputClass);
669+
return new FuncCallStep<>(name, fn, inputClass, null);
670+
}
671+
672+
/**
673+
* Named variant of {@link #function(Function, Class)} with explicit input type.
674+
*
675+
* @param name task name
676+
* @param fn the function to execute
677+
* @param inputClass expected input class
678+
* @param outputClass expected output class
679+
* @param <T> input type
680+
* @param <R> output type
681+
* @return a named call step
682+
*/
683+
public static <T, R> FuncCallStep<T, R> function(
684+
String name, Function<T, R> fn, Class<T> inputClass, Class<R> outputClass) {
685+
return new FuncCallStep<>(name, fn, inputClass, outputClass);
624686
}
625687

626688
// ------------------ tasks ---------------- //

experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEmitSpec.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import io.cloudevents.CloudEventData;
1919
import io.cloudevents.core.data.BytesCloudEventData;
2020
import io.cloudevents.core.data.PojoCloudEventData;
21+
import io.serverlessworkflow.api.reflection.func.ReflectionUtils;
22+
import io.serverlessworkflow.api.reflection.func.SerializableFunction;
2123
import io.serverlessworkflow.api.types.func.ContextFunction;
2224
import io.serverlessworkflow.api.types.func.EventDataFunction;
2325
import io.serverlessworkflow.fluent.func.FuncEmitEventPropertiesBuilder;

experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEventFilterSpec.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.cloudevents.core.CloudEventUtils;
2222
import io.cloudevents.core.data.PojoCloudEventData;
2323
import io.cloudevents.jackson.PojoCloudEventDataMapper;
24+
import io.serverlessworkflow.api.reflection.func.SerializablePredicate;
2425
import io.serverlessworkflow.api.types.func.ContextPredicate;
2526
import io.serverlessworkflow.api.types.func.FilterPredicate;
2627
import io.serverlessworkflow.fluent.func.FuncEventFilterBuilder;

experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/Step.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
*/
1616
package io.serverlessworkflow.fluent.func.dsl;
1717

18+
import io.serverlessworkflow.api.reflection.func.ReflectionUtils;
19+
import io.serverlessworkflow.api.reflection.func.SerializableFunction;
20+
import io.serverlessworkflow.api.reflection.func.SerializablePredicate;
1821
import io.serverlessworkflow.api.types.FlowDirectiveEnum;
1922
import io.serverlessworkflow.api.types.func.ContextFunction;
2023
import io.serverlessworkflow.api.types.func.FilterFunction;

experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLUniqueIdTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@
2323
import static org.mockito.Mockito.mock;
2424
import static org.mockito.Mockito.when;
2525

26+
import io.serverlessworkflow.api.reflection.func.UniqueIdBiFunction;
2627
import io.serverlessworkflow.api.types.Task;
2728
import io.serverlessworkflow.api.types.TaskItem;
2829
import io.serverlessworkflow.api.types.Workflow;
2930
import io.serverlessworkflow.api.types.func.CallJava;
3031
import io.serverlessworkflow.api.types.func.FilterFunction;
31-
import io.serverlessworkflow.fluent.func.dsl.UniqueIdBiFunction;
3232
import io.serverlessworkflow.impl.TaskContextData;
3333
import io.serverlessworkflow.impl.WorkflowContextData;
3434
import io.serverlessworkflow.impl.WorkflowInstanceData;

0 commit comments

Comments
 (0)