diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java
index d0e09a9..360be67 100644
--- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java
+++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java
@@ -15,6 +15,8 @@
import java.util.Collections;
import java.util.Date;
import java.util.List;
+import java.util.Set;
+import java.util.WeakHashMap;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@@ -591,21 +593,102 @@ private void maybeReportUnexpectedExhaustion(String message) {
/**
* Helper class to manage the lifecycle of conditions with automatic cleanup.
+ *
+ *
Before the aggregate completes, {@link #getFuture()} returns a
+ * fresh {@link CompletableFuture} per call. This matters because
+ * the run loop calls {@code CompletableFuture.anyOf(getFuture(),
+ * synchronizerNext)} on every iteration: if {@code getFuture()} returned
+ * the shared underlying aggregate while it was still pending, each
+ * {@code anyOf} call would permanently attach an {@code OrRelay}
+ * {@code Completion} to its {@code stack}. On a healthy primary
+ * synchronizer that streams ChangeSets without ever arming the fallback
+ * timer, the aggregate never completes, so those Completion nodes would
+ * accumulate monotonically for the synchronizer's full tenure -- a real
+ * memory leak proportional to event rate.
+ *
+ *
After the aggregate completes, {@link #getFuture()} returns the
+ * aggregate directly: any continuation registered on an already-completed
+ * CompletableFuture fires synchronously at registration time and is
+ * removed from the stack immediately by {@code cleanStack}, so the same
+ * accumulation cannot happen.
+ *
+ *
Fresh pre-completion futures are tracked in a {@link WeakHashMap}-backed
+ * set, so a fresh future whose only strong references were in the caller's
+ * loop iteration becomes garbage-collectable -- and automatically removed
+ * from {@code pending} -- once that iteration ends.
+ *
+ *
Package-private (rather than private) so that direct unit tests can
+ * exercise the API surface and assert per-call distinctness.
*/
- private static class Conditions implements AutoCloseable {
+ static class Conditions implements AutoCloseable {
private final List conditions;
- private final CompletableFuture conditionsFuture;
+ private final CompletableFuture aggregate;
+ private final Object lock = new Object();
+
+ /**
+ * Tracks futures previously returned by {@link #getFuture()} that have
+ * not yet been completed. Held weakly via {@link WeakHashMap} so that
+ * fresh futures abandoned by the caller (the typical end-of-iteration
+ * case) become GC-collectable. Set to {@code null} once the aggregate
+ * has fired and the entries have been drained. Mutated only under
+ * {@code lock}.
+ */
+ private Set> pending =
+ Collections.newSetFromMap(new WeakHashMap<>());
public Conditions(List conditions) {
this.conditions = conditions;
- this.conditionsFuture = conditions.isEmpty()
+ this.aggregate = conditions.isEmpty()
? new CompletableFuture<>() // Never completes if no conditions
: CompletableFuture.anyOf(
- conditions.stream().map(Condition::execute).toArray(CompletableFuture[]::new));
+ conditions.stream().map(Condition::execute).toArray(CompletableFuture[]::new));
+
+ // Single permanent listener. This is the only Completion node ever
+ // attached to aggregate.stack while the aggregate is still pending
+ // -- subsequent pre-completion getFuture() calls do not touch the
+ // aggregate at all.
+ this.aggregate.whenComplete((result, throwable) -> {
+ List> snapshot;
+ synchronized (lock) {
+ if (pending == null) {
+ return;
+ }
+ // Copy under the lock: the ArrayList holds strong
+ // references so entries that survived GC to this point
+ // stay alive until we complete them below.
+ snapshot = new ArrayList<>(pending);
+ pending = null;
+ }
+ for (CompletableFuture cf : snapshot) {
+ if (throwable != null) {
+ cf.completeExceptionally(throwable);
+ } else {
+ cf.complete(result);
+ }
+ }
+ });
}
+ /**
+ * Returns a future that will complete when the underlying aggregate
+ * condition fires. Pre-completion, this is a fresh future per call;
+ * post-completion, this is the aggregate itself (already done).
+ */
public CompletableFuture getFuture() {
- return conditionsFuture;
+ if (aggregate.isDone()) {
+ return aggregate;
+ }
+
+ CompletableFuture fresh = new CompletableFuture<>();
+ synchronized (lock) {
+ if (pending == null) {
+ // Raced with aggregate completion between isDone() and
+ // the lock acquisition; aggregate is now done.
+ return aggregate;
+ }
+ pending.add(fresh);
+ }
+ return fresh;
}
public void inform(FDv2SourceResult result) {
@@ -615,6 +698,11 @@ public void inform(FDv2SourceResult result) {
@Override
public void close() {
conditions.forEach(Condition::close);
+ synchronized (lock) {
+ if (pending != null) {
+ pending.clear();
+ }
+ }
}
}
}
diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceConditionsAggregateTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceConditionsAggregateTest.java
new file mode 100644
index 0000000..cae52f8
--- /dev/null
+++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceConditionsAggregateTest.java
@@ -0,0 +1,238 @@
+package com.launchdarkly.sdk.server;
+
+import com.launchdarkly.sdk.server.FDv2DataSourceConditions.Condition;
+import com.launchdarkly.sdk.server.FDv2DataSourceConditions.Condition.ConditionType;
+import com.launchdarkly.sdk.server.FDv2DataSourceConditions.FallbackCondition;
+import com.launchdarkly.sdk.server.FDv2DataSourceConditions.RecoveryCondition;
+import com.launchdarkly.sdk.server.datasources.FDv2SourceResult;
+import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Instant;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Direct tests for {@link FDv2DataSource.Conditions}.
+ *
+ * The Conditions class is the aggregator that races fallback/recovery
+ * condition futures against synchronizer.next() in the FDv2DataSource run
+ * loop. Each iteration of that loop calls getFuture() and passes the result to
+ * CompletableFuture.anyOf(...) -- so getFuture() must not return a shared
+ * instance, or every anyOf call permanently attaches a Completion node to the
+ * shared instance's stack, leaking memory proportional to event rate during
+ * the synchronizer's tenure on a healthy primary.
+ */
+public class FDv2DataSourceConditionsAggregateTest {
+ private ScheduledExecutorService executor;
+
+ @Before
+ public void setUp() {
+ executor = Executors.newScheduledThreadPool(1);
+ }
+
+ @After
+ public void tearDown() {
+ executor.shutdownNow();
+ }
+
+ /**
+ * Bug-proving test: getFuture() must return a fresh instance per call.
+ *
+ *
If it returns the same instance (as it did before the fix), the run
+ * loop's per-iteration {@code anyOf(getFuture(), syncNext)} attaches a new
+ * OrRelay Completion to the shared future's stack every iteration, with no
+ * deregister path -- a monotonic leak for a non-firing aggregate.
+ */
+ @Test
+ public void getFutureReturnsDistinctInstancesPerCall() {
+ Condition fallback = new FallbackCondition(executor, 60);
+ try (FDv2DataSource.Conditions conditions =
+ new FDv2DataSource.Conditions(Collections.singletonList(fallback))) {
+ CompletableFuture f1 = conditions.getFuture();
+ CompletableFuture f2 = conditions.getFuture();
+ CompletableFuture f3 = conditions.getFuture();
+ assertThat(f1, not(sameInstance(f2)));
+ assertThat(f2, not(sameInstance(f3)));
+ assertThat(f1, not(sameInstance(f3)));
+ }
+ }
+
+ /**
+ * Even with no underlying conditions (a single-synchronizer configuration),
+ * getFuture() must return fresh instances. The aggregate never completes
+ * in this case, which is exactly the scenario where any per-iteration
+ * accumulation would be most damaging.
+ */
+ @Test
+ public void getFutureReturnsDistinctInstancesEvenWithNoConditions() {
+ try (FDv2DataSource.Conditions conditions =
+ new FDv2DataSource.Conditions(Collections.emptyList())) {
+ CompletableFuture f1 = conditions.getFuture();
+ CompletableFuture f2 = conditions.getFuture();
+ assertThat(f1, not(sameInstance(f2)));
+ }
+ }
+
+ /**
+ * Every fresh future returned by getFuture() must complete when the
+ * underlying aggregate fires. The fan-out via the single permanent listener
+ * is what makes the fresh-per-call pattern work; verify it actually
+ * delivers.
+ */
+ @Test
+ public void allFreshFuturesCompleteWhenAggregateFires() throws Exception {
+ // 0-second timeout -> fires on first INTERRUPTED inform.
+ Condition fallback = new FallbackCondition(executor, 0);
+ try (FDv2DataSource.Conditions conditions =
+ new FDv2DataSource.Conditions(Collections.singletonList(fallback))) {
+ CompletableFuture f1 = conditions.getFuture();
+ CompletableFuture f2 = conditions.getFuture();
+ CompletableFuture f3 = conditions.getFuture();
+
+ conditions.inform(makeInterruptedResult());
+
+ Object r1 = f1.get(2, TimeUnit.SECONDS);
+ Object r2 = f2.get(2, TimeUnit.SECONDS);
+ Object r3 = f3.get(2, TimeUnit.SECONDS);
+
+ assertNotNull(r1);
+ assertNotNull(r2);
+ assertNotNull(r3);
+ assertTrue(r1 instanceof Condition);
+ assertTrue(r2 instanceof Condition);
+ assertTrue(r3 instanceof Condition);
+ }
+ }
+
+ /**
+ * Bug-proving test for the null-sentinel issue caught by Cursor Bugbot:
+ * if the underlying aggregate completes exceptionally , every
+ * future returned by getFuture() -- both those handed out before the
+ * exception and those requested after -- must complete exceptionally too.
+ *
+ * Prior to this fix, the "fired" state was tracked via a {@code null}
+ * sentinel on a {@code completedValue} field, which also stayed
+ * {@code null} on exceptional completion. A subsequent getFuture() call
+ * would then return {@code CompletableFuture.completedFuture(null)} --
+ * silently converting an exceptional completion into a normal
+ * {@code null} completion. The run loop's downstream
+ * {@code res.getClass().getName()} would then throw NPE.
+ */
+ @Test
+ public void getFutureFailsExceptionallyWhenAggregateFailsExceptionally()
+ throws Exception {
+ ManualCondition manualCondition = new ManualCondition();
+ try (FDv2DataSource.Conditions conditions =
+ new FDv2DataSource.Conditions(Collections.singletonList(manualCondition))) {
+ // Future requested BEFORE the exceptional completion.
+ CompletableFuture before = conditions.getFuture();
+
+ RuntimeException boom = new RuntimeException("simulated condition failure");
+ manualCondition.future.completeExceptionally(boom);
+
+ // Future requested AFTER the exceptional completion (exercises the
+ // fast path through makeCompletedFuture). This is the case bugbot
+ // caught: pre-fix, it returned completedFuture(null).
+ CompletableFuture after = conditions.getFuture();
+
+ assertThrowsExecutionExceptionWithCause(before, boom);
+ assertThrowsExecutionExceptionWithCause(after, boom);
+ }
+ }
+
+ /**
+ * getFuture() called after the aggregate has already fired returns an
+ * already-completed future synchronously (the fast path).
+ */
+ @Test
+ public void getFutureAfterAggregateFiresReturnsCompletedFuture() throws Exception {
+ // RecoveryCondition arms its timer in the constructor and fires after
+ // the configured timeout. With timeout=0 it fires near-immediately.
+ Condition recovery = new RecoveryCondition(executor, 0);
+ try (FDv2DataSource.Conditions conditions =
+ new FDv2DataSource.Conditions(Collections.singletonList(recovery))) {
+ // Drain a future to confirm the aggregate has fired.
+ conditions.getFuture().get(2, TimeUnit.SECONDS);
+
+ CompletableFuture postFire = conditions.getFuture();
+ assertTrue("post-fire getFuture() should be already complete", postFire.isDone());
+ assertNotNull(postFire.get(0, TimeUnit.SECONDS));
+ }
+ }
+
+ private static FDv2SourceResult makeInterruptedResult() {
+ return FDv2SourceResult.interrupted(
+ new DataSourceStatusProvider.ErrorInfo(
+ DataSourceStatusProvider.ErrorKind.NETWORK_ERROR,
+ 0,
+ "simulated",
+ Instant.now()),
+ false);
+ }
+
+ /**
+ * Asserts that {@code future.get()} throws {@link ExecutionException}
+ * wrapping the expected cause. {@code CompletableFuture#get} surfaces
+ * exceptional completion as ExecutionException with the original
+ * exception as its cause.
+ */
+ private static void assertThrowsExecutionExceptionWithCause(
+ CompletableFuture future,
+ Throwable expectedCause) throws Exception {
+ try {
+ future.get(2, TimeUnit.SECONDS);
+ throw new AssertionError("expected ExecutionException, got normal completion");
+ } catch (ExecutionException ee) {
+ if (ee.getCause() != expectedCause) {
+ throw new AssertionError(
+ "expected cause to be " + expectedCause + " but was " + ee.getCause(), ee);
+ }
+ }
+ }
+
+ /**
+ * Test-only Condition with an externally-controllable future. The
+ * existing FallbackCondition/RecoveryCondition only resolve normally
+ * (with {@code this}); to exercise the exceptional path through the
+ * aggregate's whenComplete listener we need a Condition we can fail
+ * directly.
+ */
+ private static final class ManualCondition
+ implements Condition {
+ final CompletableFuture future = new CompletableFuture<>();
+
+ @Override
+ public CompletableFuture execute() {
+ return future;
+ }
+
+ @Override
+ public void inform(FDv2SourceResult sourceResult) {
+ // Manually controlled; no auto-trigger from inform.
+ }
+
+ @Override
+ public void close() {
+ // No timer to cancel.
+ }
+
+ @Override
+ public ConditionType getType() {
+ return ConditionType.FALLBACK;
+ }
+ }
+}