Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,34 +81,42 @@ public PrepareBalancerAndLoadQueues(
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
List<ImmutableDruidServer> currentServers = prepareCurrentServers();
taskMaster.resetPeonsForNewServers(currentServers);

final CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig();
final SegmentLoadingConfig segmentLoadingConfig
= SegmentLoadingConfig.create(dynamicConfig, params.getUsedSegmentCount());

final DruidCluster cluster = prepareCluster(dynamicConfig, segmentLoadingConfig, currentServers);
cancelLoadsOnDecommissioningServers(cluster);

final CoordinatorRunStats stats = params.getCoordinatorStats();
collectHistoricalStats(cluster, stats);
collectUsedSegmentStats(params, stats);
collectDebugStats(segmentLoadingConfig, stats);

final int numBalancerThreads = segmentLoadingConfig.getBalancerComputeThreads();
final BalancerStrategy balancerStrategy = balancerStrategyFactory.createBalancerStrategy(numBalancerThreads);
log.debug(
"Using balancer strategy[%s] with [%d] threads.",
balancerStrategy.getClass().getSimpleName(), numBalancerThreads
);

return params.buildFromExisting()
.withDruidCluster(cluster)
.withBalancerStrategy(balancerStrategy)
.withSegmentLoadingConfig(segmentLoadingConfig)
.withSegmentAssignerUsing(loadQueueManager)
.build();
// Prevent callbacks from firing while balancer/load queue accounting is taking place.
// This ensures snapshot validity at the expense of potentially stale snapshots (in some cases).
taskMaster.getCallbackLock().writeLock().lock();
try {
List<ImmutableDruidServer> currentServers = prepareCurrentServers();
taskMaster.resetPeonsForNewServers(currentServers);

final CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig();
final SegmentLoadingConfig segmentLoadingConfig
= SegmentLoadingConfig.create(dynamicConfig, params.getUsedSegmentCount());

final DruidCluster cluster = prepareCluster(dynamicConfig, segmentLoadingConfig, currentServers);
cancelLoadsOnDecommissioningServers(cluster);

final CoordinatorRunStats stats = params.getCoordinatorStats();
collectHistoricalStats(cluster, stats);
collectUsedSegmentStats(params, stats);
collectDebugStats(segmentLoadingConfig, stats);

final int numBalancerThreads = segmentLoadingConfig.getBalancerComputeThreads();
final BalancerStrategy balancerStrategy = balancerStrategyFactory.createBalancerStrategy(numBalancerThreads);
log.debug(
"Using balancer strategy[%s] with [%d] threads.",
balancerStrategy.getClass().getSimpleName(), numBalancerThreads
);

return params.buildFromExisting()
.withDruidCluster(cluster)
.withBalancerStrategy(balancerStrategy)
.withSegmentLoadingConfig(segmentLoadingConfig)
.withSegmentAssignerUsing(loadQueueManager)
.build();
}
finally {
taskMaster.getCallbackLock().writeLock().unlock();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -123,6 +124,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon

private final AtomicBoolean mainLoopInProgress = new AtomicBoolean(false);
private final ExecutorService callBackExecutor;
private final ReadWriteLock callbackSync;
private final Supplier<SegmentLoadingMode> loadingModeSupplier;

private final ObjectWriter requestBodyWriter;
Expand All @@ -135,7 +137,8 @@ public HttpLoadQueuePeon(
HttpLoadQueuePeonConfig config,
Supplier<SegmentLoadingMode> loadingModeSupplier,
ScheduledExecutorService processingExecutor,
ExecutorService callBackExecutor
ExecutorService callBackExecutor,
ReadWriteLock callbackSync
)
{
this.jsonMapper = jsonMapper;
Expand All @@ -144,6 +147,7 @@ public HttpLoadQueuePeon(
this.config = config;
this.processingExecutor = processingExecutor;
this.callBackExecutor = callBackExecutor;
this.callbackSync = callbackSync;

this.serverId = baseUrl;
this.loadingModeSupplier = loadingModeSupplier;
Expand Down Expand Up @@ -637,7 +641,14 @@ private void executeCallbacks(SegmentHolder holder, boolean success)
{
callBackExecutor.execute(() -> {
for (LoadPeonCallback callback : holder.getCallbacks()) {
callback.execute(success);
// Load queue peons acquire read lock to increase tput (they operate safely on underlying state).
callbackSync.readLock().lock();
try {
callback.execute(success);
}
finally {
callbackSync.readLock().unlock();
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

/**
Expand All @@ -47,6 +49,7 @@ public class LoadQueueTaskMaster
private final ObjectMapper jsonMapper;
private final ScheduledExecutorService peonExec;
private final ExecutorService callbackExec;
private final ReadWriteLock callbackLock;
private final HttpLoadQueuePeonConfig config;
private final HttpClient httpClient;
private final Supplier<CoordinatorDynamicConfig> coordinatorDynamicConfigSupplier;
Expand All @@ -68,6 +71,7 @@ public LoadQueueTaskMaster(
this.jsonMapper = jsonMapper;
this.peonExec = peonExec;
this.callbackExec = callbackExec;
this.callbackLock = new ReentrantReadWriteLock();
this.config = config;
this.httpClient = httpClient;
this.coordinatorDynamicConfigSupplier = coordinatorDynamicConfigSupplier;
Expand All @@ -82,7 +86,8 @@ private LoadQueuePeon createPeon(ImmutableDruidServer server)
config,
() -> coordinatorDynamicConfigSupplier.get().getLoadingModeForServer(server.getName()),
peonExec,
callbackExec
callbackExec,
callbackLock
);
}

Expand Down Expand Up @@ -154,4 +159,9 @@ public boolean isHttpLoading()
{
return true;
}

public ReadWriteLock getCallbackLock()
{
return callbackLock;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
*/
Expand All @@ -112,6 +114,7 @@ public class DruidCoordinatorTest
private OverlordClient overlordClient;
private CompactionStatusTracker statusTracker;
private LatchableServiceEmitter serviceEmitter;
private ReadWriteLock callbackSyncLock;

@Before
public void setUp() throws Exception
Expand Down Expand Up @@ -152,6 +155,7 @@ public void setUp() throws Exception
leaderAnnouncerLatch = new CountDownLatch(1);
leaderUnannouncerLatch = new CountDownLatch(1);
serviceEmitter = new LatchableServiceEmitter();
callbackSyncLock = new ReentrantReadWriteLock();
coordinator = new DruidCoordinator(
druidCoordinatorConfig,
createMetadataManager(configManager),
Expand Down Expand Up @@ -198,6 +202,7 @@ public void testCoordinatorRun() throws Exception
Rule foreverLoadRule = new ForeverLoadRule(ImmutableMap.of(tier, 2), null);
EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString()))
.andReturn(ImmutableList.of(foreverLoadRule)).atLeastOnce();
EasyMock.expect(loadQueueTaskMaster.getCallbackLock()).andReturn(callbackSyncLock).anyTimes();

metadataRuleManager.stop();
EasyMock.expectLastCall().once();
Expand Down Expand Up @@ -335,6 +340,7 @@ public void testCoordinatorTieredRun() throws Exception
.andReturn(ImmutableList.of(hotServer, coldServer))
.atLeastOnce();
EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();
EasyMock.expect(loadQueueTaskMaster.getCallbackLock()).andReturn(callbackSyncLock).anyTimes();

EasyMock.replay(metadataRuleManager, serverInventoryView, loadQueueTaskMaster);

Expand Down Expand Up @@ -419,6 +425,7 @@ public void testComputeUnderReplicationCountsPerDataSourcePerTierForSegmentsWith
.andReturn(ImmutableList.of(hotServer, coldServer, brokerServer1, brokerServer2, peonServer))
.atLeastOnce();
EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();
EasyMock.expect(loadQueueTaskMaster.getCallbackLock()).andReturn(callbackSyncLock).anyTimes();

EasyMock.replay(metadataRuleManager, serverInventoryView, loadQueueTaskMaster);

Expand Down Expand Up @@ -651,6 +658,7 @@ public void testCoordinatorCustomDutyGroupsRunAsExpected() throws Exception
EasyMock.expect(segmentsMetadataManager.isPollingDatabasePeriodically()).andReturn(true).anyTimes();
EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();
EasyMock.expect(serverInventoryView.getInventory()).andReturn(Collections.emptyList()).anyTimes();
EasyMock.expect(loadQueueTaskMaster.getCallbackLock()).andReturn(callbackSyncLock).anyTimes();
EasyMock.replay(serverInventoryView, loadQueueTaskMaster, segmentsMetadataManager);

// Create CoordinatorCustomDutyGroups
Expand Down Expand Up @@ -719,6 +727,7 @@ public void testCoordinatorRun_queryFromDeepStorage() throws Exception
Rule foreverLoadRule = new ForeverLoadRule(ImmutableMap.of(coldTier, 0), null);
EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString()))
.andReturn(ImmutableList.of(intervalLoadRule, foreverLoadRule)).atLeastOnce();
EasyMock.expect(loadQueueTaskMaster.getCallbackLock()).andReturn(callbackSyncLock).anyTimes();

metadataRuleManager.stop();
EasyMock.expectLastCall().once();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -91,7 +92,8 @@ public void setUp()
httpClient.processingExecutor,
true
),
httpClient.callbackExecutor
httpClient.callbackExecutor,
new ReentrantReadWriteLock()
);
httpLoadQueuePeon.start();
}
Expand Down Expand Up @@ -338,7 +340,8 @@ public void testBatchSize()
httpClient.processingExecutor,
true
),
httpClient.callbackExecutor
httpClient.callbackExecutor,
new ReentrantReadWriteLock()
);

Assert.assertEquals(1, httpLoadQueuePeon.calculateBatchSize(SegmentLoadingMode.NORMAL));
Expand Down
Loading