generated from amazon-archives/__template_Apache-2.0
-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathLocalDurableTestRunner.java
More file actions
342 lines (310 loc) · 14.8 KB
/
LocalDurableTestRunner.java
File metadata and controls
342 lines (310 loc) · 14.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.testing;
import com.amazonaws.services.lambda.runtime.Context;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.function.BiFunction;
import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState;
import software.amazon.awssdk.services.lambda.model.ErrorObject;
import software.amazon.awssdk.services.lambda.model.ExecutionDetails;
import software.amazon.awssdk.services.lambda.model.Operation;
import software.amazon.awssdk.services.lambda.model.OperationStatus;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.lambda.durable.DurableConfig;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableExecutor;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.TypeToken;
import software.amazon.lambda.durable.model.DurableExecutionInput;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.serde.JacksonSerDes;
import software.amazon.lambda.durable.serde.SerDes;
/**
 * In-memory test runner for durable Lambda functions. Simulates the Lambda re-invocation loop locally without requiring
 * AWS infrastructure, enabling fast unit and integration tests.
 *
 * <p>Each runner owns a single {@link LocalMemoryExecutionClient}; every call to {@link #run(Object)} or
 * {@link #runUntilComplete(Object)} on the same runner reads from and writes to that same in-memory storage.
 *
 * @param <I> the handler input type
 * @param <O> the handler output type
 */
public class LocalDurableTestRunner<I, O> {
    /** Upper bound on the number of invocations performed by {@link #runUntilComplete(Object)}. */
    private static final int MAX_INVOCATIONS = 100;

    private final TypeToken<I> inputType;
    private final BiFunction<I, DurableContext, O> handler;
    private final LocalMemoryExecutionClient storage;
    private final SerDes serDes;
    /** Configuration supplied by the caller, or {@code null} when the default configuration is used. */
    private final DurableConfig customerConfig;

    /** Builds a runner that uses the default configuration and a {@link JacksonSerDes}. */
    private LocalDurableTestRunner(TypeToken<I> inputType, BiFunction<I, DurableContext, O> handlerFn) {
        this(inputType, handlerFn, null, new JacksonSerDes());
    }

    /**
     * Builds a runner that takes its SerDes and remaining settings from {@code customerConfig}.
     *
     * @throws NullPointerException if {@code customerConfig} is null
     */
    private LocalDurableTestRunner(
            TypeToken<I> inputType, BiFunction<I, DurableContext, O> handlerFn, DurableConfig customerConfig) {
        // Arguments are evaluated left to right, so the null check runs before getSerDes() is dereferenced.
        this(
                inputType,
                handlerFn,
                Objects.requireNonNull(customerConfig, "config must not be null"),
                customerConfig.getSerDes());
    }

    /**
     * Canonical constructor shared by both configuration flavours. Fails fast on a missing input type or handler
     * function instead of deferring the failure to the first invocation.
     */
    private LocalDurableTestRunner(
            TypeToken<I> inputType,
            BiFunction<I, DurableContext, O> handlerFn,
            DurableConfig customerConfig,
            SerDes serDes) {
        this.inputType = Objects.requireNonNull(inputType, "inputType must not be null");
        this.handler = Objects.requireNonNull(handlerFn, "handlerFn must not be null");
        this.storage = new LocalMemoryExecutionClient();
        this.serDes = serDes;
        this.customerConfig = customerConfig;
    }

    /**
     * Creates a LocalDurableTestRunner with default configuration. Use this method when your handler uses the default
     * DurableConfig.
     *
     * @param inputType The input type class
     * @param handlerFn The handler function
     * @param <I> Input type
     * @param <O> Output type
     * @return LocalDurableTestRunner with default configuration
     * @throws NullPointerException if {@code handlerFn} is null
     */
    public static <I, O> LocalDurableTestRunner<I, O> create(
            Class<I> inputType, BiFunction<I, DurableContext, O> handlerFn) {
        return create(TypeToken.get(inputType), handlerFn);
    }

