diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java index 8e99d15c07ef..b9a320dc8da3 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java @@ -81,34 +81,42 @@ public PrepareBalancerAndLoadQueues( @Override public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { - List currentServers = prepareCurrentServers(); - taskMaster.resetPeonsForNewServers(currentServers); - - final CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig(); - final SegmentLoadingConfig segmentLoadingConfig - = SegmentLoadingConfig.create(dynamicConfig, params.getUsedSegmentCount()); - - final DruidCluster cluster = prepareCluster(dynamicConfig, segmentLoadingConfig, currentServers); - cancelLoadsOnDecommissioningServers(cluster); - - final CoordinatorRunStats stats = params.getCoordinatorStats(); - collectHistoricalStats(cluster, stats); - collectUsedSegmentStats(params, stats); - collectDebugStats(segmentLoadingConfig, stats); - - final int numBalancerThreads = segmentLoadingConfig.getBalancerComputeThreads(); - final BalancerStrategy balancerStrategy = balancerStrategyFactory.createBalancerStrategy(numBalancerThreads); - log.debug( - "Using balancer strategy[%s] with [%d] threads.", - balancerStrategy.getClass().getSimpleName(), numBalancerThreads - ); - - return params.buildFromExisting() - .withDruidCluster(cluster) - .withBalancerStrategy(balancerStrategy) - .withSegmentLoadingConfig(segmentLoadingConfig) - .withSegmentAssignerUsing(loadQueueManager) - .build(); + // Prevent callbacks from firing while balancer/load queue accounting is taking place. + // This ensures snapshot validity at the expense of potentially stale snapshots (in some cases). + taskMaster.getCallbackLock().writeLock().lock(); + try { + List currentServers = prepareCurrentServers(); + taskMaster.resetPeonsForNewServers(currentServers); + + final CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig(); + final SegmentLoadingConfig segmentLoadingConfig + = SegmentLoadingConfig.create(dynamicConfig, params.getUsedSegmentCount()); + + final DruidCluster cluster = prepareCluster(dynamicConfig, segmentLoadingConfig, currentServers); + cancelLoadsOnDecommissioningServers(cluster); + + final CoordinatorRunStats stats = params.getCoordinatorStats(); + collectHistoricalStats(cluster, stats); + collectUsedSegmentStats(params, stats); + collectDebugStats(segmentLoadingConfig, stats); + + final int numBalancerThreads = segmentLoadingConfig.getBalancerComputeThreads(); + final BalancerStrategy balancerStrategy = balancerStrategyFactory.createBalancerStrategy(numBalancerThreads); + log.debug( + "Using balancer strategy[%s] with [%d] threads.", + balancerStrategy.getClass().getSimpleName(), numBalancerThreads + ); + + return params.buildFromExisting() + .withDruidCluster(cluster) + .withBalancerStrategy(balancerStrategy) + .withSegmentLoadingConfig(segmentLoadingConfig) + .withSegmentAssignerUsing(loadQueueManager) + .build(); + } + finally { + taskMaster.getCallbackLock().writeLock().unlock(); + } } /** diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java index e4fa849beb18..f7c41360d22e 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java @@ -72,6 +72,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; import java.util.function.Supplier; /** @@ -123,6 +124,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon private final AtomicBoolean mainLoopInProgress = new AtomicBoolean(false); private final ExecutorService callBackExecutor; + private final ReadWriteLock callbackSync; private final Supplier loadingModeSupplier; private final ObjectWriter requestBodyWriter; @@ -135,7 +137,8 @@ public HttpLoadQueuePeon( HttpLoadQueuePeonConfig config, Supplier loadingModeSupplier, ScheduledExecutorService processingExecutor, - ExecutorService callBackExecutor + ExecutorService callBackExecutor, + ReadWriteLock callbackSync ) { this.jsonMapper = jsonMapper; @@ -144,6 +147,7 @@ public HttpLoadQueuePeon( this.config = config; this.processingExecutor = processingExecutor; this.callBackExecutor = callBackExecutor; + this.callbackSync = callbackSync; this.serverId = baseUrl; this.loadingModeSupplier = loadingModeSupplier; @@ -637,7 +641,14 @@ private void executeCallbacks(SegmentHolder holder, boolean success) { callBackExecutor.execute(() -> { for (LoadPeonCallback callback : holder.getCallbacks()) { - callback.execute(success); + // Load queue peons acquire read lock to increase tput (they operate safely on underlying state). + callbackSync.readLock().lock(); + try { + callback.execute(success); + } + finally { + callbackSync.readLock().unlock(); + } } }); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java index d9fdca36c67d..411eea0c4f34 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java @@ -35,6 +35,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; /** @@ -47,6 +49,7 @@ public class LoadQueueTaskMaster private final ObjectMapper jsonMapper; private final ScheduledExecutorService peonExec; private final ExecutorService callbackExec; + private final ReadWriteLock callbackLock; private final HttpLoadQueuePeonConfig config; private final HttpClient httpClient; private final Supplier coordinatorDynamicConfigSupplier; @@ -68,6 +71,7 @@ public LoadQueueTaskMaster( this.jsonMapper = jsonMapper; this.peonExec = peonExec; this.callbackExec = callbackExec; + this.callbackLock = new ReentrantReadWriteLock(); this.config = config; this.httpClient = httpClient; this.coordinatorDynamicConfigSupplier = coordinatorDynamicConfigSupplier; @@ -82,7 +86,8 @@ private LoadQueuePeon createPeon(ImmutableDruidServer server) config, () -> coordinatorDynamicConfigSupplier.get().getLoadingModeForServer(server.getName()), peonExec, - callbackExec + callbackExec, + callbackLock ); } @@ -154,4 +159,9 @@ public boolean isHttpLoading() { return true; } + + public ReadWriteLock getCallbackLock() + { + return callbackLock; + } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 39213ff8616d..93d904448cbe 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -88,6 +88,8 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** */ @@ -112,6 +114,7 @@ public class DruidCoordinatorTest private OverlordClient overlordClient; private CompactionStatusTracker statusTracker; private LatchableServiceEmitter serviceEmitter; + private ReadWriteLock callbackSyncLock; @Before public void setUp() throws Exception @@ -152,6 +155,7 @@ public void setUp() throws Exception leaderAnnouncerLatch = new CountDownLatch(1); leaderUnannouncerLatch = new CountDownLatch(1); serviceEmitter = new LatchableServiceEmitter(); + callbackSyncLock = new ReentrantReadWriteLock(); coordinator = new DruidCoordinator( druidCoordinatorConfig, createMetadataManager(configManager), @@ -198,6 +202,7 @@ public void testCoordinatorRun() throws Exception Rule foreverLoadRule = new ForeverLoadRule(ImmutableMap.of(tier, 2), null); EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString())) .andReturn(ImmutableList.of(foreverLoadRule)).atLeastOnce(); + EasyMock.expect(loadQueueTaskMaster.getCallbackLock()).andReturn(callbackSyncLock).anyTimes(); metadataRuleManager.stop(); EasyMock.expectLastCall().once(); @@ -335,6 +340,7 @@ public void testCoordinatorTieredRun() throws Exception .andReturn(ImmutableList.of(hotServer, coldServer)) .atLeastOnce(); EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes(); + EasyMock.expect(loadQueueTaskMaster.getCallbackLock()).andReturn(callbackSyncLock).anyTimes(); EasyMock.replay(metadataRuleManager, serverInventoryView, loadQueueTaskMaster); @@ -419,6 +425,7 @@ public void testComputeUnderReplicationCountsPerDataSourcePerTierForSegmentsWith .andReturn(ImmutableList.of(hotServer, coldServer, brokerServer1, brokerServer2, peonServer)) .atLeastOnce(); EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes(); + EasyMock.expect(loadQueueTaskMaster.getCallbackLock()).andReturn(callbackSyncLock).anyTimes(); EasyMock.replay(metadataRuleManager, serverInventoryView, loadQueueTaskMaster); @@ -651,6 +658,7 @@ public void testCoordinatorCustomDutyGroupsRunAsExpected() throws Exception EasyMock.expect(segmentsMetadataManager.isPollingDatabasePeriodically()).andReturn(true).anyTimes(); EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes(); EasyMock.expect(serverInventoryView.getInventory()).andReturn(Collections.emptyList()).anyTimes(); + EasyMock.expect(loadQueueTaskMaster.getCallbackLock()).andReturn(callbackSyncLock).anyTimes(); EasyMock.replay(serverInventoryView, loadQueueTaskMaster, segmentsMetadataManager); // Create CoordinatorCustomDutyGroups @@ -719,6 +727,7 @@ public void testCoordinatorRun_queryFromDeepStorage() throws Exception Rule foreverLoadRule = new ForeverLoadRule(ImmutableMap.of(coldTier, 0), null); EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString())) .andReturn(ImmutableList.of(intervalLoadRule, foreverLoadRule)).atLeastOnce(); + EasyMock.expect(loadQueueTaskMaster.getCallbackLock()).andReturn(callbackSyncLock).anyTimes(); metadataRuleManager.stop(); EasyMock.expectLastCall().once(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java index cb4a8f1d7f3e..171ce63f03f5 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java @@ -58,6 +58,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -91,7 +92,8 @@ public void setUp() httpClient.processingExecutor, true ), - httpClient.callbackExecutor + httpClient.callbackExecutor, + new ReentrantReadWriteLock() ); httpLoadQueuePeon.start(); } @@ -338,7 +340,8 @@ public void testBatchSize() httpClient.processingExecutor, true ), - httpClient.callbackExecutor + httpClient.callbackExecutor, + new ReentrantReadWriteLock() ); Assert.assertEquals(1, httpLoadQueuePeon.calculateBatchSize(SegmentLoadingMode.NORMAL));