diff --git a/docs/advanced/configuration.md b/docs/advanced/configuration.md index 57f628c30..abf2dfbde 100644 --- a/docs/advanced/configuration.md +++ b/docs/advanced/configuration.md @@ -8,15 +8,14 @@ public class OrderProcessor extends DurableHandler { @Override protected DurableConfig createConfiguration() { // Custom Lambda client with connection pooling - var lambdaClient = LambdaClient.builder() + var lambdaClientBuilder = LambdaClient.builder() .httpClient(ApacheHttpClient.builder() .maxConnections(50) .connectionTimeout(Duration.ofSeconds(30)) - .build()) - .build(); + .build()); return DurableConfig.builder() - .withLambdaClient(lambdaClient) + .withLambdaClientBuilder(lambdaClientBuilder) .withSerDes(new MyCustomSerDes()) // Custom serialization .withExecutorService(Executors.newFixedThreadPool(10)) // Custom thread pool .withLoggerConfig(LoggerConfig.withReplayLogging()) // Enable replay logs @@ -30,11 +29,11 @@ public class OrderProcessor extends DurableHandler { } ``` -| Option | Description | Default | -|--------|-------------|---------| -| `withLambdaClient()` | Custom AWS Lambda client | Auto-configured Lambda client | -| `withSerDes()` | Serializer for step results | Jackson with default settings | -| `withExecutorService()` | Thread pool for user-defined operations | Cached daemon thread pool | -| `withLoggerConfig()` | Logger behavior configuration | Suppress logs during replay | +| Option | Description | Default | +|-----------------------------|-----------------------------------------|-------------------------------| +| `withLambdaClientBuilder()` | Custom AWS Lambda client | Auto-configured Lambda client | +| `withSerDes()` | Serializer for step results | Jackson with default settings | +| `withExecutorService()` | Thread pool for user-defined operations | Cached daemon thread pool | +| `withLoggerConfig()` | Logger behavior configuration | Suppress logs during replay | The `withExecutorService()` option configures the thread pool used for running user-defined operations. Internal SDK coordination (checkpoint batching, polling) runs on an SDK-managed thread pool. \ No newline at end of file diff --git a/docs/design.md b/docs/design.md index 89f1df16a..d9e445cba 100644 --- a/docs/design.md +++ b/docs/design.md @@ -87,7 +87,7 @@ public class MyHandler extends DurableHandler { @Override protected DurableConfig createConfiguration() { return DurableConfig.builder() - .withLambdaClient(customLambdaClient) + .withLambdaClientBuilder(customLambdaClientBuilder) .withSerDes(new CustomSerDes()) .withExecutorService(Executors.newFixedThreadPool(4)) .build(); @@ -95,12 +95,12 @@ public class MyHandler extends DurableHandler { } ``` -| Option | Default | -|--------|---------| -| `lambdaClient` | Auto-created `LambdaClient` for current region, primed for performance (see [`DurableConfig.java`](../sdk/src/main/java/com/amazonaws/lambda/durable/DurableConfig.java)) | -| `serDes` | `JacksonSerDes` | -| `executorService` | `Executors.newCachedThreadPool()` (for user-defined operations only) | -| `loggerConfig` | `LoggerConfig.defaults()` (suppress replay logs) | +| Option | Default | +|-----------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `lambdaClientBuilder` | Auto-created `LambdaClient` for current region, primed for performance (see [`DurableConfig.java`](../sdk/src/main/java/com/amazonaws/lambda/durable/DurableConfig.java)) | +| `serDes` | `JacksonSerDes` | +| `executorService` | `Executors.newCachedThreadPool()` (for user-defined operations only) | +| `loggerConfig` | `LoggerConfig.defaults()` (suppress replay logs) | ### Thread Pool Architecture @@ -507,7 +507,7 @@ Implementations: - `LambdaDurableFunctionsClient` - Production (wraps AWS SDK) - `LocalMemoryExecutionClient` - Testing (in-memory) -For production customization, use `DurableConfig.builder().withLambdaClient(lambdaClient)`. +For production customization, use `DurableConfig.builder().withLambdaClientBuilder(lambdaClientBuilder)`. For testing, use `DurableConfig.builder().withDurableExecutionClient(localMemoryClient)`. --- @@ -734,12 +734,12 @@ When a context thread calls `ctx.step(...)`, the following coordination occurs: ### Sequence: Wait with Suspension -| Seq | Context Thread | System Thread | -|-----|--------------------------------------------------------------------------------------------------------------------------------|------------------------| -| 1 | Create `WaitOperation` + `completionFuture`. Call `execute()`. `execute()` calls `start()` → checkpoint WAIT with duration → `pollForOperationUpdates(remainingWaitTime)`. | Begin polling backend. | -| 2 | `wait()` calls `get()` → `waitForOperationCompletion()`. Attach `thenRun(re-register)`. Deregister context thread. | (polling) | -| 3 | `activeThreads` is empty → `suspendExecution()` → `executionExceptionFuture.completeExceptionally(SuspendExecutionException)`. | — | -| 4 | `runUntilCompleteOrSuspend` resolves with `SuspendExecutionException` → return `PENDING`. | — | +| Seq | Context Thread | System Thread | +|-----|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------| +| 1 | Create `WaitOperation` + `completionFuture`. Call `execute()`. `execute()` calls `start()` → checkpoint WAIT with duration → `pollForOperationUpdates(remainingWaitTime)`. | Begin polling backend. | +| 2 | `wait()` calls `get()` → `waitForOperationCompletion()`. Attach `thenRun(re-register)`. Deregister context thread. | (polling) | +| 3 | `activeThreads` is empty → `suspendExecution()` → `executionExceptionFuture.completeExceptionally(SuspendExecutionException)`. | — | +| 4 | `runUntilCompleteOrSuspend` resolves with `SuspendExecutionException` → return `PENDING`. | — | On re-invocation, the wait replays. If the scheduled end time has passed, `markAlreadyCompleted()` fires and the context thread continues without deregistering. diff --git a/sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java b/sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java index d30b6c47b..e97843360 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java @@ -50,13 +50,12 @@ *
{@code
  * @Override
  * protected DurableConfig createConfiguration() {
- *     LambdaClient lambdaClient = LambdaClient.builder()
+ *     LambdaClientBuilder lambdaClientBuilder = LambdaClient.builder()
  *         .region(Region.US_WEST_2)
- *         .credentialsProvider(ProfileCredentialsProvider.create("my-profile"))
- *         .build();
+ *         .credentialsProvider(ProfileCredentialsProvider.create("my-profile"));
  *
  *     return DurableConfig.builder()
- *         .withLambdaClient(lambdaClient)
+ *         .withLambdaClientBuilder(lambdaClientBuilder)
  *         .build();
  * }
  * }
diff --git a/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java b/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java index 453d5f7a1..90179bcdb 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java @@ -42,23 +42,23 @@ public interface DurableContext extends BaseContext { * * @param the result type * @param name the unique operation name within this context - * @param typeToken the type token for deserialization of generic types + * @param resultType the type token for deserialization of generic types * @param func the function to execute, receiving a {@link StepContext} * @return the step result */ - T step(String name, TypeToken typeToken, Function func); + T step(String name, TypeToken resultType, Function func); /** * Executes a durable step using a {@link TypeToken} and configuration, blocking until it completes. * * @param the result type * @param name the unique operation name within this context - * @param typeToken the type token for deserialization of generic types + * @param resultType the type token for deserialization of generic types * @param func the function to execute, receiving a {@link StepContext} * @param config the step configuration (retry strategy, semantics, custom SerDes) * @return the step result */ - T step(String name, TypeToken typeToken, Function func, StepConfig config); + T step(String name, TypeToken resultType, Function func, StepConfig config); /** * Asynchronously executes a durable step, returning a {@link DurableFuture} that can be composed or blocked on. @@ -91,11 +91,11 @@ public interface DurableContext extends BaseContext { * * @param the result type * @param name the unique operation name within this context - * @param typeToken the type token for deserialization of generic types + * @param resultType the type token for deserialization of generic types * @param func the function to execute, receiving a {@link StepContext} * @return a future representing the step result */ - DurableFuture stepAsync(String name, TypeToken typeToken, Function func); + DurableFuture stepAsync(String name, TypeToken resultType, Function func); /** * Asynchronously executes a durable step using a {@link TypeToken} and custom configuration. @@ -104,13 +104,13 @@ public interface DurableContext extends BaseContext { * * @param the result type * @param name the unique operation name within this context - * @param typeToken the type token for deserialization of generic types + * @param resultType the type token for deserialization of generic types * @param func the function to execute, receiving a {@link StepContext} * @param config the step configuration (retry strategy, semantics, custom SerDes) * @return a future representing the step result */ DurableFuture stepAsync( - String name, TypeToken typeToken, Function func, StepConfig config); + String name, TypeToken resultType, Function func, StepConfig config); @Deprecated T step(String name, Class resultType, Supplier func); @@ -119,10 +119,10 @@ DurableFuture stepAsync( T step(String name, Class resultType, Supplier func, StepConfig config); @Deprecated - T step(String name, TypeToken typeToken, Supplier func); + T step(String name, TypeToken resultType, Supplier func); @Deprecated - T step(String name, TypeToken typeToken, Supplier func, StepConfig config); + T step(String name, TypeToken resultType, Supplier func, StepConfig config); @Deprecated DurableFuture stepAsync(String name, Class resultType, Supplier func); @@ -131,10 +131,10 @@ DurableFuture stepAsync( DurableFuture stepAsync(String name, Class resultType, Supplier func, StepConfig config); @Deprecated - DurableFuture stepAsync(String name, TypeToken typeToken, Supplier func); + DurableFuture stepAsync(String name, TypeToken resultType, Supplier func); @Deprecated - DurableFuture stepAsync(String name, TypeToken typeToken, Supplier func, StepConfig config); + DurableFuture stepAsync(String name, TypeToken resultType, Supplier func, StepConfig config); /** * Suspends execution for the specified duration without consuming compute resources. @@ -177,9 +177,10 @@ DurableFuture stepAsync( T invoke(String name, String functionName, U payload, Class resultType, InvokeConfig config); /** Invokes another Lambda function using a {@link TypeToken} for generic result types, blocking until complete. */ - T invoke(String name, String functionName, U payload, TypeToken typeToken); + T invoke(String name, String functionName, U payload, TypeToken resultType); - T invoke(String name, String functionName, U payload, TypeToken typeToken, InvokeConfig config); + /** Invokes another Lambda function using a {@link TypeToken} and custom configuration, blocking until complete. */ + T invoke(String name, String functionName, U payload, TypeToken resultType, InvokeConfig config); /** Invokes another Lambda function using a {@link TypeToken} and custom configuration, blocking until complete. */ DurableFuture invokeAsync( @@ -201,18 +202,18 @@ DurableFuture invokeAsync( * @param name the unique operation name within this context * @param functionName the ARN or name of the Lambda function to invoke * @param payload the input payload to send to the target function - * @param typeToken the type token for deserialization of generic result types + * @param resultType the type token for deserialization of generic result types * @param config the invoke configuration (custom SerDes for result and payload) * @return a future representing the invocation result */ DurableFuture invokeAsync( - String name, String functionName, U payload, TypeToken typeToken, InvokeConfig config); + String name, String functionName, U payload, TypeToken resultType, InvokeConfig config); /** Creates a callback with custom configuration. */ DurableCallbackFuture createCallback(String name, Class resultType, CallbackConfig config); /** Creates a callback using a {@link TypeToken} for generic result types. */ - DurableCallbackFuture createCallback(String name, TypeToken typeToken); + DurableCallbackFuture createCallback(String name, TypeToken resultType); /** Creates a callback with default configuration. */ DurableCallbackFuture createCallback(String name, Class resultType); @@ -225,11 +226,11 @@ DurableFuture invokeAsync( * * @param the result type * @param name the unique operation name within this context - * @param typeToken the type token for deserialization of generic result types + * @param resultType the type token for deserialization of generic result types * @param config the callback configuration (custom SerDes) * @return a future containing the callback ID and eventual result */ - DurableCallbackFuture createCallback(String name, TypeToken typeToken, CallbackConfig config); + DurableCallbackFuture createCallback(String name, TypeToken resultType, CallbackConfig config); /** * Runs a function in a child context, blocking until it completes. @@ -248,13 +249,13 @@ DurableFuture invokeAsync( /** * Runs a function in a child context using a {@link TypeToken} for generic result types, blocking until complete. */ - T runInChildContext(String name, TypeToken typeToken, Function func); + T runInChildContext(String name, TypeToken resultType, Function func); /** Asynchronously runs a function in a child context, returning a {@link DurableFuture}. */ DurableFuture runInChildContextAsync(String name, Class resultType, Function func); /** Asynchronously runs a function in a child context using a {@link TypeToken} for generic result types. */ - DurableFuture runInChildContextAsync(String name, TypeToken typeToken, Function func); + DurableFuture runInChildContextAsync(String name, TypeToken resultType, Function func); MapResult map(String name, Collection items, Class resultType, MapFunction function); @@ -301,7 +302,7 @@ DurableFuture> mapAsync( T waitForCallback(String name, Class resultType, BiConsumer func); /** Executes a submitter and waits for an external callback using a {@link TypeToken}, blocking until complete. */ - T waitForCallback(String name, TypeToken typeToken, BiConsumer func); + T waitForCallback(String name, TypeToken resultType, BiConsumer func); /** Executes a submitter and waits for an external callback with custom configuration, blocking until complete. */ T waitForCallback( @@ -313,7 +314,7 @@ T waitForCallback( /** Executes a submitter and waits for an external callback using a {@link TypeToken} and custom configuration. */ T waitForCallback( String name, - TypeToken typeToken, + TypeToken resultType, BiConsumer func, WaitForCallbackConfig waitForCallbackConfig); @@ -322,7 +323,7 @@ T waitForCallback( /** Asynchronously executes a submitter and waits for an external callback using a {@link TypeToken}. */ DurableFuture waitForCallbackAsync( - String name, TypeToken typeToken, BiConsumer func); + String name, TypeToken resultType, BiConsumer func); /** Asynchronously executes a submitter and waits for an external callback with custom configuration. */ DurableFuture waitForCallbackAsync( @@ -341,14 +342,14 @@ DurableFuture waitForCallbackAsync( * * @param the result type * @param name the unique operation name within this context - * @param typeToken the type token for deserialization of generic result types + * @param resultType the type token for deserialization of generic result types * @param func the submitter function, receiving the callback ID and a {@link StepContext} * @param waitForCallbackConfig the configuration for both the callback and submitter step * @return a future representing the callback result */ DurableFuture waitForCallbackAsync( String name, - TypeToken typeToken, + TypeToken resultType, BiConsumer func, WaitForCallbackConfig waitForCallbackConfig); @@ -379,7 +380,7 @@ T waitForCondition( /** Polls a condition function until it signals done, using a {@link TypeToken}, blocking until complete. */ T waitForCondition( String name, - TypeToken typeToken, + TypeToken resultType, BiFunction> checkFunc, T initialState); @@ -389,7 +390,7 @@ T waitForCondition( */ T waitForCondition( String name, - TypeToken typeToken, + TypeToken resultType, BiFunction> checkFunc, T initialState, WaitForConditionConfig config); @@ -412,7 +413,7 @@ DurableFuture waitForConditionAsync( /** Asynchronously polls a condition function until it signals done, using a {@link TypeToken}. */ DurableFuture waitForConditionAsync( String name, - TypeToken typeToken, + TypeToken resultType, BiFunction> checkFunc, T initialState); @@ -425,7 +426,7 @@ DurableFuture waitForConditionAsync( * * @param the type of state being polled * @param name the unique operation name within this context - * @param typeToken the type token for deserialization of generic types + * @param resultType the type token for deserialization of generic types * @param checkFunc the function that evaluates the condition and returns a {@link WaitForConditionResult} * @param initialState the initial state passed to the first check invocation * @param config the waitForCondition configuration (wait strategy, custom SerDes) @@ -433,7 +434,7 @@ DurableFuture waitForConditionAsync( */ DurableFuture waitForConditionAsync( String name, - TypeToken typeToken, + TypeToken resultType, BiFunction> checkFunc, T initialState, WaitForConditionConfig config); diff --git a/sdk/src/main/java/software/amazon/lambda/durable/DurableExecutor.java b/sdk/src/main/java/software/amazon/lambda/durable/DurableExecutor.java index 434d097bf..415f26428 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/DurableExecutor.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/DurableExecutor.java @@ -38,6 +38,8 @@ public class DurableExecutor { // Lambda response size limit is 6MB minus small epsilon for envelope private static final int LAMBDA_RESPONSE_SIZE_LIMIT = 6 * 1024 * 1024 - 50; + private DurableExecutor() {} + public static DurableExecutionOutput execute( DurableExecutionInput input, Context lambdaContext, @@ -115,11 +117,11 @@ private static String handleLargePayload(ExecutionManager executionManager, Stri private static ErrorObject buildErrorObject(Throwable e, SerDes serDes) { // exceptions thrown from operations, e.g. Step - if (e instanceof DurableOperationException) { - return ((DurableOperationException) e).getErrorObject(); + if (e instanceof DurableOperationException durableOperationException) { + return durableOperationException.getErrorObject(); } - if (e instanceof UnrecoverableDurableExecutionException) { - return ((UnrecoverableDurableExecutionException) e).getErrorObject(); + if (e instanceof UnrecoverableDurableExecutionException unrecoverableDurableExecutionException) { + return unrecoverableDurableExecutionException.getErrorObject(); } // exceptions thrown from non-operation code return ExceptionHelper.buildErrorObject(e, serDes); @@ -134,6 +136,16 @@ private static I extractUserInput(Operation executionOp, SerDes serDes, Type return serDes.deserialize(inputPayload, inputType); } + /** + * Wraps a user handler in a RequestHandler that can be used by the Lambda runtime. + * + * @param inputType the type token for the input + * @param handler the handler function + * @param config the durable config + * @return a request handler that executes the durable function + * @param the type of the input + * @param the type of the output + */ public static RequestHandler wrap( TypeToken inputType, BiFunction handler, DurableConfig config) { return (input, context) -> execute(input, context, inputType, handler, config); diff --git a/sdk/src/main/java/software/amazon/lambda/durable/InvokeConfig.java b/sdk/src/main/java/software/amazon/lambda/durable/InvokeConfig.java index b8bd24ca1..0f091e9c7 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/InvokeConfig.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/InvokeConfig.java @@ -52,6 +52,15 @@ private Builder(SerDes payloadSerDes, SerDes resultSerDes, String tenantId) { this.tenantId = tenantId; } + /** + * Sets the tenant ID for the invoke operation. + * + *

The tenant ID is used to isolate execution state for different tenants. It's required when invoking + * multi-tenant functions. + * + * @param tenantId the tenant ID to use + * @return this builder for method chaining + */ public Builder tenantId(String tenantId) { this.tenantId = tenantId; return this; diff --git a/sdk/src/main/java/software/amazon/lambda/durable/TypeToken.java b/sdk/src/main/java/software/amazon/lambda/durable/TypeToken.java index 12d0ed374..ecb7c987c 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/TypeToken.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/TypeToken.java @@ -36,8 +36,8 @@ public abstract class TypeToken { */ protected TypeToken() { Type superClass = getClass().getGenericSuperclass(); - if (superClass instanceof ParameterizedType) { - this.type = ((ParameterizedType) superClass).getActualTypeArguments()[0]; + if (superClass instanceof ParameterizedType parameterizedType) { + this.type = parameterizedType.getActualTypeArguments()[0]; } else { throw new IllegalStateException("TypeToken must be created as an anonymous subclass with a type parameter. " + "Example: new TypeToken>() {}"); @@ -74,10 +74,13 @@ public Type getType() { @Override public boolean equals(Object obj) { - if (this == obj) return true; - if (!(obj instanceof TypeToken)) return false; - TypeToken other = (TypeToken) obj; - return type.equals(other.type); + if (this == obj) { + return true; + } + if (obj instanceof TypeToken other) { + return type.equals(other.type); + } + return false; } @Override diff --git a/sdk/src/main/java/software/amazon/lambda/durable/WaitForCallbackConfig.java b/sdk/src/main/java/software/amazon/lambda/durable/WaitForCallbackConfig.java index 0732630f5..8c6b7e28a 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/WaitForCallbackConfig.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/WaitForCallbackConfig.java @@ -45,16 +45,29 @@ public static class Builder { private Builder() {} + /** + * Sets the step configuration for the composite operation. + * + * @param stepConfig the step configuration + * @return this builder for method chaining + */ public Builder stepConfig(StepConfig stepConfig) { this.stepConfig = stepConfig; return this; } + /** + * Sets the callback configuration for the composite operation. + * + * @param callbackConfig the callback configuration + * @return this builder for method chaining + */ public Builder callbackConfig(CallbackConfig callbackConfig) { this.callbackConfig = callbackConfig; return this; } + /** Builds the WaitForCallbackConfig instance. */ public WaitForCallbackConfig build() { return new WaitForCallbackConfig(this); } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/WaitForConditionResult.java b/sdk/src/main/java/software/amazon/lambda/durable/WaitForConditionResult.java index 6b482b7f2..74346110a 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/WaitForConditionResult.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/WaitForConditionResult.java @@ -3,7 +3,7 @@ package software.amazon.lambda.durable; /** - * Result returned by a {@link WaitForConditionWaitStrategy} check function to signal whether the condition is met. + * Result returned by a WaitForCondition check function to signal whether the condition is met. * *

When {@code isDone} is true, polling stops and {@code value} becomes the final result. When {@code isDone} is * false, polling continues using the delay computed by the wait strategy. diff --git a/sdk/src/main/java/software/amazon/lambda/durable/logging/DurableLogger.java b/sdk/src/main/java/software/amazon/lambda/durable/logging/DurableLogger.java index 5d95d34f2..735e42867 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/logging/DurableLogger.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/logging/DurableLogger.java @@ -45,27 +45,25 @@ public DurableLogger(Logger delegate, BaseContextImpl context) { } if (context instanceof DurableContext) { - // context thread - context id + // context thread - context id and name if (context.getContextId() != null) { MDC.put(MDC_CONTEXT_ID, context.getContextId()); } if (context.getContextName() != null) { MDC.put(MDC_CONTEXT_NAME, context.getContextName()); } - } else { - // step context + } else if (context instanceof StepContext stepContext) { + // In step context, context id is the operation id, context name is the operation name var operationId = context.getContextId(); - // step context - step operation id MDC.put(MDC_OPERATION_ID, operationId); - // step context - step operation name if (context.getContextName() != null) { MDC.put(MDC_OPERATION_NAME, context.getContextName()); } - MDC.put(MDC_ATTEMPT, String.valueOf(((StepContext) context).getAttempt())); + MDC.put(MDC_ATTEMPT, String.valueOf(stepContext.getAttempt())); } } - /** Clears all MDC entries set by this logger. */ + /** Clears all MDC entries. User set MDC entries will also be removed as the thread will not be used anymore. */ public void close() { MDC.clear(); } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java index d94e95610..92fc7a02b 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java @@ -194,12 +194,13 @@ private void checkpointSuccess(T result) { private void handleChildContextFailure(Throwable exception) { exception = ExceptionHelper.unwrapCompletableFuture(exception); - if (exception instanceof SuspendExecutionException) { + if (exception instanceof SuspendExecutionException suspendExecutionException) { // Rethrow Error immediately — do not checkpoint - ExceptionHelper.sneakyThrow(exception); + throw suspendExecutionException; } - if (exception instanceof UnrecoverableDurableExecutionException) { - terminateExecution((UnrecoverableDurableExecutionException) exception); + if (exception instanceof UnrecoverableDurableExecutionException unrecoverableDurableExecutionException) { + // terminate the execution and throw the exception if it's not recoverable + terminateExecution(unrecoverableDurableExecutionException); } // Skip checkpointing if parent ConcurrencyOperation has already completed — diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/InvokeOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/InvokeOperation.java index 2eb540538..be94fd0b0 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/InvokeOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/InvokeOperation.java @@ -20,18 +20,18 @@ * Durable operation that invokes another Lambda function and waits for its result. * * @param the result type from the invoked function - * @param the payload type sent to the invoked function + * @param the payload type sent to the invoked function */ -public class InvokeOperation extends BaseDurableOperation { +public class InvokeOperation extends BaseDurableOperation { private final String functionName; - private final U payload; + private final I payload; private final InvokeConfig invokeConfig; private final SerDes payloadSerDes; public InvokeOperation( OperationIdentifier operationIdentifier, String functionName, - U payload, + I payload, TypeToken resultTypeToken, InvokeConfig config, DurableContextImpl durableContext) { diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java index afbf3b1fa..4570f8637 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java @@ -150,17 +150,17 @@ private void handleStepSucceeded(T result) { private void handleStepFailure(Throwable exception, int attempt) { exception = ExceptionHelper.unwrapCompletableFuture(exception); - if (exception instanceof SuspendExecutionException) { - ExceptionHelper.sneakyThrow(exception); + if (exception instanceof SuspendExecutionException suspendExecutionException) { + throw suspendExecutionException; } - if (exception instanceof UnrecoverableDurableExecutionException) { + if (exception instanceof UnrecoverableDurableExecutionException unrecoverableDurableExecutionException) { // terminate the execution and throw the exception if it's not recoverable - terminateExecution((UnrecoverableDurableExecutionException) exception); + terminateExecution(unrecoverableDurableExecutionException); } final ErrorObject errorObject; - if (exception instanceof DurableOperationException) { - errorObject = ((DurableOperationException) exception).getErrorObject(); + if (exception instanceof DurableOperationException durableOperationException) { + errorObject = durableOperationException.getErrorObject(); } else { errorObject = serializeException(exception); } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java index da4aecc02..622ff94aa 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java @@ -178,8 +178,8 @@ private void executeCheckLogic(T currentState, int attempt) { private void handleCheckFailure(Throwable exception) { exception = ExceptionHelper.unwrapCompletableFuture(exception); - if (exception instanceof SuspendExecutionException) { - ExceptionHelper.sneakyThrow(exception); + if (exception instanceof SuspendExecutionException suspendExecutionException) { + throw suspendExecutionException; } if (exception instanceof UnrecoverableDurableExecutionException unrecoverable) { terminateExecution(unrecoverable); diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitOperation.java index 5034eb7c0..febd16333 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitOperation.java @@ -39,8 +39,6 @@ public WaitOperation( /** Starts the operation. */ @Override protected void start() { - Duration remainingWaitTime = duration; - // First execution - checkpoint with full duration var update = OperationUpdate.builder() .action(OperationAction.START) @@ -49,28 +47,32 @@ protected void start() { .build()); sendOperationUpdate(update); - logger.debug("Remaining wait time: {} seconds", remainingWaitTime.getSeconds()); - pollForOperationUpdates(remainingWaitTime); + pollForWaitExpiration(); } /** Replays the operation. */ @Override protected void replay(Operation existing) { - Duration remainingWaitTime = duration; - if (existing.status() == OperationStatus.SUCCEEDED) { // Wait already completed markAlreadyCompleted(); return; } - // Replay - calculate remaining time from scheduledEndTimestamp - // TODO: if the checkpoint is slow remaining wait time might be off. Track - // endTimestamp instead and move calculation in front of polling start. - if (existing.waitDetails() != null && existing.waitDetails().scheduledEndTimestamp() != null) { + + pollForWaitExpiration(); + } + + private void pollForWaitExpiration() { + // Always calculate remaining time from scheduledEndTimestamp if scheduledEndTimestamp exists + var remainingWaitTime = duration; + var existing = getOperation(); + if (existing != null + && existing.waitDetails() != null + && existing.waitDetails().scheduledEndTimestamp() != null) { remainingWaitTime = Duration.between(Instant.now(), existing.waitDetails().scheduledEndTimestamp()); } - logger.debug("Remaining wait time: {} seconds", remainingWaitTime.getSeconds()); + logger.debug("Remaining wait time: {} ms", remainingWaitTime.toMillis()); pollForOperationUpdates(remainingWaitTime); } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/serde/AwsSdkV2Module.java b/sdk/src/main/java/software/amazon/lambda/durable/serde/AwsSdkV2Module.java index d79a2eb76..f9fb55997 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/serde/AwsSdkV2Module.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/serde/AwsSdkV2Module.java @@ -29,7 +29,9 @@ public class AwsSdkV2Module extends SimpleModule { * List of AWS SDK v2 classes that require custom serialization/deserialization. Add new SDK classes here to * automatically register serializers and deserializers. * - *

See https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-serialization-changes.html + *

See serialization + * differences */ private static final List> SDK_CLASSES = List.of(Operation.class, ErrorObject.class, CheckpointUpdatedExecutionState.class); diff --git a/sdk/src/main/java/software/amazon/lambda/durable/serde/JacksonSerDes.java b/sdk/src/main/java/software/amazon/lambda/durable/serde/JacksonSerDes.java index 820cfd476..d69ea805c 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/serde/JacksonSerDes.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/serde/JacksonSerDes.java @@ -17,8 +17,7 @@ /** * Jackson-based implementation of {@link SerDes}. * - *

This implementation uses Jackson's ObjectMapper for JSON serialization and deserialization, with support for both - * simple types via {@link Class} and complex generic types via {@link TypeToken}. + *

This implementation uses Jackson's ObjectMapper for JSON serialization and deserialization. * *

Features: * @@ -67,7 +66,7 @@ public T deserialize(String data, TypeToken typeToken) { try { // Convert TypeToken to Jackson's JavaType using TypeFactory // Cache to avoid repeated reflection overhead - JavaType javaType = typeCache.computeIfAbsent(typeToken.getType(), type -> typeFactory.constructType(type)); + JavaType javaType = typeCache.computeIfAbsent(typeToken.getType(), typeFactory::constructType); return mapper.readValue(data, javaType); } catch (Exception e) { throw new SerDesException( diff --git a/sdk/src/main/java/software/amazon/lambda/durable/serde/NoopSerDes.java b/sdk/src/main/java/software/amazon/lambda/durable/serde/NoopSerDes.java index 9561f3e90..245fed925 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/serde/NoopSerDes.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/serde/NoopSerDes.java @@ -4,7 +4,7 @@ import software.amazon.lambda.durable.TypeToken; -/** a placeholder for operations that don't have data to serialize or deserialize */ +/** A {@link SerDes} implementation that does nothing. Used as a placeholder when no serialization is required. */ public class NoopSerDes implements SerDes { @Override public String serialize(Object value) { diff --git a/sdk/src/test/java/software/amazon/lambda/durable/DurableConfigTest.java b/sdk/src/test/java/software/amazon/lambda/durable/DurableConfigTest.java index 2795c2508..209684fbe 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/DurableConfigTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/DurableConfigTest.java @@ -384,4 +384,81 @@ void testBuilder_CheckpointDelayNull_UsesDefault() { assertEquals(Duration.ofSeconds(0), config.getCheckpointDelay()); } + + // --- validateConfiguration tests --- + + @Test + void validateConfiguration_PassesForValidConfig() { + var config = DurableConfig.builder() + .withDurableExecutionClient(mockClient) + .withSerDes(mockSerDes) + .withExecutorService(mockExecutor) + .build(); + + // Should not throw — all fields are set + config.validateConfiguration(); + } + + @Test + void validateConfiguration_ThrowsWhenDurableExecutionClientIsNull() throws Exception { + var config = + DurableConfig.builder().withDurableExecutionClient(mockClient).build(); + + setField(config, "durableExecutionClient", null); + + var ex = assertThrows(IllegalStateException.class, config::validateConfiguration); + assertEquals("DurableExecutionClient configuration failed", ex.getMessage()); + } + + @Test + void validateConfiguration_ThrowsWhenSerDesIsNull() throws Exception { + var config = + DurableConfig.builder().withDurableExecutionClient(mockClient).build(); + + setField(config, "serDes", null); + + var ex = assertThrows(IllegalStateException.class, config::validateConfiguration); + assertEquals("SerDes configuration failed", ex.getMessage()); + } + + @Test + void validateConfiguration_ThrowsWhenExecutorServiceIsNull() throws Exception { + var config = + DurableConfig.builder().withDurableExecutionClient(mockClient).build(); + + setField(config, "executorService", null); + + var ex = assertThrows(IllegalStateException.class, config::validateConfiguration); + assertEquals("ExecutorService configuration failed", ex.getMessage()); + } + + @Test + void validateConfiguration_ChecksClientBeforeSerDes() throws Exception { + var config = + DurableConfig.builder().withDurableExecutionClient(mockClient).build(); + + setField(config, "durableExecutionClient", null); + setField(config, "serDes", null); + + var ex = assertThrows(IllegalStateException.class, config::validateConfiguration); + assertEquals("DurableExecutionClient configuration failed", ex.getMessage()); + } + + @Test + void validateConfiguration_ChecksSerDesBeforeExecutorService() throws Exception { + var config = + DurableConfig.builder().withDurableExecutionClient(mockClient).build(); + + setField(config, "serDes", null); + setField(config, "executorService", null); + + var ex = assertThrows(IllegalStateException.class, config::validateConfiguration); + assertEquals("SerDes configuration failed", ex.getMessage()); + } + + private static void setField(Object target, String fieldName, Object value) throws Exception { + var field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + } }