From 405ec89ac603151970e1dddcb52a84b830cee795 Mon Sep 17 00:00:00 2001 From: Alex Wang Date: Thu, 19 Mar 2026 14:47:18 -0700 Subject: [PATCH] Remove duplicate complete furture for concurrency op --- .../operation/ConcurrencyOperation.java | 4 +- .../durable/operation/ParallelOperation.java | 1 + .../operation/ConcurrencyOperationTest.java | 10 +++++ .../operation/ParallelOperationTest.java | 39 +++++++++++++------ 4 files changed, 40 insertions(+), 14 deletions(-) diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java index b0127450..f9b1b720 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java @@ -248,6 +248,7 @@ protected boolean canComplete() { } private void handleComplete() { + // We do not complete the futrure here, the furture is completed via checkpoint if (isOperationCompleted()) { return; } @@ -256,9 +257,6 @@ private void handleComplete() { } else { handleFailure(completionStatus); } - synchronized (completionFuture) { - completionFuture.complete(null); - } } /** diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java index a0c08b65..64e93779 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java @@ -84,6 +84,7 @@ protected ChildContextOperation createItem( protected void handleSuccess() { if (skipCheckpoint) { // Do not send checkpoint during replay + markAlreadyCompleted(); return; } sendOperationUpdate(OperationUpdate.builder() diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java index 6ba8bcd7..f0aa4c23 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java @@ -259,11 +259,21 @@ public void execute() { @Override protected void handleSuccess() { successHandled = true; + // Simulate the checkpoint ACK that a real subclass would receive after sendOperationUpdate. + // This drives completionFuture to completion so waitForOperationCompletion() unblocks. + onCheckpointComplete(Operation.builder() + .id(getOperationId()) + .status(OperationStatus.SUCCEEDED) + .build()); } @Override protected void handleFailure(ConcurrencyCompletionStatus completionStatus) { failureHandled = true; + onCheckpointComplete(Operation.builder() + .id(getOperationId()) + .status(OperationStatus.SUCCEEDED) // always success for parallel + .build()); } @Override diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java index 86d5784f..5b335065 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java @@ -10,6 +10,7 @@ import java.lang.reflect.Field; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.BeforeEach; @@ -69,9 +70,19 @@ void setUp() { when(mockIdGenerator.nextOperationId()).thenAnswer(inv -> "child-" + operationIdCounter.incrementAndGet()); when(executionManager.getOperationAndUpdateReplayState(anyString())).thenReturn(null); - // Simulate the real backend: when a SUCCEED checkpoint is sent for the parallel op, - // make getOperationAndUpdateReplayState return a SUCCEEDED operation so waitForOperationCompletion() can find - // it. + // Capture registered operations so we can drive onCheckpointComplete callbacks. + var registeredOps = new ConcurrentHashMap>(); + doAnswer(inv -> { + BaseDurableOperation op = inv.getArgument(0); + registeredOps.put(op.getOperationId(), op); + return null; + }) + .when(executionManager) + .registerOperation(any()); + + // Simulate the real backend for all sendOperationUpdate calls: + // - For SUCCEED on the parallel op: update the stub and fire onCheckpointComplete to unblock join(). + // - For everything else (START, child checkpoints): just return a completed future. var succeededParallelOp = Operation.builder() .id(OPERATION_ID) .name("test-parallel") @@ -79,15 +90,21 @@ void setUp() { .subType(OperationSubType.PARALLEL.getValue()) .status(OperationStatus.SUCCEEDED) .build(); - when(executionManager.sendOperationUpdate(argThat(u -> u != null - && u.id() != null - && u.id().equals(OPERATION_ID) - && u.action() == OperationAction.SUCCEED))) - .thenAnswer(inv -> { - when(executionManager.getOperationAndUpdateReplayState(OPERATION_ID)) - .thenReturn(succeededParallelOp); + doAnswer(inv -> { + var update = (software.amazon.awssdk.services.lambda.model.OperationUpdate) inv.getArgument(0); + + if (OPERATION_ID.equals(update.id()) && update.action() == OperationAction.SUCCEED) { + when(executionManager.getOperationAndUpdateReplayState(OPERATION_ID)) + .thenReturn(succeededParallelOp); + var op = registeredOps.get(OPERATION_ID); + if (op != null) { + op.onCheckpointComplete(succeededParallelOp); + } + } return CompletableFuture.completedFuture(null); - }); + }) + .when(executionManager) + .sendOperationUpdate(any()); } private ParallelOperation createOperation(int maxConcurrency, int minSuccessful, int toleratedFailureCount) {