diff --git a/build.gradle.kts b/build.gradle.kts
index 8626232f..c4e60df9 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -75,10 +75,12 @@ subprojects
!setOf(
"sdk-api",
"sdk-api-gen",
+ "sdk-fake-api",
"examples",
"sdk-aggregated-javadocs",
"admin-client",
- "test-services")
+ "test-services",
+ )
.contains(it.name)
}
.forEach { p -> p.plugins.apply("org.jetbrains.dokka") }
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index af3f0b51..8f998a59 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -12,6 +12,7 @@
testcontainers = 'org.testcontainers:testcontainers:1.20.4'
tink = 'com.google.crypto.tink:tink:1.18.0'
tomcat-annotations = 'org.apache.tomcat:annotations-api:6.0.53'
+ jetbrains-annotations = 'org.jetbrains:annotations:26.0.2-1'
[libraries.jackson-annotations]
module = 'com.fasterxml.jackson.core:jackson-annotations'
diff --git a/sdk-api/build.gradle.kts b/sdk-api/build.gradle.kts
index eb304428..672b6dad 100644
--- a/sdk-api/build.gradle.kts
+++ b/sdk-api/build.gradle.kts
@@ -8,6 +8,7 @@ description = "Restate SDK APIs"
dependencies {
compileOnly(libs.jspecify)
+ compileOnly(libs.jetbrains.annotations)
api(project(":sdk-common"))
api(project(":sdk-serde-jackson"))
diff --git a/sdk-api/src/main/java/dev/restate/sdk/ContextInternal.java b/sdk-api/src/main/java/dev/restate/sdk/ContextInternal.java
new file mode 100644
index 00000000..5fd7b6d7
--- /dev/null
+++ b/sdk-api/src/main/java/dev/restate/sdk/ContextInternal.java
@@ -0,0 +1,23 @@
+// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
+//
+// This file is part of the Restate Java SDK,
+// which is released under the MIT license.
+//
+// You can find a copy of the license in file LICENSE in the root
+// directory of this repository or package, or at
+// https://github.com/restatedev/sdk-java/blob/main/LICENSE
+package dev.restate.sdk;
+
+import dev.restate.sdk.endpoint.definition.HandlerContext;
+import dev.restate.serde.SerdeFactory;
+import java.util.concurrent.Executor;
+
+@org.jetbrains.annotations.ApiStatus.Internal
+public class ContextInternal {
+
+ @org.jetbrains.annotations.ApiStatus.Internal
+ public static WorkflowContext createContext(
+ HandlerContext handlerContext, Executor serviceExecutor, SerdeFactory serdeFactory) {
+ return new ContextImpl(handlerContext, serviceExecutor, serdeFactory);
+ }
+}
diff --git a/sdk-fake-api/build.gradle.kts b/sdk-fake-api/build.gradle.kts
new file mode 100644
index 00000000..c9ddb5b9
--- /dev/null
+++ b/sdk-fake-api/build.gradle.kts
@@ -0,0 +1,19 @@
+plugins {
+ `java-conventions`
+ `java-library`
+ `library-publishing-conventions`
+}
+
+description = "Restate SDK Fake APIs for mocking"
+
+dependencies {
+ compileOnly(libs.jspecify)
+ compileOnly(libs.jetbrains.annotations)
+
+ api(project(":sdk-api"))
+ implementation(project(":common"))
+ implementation(project(":sdk-core"))
+ implementation(project(":sdk-serde-jackson"))
+ implementation(libs.log4j.api)
+ implementation(libs.junit.api)
+}
diff --git a/sdk-fake-api/src/main/java/dev/restate/sdk/fake/ContextExpectations.java b/sdk-fake-api/src/main/java/dev/restate/sdk/fake/ContextExpectations.java
new file mode 100644
index 00000000..1d1b588d
--- /dev/null
+++ b/sdk-fake-api/src/main/java/dev/restate/sdk/fake/ContextExpectations.java
@@ -0,0 +1,199 @@
+// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
+//
+// This file is part of the Restate Java SDK,
+// which is released under the MIT license.
+//
+// You can find a copy of the license in file LICENSE in the root
+// directory of this repository or package, or at
+// https://github.com/restatedev/sdk-java/blob/main/LICENSE
+package dev.restate.sdk.fake;
+
+import dev.restate.sdk.Context;
+import dev.restate.serde.SerdeFactory;
+import dev.restate.serde.jackson.JacksonSerdeFactory;
+import java.time.Duration;
+import java.util.Map;
+import java.util.function.BiPredicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Expectation configuration for {@code FakeContext}.
+ *
+ *
This record defines the expected behavior and configuration for a fake context used in
+ * testing. It controls various aspects including:
+ *
+ *
+ * - Random seed used by {@link Context#random()}
+ *
- Expectations for {@link Context#run}
+ *
- Timers' completion conditions (for {@link Context#timer})
+ *
+ *
+ * By default, the {@link FakeContext} will execute all {@code ctx.run}.
+ */
+@org.jetbrains.annotations.ApiStatus.Experimental
+public record ContextExpectations(
+ long randomSeed,
+ String invocationId,
+ Map requestHeaders,
+ Map runExpectations,
+ BiPredicate completeTimerIf,
+ SerdeFactory serdeFactory) {
+
+ public ContextExpectations() {
+ this(
+ 1,
+ "inv_1aiqX0vFEFNH1Umgre58JiCLgHfTtztYK5",
+ Map.of(),
+ Map.of(),
+ (i1, i2) -> false,
+ JacksonSerdeFactory.DEFAULT);
+ }
+
+ public enum RunExpectation {
+ /**
+ * @see ContextExpectations#executeRun
+ */
+ PASS,
+ /**
+ * @see ContextExpectations#dontExecuteRun
+ */
+ DONT_EXECUTE,
+ /**
+ * @see ContextExpectations#dontRetryRun
+ */
+ DONT_RETRY,
+ }
+
+ /**
+ * Set the random seed to be used by {@link Context#random()}.
+ *
+ * @param randomSeed the random seed to use
+ */
+ public ContextExpectations withRandomSeed(long randomSeed) {
+ return new ContextExpectations(
+ randomSeed,
+ this.invocationId,
+ this.requestHeaders,
+ this.runExpectations,
+ this.completeTimerIf,
+ this.serdeFactory);
+ }
+
+ /**
+ * Set the invocation id returned by {@code ctx.request().invocationId()}.
+ *
+ * @param invocationId the invocation ID to use
+ */
+ public ContextExpectations withInvocationId(String invocationId) {
+ return new ContextExpectations(
+ this.randomSeed,
+ invocationId,
+ this.requestHeaders,
+ this.runExpectations,
+ this.completeTimerIf,
+ this.serdeFactory);
+ }
+
+ /**
+ * Set the request headers returned by {@code ctx.request().headers()}.
+ *
+ * @param requestHeaders the request headers to use
+ */
+ public ContextExpectations withRequestHeaders(Map requestHeaders) {
+ return new ContextExpectations(
+ this.randomSeed,
+ this.invocationId,
+ requestHeaders,
+ this.runExpectations,
+ this.completeTimerIf,
+ this.serdeFactory);
+ }
+
+ /**
+ * Specify that the run with the given name should be executed.
+ *
+ * The mocked context will try to execute the run, and in case of a failure, the given
+ * exception will be thrown as is.
+ *
+ * @param runName the name of the run that should be executed
+ */
+ public ContextExpectations executeRun(String runName) {
+ return withRunExpectation(runName, RunExpectation.PASS);
+ }
+
+ /**
+ * Specify that the run with the given name should not be retried.
+ *
+ *
The mocked context will try to execute the run, and in case of a failure, the given
+ * exception will be converted to {@link dev.restate.sdk.common.TerminalException}.
+ *
+ *
This is useful when unit testing a saga, and you want to simulate the "catch" branch.
+ *
+ * @param runName the name of the run that should not be retried
+ */
+ public ContextExpectations dontRetryRun(String runName) {
+ return withRunExpectation(runName, RunExpectation.DONT_RETRY);
+ }
+
+ /**
+ * Specify that the run with the given name should not be executed.
+ *
+ *
The mocked context will not execute the run.
+ *
+ *
This is useful when testing a flow where you either want to wait a {@code ctx.run} to
+ * complete, or another event (such as timers)
+ *
+ * @param runName the name of the run that should not be executed
+ */
+ public ContextExpectations dontExecuteRun(String runName) {
+ return withRunExpectation(runName, RunExpectation.DONT_EXECUTE);
+ }
+
+ /** Specify that all timers immediately complete. */
+ public ContextExpectations completeAllTimersImmediately() {
+ return new ContextExpectations(
+ this.randomSeed,
+ this.invocationId,
+ requestHeaders,
+ this.runExpectations,
+ (i1, i2) -> true,
+ this.serdeFactory);
+ }
+
+ /** Specify that the timer with the given name complete as soon as they're created. */
+ public ContextExpectations completeTimerNamed(String timerName) {
+ return completeTimerIf((duration, name) -> timerName.equals(name));
+ }
+
+ /**
+ * Specify that all timers with duration longer than the given value complete as soon as they're
+ * created.
+ */
+ public ContextExpectations completeTimerLongerOrEqualThan(Duration duration) {
+ return completeTimerIf((timerDuration, name) -> timerDuration.compareTo(duration) >= 0);
+ }
+
+ private ContextExpectations withRunExpectation(String runName, RunExpectation expectation) {
+ return new ContextExpectations(
+ this.randomSeed,
+ this.invocationId,
+ requestHeaders,
+ Stream.concat(
+ this.runExpectations.entrySet().stream(),
+ Stream.of(Map.entry(runName, expectation)))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),
+ this.completeTimerIf,
+ this.serdeFactory);
+ }
+
+ private ContextExpectations completeTimerIf(BiPredicate completeTimerIf) {
+ return new ContextExpectations(
+ this.randomSeed,
+ this.invocationId,
+ requestHeaders,
+ this.runExpectations,
+ (d, n) -> this.completeTimerIf.test(d, n) || completeTimerIf.test(d, n),
+ this.serdeFactory);
+ }
+}
diff --git a/sdk-fake-api/src/main/java/dev/restate/sdk/fake/FakeContext.java b/sdk-fake-api/src/main/java/dev/restate/sdk/fake/FakeContext.java
new file mode 100644
index 00000000..9ca13e61
--- /dev/null
+++ b/sdk-fake-api/src/main/java/dev/restate/sdk/fake/FakeContext.java
@@ -0,0 +1,152 @@
+// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
+//
+// This file is part of the Restate Java SDK,
+// which is released under the MIT license.
+//
+// You can find a copy of the license in file LICENSE in the root
+// directory of this repository or package, or at
+// https://github.com/restatedev/sdk-java/blob/main/LICENSE
+package dev.restate.sdk.fake;
+
+import dev.restate.common.Request;
+import dev.restate.common.function.ThrowingSupplier;
+import dev.restate.sdk.*;
+import dev.restate.sdk.common.*;
+import dev.restate.serde.TypeTag;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Optional;
+
+/**
+ * Fake {@link Context}, see {@link ContextExpectations} for more details.
+ *
+ * You can pass this to Restate handlers.
+ *
+ * @see ContextExpectations
+ */
+@org.jetbrains.annotations.ApiStatus.Experimental
+public class FakeContext
+ implements Context, ObjectContext, SharedObjectContext, WorkflowContext, SharedWorkflowContext {
+
+ private final WorkflowContext inner;
+
+ /**
+ * Fake {@link Context} with default expectations.
+ *
+ * @see ContextExpectations
+ */
+ @org.jetbrains.annotations.ApiStatus.Experimental
+ public FakeContext() {
+ this(new ContextExpectations());
+ }
+
+ /**
+ * Fake {@link Context}, see {@link ContextExpectations} for more details.
+ *
+ * @see ContextExpectations
+ */
+ @org.jetbrains.annotations.ApiStatus.Experimental
+ public FakeContext(ContextExpectations expectations) {
+ // The mocking works as follows:
+
+ // FakeContext -- calls --> ContextImpl -- calls --> FakeHandlerContext
+ // * FakeHandlerContext implements the mocking behavior
+ // * ContextImpl is the real implementation of Context, which delegates all the "restate"
+ // operations to the HandlerContext implementation
+ // * FakeContext is just a nice wrapper around ContextImpl for ease of use.
+
+ FakeHandlerContext fakeHandlerContext = new FakeHandlerContext(expectations);
+ this.inner =
+ ContextInternal.createContext(
+ fakeHandlerContext, Runnable::run, expectations.serdeFactory());
+ }
+
+ // -- Delegate to internal interface mock
+
+ @Override
+ public HandlerRequest request() {
+ return inner.request();
+ }
+
+ @Override
+ public CallDurableFuture call(Request request) {
+ return inner.call(request);
+ }
+
+ @Override
+ public InvocationHandle send(Request request, Duration duration) {
+ return inner.send(request, duration);
+ }
+
+ @Override
+ public InvocationHandle invocationHandle(String s, TypeTag typeTag) {
+ return inner.invocationHandle(s, typeTag);
+ }
+
+ @Override
+ public DurableFuture timer(String s, Duration duration) {
+ return inner.timer(s, duration);
+ }
+
+ @Override
+ public DurableFuture runAsync(
+ String s, TypeTag typeTag, RetryPolicy retryPolicy, ThrowingSupplier throwingSupplier)
+ throws TerminalException {
+ return inner.runAsync(s, typeTag, retryPolicy, throwingSupplier);
+ }
+
+ @Override
+ public Awakeable awakeable(TypeTag typeTag) {
+ return inner.awakeable(typeTag);
+ }
+
+ @Override
+ public AwakeableHandle awakeableHandle(String s) {
+ return inner.awakeableHandle(s);
+ }
+
+ @Override
+ public RestateRandom random() {
+ return inner.random();
+ }
+
+ @Override
+ public void clear(StateKey> stateKey) {
+ inner.clear(stateKey);
+ }
+
+ @Override
+ public void clearAll() {
+ inner.clearAll();
+ }
+
+ @Override
+ public void set(StateKey stateKey, T t) {
+ inner.set(stateKey, t);
+ }
+
+ @Override
+ public DurablePromise promise(DurablePromiseKey durablePromiseKey) {
+ return inner.promise(durablePromiseKey);
+ }
+
+ @Override
+ public DurablePromiseHandle promiseHandle(DurablePromiseKey durablePromiseKey) {
+ return inner.promiseHandle(durablePromiseKey);
+ }
+
+ @Override
+ public String key() {
+ return inner.key();
+ }
+
+ @Override
+ public Optional get(StateKey stateKey) {
+ return inner.get(stateKey);
+ }
+
+ @Override
+ public Collection stateKeys() {
+ return inner.stateKeys();
+ }
+}
diff --git a/sdk-fake-api/src/main/java/dev/restate/sdk/fake/FakeHandlerContext.java b/sdk-fake-api/src/main/java/dev/restate/sdk/fake/FakeHandlerContext.java
new file mode 100644
index 00000000..419b5162
--- /dev/null
+++ b/sdk-fake-api/src/main/java/dev/restate/sdk/fake/FakeHandlerContext.java
@@ -0,0 +1,388 @@
+// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
+//
+// This file is part of the Restate Java SDK,
+// which is released under the MIT license.
+//
+// You can find a copy of the license in file LICENSE in the root
+// directory of this repository or package, or at
+// https://github.com/restatedev/sdk-java/blob/main/LICENSE
+package dev.restate.sdk.fake;
+
+import dev.restate.common.Output;
+import dev.restate.common.Slice;
+import dev.restate.common.Target;
+import dev.restate.common.function.ThrowingFunction;
+import dev.restate.sdk.common.HandlerRequest;
+import dev.restate.sdk.common.InvocationId;
+import dev.restate.sdk.common.RetryPolicy;
+import dev.restate.sdk.common.TerminalException;
+import dev.restate.sdk.core.ExceptionUtils;
+import dev.restate.sdk.endpoint.definition.AsyncResult;
+import dev.restate.sdk.endpoint.definition.HandlerContext;
+import io.opentelemetry.context.Context;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.junit.jupiter.api.Assertions;
+
+/** Implementation of {@link HandlerContext}, this implements the mocking behavior. */
+@org.jetbrains.annotations.ApiStatus.Experimental
+class FakeHandlerContext implements HandlerContext {
+
+ private final ContextExpectations expectations;
+
+ FakeHandlerContext(ContextExpectations expectations) {
+ this.expectations = expectations;
+ }
+
+ @Override
+ public String objectKey() {
+ throw new UnsupportedOperationException(
+ "FakeHandlerContext doesn't currently support mocking this operation");
+ }
+
+ @Override
+ public HandlerRequest request() {
+ return new HandlerRequest(
+ new InvocationId() {
+ @Override
+ public long toRandomSeed() {
+ return expectations.randomSeed();
+ }
+
+ @Override
+ public String toString() {
+ return expectations.invocationId();
+ }
+ },
+ Context.root(),
+ Slice.EMPTY,
+ expectations.requestHeaders());
+ }
+
+ @Override
+ public CompletableFuture writeOutput(Slice slice) {
+ throw new UnsupportedOperationException(
+ "FakeHandlerContext doesn't currently support mocking this operation");
+ }
+
+ @Override
+ public CompletableFuture writeOutput(TerminalException e) {
+ throw e;
+ }
+
+ @Override
+ public CompletableFuture>> get(String s) {
+ throw new UnsupportedOperationException(
+ "FakeHandlerContext doesn't currently support mocking this operation");
+ }
+
+ @Override
+ public CompletableFuture>> getKeys() {
+ throw new UnsupportedOperationException(
+ "FakeHandlerContext doesn't currently support mocking this operation");
+ }
+
+ @Override
+ public CompletableFuture clear(String s) {
+ throw new UnsupportedOperationException(
+ "FakeHandlerContext doesn't currently support mocking this operation");
+ }
+
+ @Override
+ public CompletableFuture clearAll() {
+ throw new UnsupportedOperationException(
+ "FakeHandlerContext doesn't currently support mocking this operation");
+ }
+
+ @Override
+ public CompletableFuture set(String s, Slice slice) {
+ throw new UnsupportedOperationException(
+ "FakeHandlerContext doesn't currently support mocking this operation");
+ }
+
+ @Override
+ public CompletableFuture> timer(Duration duration, String s) {
+ CompletableFuture upstreamFuture =
+ expectations.completeTimerIf().test(duration, s)
+ ? CompletableFuture.completedFuture(null)
+ : new CompletableFuture<>();
+ return CompletableFuture.completedFuture(new MockAsyncResult<>(this, upstreamFuture));
+ }
+
+ @Override
+ public CompletableFuture call(
+ Target target, Slice slice, String s, Collection> collection) {
+ throw new UnsupportedOperationException(
+ "FakeHandlerContext doesn't currently support mocking this operation");
+ }
+
+ @Override
+ public CompletableFuture> send(
+ Target target,
+ Slice slice,
+ String s,
+ Collection> collection,
+ Duration duration) {
+ throw new UnsupportedOperationException(
+ "FakeHandlerContext doesn't currently support mocking this operation");
+ }
+
+ @Override
+ public CompletableFuture> submitRun(
+ String runName, Consumer consumer) {
+ var runExpectation =
+ this.expectations
+ .runExpectations()
+ .getOrDefault(runName, ContextExpectations.RunExpectation.PASS);
+
+ if (runExpectation == ContextExpectations.RunExpectation.DONT_EXECUTE) {
+ // Don't run the side effect at all
+ return CompletableFuture.completedFuture(
+ new MockAsyncResult<>(this, new CompletableFuture<>()));
+ }
+
+ CompletableFuture upstreamFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ var innerFut = new CompletableFuture();
+ consumer.accept(
+ new RunCompleter() {
+ @Override
+ public void proposeSuccess(Slice slice) {
+ innerFut.complete(slice);
+ }
+
+ @Override
+ public void proposeFailure(Throwable throwable, RetryPolicy retryPolicy) {
+ if (throwable instanceof TerminalException) {
+ innerFut.completeExceptionally(throwable);
+ } else {
+ if (runExpectation == ContextExpectations.RunExpectation.DONT_RETRY) {
+ if (retryPolicy.getMaxAttempts() == null
+ && retryPolicy.getMaxDuration() == null) {
+ Assertions.fail(
+ "ContextExpectations.dontRetryRun can be used only on ctx.run with a retry policy setting either maxAttempts or maxDuration.\nRestate stops retrying, and rethrows a TerminalException, only when one of those two values (or both) are set.\nCheck https://docs.restate.dev/develop/java/durable-steps#error-handling-and-retry-policies for more details.");
+ }
+ throw new TerminalException(throwable.toString());
+ } else {
+ sneakyThrow(throwable);
+ }
+ }
+ }
+ });
+ return innerFut;
+ })
+ .thenCompose(Function.identity());
+
+ return CompletableFuture.completedFuture(new MockAsyncResult<>(this, upstreamFuture));
+ }
+
+ @Override
+ public CompletableFuture awakeable() {
+ throw new UnsupportedOperationException(
+ "FakeHandlerContext doesn't currently support mocking this operation");
+ }
+
+ @Override
+ public CompletableFuture resolveAwakeable(String s, Slice slice) {
+ throw new UnsupportedOperationException(
+ "FakeHandlerContext doesn't currently support mocking this operation");
+ }
+
+ @Override
+ public CompletableFuture rejectAwakeable(String s, TerminalException e) {
+ throw new UnsupportedOperationException(
+ "FakeHandlerContext doesn't currently support mocking this operation");
+ }
+
+ @Override
+ public CompletableFuture> promise(String s) {
+ throw new UnsupportedOperationException(
+ "FakeHandlerContext doesn't currently support mocking this operation");
+ }
+
+ @Override
+ public CompletableFuture>> peekPromise(String s) {
+ throw new UnsupportedOperationException(
+ "FakeHandlerContext doesn't currently support mocking this operation");
+ }
+
+ @Override
+ public CompletableFuture> resolvePromise(String s, Slice slice) {
+ throw new UnsupportedOperationException(
+ "FakeHandlerContext doesn't currently support mocking this operation");
+ }
+
+ @Override
+ public CompletableFuture> rejectPromise(String s, TerminalException e) {
+ throw new UnsupportedOperationException(
+ "FakeHandlerContext doesn't currently support mocking this operation");
+ }
+
+ @Override
+ public CompletableFuture cancelInvocation(String s) {
+ throw new UnsupportedOperationException(
+ "FakeHandlerContext doesn't currently support mocking this operation");
+ }
+
+ @Override
+ public CompletableFuture> attachInvocation(String s) {
+ throw new UnsupportedOperationException(
+ "FakeHandlerContext doesn't currently support mocking this operation");
+ }
+
+ @Override
+ public CompletableFuture>> getInvocationOutput(String s) {
+ throw new UnsupportedOperationException(
+ "FakeHandlerContext doesn't currently support mocking this operation");
+ }
+
+ @Override
+ public void fail(Throwable throwable) {
+ sneakyThrow(throwable);
+ }
+
+ @Override
+ public AsyncResult createAnyAsyncResult(List> list) {
+ CompletableFuture upstreamFuture = new CompletableFuture<>();
+ for (int i = 0; i < list.size(); i++) {
+ int finalI = i;
+ list.get(i)
+ .poll()
+ .whenComplete(
+ (res, t) -> {
+ if (t != null && !(t instanceof TerminalException)) {
+ upstreamFuture.completeExceptionally(t);
+ } else {
+ upstreamFuture.complete(finalI);
+ }
+ });
+ }
+ return new MockAsyncResult<>(this, upstreamFuture);
+ }
+
+ @Override
+ public AsyncResult createAllAsyncResult(List> list) {
+ CompletableFuture upstreamFuture = new CompletableFuture<>();
+ AtomicInteger completedCount = new AtomicInteger(0);
+ for (int i = 0; i < list.size(); i++) {
+ list.get(i)
+ .poll()
+ .whenComplete(
+ (res, t) -> {
+ if (t != null && !(t instanceof TerminalException)) {
+ upstreamFuture.completeExceptionally(t);
+ } else {
+ var completed = completedCount.addAndGet(1);
+ if (completed == list.size()) {
+ upstreamFuture.complete(null);
+ }
+ }
+ });
+ }
+ return new MockAsyncResult<>(this, upstreamFuture);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static void sneakyThrow(Object e) throws E {
+ throw (E) e;
+ }
+
+ private static class MockAsyncResult implements AsyncResult {
+ private final CompletableFuture upstreamFuture;
+ private final HandlerContext ctx;
+
+ private MockAsyncResult(HandlerContext ctx, CompletableFuture upstreamFuture) {
+ this.upstreamFuture = upstreamFuture;
+ this.ctx = ctx;
+ }
+
+ @Override
+ public CompletableFuture poll() {
+ return upstreamFuture;
+ }
+
+ @Override
+ public HandlerContext ctx() {
+ return ctx;
+ }
+
+ @Override
+ public AsyncResult map(
+ ThrowingFunction> successMapper,
+ ThrowingFunction> failureMapper) {
+ CompletableFuture downstreamFuture = new CompletableFuture<>();
+
+ upstreamFuture.whenComplete(
+ (t, throwable) -> {
+ if (ExceptionUtils.isTerminalException(throwable)) {
+ // Upstream future failed with Terminal exception
+ if (failureMapper != null) {
+ try {
+ failureMapper
+ .apply((TerminalException) throwable)
+ .whenCompleteAsync(
+ (u, mapperT) -> {
+ if (ExceptionUtils.isTerminalException(mapperT)) {
+ downstreamFuture.completeExceptionally(mapperT);
+ } else if (mapperT != null) {
+ downstreamFuture.completeExceptionally(mapperT);
+ } else {
+ downstreamFuture.complete(u);
+ }
+ });
+ } catch (Throwable mapperT) {
+ if (ExceptionUtils.isTerminalException(mapperT)) {
+ downstreamFuture.completeExceptionally(mapperT);
+ } else {
+ downstreamFuture.completeExceptionally(mapperT);
+ }
+ }
+ } else {
+ downstreamFuture.completeExceptionally(throwable);
+ }
+ } else if (throwable != null) {
+ // Aborted exception/some other exception. Just propagate it through
+ downstreamFuture.completeExceptionally(throwable);
+ } else {
+ // Success case!
+ if (successMapper != null) {
+ try {
+ successMapper
+ .apply(t)
+ .whenCompleteAsync(
+ (u, mapperT) -> {
+ if (ExceptionUtils.isTerminalException(mapperT)) {
+ downstreamFuture.completeExceptionally(mapperT);
+ } else if (mapperT != null) {
+ downstreamFuture.completeExceptionally(mapperT);
+ } else {
+ downstreamFuture.complete(u);
+ }
+ });
+ } catch (Throwable mapperT) {
+ if (ExceptionUtils.isTerminalException(mapperT)) {
+ downstreamFuture.completeExceptionally(mapperT);
+ } else {
+ downstreamFuture.completeExceptionally(mapperT);
+ }
+ }
+ } else {
+ // Type checked by the API itself
+ //noinspection unchecked
+ downstreamFuture.complete((U) t);
+ }
+ }
+ });
+
+ return new MockAsyncResult<>(this.ctx, downstreamFuture);
+ }
+ }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index c08a7125..23a1c617 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -30,6 +30,7 @@ include(
"sdk-api-gen",
"sdk-api-kotlin-gen",
"sdk-spring-boot",
+ "sdk-fake-api",
// Other modules we don't publish
"examples",