    /**
     * Creates a LocalDurableTestRunner with default configuration. Use this method when your handler uses the default
     * DurableConfig.
     *
     * <p>If your handler has custom configuration (custom SerDes, ExecutorService, etc.), use {@link #create(TypeToken,
     * DurableHandler)} instead to ensure the test runner uses the same configuration as your handler.
     *
     * <p>Optionally, you can also use {@link #create(TypeToken, BiFunction, DurableConfig)} to pass in any
     * DurableConfig directly.
     *
     * @param inputType The type token describing the input type
     * @param handlerFn The handler function
     * @param <I> Input type
     * @param <O> Output type
     * @return LocalDurableTestRunner with default configuration
     * @throws NullPointerException if {@code inputType} or {@code handlerFn} is null
     */
    public static <I, O> LocalDurableTestRunner<I, O> create(
            TypeToken<I> inputType, BiFunction<I, DurableContext, O> handlerFn) {
        return new LocalDurableTestRunner<>(inputType, handlerFn);
    }

    /**
     * Creates a LocalDurableTestRunner that uses a custom configuration. This allows the test runner to use custom
     * SerDes and other configuration, while overriding the DurableExecutionClient with the in-memory implementation.
     *
     * @param inputType The input type class
     * @param handlerFn The handler function
     * @param config The DurableConfig to use (DurableExecutionClient will be overridden with in-memory implementation)
     * @param <I> Input type
     * @param <O> Output type
     * @return LocalDurableTestRunner configured with the provided settings
     * @throws NullPointerException if {@code handlerFn} or {@code config} is null
     */
    public static <I, O> LocalDurableTestRunner<I, O> create(
            Class<I> inputType, BiFunction<I, DurableContext, O> handlerFn, DurableConfig config) {
        return create(TypeToken.get(inputType), handlerFn, config);
    }

    /**
     * Creates a LocalDurableTestRunner that uses a custom configuration. This allows the test runner to use custom
     * SerDes and other configuration, while overriding the DurableExecutionClient with the in-memory implementation.
     *
     * <p>Use this method when you need to pass a custom DurableConfig directly, for example when testing with a custom
     * SerDes without using a DurableHandler.
     *
     * <p>Example usage:
     *
     * <pre>{@code
     * // Create a custom DurableConfig with custom SerDes
     * var config = DurableConfig.builder()
     *         .withSerDes(new MyCustomSerDes())
     *         .build();
     *
     * // Create test runner with custom configuration
     * var runner = LocalDurableTestRunner.create(
     *         String.class,
     *         (input, context) -> context.step("process", String.class, stepCtx -> "result"),
     *         config
     * );
     *
     * // Run test with custom configuration
     * var result = runner.run("test-input");
     * assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
     * }</pre>
     *
     * @param inputType The type token describing the input type
     * @param handlerFn The handler function
     * @param config The DurableConfig to use (DurableExecutionClient will be overridden with in-memory implementation)
     * @param <I> Input type
     * @param <O> Output type
     * @return LocalDurableTestRunner configured with the provided settings
     * @throws NullPointerException if {@code inputType}, {@code handlerFn} or {@code config} is null
     */
    public static <I, O> LocalDurableTestRunner<I, O> create(
            TypeToken<I> inputType, BiFunction<I, DurableContext, O> handlerFn, DurableConfig config) {
        return new LocalDurableTestRunner<>(inputType, handlerFn, config);
    }

    /**
     * Creates a LocalDurableTestRunner from a DurableHandler instance, automatically extracting the configuration. This
     * is a convenient method when you have a handler instance and want to test it with the same configuration it uses
     * in production.
     *
     * @param inputType The input type class
     * @param handler The DurableHandler instance to test
     * @param <I> Input type
     * @param <O> Output type
     * @return LocalDurableTestRunner configured with the handler's settings
     * @throws NullPointerException if {@code handler} or its configuration is null
     */
    public static <I, O> LocalDurableTestRunner<I, O> create(Class<I> inputType, DurableHandler<I, O> handler) {
        return create(TypeToken.get(inputType), handler);
    }

