From 696ef81d577ee5131d7b97aa33616efcebe0bb11 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Thu, 5 Feb 2026 15:56:07 -0800 Subject: [PATCH 1/5] feat: Support stream init diagnostics events for FDv2. --- .../sdk/server/DataSystemComponents.java | 3 +- .../sdk/server/StreamingSynchronizerImpl.java | 27 +- .../sdk/server/DataSystemExamplesTemp.java | 78 +++++ .../server/StreamingSynchronizerImplTest.java | 301 ++++++++++++++++-- 4 files changed, 379 insertions(+), 30 deletions(-) create mode 100644 lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSystemExamplesTemp.java diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSystemComponents.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSystemComponents.java index 66a7cbb..33aba8a 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSystemComponents.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSystemComponents.java @@ -98,7 +98,8 @@ public Synchronizer build(DataSourceBuildInputs context) { context.getSelectorSource(), payloadFilter, initialReconnectDelay, - context.getThreadPriority() + context.getThreadPriority(), + context.getDiagnosticStore() ); } } diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java index 87628ea..105f9f1 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java @@ -13,6 +13,7 @@ import com.launchdarkly.logging.LDLogger; import com.launchdarkly.logging.LogValues; import com.launchdarkly.sdk.internal.collections.IterableAsyncQueue; +import com.launchdarkly.sdk.internal.events.DiagnosticStore; import com.launchdarkly.sdk.internal.fdv2.payloads.FDv2Event; import com.launchdarkly.sdk.internal.fdv2.sources.FDv2ProtocolHandler; import com.launchdarkly.sdk.internal.fdv2.sources.Selector; @@ -62,6 +63,8 @@ class StreamingSynchronizerImpl implements Synchronizer { private final AtomicBoolean started = new AtomicBoolean(false); private final int threadPriority; + private final DiagnosticStore diagnosticStore; + private volatile long streamStarted = 0; public StreamingSynchronizerImpl( HttpProperties httpProperties, @@ -71,7 +74,8 @@ public StreamingSynchronizerImpl( SelectorSource selectorSource, String payloadFilter, Duration initialReconnectDelaySeconds, - int threadPriority + int threadPriority, + DiagnosticStore diagnosticStore ) { this.httpProperties = httpProperties; this.selectorSource = selectorSource; @@ -80,6 +84,7 @@ public StreamingSynchronizerImpl( this.streamUri = HttpHelpers.concatenateUriPath(baseUri, requestPath); this.initialReconnectDelay = initialReconnectDelaySeconds; this.threadPriority = threadPriority; + this.diagnosticStore = diagnosticStore; // The stream will lazily start when `next` is called. } @@ -143,6 +148,7 @@ private void startStream() { @NotNull private Thread getRunThread() { Thread thread = new Thread(() -> { + streamStarted = System.currentTimeMillis(); try { for (StreamEvent event : eventSource.anyEvents()) { if (!handleEvent(event)) { @@ -215,6 +221,13 @@ public void close() { shutdownFuture.complete(FDv2SourceResult.shutdown()); } + private void recordStreamInit(boolean failed) { + if (diagnosticStore != null && streamStarted != 0) { + diagnosticStore.recordStreamInit(streamStarted, + System.currentTimeMillis() - streamStarted, failed); + } + } + private boolean handleEvent(StreamEvent event) { if (event instanceof MessageEvent) { handleMessage((MessageEvent) event); @@ -259,6 +272,8 @@ private void handleMessage(MessageEvent event) { logger, event.getHeaders().value(HeaderConstants.ENVIRONMENT_ID.getHeaderName()), true); + recordStreamInit(false); + streamStarted = 0; result = FDv2SourceResult.changeSet(converted, getFallback(event)); } catch (Exception e) { logger.error("Failed to convert FDv2 changeset: {}", LogValues.exceptionSummary(e)); @@ -337,8 +352,14 @@ private void interruptedWithException(Exception e, DataSourceStatusProvider.Erro } private boolean handleError(StreamException e) { + boolean streamFailed = true; + if (e instanceof StreamClosedByCallerException) { + // We closed it ourselves (shutdown was called or stream was deliberately restarted) + streamFailed = false; + } + recordStreamInit(streamFailed); + if (e instanceof StreamClosedByCallerException) { - // We closed it ourselves (shutdown was called) return false; } @@ -358,6 +379,7 @@ private boolean handleError(StreamException e) { } else { // Queue as INTERRUPTED to indicate temporary failure resultQueue.put(FDv2SourceResult.interrupted(errorInfo, getFallback(e))); + streamStarted = System.currentTimeMillis(); return true; // allow reconnect } } @@ -372,6 +394,7 @@ private boolean handleError(StreamException e) { Instant.now() ); resultQueue.put(FDv2SourceResult.interrupted(errorInfo, getFallback(e))); + streamStarted = System.currentTimeMillis(); return true; // allow reconnect } diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSystemExamplesTemp.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSystemExamplesTemp.java new file mode 100644 index 0000000..e34dac7 --- /dev/null +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSystemExamplesTemp.java @@ -0,0 +1,78 @@ +package com.launchdarkly.sdk.server; + +/** + * Temporary file to verify data system configuration examples compile correctly. + * This file should be deleted after verification. + */ +public class DataSystemExamplesTemp { + + // Example 1: Default configuration + public void defaultConfiguration() { + LDConfig config = new LDConfig.Builder() + .dataSystem(Components.dataSystem().defaultMode()) + .build(); + + // SDK key is passed to LDClient constructor + // LDClient client = new LDClient("your-sdk-key", config); + } + + // Example 2: Polling only + public void pollingOnly() { + LDConfig config = new LDConfig.Builder() + .dataSystem(Components.dataSystem().polling()) + .build(); + } + + // Example 3: Streaming only + public void streamingOnly() { + LDConfig config = new LDConfig.Builder() + .dataSystem(Components.dataSystem().streaming()) + .build(); + } + + // Example 4: With a persistent store (pattern only - requires actual persistence integration) + // public void withPersistentStore() { + // // Using a hypothetical Redis integration: + // LDConfig config = new LDConfig.Builder() + // .dataSystem(Components.dataSystem().persistentStore( + // Components.persistentDataStore(Redis.dataStore()))) + // .build(); + // } + + // Example 5: Daemon mode (pattern only - requires actual persistence integration) + // public void daemonMode() { + // // Using a hypothetical Redis integration: + // LDConfig config = new LDConfig.Builder() + // .dataSystem(Components.dataSystem().daemon( + // Components.persistentDataStore(Redis.dataStore()))) + // .build(); + // } + + // Example 6: Offline mode + public void offlineMode() { + LDConfig config = new LDConfig.Builder() + .offline(true) + .build(); + } + + // Example 7: Offline with data system (data system is ignored) + public void offlineWithDataSystem() { + LDConfig config = new LDConfig.Builder() + .offline(true) + .dataSystem(Components.dataSystem().defaultMode()) + .build(); + } + + // Example 8: Custom configuration + public void customConfiguration() { + LDConfig config = new LDConfig.Builder() + .dataSystem(Components.dataSystem().custom() + .initializers(DataSystemComponents.pollingInitializer()) + .synchronizers( + DataSystemComponents.streamingSynchronizer(), + DataSystemComponents.pollingSynchronizer() + ) + .fDv1FallbackSynchronizer(DataSystemComponents.fDv1Polling())) + .build(); + } +} diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/StreamingSynchronizerImplTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/StreamingSynchronizerImplTest.java index ad43e07..d305248 100644 --- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/StreamingSynchronizerImplTest.java +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/StreamingSynchronizerImplTest.java @@ -1,5 +1,7 @@ package com.launchdarkly.sdk.server; +import com.launchdarkly.sdk.LDValue; +import com.launchdarkly.sdk.internal.events.DiagnosticStore; import com.launchdarkly.sdk.internal.fdv2.sources.Selector; import com.launchdarkly.sdk.internal.http.HttpProperties; import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; @@ -16,11 +18,16 @@ import java.util.concurrent.TimeUnit; import static com.launchdarkly.sdk.server.ComponentsImpl.toHttpProperties; +import static com.launchdarkly.sdk.server.TestComponents.basicDiagnosticStore; import static com.launchdarkly.sdk.server.TestComponents.clientContext; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -68,7 +75,8 @@ public void receivesMultipleChangesets() throws Exception { selectorSource, null, Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); // First changeset @@ -105,7 +113,8 @@ public void httpNonRecoverableError() throws Exception { selectorSource, null, Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); CompletableFuture resultFuture = synchronizer.next(); @@ -134,7 +143,8 @@ public void httpRecoverableError() throws Exception { selectorSource, null, Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); CompletableFuture resultFuture = synchronizer.next(); @@ -164,7 +174,8 @@ public void networkError() throws Exception { selectorSource, null, Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); CompletableFuture resultFuture = synchronizer.next(); @@ -198,7 +209,8 @@ public void invalidEventData() throws Exception { selectorSource, null, Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); CompletableFuture resultFuture = synchronizer.next(); @@ -230,7 +242,8 @@ public void shutdownBeforeEventReceived() throws Exception { selectorSource, null, Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); CompletableFuture nextFuture = synchronizer.next(); @@ -271,7 +284,8 @@ public void shutdownAfterEventReceived() throws Exception { selectorSource, null, Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); CompletableFuture resultFuture = synchronizer.next(); @@ -314,7 +328,8 @@ public void goodbyeEventInResponse() throws Exception { selectorSource, null, Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); // First result should be goodbye @@ -365,7 +380,8 @@ public void heartbeatEvent() throws Exception { selectorSource, null, Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); CompletableFuture resultFuture = synchronizer.next(); @@ -404,7 +420,8 @@ public void selectorWithVersionAndState() throws Exception { selectorSource, null, Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); CompletableFuture resultFuture = synchronizer.next(); @@ -459,7 +476,8 @@ public void selectorRefetchedOnReconnection() throws Exception { selectorSource, null, Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); // First result should be an error from the 503 @@ -520,7 +538,8 @@ public void errorEventFromServer() throws Exception { selectorSource, null, Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); // Error event should be logged but not queued, so we should get the changeset @@ -559,7 +578,8 @@ public void selectorWithVersionOnly() throws Exception { selectorSource, null, Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); CompletableFuture resultFuture = synchronizer.next(); @@ -600,7 +620,8 @@ public void selectorWithEmptyState() throws Exception { selectorSource, null, Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); CompletableFuture resultFuture = synchronizer.next(); @@ -634,7 +655,8 @@ public void closeCalledMultipleTimes() throws Exception { selectorSource, null, Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); // Call close multiple times - should not throw exceptions @@ -671,7 +693,8 @@ public void invalidEventStructureCausesInterrupt() throws Exception { selectorSource, null, Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); CompletableFuture resultFuture = synchronizer.next(); @@ -707,7 +730,8 @@ public void payloadFilterIsAddedToRequest() throws Exception { selectorSource, "myFilter", Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); CompletableFuture resultFuture = synchronizer.next(); @@ -749,7 +773,8 @@ public void payloadFilterWithSelectorBothAddedToRequest() throws Exception { selectorSource, "testFilter", Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); CompletableFuture resultFuture = synchronizer.next(); @@ -789,7 +814,8 @@ public void emptyPayloadFilterNotAddedToRequest() throws Exception { selectorSource, "", Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); CompletableFuture resultFuture = synchronizer.next(); @@ -829,7 +855,8 @@ public void nullPayloadFilterNotAddedToRequest() throws Exception { selectorSource, null, Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); CompletableFuture resultFuture = synchronizer.next(); @@ -870,7 +897,8 @@ public void fdv1FallbackFlagSetToTrueInSuccessResponse() throws Exception { selectorSource, null, Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); CompletableFuture resultFuture = synchronizer.next(); @@ -906,7 +934,8 @@ public void fdv1FallbackFlagSetToFalseWhenHeaderNotPresent() throws Exception { selectorSource, null, Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); CompletableFuture resultFuture = synchronizer.next(); @@ -937,7 +966,8 @@ public void fdv1FallbackFlagSetToTrueInErrorResponse() throws Exception { selectorSource, null, Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); CompletableFuture resultFuture = synchronizer.next(); @@ -975,7 +1005,8 @@ public void environmentIdExtractedFromHeaders() throws Exception { selectorSource, null, Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); CompletableFuture resultFuture = synchronizer.next(); @@ -1014,7 +1045,8 @@ public void bothFdv1FallbackAndEnvironmentIdExtractedFromHeaders() throws Except selectorSource, null, Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); CompletableFuture resultFuture = synchronizer.next(); @@ -1052,7 +1084,8 @@ public void serializationExceptionWithoutFallbackHeader() throws Exception { selectorSource, null, Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); CompletableFuture resultFuture = synchronizer.next(); @@ -1091,7 +1124,8 @@ public void serializationExceptionPreservesFallbackHeader() throws Exception { selectorSource, null, Duration.ofMillis(100), - Thread.NORM_PRIORITY + Thread.NORM_PRIORITY, + null ); CompletableFuture resultFuture = synchronizer.next(); @@ -1107,4 +1141,217 @@ public void serializationExceptionPreservesFallbackHeader() throws Exception { } } + @Test + public void streamInitDiagnosticRecordedOnSuccessfulChangeset() throws Exception { + DiagnosticStore diagnosticStore = basicDiagnosticStore(); + long startTime = System.currentTimeMillis(); + + String serverIntent = makeEvent("server-intent", "{\"payloads\":[{\"id\":\"payload-1\",\"target\":100,\"intentCode\":\"xfer-full\",\"reason\":\"payload-missing\"}]}"); + String payloadTransferred = makeEvent("payload-transferred", "{\"state\":\"(p:payload-1:100)\",\"version\":100}"); + + try (HttpServer server = HttpServer.start(Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event(serverIntent), + Handlers.SSE.event(payloadTransferred), + Handlers.SSE.leaveOpen()))) { + + HttpProperties httpProperties = toHttpProperties(clientContext("sdk-key", baseConfig().build()).getHttp()); + SelectorSource selectorSource = mockSelectorSource(); + + StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl( + httpProperties, + server.getUri(), + "/stream", + testLogger, + selectorSource, + null, + Duration.ofMillis(100), + Thread.NORM_PRIORITY, + diagnosticStore + ); + + CompletableFuture resultFuture = synchronizer.next(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType()); + + long timeAfterOpen = System.currentTimeMillis(); + LDValue event = diagnosticStore.createEventAndReset(0, 0).getJsonValue(); + LDValue streamInits = event.get("streamInits"); + assertEquals(1, streamInits.size()); + LDValue init = streamInits.get(0); + assertFalse(init.get("failed").booleanValue()); + assertThat(init.get("timestamp").longValue(), + allOf(greaterThanOrEqualTo(startTime), lessThanOrEqualTo(timeAfterOpen))); + assertThat(init.get("durationMillis").longValue(), lessThanOrEqualTo(timeAfterOpen - startTime)); + + synchronizer.close(); + } + } + + @Test + public void streamInitDiagnosticRecordedOnErrorDuringInit() throws Exception { + DiagnosticStore diagnosticStore = basicDiagnosticStore(); + long startTime = System.currentTimeMillis(); + + String serverIntent = makeEvent("server-intent", "{\"payloads\":[{\"id\":\"payload-1\",\"target\":100,\"intentCode\":\"xfer-full\",\"reason\":\"payload-missing\"}]}"); + String payloadTransferred = makeEvent("payload-transferred", "{\"state\":\"(p:payload-1:100)\",\"version\":100}"); + + // First connection: 503 error, second connection: successful changeset + try (HttpServer server = HttpServer.start(Handlers.sequential( + Handlers.status(503), + Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event(serverIntent), + Handlers.SSE.event(payloadTransferred), + Handlers.SSE.leaveOpen())))) { + + HttpProperties httpProperties = toHttpProperties(clientContext("sdk-key", baseConfig().build()).getHttp()); + SelectorSource selectorSource = mockSelectorSource(); + + StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl( + httpProperties, + server.getUri(), + "/stream", + testLogger, + selectorSource, + null, + Duration.ofMillis(100), + Thread.NORM_PRIORITY, + diagnosticStore + ); + + // First result should be the error + CompletableFuture result1Future = synchronizer.next(); + FDv2SourceResult result1 = result1Future.get(5, TimeUnit.SECONDS); + assertEquals(FDv2SourceResult.ResultType.STATUS, result1.getResultType()); + + // Second result should be the successful changeset + CompletableFuture result2Future = synchronizer.next(); + FDv2SourceResult result2 = result2Future.get(5, TimeUnit.SECONDS); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result2.getResultType()); + + long timeAfterOpen = System.currentTimeMillis(); + LDValue event = diagnosticStore.createEventAndReset(0, 0).getJsonValue(); + + LDValue streamInits = event.get("streamInits"); + assertEquals(2, streamInits.size()); + LDValue init0 = streamInits.get(0); + assertTrue(init0.get("failed").booleanValue()); + assertThat(init0.get("timestamp").longValue(), + allOf(greaterThanOrEqualTo(startTime), lessThanOrEqualTo(timeAfterOpen))); + assertThat(init0.get("durationMillis").longValue(), lessThanOrEqualTo(timeAfterOpen - startTime)); + + LDValue init1 = streamInits.get(1); + assertFalse(init1.get("failed").booleanValue()); + assertThat(init1.get("timestamp").longValue(), + allOf(greaterThanOrEqualTo(init0.get("timestamp").longValue()), lessThanOrEqualTo(timeAfterOpen))); + + synchronizer.close(); + } + } + + @Test + public void streamRestartNotRecordedAsFailed() throws Exception { + DiagnosticStore diagnosticStore = basicDiagnosticStore(); + + String serverIntent = makeEvent("server-intent", "{\"payloads\":[{\"id\":\"payload-1\",\"target\":100,\"intentCode\":\"xfer-full\",\"reason\":\"payload-missing\"}]}"); + String payloadTransferred1 = makeEvent("payload-transferred", "{\"state\":\"(p:payload-1:100)\",\"version\":100}"); + String goodbyeEvent = makeEvent("goodbye", "{\"reason\":\"service-unavailable\"}"); + String payloadTransferred2 = makeEvent("payload-transferred", "{\"state\":\"(p:payload-1:101)\",\"version\":101}"); + + // First connection: changeset + goodbye, second connection: changeset + try (HttpServer server = HttpServer.start(Handlers.sequential( + Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event(serverIntent), + Handlers.SSE.event(payloadTransferred1), + Handlers.SSE.event(goodbyeEvent), + Handlers.SSE.leaveOpen()), + Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event(serverIntent), + Handlers.SSE.event(payloadTransferred2), + Handlers.SSE.leaveOpen())))) { + + HttpProperties httpProperties = toHttpProperties(clientContext("sdk-key", baseConfig().build()).getHttp()); + SelectorSource selectorSource = mockSelectorSource(); + + StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl( + httpProperties, + server.getUri(), + "/stream", + testLogger, + selectorSource, + null, + Duration.ofMillis(100), + Thread.NORM_PRIORITY, + diagnosticStore + ); + + // First changeset + CompletableFuture result1Future = synchronizer.next(); + FDv2SourceResult result1 = result1Future.get(5, TimeUnit.SECONDS); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result1.getResultType()); + + // Goodbye + CompletableFuture result2Future = synchronizer.next(); + FDv2SourceResult result2 = result2Future.get(5, TimeUnit.SECONDS); + assertEquals(FDv2SourceResult.ResultType.STATUS, result2.getResultType()); + assertEquals(FDv2SourceResult.State.GOODBYE, result2.getStatus().getState()); + + // Second changeset after reconnect + CompletableFuture result3Future = synchronizer.next(); + FDv2SourceResult result3 = result3Future.get(5, TimeUnit.SECONDS); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result3.getResultType()); + + LDValue event = diagnosticStore.createEventAndReset(0, 0).getJsonValue(); + LDValue streamInits = event.get("streamInits"); + assertEquals(2, streamInits.size()); + // Both inits should be successful (goodbye is a deliberate restart, not a failure) + assertFalse(streamInits.get(0).get("failed").booleanValue()); + assertFalse(streamInits.get(1).get("failed").booleanValue()); + + synchronizer.close(); + } + } + + @Test + public void nullDiagnosticStoreDoesNotCauseError() throws Exception { + String serverIntent = makeEvent("server-intent", "{\"payloads\":[{\"id\":\"payload-1\",\"target\":100,\"intentCode\":\"xfer-full\",\"reason\":\"payload-missing\"}]}"); + String payloadTransferred = makeEvent("payload-transferred", "{\"state\":\"(p:payload-1:100)\",\"version\":100}"); + + try (HttpServer server = HttpServer.start(Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event(serverIntent), + Handlers.SSE.event(payloadTransferred), + Handlers.SSE.leaveOpen()))) { + + HttpProperties httpProperties = toHttpProperties(clientContext("sdk-key", baseConfig().build()).getHttp()); + SelectorSource selectorSource = mockSelectorSource(); + + StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl( + httpProperties, + server.getUri(), + "/stream", + testLogger, + selectorSource, + null, + Duration.ofMillis(100), + Thread.NORM_PRIORITY, + null // null diagnosticStore + ); + + CompletableFuture resultFuture = synchronizer.next(); + FDv2SourceResult result = resultFuture.get(5, TimeUnit.SECONDS); + + assertNotNull(result); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType()); + assertNotNull(result.getChangeSet()); + + synchronizer.close(); + } + } + } From a4c77b3f9a56dff4c68b4afbb78e446a6f80ee2a Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Fri, 6 Feb 2026 08:37:17 -0800 Subject: [PATCH 2/5] Remove extraneous test file. --- .../sdk/server/DataSystemExamplesTemp.java | 78 ------------------- 1 file changed, 78 deletions(-) delete mode 100644 lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSystemExamplesTemp.java diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSystemExamplesTemp.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSystemExamplesTemp.java deleted file mode 100644 index e34dac7..0000000 --- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSystemExamplesTemp.java +++ /dev/null @@ -1,78 +0,0 @@ -package com.launchdarkly.sdk.server; - -/** - * Temporary file to verify data system configuration examples compile correctly. - * This file should be deleted after verification. - */ -public class DataSystemExamplesTemp { - - // Example 1: Default configuration - public void defaultConfiguration() { - LDConfig config = new LDConfig.Builder() - .dataSystem(Components.dataSystem().defaultMode()) - .build(); - - // SDK key is passed to LDClient constructor - // LDClient client = new LDClient("your-sdk-key", config); - } - - // Example 2: Polling only - public void pollingOnly() { - LDConfig config = new LDConfig.Builder() - .dataSystem(Components.dataSystem().polling()) - .build(); - } - - // Example 3: Streaming only - public void streamingOnly() { - LDConfig config = new LDConfig.Builder() - .dataSystem(Components.dataSystem().streaming()) - .build(); - } - - // Example 4: With a persistent store (pattern only - requires actual persistence integration) - // public void withPersistentStore() { - // // Using a hypothetical Redis integration: - // LDConfig config = new LDConfig.Builder() - // .dataSystem(Components.dataSystem().persistentStore( - // Components.persistentDataStore(Redis.dataStore()))) - // .build(); - // } - - // Example 5: Daemon mode (pattern only - requires actual persistence integration) - // public void daemonMode() { - // // Using a hypothetical Redis integration: - // LDConfig config = new LDConfig.Builder() - // .dataSystem(Components.dataSystem().daemon( - // Components.persistentDataStore(Redis.dataStore()))) - // .build(); - // } - - // Example 6: Offline mode - public void offlineMode() { - LDConfig config = new LDConfig.Builder() - .offline(true) - .build(); - } - - // Example 7: Offline with data system (data system is ignored) - public void offlineWithDataSystem() { - LDConfig config = new LDConfig.Builder() - .offline(true) - .dataSystem(Components.dataSystem().defaultMode()) - .build(); - } - - // Example 8: Custom configuration - public void customConfiguration() { - LDConfig config = new LDConfig.Builder() - .dataSystem(Components.dataSystem().custom() - .initializers(DataSystemComponents.pollingInitializer()) - .synchronizers( - DataSystemComponents.streamingSynchronizer(), - DataSystemComponents.pollingSynchronizer() - ) - .fDv1FallbackSynchronizer(DataSystemComponents.fDv1Polling())) - .build(); - } -} From d81ec8768e9200a2d5008f1a4c900f77f1eaea98 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Fri, 6 Feb 2026 10:13:53 -0800 Subject: [PATCH 3/5] PR feedback/tests. --- .../sdk/server/StreamingSynchronizerImpl.java | 15 +- .../sdk/server/DataSystemExamplesTemp.java | 78 +++++ .../server/StreamingSynchronizerImplTest.java | 284 ++++++++++++++++++ 3 files changed, 372 insertions(+), 5 deletions(-) create mode 100644 lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSystemExamplesTemp.java diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java index 105f9f1..0eec0ff 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java @@ -169,6 +169,8 @@ private Thread getRunThread() { logger.error("Stream thread ended with exception: {}", LogValues.exceptionSummary(e)); logger.debug(LogValues.exceptionTrace(e)); + recordStreamInit(true); // Record failed init for unexpected thread exception + DataSourceStatusProvider.ErrorInfo errorInfo = new DataSourceStatusProvider.ErrorInfo( DataSourceStatusProvider.ErrorKind.UNKNOWN, 0, @@ -273,6 +275,7 @@ private void handleMessage(MessageEvent event) { event.getHeaders().value(HeaderConstants.ENVIRONMENT_ID.getHeaderName()), true); recordStreamInit(false); + // Reset to 0 after successful init to prevent duplicate recordings until next restart streamStarted = 0; result = FDv2SourceResult.changeSet(converted, getFallback(event)); } catch (Exception e) { @@ -285,6 +288,7 @@ private void handleMessage(MessageEvent event) { Instant.now() ); result = FDv2SourceResult.interrupted(conversionError, getFallback(event)); + recordStreamInit(true); // Record failed init before restarting restartStream(); } break; @@ -325,6 +329,7 @@ private void handleMessage(MessageEvent event) { ); result = FDv2SourceResult.interrupted(internalError, getFallback(event)); if(kind == DataSourceStatusProvider.ErrorKind.INVALID_DATA) { + recordStreamInit(true); // Record failed init before restarting restartStream(); } break; @@ -348,18 +353,17 @@ private void interruptedWithException(Exception e, DataSourceStatusProvider.Erro Instant.now() ); resultQueue.put(FDv2SourceResult.interrupted(errorInfo, getFallback(event))); + recordStreamInit(true); // Record failed init before restarting restartStream(); } private boolean handleError(StreamException e) { - boolean streamFailed = true; - if (e instanceof StreamClosedByCallerException) { - // We closed it ourselves (shutdown was called or stream was deliberately restarted) - streamFailed = false; - } + // Check if this was a deliberate shutdown/restart rather than an actual error + boolean streamFailed = !(e instanceof StreamClosedByCallerException); recordStreamInit(streamFailed); if (e instanceof StreamClosedByCallerException) { + // We closed it ourselves (shutdown was called or stream was deliberately restarted) return false; } @@ -400,6 +404,7 @@ private boolean handleError(StreamException e) { private void restartStream() { Objects.requireNonNull(eventSource, "eventSource must not be null"); + streamStarted = System.currentTimeMillis(); eventSource.interrupt(); protocolHandler.reset(); } diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSystemExamplesTemp.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSystemExamplesTemp.java new file mode 100644 index 0000000..e34dac7 --- /dev/null +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSystemExamplesTemp.java @@ -0,0 +1,78 @@ +package com.launchdarkly.sdk.server; + +/** + * Temporary file to verify data system configuration examples compile correctly. + * This file should be deleted after verification. + */ +public class DataSystemExamplesTemp { + + // Example 1: Default configuration + public void defaultConfiguration() { + LDConfig config = new LDConfig.Builder() + .dataSystem(Components.dataSystem().defaultMode()) + .build(); + + // SDK key is passed to LDClient constructor + // LDClient client = new LDClient("your-sdk-key", config); + } + + // Example 2: Polling only + public void pollingOnly() { + LDConfig config = new LDConfig.Builder() + .dataSystem(Components.dataSystem().polling()) + .build(); + } + + // Example 3: Streaming only + public void streamingOnly() { + LDConfig config = new LDConfig.Builder() + .dataSystem(Components.dataSystem().streaming()) + .build(); + } + + // Example 4: With a persistent store (pattern only - requires actual persistence integration) + // public void withPersistentStore() { + // // Using a hypothetical Redis integration: + // LDConfig config = new LDConfig.Builder() + // .dataSystem(Components.dataSystem().persistentStore( + // Components.persistentDataStore(Redis.dataStore()))) + // .build(); + // } + + // Example 5: Daemon mode (pattern only - requires actual persistence integration) + // public void daemonMode() { + // // Using a hypothetical Redis integration: + // LDConfig config = new LDConfig.Builder() + // .dataSystem(Components.dataSystem().daemon( + // Components.persistentDataStore(Redis.dataStore()))) + // .build(); + // } + + // Example 6: Offline mode + public void offlineMode() { + LDConfig config = new LDConfig.Builder() + .offline(true) + .build(); + } + + // Example 7: Offline with data system (data system is ignored) + public void offlineWithDataSystem() { + LDConfig config = new LDConfig.Builder() + .offline(true) + .dataSystem(Components.dataSystem().defaultMode()) + .build(); + } + + // Example 8: Custom configuration + public void customConfiguration() { + LDConfig config = new LDConfig.Builder() + .dataSystem(Components.dataSystem().custom() + .initializers(DataSystemComponents.pollingInitializer()) + .synchronizers( + DataSystemComponents.streamingSynchronizer(), + DataSystemComponents.pollingSynchronizer() + ) + .fDv1FallbackSynchronizer(DataSystemComponents.fDv1Polling())) + .build(); + } +} diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/StreamingSynchronizerImplTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/StreamingSynchronizerImplTest.java index d305248..8b27d2d 100644 --- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/StreamingSynchronizerImplTest.java +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/StreamingSynchronizerImplTest.java @@ -1354,4 +1354,288 @@ public void nullDiagnosticStoreDoesNotCauseError() throws Exception { } } + @Test + public void multipleRestartsRecordMultipleDiagnostics() throws Exception { + DiagnosticStore diagnosticStore = basicDiagnosticStore(); + + String serverIntent = makeEvent("server-intent", "{\"payloads\":[{\"id\":\"payload-1\",\"target\":100,\"intentCode\":\"xfer-full\",\"reason\":\"payload-missing\"}]}"); + String payloadTransferred1 = makeEvent("payload-transferred", "{\"state\":\"(p:payload-1:100)\",\"version\":100}"); + String goodbyeEvent1 = makeEvent("goodbye", "{\"reason\":\"service-unavailable\"}"); + String payloadTransferred2 = makeEvent("payload-transferred", "{\"state\":\"(p:payload-1:101)\",\"version\":101}"); + String goodbyeEvent2 = makeEvent("goodbye", "{\"reason\":\"service-unavailable\"}"); + String payloadTransferred3 = makeEvent("payload-transferred", "{\"state\":\"(p:payload-1:102)\",\"version\":102}"); + + // Three connections: changeset+goodbye, changeset+goodbye, changeset + try (HttpServer server = HttpServer.start(Handlers.sequential( + Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event(serverIntent), + Handlers.SSE.event(payloadTransferred1), + Handlers.SSE.event(goodbyeEvent1), + Handlers.SSE.leaveOpen()), + Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event(serverIntent), + Handlers.SSE.event(payloadTransferred2), + Handlers.SSE.event(goodbyeEvent2), + Handlers.SSE.leaveOpen()), + Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event(serverIntent), + Handlers.SSE.event(payloadTransferred3), + Handlers.SSE.leaveOpen())))) { + + HttpProperties httpProperties = toHttpProperties(clientContext("sdk-key", baseConfig().build()).getHttp()); + SelectorSource selectorSource = mockSelectorSource(); + + StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl( + httpProperties, + server.getUri(), + "/stream", + testLogger, + selectorSource, + null, + Duration.ofMillis(100), + Thread.NORM_PRIORITY, + diagnosticStore + ); + + // First changeset + CompletableFuture result1Future = synchronizer.next(); + FDv2SourceResult result1 = result1Future.get(5, TimeUnit.SECONDS); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result1.getResultType()); + + // First goodbye + CompletableFuture result2Future = synchronizer.next(); + FDv2SourceResult result2 = result2Future.get(5, TimeUnit.SECONDS); + assertEquals(FDv2SourceResult.ResultType.STATUS, result2.getResultType()); + + // Second changeset + CompletableFuture result3Future = synchronizer.next(); + FDv2SourceResult result3 = result3Future.get(5, TimeUnit.SECONDS); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result3.getResultType()); + + // Second goodbye + CompletableFuture result4Future = synchronizer.next(); + FDv2SourceResult result4 = result4Future.get(5, TimeUnit.SECONDS); + assertEquals(FDv2SourceResult.ResultType.STATUS, result4.getResultType()); + + // Third changeset + CompletableFuture result5Future = synchronizer.next(); + FDv2SourceResult result5 = result5Future.get(5, TimeUnit.SECONDS); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result5.getResultType()); + + LDValue event = diagnosticStore.createEventAndReset(0, 0).getJsonValue(); + LDValue streamInits = event.get("streamInits"); + assertEquals(3, streamInits.size()); + // All three inits should be successful + assertFalse(streamInits.get(0).get("failed").booleanValue()); + assertFalse(streamInits.get(1).get("failed").booleanValue()); + assertFalse(streamInits.get(2).get("failed").booleanValue()); + + synchronizer.close(); + } + } + + @Test + public void streamRestartAfterInvalidDataRecordsMultipleDiagnostics() throws Exception { + DiagnosticStore diagnosticStore = basicDiagnosticStore(); + + String serverIntent = makeEvent("server-intent", "{\"payloads\":[{\"id\":\"payload-1\",\"target\":100,\"intentCode\":\"xfer-full\",\"reason\":\"payload-missing\"}]}"); + String invalidPayload = makeEvent("payload-transferred", "{malformed json}"); + String validPayload = makeEvent("payload-transferred", "{\"state\":\"(p:payload-1:100)\",\"version\":100}"); + + // First connection: invalid data, second connection: valid changeset + try (HttpServer server = HttpServer.start(Handlers.sequential( + Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event(serverIntent), + Handlers.SSE.event(invalidPayload), + Handlers.SSE.leaveOpen()), + Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event(serverIntent), + Handlers.SSE.event(validPayload), + Handlers.SSE.leaveOpen())))) { + + HttpProperties httpProperties = toHttpProperties(clientContext("sdk-key", baseConfig().build()).getHttp()); + SelectorSource selectorSource = mockSelectorSource(); + + StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl( + httpProperties, + server.getUri(), + "/stream", + testLogger, + selectorSource, + null, + Duration.ofMillis(100), + Thread.NORM_PRIORITY, + diagnosticStore + ); + + // First result should be interrupted due to invalid data + CompletableFuture result1Future = synchronizer.next(); + FDv2SourceResult result1 = result1Future.get(5, TimeUnit.SECONDS); + assertEquals(FDv2SourceResult.ResultType.STATUS, result1.getResultType()); + assertEquals(FDv2SourceResult.State.INTERRUPTED, result1.getStatus().getState()); + + // Second result should be the valid changeset + CompletableFuture result2Future = synchronizer.next(); + FDv2SourceResult result2 = result2Future.get(5, TimeUnit.SECONDS); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result2.getResultType()); + + LDValue event = diagnosticStore.createEventAndReset(0, 0).getJsonValue(); + LDValue streamInits = event.get("streamInits"); + assertEquals(2, streamInits.size()); + // First init should be failed (invalid data), second should be successful + assertTrue(streamInits.get(0).get("failed").booleanValue()); + assertFalse(streamInits.get(1).get("failed").booleanValue()); + + synchronizer.close(); + } + } + + @Test + public void multipleErrorsRecordMultipleDiagnostics() throws Exception { + DiagnosticStore diagnosticStore = basicDiagnosticStore(); + long startTime = System.currentTimeMillis(); + + String serverIntent = makeEvent("server-intent", "{\"payloads\":[{\"id\":\"payload-1\",\"target\":100,\"intentCode\":\"xfer-full\",\"reason\":\"payload-missing\"}]}"); + String payloadTransferred = makeEvent("payload-transferred", "{\"state\":\"(p:payload-1:100)\",\"version\":100}"); + + // Three connections: 503, 503, successful changeset + try (HttpServer server = HttpServer.start(Handlers.sequential( + Handlers.status(503), + Handlers.status(503), + Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event(serverIntent), + Handlers.SSE.event(payloadTransferred), + Handlers.SSE.leaveOpen())))) { + + HttpProperties httpProperties = toHttpProperties(clientContext("sdk-key", baseConfig().build()).getHttp()); + SelectorSource selectorSource = mockSelectorSource(); + + StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl( + httpProperties, + server.getUri(), + "/stream", + testLogger, + selectorSource, + null, + Duration.ofMillis(100), + Thread.NORM_PRIORITY, + diagnosticStore + ); + + // First error + CompletableFuture result1Future = synchronizer.next(); + FDv2SourceResult result1 = result1Future.get(5, TimeUnit.SECONDS); + assertEquals(FDv2SourceResult.ResultType.STATUS, result1.getResultType()); + + // Second error + CompletableFuture result2Future = synchronizer.next(); + FDv2SourceResult result2 = result2Future.get(5, TimeUnit.SECONDS); + assertEquals(FDv2SourceResult.ResultType.STATUS, result2.getResultType()); + + // Successful changeset + CompletableFuture result3Future = synchronizer.next(); + FDv2SourceResult result3 = result3Future.get(5, TimeUnit.SECONDS); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result3.getResultType()); + + long timeAfterOpen = System.currentTimeMillis(); + LDValue event = diagnosticStore.createEventAndReset(0, 0).getJsonValue(); + LDValue streamInits = event.get("streamInits"); + assertEquals(3, streamInits.size()); + + // First two inits should be failed, last should be successful + LDValue init0 = streamInits.get(0); + assertTrue(init0.get("failed").booleanValue()); + assertThat(init0.get("timestamp").longValue(), + allOf(greaterThanOrEqualTo(startTime), lessThanOrEqualTo(timeAfterOpen))); + + LDValue init1 = streamInits.get(1); + assertTrue(init1.get("failed").booleanValue()); + assertThat(init1.get("timestamp").longValue(), + allOf(greaterThanOrEqualTo(init0.get("timestamp").longValue()), lessThanOrEqualTo(timeAfterOpen))); + + LDValue init2 = streamInits.get(2); + assertFalse(init2.get("failed").booleanValue()); + assertThat(init2.get("timestamp").longValue(), + allOf(greaterThanOrEqualTo(init1.get("timestamp").longValue()), lessThanOrEqualTo(timeAfterOpen))); + + synchronizer.close(); + } + } + + @Test + public void errorAfterSuccessfulChangesetRecordsNewDiagnostic() throws Exception { + DiagnosticStore diagnosticStore = basicDiagnosticStore(); + + String serverIntent = makeEvent("server-intent", "{\"payloads\":[{\"id\":\"payload-1\",\"target\":100,\"intentCode\":\"xfer-full\",\"reason\":\"payload-missing\"}]}"); + String payloadTransferred1 = makeEvent("payload-transferred", "{\"state\":\"(p:payload-1:100)\",\"version\":100}"); + String goodbyeEvent = makeEvent("goodbye", "{\"reason\":\"service-unavailable\"}"); + + // First connection: successful changeset + goodbye, second connection: 503 error, third connection: successful + try (HttpServer server = HttpServer.start(Handlers.sequential( + Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event(serverIntent), + Handlers.SSE.event(payloadTransferred1), + Handlers.SSE.event(goodbyeEvent), + Handlers.SSE.leaveOpen()), + Handlers.status(503), + Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event(serverIntent), + Handlers.SSE.event(payloadTransferred1), + Handlers.SSE.leaveOpen())))) { + + HttpProperties httpProperties = toHttpProperties(clientContext("sdk-key", baseConfig().build()).getHttp()); + SelectorSource selectorSource = mockSelectorSource(); + + StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl( + httpProperties, + server.getUri(), + "/stream", + testLogger, + selectorSource, + null, + Duration.ofMillis(100), + Thread.NORM_PRIORITY, + diagnosticStore + ); + + // First successful changeset + CompletableFuture result1Future = synchronizer.next(); + FDv2SourceResult result1 = result1Future.get(5, TimeUnit.SECONDS); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result1.getResultType()); + + // Goodbye + CompletableFuture result2Future = synchronizer.next(); + FDv2SourceResult result2 = result2Future.get(5, TimeUnit.SECONDS); + assertEquals(FDv2SourceResult.ResultType.STATUS, result2.getResultType()); + + // Error + CompletableFuture result3Future = synchronizer.next(); + FDv2SourceResult result3 = result3Future.get(5, TimeUnit.SECONDS); + assertEquals(FDv2SourceResult.ResultType.STATUS, result3.getResultType()); + + // Second successful changeset + CompletableFuture result4Future = synchronizer.next(); + FDv2SourceResult result4 = result4Future.get(5, TimeUnit.SECONDS); + assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result4.getResultType()); + + LDValue event = diagnosticStore.createEventAndReset(0, 0).getJsonValue(); + LDValue streamInits = event.get("streamInits"); + assertEquals(3, streamInits.size()); + // First init: successful, second init: failed (503), third init: successful + assertFalse(streamInits.get(0).get("failed").booleanValue()); + assertTrue(streamInits.get(1).get("failed").booleanValue()); + assertFalse(streamInits.get(2).get("failed").booleanValue()); + + synchronizer.close(); + } + } + } From 9a84b519a1d465d3be98356624355d42de673b2f Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Fri, 6 Feb 2026 14:01:19 -0800 Subject: [PATCH 4/5] Simplify --- .../sdk/server/StreamingSynchronizerImpl.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java index 0eec0ff..421ad0d 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java @@ -288,8 +288,7 @@ private void handleMessage(MessageEvent event) { Instant.now() ); result = FDv2SourceResult.interrupted(conversionError, getFallback(event)); - recordStreamInit(true); // Record failed init before restarting - restartStream(); + restartStream(true); } break; @@ -305,7 +304,7 @@ private void handleMessage(MessageEvent event) { logger.info("Goodbye was received from the LaunchDarkly connection with reason: '{}'.", reason); result = FDv2SourceResult.goodbye(reason, getFallback(event)); // We drop this current connection and attempt to restart the stream. - restartStream(); + restartStream(false); // Not a failure - deliberate server-initiated restart break; case INTERNAL_ERROR: @@ -329,8 +328,7 @@ private void handleMessage(MessageEvent event) { ); result = FDv2SourceResult.interrupted(internalError, getFallback(event)); if(kind == DataSourceStatusProvider.ErrorKind.INVALID_DATA) { - recordStreamInit(true); // Record failed init before restarting - restartStream(); + restartStream(true); } break; @@ -353,8 +351,7 @@ private void interruptedWithException(Exception e, DataSourceStatusProvider.Erro Instant.now() ); resultQueue.put(FDv2SourceResult.interrupted(errorInfo, getFallback(event))); - recordStreamInit(true); // Record failed init before restarting - restartStream(); + restartStream(true); } private boolean handleError(StreamException e) { @@ -402,8 +399,9 @@ private boolean handleError(StreamException e) { return true; // allow reconnect } - private void restartStream() { + private void restartStream(boolean failed) { Objects.requireNonNull(eventSource, "eventSource must not be null"); + recordStreamInit(failed); streamStarted = System.currentTimeMillis(); eventSource.interrupt(); protocolHandler.reset(); From ee478d9178316d7714f790da267efd9400d7c0db Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Fri, 6 Feb 2026 14:02:21 -0800 Subject: [PATCH 5/5] Remove extraneous sample --- .../sdk/server/DataSystemExamplesTemp.java | 78 ------------------- 1 file changed, 78 deletions(-) delete mode 100644 lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSystemExamplesTemp.java diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSystemExamplesTemp.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSystemExamplesTemp.java deleted file mode 100644 index e34dac7..0000000 --- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DataSystemExamplesTemp.java +++ /dev/null @@ -1,78 +0,0 @@ -package com.launchdarkly.sdk.server; - -/** - * Temporary file to verify data system configuration examples compile correctly. - * This file should be deleted after verification. - */ -public class DataSystemExamplesTemp { - - // Example 1: Default configuration - public void defaultConfiguration() { - LDConfig config = new LDConfig.Builder() - .dataSystem(Components.dataSystem().defaultMode()) - .build(); - - // SDK key is passed to LDClient constructor - // LDClient client = new LDClient("your-sdk-key", config); - } - - // Example 2: Polling only - public void pollingOnly() { - LDConfig config = new LDConfig.Builder() - .dataSystem(Components.dataSystem().polling()) - .build(); - } - - // Example 3: Streaming only - public void streamingOnly() { - LDConfig config = new LDConfig.Builder() - .dataSystem(Components.dataSystem().streaming()) - .build(); - } - - // Example 4: With a persistent store (pattern only - requires actual persistence integration) - // public void withPersistentStore() { - // // Using a hypothetical Redis integration: - // LDConfig config = new LDConfig.Builder() - // .dataSystem(Components.dataSystem().persistentStore( - // Components.persistentDataStore(Redis.dataStore()))) - // .build(); - // } - - // Example 5: Daemon mode (pattern only - requires actual persistence integration) - // public void daemonMode() { - // // Using a hypothetical Redis integration: - // LDConfig config = new LDConfig.Builder() - // .dataSystem(Components.dataSystem().daemon( - // Components.persistentDataStore(Redis.dataStore()))) - // .build(); - // } - - // Example 6: Offline mode - public void offlineMode() { - LDConfig config = new LDConfig.Builder() - .offline(true) - .build(); - } - - // Example 7: Offline with data system (data system is ignored) - public void offlineWithDataSystem() { - LDConfig config = new LDConfig.Builder() - .offline(true) - .dataSystem(Components.dataSystem().defaultMode()) - .build(); - } - - // Example 8: Custom configuration - public void customConfiguration() { - LDConfig config = new LDConfig.Builder() - .dataSystem(Components.dataSystem().custom() - .initializers(DataSystemComponents.pollingInitializer()) - .synchronizers( - DataSystemComponents.streamingSynchronizer(), - DataSystemComponents.pollingSynchronizer() - ) - .fDv1FallbackSynchronizer(DataSystemComponents.fDv1Polling())) - .build(); - } -}