Skip to content

Commit 0f073af

Browse files
wangyb-AAlex Wang
andauthored
Remove duplicate complete furture for concurrency op (#239)
Co-authored-by: Alex Wang <wangyb@amazon.com>
1 parent 84e9bdf commit 0f073af

4 files changed

Lines changed: 40 additions & 14 deletions

File tree

sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ protected boolean canComplete() {
248248
}
249249

250250
private void handleComplete() {
251+
// We do not complete the futrure here, the furture is completed via checkpoint
251252
if (isOperationCompleted()) {
252253
return;
253254
}
@@ -256,9 +257,6 @@ private void handleComplete() {
256257
} else {
257258
handleFailure(completionStatus);
258259
}
259-
synchronized (completionFuture) {
260-
completionFuture.complete(null);
261-
}
262260
}
263261

264262
/**

sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ protected <R> ChildContextOperation<R> createItem(
8484
protected void handleSuccess() {
8585
if (skipCheckpoint) {
8686
// Do not send checkpoint during replay
87+
markAlreadyCompleted();
8788
return;
8889
}
8990
sendOperationUpdate(OperationUpdate.builder()

sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,11 +259,21 @@ public void execute() {
259259
@Override
260260
protected void handleSuccess() {
261261
successHandled = true;
262+
// Simulate the checkpoint ACK that a real subclass would receive after sendOperationUpdate.
263+
// This drives completionFuture to completion so waitForOperationCompletion() unblocks.
264+
onCheckpointComplete(Operation.builder()
265+
.id(getOperationId())
266+
.status(OperationStatus.SUCCEEDED)
267+
.build());
262268
}
263269

264270
@Override
265271
protected void handleFailure(ConcurrencyCompletionStatus completionStatus) {
266272
failureHandled = true;
273+
onCheckpointComplete(Operation.builder()
274+
.id(getOperationId())
275+
.status(OperationStatus.SUCCEEDED) // always success for parallel
276+
.build());
267277
}
268278

269279
@Override

sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import java.lang.reflect.Field;
1212
import java.util.concurrent.CompletableFuture;
13+
import java.util.concurrent.ConcurrentHashMap;
1314
import java.util.concurrent.Executors;
1415
import java.util.concurrent.atomic.AtomicInteger;
1516
import org.junit.jupiter.api.BeforeEach;
@@ -69,25 +70,41 @@ void setUp() {
6970
when(mockIdGenerator.nextOperationId()).thenAnswer(inv -> "child-" + operationIdCounter.incrementAndGet());
7071
when(executionManager.getOperationAndUpdateReplayState(anyString())).thenReturn(null);
7172

72-
// Simulate the real backend: when a SUCCEED checkpoint is sent for the parallel op,
73-
// make getOperationAndUpdateReplayState return a SUCCEEDED operation so waitForOperationCompletion() can find
74-
// it.
73+
// Capture registered operations so we can drive onCheckpointComplete callbacks.
74+
var registeredOps = new ConcurrentHashMap<String, BaseDurableOperation<?>>();
75+
doAnswer(inv -> {
76+
BaseDurableOperation<?> op = inv.getArgument(0);
77+
registeredOps.put(op.getOperationId(), op);
78+
return null;
79+
})
80+
.when(executionManager)
81+
.registerOperation(any());
82+
83+
// Simulate the real backend for all sendOperationUpdate calls:
84+
// - For SUCCEED on the parallel op: update the stub and fire onCheckpointComplete to unblock join().
85+
// - For everything else (START, child checkpoints): just return a completed future.
7586
var succeededParallelOp = Operation.builder()
7687
.id(OPERATION_ID)
7788
.name("test-parallel")
7889
.type(OperationType.CONTEXT)
7990
.subType(OperationSubType.PARALLEL.getValue())
8091
.status(OperationStatus.SUCCEEDED)
8192
.build();
82-
when(executionManager.sendOperationUpdate(argThat(u -> u != null
83-
&& u.id() != null
84-
&& u.id().equals(OPERATION_ID)
85-
&& u.action() == OperationAction.SUCCEED)))
86-
.thenAnswer(inv -> {
87-
when(executionManager.getOperationAndUpdateReplayState(OPERATION_ID))
88-
.thenReturn(succeededParallelOp);
93+
doAnswer(inv -> {
94+
var update = (software.amazon.awssdk.services.lambda.model.OperationUpdate) inv.getArgument(0);
95+
96+
if (OPERATION_ID.equals(update.id()) && update.action() == OperationAction.SUCCEED) {
97+
when(executionManager.getOperationAndUpdateReplayState(OPERATION_ID))
98+
.thenReturn(succeededParallelOp);
99+
var op = registeredOps.get(OPERATION_ID);
100+
if (op != null) {
101+
op.onCheckpointComplete(succeededParallelOp);
102+
}
103+
}
89104
return CompletableFuture.completedFuture(null);
90-
});
105+
})
106+
.when(executionManager)
107+
.sendOperationUpdate(any());
91108
}
92109

93110
private ParallelOperation<Void> createOperation(int maxConcurrency, int minSuccessful, int toleratedFailureCount) {

0 commit comments

Comments
 (0)