    /**
     * Creates a LocalDurableTestRunner from a DurableHandler instance, automatically extracting the configuration. This
     * is a convenient method when you have a handler instance and want to test it with the same configuration it uses
     * in production.
     *
     * <p>This method automatically:
     *
     * <ul>
     *   <li>Uses the handler's configuration (SerDes, ExecutorService, etc.)
     *   <li>Overrides the DurableExecutionClient with the in-memory implementation for testing
     * </ul>
     *
     * <p>Example usage:
     *
     * <pre>{@code
     * // Create handler instance
     * var handler = new MyCustomHandler();
     *
     * // Create test runner from handler (automatically extracts config)
     * var runner = LocalDurableTestRunner.create(String.class, handler);
     *
     * // Run test with the handler's configuration
     * var result = runner.run("test-input");
     * assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
     * }</pre>
     *
     * @param inputType The type token describing the input type
     * @param handler The DurableHandler instance to test
     * @param <I> Input type
     * @param <O> Output type
     * @return LocalDurableTestRunner configured with the handler's settings
     * @throws NullPointerException if {@code inputType}, {@code handler} or the handler's configuration is null
     */
    public static <I, O> LocalDurableTestRunner<I, O> create(TypeToken<I> inputType, DurableHandler<I, O> handler) {
        Objects.requireNonNull(handler, "handler must not be null");
        return new LocalDurableTestRunner<>(inputType, handler::handleRequest, handler.getConfiguration());
    }

    /**
     * Run a single invocation (may return PENDING if waiting/retrying).
     *
     * @param input The input to process
     * @return the result of this one invocation, as reported by the in-memory storage
     */
    public TestResult<O> run(I input) {
        var durableInput = createDurableInput(input);
        var config = buildEffectiveConfig();
        var output = DurableExecutor.execute(durableInput, mockLambdaContext(), inputType, handler, config);
        return storage.toTestResult(output);
    }

    /**
     * Run until completion (SUCCEEDED or FAILED) or pending manual intervention, simulating Lambda re-invocations.
     * Operations that don't require manual intervention (like WAIT in STARTED or STEP in PENDING) will be automatically
     * advanced.
     *
     * <p>At most {@code MAX_INVOCATIONS} (100) invocations are performed. If that limit is reached, the result of the
     * last invocation is returned as-is, which may still be PENDING.
     *
     * @param input The input to process
     * @return Final test result (SUCCEEDED or FAILED) or PENDING if operations pending manual intervention
     */
    public TestResult<O> runUntilComplete(I input) {
        TestResult<O> result = null;
        for (int invocation = 0; invocation < MAX_INVOCATIONS; invocation++) {
            result = run(input);
            if (result.getStatus() != ExecutionStatus.PENDING) {
                // SUCCEEDED or FAILED - we're done
                break;
            }
            if (!storage.advanceReadyOperations()) {
                // Still PENDING and nothing could be auto advanced - let the test advance operations manually
                break;
            }
        }
        return result;
    }

    /** Resets a named step operation to STARTED status, simulating a checkpoint failure. */
    public void resetCheckpointToStarted(String stepName) {
        storage.resetCheckpointToStarted(stepName);
    }

    /** Removes a named step operation entirely, simulating loss of a fire-and-forget checkpoint. */
    public void simulateFireAndForgetCheckpointLoss(String stepName) {
        storage.simulateFireAndForgetCheckpointLoss(stepName);
    }

    /** Returns the {@link TestOperation} for the given operation name, or null if not found. */
    public TestOperation getOperation(String name) {
        var op = storage.getOperationByName(name);
        if (op == null) {
            return null;
        }
        return new TestOperation(op, serDes);
    }

    /** Get callback ID for a named callback operation. */
    public String getCallbackId(String operationName) {
        return storage.getCallbackId(operationName);
    }

