diff --git a/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/ParallelIntegrationTest.java b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/ParallelIntegrationTest.java index d55f99cb7..341c283ce 100644 --- a/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/ParallelIntegrationTest.java +++ b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/ParallelIntegrationTest.java @@ -76,7 +76,7 @@ void testParallelPartialFailure_failedBranchDoesNotPreventOthers() { var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { var config = ParallelConfig.builder().build(); var futures = new ArrayList>(); - var parallel = context.parallel("partial-fail", config); + ParallelDurableFuture parallel = context.parallel("partial-fail", config); try (parallel) { futures.add(parallel.branch("branch-a", String.class, ctx -> "A")); @@ -322,7 +322,7 @@ void testStepBeforeAndAfterParallel() { void testSequentialParallelBlocks() { var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { var futures1 = new ArrayList>(); - var parallel1 = + ParallelDurableFuture parallel1 = context.parallel("parallel-1", ParallelConfig.builder().build()); try (parallel1) { futures1.add(parallel1.branch("branch-a", String.class, ctx -> "A")); @@ -330,7 +330,7 @@ void testSequentialParallelBlocks() { } var futures2 = new ArrayList>(); - var parallel2 = + ParallelDurableFuture parallel2 = context.parallel("parallel-2", ParallelConfig.builder().build()); try (parallel2) { futures2.add(parallel2.branch("branch-x", String.class, ctx -> "x!")); @@ -354,7 +354,7 @@ void testParallelReplayWithFailedBranches() { var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { var config = ParallelConfig.builder().build(); var futures = new ArrayList>(); - var parallel = context.parallel("replay-fail-parallel", config); + ParallelDurableFuture parallel = context.parallel("replay-fail-parallel", config); try (parallel) { futures.add(parallel.branch("branch-ok", String.class, ctx -> { @@ -371,7 +371,7 @@ void testParallelReplayWithFailedBranches() { })); } - var result = parallel.get(); + parallel.get(); assertEquals("OK", futures.get(0).get()); assertEquals("OK2", futures.get(2).get()); return "done"; @@ -392,7 +392,7 @@ void testParallelWithSingleBranch() { var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { var config = ParallelConfig.builder().build(); var futures = new ArrayList>(); - var parallel = context.parallel("single-branch", config); + ParallelDurableFuture parallel = context.parallel("single-branch", config); try (parallel) { futures.add(parallel.branch( @@ -418,7 +418,7 @@ void testParallelWithWaitInsideBranches_replay() { var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { var config = ParallelConfig.builder().build(); var futures = new ArrayList>(); - var parallel = context.parallel("wait-replay-parallel", config); + ParallelDurableFuture parallel = context.parallel("wait-replay-parallel", config); try (parallel) { for (var item : List.of("a", "b")) { @@ -546,7 +546,7 @@ void testParallelWithToleratedFailureCount_earlyTermination() { .completionConfig(CompletionConfig.toleratedFailureCount(1)) .build(); var futures = new ArrayList>(); - var parallel = context.parallel("tolerated-fail", config); + ParallelDurableFuture parallel = context.parallel("tolerated-fail", config); try (parallel) { futures.add(parallel.branch("branch-ok", String.class, ctx -> "OK")); @@ -580,7 +580,7 @@ void testParallelWithMinSuccessful_earlyTermination() { .completionConfig(CompletionConfig.minSuccessful(2)) .build(); var futures = new ArrayList>(); - var parallel = context.parallel("min-successful", config); + ParallelDurableFuture parallel = context.parallel("min-successful", config); try (parallel) { for (var item : List.of("a", "b", "c", "d", "e")) { @@ -617,7 +617,7 @@ void testParallelWithAllSuccessful_stopsOnFirstFailure() { .completionConfig(CompletionConfig.allSuccessful()) .build(); var futures = new ArrayList>(); - var parallel = context.parallel("all-successful", config); + ParallelDurableFuture parallel = context.parallel("all-successful", config); try (parallel) { futures.add(parallel.branch("branch-ok1", String.class, ctx -> "OK1")); diff --git a/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/LocalDurableTestRunner.java b/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/LocalDurableTestRunner.java index c73598cef..0e9814fbd 100644 --- a/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/LocalDurableTestRunner.java +++ b/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/LocalDurableTestRunner.java @@ -340,8 +340,7 @@ private DurableExecutionInput createDurableInput(I input) { .build(); // Load previous operations and include them in InitialExecutionState - var existingOps = - storage.getExecutionState(executionArn, "test-token", null).operations(); + var existingOps = storage.getAllOperations(); var allOps = new ArrayList<>(List.of(executionOp)); allOps.addAll(existingOps); diff --git a/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/EventProcessor.java b/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/EventProcessor.java index fb9244161..9cac1dd93 100644 --- a/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/EventProcessor.java +++ b/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/EventProcessor.java @@ -5,29 +5,86 @@ import static software.amazon.awssdk.services.lambda.model.EventType.*; import java.time.Instant; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; import software.amazon.awssdk.services.lambda.model.*; /** Generates Event objects from OperationUpdate for local testing. */ class EventProcessor { private final AtomicInteger eventId = new AtomicInteger(1); + private final List allEvents = new CopyOnWriteArrayList<>(); - Event processUpdate(OperationUpdate update, Operation operation) { + void processUpdate(OperationUpdate update, Operation operation) { var builder = Event.builder() .eventId(eventId.getAndIncrement()) .eventTimestamp(Instant.now()) .id(update.id()) .name(update.name()); - return switch (update.type()) { - case STEP -> buildStepEvent(builder, update, operation); - case WAIT -> buildWaitEvent(builder, update, operation); - case CHAINED_INVOKE -> buildInvokeEvent(builder, update, operation); - case EXECUTION -> buildExecutionEvent(builder, update); - case CALLBACK -> buildCallbackEvent(builder, update); - case CONTEXT -> buildContextEvent(builder, update); - default -> throw new IllegalArgumentException("Unsupported operation type: " + update.type()); - }; + Event event = + switch (update.type()) { + case STEP -> buildStepEvent(builder, update, operation); + case WAIT -> buildWaitEvent(builder, update, operation); + case CHAINED_INVOKE -> buildInvokeEvent(builder, update, operation); + case EXECUTION -> buildExecutionEvent(builder, update); + case CALLBACK -> buildCallbackEvent(builder, update); + case CONTEXT -> buildContextEvent(builder, update); + default -> throw new IllegalArgumentException("Unsupported operation type: " + update.type()); + }; + + allEvents.add(event); + } + + // process new status of an operation without an OperationUpdate + void processUpdate(Operation updatedOperation) { + var builder = Event.builder() + .eventId(eventId.getAndIncrement()) + .eventTimestamp(Instant.now()) + .id(updatedOperation.id()) + .name(updatedOperation.name()); + // support the statuses that don't have a corresponding OperationAction + switch (updatedOperation.status()) { + case STARTED -> { + // used by resetCheckpointToStarted + return; + } + case READY -> { + if (updatedOperation.type() == OperationType.STEP) { + // no event type for this case + return; + } else { + throw new IllegalArgumentException("Unsupported operation type: " + updatedOperation.type()); + } + } + case TIMED_OUT -> { + switch (updatedOperation.type()) { + case EXECUTION -> builder.eventType(EXECUTION_TIMED_OUT); + case CHAINED_INVOKE -> builder.eventType(CHAINED_INVOKE_TIMED_OUT); + case CALLBACK -> builder.eventType(CALLBACK_TIMED_OUT); + default -> + throw new IllegalArgumentException("Unsupported operation type: " + updatedOperation.type()); + } + } + case STOPPED -> { + switch (updatedOperation.type()) { + case EXECUTION -> builder.eventType(EXECUTION_STOPPED); + case CHAINED_INVOKE -> builder.eventType(CHAINED_INVOKE_STOPPED); + default -> + throw new IllegalArgumentException("Unsupported operation type: " + updatedOperation.type()); + } + } + default -> throw new IllegalArgumentException("Unsupported operation status: " + updatedOperation.status()); + } + allEvents.add(builder.build()); + } + + List getAllEvents() { + return List.copyOf(allEvents); + } + + public List getEventsForOperation(String operationId) { + return allEvents.stream().filter(e -> e.id().equals(operationId)).toList(); } private Event buildStepEvent(Event.Builder builder, OperationUpdate update, Operation operation) { diff --git a/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/LocalMemoryExecutionClient.java b/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/LocalMemoryExecutionClient.java index 9c691de02..847872f46 100644 --- a/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/LocalMemoryExecutionClient.java +++ b/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/LocalMemoryExecutionClient.java @@ -2,15 +2,22 @@ // SPDX-License-Identifier: Apache-2.0 package software.amazon.lambda.durable.testing.local; -import java.time.Instant; -import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; -import software.amazon.awssdk.services.lambda.model.*; +import software.amazon.awssdk.services.lambda.model.CheckpointDurableExecutionResponse; +import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState; +import software.amazon.awssdk.services.lambda.model.GetDurableExecutionStateResponse; +import software.amazon.awssdk.services.lambda.model.Operation; +import software.amazon.awssdk.services.lambda.model.OperationAction; +import software.amazon.awssdk.services.lambda.model.OperationStatus; +import software.amazon.awssdk.services.lambda.model.OperationType; +import software.amazon.awssdk.services.lambda.model.OperationUpdate; import software.amazon.lambda.durable.TypeToken; import software.amazon.lambda.durable.client.DurableExecutionClient; import software.amazon.lambda.durable.model.DurableExecutionOutput; @@ -23,10 +30,11 @@ * in memory, simulating the durable execution backend without AWS infrastructure. */ public class LocalMemoryExecutionClient implements DurableExecutionClient { - private final Map operations = new ConcurrentHashMap<>(); - private final List allEvents = new CopyOnWriteArrayList<>(); + // use LinkedHashMap to keep insertion order + private final Map existingOperations = Collections.synchronizedMap(new LinkedHashMap<>()); private final EventProcessor eventProcessor = new EventProcessor(); private final List operationUpdates = new CopyOnWriteArrayList<>(); + private final Map updatedOperations = new HashMap<>(); @Override public CheckpointDurableExecutionResponse checkpoint(String arn, String token, List updates) { @@ -35,19 +43,25 @@ public CheckpointDurableExecutionResponse checkpoint(String arn, String token, L var newToken = UUID.randomUUID().toString(); - return CheckpointDurableExecutionResponse.builder() - .checkpointToken(newToken) - .newExecutionState(CheckpointUpdatedExecutionState.builder() - .operations(operations.values()) - .build()) - .build(); + CheckpointDurableExecutionResponse response; + synchronized (updatedOperations) { + response = CheckpointDurableExecutionResponse.builder() + .checkpointToken(newToken) + .newExecutionState(CheckpointUpdatedExecutionState.builder() + .operations(updatedOperations.values()) + .build()) + .build(); + + // updatedOperations was copied into response, so clearing it is safe here + updatedOperations.clear(); + } + return response; } @Override public GetDurableExecutionStateResponse getExecutionState(String arn, String checkpointToken, String marker) { - return GetDurableExecutionStateResponse.builder() - .operations(operations.values()) - .build(); + // local runner doesn't use this API at all + throw new UnsupportedOperationException("getExecutionState is not supported"); } /** Get all operation updates that have been sent to this client. Useful for testing and verification. */ @@ -55,48 +69,26 @@ public List getOperationUpdates() { return List.copyOf(operationUpdates); } - /** Get all events in order. */ - public List getAllEvents() { - return List.copyOf(allEvents); - } - - /** Get events for a specific operation. */ - public List getEventsForOperation(String operationId) { - return allEvents.stream().filter(e -> operationId.equals(e.id())).toList(); - } - /** * Advance all operations (simulates time passing for retries/waits). * * @return true if any operations were advanced, false otherwise */ public boolean advanceTime() { - var replaced = new AtomicBoolean(false); - operations.replaceAll((key, op) -> { - // advance pending retries - if (op.status() == OperationStatus.PENDING) { - replaced.set(true); - return op.toBuilder().status(OperationStatus.READY).build(); + var hasOperationsAdvanced = new AtomicBoolean(false); + // forEach is safe as we're not adding or removing keys here + existingOperations.forEach((key, op) -> { + if (op.type() == OperationType.STEP && op.status() == OperationStatus.PENDING) { + applyResult(op, OperationResult.ready()); + hasOperationsAdvanced.set(true); } - // advance waits - if (op.status() == OperationStatus.STARTED && op.type() == OperationType.WAIT) { - var succeededOp = - op.toBuilder().status(OperationStatus.SUCCEEDED).build(); - // Generate WaitSucceeded event - var update = OperationUpdate.builder() - .id(op.id()) - .name(op.name()) - .type(OperationType.WAIT) - .action(OperationAction.SUCCEED) - .build(); - var event = eventProcessor.processUpdate(update, succeededOp); - allEvents.add(event); - replaced.set(true); - return succeededOp; + + if (op.type() == OperationType.WAIT && op.status() == OperationStatus.STARTED) { + applyResult(op, OperationResult.succeeded(null)); + hasOperationsAdvanced.set(true); } - return op; }); - return replaced.get(); + return hasOperationsAdvanced.get(); } /** Completes a chained invoke operation with the given result, simulating a child Lambda response. */ @@ -105,34 +97,15 @@ public void completeChainedInvoke(String name, OperationResult result) { if (op == null) { throw new IllegalStateException("Operation not found: " + name); } - if (op.type() == OperationType.CHAINED_INVOKE - && op.status() == OperationStatus.STARTED - && op.name().equals(name)) { - var newOp = op.toBuilder() - .status(result.operationStatus()) - .chainedInvokeDetails(ChainedInvokeDetails.builder() - .result(result.result()) - .error(result.error()) - .build()) - .build(); - var update = OperationUpdate.builder() - .id(op.id()) - .name(op.name()) - .type(OperationType.CHAINED_INVOKE) - .action( - result.operationStatus() == OperationStatus.SUCCEEDED - ? OperationAction.SUCCEED - : OperationAction.FAIL) - .build(); - var event = eventProcessor.processUpdate(update, newOp); - allEvents.add(event); - operations.put(compositeKey(op.parentId(), op.id()), newOp); + if (op.type() != OperationType.CHAINED_INVOKE || op.status() != OperationStatus.STARTED) { + throw new IllegalStateException("Operation is not a CHAINED_INVOKE or not in STARTED state"); } + applyResult(op, result); } /** Returns the operation with the given name, or null if not found. */ public Operation getOperationByName(String name) { - return operations.values().stream() + return existingOperations.values().stream() .filter(op -> name.equals(op.name())) .findFirst() .orElse(null); @@ -140,27 +113,21 @@ public Operation getOperationByName(String name) { /** Returns all operations currently stored. */ public List getAllOperations() { - return operations.values().stream().toList(); - } - - /** Clears all operations and events, resetting the client to its initial state. */ - public void reset() { - operations.clear(); - allEvents.clear(); + return existingOperations.values().stream().toList(); } /** Build TestResult from current state. */ public TestResult toTestResult(DurableExecutionOutput output, TypeToken resultType, SerDes serDes) { - var testOperations = operations.values().stream() + var testOperations = existingOperations.values().stream() .filter(op -> op.type() != OperationType.EXECUTION) - .map(op -> new TestOperation(op, getEventsForOperation(op.id()), serDes)) + .map(op -> new TestOperation(op, eventProcessor.getEventsForOperation(op.id()), serDes)) .toList(); return new TestResult<>( output.status(), output.result(), output.error(), testOperations, - new ArrayList<>(allEvents), + eventProcessor.getAllEvents(), resultType, serDes); } @@ -172,7 +139,7 @@ public void resetCheckpointToStarted(String stepName) { throw new IllegalStateException("Operation not found: " + stepName); } var startedOp = op.toBuilder().status(OperationStatus.STARTED).build(); - operations.put(compositeKey(op.parentId(), op.id()), startedOp); + updateOperation(null, startedOp); } /** Simulate fire-and-forget checkpoint loss by removing the operation entirely */ @@ -181,113 +148,16 @@ public void simulateFireAndForgetCheckpointLoss(String stepName) { if (op == null) { throw new IllegalStateException("Operation not found: " + stepName); } - operations.remove(compositeKey(op.parentId(), op.id())); - } - - private void applyUpdate(OperationUpdate update) { - var operation = toOperation(update); - var key = compositeKey(update.parentId(), update.id()); - operations.put(key, operation); - - var event = eventProcessor.processUpdate(update, operation); - allEvents.add(event); - } - - private static String compositeKey(String parentId, String operationId) { - return (parentId != null ? parentId : "") + ":" + operationId; - } - - private Operation toOperation(OperationUpdate update) { - var builder = Operation.builder() - .id(update.id()) - .name(update.name()) - .type(update.type()) - .subType(update.subType()) - .parentId(update.parentId()) - .status(deriveStatus(update.action())); - - switch (update.type()) { - case WAIT -> builder.waitDetails(buildWaitDetails(update)); - case STEP -> builder.stepDetails(buildStepDetails(update)); - case CALLBACK -> builder.callbackDetails(buildCallbackDetails(update)); - case EXECUTION -> {} // No details needed for EXECUTION operations - case CHAINED_INVOKE -> builder.chainedInvokeDetails(buildChainedInvokeDetails(update)); - case CONTEXT -> builder.contextDetails(buildContextDetails(update)); - case UNKNOWN_TO_SDK_VERSION -> - throw new UnsupportedOperationException("UNKNOWN_TO_SDK_VERSION not supported"); + existingOperations.remove(op.id()); + synchronized (updatedOperations) { + updatedOperations.remove(op.id()); } - - return builder.build(); - } - - private ChainedInvokeDetails buildChainedInvokeDetails(OperationUpdate update) { - if (update.chainedInvokeOptions() == null) { - return null; - } - return ChainedInvokeDetails.builder() - .result(update.payload()) - .error(update.error()) - .build(); } - private ContextDetails buildContextDetails(OperationUpdate update) { - var detailsBuilder = ContextDetails.builder().result(update.payload()).error(update.error()); - - if (update.contextOptions() != null - && Boolean.TRUE.equals(update.contextOptions().replayChildren())) { - detailsBuilder.replayChildren(true); - } - - return detailsBuilder.build(); - } - - private WaitDetails buildWaitDetails(OperationUpdate update) { - if (update.waitOptions() == null) return null; - - var scheduledEnd = Instant.now().plusSeconds(update.waitOptions().waitSeconds()); - return WaitDetails.builder().scheduledEndTimestamp(scheduledEnd).build(); - } - - private StepDetails buildStepDetails(OperationUpdate update) { - var key = compositeKey(update.parentId(), update.id()); - var existingOp = operations.get(key); - var existing = existingOp != null ? existingOp.stepDetails() : null; - - var detailsBuilder = existing != null ? existing.toBuilder() : StepDetails.builder(); - var attempt = existing != null && existing.attempt() != null ? existing.attempt() + 1 : 1; - - if (update.action() == OperationAction.FAIL) { - detailsBuilder.attempt(attempt).error(update.error()); - } - - if (update.action() == OperationAction.RETRY) { - detailsBuilder - .attempt(attempt) - .error(update.error()) - .nextAttemptTimestamp( - Instant.now().plusSeconds(update.stepOptions().nextAttemptDelaySeconds())); - } - - if (update.payload() != null) { - detailsBuilder.result(update.payload()); - } - - return detailsBuilder.build(); - } - - private CallbackDetails buildCallbackDetails(OperationUpdate update) { - var key = compositeKey(update.parentId(), update.id()); - var existingOp = operations.get(key); - var existing = existingOp != null ? existingOp.callbackDetails() : null; - - // Preserve existing callbackId, or generate new one on START - var callbackId = - existing != null ? existing.callbackId() : UUID.randomUUID().toString(); - - return CallbackDetails.builder() - .callbackId(callbackId) - .result(existing != null ? existing.result() : null) - .build(); + private void applyUpdate(OperationUpdate update) { + var existingOp = existingOperations.get(update.id()); + var updatedOp = OperationProcessor.applyUpdate(update, existingOp); + updateOperation(update, updatedOp); } /** Get callback ID for a named callback operation. */ @@ -305,32 +175,67 @@ public void completeCallback(String callbackId, OperationResult result) { if (op == null) { throw new IllegalStateException("Callback not found: " + callbackId); } - var updated = op.toBuilder() - .status(result.operationStatus()) - .callbackDetails(op.callbackDetails().toBuilder() - .result(result.result()) - .error(result.error()) - .build()) - .build(); - operations.put(compositeKey(op.parentId(), op.id()), updated); + if (op.type() != OperationType.CALLBACK || op.status() != OperationStatus.STARTED) { + throw new IllegalStateException("Operation is not a CALLBACK or not in STARTED state"); + } + + applyResult(op, result); + } + + private void applyResult(Operation op, OperationResult result) { + // derive a possible action from the target status + OperationAction action = deriveAction(result.operationStatus()); + if (action != null) { + var update = OperationUpdate.builder() + .id(op.id()) + .name(op.name()) + .type(op.type()) + .action(action) + .parentId(op.parentId()) + .payload(result.result()) + .error(result.error()) + .build(); + applyUpdate(update); + } else if (result.operationStatus() == OperationStatus.TIMED_OUT + || result.operationStatus() == OperationStatus.STOPPED + || result.operationStatus() == OperationStatus.READY) { + var newOp = OperationProcessor.applyResult(op, result); + updateOperation(null, newOp); + } else { + throw new IllegalStateException("Unsupported OperationStatus in result: " + result.operationStatus()); + } + } + + private static OperationAction deriveAction(OperationStatus status) { + return switch (status) { + case STARTED -> OperationAction.START; + case SUCCEEDED -> OperationAction.SUCCEED; + case FAILED -> OperationAction.FAIL; + case PENDING -> OperationAction.RETRY; + case CANCELLED -> OperationAction.CANCEL; + case READY, TIMED_OUT, STOPPED -> null; // no action for these operation statuses + case UNKNOWN_TO_SDK_VERSION -> OperationAction.UNKNOWN_TO_SDK_VERSION; // Todo: Check this + }; } private Operation findOperationByCallbackId(String callbackId) { - return operations.values().stream() + return existingOperations.values().stream() .filter(op -> op.callbackDetails() != null && callbackId.equals(op.callbackDetails().callbackId())) .findFirst() .orElse(null); } - private OperationStatus deriveStatus(OperationAction action) { - return switch (action) { - case START -> OperationStatus.STARTED; - case SUCCEED -> OperationStatus.SUCCEEDED; - case FAIL -> OperationStatus.FAILED; - case RETRY -> OperationStatus.PENDING; - case CANCEL -> OperationStatus.CANCELLED; - case UNKNOWN_TO_SDK_VERSION -> OperationStatus.UNKNOWN_TO_SDK_VERSION; // Todo: Check this - }; + private void updateOperation(OperationUpdate update, Operation op) { + // update can be null when an operation is updated without an OperationUpdate + if (update == null) { + eventProcessor.processUpdate(op); + } else { + eventProcessor.processUpdate(update, op); + } + existingOperations.put(op.id(), op); + synchronized (updatedOperations) { + updatedOperations.put(op.id(), op); + } } } diff --git a/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/OperationProcessor.java b/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/OperationProcessor.java new file mode 100644 index 000000000..1865e5976 --- /dev/null +++ b/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/OperationProcessor.java @@ -0,0 +1,176 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.testing.local; + +import java.time.Instant; +import java.util.UUID; +import software.amazon.awssdk.services.lambda.model.CallbackDetails; +import software.amazon.awssdk.services.lambda.model.ChainedInvokeDetails; +import software.amazon.awssdk.services.lambda.model.ContextDetails; +import software.amazon.awssdk.services.lambda.model.Operation; +import software.amazon.awssdk.services.lambda.model.OperationAction; +import software.amazon.awssdk.services.lambda.model.OperationStatus; +import software.amazon.awssdk.services.lambda.model.OperationUpdate; +import software.amazon.awssdk.services.lambda.model.StepDetails; +import software.amazon.awssdk.services.lambda.model.WaitDetails; + +public class OperationProcessor { + /** Applies the update to the existing operation, returning a new operation. */ + static Operation applyUpdate(OperationUpdate update, Operation existingOp) { + var builder = Operation.builder() + .id(update.id()) + .name(update.name()) + .type(update.type()) + .subType(update.subType()) + .parentId(update.parentId()) + .status(deriveStatus(update.action())); + + switch (update.type()) { + case WAIT -> builder.waitDetails(buildWaitDetails(update)); + case STEP -> builder.stepDetails(buildStepDetails(update, existingOp)); + case CALLBACK -> builder.callbackDetails(buildCallbackDetails(update, existingOp)); + case EXECUTION -> {} // No details needed for EXECUTION operations + case CHAINED_INVOKE -> builder.chainedInvokeDetails(buildChainedInvokeDetails(update)); + case CONTEXT -> builder.contextDetails(buildContextDetails(update)); + case UNKNOWN_TO_SDK_VERSION -> + throw new UnsupportedOperationException("UNKNOWN_TO_SDK_VERSION not supported"); + } + + return builder.build(); + } + + /** Applies the result of an operation to the existing operation, returning a new operation. */ + public static Operation applyResult(Operation op, OperationResult result) { + var builder = Operation.builder() + .id(op.id()) + .name(op.name()) + .type(op.type()) + .subType(op.subType()) + .parentId(op.parentId()) + .status(result.operationStatus()); + + switch (op.type()) { + case WAIT -> builder.waitDetails(buildWaitDetails(result, op)); + case STEP -> builder.stepDetails(buildStepDetails(result, op)); + case CALLBACK -> builder.callbackDetails(buildCallbackDetails(result, op)); + case EXECUTION -> {} // No details needed for EXECUTION operations + case CHAINED_INVOKE -> builder.chainedInvokeDetails(buildChainedInvokeDetails(result, op)); + case CONTEXT -> builder.contextDetails(buildContextDetails(result, op)); + case UNKNOWN_TO_SDK_VERSION -> + throw new UnsupportedOperationException("UNKNOWN_TO_SDK_VERSION not supported"); + } + return builder.build(); + } + + private static ContextDetails buildContextDetails(OperationResult result, Operation op) { + throw new IllegalArgumentException("Context operation type is not supported"); + } + + private static ChainedInvokeDetails buildChainedInvokeDetails(OperationResult result, Operation op) { + if (result.operationStatus() == OperationStatus.STOPPED + || result.operationStatus() == OperationStatus.TIMED_OUT) { + return op.chainedInvokeDetails().toBuilder().error(result.error()).build(); + } + throw new IllegalArgumentException("Operation status not supported: " + result.operationStatus()); + } + + private static ChainedInvokeDetails buildChainedInvokeDetails(OperationUpdate update) { + if (update.action() == OperationAction.START) { + return ChainedInvokeDetails.builder().build(); + } else { + return ChainedInvokeDetails.builder() + .result(update.payload()) + .error(update.error()) + .build(); + } + } + + private static ContextDetails buildContextDetails(OperationUpdate update) { + var detailsBuilder = ContextDetails.builder().result(update.payload()).error(update.error()); + + if (update.contextOptions() != null + && Boolean.TRUE.equals(update.contextOptions().replayChildren())) { + detailsBuilder.replayChildren(true); + } + + return detailsBuilder.build(); + } + + private static WaitDetails buildWaitDetails(OperationResult result, Operation op) { + if (result.operationStatus() != OperationStatus.SUCCEEDED) { + throw new IllegalArgumentException("Operation status is not SUCCEEDED"); + } + return op.waitDetails().toBuilder().build(); + } + + private static WaitDetails buildWaitDetails(OperationUpdate update) { + if (update.waitOptions() == null) return null; + + var scheduledEnd = Instant.now().plusSeconds(update.waitOptions().waitSeconds()); + return WaitDetails.builder().scheduledEndTimestamp(scheduledEnd).build(); + } + + private static StepDetails buildStepDetails(OperationUpdate update, Operation existingOp) { + var existing = existingOp != null ? existingOp.stepDetails() : null; + + var detailsBuilder = existing != null ? existing.toBuilder() : StepDetails.builder(); + var attempt = existing != null && existing.attempt() != null ? existing.attempt() + 1 : 1; + + if (update.action() == OperationAction.FAIL) { + detailsBuilder.attempt(attempt).error(update.error()); + } + + if (update.action() == OperationAction.RETRY) { + detailsBuilder + .attempt(attempt) + .error(update.error()) + .nextAttemptTimestamp( + Instant.now().plusSeconds(update.stepOptions().nextAttemptDelaySeconds())); + } + + if (update.payload() != null) { + detailsBuilder.result(update.payload()); + } + + return detailsBuilder.build(); + } + + private static StepDetails buildStepDetails(OperationResult result, Operation op) { + if (result.operationStatus() == OperationStatus.READY) { + return op.stepDetails().toBuilder().build(); + } + throw new IllegalArgumentException("Operation status is not READY"); + } + + private static CallbackDetails buildCallbackDetails(OperationResult result, Operation op) { + if (result.operationStatus() == OperationStatus.TIMED_OUT) { + return op.callbackDetails().toBuilder().error(result.error()).build(); + } + return null; + } + + private static CallbackDetails buildCallbackDetails(OperationUpdate update, Operation existingOp) { + var existing = existingOp != null ? existingOp.callbackDetails() : null; + + // Preserve existing callbackId, or generate new one on START + var callbackId = + existing != null ? existing.callbackId() : UUID.randomUUID().toString(); + + return CallbackDetails.builder() + .callbackId(callbackId) + .result(existing != null ? update.payload() : null) + .error(existing != null ? update.error() : null) + .build(); + } + + private static OperationStatus deriveStatus(OperationAction action) { + return switch (action) { + case START -> OperationStatus.STARTED; + case SUCCEED -> OperationStatus.SUCCEEDED; + case FAIL -> OperationStatus.FAILED; + case RETRY -> OperationStatus.PENDING; + case CANCEL -> OperationStatus.CANCELLED; + case UNKNOWN_TO_SDK_VERSION -> OperationStatus.UNKNOWN_TO_SDK_VERSION; // Todo: Check this + }; + } +} diff --git a/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/OperationResult.java b/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/OperationResult.java index bf24b902b..19a2e682e 100644 --- a/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/OperationResult.java +++ b/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/OperationResult.java @@ -22,4 +22,8 @@ public static OperationResult stopped(ErrorObject error) { public static OperationResult timedout() { return new OperationResult(OperationStatus.TIMED_OUT, null, null); } + + public static OperationResult ready() { + return new OperationResult(OperationStatus.READY, null, null); + } }