From 1cd29b01cf76bfbef35a297a355c960c65a128fd Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Fri, 15 May 2026 14:06:33 -0700 Subject: [PATCH 1/4] fix: stop FDv2DataSource.Conditions from leaking on healthy primary FDv2DataSource's run loop calls CompletableFuture.anyOf(conditions.getFuture(), synchronizer.next()).get() on every iteration. Before this change, getFuture() returned the same shared CompletableFuture instance to every caller. Each anyOf call attaches an OrRelay Completion node to the shared instance's stack; CompletableFuture has no deregister path for the loser of a race, so the OrRelay stays on the stack until the shared future completes. The shared future only completes when fallback or recovery fires. On a healthy primary streaming ChangeSets, fallback is never armed and recovery is suppressed (only-available-synchronizer / single-prime configurations). The future never completes; the stack grows monotonically for the synchronizer's full tenure -- effectively the SDK's uptime on a healthy server. Per-iteration cost ~200 B: an OrRelay Completion plus the anyOf result CompletableFuture plus the chain references back to the inputs. At 10 ChangeSets/sec sustained that is ~150 MB/day per active synchronizer. The fix: a single permanent whenComplete listener on the underlying aggregate fans out completion to every fresh future handed out by getFuture(). Pending fresh futures are tracked via WeakReference, so a fresh future whose only strong references were the caller's local variables (typical lifetime: one loop iteration) becomes garbage-collectable once that iteration ends. Pending entries whose referent has been collected are pruned opportunistically on each getFuture() call and on close(). Conditions is now package-private rather than private so the new direct unit tests can reach it. Adds a test-only pendingSize() helper. Verified bug-proving discipline: two of the new tests (getFutureReturnsDistinctInstancesPerCall, getFutureReturnsDistinctInstancesEvenWithNoConditions) fail on the pre-fix shared-instance behavior and pass after the fix. Full server-sdk test suite (1857 tests) is clean. --- .../sdk/server/FDv2DataSource.java | 131 +++++++++++- ...FDv2DataSourceConditionsAggregateTest.java | 202 ++++++++++++++++++ 2 files changed, 328 insertions(+), 5 deletions(-) create mode 100644 lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceConditionsAggregateTest.java diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java index d0e09a9..cfca59f 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java @@ -11,9 +11,11 @@ import com.launchdarkly.sdk.server.subsystems.DataSource; import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSinkV2; +import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.Collections; import java.util.Date; +import java.util.Iterator; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -591,21 +593,115 @@ private void maybeReportUnexpectedExhaustion(String message) { /** * Helper class to manage the lifecycle of conditions with automatic cleanup. + * + *

