diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSystemComponents.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSystemComponents.java index 89c1899..66a7cbb 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSystemComponents.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSystemComponents.java @@ -97,7 +97,8 @@ public Synchronizer build(DataSourceBuildInputs context) { context.getBaseLogger(), context.getSelectorSource(), payloadFilter, - initialReconnectDelay + initialReconnectDelay, + context.getThreadPriority() ); } } diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java index e0d8773..ab11d95 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java @@ -61,6 +61,8 @@ class StreamingSynchronizerImpl implements Synchronizer { private final AtomicBoolean started = new AtomicBoolean(false); + private final int threadPriority; + public StreamingSynchronizerImpl( HttpProperties httpProperties, URI baseUri, @@ -68,7 +70,8 @@ public StreamingSynchronizerImpl( LDLogger logger, SelectorSource selectorSource, String payloadFilter, - Duration initialReconnectDelaySeconds + Duration initialReconnectDelaySeconds, + int threadPriority ) { this.httpProperties = httpProperties; this.selectorSource = selectorSource; @@ -76,6 +79,7 @@ public StreamingSynchronizerImpl( this.payloadFilter = payloadFilter; this.streamUri = HttpHelpers.concatenateUriPath(baseUri, requestPath); this.initialReconnectDelay = initialReconnectDelaySeconds; + this.threadPriority = threadPriority; // The stream will lazily start when `next` is called. } @@ -173,8 +177,7 @@ private Thread getRunThread() { } }); thread.setName("LaunchDarkly-FDv2-streaming-synchronizer"); - // TODO: Implement thread priority. - //streamThread.setPriority(); + thread.setPriority(threadPriority); thread.setDaemon(true); return thread; } diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/StreamingSynchronizerImplTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/StreamingSynchronizerImplTest.java index d7c2ae7..1fe090b 100644 --- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/StreamingSynchronizerImplTest.java +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/StreamingSynchronizerImplTest.java @@ -67,7 +67,8 @@ public void receivesMultipleChangesets() throws Exception { testLogger, selectorSource, null, - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); // First changeset @@ -103,7 +104,8 @@ public void httpNonRecoverableError() throws Exception { testLogger, selectorSource, null, - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); CompletableFuture resultFuture = synchronizer.next(); @@ -131,7 +133,8 @@ public void httpRecoverableError() throws Exception { testLogger, selectorSource, null, - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); CompletableFuture resultFuture = synchronizer.next(); @@ -159,7 +162,8 @@ public void networkError() throws Exception { testLogger, selectorSource, null, - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); CompletableFuture resultFuture = synchronizer.next(); @@ -192,7 +196,8 @@ public void invalidEventData() throws Exception { testLogger, selectorSource, null, - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); CompletableFuture resultFuture = synchronizer.next(); @@ -223,7 +228,8 @@ public void shutdownBeforeEventReceived() throws Exception { testLogger, selectorSource, null, - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); CompletableFuture nextFuture = synchronizer.next(); @@ -262,7 +268,8 @@ public void shutdownAfterEventReceived() throws Exception { testLogger, selectorSource, null, - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); CompletableFuture resultFuture = synchronizer.next(); @@ -304,7 +311,8 @@ public void goodbyeEventInResponse() throws Exception { testLogger, selectorSource, null, - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); // First result should be goodbye @@ -353,7 +361,8 @@ public void heartbeatEvent() throws Exception { testLogger, selectorSource, null, - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); CompletableFuture resultFuture = synchronizer.next(); @@ -391,7 +400,8 @@ public void selectorWithVersionAndState() throws Exception { testLogger, selectorSource, null, - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); CompletableFuture resultFuture = synchronizer.next(); @@ -445,7 +455,8 @@ public void selectorRefetchedOnReconnection() throws Exception { testLogger, selectorSource, null, - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); // First result should be an error from the 503 @@ -505,7 +516,8 @@ public void errorEventFromServer() throws Exception { testLogger, selectorSource, null, - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); // Error event should be logged but not queued, so we should get the changeset @@ -543,7 +555,8 @@ public void selectorWithVersionOnly() throws Exception { testLogger, selectorSource, null, - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); CompletableFuture resultFuture = synchronizer.next(); @@ -583,7 +596,8 @@ public void selectorWithEmptyState() throws Exception { testLogger, selectorSource, null, - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); CompletableFuture resultFuture = synchronizer.next(); @@ -616,7 +630,8 @@ public void closeCalledMultipleTimes() throws Exception { testLogger, selectorSource, null, - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); // Call close multiple times - should not throw exceptions @@ -652,7 +667,8 @@ public void invalidEventStructureCausesInterrupt() throws Exception { testLogger, selectorSource, null, - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); CompletableFuture resultFuture = synchronizer.next(); @@ -687,7 +703,8 @@ public void payloadFilterIsAddedToRequest() throws Exception { testLogger, selectorSource, "myFilter", - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); CompletableFuture resultFuture = synchronizer.next(); @@ -728,7 +745,8 @@ public void payloadFilterWithSelectorBothAddedToRequest() throws Exception { testLogger, selectorSource, "testFilter", - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); CompletableFuture resultFuture = synchronizer.next(); @@ -767,7 +785,8 @@ public void emptyPayloadFilterNotAddedToRequest() throws Exception { testLogger, selectorSource, "", - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); CompletableFuture resultFuture = synchronizer.next(); @@ -806,7 +825,8 @@ public void nullPayloadFilterNotAddedToRequest() throws Exception { testLogger, selectorSource, null, - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); CompletableFuture resultFuture = synchronizer.next(); @@ -846,7 +866,8 @@ public void fdv1FallbackFlagSetToTrueInSuccessResponse() throws Exception { testLogger, selectorSource, null, - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); CompletableFuture resultFuture = synchronizer.next(); @@ -881,7 +902,8 @@ public void fdv1FallbackFlagSetToFalseWhenHeaderNotPresent() throws Exception { testLogger, selectorSource, null, - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); CompletableFuture resultFuture = synchronizer.next(); @@ -911,7 +933,8 @@ public void fdv1FallbackFlagSetToTrueInErrorResponse() throws Exception { testLogger, selectorSource, null, - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); CompletableFuture resultFuture = synchronizer.next(); @@ -948,7 +971,8 @@ public void environmentIdExtractedFromHeaders() throws Exception { testLogger, selectorSource, null, - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); CompletableFuture resultFuture = synchronizer.next(); @@ -986,7 +1010,8 @@ public void bothFdv1FallbackAndEnvironmentIdExtractedFromHeaders() throws Except testLogger, selectorSource, null, - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); CompletableFuture resultFuture = synchronizer.next(); @@ -1023,7 +1048,8 @@ public void serializationExceptionWithoutFallbackHeader() throws Exception { testLogger, selectorSource, null, - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); CompletableFuture resultFuture = synchronizer.next(); @@ -1061,7 +1087,8 @@ public void serializationExceptionPreservesFallbackHeader() throws Exception { testLogger, selectorSource, null, - Duration.ofMillis(100) + Duration.ofMillis(100), + Thread.NORM_PRIORITY ); CompletableFuture resultFuture = synchronizer.next();