    /** Complete a callback with success result. */
    public void completeCallback(String callbackId, String result) {
        storage.completeCallback(callbackId, result);
    }

    /** Fail a callback with error. */
    public void failCallback(String callbackId, ErrorObject error) {
        storage.failCallback(callbackId, error);
    }

    /** Timeout a callback. */
    public void timeoutCallback(String callbackId) {
        storage.timeoutCallback(callbackId);
    }

    /**
     * Advances all pending operations, simulating time passing for retries and waits. Whether anything was actually
     * advanced is not reported to the caller.
     */
    public void advanceTime() {
        storage.advanceReadyOperations();
    }

    /** Completes a chained invoke operation with a successful result. */
    public void completeChainedInvoke(String name, String result) {
        storage.completeChainedInvoke(name, new OperationResult(OperationStatus.SUCCEEDED, result, null));
    }

    /** Marks a chained invoke operation as timed out. */
    public void timeoutChainedInvoke(String name) {
        storage.completeChainedInvoke(name, new OperationResult(OperationStatus.TIMED_OUT, null, null));
    }

    /** Fails a chained invoke operation with the given error. */
    public void failChainedInvoke(String name, ErrorObject error) {
        storage.completeChainedInvoke(name, new OperationResult(OperationStatus.FAILED, null, error));
    }

    /** Stops a chained invoke operation with the given error. */
    public void stopChainedInvoke(String name, ErrorObject error) {
        storage.completeChainedInvoke(name, new OperationResult(OperationStatus.STOPPED, null, error));
    }

    /**
     * Builds the configuration used for one invocation: the customer's settings when present, otherwise the builder
     * defaults, always with the in-memory storage installed as the DurableExecutionClient.
     */
    private DurableConfig buildEffectiveConfig() {
        if (customerConfig == null) {
            // Fallback to default config with in-memory client
            return DurableConfig.builder().withDurableExecutionClient(storage).build();
        }
        // Use customer's config but override the client with our in-memory implementation
        return DurableConfig.builder()
                .withDurableExecutionClient(storage)
                .withSerDes(customerConfig.getSerDes())
                .withExecutorService(customerConfig.getExecutorService())
                .withPollingStrategy(customerConfig.getPollingStrategy())
                .withCheckpointDelay(customerConfig.getCheckpointDelay())
                .withLoggerConfig(customerConfig.getLoggerConfig())
                .build();
    }

    /**
     * Assembles the input for one invocation: a fresh EXECUTION operation carrying the serialized input, followed by
     * whatever operations the in-memory storage already holds.
     */
    private DurableExecutionInput createDurableInput(I input) {
        // NOTE(review): the execution name, invocation id and ARN are regenerated on every invocation, so they
        // differ between re-invocations of the same runner - confirm nothing downstream keys on them.
        var executionName = UUID.randomUUID().toString();
        var invocationId = UUID.randomUUID().toString();
        var executionArn = String.format(
                "arn:aws:lambda:us-east-1:123456789012:function:test:$LATEST/durable-execution/%s/%s",
                executionName, invocationId);

        var inputJson = serDes.serialize(input);
        var executionOp = Operation.builder()
                .id(invocationId)
                .name(executionName)
                .type(OperationType.EXECUTION)
                .status(OperationStatus.STARTED)
                .executionDetails(
                        ExecutionDetails.builder().inputPayload(inputJson).build())
                .build();

        // Load previous operations and include them in InitialExecutionState
        var existingOps =
                storage.getExecutionState(executionArn, "test-token", null).operations();
        var allOps = new ArrayList<>(List.of(executionOp));
        allOps.addAll(existingOps);

        return new DurableExecutionInput(
                executionArn,
                "test-token",
                CheckpointUpdatedExecutionState.builder().operations(allOps).build());
    }

    /** Returns {@code null}: no Lambda {@link Context} is supplied to the executor by this runner. */
    private Context mockLambdaContext() {
        return null; // Minimal - tests don't need real Lambda context
    }
}