Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions .github/workflows/e2e-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ jobs:
role-to-assume: "${{ secrets.ACTIONS_INTEGRATION_ROLE_NAME }}"
role-session-name: java-language-sdk-test
aws-region: ${{ env.AWS_REGION }}
- name: Setup Java ${{ matrix.java }}
uses: actions/setup-java@v5
with:
distribution: corretto
java-version: ${{ matrix.java }}
cache: maven
- name: Build locally
run: mvn -B -q -Dmaven.test.skip=true install --file pom.xml
- name: sam build
Expand All @@ -70,14 +76,6 @@ jobs:
--resolve-image-repos --resolve-s3 --capabilities CAPABILITY_IAM --parameter-overrides \
'ParameterKey=Architecture,ParameterValue=x86_64 ParameterKey=JavaVersion,ParameterValue=java${{ matrix.java }}'
working-directory: ./examples
- name: Setup Java ${{ matrix.java }}
uses: actions/setup-java@v5
with:
distribution: corretto
java-version: ${{ matrix.java }}
cache: maven
- name: Build locally
run: mvn -B -q -Dmaven.test.skip=true install --file pom.xml
- name: Cloud Based Integration Tests
run: mvn clean test -B -Dtest.cloud.enabled=true -Dtest=CloudBasedIntegrationTest -Dtest.function.name.suffix='-java${{ matrix.java }}-runtime'
working-directory: ./examples
34 changes: 34 additions & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,38 @@
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>exclude-virtual-threads</id>
<activation>
<jdk>[,21)</jdk>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<excludes>
<exclude>**/vt/ManyAsyncStepsVirtualThreadPoolExample.java</exclude>
</excludes>
<testExcludes>
<testExclude>**/vt/ManyAsyncStepsVirtualThreadPoolExampleTest.java</testExclude>
</testExcludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**/vt/ManyAsyncStepsVirtualThreadPoolExampleTest.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableFuture;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.examples.types.ManyAsyncStepsInput;
import software.amazon.lambda.durable.examples.types.ManyAsyncStepsOutput;

