Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ protected boolean canComplete() {
}

private void handleComplete() {
// We do not complete the futrure here, the furture is completed via checkpoint
if (isOperationCompleted()) {
return;
}
Expand All @@ -256,9 +257,6 @@ private void handleComplete() {
} else {
handleFailure(completionStatus);
}
synchronized (completionFuture) {
completionFuture.complete(null);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ protected <R> ChildContextOperation<R> createItem(
protected void handleSuccess() {
if (skipCheckpoint) {
// Do not send checkpoint during replay
markAlreadyCompleted();
return;
}
sendOperationUpdate(OperationUpdate.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,21 @@ public void execute() {
@Override
protected void handleSuccess() {
successHandled = true;
// Simulate the checkpoint ACK that a real subclass would receive after sendOperationUpdate.
// This drives completionFuture to completion so waitForOperationCompletion() unblocks.
onCheckpointComplete(Operation.builder()
.id(getOperationId())
.status(OperationStatus.SUCCEEDED)
.build());
}

@Override
protected void handleFailure(ConcurrencyCompletionStatus completionStatus) {
failureHandled = true;
onCheckpointComplete(Operation.builder()
.id(getOperationId())
.status(OperationStatus.SUCCEEDED) // always success for parallel
.build());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import java.lang.reflect.Field;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -69,25 +70,41 @@ void setUp() {
when(mockIdGenerator.nextOperationId()).thenAnswer(inv -> "child-" + operationIdCounter.incrementAndGet());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have to mock OperationIdGenerator. We can use TestUtil::hashOperationId to generate a real one

when(executionManager.getOperationAndUpdateReplayState(anyString())).thenReturn(null);

// Simulate the real backend: when a SUCCEED checkpoint is sent for the parallel op,
// make getOperationAndUpdateReplayState return a SUCCEEDED operation so waitForOperationCompletion() can find
// it.
// Capture registered operations so we can drive onCheckpointComplete callbacks.
var registeredOps = new ConcurrentHashMap<String, BaseDurableOperation<?>>();
doAnswer(inv -> {
BaseDurableOperation<?> op = inv.getArgument(0);
registeredOps.put(op.getOperationId(), op);
return null;
})
.when(executionManager)
.registerOperation(any());

// Simulate the real backend for all sendOperationUpdate calls:
// - For SUCCEED on the parallel op: update the stub and fire onCheckpointComplete to unblock join().
// - For everything else (START, child checkpoints): just return a completed future.
var succeededParallelOp = Operation.builder()
.id(OPERATION_ID)
.name("test-parallel")
.type(OperationType.CONTEXT)
.subType(OperationSubType.PARALLEL.getValue())
.status(OperationStatus.SUCCEEDED)
.build();
when(executionManager.sendOperationUpdate(argThat(u -> u != null
&& u.id() != null
&& u.id().equals(OPERATION_ID)
&& u.action() == OperationAction.SUCCEED)))
.thenAnswer(inv -> {
when(executionManager.getOperationAndUpdateReplayState(OPERATION_ID))
.thenReturn(succeededParallelOp);
doAnswer(inv -> {
var update = (software.amazon.awssdk.services.lambda.model.OperationUpdate) inv.getArgument(0);

if (OPERATION_ID.equals(update.id()) && update.action() == OperationAction.SUCCEED) {
when(executionManager.getOperationAndUpdateReplayState(OPERATION_ID))
.thenReturn(succeededParallelOp);
var op = registeredOps.get(OPERATION_ID);
if (op != null) {
op.onCheckpointComplete(succeededParallelOp);
}
}
return CompletableFuture.completedFuture(null);
});
})
.when(executionManager)
.sendOperationUpdate(any());
}

private ParallelOperation<Void> createOperation(int maxConcurrency, int minSuccessful, int toleratedFailureCount) {
Expand Down
Loading