diff --git a/docs/core/map.md b/docs/core/map.md index d26bd364..0037e740 100644 --- a/docs/core/map.md +++ b/docs/core/map.md @@ -41,14 +41,14 @@ MapResult result = future.get(); | Method | Description | |--------|-------------| | `getResult(i)` | Result at index `i`, or `null` if that item failed | -| `getError(i)` | Error at index `i`, or `null` if that item succeeded | +| `getError(i)` | `ErrorObject` at index `i`, or `null` if that item succeeded | | `getItem(i)` | The `MapResultItem` at index `i` with status, result, and error | | `allSucceeded()` | `true` if every item succeeded | | `size()` | Number of items in the result | | `items()` | All result items as an unmodifiable list | | `results()` | All results as an unmodifiable list (nulls for failed items) | | `succeeded()` | Only the non-null (successful) results | -| `failed()` | Only the non-null errors | +| `failed()` | Only the non-null `ErrorObject`s | | `completionReason()` | Why the operation completed (`ALL_COMPLETED`, `MIN_SUCCESSFUL_REACHED`, `FAILURE_TOLERANCE_EXCEEDED`) | ### MapResultItem @@ -57,9 +57,9 @@ Each `MapResultItem` contains: | Field | Description | |-------|-------------| -| `status()` | `SUCCEEDED`, `FAILED`, or `null` (not started) | +| `status()` | `SUCCEEDED`, `FAILED`, or `NOT_STARTED` | | `result()` | The result value, or `null` if failed/not started | -| `error()` | The error, or `null` if succeeded/not started | +| `error()` | The error details as `ErrorObject`, or `null` if succeeded/not started | ### Error Isolation @@ -126,7 +126,7 @@ var result = ctx.map("find-two", items, String.class, fn, config); assertEquals(CompletionReason.MIN_SUCCESSFUL_REACHED, result.completionReason()); ``` -When early termination triggers, items that were never started have `null` for both result and error in the `MapResult`. +When early termination triggers, items that were never started have `NOT_STARTED` status with `null` for both result and error in the `MapResult`. 
### Checkpoint-and-Replay diff --git a/examples/src/main/java/software/amazon/lambda/durable/examples/ComplexMapExample.java b/examples/src/main/java/software/amazon/lambda/durable/examples/ComplexMapExample.java new file mode 100644 index 00000000..e7ba2726 --- /dev/null +++ b/examples/src/main/java/software/amazon/lambda/durable/examples/ComplexMapExample.java @@ -0,0 +1,63 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.examples; + +import java.time.Duration; +import java.util.List; +import java.util.stream.Collectors; +import software.amazon.lambda.durable.CompletionConfig; +import software.amazon.lambda.durable.DurableContext; +import software.amazon.lambda.durable.DurableHandler; +import software.amazon.lambda.durable.MapConfig; + +/** + * Example demonstrating advanced map features: wait operations inside branches, error handling, and early termination. + * + *
<ol> + *
<li>Concurrent map with step + wait + step inside each branch — simulates multi-stage order processing with a + * cooldown between stages + *
<li>Early termination with {@code minSuccessful(2)} — finds 2 healthy servers then stops + * </ol>
+ */ +public class ComplexMapExample extends DurableHandler { + + @Override + public String handleRequest(GreetingRequest input, DurableContext context) { + var name = input.getName(); + context.getLogger().info("Starting complex map example for {}", name); + + // Part 1: Concurrent map with step + wait inside each branch + var orderIds = List.of("order-1", "order-2", "order-3"); + + var orderResult = context.map("process-orders", orderIds, String.class, (orderId, index, ctx) -> { + // Step 1: validate the order + var validated = ctx.step("validate-" + index, String.class, stepCtx -> "validated:" + orderId + ":" + name); + + // Wait between stages (simulates a cooldown or external dependency) + ctx.wait("cooldown-" + index, Duration.ofSeconds(1)); + + // Step 2: finalize the order + return ctx.step("finalize-" + index, String.class, stepCtx -> "done:" + validated); + }); + + var orderSummary = String.join(", ", orderResult.results()); + + // Part 2: Early termination — find 2 healthy servers then stop + var servers = List.of("server-1", "server-2", "server-3", "server-4", "server-5"); + var earlyTermConfig = MapConfig.builder() + .completionConfig(CompletionConfig.minSuccessful(2)) + .build(); + + var serverResult = context.map( + "find-healthy-servers", + servers, + String.class, + (server, index, ctx) -> ctx.step("health-check-" + index, String.class, stepCtx -> server + ":healthy"), + earlyTermConfig); + + var healthyServers = serverResult.succeeded().stream().collect(Collectors.joining(", ")); + + return String.format( + "orders=[%s] | servers=[%s] reason=%s", orderSummary, healthyServers, serverResult.completionReason()); + } +} diff --git a/examples/src/main/java/software/amazon/lambda/durable/examples/MapConfigExample.java b/examples/src/main/java/software/amazon/lambda/durable/examples/MapConfigExample.java deleted file mode 100644 index 5ad8edd0..00000000 --- a/examples/src/main/java/software/amazon/lambda/durable/examples/MapConfigExample.java +++ 
/dev/null @@ -1,73 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 -package software.amazon.lambda.durable.examples; - -import java.util.List; -import java.util.stream.Collectors; -import software.amazon.lambda.durable.CompletionConfig; -import software.amazon.lambda.durable.DurableContext; -import software.amazon.lambda.durable.DurableHandler; -import software.amazon.lambda.durable.MapConfig; - -/** - * Example demonstrating MapConfig options: concurrency limiting and completion strategies. - * - *

<p>This handler runs two map operations to showcase different configurations: - * - *
<ol> - *
<li>Sequential processing with {@code maxConcurrency(1)} — items run one at a time - *
<li>Early termination with {@code minSuccessful(2)} — stops after 2 items succeed - * </ol>
- */ -public class MapConfigExample extends DurableHandler { - - @Override - public String handleRequest(GreetingRequest input, DurableContext context) { - var name = input.getName(); - context.getLogger().info("Starting map config example for {}", name); - - // Part 1: Sequential execution with maxConcurrency=1 - var items = List.of("alpha", "beta", "gamma"); - var sequentialConfig = MapConfig.builder().maxConcurrency(1).build(); - - var sequentialResult = context.map( - "sequential-processing", - items, - String.class, - (item, index, ctx) -> { - return ctx.step("transform-" + index, String.class, stepCtx -> item.toUpperCase() + "-" + name); - }, - sequentialConfig); - - var sequentialOutput = String.join(", ", sequentialResult.results()); - context.getLogger().info("Sequential result: {}", sequentialOutput); - - // Part 2: Early termination with minSuccessful(2) - var candidates = List.of("server-1", "server-2", "server-3", "server-4", "server-5"); - var earlyTermConfig = MapConfig.builder() - .maxConcurrency(1) - .completionConfig(CompletionConfig.minSuccessful(2)) - .build(); - - var earlyTermResult = context.map( - "find-healthy-servers", - candidates, - String.class, - (server, index, ctx) -> { - return ctx.step("health-check-" + index, String.class, stepCtx -> server + ":healthy"); - }, - earlyTermConfig); - - context.getLogger() - .info( - "Early termination: reason={}, succeeded={}", - earlyTermResult.completionReason(), - earlyTermResult.succeeded().size()); - - var healthyServers = earlyTermResult.succeeded().stream().collect(Collectors.joining(", ")); - - return String.format( - "sequential=[%s] | earlyTerm=[%s] reason=%s", - sequentialOutput, healthyServers, earlyTermResult.completionReason()); - } -} diff --git a/examples/src/main/java/software/amazon/lambda/durable/examples/MapErrorHandlingExample.java b/examples/src/main/java/software/amazon/lambda/durable/examples/MapErrorHandlingExample.java deleted file mode 100644 index f5de5185..00000000 --- 
a/examples/src/main/java/software/amazon/lambda/durable/examples/MapErrorHandlingExample.java +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 -package software.amazon.lambda.durable.examples; - -import java.util.List; -import java.util.stream.Collectors; -import software.amazon.lambda.durable.DurableContext; -import software.amazon.lambda.durable.DurableHandler; -import software.amazon.lambda.durable.StepConfig; -import software.amazon.lambda.durable.retry.RetryStrategies; - -/** - * Example demonstrating error handling with the map operation. - * - *

<p>Shows how individual item failures are isolated and captured in the {@code MapResult}, while other items continue - * to succeed. Demonstrates inspecting partial results using {@code allSucceeded()}, {@code getError()}, - * {@code succeeded()}, and {@code failed()}. - * - *
<ol> - *
<li>Map over a list of order IDs concurrently - *
<li>Some orders intentionally fail to simulate real-world partial failures - *
<li>Inspect the MapResult to handle successes and failures separately - * </ol>
- */ -public class MapErrorHandlingExample extends DurableHandler { - - @Override - public String handleRequest(GreetingRequest input, DurableContext context) { - var name = input.getName(); - context.getLogger().info("Starting map error handling example for {}", name); - - var orderIds = List.of("order-1", "order-INVALID", "order-3", "order-ERROR", "order-5"); - - // Map over orders — some will fail, but others continue processing - var result = context.map("process-orders", orderIds, String.class, (orderId, index, ctx) -> { - return ctx.step( - "process-" + index, - String.class, - stepCtx -> { - if (orderId.contains("INVALID")) { - throw new IllegalArgumentException("Invalid order: " + orderId); - } - if (orderId.contains("ERROR")) { - throw new RuntimeException("Processing error for: " + orderId); - } - return "Processed " + orderId + " for " + name; - }, - StepConfig.builder() - .retryStrategy(RetryStrategies.Presets.NO_RETRY) - .build()); - }); - - context.getLogger() - .info( - "Map completed: allSucceeded={}, succeeded={}, failed={}", - result.allSucceeded(), - result.succeeded().size(), - result.failed().size()); - - // Build a summary showing successful results and error messages - var successSummary = result.succeeded().stream().collect(Collectors.joining(", ")); - - var errorSummary = new StringBuilder(); - for (int i = 0; i < result.size(); i++) { - if (result.getError(i) != null) { - errorSummary.append( - String.format("index %d: %s; ", i, result.getError(i).getMessage())); - } - } - - return String.format( - "succeeded=%d, failed=%d | results=[%s] | errors=[%s]", - result.succeeded().size(), - result.failed().size(), - successSummary, - errorSummary.toString().trim()); - } -} diff --git a/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java b/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java index 56849ffd..c0e703cc 100644 --- 
a/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java +++ b/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java @@ -571,39 +571,21 @@ void testSimpleMapExample() { } @Test - void testMapErrorHandlingExample() { - var runner = CloudDurableTestRunner.create( - arn("map-error-handling-example"), GreetingRequest.class, String.class, lambdaClient); - var result = runner.run(new GreetingRequest("Alice")); - - assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); - var output = result.getResult(String.class); - assertNotNull(output); - - // 3 of 5 orders succeed, 2 fail - assertTrue(output.contains("succeeded=3")); - assertTrue(output.contains("failed=2")); - assertTrue(output.contains("Processed order-1 for Alice")); - assertTrue(output.contains("Processed order-3 for Alice")); - assertTrue(output.contains("Processed order-5 for Alice")); - } - - @Test - void testMapConfigExample() { - var runner = CloudDurableTestRunner.create( - arn("map-config-example"), GreetingRequest.class, String.class, lambdaClient); + void testComplexMapExample() { + var runner = CloudDurableTestRunner.create(arn("complex-map-example"), GreetingRequest.class, String.class); var result = runner.run(new GreetingRequest("Alice")); assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); var output = result.getResult(String.class); assertNotNull(output); - // Sequential part: all 3 items processed - assertTrue(output.contains("ALPHA-Alice")); - assertTrue(output.contains("BETA-Alice")); - assertTrue(output.contains("GAMMA-Alice")); + // Part 1: Concurrent order processing with step + wait + step + assertTrue(output.contains("done:validated:order-1:Alice")); + assertTrue(output.contains("done:validated:order-2:Alice")); + assertTrue(output.contains("done:validated:order-3:Alice")); - // Early termination part + // Part 2: Early termination — find 2 healthy servers then stop + 
assertTrue(output.contains("healthy")); assertTrue(output.contains("reason=MIN_SUCCESSFUL_REACHED")); } } diff --git a/examples/src/test/java/software/amazon/lambda/durable/examples/MapConfigExampleTest.java b/examples/src/test/java/software/amazon/lambda/durable/examples/ComplexMapExampleTest.java similarity index 69% rename from examples/src/test/java/software/amazon/lambda/durable/examples/MapConfigExampleTest.java rename to examples/src/test/java/software/amazon/lambda/durable/examples/ComplexMapExampleTest.java index f2df8179..1b1ee2cd 100644 --- a/examples/src/test/java/software/amazon/lambda/durable/examples/MapConfigExampleTest.java +++ b/examples/src/test/java/software/amazon/lambda/durable/examples/ComplexMapExampleTest.java @@ -8,11 +8,11 @@ import software.amazon.lambda.durable.model.ExecutionStatus; import software.amazon.lambda.durable.testing.LocalDurableTestRunner; -class MapConfigExampleTest { +class ComplexMapExampleTest { @Test - void testSequentialAndEarlyTermination() { - var handler = new MapConfigExample(); + void testComplexMapExample() { + var handler = new ComplexMapExample(); var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); var result = runner.runUntilComplete(new GreetingRequest("Alice")); @@ -20,17 +20,19 @@ void testSequentialAndEarlyTermination() { assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); var output = result.getResult(String.class); - assertTrue(output.contains("ALPHA-Alice")); - assertTrue(output.contains("BETA-Alice")); - assertTrue(output.contains("GAMMA-Alice")); + // Part 1: all 3 orders processed with step + wait + step + assertTrue(output.contains("done:validated:order-1:Alice")); + assertTrue(output.contains("done:validated:order-2:Alice")); + assertTrue(output.contains("done:validated:order-3:Alice")); + + // Part 2: early termination after 2 healthy servers assertTrue(output.contains("reason=MIN_SUCCESSFUL_REACHED")); - assertTrue(output.contains("server-1:healthy")); - 
assertTrue(output.contains("server-2:healthy")); + assertTrue(output.contains("healthy")); } @Test void testReplay() { - var handler = new MapConfigExample(); + var handler = new ComplexMapExample(); var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); var input = new GreetingRequest("Bob"); diff --git a/examples/src/test/java/software/amazon/lambda/durable/examples/MapErrorHandlingExampleTest.java b/examples/src/test/java/software/amazon/lambda/durable/examples/MapErrorHandlingExampleTest.java deleted file mode 100644 index b899edd3..00000000 --- a/examples/src/test/java/software/amazon/lambda/durable/examples/MapErrorHandlingExampleTest.java +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 -package software.amazon.lambda.durable.examples; - -import static org.junit.jupiter.api.Assertions.*; - -import org.junit.jupiter.api.Test; -import software.amazon.lambda.durable.model.ExecutionStatus; -import software.amazon.lambda.durable.testing.LocalDurableTestRunner; - -class MapErrorHandlingExampleTest { - - @Test - void testPartialFailure() { - var handler = new MapErrorHandlingExample(); - var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); - - var result = runner.runUntilComplete(new GreetingRequest("Alice")); - - assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); - var output = result.getResult(String.class); - - // 3 of 5 orders succeed, 2 fail - assertTrue(output.contains("succeeded=3")); - assertTrue(output.contains("failed=2")); - assertTrue(output.contains("Processed order-1 for Alice")); - assertTrue(output.contains("Processed order-3 for Alice")); - assertTrue(output.contains("Processed order-5 for Alice")); - assertTrue(output.contains("Invalid order: order-INVALID")); - assertTrue(output.contains("Processing error for: order-ERROR")); - } - - @Test - void testReplay() { - var handler = new 
MapErrorHandlingExample(); - var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); - - var input = new GreetingRequest("Bob"); - var result1 = runner.runUntilComplete(input); - assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus()); - - var result2 = runner.runUntilComplete(input); - assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus()); - var output = result2.getResult(String.class); - // Replay — errors are not preserved in checkpoints (Throwable is not serializable), - // so the replay result will show failed=0 instead of failed=2. - // The successful results should still match. - assertTrue(output.contains("succeeded=3")); - assertTrue(output.contains("Processed order-1 for Bob")); - } -} diff --git a/examples/template.yaml b/examples/template.yaml index 0487db95..84217844 100644 --- a/examples/template.yaml +++ b/examples/template.yaml @@ -493,16 +493,16 @@ Resources: DockerContext: ../ DockerTag: durable-examples - MapErrorHandlingExampleFunction: + ComplexMapExampleFunction: Type: AWS::Serverless::Function Properties: PackageType: Image FunctionName: !Join - '' - - - 'map-error-handling-example' + - - 'complex-map-example' - !Ref FunctionNameSuffix ImageConfig: - Command: ["software.amazon.lambda.durable.examples.MapErrorHandlingExample::handleRequest"] + Command: ["software.amazon.lambda.durable.examples.ComplexMapExample::handleRequest"] DurableConfig: ExecutionTimeout: 300 RetentionPeriodInDays: 7 @@ -512,32 +512,7 @@ Resources: Action: - lambda:CheckpointDurableExecutions - lambda:GetDurableExecutionState - Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:map-error-handling-example${FunctionNameSuffix}" - Metadata: - Dockerfile: !Ref DockerFile - DockerContext: ../ - DockerTag: durable-examples - - MapConfigExampleFunction: - Type: AWS::Serverless::Function - Properties: - PackageType: Image - FunctionName: !Join - - '' - - - 'map-config-example' - - !Ref FunctionNameSuffix - ImageConfig: - 
Command: ["software.amazon.lambda.durable.examples.MapConfigExample::handleRequest"] - DurableConfig: - ExecutionTimeout: 300 - RetentionPeriodInDays: 7 - Policies: - - Statement: - - Effect: Allow - Action: - - lambda:CheckpointDurableExecutions - - lambda:GetDurableExecutionState - Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:map-config-example${FunctionNameSuffix}" + Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:complex-map-example${FunctionNameSuffix}" Metadata: Dockerfile: !Ref DockerFile DockerContext: ../ @@ -696,19 +671,11 @@ Outputs: Description: Simple Map Example Function Name Value: !Ref SimpleMapExampleFunction - MapErrorHandlingExampleFunction: - Description: Map Error Handling Example Function ARN - Value: !GetAtt MapErrorHandlingExampleFunction.Arn - - MapErrorHandlingExampleFunctionName: - Description: Map Error Handling Example Function Name - Value: !Ref MapErrorHandlingExampleFunction - - MapConfigExampleFunction: - Description: Map Config Example Function ARN - Value: !GetAtt MapConfigExampleFunction.Arn + ComplexMapExampleFunction: + Description: Complex Map Example Function ARN + Value: !GetAtt ComplexMapExampleFunction.Arn - MapConfigExampleFunctionName: - Description: Map Config Example Function Name - Value: !Ref MapConfigExampleFunction + ComplexMapExampleFunctionName: + Description: Complex Map Example Function Name + Value: !Ref ComplexMapExampleFunction diff --git a/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapIntegrationTest.java b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapIntegrationTest.java index b141d528..17ba1ece 100644 --- a/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapIntegrationTest.java +++ b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/MapIntegrationTest.java @@ -4,12 +4,14 @@ import static org.junit.jupiter.api.Assertions.*; +import java.time.Duration; import 
java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.Test; import software.amazon.lambda.durable.model.CompletionReason; import software.amazon.lambda.durable.model.ExecutionStatus; +import software.amazon.lambda.durable.model.MapResultItem; import software.amazon.lambda.durable.testing.LocalDurableTestRunner; class MapIntegrationTest { @@ -72,7 +74,7 @@ void testMapPartialFailure_failedItemDoesNotPreventOthers() { assertEquals("A", result.getResult(0)); assertNull(result.getResult(1)); assertNotNull(result.getError(1)); - assertTrue(result.getError(1).getMessage().contains("item failed")); + assertTrue(result.getError(1).errorMessage().contains("item failed")); assertEquals("C", result.getResult(2)); // successful items have no error @@ -111,10 +113,10 @@ void testMapMultipleFailures_allCapturedAtCorrectIndices() { // Failed items at correct indices assertNull(result.getResult(1)); assertNotNull(result.getError(1)); - assertTrue(result.getError(1).getMessage().contains("bad1")); + assertTrue(result.getError(1).errorMessage().contains("bad1")); assertNull(result.getResult(3)); assertNotNull(result.getError(3)); - assertTrue(result.getError(3).getMessage().contains("bad2")); + assertTrue(result.getError(3).errorMessage().contains("bad2")); assertEquals(2, result.succeeded().size()); assertEquals(2, result.failed().size()); @@ -143,8 +145,8 @@ void testMapAllItemsFail() { assertNull(result.getResult(i)); assertNotNull(result.getError(i)); } - assertTrue(result.getError(0).getMessage().contains("fail-x")); - assertTrue(result.getError(1).getMessage().contains("fail-y")); + assertTrue(result.getError(0).errorMessage().contains("fail-x")); + assertTrue(result.getError(1).errorMessage().contains("fail-y")); return "done"; }); @@ -230,59 +232,6 @@ void testMapWithMaxConcurrency2_limitedConcurrency() { assertTrue(peakConcurrency.get() <= 2, "Expected peak concurrency <= 2 but was " + 
peakConcurrency.get()); } - @Test - void testMapWithMaxConcurrencyNull_unlimitedConcurrency() { - var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { - var items = List.of("a", "b", "c"); - var result = context.map("unlimited-map", items, String.class, (item, index, ctx) -> { - return item.toUpperCase(); - }); - - assertTrue(result.allSucceeded()); - assertEquals(3, result.size()); - assertEquals("A", result.getResult(0)); - assertEquals("B", result.getResult(1)); - assertEquals("C", result.getResult(2)); - - return String.join(",", result.results()); - }); - - var result = runner.runUntilComplete("test"); - assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); - assertEquals("A,B,C", result.getResult(String.class)); - } - - @Test - void testMapWithMaxConcurrency1_partialFailure() { - var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { - var items = List.of("a", "FAIL", "c"); - var config = MapConfig.builder().maxConcurrency(1).build(); - var result = context.map( - "sequential-partial-fail", - items, - String.class, - (item, index, ctx) -> { - if ("FAIL".equals(item)) { - throw new RuntimeException("item failed"); - } - return item.toUpperCase(); - }, - config); - - assertFalse(result.allSucceeded()); - assertEquals(3, result.size()); - assertEquals("A", result.getResult(0)); - assertNull(result.getResult(1)); - assertNotNull(result.getError(1)); - assertEquals("C", result.getResult(2)); - - return "done"; - }); - - var result = runner.runUntilComplete("test"); - assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); - } - @Test void testMapWithToleratedFailureCount_earlyTermination() { var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { @@ -342,30 +291,6 @@ void testMapWithMinSuccessful_earlyTermination() { assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); } - @Test - void testMapWithFirstSuccessful_stopsAfterFirstSuccess() { - var runner = 
LocalDurableTestRunner.create(String.class, (input, context) -> { - var items = List.of("a", "b", "c"); - var config = MapConfig.builder() - .maxConcurrency(1) - .completionConfig(CompletionConfig.firstSuccessful()) - .build(); - var result = context.map( - "first-successful", items, String.class, (item, index, ctx) -> item.toUpperCase(), config); - - assertEquals(CompletionReason.MIN_SUCCESSFUL_REACHED, result.completionReason()); - assertEquals(3, result.size()); - assertEquals("A", result.getResult(0)); - assertNull(result.getResult(1)); - assertNull(result.getResult(2)); - - return "done"; - }); - - var result = runner.runUntilComplete("test"); - assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); - } - @Test void testMapReplayAfterInterruption_cachedResultsUsed() { var executionCounts = new AtomicInteger(0); @@ -398,35 +323,6 @@ void testMapReplayAfterInterruption_cachedResultsUsed() { assertEquals(firstRunCount, executionCounts.get(), "Map functions should not re-execute on replay"); } - @Test - void testMapReplayWithSteps_cachedStepResultsUsed() { - var stepExecutionCount = new AtomicInteger(0); - - var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { - var items = List.of("hello", "world"); - var result = context.map("replay-steps-map", items, String.class, (item, index, ctx) -> { - return ctx.step("step-" + index, String.class, stepCtx -> { - stepExecutionCount.incrementAndGet(); - return item.toUpperCase(); - }); - }); - - assertTrue(result.allSucceeded()); - return String.join(" ", result.results()); - }); - - var result1 = runner.runUntilComplete("test"); - assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus()); - assertEquals("HELLO WORLD", result1.getResult(String.class)); - var firstRunStepCount = stepExecutionCount.get(); - assertTrue(firstRunStepCount >= 2, "Expected at least 2 step executions but got " + firstRunStepCount); - - var result2 = runner.run("test"); - assertEquals(ExecutionStatus.SUCCEEDED, 
result2.getStatus()); - assertEquals("HELLO WORLD", result2.getResult(String.class)); - assertEquals(firstRunStepCount, stepExecutionCount.get(), "Steps should not re-execute on replay"); - } - @Test void testNestedMap_mapInsideMapBranch() { var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { @@ -459,4 +355,428 @@ void testNestedMap_mapInsideMapBranch() { assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); assertEquals("GROUP1-A+GROUP1-B|GROUP2-A+GROUP2-B", result.getResult(String.class)); } + + @Test + void testMapWithWaitInsideBranches() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("a", "b"); + var result = context.map("map-with-wait", items, String.class, (item, index, ctx) -> { + var stepped = ctx.step("process-" + index, String.class, stepCtx -> item.toUpperCase()); + ctx.wait("pause-" + index, Duration.ofSeconds(1)); + return stepped + "-done"; + }); + + assertTrue(result.allSucceeded()); + assertEquals("A-done", result.getResult(0)); + assertEquals("B-done", result.getResult(1)); + return String.join(",", result.results()); + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("A-done,B-done", result.getResult(String.class)); + } + + @Test + void testMapAsyncWithInterleavedWork() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("x", "y"); + var future = context.mapAsync("async-map", items, String.class, (item, index, ctx) -> { + return ctx.step("process-" + index, String.class, stepCtx -> item.toUpperCase()); + }); + + // Do other work while map runs + var other = context.step("other-work", String.class, stepCtx -> "OTHER"); + + // Now collect map results + var mapResult = future.get(); + assertTrue(mapResult.allSucceeded()); + + return other + ":" + String.join(",", mapResult.results()); + }); + + var result = 
runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("OTHER:X,Y", result.getResult(String.class)); + } + + @Test + void testMapUnlimitedConcurrencyWithToleratedFailureCount() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("ok1", "FAIL1", "ok2", "FAIL2", "ok3"); + var config = MapConfig.builder() + .completionConfig(CompletionConfig.toleratedFailureCount(1)) + .build(); + var result = context.map( + "unlimited-tolerated", + items, + String.class, + (item, index, ctx) -> { + if (item.startsWith("FAIL")) { + throw new RuntimeException("failed: " + item); + } + return item.toUpperCase(); + }, + config); + + assertEquals(CompletionReason.FAILURE_TOLERANCE_EXCEEDED, result.completionReason()); + assertFalse(result.allSucceeded()); + return "done"; + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + } + + @Test + void testMapReplayWithFailedBranches() { + var executionCount = new AtomicInteger(0); + + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("ok", "FAIL", "ok2"); + var result = context.map("replay-fail-map", items, String.class, (item, index, ctx) -> { + executionCount.incrementAndGet(); + if ("FAIL".equals(item)) { + throw new RuntimeException("item failed"); + } + return item.toUpperCase(); + }); + + // Errors survive replay since they are stored as ErrorObject (not raw Throwable) + assertEquals("OK", result.getResult(0)); + assertEquals("OK2", result.getResult(2)); + return "done"; + }); + + var result1 = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus()); + var firstRunCount = executionCount.get(); + + // Replay — functions should not re-execute + var result2 = runner.run("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus()); + assertEquals(firstRunCount, 
executionCount.get(), "Map functions should not re-execute on replay"); + } + + @Test + void testMapWithSingleItem() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("only"); + var result = context.map("single-item", items, String.class, (item, index, ctx) -> { + return ctx.step("process", String.class, stepCtx -> item.toUpperCase()); + }); + + assertTrue(result.allSucceeded()); + assertEquals(1, result.size()); + assertEquals("ONLY", result.getResult(0)); + assertEquals(0, result.failed().size()); + return result.getResult(0); + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("ONLY", result.getResult(String.class)); + } + + @Test + void testStepBeforeAndAfterMap() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var before = context.step("before", String.class, stepCtx -> "BEFORE"); + + var items = List.of("a", "b"); + var mapResult = context.map("middle-map", items, String.class, (item, index, ctx) -> item.toUpperCase()); + + var after = context.step("after", String.class, stepCtx -> "AFTER"); + + return before + ":" + String.join(",", mapResult.results()) + ":" + after; + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("BEFORE:A,B:AFTER", result.getResult(String.class)); + } + + @Test + void testSequentialMaps() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var result1 = + context.map("map-1", List.of("a", "b"), String.class, (item, index, ctx) -> item.toUpperCase()); + var result2 = context.map("map-2", List.of("x", "y"), String.class, (item, index, ctx) -> item + "!"); + + return String.join(",", result1.results()) + "|" + String.join(",", result2.results()); + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, 
result.getStatus()); + assertEquals("A,B|x!,y!", result.getResult(String.class)); + } + + @Test + void testMapWithAllSuccessfulCompletionConfig_stopsOnFirstFailure() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("ok1", "FAIL", "ok2", "ok3"); + var config = MapConfig.builder() + .maxConcurrency(1) + .completionConfig(CompletionConfig.allSuccessful()) + .build(); + var result = context.map( + "all-successful", + items, + String.class, + (item, index, ctx) -> { + if (item.startsWith("FAIL")) { + throw new RuntimeException("failed"); + } + return item.toUpperCase(); + }, + config); + + assertEquals(CompletionReason.FAILURE_TOLERANCE_EXCEEDED, result.completionReason()); + assertEquals("OK1", result.getResult(0)); + assertNotNull(result.getError(1)); + // Items after the failure should be NOT_STARTED + assertEquals(MapResultItem.Status.NOT_STARTED, result.getItem(2).status()); + assertEquals(MapResultItem.Status.NOT_STARTED, result.getItem(3).status()); + return "done"; + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + } + + @Test + void testMapWithWaitInsideBranches_replay() { + var executionCount = new AtomicInteger(0); + + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("a", "b"); + var result = context.map("wait-replay-map", items, String.class, (item, index, ctx) -> { + executionCount.incrementAndGet(); + var stepped = ctx.step("process-" + index, String.class, stepCtx -> item.toUpperCase()); + ctx.wait("pause-" + index, Duration.ofSeconds(1)); + return stepped + "-done"; + }); + + assertTrue(result.allSucceeded()); + return String.join(",", result.results()); + }); + + var result1 = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus()); + assertEquals("A-done,B-done", result1.getResult(String.class)); + var firstRunCount = 
executionCount.get(); + + // Replay — should use cached results, not re-execute + var result2 = runner.run("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus()); + assertEquals("A-done,B-done", result2.getResult(String.class)); + assertEquals(firstRunCount, executionCount.get(), "Map functions should not re-execute on replay"); + } + + @Test + void testNestedMap_replay() { + var executionCount = new AtomicInteger(0); + + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var outerItems = List.of("g1", "g2"); + var outerResult = context.map("outer", outerItems, String.class, (group, outerIdx, outerCtx) -> { + var innerItems = List.of(group + "-a", group + "-b"); + var innerResult = + outerCtx.map("inner-" + outerIdx, innerItems, String.class, (item, innerIdx, innerCtx) -> { + executionCount.incrementAndGet(); + return item.toUpperCase(); + }); + return String.join("+", innerResult.results()); + }); + + return String.join("|", outerResult.results()); + }); + + var result1 = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus()); + assertEquals("G1-A+G1-B|G2-A+G2-B", result1.getResult(String.class)); + var firstRunCount = executionCount.get(); + + var result2 = runner.run("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus()); + assertEquals("G1-A+G1-B|G2-A+G2-B", result2.getResult(String.class)); + assertEquals(firstRunCount, executionCount.get(), "Nested map should not re-execute on replay"); + } + + @Test + void testMapWithToleratedFailurePercentage() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("ok1", "FAIL1", "ok2", "FAIL2", "ok3", "FAIL3", "ok4"); + var config = MapConfig.builder() + .completionConfig(CompletionConfig.toleratedFailurePercentage(0.3)) + .build(); + var result = context.map( + "pct-fail", + items, + String.class, + (item, index, ctx) -> { + if (item.startsWith("FAIL")) { + throw new 
RuntimeException("failed: " + item); + } + return item.toUpperCase(); + }, + config); + + assertEquals(CompletionReason.FAILURE_TOLERANCE_EXCEEDED, result.completionReason()); + return "done"; + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + } + + @Test + void testMapAsyncWithWaitInsideBranches() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("a", "b"); + var future = context.mapAsync("async-wait-map", items, String.class, (item, index, ctx) -> { + var stepped = ctx.step("process-" + index, String.class, stepCtx -> item.toUpperCase()); + ctx.wait("pause-" + index, Duration.ofSeconds(1)); + return stepped + "-done"; + }); + + var other = context.step("other", String.class, stepCtx -> "OTHER"); + var mapResult = future.get(); + assertTrue(mapResult.allSucceeded()); + + return other + ":" + String.join(",", mapResult.results()); + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("OTHER:A-done,B-done", result.getResult(String.class)); + } + + @Test + void testMapWithCustomSerDes() { + var customSerDes = new software.amazon.lambda.durable.serde.JacksonSerDes(); + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("a", "b"); + var config = MapConfig.builder().serDes(customSerDes).build(); + var result = context.map( + "custom-serdes-map", items, String.class, (item, index, ctx) -> item.toUpperCase(), config); + + assertTrue(result.allSucceeded()); + return String.join(",", result.results()); + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("A,B", result.getResult(String.class)); + } + + @Test + void testMapWithGenericResultType() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = 
List.of("a,b", "c,d"); + var result = context.map("generic-map", items, new TypeToken>() {}, (item, index, ctx) -> { + return ctx.step( + "split-" + index, new TypeToken>() {}, stepCtx -> List.of(item.split(","))); + }); + + assertTrue(result.allSucceeded()); + assertEquals(List.of("a", "b"), result.getResult(0)); + assertEquals(List.of("c", "d"), result.getResult(1)); + return "ok"; + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + } + + @Test + void testMapInsideParallelBranch() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + try (var parallel = + context.parallel("outer-parallel", ParallelConfig.builder().build())) { + var future1 = parallel.branch("branch-a", String.class, branchCtx -> { + var mapResult = branchCtx.map( + "map-in-branch-a", + List.of("x", "y"), + String.class, + (item, index, ctx) -> item.toUpperCase()); + return String.join(",", mapResult.results()); + }); + var future2 = parallel.branch("branch-b", String.class, branchCtx -> { + return branchCtx.step("simple-step", String.class, stepCtx -> "DONE"); + }); + parallel.join(); + return future1.get() + "|" + future2.get(); + } + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("X,Y|DONE", result.getResult(String.class)); + } + + @Test + void testParallelInsideMapBranch() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("group1", "group2"); + var result = context.map("map-with-parallel", items, String.class, (item, index, ctx) -> { + try (var parallel = ctx.parallel( + "parallel-" + index, ParallelConfig.builder().build())) { + var f1 = parallel.branch("sub-a-" + index, String.class, bCtx -> { + return bCtx.step("step-a-" + index, String.class, stepCtx -> item + "-A"); + }); + var f2 = parallel.branch("sub-b-" + index, String.class, bCtx -> { + return 
bCtx.step("step-b-" + index, String.class, stepCtx -> item + "-B"); + }); + parallel.join(); + return f1.get() + "+" + f2.get(); + } + }); + + assertTrue(result.allSucceeded()); + return String.join("|", result.results()); + }); + + var result = runner.runUntilComplete("test"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("group1-A+group1-B|group2-A+group2-B", result.getResult(String.class)); + } + + @Test + void testMapWithWaitInsideBranches_maxConcurrency1() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var items = List.of("a", "b"); + var config = MapConfig.builder().maxConcurrency(1).build(); + var result = context.map( + "seq-wait-map", + items, + String.class, + (item, index, ctx) -> { + var stepped = ctx.step("step-" + index, String.class, stepCtx -> item.toUpperCase()); + ctx.wait("pause-" + index, Duration.ofSeconds(1)); + return stepped + "-done"; + }, + config); + + assertTrue(result.allSucceeded()); + assertEquals(2, result.size()); + assertEquals("A-done", result.getResult(0)); + assertEquals("B-done", result.getResult(1)); + return String.join(",", result.results()); + }); + + // With maxConcurrency=1, each invocation processes one branch's wait. + // Use explicit run() + advanceTime() loop due to a known thread coordination race + // (same as ChildContextIntegrationTest.twoAsyncChildContextsBothWaitSuspendAndResume). 
+ for (int i = 0; i < 10; i++) { + var result = runner.run("test"); + if (result.getStatus() != ExecutionStatus.PENDING) { + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("A-done,B-done", result.getResult(String.class)); + return; + } + runner.advanceTime(); + } + fail("Expected SUCCEEDED within 10 invocations"); + } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/CompletionConfig.java b/sdk/src/main/java/software/amazon/lambda/durable/CompletionConfig.java index df8de8f9..fc52cd1a 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/CompletionConfig.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/CompletionConfig.java @@ -36,16 +36,26 @@ public static CompletionConfig firstSuccessful() { /** Complete when the specified number of items have succeeded. */ public static CompletionConfig minSuccessful(int count) { + if (count < 1) { + throw new IllegalArgumentException("minSuccessful must be at least 1, got: " + count); + } return new CompletionConfig(count, null, null); } /** Complete when more than the specified number of failures have occurred. */ public static CompletionConfig toleratedFailureCount(int count) { + if (count < 0) { + throw new IllegalArgumentException("toleratedFailureCount must be non-negative, got: " + count); + } return new CompletionConfig(null, count, null); } /** Complete when the failure percentage exceeds the specified threshold (0.0 to 1.0). 
*/ public static CompletionConfig toleratedFailurePercentage(double percentage) { + if (percentage < 0.0 || percentage > 1.0) { + throw new IllegalArgumentException( + "toleratedFailurePercentage must be between 0.0 and 1.0, got: " + percentage); + } return new CompletionConfig(null, null, percentage); } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/MapConfig.java b/sdk/src/main/java/software/amazon/lambda/durable/MapConfig.java index fa0e8b39..d4f6b583 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/MapConfig.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/MapConfig.java @@ -71,6 +71,9 @@ public Builder serDes(SerDes serDes) { } public MapConfig build() { + if (maxConcurrency != null && maxConcurrency < 1) { + throw new IllegalArgumentException("maxConcurrency must be at least 1, got: " + maxConcurrency); + } return new MapConfig(this); } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/MapFunction.java b/sdk/src/main/java/software/amazon/lambda/durable/MapFunction.java index a349ac9e..041dccfc 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/MapFunction.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/MapFunction.java @@ -22,7 +22,6 @@ public interface MapFunction { * @param index the zero-based index of the item in the input collection * @param context the durable context for this item's execution * @return the result of processing the item - * @throws Exception if the function fails */ - O apply(I item, int index, DurableContext context) throws Exception; + O apply(I item, int index, DurableContext context); } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/model/MapResult.java b/sdk/src/main/java/software/amazon/lambda/durable/model/MapResult.java index 1d735d8a..f56d0430 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/model/MapResult.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/model/MapResult.java @@ -4,6 +4,7 @@ import 
java.util.Collections; import java.util.List; +import software.amazon.awssdk.services.lambda.model.ErrorObject; /** * Result container for map operations. @@ -12,24 +13,19 @@ * item is represented as a {@link MapResultItem} containing its status, result, and error. Includes the * {@link CompletionReason} indicating why the operation completed. * - *

When serialized for checkpointing, only status and result fields of each item are included. Error fields are - * transient because Throwable objects are not reliably serializable. On replay from a small-result checkpoint, errors - * will be null; on replay from a large-result checkpoint (replayChildren), errors are reconstructed from individual - * child context checkpoints. + *

Errors are stored as {@link ErrorObject} rather than raw Throwable, so they survive serialization across + * checkpoint-and-replay cycles. * + * @param items ordered result items from the map operation + * @param completionReason why the operation completed * @param the result type of each item */ -public class MapResult { +public record MapResult(List> items, CompletionReason completionReason) { - private List> items; - private CompletionReason completionReason; - - /** Default constructor for deserialization. */ - public MapResult() {} - - public MapResult(List> items, CompletionReason completionReason) { - this.items = items != null ? List.copyOf(items) : Collections.emptyList(); - this.completionReason = completionReason != null ? completionReason : CompletionReason.ALL_COMPLETED; + /** Compact constructor that applies defensive copy and defaults. */ + public MapResult { + items = items != null ? List.copyOf(items) : Collections.emptyList(); + completionReason = completionReason != null ? completionReason : CompletionReason.ALL_COMPLETED; } /** Returns an empty MapResult with no items. */ @@ -48,23 +44,13 @@ public T getResult(int index) { } /** Returns the error at the given index, or null if that item succeeded or was not started. */ - public Throwable getError(int index) { + public ErrorObject getError(int index) { return items.get(index).error(); } - /** Returns true if all items succeeded (no errors). */ + /** Returns true if all items succeeded (no failures or not-started items). */ public boolean allSucceeded() { - return items.stream().noneMatch(item -> item.error() != null); - } - - /** Returns the reason the operation completed. */ - public CompletionReason getCompletionReason() { - return completionReason; - } - - /** Returns all result items as an unmodifiable list. 
*/ - public List> getItems() { - return items; + return items.stream().allMatch(item -> item.status() == MapResultItem.Status.SUCCEEDED); } /** Returns the number of items in this result. */ @@ -72,31 +58,25 @@ public int size() { return items.size(); } - // Convenience accessors matching the original API style - - /** Returns the reason the operation completed. */ - public CompletionReason completionReason() { - return completionReason; - } - - /** Returns all result items as an unmodifiable list. */ - public List> items() { - return items; - } - /** Returns all results as an unmodifiable list (nulls for failed/not-started items). */ public List results() { return Collections.unmodifiableList( items.stream().map(MapResultItem::result).toList()); } - /** Returns results that succeeded (non-null results). */ + /** Returns results from items that succeeded (includes null results from successful items). */ public List succeeded() { - return items.stream().map(MapResultItem::result).filter(r -> r != null).toList(); + return items.stream() + .filter(item -> item.status() == MapResultItem.Status.SUCCEEDED) + .map(MapResultItem::result) + .toList(); } - /** Returns errors that occurred (non-null errors). */ - public List failed() { - return items.stream().map(MapResultItem::error).filter(e -> e != null).toList(); + /** Returns errors from items that failed. 
*/ + public List failed() { + return items.stream() + .filter(item -> item.status() == MapResultItem.Status.FAILED) + .map(MapResultItem::error) + .toList(); } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/model/MapResultItem.java b/sdk/src/main/java/software/amazon/lambda/durable/model/MapResultItem.java index 1ae3b165..6d6dd016 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/model/MapResultItem.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/model/MapResultItem.java @@ -2,37 +2,29 @@ // SPDX-License-Identifier: Apache-2.0 package software.amazon.lambda.durable.model; +import software.amazon.awssdk.services.lambda.model.ErrorObject; + /** * Represents the outcome of a single item in a map operation. * - *

Each item either succeeds with a result or fails with an error. The status field indicates which case applies. + *

Each item either succeeds with a result, fails with an error, or was never started. The status field indicates + * which case applies. * - *

When serialized for checkpointing, only status and result are included. The error field is transient because - * Throwable objects are not reliably serializable across different serializers. On replay from a small-result - * checkpoint, errors will be null; on replay from a large-result checkpoint (replayChildren), errors are reconstructed - * from individual child context checkpoints. + *

Errors are stored as {@link ErrorObject} (errorType, errorMessage, stackTrace) rather than raw Throwable, so they + * survive serialization across checkpoint-and-replay cycles. * + * @param status the status of this item + * @param result the result value, or null if failed/not started + * @param error the error details, or null if succeeded/not started * @param the result type */ -public class MapResultItem { +public record MapResultItem(Status status, T result, ErrorObject error) { /** Status of an individual map item. */ public enum Status { SUCCEEDED, - FAILED - } - - private Status status; - private T result; - private transient Throwable error; - - /** Default constructor for deserialization. */ - public MapResultItem() {} - - private MapResultItem(Status status, T result, Throwable error) { - this.status = status; - this.result = result; - this.error = error; + FAILED, + NOT_STARTED } /** Creates a successful result item. */ @@ -41,44 +33,12 @@ public static MapResultItem success(T result) { } /** Creates a failed result item. */ - public static MapResultItem failure(Throwable error) { + public static MapResultItem failure(ErrorObject error) { return new MapResultItem<>(Status.FAILED, null, error); } - /** Creates an empty (not started) result item. */ + /** Creates a not-started result item. */ public static MapResultItem notStarted() { - return new MapResultItem<>(null, null, null); - } - - /** Returns the status of this item, or null if the item was never started. */ - public Status getStatus() { - return status; - } - - /** Returns the result, or null if the item failed or was not started. */ - public T getResult() { - return result; - } - - /** Returns the error, or null if the item succeeded or was not started. */ - public Throwable getError() { - return error; - } - - // Convenience accessors matching the original API style - - /** Returns the status of this item, or null if the item was never started. 
*/ - public Status status() { - return status; - } - - /** Returns the result, or null if the item failed or was not started. */ - public T result() { - return result; - } - - /** Returns the error, or null if the item succeeded or was not started. */ - public Throwable error() { - return error; + return new MapResultItem<>(Status.NOT_STARTED, null, null); } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseConcurrentOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseConcurrentOperation.java index a7abbfb6..37b5322c 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseConcurrentOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseConcurrentOperation.java @@ -5,12 +5,12 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,7 +24,9 @@ import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.TypeToken; import software.amazon.lambda.durable.context.DurableContextImpl; +import software.amazon.lambda.durable.execution.ExecutionManager; import software.amazon.lambda.durable.execution.OperationIdGenerator; +import software.amazon.lambda.durable.execution.SuspendExecutionException; import software.amazon.lambda.durable.model.CompletionReason; import software.amazon.lambda.durable.model.OperationIdentifier; import software.amazon.lambda.durable.model.OperationSubType; @@ -46,17 +48,21 @@ public abstract class BaseConcurrentOperation extends BaseDurableOperation private static final Logger logger = 
LoggerFactory.getLogger(BaseConcurrentOperation.class); private static final int LARGE_RESULT_THRESHOLD = 256 * 1024; + // All mutable state below is guarded by `lock`. + protected final Object lock = new Object(); private final List> branches = new ArrayList<>(); - private final Queue> pendingQueue = new ConcurrentLinkedQueue<>(); + private final Queue> pendingQueue = new LinkedList<>(); private final Set> startedBranches = ConcurrentHashMap.newKeySet(); - private final AtomicInteger activeBranches = new AtomicInteger(0); - private final AtomicInteger succeeded = new AtomicInteger(0); - private final AtomicInteger failed = new AtomicInteger(0); + private int activeBranches; + private int succeeded; + private int failed; + private CompletionReason completionReason; + private boolean earlyTermination; + private final Set completedBranchIds = new HashSet<>(); + private final Integer maxConcurrency; private final CompletionConfig completionConfig; private final OperationSubType subType; - private volatile CompletionReason completionReason; - private volatile boolean earlyTermination = false; private DurableContextImpl rootContext; private OperationIdGenerator operationIdGenerator; @@ -85,9 +91,12 @@ protected BaseConcurrentOperation( protected void start() { sendOperationUpdateAsync( OperationUpdate.builder().action(OperationAction.START).subType(subType.getValue())); + var parentThreadContext = getCurrentThreadContext(); this.rootContext = getContext().createChildContext(getOperationId(), getName()); this.operationIdGenerator = new OperationIdGenerator(getOperationId()); startBranches(); + processAlreadyCompletedBranches(); + getContext().getExecutionManager().setCurrentThreadContext(parentThreadContext); } @Override @@ -96,20 +105,23 @@ protected void replay(Operation existing) { case SUCCEEDED -> { if (existing.contextDetails() != null && Boolean.TRUE.equals(existing.contextDetails().replayChildren())) { - // Large result: reconstruct by replaying child contexts + 
var parentThreadContext = getCurrentThreadContext(); this.rootContext = getContext().createChildContext(getOperationId(), getName()); this.operationIdGenerator = new OperationIdGenerator(getOperationId()); startBranches(); + processAlreadyCompletedBranches(); + getContext().getExecutionManager().setCurrentThreadContext(parentThreadContext); } else { markAlreadyCompleted(); } } - case FAILED -> markAlreadyCompleted(); case STARTED -> { - // Interrupted mid-execution: resume from last checkpoint + var parentThreadContext = getCurrentThreadContext(); this.rootContext = getContext().createChildContext(getOperationId(), getName()); this.operationIdGenerator = new OperationIdGenerator(getOperationId()); startBranches(); + processAlreadyCompletedBranches(); + getContext().getExecutionManager().setCurrentThreadContext(parentThreadContext); } default -> terminateExecutionWithIllegalDurableOperationException( @@ -123,6 +135,31 @@ protected void replay(Operation existing) { protected abstract R aggregateResults(); + // ========== post-startBranches processing ========== + + /** + * Processes branches that completed synchronously during {@code startBranches()} via {@code markAlreadyCompleted()} + * (replay of SUCCEEDED/FAILED branches). These branches don't go through {@code executeChildContext()}, so their + * {@code onCompleteCallback} doesn't fire. We process them here, after all branches have been created, to avoid + * re-entrant issues during {@code startBranches()}. 
+ */ + private void processAlreadyCompletedBranches() { + // Take a snapshot of branches that are already completed + List> alreadyCompleted; + synchronized (lock) { + alreadyCompleted = + branches.stream().filter(b -> b.completionFuture.isDone()).toList(); + } + + for (var branch : alreadyCompleted) { + var op = branch.getOperation(); + if (op != null) { + boolean success = op.status() == OperationStatus.SUCCEEDED; + onChildContextComplete(branch, success); + } + } + } + // ========== branch creation ========== protected ChildContextOperation branchInternal( @@ -138,25 +175,36 @@ protected ChildContextOperation branchInternal( typeToken, serDes, rootContext); - branches.add(branch); - - // Attach callback BEFORE execution starts (or before future can complete). - // The thenRun runs synchronously inside the synchronized(completionFuture) block - // when completionFuture.complete(null) is called, so it executes on the checkpoint - // processing thread. This callback only does lightweight work: update counters, - // evaluate CompletionConfig, dequeue and start next branch. - branch.completionFuture.thenRun(() -> { + + // Only use onCompleteCallback (finally block of executeChildContext) for branches that + // actually execute. For branches that replay as SUCCEEDED/FAILED via markAlreadyCompleted(), + // we process them after startBranches() completes — see processAlreadyCompletedBranches(). + branch.setOnCompleteCallback(() -> { var op = branch.getOperation(); - boolean success = op != null && op.status() == OperationStatus.SUCCEEDED; - onChildContextComplete(branch, success); + if (op != null && ExecutionManager.isTerminalStatus(op.status())) { + boolean success = op.status() == OperationStatus.SUCCEEDED; + onChildContextComplete(branch, success); + } else { + // Branch suspended (op is null or STARTED) — decrement active count. 
+ onChildContextSuspended(); + } }); - if (!earlyTermination && (maxConcurrency == null || activeBranches.get() < maxConcurrency)) { - activeBranches.incrementAndGet(); + boolean shouldStart; + synchronized (lock) { + branches.add(branch); + if (!earlyTermination && (maxConcurrency == null || activeBranches < maxConcurrency)) { + activeBranches++; + shouldStart = true; + } else { + pendingQueue.add(branch); + shouldStart = false; + } + } + + if (shouldStart) { startedBranches.add(branch); branch.execute(); - } else { - pendingQueue.add(branch); } return branch; } @@ -164,82 +212,135 @@ protected ChildContextOperation branchInternal( // ========== completion callback ========== /** - * Called on the checkpoint processing thread when a branch's completionFuture completes. Only does lightweight - * work: update counters, evaluate CompletionConfig, dequeue and start next branch. Does NOT call - * finalizeOperation() or checkpointResult() — those happen in get() on the context thread. + * Called on the checkpoint processing thread when a branch's completionFuture completes. Updates counters, + * evaluates completion criteria, and either starts the next queued branch or finalizes the operation. + * + *

When all branches have completed (or early termination criteria are met with no pending/active branches), this + * method aggregates results, checkpoints the parent operation, and completes the parent's {@code completionFuture} + * — allowing {@code get()} to unblock via {@code waitForOperationCompletion()}. + * + *

All mutable state updates are synchronized via {@code lock} to prevent races with {@link #branchInternal} + * (which runs on the parent context thread) and with other concurrent completion callbacks. */ protected void onChildContextComplete(ChildContextOperation branch, boolean success) { - if (success) { - succeeded.incrementAndGet(); - } else { - failed.incrementAndGet(); - } + ChildContextOperation nextToStart = null; + boolean shouldFinalize = false; + + synchronized (lock) { + // Idempotency: both onCompleteCallback (finally block) and processAlreadyCompletedBranches() + // may fire for the same branch. Skip if already counted. + if (!completedBranchIds.add(branch.getOperationId())) { + return; + } - // Evaluate completion criteria - if (!earlyTermination && shouldTerminateEarly()) { - earlyTermination = true; - completionReason = evaluateCompletionReason(); - logger.trace("Early termination triggered for operation {}: reason={}", getOperationId(), completionReason); - } + if (success) { + succeeded++; + } else { + failed++; + } + + // Evaluate completion criteria + if (!earlyTermination && shouldTerminateEarly()) { + earlyTermination = true; + completionReason = evaluateCompletionReason(); + logger.trace( + "Early termination triggered for operation {}: reason={}", getOperationId(), completionReason); + } - // Start next queued branch with correct thread ordering: - // register new branch thread BEFORE deregistering completed branch thread - if (!earlyTermination) { - var next = pendingQueue.poll(); - if (next != null) { - // activeBranches stays the same (one completing, one starting) - startedBranches.add(next); - next.execute(); // registers new thread internally via ChildContextOperation.start() + // Start next queued branch with correct thread ordering: + // register new branch thread BEFORE deregistering completed branch thread + if (!earlyTermination) { + nextToStart = pendingQueue.poll(); + if (nextToStart == null) { + activeBranches--; + } + // else 
activeBranches stays the same (one completing, one starting) } else { - activeBranches.decrementAndGet(); + activeBranches--; } - } else { - activeBranches.decrementAndGet(); + + // Check if all work is done: no active branches and either no pending branches + // or early termination (pending branches won't be started) + if (activeBranches == 0 && (pendingQueue.isEmpty() || earlyTermination)) { + shouldFinalize = true; + if (completionReason == null) { + completionReason = CompletionReason.ALL_COMPLETED; + } + } + } + + // Execute outside the lock — branch.execute() submits to the executor and may trigger + // further callbacks; holding the lock here would risk deadlock. + if (nextToStart != null) { + startedBranches.add(nextToStart); + nextToStart.execute(); + } + + // Finalize outside the lock — checkpointing is blocking I/O. + if (shouldFinalize) { + R result = aggregateResults(); + checkpointResult(result); } // completed branch's thread is deregistered by ChildContextOperation's close() in BaseContext } + /** + * Called when a branch suspends (e.g., due to a wait() operation). Decrements the active branch count so that the + * finalization check in {@link #onChildContextComplete} can detect when all branches are either completed or + * suspended. Does NOT start the next queued branch — suspended branches will resume on re-invocation. + */ + private void onChildContextSuspended() { + boolean allSuspendedOrComplete; + synchronized (lock) { + activeBranches--; + // Don't start next branch — this branch suspended, it will resume on re-invocation. + // Check if all branches are now either completed or suspended with no pending work. + allSuspendedOrComplete = activeBranches == 0 && (pendingQueue.isEmpty() || earlyTermination); + } + + if (allSuspendedOrComplete) { + // All branches have either completed or suspended. Complete the parent's completionFuture + // so the parent thread in get() is unblocked. 
The parent will see the operation is not + // SUCCEEDED (it's still STARTED) and propagate the suspension. + synchronized (completionFuture) { + completionFuture.complete(null); + } + } + } + // ========== completion evaluation ========== + /** Must be called while holding {@code lock}. */ private boolean shouldTerminateEarly() { // Check minSuccessful - if (completionConfig.minSuccessful() != null && succeeded.get() >= completionConfig.minSuccessful()) { + if (completionConfig.minSuccessful() != null && succeeded >= completionConfig.minSuccessful()) { return true; } // Check toleratedFailureCount - if (completionConfig.toleratedFailureCount() != null - && failed.get() > completionConfig.toleratedFailureCount()) { + if (completionConfig.toleratedFailureCount() != null && failed > completionConfig.toleratedFailureCount()) { return true; } // Check toleratedFailurePercentage - int totalCompleted = succeeded.get() + failed.get(); + int totalCompleted = succeeded + failed; if (completionConfig.toleratedFailurePercentage() != null && totalCompleted > 0 - && ((double) failed.get() / totalCompleted) > completionConfig.toleratedFailurePercentage()) { + && ((double) failed / totalCompleted) > completionConfig.toleratedFailurePercentage()) { return true; } return false; } + /** Must be called while holding {@code lock}. */ protected CompletionReason evaluateCompletionReason() { - if (completionConfig.minSuccessful() != null && succeeded.get() >= completionConfig.minSuccessful()) { + if (completionConfig.minSuccessful() != null && succeeded >= completionConfig.minSuccessful()) { return CompletionReason.MIN_SUCCESSFUL_REACHED; } return CompletionReason.FAILURE_TOLERANCE_EXCEEDED; } - private void finalizeOperation() { - if (completionReason == null) { - completionReason = CompletionReason.ALL_COMPLETED; - } - - R result = aggregateResults(); - checkpointResult(result); - } - /** * Checkpoints the parent concurrent operation as SUCCEEDED. 
Uses synchronous {@code sendOperationUpdate} because * this is called from the context thread in {@code get()}, where it is safe to block. @@ -271,25 +372,35 @@ protected void checkpointResult(R result) { @Override public R get() { - var op = waitForOperationCompletion(); + var executionManager = getContext().getExecutionManager(); + var threadContext = getCurrentThreadContext(); + + synchronized (completionFuture) { + if (!isOperationCompleted()) { + completionFuture.thenRun(() -> registerActiveThread(threadContext.threadId())); + executionManager.deregisterActiveThread(threadContext.threadId()); + } + } + + // Block until operation completes or all branches suspend. + // When all branches suspend, onChildContextSuspended completes completionFuture directly, + // so the parent thread is freed without racing against the global executionExceptionFuture. + completionFuture.join(); + + var op = getOperation(); if (op.status() == OperationStatus.SUCCEEDED) { if (op.contextDetails() != null && Boolean.TRUE.equals(op.contextDetails().replayChildren())) { - // Large result was reconstructed via replay — aggregate from branches return aggregateResults(); } var contextDetails = op.contextDetails(); var result = (contextDetails != null) ? contextDetails.result() : null; return deserializeResult(result); - } else if (op.status() == OperationStatus.FAILED) { - var contextDetails = op.contextDetails(); - var errorObject = (contextDetails != null) ? contextDetails.error() : null; - var original = deserializeException(errorObject); - if (original != null) { - throw new RuntimeException(original); - } - throw new RuntimeException("Concurrent operation failed: " + getOperationId()); + } else if (op.status() == OperationStatus.STARTED) { + // All branches suspended (e.g., wait inside map branches) — propagate suspension. + // onChildContextSuspended completed completionFuture when activeBranches hit 0. 
+ throw new SuspendExecutionException(); } else { return terminateExecutionWithIllegalDurableOperationException( "Unexpected operation status after completion: " + op.status()); @@ -299,32 +410,44 @@ public R get() { // ========== protected accessors for subclasses ========== protected List> getBranches() { - return Collections.unmodifiableList(branches); + synchronized (lock) { + return Collections.unmodifiableList(new ArrayList<>(branches)); + } } protected CompletionReason getCompletionReason() { - return completionReason; + synchronized (lock) { + return completionReason; + } } - protected AtomicInteger getSucceeded() { - return succeeded; + protected int getSucceeded() { + synchronized (lock) { + return succeeded; + } } - protected AtomicInteger getFailed() { - return failed; + protected int getFailed() { + synchronized (lock) { + return failed; + } } protected boolean isEarlyTermination() { - return earlyTermination; + synchronized (lock) { + return earlyTermination; + } } protected DurableContext getRootContext() { return rootContext; } - /** Returns the pending queue of branches that have not yet been started. */ + /** Returns a snapshot of the pending queue of branches that have not yet been started. */ protected Queue> getPendingQueue() { - return pendingQueue; + synchronized (lock) { + return new LinkedList<>(pendingQueue); + } } /** Returns the set of branches that have been started (had execute() called). 
*/ diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java index d94e9561..8ae43fcb 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java @@ -48,6 +48,7 @@ public class ChildContextOperation extends BaseDurableOperation { private final Function function; private final ExecutorService userExecutor; private final ConcurrencyOperation parentOperation; + private Runnable onCompleteCallback; private boolean replayChildContext; private T reconstructedResult; @@ -73,6 +74,15 @@ public ChildContextOperation( this.parentOperation = parentOperation; } + /** + * Sets a callback that fires in the finally block of {@code executeChildContext()}, after the branch completes, + * fails, or suspends. Used by {@link BaseConcurrentOperation} to track branch completion without relying on + * {@code completionFuture} (which doesn't fire on suspension). + */ + public void setOnCompleteCallback(Runnable callback) { + this.onCompleteCallback = callback; + } + /** Starts the operation. 
*/ @Override protected void start() { @@ -146,6 +156,9 @@ private void executeChildContext() { if (parentOperation != null) { parentOperation.onItemComplete(this); } + if (onCompleteCallback != null) { + onCompleteCallback.run(); + } } } }; diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java index e0f5e476..39c6735e 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java @@ -5,7 +5,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import software.amazon.awssdk.services.lambda.model.OperationStatus; import software.amazon.lambda.durable.MapConfig; import software.amazon.lambda.durable.MapFunction; import software.amazon.lambda.durable.TypeToken; @@ -15,6 +14,7 @@ import software.amazon.lambda.durable.model.MapResultItem; import software.amazon.lambda.durable.model.OperationSubType; import software.amazon.lambda.durable.serde.SerDes; +import software.amazon.lambda.durable.util.ExceptionHelper; /** * Executes a map operation: applies a function to each item in a collection concurrently, with each item running in its @@ -59,59 +59,18 @@ protected void startBranches() { var index = i; var item = items.get(i); branchInternal("map-iteration-" + i, OperationSubType.MAP_ITERATION, itemResultType, serDes, childCtx -> { - try { - return function.apply(item, index, childCtx); - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException(e); - } + return function.apply(item, index, childCtx); }); } } /** - * Waits for all branches to complete and aggregates results, then checkpoints the parent MAP operation. + * Aggregates results from completed branches into a {@code MapResult}. * - *

Handles three cases: - * - *

    - *
  • Replay with small result (parent SUCCEEDED, no replayChildren): deserialize cached MapResult directly - *
  • Replay with large result (parent SUCCEEDED + replayChildren): aggregate from child replays, no - * re-checkpoint needed - *
  • First execution or STARTED replay: aggregate from branches, then checkpoint parent result - *
+ *

Called from {@link BaseConcurrentOperation#onChildContextComplete} on the checkpoint processing thread after + * all branches have completed. At this point every branch's {@code completionFuture} is already done, so + * {@code branch.get()} returns immediately without blocking. */ - @Override - public MapResult get() { - // Check if parent operation already completed (replay with small result) - if (isOperationCompleted()) { - var op = getOperation(); - if (op != null && op.status() == OperationStatus.SUCCEEDED) { - if (op.contextDetails() != null - && Boolean.TRUE.equals(op.contextDetails().replayChildren())) { - // Large result on replay: aggregate from child replays - return aggregateResults(); - } - // Small result on replay: deserialize cached MapResult - var result = (op.contextDetails() != null) ? op.contextDetails().result() : null; - return deserializeResult(result); - } - } - - // First execution, STARTED replay, or SUCCEEDED+replayChildren replay: aggregate from branches - var mapResult = aggregateResults(); - - // Check if parent is already SUCCEEDED (replayChildren case) — skip re-checkpointing - var existingOp = getOperation(); - if (existingOp == null || existingOp.status() != OperationStatus.SUCCEEDED) { - // First execution or STARTED: checkpoint parent result from context thread (safe to .join() here) - checkpointResult(mapResult); - } - - return mapResult; - } - @Override @SuppressWarnings("unchecked") protected MapResult aggregateResults() { @@ -129,7 +88,7 @@ protected MapResult aggregateResults() { try { resultItems.set(i, MapResultItem.success(branch.get())); } catch (Exception e) { - resultItems.set(i, MapResultItem.failure(e)); + resultItems.set(i, MapResultItem.failure(ExceptionHelper.buildErrorObject(e, serDes))); } } @@ -140,7 +99,7 @@ protected MapResult aggregateResults() { var reason = getCompletionReason(); if (reason == null) { - reason = !pendingQueue.isEmpty() ? 
evaluateCompletionReason() : CompletionReason.ALL_COMPLETED; + reason = CompletionReason.ALL_COMPLETED; } return new MapResult<>(resultItems, reason); } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/serde/JacksonSerDes.java b/sdk/src/main/java/software/amazon/lambda/durable/serde/JacksonSerDes.java index 820cfd47..c24ef3a0 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/serde/JacksonSerDes.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/serde/JacksonSerDes.java @@ -38,6 +38,7 @@ public class JacksonSerDes implements SerDes { public JacksonSerDes() { this(new ObjectMapper() .registerModule(new JavaTimeModule()) + .registerModule(new AwsSdkV2Module()) .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS) .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)); } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/validation/ParameterValidator.java b/sdk/src/main/java/software/amazon/lambda/durable/validation/ParameterValidator.java index d1443c80..97222a80 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/validation/ParameterValidator.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/validation/ParameterValidator.java @@ -7,6 +7,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.IdentityHashMap; +import java.util.LinkedHashSet; import java.util.Set; import java.util.WeakHashMap; import java.util.concurrent.ConcurrentHashMap; @@ -118,8 +119,9 @@ public static void validateOperationName(String name, int maxLength) { /** * Validates that a collection has deterministic iteration order. * - *

Rejects known unordered collection types: {@link HashSet} (and subclasses), and views returned by - * {@link HashMap}, {@link IdentityHashMap}, {@link WeakHashMap}, and {@link ConcurrentHashMap}. + *

Rejects known unordered collection types: {@link HashSet} (but not {@link LinkedHashSet}, which has stable + * insertion-order iteration), and views returned by {@link HashMap}, {@link IdentityHashMap}, {@link WeakHashMap}, + * and {@link ConcurrentHashMap}. * * @param items the collection to validate * @throws IllegalArgumentException if items is null or has non-deterministic iteration order @@ -128,6 +130,10 @@ public static void validateOrderedCollection(Collection items) { if (items == null) { throw new IllegalArgumentException("items cannot be null"); } + // LinkedHashSet extends HashSet but has stable insertion-order iteration — allow it + if (items instanceof LinkedHashSet) { + return; + } if (items instanceof HashSet || isUnorderedMapView(items)) { throw new IllegalArgumentException("items must have deterministic iteration order"); } diff --git a/sdk/src/test/java/software/amazon/lambda/durable/CompletionConfigTest.java b/sdk/src/test/java/software/amazon/lambda/durable/CompletionConfigTest.java index f57ce3ed..710bf9b1 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/CompletionConfigTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/CompletionConfigTest.java @@ -61,4 +61,48 @@ void toleratedFailurePercentage_setsPercentage() { assertNull(config.toleratedFailureCount()); assertEquals(0.25, config.toleratedFailurePercentage()); } + + @Test + void minSuccessful_withZero_shouldThrow() { + var exception = assertThrows(IllegalArgumentException.class, () -> CompletionConfig.minSuccessful(0)); + assertEquals("minSuccessful must be at least 1, got: 0", exception.getMessage()); + } + + @Test + void minSuccessful_withNegative_shouldThrow() { + var exception = assertThrows(IllegalArgumentException.class, () -> CompletionConfig.minSuccessful(-1)); + assertEquals("minSuccessful must be at least 1, got: -1", exception.getMessage()); + } + + @Test + void toleratedFailureCount_withNegative_shouldThrow() { + var exception = 
assertThrows(IllegalArgumentException.class, () -> CompletionConfig.toleratedFailureCount(-1)); + assertEquals("toleratedFailureCount must be non-negative, got: -1", exception.getMessage()); + } + + @Test + void toleratedFailurePercentage_withNegative_shouldThrow() { + var exception = + assertThrows(IllegalArgumentException.class, () -> CompletionConfig.toleratedFailurePercentage(-0.1)); + assertEquals("toleratedFailurePercentage must be between 0.0 and 1.0, got: -0.1", exception.getMessage()); + } + + @Test + void toleratedFailurePercentage_aboveOne_shouldThrow() { + var exception = + assertThrows(IllegalArgumentException.class, () -> CompletionConfig.toleratedFailurePercentage(1.5)); + assertEquals("toleratedFailurePercentage must be between 0.0 and 1.0, got: 1.5", exception.getMessage()); + } + + @Test + void toleratedFailurePercentage_atBoundaries_shouldPass() { + assertDoesNotThrow(() -> CompletionConfig.toleratedFailurePercentage(0.0)); + assertDoesNotThrow(() -> CompletionConfig.toleratedFailurePercentage(1.0)); + } + + @Test + void toleratedFailureCount_withZero_shouldPass() { + var config = CompletionConfig.toleratedFailureCount(0); + assertEquals(0, config.toleratedFailureCount()); + } } diff --git a/sdk/src/test/java/software/amazon/lambda/durable/MapConfigTest.java b/sdk/src/test/java/software/amazon/lambda/durable/MapConfigTest.java index c2a50759..11c567d8 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/MapConfigTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/MapConfigTest.java @@ -101,4 +101,26 @@ void toBuilder_canOverrideValues() { assertEquals(10, modified.maxConcurrency()); assertEquals(4, original.maxConcurrency()); } + + @Test + void builderWithZeroMaxConcurrency_shouldThrow() { + var exception = assertThrows( + IllegalArgumentException.class, + () -> MapConfig.builder().maxConcurrency(0).build()); + assertEquals("maxConcurrency must be at least 1, got: 0", exception.getMessage()); + } + + @Test + void 
builderWithNegativeMaxConcurrency_shouldThrow() { + var exception = assertThrows( + IllegalArgumentException.class, + () -> MapConfig.builder().maxConcurrency(-1).build()); + assertEquals("maxConcurrency must be at least 1, got: -1", exception.getMessage()); + } + + @Test + void builderWithNullMaxConcurrency_shouldPass() { + var config = MapConfig.builder().maxConcurrency(null).build(); + assertNull(config.maxConcurrency()); + } } diff --git a/sdk/src/test/java/software/amazon/lambda/durable/MapFunctionTest.java b/sdk/src/test/java/software/amazon/lambda/durable/MapFunctionTest.java index ee87d409..9bc98c1a 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/MapFunctionTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/MapFunctionTest.java @@ -14,7 +14,7 @@ void isFunctionalInterface() { } @Test - void canBeUsedAsLambda() throws Exception { + void canBeUsedAsLambda() { MapFunction fn = (item, index, ctx) -> item.toUpperCase(); var result = fn.apply("hello", 0, null); @@ -23,23 +23,13 @@ void canBeUsedAsLambda() throws Exception { } @Test - void receivesCorrectIndex() throws Exception { + void receivesCorrectIndex() { MapFunction fn = (item, index, ctx) -> index; assertEquals(0, fn.apply("a", 0, null)); assertEquals(5, fn.apply("b", 5, null)); } - @Test - void canThrowCheckedException() { - MapFunction fn = (item, index, ctx) -> { - throw new Exception("checked"); - }; - - var ex = assertThrows(Exception.class, () -> fn.apply("x", 0, null)); - assertEquals("checked", ex.getMessage()); - } - @Test void canThrowRuntimeException() { MapFunction fn = (item, index, ctx) -> { diff --git a/sdk/src/test/java/software/amazon/lambda/durable/model/MapResultTest.java b/sdk/src/test/java/software/amazon/lambda/durable/model/MapResultTest.java index fc797d3e..ee42b824 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/model/MapResultTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/model/MapResultTest.java @@ -6,9 +6,17 @@ import 
java.util.List; import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.lambda.model.ErrorObject; class MapResultTest { + private static ErrorObject testError(String message) { + return ErrorObject.builder() + .errorType("java.lang.RuntimeException") + .errorMessage(message) + .build(); + } + @Test void empty_returnsZeroSizeResult() { var result = MapResult.empty(); @@ -36,7 +44,7 @@ void allSucceeded_trueWhenNoErrors() { @Test void allSucceeded_falseWhenAnyError() { - var error = new RuntimeException("fail"); + var error = testError("fail"); var result = new MapResult<>( List.of(MapResultItem.success("a"), MapResultItem.failure(error)), CompletionReason.ALL_COMPLETED); @@ -46,7 +54,7 @@ void allSucceeded_falseWhenAnyError() { @Test void getResult_returnsNullForFailedItem() { - var error = new RuntimeException("fail"); + var error = testError("fail"); var result = new MapResult<>( List.of(MapResultItem.success("a"), MapResultItem.failure(error)), CompletionReason.ALL_COMPLETED); @@ -57,7 +65,7 @@ void getResult_returnsNullForFailedItem() { @Test void getError_returnsNullForSucceededItem() { - var error = new RuntimeException("fail"); + var error = testError("fail"); var result = new MapResult<>( List.of(MapResultItem.success("a"), MapResultItem.failure(error)), CompletionReason.ALL_COMPLETED); @@ -71,7 +79,7 @@ void succeeded_filtersNullResults() { var result = new MapResult<>( List.of( MapResultItem.success("a"), - MapResultItem.failure(new RuntimeException()), + MapResultItem.failure(testError("fail")), MapResultItem.success("c")), CompletionReason.ALL_COMPLETED); @@ -80,7 +88,7 @@ void succeeded_filtersNullResults() { @Test void failed_filtersNullErrors() { - var error = new RuntimeException("fail"); + var error = testError("fail"); var result = new MapResult<>( List.of(MapResultItem.success("a"), MapResultItem.failure(error), MapResultItem.success("c")), CompletionReason.ALL_COMPLETED); @@ -107,7 +115,7 @@ void items_returnsUnmodifiableList() 
{ @Test void getItem_returnsMapResultItem() { var result = new MapResult<>( - List.of(MapResultItem.success("a"), MapResultItem.failure(new RuntimeException("fail"))), + List.of(MapResultItem.success("a"), MapResultItem.failure(testError("fail"))), CompletionReason.ALL_COMPLETED); assertEquals(MapResultItem.Status.SUCCEEDED, result.getItem(0).status()); @@ -120,12 +128,12 @@ void getItem_returnsMapResultItem() { } @Test - void notStartedItems_haveNullStatusResultAndError() { + void notStartedItems_haveNotStartedStatusAndNullResultAndError() { var result = new MapResult<>( List.of(MapResultItem.success("a"), MapResultItem.notStarted()), CompletionReason.MIN_SUCCESSFUL_REACHED); - assertNull(result.getItem(1).status()); + assertEquals(MapResultItem.Status.NOT_STARTED, result.getItem(1).status()); assertNull(result.getResult(1)); assertNull(result.getError(1)); } diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/BaseDurableOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/BaseDurableOperationTest.java index 8df56e79..7ae7ac71 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/BaseDurableOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/BaseDurableOperationTest.java @@ -63,6 +63,9 @@ void setUp() { when(durableContext.getExecutionManager()).thenReturn(executionManager); when(executionManager.getCurrentThreadContext()).thenReturn(new ThreadContext(CONTEXT_ID, ThreadType.CONTEXT)); when(executionManager.getOperationAndUpdateReplayState(OPERATION_ID)).thenReturn(OPERATION); + // Stub runUntilCompleteOrSuspend to pass through the user future — in unit tests there's + // no executionExceptionFuture to race against, so just wait on the completionFuture directly. 
+ when(executionManager.runUntilCompleteOrSuspend(any())).thenAnswer(invocation -> invocation.getArgument(0)); } @Test diff --git a/sdk/src/test/java/software/amazon/lambda/durable/validation/ParameterValidatorTest.java b/sdk/src/test/java/software/amazon/lambda/durable/validation/ParameterValidatorTest.java index 0ee0ee0c..ad281fe9 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/validation/ParameterValidatorTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/validation/ParameterValidatorTest.java @@ -331,11 +331,9 @@ void validateOrderedCollection_withOrderedSet_shouldPass() { } @Test - void validateOrderedCollection_withLinkedHashSet_shouldThrow() { - // LinkedHashSet extends HashSet, so it's rejected even though it has deterministic order - assertThrows( - IllegalArgumentException.class, - () -> ParameterValidator.validateOrderedCollection(new LinkedHashSet<>(List.of("a", "b")))); + void validateOrderedCollection_withLinkedHashSet_shouldPass() { + // LinkedHashSet extends HashSet but has stable insertion-order iteration — allowed + assertDoesNotThrow(() -> ParameterValidator.validateOrderedCollection(new LinkedHashSet<>(List.of("a", "b")))); } @Test