/**
* Performance test example demonstrating concurrent async child contexts.
Expand All @@ -21,15 +23,10 @@
* <li>All results are collected using {@link DurableFuture#allOf}
* </ul>
*/
public class ManyAsyncChildContextExample
extends DurableHandler<ManyAsyncChildContextExample.Input, ManyAsyncChildContextExample.Output> {

public record Input(int multiplier, int steps) {}

public record Output(long result, long executionTimeMs, long replayTimeMs) {}
public class ManyAsyncChildContextExample extends DurableHandler<ManyAsyncStepsInput, ManyAsyncStepsOutput> {

@Override
public Output handleRequest(Input input, DurableContext context) {
public ManyAsyncStepsOutput handleRequest(ManyAsyncStepsInput input, DurableContext context) {
var startTime = System.nanoTime();
var multiplier = input.multiplier();
var steps = input.steps();
Expand Down Expand Up @@ -65,7 +62,7 @@ public Output handleRequest(Input input, DurableContext context) {

var replayTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);

return new Output(totalSum, executionTimeMs, replayTimeMs);
return new ManyAsyncStepsOutput(totalSum, executionTimeMs, replayTimeMs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableFuture;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.examples.types.ManyAsyncStepsInput;
import software.amazon.lambda.durable.examples.types.ManyAsyncStepsOutput;

/**
* Performance test example demonstrating concurrent async steps.
Expand All @@ -21,14 +23,10 @@
* <li>All results are collected using {@link DurableFuture#allOf}
* </ul>
*/
public class ManyAsyncStepsExample extends DurableHandler<ManyAsyncStepsExample.Input, ManyAsyncStepsExample.Output> {

public record Input(int multiplier, int steps) {}

public record Output(long result, long executionTimeMs, long replayTimeMs) {}
public class ManyAsyncStepsExample extends DurableHandler<ManyAsyncStepsInput, ManyAsyncStepsOutput> {

@Override
public Output handleRequest(Input input, DurableContext context) {
public ManyAsyncStepsOutput handleRequest(ManyAsyncStepsInput input, DurableContext context) {
var startTime = System.nanoTime();
var multiplier = input.multiplier();
var steps = input.steps();
Expand Down Expand Up @@ -60,7 +58,7 @@ public Output handleRequest(Input input, DurableContext context) {

var replayTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);

return new Output(totalSum, executionTimeMs, replayTimeMs);
return new ManyAsyncStepsOutput(totalSum, executionTimeMs, replayTimeMs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.examples.types;

public record ManyAsyncStepsInput(int multiplier, int steps) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.examples.types;

public record ManyAsyncStepsOutput(long result, long executionTimeMs, long replayTimeMs) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.examples.vt;

import java.time.Duration;
import java.util.ArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import software.amazon.lambda.durable.DurableConfig;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableFuture;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.examples.types.ManyAsyncStepsInput;
import software.amazon.lambda.durable.examples.types.ManyAsyncStepsOutput;

/**
* Performance test example demonstrating concurrent async steps.
*
* <p>This example tests the SDK's ability to handle many concurrent operations:
*
* <ul>
* <li>Creates async steps in a loop
* <li>Each step performs a simple computation
* <li>All results are collected using {@link DurableFuture#allOf}
* </ul>
*/
public class ManyAsyncStepsVirtualThreadPoolExample extends DurableHandler<ManyAsyncStepsInput, ManyAsyncStepsOutput> {

@Override
public ManyAsyncStepsOutput handleRequest(ManyAsyncStepsInput input, DurableContext context) {
var startTime = System.nanoTime();
var multiplier = input.multiplier();
var steps = input.steps();
var logger = context.getLogger();

logger.info("Starting {} async steps with multiplier {}", steps, multiplier);

// Create async steps
var futures = new ArrayList<DurableFuture<Integer>>(steps);
for (var i = 0; i < steps; i++) {
var index = i;
var future = context.stepAsync("compute-" + i, Integer.class, stepCtx -> index * multiplier);
futures.add(future);
}

logger.info("All {} async steps created, collecting results", steps);

// Collect all results using allOf
var results = DurableFuture.allOf(futures);
var totalSum = results.stream().mapToInt(Integer::intValue).sum();

// checkpoint the executionTime so that we can have the same value when replay
var executionTimeMs = context.step(
"execution-time", Long.class, stepCtx -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
logger.info("Completed {} steps, total sum: {}, execution time: {}ms", steps, totalSum, executionTimeMs);

// Wait 2 seconds to test replay
context.wait("post-compute-wait", Duration.ofSeconds(2));

var replayTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);

return new ManyAsyncStepsOutput(totalSum, executionTimeMs, replayTimeMs);
}

@Override
protected DurableConfig createConfiguration() {
// Add a small checkpoint delay to help batch the checkpoint requests and reduce the overall latencies
// when the function has many concurrent operations
return DurableConfig.builder()
.withCheckpointDelay(Duration.ofMillis(10))
.withExecutorService(Executors.newVirtualThreadPerTaskExecutor())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledForJreRange;
import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.jupiter.api.condition.JRE;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
Expand All @@ -22,11 +24,11 @@
import software.amazon.awssdk.services.lambda.model.OperationStatus;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.lambda.durable.TypeToken;
import software.amazon.lambda.durable.examples.child.ManyAsyncChildContextExample;
import software.amazon.lambda.durable.examples.general.GenericTypesExample;
import software.amazon.lambda.durable.examples.step.ManyAsyncStepsExample;
import software.amazon.lambda.durable.examples.types.ApprovalRequest;
import software.amazon.lambda.durable.examples.types.GreetingRequest;
import software.amazon.lambda.durable.examples.types.ManyAsyncStepsInput;
import software.amazon.lambda.durable.examples.types.ManyAsyncStepsOutput;
import software.amazon.lambda.durable.examples.wait.ConcurrentWaitForConditionExample;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.serde.JacksonSerDes;
Expand Down Expand Up @@ -536,10 +538,10 @@ void testManyAsyncStepsExample(int steps, long maxExecutionTime, long maxReplayT
for (var i = 0; i < PERFORMANCE_TEST_REPEAT; i++) {
var runner = CloudDurableTestRunner.create(
arn("many-async-steps-example"),
ManyAsyncStepsExample.Input.class,
ManyAsyncStepsExample.Output.class,
ManyAsyncStepsInput.class,
ManyAsyncStepsOutput.class,
lambdaClient);
var result = runner.run(new ManyAsyncStepsExample.Input(2, steps));
var result = runner.run(new ManyAsyncStepsInput(2, steps));

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

Expand All @@ -565,6 +567,44 @@ void testManyAsyncStepsExample(int steps, long maxExecutionTime, long maxReplayT
assertTrue(minimalExecutionTimeMs < maxExecutionTime);
}

@EnabledForJreRange(min = JRE.JAVA_21)
@ParameterizedTest
@CsvSource({"100, 1000, 20", "500, 2000, 30", "1000, 3000, 50"})
void testManyAsyncStepsVirtualThreadExample(int steps, long maxExecutionTime, long maxReplayTime) {
long minimalExecutionTimeMs = Long.MAX_VALUE;
long minimalReplayTimeMs = Long.MAX_VALUE;
for (var i = 0; i < PERFORMANCE_TEST_REPEAT; i++) {
var runner = CloudDurableTestRunner.create(
arn("many-async-steps-virtual-thread-pool-example"),
ManyAsyncStepsInput.class,
ManyAsyncStepsOutput.class,
lambdaClient);
var result = runner.run(new ManyAsyncStepsInput(2, steps));

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

var finalResult = result.getResult();
System.out.printf("ManyAsyncStepsVirtualThreadPoolExample result (%d steps): %s\n", steps, finalResult);
assertNotNull(finalResult);
assertEquals((long) steps * (steps - 1), finalResult.result()); // Sum of 0..steps * 2

// Verify some operations are tracked
assertNotNull(runner.getOperation("compute-0"));
assertNotNull(runner.getOperation("compute-" + (steps - 1)));

if (finalResult.executionTimeMs() < minimalExecutionTimeMs) {
minimalExecutionTimeMs = finalResult.executionTimeMs();
}

if (finalResult.replayTimeMs() < minimalReplayTimeMs) {
minimalReplayTimeMs = finalResult.replayTimeMs();
}
}

assertTrue(minimalReplayTimeMs < maxReplayTime);
assertTrue(minimalExecutionTimeMs < maxExecutionTime);
}

@ParameterizedTest
// OOM if it creates 1000 child contexts
@CsvSource({"100, 1500, 10", "500, 3000, 20"})
Expand All @@ -574,10 +614,10 @@ void testManyAsyncChildContextExample(int steps, long maxExecutionTime, long max
for (var i = 0; i < PERFORMANCE_TEST_REPEAT; i++) {
var runner = CloudDurableTestRunner.create(
arn("many-async-child-context-example"),
ManyAsyncChildContextExample.Input.class,
ManyAsyncChildContextExample.Output.class,
ManyAsyncStepsInput.class,
ManyAsyncStepsOutput.class,
lambdaClient);
var result = runner.run(new ManyAsyncChildContextExample.Input(2, steps));
var result = runner.run(new ManyAsyncStepsInput(2, steps));

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;

import org.junit.jupiter.api.Test;
import software.amazon.lambda.durable.examples.step.ManyAsyncStepsExample;
import software.amazon.lambda.durable.examples.types.ManyAsyncStepsInput;
import software.amazon.lambda.durable.examples.types.ManyAsyncStepsOutput;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;

Expand All @@ -15,14 +16,14 @@ class ManyAsyncChildContextExampleTest {
@Test
void testManyAsyncSteps() {
var handler = new ManyAsyncChildContextExample();
var runner = LocalDurableTestRunner.create(ManyAsyncChildContextExample.Input.class, handler);
var runner = LocalDurableTestRunner.create(ManyAsyncStepsInput.class, handler);

var input = new ManyAsyncChildContextExample.Input(2, 500);
var input = new ManyAsyncStepsInput(2, 500);
var result = runner.runUntilComplete(input);

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

var output = result.getResult(ManyAsyncStepsExample.Output.class);
var output = result.getResult(ManyAsyncStepsOutput.class);
assertNotNull(output);

// Sum of 0..499 * 2 = 499 * 500 / 2 * 2 = 249500
Expand All @@ -32,25 +33,23 @@ void testManyAsyncSteps() {
@Test
void testManyAsyncStepsWithDefaultMultiplier() {
var handler = new ManyAsyncChildContextExample();
var runner = LocalDurableTestRunner.create(ManyAsyncChildContextExample.Input.class, handler);
var runner = LocalDurableTestRunner.create(ManyAsyncStepsInput.class, handler);

var input = new ManyAsyncChildContextExample.Input(1, 500);
var input = new ManyAsyncStepsInput(1, 500);
var result = runner.runUntilComplete(input);

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

// Sum of 0..499 = 499 * 500 / 2 = 124750
assertEquals(
124750,
result.getResult(ManyAsyncChildContextExample.Output.class).result());
assertEquals(124750, result.getResult(ManyAsyncStepsOutput.class).result());
}

@Test
void testOperationsAreTracked() {
var handler = new ManyAsyncChildContextExample();
var runner = LocalDurableTestRunner.create(ManyAsyncChildContextExample.Input.class, handler);
var runner = LocalDurableTestRunner.create(ManyAsyncStepsInput.class, handler);

var result = runner.runUntilComplete(new ManyAsyncChildContextExample.Input(1, 500));
var result = runner.runUntilComplete(new ManyAsyncStepsInput(1, 500));

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

Expand Down
Loading
Loading