Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ void testParallelPartialFailure_failedBranchDoesNotPreventOthers() {
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
var config = ParallelConfig.builder().build();
var futures = new ArrayList<DurableFuture<String>>();
var parallel = context.parallel("partial-fail", config);
Copy link
Copy Markdown
Contributor

@SilanHe SilanHe Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just curious, any reason why we decided to use var for most of the code base?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just because the SDK supports only Java 17+?

ParallelDurableFuture parallel = context.parallel("partial-fail", config);

try (parallel) {
futures.add(parallel.branch("branch-a", String.class, ctx -> "A"));
Expand Down Expand Up @@ -322,15 +322,15 @@ void testStepBeforeAndAfterParallel() {
void testSequentialParallelBlocks() {
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
var futures1 = new ArrayList<DurableFuture<String>>();
var parallel1 =
ParallelDurableFuture parallel1 =
context.parallel("parallel-1", ParallelConfig.builder().build());
try (parallel1) {
futures1.add(parallel1.branch("branch-a", String.class, ctx -> "A"));
futures1.add(parallel1.branch("branch-b", String.class, ctx -> "B"));
}

var futures2 = new ArrayList<DurableFuture<String>>();
var parallel2 =
ParallelDurableFuture parallel2 =
context.parallel("parallel-2", ParallelConfig.builder().build());
try (parallel2) {
futures2.add(parallel2.branch("branch-x", String.class, ctx -> "x!"));
Expand All @@ -354,7 +354,7 @@ void testParallelReplayWithFailedBranches() {
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
var config = ParallelConfig.builder().build();
var futures = new ArrayList<DurableFuture<String>>();
var parallel = context.parallel("replay-fail-parallel", config);
ParallelDurableFuture parallel = context.parallel("replay-fail-parallel", config);

try (parallel) {
futures.add(parallel.branch("branch-ok", String.class, ctx -> {
Expand All @@ -371,7 +371,7 @@ void testParallelReplayWithFailedBranches() {
}));
}

var result = parallel.get();
parallel.get();
assertEquals("OK", futures.get(0).get());
assertEquals("OK2", futures.get(2).get());
return "done";
Expand All @@ -392,7 +392,7 @@ void testParallelWithSingleBranch() {
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
var config = ParallelConfig.builder().build();
var futures = new ArrayList<DurableFuture<String>>();
var parallel = context.parallel("single-branch", config);
ParallelDurableFuture parallel = context.parallel("single-branch", config);

try (parallel) {
futures.add(parallel.branch(
Expand All @@ -418,7 +418,7 @@ void testParallelWithWaitInsideBranches_replay() {
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
var config = ParallelConfig.builder().build();
var futures = new ArrayList<DurableFuture<String>>();
var parallel = context.parallel("wait-replay-parallel", config);
ParallelDurableFuture parallel = context.parallel("wait-replay-parallel", config);

try (parallel) {
for (var item : List.of("a", "b")) {
Expand Down Expand Up @@ -546,7 +546,7 @@ void testParallelWithToleratedFailureCount_earlyTermination() {
.completionConfig(CompletionConfig.toleratedFailureCount(1))
.build();
var futures = new ArrayList<DurableFuture<String>>();
var parallel = context.parallel("tolerated-fail", config);
ParallelDurableFuture parallel = context.parallel("tolerated-fail", config);

try (parallel) {
futures.add(parallel.branch("branch-ok", String.class, ctx -> "OK"));
Expand Down Expand Up @@ -580,7 +580,7 @@ void testParallelWithMinSuccessful_earlyTermination() {
.completionConfig(CompletionConfig.minSuccessful(2))
.build();
var futures = new ArrayList<DurableFuture<String>>();
var parallel = context.parallel("min-successful", config);
ParallelDurableFuture parallel = context.parallel("min-successful", config);

try (parallel) {
for (var item : List.of("a", "b", "c", "d", "e")) {
Expand Down Expand Up @@ -617,7 +617,7 @@ void testParallelWithAllSuccessful_stopsOnFirstFailure() {
.completionConfig(CompletionConfig.allSuccessful())
.build();
var futures = new ArrayList<DurableFuture<String>>();
var parallel = context.parallel("all-successful", config);
ParallelDurableFuture parallel = context.parallel("all-successful", config);

try (parallel) {
futures.add(parallel.branch("branch-ok1", String.class, ctx -> "OK1"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,7 @@ private DurableExecutionInput createDurableInput(I input) {
.build();

// Load previous operations and include them in InitialExecutionState
var existingOps =
storage.getExecutionState(executionArn, "test-token", null).operations();
var existingOps = storage.getAllOperations();
var allOps = new ArrayList<>(List.of(executionOp));
allOps.addAll(existingOps);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,86 @@
import static software.amazon.awssdk.services.lambda.model.EventType.*;

import java.time.Instant;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import software.amazon.awssdk.services.lambda.model.*;

/** Generates Event objects from OperationUpdate for local testing. */
class EventProcessor {
private final AtomicInteger eventId = new AtomicInteger(1);
private final List<Event> allEvents = new CopyOnWriteArrayList<>();

Event processUpdate(OperationUpdate update, Operation operation) {
void processUpdate(OperationUpdate update, Operation operation) {
var builder = Event.builder()
.eventId(eventId.getAndIncrement())
.eventTimestamp(Instant.now())
.id(update.id())
.name(update.name());

return switch (update.type()) {
case STEP -> buildStepEvent(builder, update, operation);
case WAIT -> buildWaitEvent(builder, update, operation);
case CHAINED_INVOKE -> buildInvokeEvent(builder, update, operation);
case EXECUTION -> buildExecutionEvent(builder, update);
case CALLBACK -> buildCallbackEvent(builder, update);
case CONTEXT -> buildContextEvent(builder, update);
default -> throw new IllegalArgumentException("Unsupported operation type: " + update.type());
};
Event event =
switch (update.type()) {
case STEP -> buildStepEvent(builder, update, operation);
case WAIT -> buildWaitEvent(builder, update, operation);
case CHAINED_INVOKE -> buildInvokeEvent(builder, update, operation);
case EXECUTION -> buildExecutionEvent(builder, update);
case CALLBACK -> buildCallbackEvent(builder, update);
case CONTEXT -> buildContextEvent(builder, update);
default -> throw new IllegalArgumentException("Unsupported operation type: " + update.type());
};

allEvents.add(event);
}

// process new status of an operation without an OperationUpdate
void processUpdate(Operation updatedOperation) {
var builder = Event.builder()
.eventId(eventId.getAndIncrement())
.eventTimestamp(Instant.now())
.id(updatedOperation.id())
.name(updatedOperation.name());
// support the statuses that don't have a corresponding OperationAction
switch (updatedOperation.status()) {
case STARTED -> {
// used by resetCheckpointToStarted
return;
}
case READY -> {
if (updatedOperation.type() == OperationType.STEP) {
// no event type for this case
return;
} else {
throw new IllegalArgumentException("Unsupported operation type: " + updatedOperation.type());
}
}
case TIMED_OUT -> {
switch (updatedOperation.type()) {
case EXECUTION -> builder.eventType(EXECUTION_TIMED_OUT);
case CHAINED_INVOKE -> builder.eventType(CHAINED_INVOKE_TIMED_OUT);
case CALLBACK -> builder.eventType(CALLBACK_TIMED_OUT);
default ->
throw new IllegalArgumentException("Unsupported operation type: " + updatedOperation.type());
}
}
case STOPPED -> {
switch (updatedOperation.type()) {
case EXECUTION -> builder.eventType(EXECUTION_STOPPED);
case CHAINED_INVOKE -> builder.eventType(CHAINED_INVOKE_STOPPED);
default ->
throw new IllegalArgumentException("Unsupported operation type: " + updatedOperation.type());
}
}
default -> throw new IllegalArgumentException("Unsupported operation status: " + updatedOperation.status());
}
allEvents.add(builder.build());
}

List<Event> getAllEvents() {
return List.copyOf(allEvents);
}

public List<Event> getEventsForOperation(String operationId) {
return allEvents.stream().filter(e -> e.id().equals(operationId)).toList();
}

private Event buildStepEvent(Event.Builder builder, OperationUpdate update, Operation operation) {
Expand Down
Loading
Loading