From 599b252d17e287cf8df2bd231977c85e3eefa8ed Mon Sep 17 00:00:00 2001 From: Alex Wang Date: Wed, 18 Mar 2026 17:54:34 -0700 Subject: [PATCH 1/3] Do not checkpoint when replay --- .../parallel/ParallelWithWaitExample.java | 2 + .../durable/operation/ParallelOperation.java | 11 ++- .../operation/ParallelOperationTest.java | 94 +++++++++++++++++++ 3 files changed, 105 insertions(+), 2 deletions(-) diff --git a/examples/src/main/java/software/amazon/lambda/durable/examples/parallel/ParallelWithWaitExample.java b/examples/src/main/java/software/amazon/lambda/durable/examples/parallel/ParallelWithWaitExample.java index 82c5b5fb..a25464b4 100644 --- a/examples/src/main/java/software/amazon/lambda/durable/examples/parallel/ParallelWithWaitExample.java +++ b/examples/src/main/java/software/amazon/lambda/durable/examples/parallel/ParallelWithWaitExample.java @@ -62,6 +62,8 @@ public Output handleRequest(Input input, DurableContext context) { var deliveries = futures.stream().map(DurableFuture::get).toList(); logger.info("All {} notifications delivered", deliveries.size()); + // Test replay + context.wait("wait for finalization", Duration.ofSeconds(5)); return new Output(deliveries); } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java index e06b1d12..439c3183 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java @@ -42,6 +42,8 @@ */ public class ParallelOperation extends ConcurrencyOperation { + private boolean replaying = false; + public ParallelOperation( OperationIdentifier operationIdentifier, TypeToken resultTypeToken, @@ -79,6 +81,10 @@ protected ChildContextOperation createItem( @Override protected void handleSuccess() { + if (replaying) { + // Do not send checkpoint during replay + return; + } sendOperationUpdate(OperationUpdate.builder() .action(OperationAction.SUCCEED) .subType(getSubType().getValue()) @@ -99,8 +105,9 @@ protected void start() { @Override protected void replay(Operation existing) { - // Always replay child branches for parallel - start(); + // No-op: child branches handle their own replay via ChildContextOperation.replay(). + // Set replaying=true so handleSuccess() skips re-checkpointing the already-completed parallel context. + replaying = true; } @Override diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java index 62fccce7..e537a54e 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java @@ -199,6 +199,100 @@ void contextHierarchy_branchesUseParallelContextAsParent() throws Exception { assertNotNull(childOp); } + // ===== Replay ===== + + @Test + void replay_doesNotSendStartCheckpoint() throws Exception { + // Simulate the parallel operation already existing in the service (STARTED status) + when(executionManager.getOperationAndUpdateReplayState(OPERATION_ID)) + .thenReturn(Operation.builder() + .id(OPERATION_ID) + .name("test-parallel") + .type(OperationType.CONTEXT) + .subType(OperationSubType.PARALLEL.getValue()) + .status(OperationStatus.STARTED) + .build()); + // Both branches already succeeded + when(executionManager.getOperationAndUpdateReplayState("child-1")) + .thenReturn(Operation.builder() + .id("child-1") + .name("branch-1") + .type(OperationType.CONTEXT) + .subType(OperationSubType.PARALLEL_BRANCH.getValue()) + .status(OperationStatus.SUCCEEDED) + .contextDetails( + ContextDetails.builder().result("\"r1\"").build()) + .build()); + when(executionManager.getOperationAndUpdateReplayState("child-2")) + .thenReturn(Operation.builder() + .id("child-2") + .name("branch-2") + .type(OperationType.CONTEXT) + .subType(OperationSubType.PARALLEL_BRANCH.getValue()) + .status(OperationStatus.SUCCEEDED) + .contextDetails( + ContextDetails.builder().result("\"r2\"").build()) + .build()); + + var op = createOperation(-1, -1, 0); + setOperationIdGenerator(op, mockIdGenerator); + op.execute(); + op.addItem("branch-1", ctx -> "r1", TypeToken.get(String.class), SER_DES); + op.addItem("branch-2", ctx -> "r2", TypeToken.get(String.class), SER_DES); + + runJoin(op); + + verify(executionManager, never()) + .sendOperationUpdate(argThat(update -> update.action() == OperationAction.START)); + verify(executionManager, never()) + .sendOperationUpdate(argThat(update -> update.action() == OperationAction.SUCCEED)); + } + + @Test + void replay_doesNotSendSucceedCheckpointWhenParallelAlreadySucceeded() throws Exception { + when(executionManager.getOperationAndUpdateReplayState(OPERATION_ID)) + .thenReturn(Operation.builder() + .id(OPERATION_ID) + .name("test-parallel") + .type(OperationType.CONTEXT) + .subType(OperationSubType.PARALLEL.getValue()) + .status(OperationStatus.SUCCEEDED) + .build()); + when(executionManager.getOperationAndUpdateReplayState("child-1")) + .thenReturn(Operation.builder() + .id("child-1") + .name("branch-1") + .type(OperationType.CONTEXT) + .subType(OperationSubType.PARALLEL_BRANCH.getValue()) + .status(OperationStatus.SUCCEEDED) + .contextDetails( + ContextDetails.builder().result("\"r1\"").build()) + .build()); + when(executionManager.getOperationAndUpdateReplayState("child-2")) + .thenReturn(Operation.builder() + .id("child-2") + .name("branch-2") + .type(OperationType.CONTEXT) + .subType(OperationSubType.PARALLEL_BRANCH.getValue()) + .status(OperationStatus.SUCCEEDED) + .contextDetails( + ContextDetails.builder().result("\"r2\"").build()) + .build()); + + var op = createOperation(-1, -1, 0); + setOperationIdGenerator(op, mockIdGenerator); + op.execute(); + op.addItem("branch-1", ctx -> "r1", TypeToken.get(String.class), SER_DES); + op.addItem("branch-2", ctx -> "r2", TypeToken.get(String.class), SER_DES); + + runJoin(op); + + verify(executionManager, never()) + .sendOperationUpdate(argThat(update -> update.action() == OperationAction.START)); + verify(executionManager, never()) + .sendOperationUpdate(argThat(update -> update.action() == OperationAction.SUCCEED)); + } + // ===== handleFailure still sends SUCCEED ===== @Test From c2743503bebe3fa7ee33d5945a37846260afb9e1 Mon Sep 17 00:00:00 2001 From: Alex Wang Date: Wed, 18 Mar 2026 18:48:50 -0700 Subject: [PATCH 2/3] Update checkpoint condition --- .../amazon/lambda/durable/operation/ParallelOperation.java | 7 ++++--- .../lambda/durable/operation/ParallelOperationTest.java | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java index 439c3183..a0c08b65 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java @@ -12,6 +12,7 @@ import software.amazon.lambda.durable.TypeToken; import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.exception.ConcurrencyExecutionException; +import software.amazon.lambda.durable.execution.ExecutionManager; import software.amazon.lambda.durable.model.ConcurrencyCompletionStatus; import software.amazon.lambda.durable.model.OperationIdentifier; import software.amazon.lambda.durable.model.OperationSubType; @@ -42,7 +43,7 @@ */ public class ParallelOperation extends ConcurrencyOperation { - private boolean replaying = false; + private boolean skipCheckpoint = false; public ParallelOperation( OperationIdentifier operationIdentifier, @@ -81,7 +82,7 @@ protected ChildContextOperation createItem( @Override protected void handleSuccess() { - if (replaying) { + if (skipCheckpoint) { // Do not send checkpoint during replay return; } @@ -107,7 +108,7 @@ protected void start() { protected void replay(Operation existing) { // No-op: child branches handle their own replay via ChildContextOperation.replay(). // Set replaying=true so handleSuccess() skips re-checkpointing the already-completed parallel context. - replaying = true; + skipCheckpoint = ExecutionManager.isTerminalStatus(existing.status()); } @Override diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java index e537a54e..a2b07312 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java @@ -244,7 +244,7 @@ void replay_doesNotSendStartCheckpoint() throws Exception { verify(executionManager, never()) .sendOperationUpdate(argThat(update -> update.action() == OperationAction.START)); - verify(executionManager, never()) + verify(executionManager, times(1)) .sendOperationUpdate(argThat(update -> update.action() == OperationAction.SUCCEED)); } From 837c6743dc5431d6f4b2326c05bb0777d749e9cd Mon Sep 17 00:00:00 2001 From: Alex Wang Date: Thu, 19 Mar 2026 13:19:56 -0700 Subject: [PATCH 3/3] Remove runJoin(), add mock backend to fix tests --- .../operation/ConcurrencyOperationTest.java | 26 ++++++----- .../operation/ParallelOperationTest.java | 43 +++++++++++-------- 2 files changed, 42 insertions(+), 27 deletions(-) diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java index bf6179c2..6ba8bcd7 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java @@ -3,10 +3,12 @@ package software.amazon.lambda.durable.operation; import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.*; import java.lang.reflect.Field; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -70,6 +72,18 @@ void setUp() { when(mockIdGenerator.nextOperationId()).thenAnswer(inv -> "child-" + operationIdCounter.incrementAndGet()); // All child operations are NOT in replay when(executionManager.getOperationAndUpdateReplayState(anyString())).thenReturn(null); + // Simulate the real backend: the parent concurrency operation is available in storage after completion + // so that waitForOperationCompletion() can find it. TestConcurrencyOperation.handleSuccess/Failure are no-ops + // (no checkpoint sent), so we stub this unconditionally for OPERATION_ID. + when(executionManager.getOperationAndUpdateReplayState(OPERATION_ID)) + .thenReturn(Operation.builder() + .id(OPERATION_ID) + .name("test-concurrency") + .type(OperationType.CONTEXT) + .subType(OperationSubType.PARALLEL.getValue()) + .status(OperationStatus.SUCCEEDED) + .build()); + when(executionManager.sendOperationUpdate(any())).thenReturn(CompletableFuture.completedFuture(null)); } private TestConcurrencyOperation createOperation(int maxConcurrency, int minSuccessful, int toleratedFailureCount) @@ -138,7 +152,7 @@ void allChildrenAlreadySucceed_callsHandleSuccess() throws Exception { TypeToken.get(String.class), SER_DES); - runJoin(op); + op.exposedJoin(); assertTrue(op.isSuccessHandled()); assertFalse(op.isFailureHandled()); @@ -171,7 +185,7 @@ void singleChildAlreadySucceeds_fullCycle() throws Exception { TypeToken.get(String.class), SER_DES); - runJoin(op); + op.exposedJoin(); assertTrue(op.isSuccessHandled()); assertEquals(1, op.getSucceededCount()); @@ -191,14 +205,6 @@ void addItem_usesRootChildContextAsParent() throws Exception { assertSame(childContext, op.getLastParentContext()); } - // ===== Helpers ===== - - private void runJoin(TestConcurrencyOperation op) throws InterruptedException { - var t = new Thread(op::exposedJoin); - t.start(); - t.join(2000); - } - // ===== Test subclass ===== static class TestConcurrencyOperation extends ConcurrencyOperation { diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java index a2b07312..86d5784f 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java @@ -9,6 +9,7 @@ import static org.mockito.Mockito.*; import java.lang.reflect.Field; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.BeforeEach; @@ -67,6 +68,26 @@ void setUp() { mockIdGenerator = mock(OperationIdGenerator.class); when(mockIdGenerator.nextOperationId()).thenAnswer(inv -> "child-" + operationIdCounter.incrementAndGet()); when(executionManager.getOperationAndUpdateReplayState(anyString())).thenReturn(null); + + // Simulate the real backend: when a SUCCEED checkpoint is sent for the parallel op, + // make getOperationAndUpdateReplayState return a SUCCEEDED operation so waitForOperationCompletion() can find + // it. + var succeededParallelOp = Operation.builder() + .id(OPERATION_ID) + .name("test-parallel") + .type(OperationType.CONTEXT) + .subType(OperationSubType.PARALLEL.getValue()) + .status(OperationStatus.SUCCEEDED) + .build(); + when(executionManager.sendOperationUpdate(argThat(u -> u != null + && u.id() != null + && u.id().equals(OPERATION_ID) + && u.action() == OperationAction.SUCCEED))) + .thenAnswer(inv -> { + when(executionManager.getOperationAndUpdateReplayState(OPERATION_ID)) + .thenReturn(succeededParallelOp); + return CompletableFuture.completedFuture(null); + }); } private ParallelOperation createOperation(int maxConcurrency, int minSuccessful, int toleratedFailureCount) { @@ -153,7 +174,7 @@ void handleSuccess_sendsSucceedCheckpoint() throws Exception { op.addItem("branch-1", ctx -> "r1", TypeToken.get(String.class), SER_DES); op.addItem("branch-2", ctx -> "r2", TypeToken.get(String.class), SER_DES); - runJoin(op); + op.get(); verify(executionManager).sendOperationUpdate(argThat(update -> update.action() == OperationAction.SUCCEED)); } @@ -179,7 +200,7 @@ void minSuccessful_joinCompletesWhenThresholdMet() throws Exception { op.addItem("branch-1", ctx -> "r1", TypeToken.get(String.class), SER_DES); // Should not throw - assertDoesNotThrow(() -> runJoin(op)); + op.get(); assertEquals(1, op.getSucceededCount()); } @@ -240,7 +261,7 @@ void replay_doesNotSendStartCheckpoint() throws Exception { op.addItem("branch-1", ctx -> "r1", TypeToken.get(String.class), SER_DES); op.addItem("branch-2", ctx -> "r2", TypeToken.get(String.class), SER_DES); - runJoin(op); + op.get(); verify(executionManager, never()) .sendOperationUpdate(argThat(update -> update.action() == OperationAction.START)); @@ -285,7 +306,7 @@ void replay_doesNotSendSucceedCheckpointWhenParallelAlreadySucceeded() throws Ex op.addItem("branch-1", ctx -> "r1", TypeToken.get(String.class), SER_DES); op.addItem("branch-2", ctx -> "r2", TypeToken.get(String.class), SER_DES); - runJoin(op); + op.get(); verify(executionManager, never()) .sendOperationUpdate(argThat(update -> update.action() == OperationAction.START)); @@ -318,22 +339,10 @@ void handleFailure_sendsSucceedCheckpointEvenWhenFailureToleranceExceeded() thro TypeToken.get(String.class), SER_DES); - runJoin(op); + op.get(); verify(executionManager).sendOperationUpdate(argThat(update -> update.action() == OperationAction.SUCCEED)); verify(executionManager, never()) .sendOperationUpdate(argThat(update -> update.action() == OperationAction.FAIL)); } - - // ===== Helpers ===== - - private void runJoin(ParallelOperation op) throws InterruptedException { - var t = new Thread(op::get); - t.start(); - t.join(2000); - if (t.isAlive()) { - t.interrupt(); - fail("join() did not complete within 2 seconds"); - } - } }