Skip to content

Commit cfb07fb

Browse files
authored
[Fix #1415] Fixing nested try behaviour (#1416)
Signed-off-by: fjtirado <ftirados@ibm.com>
1 parent 90be8a0 commit cfb07fb

9 files changed

Lines changed: 164 additions & 30 deletions

File tree

impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class TaskContext implements TaskContextData {
4040
private short retryAttempt;
4141
private int iteration;
4242
private AuthorizationDescriptor authorization;
43+
private Optional<Short> tryRetryCount = Optional.empty();
4344

4445
public TaskContext(
4546
WorkflowModel input,
@@ -69,7 +70,8 @@ private TaskContext(
6970
this.input = input;
7071
this.output = output;
7172
this.rawOutput = rawOutput;
72-
this.retryAttempt = parentContext.map(TaskContext::retryAttempt).orElse((short) 0);
73+
this.retryAttempt =
74+
parentContext.map(ctx -> ctx.tryRetryCount.orElse(ctx.retryAttempt())).orElse((short) 0);
7375
this.contextVariables =
7476
parentContext.map(p -> new HashMap<>(p.contextVariables)).orElseGet(HashMap::new);
7577
}
@@ -179,6 +181,14 @@ public void retryAttempt(short retryAttempt) {
179181
this.retryAttempt = retryAttempt;
180182
}
181183

184+
public void tryRetryCount(short tryRetryCount) {
185+
this.tryRetryCount = Optional.of(tryRetryCount);
186+
}
187+
188+
public Optional<Short> tryRetryCount() {
189+
return tryRetryCount;
190+
}
191+
182192
public boolean isRetrying() {
183193
return retryAttempt > 0;
184194
}
@@ -194,16 +204,19 @@ public void iteration(int iteration) {
194204

195205
@Override
196206
public String toString() {
197-
return "TaskContext [position="
198-
+ position
199-
+ ", startedAt="
200-
+ startedAt
201-
+ ", taskName="
202-
+ taskName
203-
+ ", completedAt="
204-
+ completedAt
205-
+ ", retryAttempt="
206-
+ retryAttempt
207-
+ "]";
207+
StringBuilder sb =
208+
new StringBuilder(
209+
"TaskContext [position="
210+
+ position
211+
+ ", startedAt="
212+
+ startedAt
213+
+ ", taskName="
214+
+ taskName
215+
+ ", completedAt="
216+
+ completedAt
217+
+ ", retryAttempt="
218+
+ retryAttempt);
219+
tryRetryCount.ifPresent(s -> sb.append(", tryRetryCount=").append(s));
220+
return sb.append("]").toString();
208221
}
209222
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ protected CompletableFuture<WorkflowModel> internalExecute(
157157

158158
private CompletableFuture<WorkflowModel> doIt(
159159
WorkflowContext workflow, TaskContext taskContext, WorkflowModel model) {
160+
retryIntervalExecutor.ifPresent(r -> r.init(workflow, taskContext, model));
160161
return TaskExecutorHelper.processTaskList(
161162
taskExecutor, workflow, Optional.of(taskContext), model)
162163
.exceptionallyCompose(e -> handleException(e, workflow, taskContext));

impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/DefaultRetryExecutor.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,16 +46,23 @@ public DefaultRetryExecutor(
4646
@Override
4747
public Optional<CompletableFuture<WorkflowModel>> retry(
4848
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel model) {
49-
short numAttempts = taskContext.retryAttempt();
49+
short numAttempts = taskContext.tryRetryCount().orElseThrow();
5050
if (numAttempts++ < maxAttempts
5151
&& WorkflowUtils.whenExceptTest(
5252
whenFilter, exceptFilter, workflowContext, taskContext, model)) {
53-
taskContext.retryAttempt(numAttempts);
53+
taskContext.tryRetryCount(numAttempts);
5454
Duration delay = intervalFunction.apply(workflowContext, taskContext, model, numAttempts);
5555
CompletableFuture<WorkflowModel> completable = new CompletableFuture<>();
5656
completable.completeOnTimeout(model, delay.toMillis(), TimeUnit.MILLISECONDS);
5757
return Optional.of(completable);
5858
}
5959
return Optional.empty();
6060
}
61+
62+
@Override
63+
public void init(WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel model) {
64+
if (taskContext.tryRetryCount().isEmpty()) {
65+
taskContext.tryRetryCount((short) 0);
66+
}
67+
}
6168
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/retry/RetryExecutor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import java.util.concurrent.CompletableFuture;
2323

2424
public interface RetryExecutor {
25+
26+
void init(WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel model);
27+
2528
Optional<CompletableFuture<WorkflowModel>> retry(
2629
WorkflowContext worfklowContext, TaskContext taskContext, WorkflowModel model);
2730
}

impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TraceExecutionListener.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,11 @@ public void onTaskFailed(TaskFailedEvent ev) {
100100
@Override
101101
public void onTaskRetried(TaskRetriedEvent ev) {
102102
logger.info(
103-
"Task {} retried at {}, position {}",
103+
"Task {} retried at {}, position {}, retried attempt {}",
104104
ev.taskContext().taskName(),
105105
ev.eventDate(),
106-
ev.taskContext().position());
106+
ev.taskContext().position(),
107+
ev.taskContext().retryAttempt());
107108
}
108109

109110
@Override

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/WorkflowPersistenceInstance.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.serverlessworkflow.impl.persistence;
1717

18+
import io.serverlessworkflow.api.types.TryTask;
1819
import io.serverlessworkflow.impl.TaskContext;
1920
import io.serverlessworkflow.impl.WorkflowContext;
2021
import io.serverlessworkflow.impl.WorkflowDefinition;
@@ -23,6 +24,7 @@
2324
import io.serverlessworkflow.impl.WorkflowMutableInstance;
2425
import io.serverlessworkflow.impl.WorkflowStatus;
2526
import io.serverlessworkflow.impl.executors.TransitionInfo;
27+
import java.util.Optional;
2628
import java.util.concurrent.CompletableFuture;
2729

2830
public class WorkflowPersistenceInstance extends WorkflowMutableInstance {
@@ -82,6 +84,17 @@ public void restoreContext(WorkflowContext workflow, TaskContext context) {
8284
if (context.retryAttempt() == 0) {
8385
context.retryAttempt(retriedTaskInfo.retryAttempt());
8486
}
87+
Optional<TaskContext> searchContext = context.parent();
88+
while (searchContext.isPresent()) {
89+
TaskContext tryContext = searchContext.orElseThrow();
90+
if (tryContext.task() instanceof TryTask) {
91+
if (tryContext.tryRetryCount().isEmpty()) {
92+
tryContext.tryRetryCount(retriedTaskInfo.retryAttempt());
93+
}
94+
break;
95+
}
96+
searchContext = tryContext.parent();
97+
}
8598
}
8699
}
87100
}

impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractHandlerPersistenceTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import static org.mockito.Mockito.verify;
2222
import static org.mockito.Mockito.when;
2323

24+
import io.serverlessworkflow.api.types.TaskBase;
25+
import io.serverlessworkflow.api.types.TryTask;
2426
import io.serverlessworkflow.impl.TaskContext;
2527
import io.serverlessworkflow.impl.TaskContextData;
2628
import io.serverlessworkflow.impl.WorkflowApplication;
@@ -140,10 +142,16 @@ void testWorkflowInstance() throws InterruptedException {
140142
WorkflowContext updateWContext = mock(WorkflowContext.class);
141143
TaskContext updateTContext = mock(TaskContext.class);
142144
when(updateTContext.position()).thenReturn(position1);
145+
TaskContext parentContext = mock(TaskContext.class);
146+
TaskBase taskBase = mock(TryTask.class);
147+
when(parentContext.task()).thenReturn(taskBase);
148+
when(updateTContext.parent()).thenReturn(Optional.of(parentContext));
143149
instance.restoreContext(updateWContext, updateTContext);
144150
ArgumentCaptor<Short> retryAttempt = ArgumentCaptor.forClass(Short.class);
145151
verify(updateTContext).retryAttempt(retryAttempt.capture());
146152
assertThat(retryAttempt.getValue()).isEqualTo(numRetries);
153+
verify(parentContext).tryRetryCount(retryAttempt.capture());
154+
assertThat(retryAttempt.getValue()).isEqualTo(numRetries);
147155

148156
// task completed
149157
handlers

impl/test/src/test/java/io/serverlessworkflow/impl/test/RetryTimeoutTest.java

Lines changed: 63 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,52 +18,70 @@
1818
import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath;
1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.assertThatThrownBy;
21-
import static org.junit.Assert.assertThat;
2221

2322
import com.fasterxml.jackson.databind.JsonNode;
23+
import io.serverlessworkflow.api.types.TryTask;
2424
import io.serverlessworkflow.impl.WorkflowApplication;
2525
import io.serverlessworkflow.impl.WorkflowException;
2626
import io.serverlessworkflow.impl.WorkflowModel;
2727
import io.serverlessworkflow.impl.jackson.JsonUtils;
28+
import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent;
29+
import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent;
30+
import io.serverlessworkflow.impl.lifecycle.TraceExecutionListener;
31+
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener;
2832
import java.io.IOException;
2933
import java.time.Duration;
3034
import java.util.Map;
35+
import java.util.Set;
3136
import java.util.concurrent.CompletableFuture;
37+
import java.util.concurrent.ConcurrentHashMap;
3238
import okhttp3.mockwebserver.MockResponse;
3339
import okhttp3.mockwebserver.MockWebServer;
3440
import org.awaitility.Awaitility;
35-
import org.junit.jupiter.api.AfterAll;
3641
import org.junit.jupiter.api.AfterEach;
37-
import org.junit.jupiter.api.BeforeAll;
3842
import org.junit.jupiter.api.BeforeEach;
3943
import org.junit.jupiter.api.Test;
4044
import org.junit.jupiter.params.ParameterizedTest;
4145
import org.junit.jupiter.params.provider.ValueSource;
4246

4347
public class RetryTimeoutTest {
4448

45-
private static WorkflowApplication app;
49+
private WorkflowApplication app;
50+
private RetryListener retryListener;
4651
private MockWebServer apiServer;
4752

48-
@BeforeAll
49-
static void init() {
50-
app = WorkflowApplication.builder().build();
51-
}
52-
53-
@AfterAll
54-
static void cleanup() {
55-
app.close();
56-
}
57-
5853
@BeforeEach
5954
void setUp() throws IOException {
6055
apiServer = new MockWebServer();
6156
apiServer.start(9797);
57+
retryListener = new RetryListener();
58+
app =
59+
WorkflowApplication.builder()
60+
.withListener(retryListener)
61+
.withListener(new TraceExecutionListener())
62+
.build();
6263
}
6364

6465
@AfterEach
6566
void tearDown() throws IOException {
6667
apiServer.shutdown();
68+
app.close();
69+
}
70+
71+
private class RetryListener implements WorkflowExecutionListener {
72+
73+
private Map<String, Short> taskRetried = new ConcurrentHashMap<>();
74+
private Set<Short> contexts = ConcurrentHashMap.newKeySet();
75+
76+
public void onTaskRetried(TaskRetriedEvent ev) {
77+
taskRetried.put(ev.taskContext().position().jsonPointer(), ev.taskContext().retryAttempt());
78+
}
79+
80+
public void onTaskCompleted(TaskCompletedEvent ev) {
81+
if (ev.taskContext().task() instanceof TryTask) {
82+
contexts.add(ev.taskContext().retryAttempt());
83+
}
84+
}
6785
}
6886

6987
@ParameterizedTest
@@ -88,6 +106,37 @@ void testRetry(String path) throws IOException {
88106
Awaitility.await()
89107
.atMost(Duration.ofSeconds(1))
90108
.until(() -> future.join().as(JsonNode.class).orElseThrow().equals(result));
109+
assertThat(retryListener.taskRetried).hasSize(1);
110+
assertThat(retryListener.taskRetried.get("do/0/tryGetPet/do/0/getPet")).isEqualTo((short) 2);
111+
assertThat(retryListener.contexts).containsOnly((short) 0);
112+
}
113+
114+
@Test
115+
void testNestedRetry() throws IOException {
116+
final JsonNode result = JsonUtils.mapper().createObjectNode().put("name", "Javierito");
117+
apiServer.enqueue(new MockResponse().setResponseCode(404));
118+
apiServer.enqueue(new MockResponse().setResponseCode(404));
119+
apiServer.enqueue(new MockResponse().setResponseCode(404));
120+
apiServer.enqueue(new MockResponse().setResponseCode(404));
121+
apiServer.enqueue(new MockResponse().setResponseCode(404));
122+
apiServer.enqueue(new MockResponse().setResponseCode(404));
123+
apiServer.enqueue(new MockResponse().setResponseCode(500));
124+
apiServer.enqueue(
125+
new MockResponse()
126+
.setResponseCode(200)
127+
.setHeader("Content-Type", "application/json")
128+
.setBody(JsonUtils.mapper().writeValueAsString(result)));
129+
CompletableFuture<WorkflowModel> future =
130+
app.workflowDefinition(
131+
readWorkflowFromClasspath("workflows-samples/nested-try-catch-retry-inline.yaml"))
132+
.instance(Map.of("delay", 0.01))
133+
.start();
134+
Awaitility.await()
135+
.atMost(Duration.ofSeconds(1))
136+
.until(() -> future.join().as(JsonNode.class).orElseThrow().equals(result));
137+
assertThat(retryListener.taskRetried).hasSize(2);
138+
assertThat(retryListener.taskRetried.values()).containsExactlyInAnyOrder((short) 5, (short) 2);
139+
assertThat(retryListener.contexts).containsExactlyInAnyOrder((short) 0, (short) 2);
91140
}
92141

93142
@Test
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
document:
2+
dsl: '1.0.0'
3+
namespace: test
4+
name: nested-try-catch-retry-inline
5+
version: '0.1.0'
6+
do:
7+
- tryServerError:
8+
try:
9+
- tryCommunication:
10+
try:
11+
- getPet:
12+
call: http
13+
with:
14+
method: get
15+
endpoint: http://localhost:9797
16+
redirect: true
17+
catch:
18+
errors:
19+
with:
20+
type: https://serverlessworkflow.io/spec/1.0.0/errors/communication
21+
status: 404
22+
retry:
23+
delay: ${"PT\(.delay)S"}
24+
backoff:
25+
exponential: {}
26+
limit:
27+
attempt:
28+
count: 5
29+
catch:
30+
errors:
31+
with:
32+
type: https://serverlessworkflow.io/spec/1.0.0/errors/communication
33+
retry:
34+
delay: ${"PT\(.delay)S"}
35+
backoff:
36+
exponential: {}
37+
limit:
38+
attempt:
39+
count: 2

0 commit comments

Comments
 (0)