{@link #getFuture()} returns a fresh {@link CompletableFuture} + * per call rather than returning the same shared instance. This matters + * because the run loop calls {@code CompletableFuture.anyOf(getFuture(), + * synchronizerNext)} on every iteration: if {@code getFuture()} returned a + * shared instance, each {@code anyOf} call would permanently attach an + * {@code OrRelay} {@code Completion} to its {@code stack}. On a healthy + * primary synchronizer that streams ChangeSets without ever arming the + * fallback timer, the aggregate never completes, so those Completion nodes + * accumulate monotonically for the synchronizer's full tenure -- a real + * memory leak proportional to event rate. + * + *

The fix: a single permanent listener on the underlying aggregate fans + * out completion to every fresh future handed out by {@link #getFuture()}. + * Fresh futures are tracked via {@link WeakReference} on a pending list, so + * a fresh future whose only strong references were in the caller's loop + * iteration becomes garbage-collectable once that iteration ends. Pending + * entries whose referent has been collected are pruned opportunistically on + * subsequent {@code getFuture()} calls and on {@link #close()}. + * + *

Package-private (rather than private) so that direct unit tests can + * exercise the API surface and assert per-call distinctness. */ - private static class Conditions implements AutoCloseable { + static class Conditions implements AutoCloseable { private final List conditions; - private final CompletableFuture conditionsFuture; + private final CompletableFuture aggregate; + private final Object lock = new Object(); + + /** + * Holds the value the aggregate completed with, once it has completed. + * {@code volatile} so the fast path in {@link #getFuture()} avoids + * taking the lock. Set under {@code lock} together with clearing + * {@code pending} so the two stay consistent. + */ + private volatile Object completedValue; + + /** + * Tracks futures previously returned by {@link #getFuture()} that have + * not yet been completed. {@code null} once the aggregate has fired + * (and all pending entries have been drained). Mutated only under + * {@code lock}. + */ + private List>> pending = new ArrayList<>(); public Conditions(List conditions) { this.conditions = conditions; - this.conditionsFuture = conditions.isEmpty() + this.aggregate = conditions.isEmpty() ? new CompletableFuture<>() // Never completes if no conditions : CompletableFuture.anyOf( - conditions.stream().map(Condition::execute).toArray(CompletableFuture[]::new)); + conditions.stream().map(Condition::execute).toArray(CompletableFuture[]::new)); + + // Single permanent listener. This is the only Completion node ever + // attached to aggregate.stack -- subsequent getFuture() calls do + // not touch the aggregate at all. + this.aggregate.whenComplete((result, throwable) -> { + List>> snapshot; + synchronized (lock) { + completedValue = (throwable == null) ? result : null; + snapshot = pending; + pending = null; + } + if (snapshot == null) { + return; + } + for (WeakReference> ref : snapshot) { + CompletableFuture cf = ref.get(); + if (cf == null) { + continue; // Already GC'd -- nothing to complete. + } + if (throwable != null) { + cf.completeExceptionally(throwable); + } else { + cf.complete(result); + } + } + }); } + /** + * Returns a fresh future that will complete when the underlying + * aggregate condition fires, or an already-completed future if the + * aggregate has already fired by the time this method is called. + */ public CompletableFuture getFuture() { - return conditionsFuture; + Object v = completedValue; + if (v != null) { + return CompletableFuture.completedFuture(v); + } + + CompletableFuture fresh = new CompletableFuture<>(); + synchronized (lock) { + if (pending == null) { + // Raced with aggregate completion. completedValue is now + // guaranteed populated (set under lock before pending was + // nulled). + return CompletableFuture.completedFuture(completedValue); + } + // Opportunistic prune of weak refs whose target has been + // collected. Keeps pending bounded even if the aggregate never + // fires. + Iterator>> it = pending.iterator(); + while (it.hasNext()) { + if (it.next().get() == null) { + it.remove(); + } + } + pending.add(new WeakReference<>(fresh)); + } + return fresh; } public void inform(FDv2SourceResult result) { @@ -615,6 +711,31 @@ public void inform(FDv2SourceResult result) { @Override public void close() { conditions.forEach(Condition::close); + synchronized (lock) { + if (pending != null) { + pending.clear(); + } + } + } + + /** + * Test-only: snapshot of the current pending list size after + * opportunistic pruning. Used by tests to assert that the pending list + * does not grow unboundedly across iterations. + */ + int pendingSize() { + synchronized (lock) { + if (pending == null) { + return 0; + } + Iterator>> it = pending.iterator(); + while (it.hasNext()) { + if (it.next().get() == null) { + it.remove(); + } + } + return pending.size(); + } } } } diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceConditionsAggregateTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceConditionsAggregateTest.java new file mode 100644 index 0000000..2e2fdda --- /dev/null +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceConditionsAggregateTest.java @@ -0,0 +1,202 @@ +package com.launchdarkly.sdk.server; + +import com.launchdarkly.sdk.server.FDv2DataSourceConditions.Condition; +import com.launchdarkly.sdk.server.FDv2DataSourceConditions.FallbackCondition; +import com.launchdarkly.sdk.server.FDv2DataSourceConditions.RecoveryCondition; +import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; +import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.time.Instant; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.sameInstance; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Direct tests for {@link FDv2DataSource.Conditions}. + * + *

The Conditions class is the aggregator that races fallback/recovery + * condition futures against synchronizer.next() in the FDv2DataSource run + * loop. Each iteration of that loop calls getFuture() and passes the result to + * CompletableFuture.anyOf(...) -- so getFuture() must not return a shared + * instance, or every anyOf call permanently attaches a Completion node to the + * shared instance's stack, leaking memory proportional to event rate during + * the synchronizer's tenure on a healthy primary. + */ +public class FDv2DataSourceConditionsAggregateTest { + private ScheduledExecutorService executor; + + @Before + public void setUp() { + executor = Executors.newScheduledThreadPool(1); + } + + @After + public void tearDown() { + executor.shutdownNow(); + } + + /** + * Bug-proving test: getFuture() must return a fresh instance per call. + * + *

If it returns the same instance (as it did before the fix), the run + * loop's per-iteration {@code anyOf(getFuture(), syncNext)} attaches a new + * OrRelay Completion to the shared future's stack every iteration, with no + * deregister path -- a monotonic leak for a non-firing aggregate. + */ + @Test + public void getFutureReturnsDistinctInstancesPerCall() { + Condition fallback = new FallbackCondition(executor, 60); + try (FDv2DataSource.Conditions conditions = + new FDv2DataSource.Conditions(Collections.singletonList(fallback))) { + CompletableFuture f1 = conditions.getFuture(); + CompletableFuture f2 = conditions.getFuture(); + CompletableFuture f3 = conditions.getFuture(); + assertThat(f1, not(sameInstance(f2))); + assertThat(f2, not(sameInstance(f3))); + assertThat(f1, not(sameInstance(f3))); + } + } + + /** + * Even with no underlying conditions (a single-synchronizer configuration), + * getFuture() must return fresh instances. The aggregate never completes + * in this case, which is exactly the scenario where any per-iteration + * accumulation would be most damaging. + */ + @Test + public void getFutureReturnsDistinctInstancesEvenWithNoConditions() { + try (FDv2DataSource.Conditions conditions = + new FDv2DataSource.Conditions(Collections.emptyList())) { + CompletableFuture f1 = conditions.getFuture(); + CompletableFuture f2 = conditions.getFuture(); + assertThat(f1, not(sameInstance(f2))); + } + } + + /** + * Every fresh future returned by getFuture() must complete when the + * underlying aggregate fires. The fan-out via the single permanent listener + * is what makes the fresh-per-call pattern work; verify it actually + * delivers. + */ + @Test + public void allFreshFuturesCompleteWhenAggregateFires() throws Exception { + // 0-second timeout -> fires on first INTERRUPTED inform. + Condition fallback = new FallbackCondition(executor, 0); + try (FDv2DataSource.Conditions conditions = + new FDv2DataSource.Conditions(Collections.singletonList(fallback))) { + CompletableFuture f1 = conditions.getFuture(); + CompletableFuture f2 = conditions.getFuture(); + CompletableFuture f3 = conditions.getFuture(); + + conditions.inform(makeInterruptedResult()); + + Object r1 = f1.get(2, TimeUnit.SECONDS); + Object r2 = f2.get(2, TimeUnit.SECONDS); + Object r3 = f3.get(2, TimeUnit.SECONDS); + + assertNotNull(r1); + assertNotNull(r2); + assertNotNull(r3); + assertTrue(r1 instanceof Condition); + assertTrue(r2 instanceof Condition); + assertTrue(r3 instanceof Condition); + } + } + + /** + * getFuture() called after the aggregate has already fired returns an + * already-completed future synchronously (the fast path). + */ + @Test + public void getFutureAfterAggregateFiresReturnsCompletedFuture() throws Exception { + // RecoveryCondition arms its timer in the constructor and fires after + // the configured timeout. With timeout=0 it fires near-immediately. + Condition recovery = new RecoveryCondition(executor, 0); + try (FDv2DataSource.Conditions conditions = + new FDv2DataSource.Conditions(Collections.singletonList(recovery))) { + // Drain a future to confirm the aggregate has fired. + conditions.getFuture().get(2, TimeUnit.SECONDS); + + CompletableFuture postFire = conditions.getFuture(); + assertTrue("post-fire getFuture() should be already complete", postFire.isDone()); + assertNotNull(postFire.get(0, TimeUnit.SECONDS)); + } + } + + /** + * Bug-proving test for the underlying leak: repeated getFuture() calls + * whose returned futures are then dropped (the run-loop pattern: each + * iteration's anyOf result becomes garbage at end of iteration) must NOT + * cause the pending list to grow without bound. The opportunistic prune + * inside getFuture() collects entries whose WeakReference target has been + * collected. + * + *

Java does not guarantee that {@link System#gc()} actually runs, but + * in practice with HotSpot's default GC plus a brief sleep this is + * reliable. If it ever flakes on CI, increase the iteration count or the + * sleep, or migrate to a {@code -XX:+UseSerialGC} test profile. + */ + @Test + public void pendingListDoesNotGrowUnboundedlyWhenFreshFuturesAreDropped() + throws Exception { + Condition fallback = new FallbackCondition(executor, 60); // never fires + try (FDv2DataSource.Conditions conditions = + new FDv2DataSource.Conditions(Collections.singletonList(fallback))) { + int iterations = 10_000; + for (int i = 0; i < iterations; i++) { + CompletableFuture f = conditions.getFuture(); + // Simulate the run loop: race f against a fast-resolving sibling. + // The anyOf result is awaited and discarded; f becomes unreachable + // at end of iteration. + CompletableFuture sibling = CompletableFuture.completedFuture("ok"); + CompletableFuture.anyOf(f, sibling).get(1, TimeUnit.SECONDS); + // f goes out of scope here. + + // Periodically encourage GC + give the cleanup path a chance. + if (i % 1000 == 999) { + System.gc(); + Thread.sleep(10); + } + } + System.gc(); + Thread.sleep(50); + + // After 10k iterations, pendingSize() should not be anywhere near + // 10k. The opportunistic prune inside getFuture() runs on every + // call, so any entry whose WeakReference has been collected drops + // out. A small handful (< 100) of recently-added live refs is + // expected because the most recent iterations may not yet have + // been GC'd. Choose a generous ceiling to avoid CI flakiness while + // still being orders of magnitude below the pre-fix accumulation. + int finalSize = conditions.pendingSize(); + assertThat( + "pending list size should be bounded; was " + finalSize + + " after " + iterations + " iterations", + finalSize, lessThanOrEqualTo(500)); + } + } + + private static FDv2SourceResult makeInterruptedResult() { + return FDv2SourceResult.interrupted( + new DataSourceStatusProvider.ErrorInfo( + DataSourceStatusProvider.ErrorKind.NETWORK_ERROR, + 0, + "simulated", + Instant.now()), + false); + } +} From 536176eb0ce1c083abace9e1a24ef0c38de6abda Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Fri, 15 May 2026 14:11:45 -0700 Subject: [PATCH 2/4] test: drop GC-dependent soak test and its production helper The pendingListDoesNotGrowUnboundedlyWhenFreshFuturesAreDropped test needed System.gc() + Thread.sleep to encourage reclamation, which is brittle. The two distinctness tests are sufficient bug-provers for the shared-instance behavior they fail on, so drop the soak test and the test-only pendingSize() helper that supported it. --- .../sdk/server/FDv2DataSource.java | 20 ------- ...FDv2DataSourceConditionsAggregateTest.java | 54 ------------------- 2 files changed, 74 deletions(-) diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java index cfca59f..2e86daa 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java @@ -717,25 +717,5 @@ public void close() { } } } - - /** - * Test-only: snapshot of the current pending list size after - * opportunistic pruning. Used by tests to assert that the pending list - * does not grow unboundedly across iterations. - */ - int pendingSize() { - synchronized (lock) { - if (pending == null) { - return 0; - } - Iterator>> it = pending.iterator(); - while (it.hasNext()) { - if (it.next().get() == null) { - it.remove(); - } - } - return pending.size(); - } - } } } diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceConditionsAggregateTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceConditionsAggregateTest.java index 2e2fdda..e786c0d 100644 --- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceConditionsAggregateTest.java +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceConditionsAggregateTest.java @@ -18,7 +18,6 @@ import java.util.concurrent.TimeUnit; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.sameInstance; import static org.junit.Assert.assertNotNull; @@ -137,59 +136,6 @@ public void getFutureAfterAggregateFiresReturnsCompletedFuture() throws Exceptio } } - /** - * Bug-proving test for the underlying leak: repeated getFuture() calls - * whose returned futures are then dropped (the run-loop pattern: each - * iteration's anyOf result becomes garbage at end of iteration) must NOT - * cause the pending list to grow without bound. The opportunistic prune - * inside getFuture() collects entries whose WeakReference target has been - * collected. - * - *

Java does not guarantee that {@link System#gc()} actually runs, but - * in practice with HotSpot's default GC plus a brief sleep this is - * reliable. If it ever flakes on CI, increase the iteration count or the - * sleep, or migrate to a {@code -XX:+UseSerialGC} test profile. - */ - @Test - public void pendingListDoesNotGrowUnboundedlyWhenFreshFuturesAreDropped() - throws Exception { - Condition fallback = new FallbackCondition(executor, 60); // never fires - try (FDv2DataSource.Conditions conditions = - new FDv2DataSource.Conditions(Collections.singletonList(fallback))) { - int iterations = 10_000; - for (int i = 0; i < iterations; i++) { - CompletableFuture f = conditions.getFuture(); - // Simulate the run loop: race f against a fast-resolving sibling. - // The anyOf result is awaited and discarded; f becomes unreachable - // at end of iteration. - CompletableFuture sibling = CompletableFuture.completedFuture("ok"); - CompletableFuture.anyOf(f, sibling).get(1, TimeUnit.SECONDS); - // f goes out of scope here. - - // Periodically encourage GC + give the cleanup path a chance. - if (i % 1000 == 999) { - System.gc(); - Thread.sleep(10); - } - } - System.gc(); - Thread.sleep(50); - - // After 10k iterations, pendingSize() should not be anywhere near - // 10k. The opportunistic prune inside getFuture() runs on every - // call, so any entry whose WeakReference has been collected drops - // out. A small handful (< 100) of recently-added live refs is - // expected because the most recent iterations may not yet have - // been GC'd. Choose a generous ceiling to avoid CI flakiness while - // still being orders of magnitude below the pre-fix accumulation. - int finalSize = conditions.pendingSize(); - assertThat( - "pending list size should be bounded; was " + finalSize - + " after " + iterations + " iterations", - finalSize, lessThanOrEqualTo(500)); - } - } - private static FDv2SourceResult makeInterruptedResult() { return FDv2SourceResult.interrupted( new DataSourceStatusProvider.ErrorInfo( From 6a63b7d40e4b866c93b05990b1367176bf4c4e14 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Fri, 15 May 2026 14:27:53 -0700 Subject: [PATCH 3/4] fix: propagate exceptional aggregate completion through getFuture Bugbot pointed out that completedValue (volatile Object) used null as a sentinel for "not yet fired", and the whenComplete listener also stored null on exceptional completion. After exceptional completion a subsequent getFuture() entered the lock, saw pending == null (drained by the listener), and returned CompletableFuture.completedFuture(null) -- silently converting the exception into a null result. The run loop would then NPE on res.getClass(). Replace the null-sentinel pattern with an explicit volatile boolean isFired plus separate firedResult/firedThrowable fields. A new makeCompletedFuture() helper builds a fresh completed future mirroring whichever terminal state the aggregate reached. Adds a bug-proving test (getFutureFailsExceptionallyWhenAggregateFails- Exceptionally) that drives a manually-controlled condition to completeExceptionally and asserts both pre- and post-firing getFuture() results throw ExecutionException with the original cause. Verified the test fails on the pre-fix null-sentinel behavior ("expected ExecutionException, got normal completion"). --- .../sdk/server/FDv2DataSource.java | 64 +++++++++---- ...FDv2DataSourceConditionsAggregateTest.java | 90 +++++++++++++++++++ 2 files changed, 139 insertions(+), 15 deletions(-) diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java index 2e86daa..a64b0d2 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java @@ -622,12 +622,30 @@ static class Conditions implements AutoCloseable { private final Object lock = new Object(); /** - * Holds the value the aggregate completed with, once it has completed. - * {@code volatile} so the fast path in {@link #getFuture()} avoids - * taking the lock. Set under {@code lock} together with clearing - * {@code pending} so the two stay consistent. + * Set to {@code true} once the aggregate has completed (either + * normally or exceptionally). {@code volatile} so the fast path in + * {@link #getFuture()} avoids taking the lock. Set under {@code lock} + * together with populating {@code firedResult}/{@code firedThrowable} + * and clearing {@code pending}, so a reader that observes + * {@code isFired == true} also observes the corresponding values via + * the JMM happens-before edge. */ - private volatile Object completedValue; + private volatile boolean isFired; + + /** + * Result value the aggregate completed with, or {@code null} if it + * completed exceptionally. Only meaningful when {@code isFired} is + * true. Written under {@code lock}; readable without the lock once + * {@code isFired} has been observed true (volatile happens-before). + */ + private Object firedResult; + + /** + * Throwable the aggregate completed exceptionally with, or + * {@code null} if it completed normally. Same visibility rules as + * {@code firedResult}. + */ + private Throwable firedThrowable; /** * Tracks futures previously returned by {@link #getFuture()} that have @@ -650,7 +668,9 @@ public Conditions(List conditions) { this.aggregate.whenComplete((result, throwable) -> { List>> snapshot; synchronized (lock) { - completedValue = (throwable == null) ? result : null; + firedResult = result; + firedThrowable = throwable; + isFired = true; snapshot = pending; pending = null; } @@ -673,22 +693,22 @@ public Conditions(List conditions) { /** * Returns a fresh future that will complete when the underlying - * aggregate condition fires, or an already-completed future if the - * aggregate has already fired by the time this method is called. + * aggregate condition fires, or an already-completed future (normal or + * exceptional) if the aggregate has already fired by the time this + * method is called. */ public CompletableFuture getFuture() { - Object v = completedValue; - if (v != null) { - return CompletableFuture.completedFuture(v); + if (isFired) { + return makeCompletedFuture(); } CompletableFuture fresh = new CompletableFuture<>(); synchronized (lock) { if (pending == null) { - // Raced with aggregate completion. completedValue is now - // guaranteed populated (set under lock before pending was - // nulled). - return CompletableFuture.completedFuture(completedValue); + // Raced with aggregate completion. isFired is now + // guaranteed true and firedResult/firedThrowable are + // populated (set under lock before pending was nulled). + return makeCompletedFuture(); } // Opportunistic prune of weak refs whose target has been // collected. Keeps pending bounded even if the aggregate never @@ -717,5 +737,19 @@ public void close() { } } } + + /** + * Materializes a new already-completed CompletableFuture mirroring + * whichever terminal state {@link #aggregate} reached. Caller must + * have observed {@code isFired == true}. + */ + private CompletableFuture makeCompletedFuture() { + if (firedThrowable != null) { + CompletableFuture cf = new CompletableFuture<>(); + cf.completeExceptionally(firedThrowable); + return cf; + } + return CompletableFuture.completedFuture(firedResult); + } } } diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceConditionsAggregateTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceConditionsAggregateTest.java index e786c0d..cae52f8 100644 --- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceConditionsAggregateTest.java +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/FDv2DataSourceConditionsAggregateTest.java @@ -1,6 +1,7 @@ package com.launchdarkly.sdk.server; import com.launchdarkly.sdk.server.FDv2DataSourceConditions.Condition; +import com.launchdarkly.sdk.server.FDv2DataSourceConditions.Condition.ConditionType; import com.launchdarkly.sdk.server.FDv2DataSourceConditions.FallbackCondition; import com.launchdarkly.sdk.server.FDv2DataSourceConditions.RecoveryCondition; import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; @@ -13,6 +14,7 @@ import java.time.Instant; import java.util.Collections; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -116,6 +118,42 @@ public void allFreshFuturesCompleteWhenAggregateFires() throws Exception { } } + /** + * Bug-proving test for the null-sentinel issue caught by Cursor Bugbot: + * if the underlying aggregate completes exceptionally, every + * future returned by getFuture() -- both those handed out before the + * exception and those requested after -- must complete exceptionally too. + * + *

Prior to this fix, the "fired" state was tracked via a {@code null} + * sentinel on a {@code completedValue} field, which also stayed + * {@code null} on exceptional completion. A subsequent getFuture() call + * would then return {@code CompletableFuture.completedFuture(null)} -- + * silently converting an exceptional completion into a normal + * {@code null} completion. The run loop's downstream + * {@code res.getClass().getName()} would then throw NPE. + */ + @Test + public void getFutureFailsExceptionallyWhenAggregateFailsExceptionally() + throws Exception { + ManualCondition manualCondition = new ManualCondition(); + try (FDv2DataSource.Conditions conditions = + new FDv2DataSource.Conditions(Collections.singletonList(manualCondition))) { + // Future requested BEFORE the exceptional completion. + CompletableFuture before = conditions.getFuture(); + + RuntimeException boom = new RuntimeException("simulated condition failure"); + manualCondition.future.completeExceptionally(boom); + + // Future requested AFTER the exceptional completion (exercises the + // fast path through makeCompletedFuture). This is the case bugbot + // caught: pre-fix, it returned completedFuture(null). + CompletableFuture after = conditions.getFuture(); + + assertThrowsExecutionExceptionWithCause(before, boom); + assertThrowsExecutionExceptionWithCause(after, boom); + } + } + /** * getFuture() called after the aggregate has already fired returns an * already-completed future synchronously (the fast path). @@ -145,4 +183,56 @@ private static FDv2SourceResult makeInterruptedResult() { Instant.now()), false); } + + /** + * Asserts that {@code future.get()} throws {@link ExecutionException} + * wrapping the expected cause. {@code CompletableFuture#get} surfaces + * exceptional completion as ExecutionException with the original + * exception as its cause. + */ + private static void assertThrowsExecutionExceptionWithCause( + CompletableFuture future, + Throwable expectedCause) throws Exception { + try { + future.get(2, TimeUnit.SECONDS); + throw new AssertionError("expected ExecutionException, got normal completion"); + } catch (ExecutionException ee) { + if (ee.getCause() != expectedCause) { + throw new AssertionError( + "expected cause to be " + expectedCause + " but was " + ee.getCause(), ee); + } + } + } + + /** + * Test-only Condition with an externally-controllable future. The + * existing FallbackCondition/RecoveryCondition only resolve normally + * (with {@code this}); to exercise the exceptional path through the + * aggregate's whenComplete listener we need a Condition we can fail + * directly. + */ + private static final class ManualCondition + implements Condition { + final CompletableFuture future = new CompletableFuture<>(); + + @Override + public CompletableFuture execute() { + return future; + } + + @Override + public void inform(FDv2SourceResult sourceResult) { + // Manually controlled; no auto-trigger from inform. + } + + @Override + public void close() { + // No timer to cancel. + } + + @Override + public ConditionType getType() { + return ConditionType.FALLBACK; + } + } } From a6c7c36ac2a1376ad249becdb03570acc3429920 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Fri, 15 May 2026 16:09:28 -0700 Subject: [PATCH 4/4] refactor: simplify Conditions per review feedback Two simplifications suggested by @beekld: 1. Return aggregate directly when it has already completed. Continuations registered on an already-completed CompletableFuture fire synchronously at registration time and are removed from the stack immediately by cleanStack, so the original leak (per-iteration anyOf accumulation on a never-completing aggregate) cannot re-occur in the post-completion path. Drops the isFired flag, firedResult/firedThrowable fields, and the makeCompletedFuture helper. 2. Replace the WeakReference list with a WeakHashMap-backed Set. Fresh pending futures get GC'd automatically when the caller's loop iteration drops its strong reference, with no manual prune loop in getFuture(). All five aggregate tests still pass; full server-sdk suite (1857 tests) still passes. Verified bug-proving discipline: temporarily reverting getFuture() to the pre-fix shared-instance behavior makes the two distinctness tests + the exceptional-path test go red, exactly as before. --- .../sdk/server/FDv2DataSource.java | 143 ++++++------------ 1 file changed, 48 insertions(+), 95 deletions(-) diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java index a64b0d2..360be67 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java @@ -11,12 +11,12 @@ import com.launchdarkly.sdk.server.subsystems.DataSource; import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSinkV2; -import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.Collections; import java.util.Date; -import java.util.Iterator; import java.util.List; +import java.util.Set; +import java.util.WeakHashMap; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -594,24 +594,28 @@ private void maybeReportUnexpectedExhaustion(String message) { /** * Helper class to manage the lifecycle of conditions with automatic cleanup. * - *

{@link #getFuture()} returns a fresh {@link CompletableFuture} - * per call rather than returning the same shared instance. This matters - * because the run loop calls {@code CompletableFuture.anyOf(getFuture(), - * synchronizerNext)} on every iteration: if {@code getFuture()} returned a - * shared instance, each {@code anyOf} call would permanently attach an - * {@code OrRelay} {@code Completion} to its {@code stack}. On a healthy - * primary synchronizer that streams ChangeSets without ever arming the - * fallback timer, the aggregate never completes, so those Completion nodes + *

Before the aggregate completes, {@link #getFuture()} returns a + * fresh {@link CompletableFuture} per call. This matters because + * the run loop calls {@code CompletableFuture.anyOf(getFuture(), + * synchronizerNext)} on every iteration: if {@code getFuture()} returned + * the shared underlying aggregate while it was still pending, each + * {@code anyOf} call would permanently attach an {@code OrRelay} + * {@code Completion} to its {@code stack}. On a healthy primary + * synchronizer that streams ChangeSets without ever arming the fallback + * timer, the aggregate never completes, so those Completion nodes would * accumulate monotonically for the synchronizer's full tenure -- a real * memory leak proportional to event rate. * - *

The fix: a single permanent listener on the underlying aggregate fans - * out completion to every fresh future handed out by {@link #getFuture()}. - * Fresh futures are tracked via {@link WeakReference} on a pending list, so - * a fresh future whose only strong references were in the caller's loop - * iteration becomes garbage-collectable once that iteration ends. Pending - * entries whose referent has been collected are pruned opportunistically on - * subsequent {@code getFuture()} calls and on {@link #close()}. + *

After the aggregate completes, {@link #getFuture()} returns the + * aggregate directly: any continuation registered on an already-completed + * CompletableFuture fires synchronously at registration time and is + * removed from the stack immediately by {@code cleanStack}, so the same + * accumulation cannot happen. + * + *

Fresh pre-completion futures are tracked in a {@link WeakHashMap}-backed + * set, so a fresh future whose only strong references were in the caller's + * loop iteration becomes garbage-collectable -- and automatically removed + * from {@code pending} -- once that iteration ends. * *

Package-private (rather than private) so that direct unit tests can * exercise the API surface and assert per-call distinctness. @@ -621,39 +625,16 @@ static class Conditions implements AutoCloseable { private final CompletableFuture aggregate; private final Object lock = new Object(); - /** - * Set to {@code true} once the aggregate has completed (either - * normally or exceptionally). {@code volatile} so the fast path in - * {@link #getFuture()} avoids taking the lock. Set under {@code lock} - * together with populating {@code firedResult}/{@code firedThrowable} - * and clearing {@code pending}, so a reader that observes - * {@code isFired == true} also observes the corresponding values via - * the JMM happens-before edge. - */ - private volatile boolean isFired; - - /** - * Result value the aggregate completed with, or {@code null} if it - * completed exceptionally. Only meaningful when {@code isFired} is - * true. Written under {@code lock}; readable without the lock once - * {@code isFired} has been observed true (volatile happens-before). - */ - private Object firedResult; - - /** - * Throwable the aggregate completed exceptionally with, or - * {@code null} if it completed normally. Same visibility rules as - * {@code firedResult}. - */ - private Throwable firedThrowable; - /** * Tracks futures previously returned by {@link #getFuture()} that have - * not yet been completed. {@code null} once the aggregate has fired - * (and all pending entries have been drained). Mutated only under + * not yet been completed. Held weakly via {@link WeakHashMap} so that + * fresh futures abandoned by the caller (the typical end-of-iteration + * case) become GC-collectable. Set to {@code null} once the aggregate + * has fired and the entries have been drained. Mutated only under * {@code lock}. */ - private List>> pending = new ArrayList<>(); + private Set> pending = + Collections.newSetFromMap(new WeakHashMap<>()); public Conditions(List conditions) { this.conditions = conditions; @@ -663,25 +644,22 @@ public Conditions(List conditions) { conditions.stream().map(Condition::execute).toArray(CompletableFuture[]::new)); // Single permanent listener. This is the only Completion node ever - // attached to aggregate.stack -- subsequent getFuture() calls do - // not touch the aggregate at all. + // attached to aggregate.stack while the aggregate is still pending + // -- subsequent pre-completion getFuture() calls do not touch the + // aggregate at all. this.aggregate.whenComplete((result, throwable) -> { - List>> snapshot; + List> snapshot; synchronized (lock) { - firedResult = result; - firedThrowable = throwable; - isFired = true; - snapshot = pending; + if (pending == null) { + return; + } + // Copy under the lock: the ArrayList holds strong + // references so entries that survived GC to this point + // stay alive until we complete them below. + snapshot = new ArrayList<>(pending); pending = null; } - if (snapshot == null) { - return; - } - for (WeakReference> ref : snapshot) { - CompletableFuture cf = ref.get(); - if (cf == null) { - continue; // Already GC'd -- nothing to complete. - } + for (CompletableFuture cf : snapshot) { if (throwable != null) { cf.completeExceptionally(throwable); } else { @@ -692,34 +670,23 @@ public Conditions(List conditions) { } /** - * Returns a fresh future that will complete when the underlying - * aggregate condition fires, or an already-completed future (normal or - * exceptional) if the aggregate has already fired by the time this - * method is called. + * Returns a future that will complete when the underlying aggregate + * condition fires. Pre-completion, this is a fresh future per call; + * post-completion, this is the aggregate itself (already done). */ public CompletableFuture getFuture() { - if (isFired) { - return makeCompletedFuture(); + if (aggregate.isDone()) { + return aggregate; } CompletableFuture fresh = new CompletableFuture<>(); synchronized (lock) { if (pending == null) { - // Raced with aggregate completion. isFired is now - // guaranteed true and firedResult/firedThrowable are - // populated (set under lock before pending was nulled). - return makeCompletedFuture(); - } - // Opportunistic prune of weak refs whose target has been - // collected. Keeps pending bounded even if the aggregate never - // fires. - Iterator>> it = pending.iterator(); - while (it.hasNext()) { - if (it.next().get() == null) { - it.remove(); - } + // Raced with aggregate completion between isDone() and + // the lock acquisition; aggregate is now done. + return aggregate; } - pending.add(new WeakReference<>(fresh)); + pending.add(fresh); } return fresh; } @@ -737,19 +704,5 @@ public void close() { } } } - - /** - * Materializes a new already-completed CompletableFuture mirroring - * whichever terminal state {@link #aggregate} reached. Caller must - * have observed {@code isFired == true}. - */ - private CompletableFuture makeCompletedFuture() { - if (firedThrowable != null) { - CompletableFuture cf = new CompletableFuture<>(); - cf.completeExceptionally(firedThrowable); - return cf; - } - return CompletableFuture.completedFuture(firedResult); - } } }