From 4e6d45f4e689a38274f26654b37dc4ba5d09f160 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Tue, 2 Jun 2026 19:54:41 +0800 Subject: [PATCH 1/6] Improve ConfigNode leader warm-up gating --- .../org/apache/iotdb/rpc/TSStatusCode.java | 1 + .../heartbeat/DataNodeHeartbeatHandler.java | 93 ++++++----- .../ConfigRegionStateMachine.java | 25 ++- .../confignode/manager/ConfigManager.java | 8 +- .../confignode/manager/load/LoadManager.java | 110 ++++++++++++- .../manager/load/cache/AbstractLoadCache.java | 4 + .../manager/load/cache/LoadCache.java | 146 ++++++++++++++++++ .../cache/consensus/ConsensusGroupCache.java | 2 +- .../manager/partition/PartitionManager.java | 16 +- .../manager/load/LoadManagerTest.java | 62 ++++++++ .../db/protocol/client/ConfigNodeClient.java | 17 ++ 11 files changed, 435 insertions(+), 49 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index 035c648132f1a..dad3ef44e23d1 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -209,6 +209,7 @@ public enum TSStatusCode { CAN_NOT_CONNECT_AINODE(1011), NO_AVAILABLE_REPLICA(1012), NO_AVAILABLE_AINODE(1013), + CONFIG_NODE_LEADER_WARMING_UP(1014), // Sync, Load TsFile LOAD_FILE_ERROR(1100), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java index e7a31b1dc73eb..18b8206cbb2e0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.client.async.handlers.heartbeat; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.cluster.NodeStatus; @@ -27,7 +28,6 @@ import org.apache.iotdb.confignode.conf.ConfigNodeConfig; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.manager.load.LoadManager; -import org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupHeartbeatSample; import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample; import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample; import org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.PipeRuntimeCoordinator; @@ -36,6 +36,7 @@ import org.apache.thrift.async.AsyncMethodCallback; +import java.util.Collections; import java.util.Map; import java.util.function.Consumer; @@ -89,46 +90,55 @@ public void onComplete(TDataNodeHeartbeatResp heartbeatResp) { RegionStatus regionStatus = RegionStatus.valueOf(heartbeatResp.getStatus()); - heartbeatResp - .getJudgedLeaders() - .forEach( - (regionGroupId, isLeader) -> { - - // Do not allow regions to inherit the Removing state from datanode - RegionStatus nextRegionStatus = regionStatus; - if (nextRegionStatus == RegionStatus.Removing) { - nextRegionStatus = - loadManager - .getLoadCache() - .getRegionCacheLastSampleStatus(regionGroupId, nodeId); - } - - // Update RegionGroupCache - loadManager - .getLoadCache() - .cacheRegionHeartbeatSample( - regionGroupId, - nodeId, - new RegionHeartbeatSample( - heartbeatResp.getHeartbeatTimestamp(), - // Region will inherit DataNode's status - nextRegionStatus), - false); - - if (((TConsensusGroupType.SchemaRegion.equals(regionGroupId.getType()) - && SCHEMA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE) - || (TConsensusGroupType.DataRegion.equals(regionGroupId.getType()) - && DATA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE)) - && Boolean.TRUE.equals(isLeader)) { - // Update ConsensusGroupCache when necessary - loadManager - .getLoadCache() - .cacheConsensusSample( - regionGroupId, - new ConsensusGroupHeartbeatSample( - heartbeatResp.getConsensusLogicalTimeMap().get(regionGroupId), nodeId)); - } - }); + Map judgedLeaders = + heartbeatResp.isSetJudgedLeaders() + ? heartbeatResp.getJudgedLeaders() + : Collections.emptyMap(); + judgedLeaders.forEach( + (regionGroupId, isLeader) -> { + + // Do not allow regions to inherit the Removing state from datanode + RegionStatus nextRegionStatus = regionStatus; + if (nextRegionStatus == RegionStatus.Removing) { + nextRegionStatus = + loadManager.getLoadCache().getRegionCacheLastSampleStatus(regionGroupId, nodeId); + } + + // Update RegionGroupCache + loadManager + .getLoadCache() + .cacheRegionHeartbeatSample( + regionGroupId, + nodeId, + new RegionHeartbeatSample( + heartbeatResp.getHeartbeatTimestamp(), + // Region will inherit DataNode's status + nextRegionStatus), + false); + + boolean shouldCacheConsensusSample = + (TConsensusGroupType.SchemaRegion.equals(regionGroupId.getType()) + && SCHEMA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE) + || (TConsensusGroupType.DataRegion.equals(regionGroupId.getType()) + && DATA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE); + long logicalTimestamp = + heartbeatResp.isSetConsensusLogicalTimeMap() + && heartbeatResp.getConsensusLogicalTimeMap().containsKey(regionGroupId) + ? heartbeatResp.getConsensusLogicalTimeMap().get(regionGroupId) + : heartbeatResp.getHeartbeatTimestamp(); + loadManager + .getLoadCache() + .cacheConsensusGroupHeartbeatSample( + regionGroupId, + nodeId, + Boolean.TRUE.equals(isLeader), + logicalTimestamp, + shouldCacheConsensusSample); + }); + loadManager + .getLoadCache() + .cacheUnreportedDataNodeRegionHeartbeatSamples( + nodeId, judgedLeaders.keySet(), heartbeatResp.getHeartbeatTimestamp()); if (heartbeatResp.getRegionDeviceUsageMap() != null) { deviceNum.putAll(heartbeatResp.getRegionDeviceUsageMap()); @@ -170,6 +180,7 @@ public void onError(Exception e) { if (ThriftClient.isConnectionBroken(e)) { loadManager.forceUpdateNodeCache( NodeType.DataNode, nodeId, new NodeHeartbeatSample(NodeStatus.Unknown)); + loadManager.getLoadCache().cacheDataNodeHeartbeatFailureSample(nodeId, System.nanoTime()); } loadManager.getLoadCache().resetHeartbeatProcessing(nodeId); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java index fe687d17556f7..2b0dc1610e6c2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java @@ -288,13 +288,34 @@ public void notifyLeaderReady() { ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), currentNodeTEndPoint); - // Always start load services first - configManager.getLoadManager().startLoadServices(); + // Always start load services first and wait for its first full warm-up before serving. + long loadReadyEpoch = configManager.getLoadManager().startLoadServices(); if (CONF.isEnableTopologyProbing()) { configManager.getLoadManager().startTopologyService(); } + threadPool.submit(() -> startLeaderServicesAfterLoadReady(loadReadyEpoch)); + } + + private void startLeaderServicesAfterLoadReady(long loadReadyEpoch) { + if (!configManager.getLoadManager().waitForLoadReady(loadReadyEpoch)) { + LOGGER.info( + ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_IS_NO_LONGER_THE_LEADER + + "skip starting leader services because load warm-up is interrupted", + ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), + currentNodeTEndPoint); + return; + } + if (!configManager.getConsensusManager().isLeaderReady()) { + LOGGER.info( + ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_IS_NO_LONGER_THE_LEADER + + "skip starting leader services because consensus leader is no longer ready", + ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), + currentNodeTEndPoint); + return; + } + // Start leader scheduling services configManager.getProcedureManager().startExecutor(); threadPool.submit( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 2db0255e35ac4..ea2ffc1c2c1cf 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -1247,7 +1247,13 @@ protected TSStatus confirmLeader() { "ConsensusManager of target-ConfigNode is not initialized, " + "please make sure the target-ConfigNode has been started successfully."); } - return getConsensusManager().confirmLeader(); + TSStatus status = getConsensusManager().confirmLeader(); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + && !getLoadManager().isLoadReady()) { + return new TSStatus(TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode()) + .setMessage(getLoadManager().getLoadReadyReason()); + } + return status; } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java index e97f32bdbda85..148a4bc8ae8a9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java @@ -48,10 +48,16 @@ import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus; import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; +import java.util.stream.Collectors; /** * The {@link LoadManager} at ConfigNodeGroup-Leader is active. It proactively implements the @@ -59,6 +65,9 @@ */ public class LoadManager { + private static final long LOAD_READY_CHECK_INTERVAL_MS = + Math.max(10, Math.min(100, StatisticsService.STATISTICS_UPDATE_INTERVAL / 10)); + protected final IManager configManager; /** Balancers. */ @@ -74,6 +83,10 @@ public class LoadManager { private final StatisticsService statisticsService; private final EventService eventService; private final TopologyService topologyService; + private final AtomicBoolean loadServicesStarted; + private final AtomicLong loadReadyEpoch; + private final AtomicBoolean loadReady; + private volatile String loadReadyReason; public LoadManager(IManager configManager) { this.configManager = configManager; @@ -90,6 +103,10 @@ public LoadManager(IManager configManager) { this.eventService.register(configManager.getPipeManager().getPipeRuntimeCoordinator()); this.eventService.register(routeBalancer); this.eventService.register(topologyService); + this.loadServicesStarted = new AtomicBoolean(false); + this.loadReadyEpoch = new AtomicLong(0); + this.loadReady = new AtomicBoolean(false); + this.loadReadyReason = "ConfigNode leader load services are not started."; } protected void setHeartbeatService(IManager configManager, LoadCache loadCache) { @@ -146,15 +163,24 @@ public void reBalanceDataPartitionPolicy(String database) { partitionBalancer.reBalanceDataPartitionPolicy(database); } - public void startLoadServices() { + public long startLoadServices() { + long epoch = loadReadyEpoch.incrementAndGet(); + loadReady.set(false); + loadReadyReason = "ConfigNode leader is waiting for cluster heartbeat sampling."; loadCache.initHeartbeatCache(configManager); + loadServicesStarted.set(true); heartbeatService.startHeartbeatService(); statisticsService.startLoadStatisticsService(); eventService.startEventService(); partitionBalancer.setupPartitionBalancer(); + return epoch; } public void stopLoadServices() { + loadReadyEpoch.incrementAndGet(); + loadServicesStarted.set(false); + loadReady.set(false); + loadReadyReason = "ConfigNode leader load services are stopped."; heartbeatService.stopHeartbeatService(); statisticsService.stopLoadStatisticsService(); eventService.stopEventService(); @@ -163,6 +189,88 @@ public void stopLoadServices() { routeBalancer.clearRegionPriority(); } + public boolean waitForLoadReady(long epoch) { + while (epoch == loadReadyEpoch.get() && !Thread.currentThread().isInterrupted()) { + if (tryUpdateLoadReady()) { + return true; + } + try { + TimeUnit.MILLISECONDS.sleep(LOAD_READY_CHECK_INTERVAL_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + } + return false; + } + + public boolean isLoadReady() { + return loadReady.get() || tryUpdateLoadReady(); + } + + public String getLoadReadyReason() { + return loadReadyReason; + } + + private synchronized boolean tryUpdateLoadReady() { + if (loadReady.get()) { + return true; + } + if (!loadServicesStarted.get()) { + loadReadyReason = "ConfigNode leader load services are not started."; + return false; + } + + loadCache.updateNodeStatistics(false); + loadCache.updateRegionGroupStatistics(); + loadCache.updateConsensusGroupStatistics(); + eventService.checkAndBroadcastNodeStatisticsChangeEventIfNecessary(); + eventService.checkAndBroadcastRegionGroupStatisticsChangeEventIfNecessary(); + eventService.checkAndBroadcastConsensusGroupStatisticsChangeEventIfNecessary(); + + List unreadyReasons = loadCache.getLoadWarmUpUnreadyReasons(); + if (!unreadyReasons.isEmpty() + && unreadyReasons.stream().anyMatch(reason -> !reason.startsWith("consensusGroups="))) { + loadReadyReason = "ConfigNode leader is warming up LoadCache: " + unreadyReasons; + return false; + } + + routeBalancer.balanceRegionLeaderAndPriority(); + + unreadyReasons = loadCache.getLoadWarmUpUnreadyReasons(); + if (!unreadyReasons.isEmpty()) { + loadReadyReason = "ConfigNode leader is warming up LoadCache: " + unreadyReasons; + return false; + } + + List unreadyRegionPriorities = getUnreadyRegionPriorities(); + if (!unreadyRegionPriorities.isEmpty()) { + loadReadyReason = + "ConfigNode leader is warming up region priority: " + + unreadyRegionPriorities.subList(0, Math.min(10, unreadyRegionPriorities.size())) + + (unreadyRegionPriorities.size() > 10 + ? "...(" + (unreadyRegionPriorities.size() - 10) + " more)" + : ""); + return false; + } + + loadReadyReason = "ConfigNode leader load services are ready."; + loadReady.set(true); + return true; + } + + private List getUnreadyRegionPriorities() { + List regionGroupIds = loadCache.getAllRegionGroupIds(); + if (regionGroupIds.isEmpty()) { + return Collections.emptyList(); + } + Map regionPriorityMap = + routeBalancer.getRegionPriorityMap(); + return regionGroupIds.stream() + .filter(regionGroupId -> !regionPriorityMap.containsKey(regionGroupId)) + .collect(Collectors.toCollection(ArrayList::new)); + } + public void startTopologyService() { topologyService.startTopologyService(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java index d61a004352033..e5ab6445f98e7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java @@ -97,6 +97,10 @@ public AbstractHeartbeatSample getLastSample() { return slidingWindow.isEmpty() ? null : slidingWindow.get(slidingWindow.size() - 1); } + public boolean hasHeartbeatSample() { + return getLastSample() != null; + } + /** * Update currentStatistics based on the latest heartbeat sample that cached in the slidingWindow. */ diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java index 818171c89bcc8..6be880dd08508 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java @@ -44,6 +44,7 @@ import org.apache.iotdb.confignode.manager.load.cache.node.DataNodeHeartbeatCache; import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample; import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; +import org.apache.iotdb.confignode.manager.load.cache.region.RegionCache; import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupCache; import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics; import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample; @@ -82,6 +83,7 @@ public class LoadCache { Math.max( ProcedureManager.PROCEDURE_WAIT_TIME_OUT - TimeUnit.SECONDS.toMillis(2), TimeUnit.SECONDS.toMillis(10)); + private static final int MAX_UNREADY_ENTITY_PRINT = 10; private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); @@ -98,6 +100,8 @@ public class LoadCache { private final Map> regionRawSizeMap; // Map private final Map consensusGroupCacheMap; + // Map> + private final Map> consensusGroupHeartbeatSampledNodeMap; // Map private final Map> confirmedConfigNodeMap; private Map> topologyGraph; @@ -110,6 +114,7 @@ public LoadCache() { this.regionSizeMap = new ConcurrentHashMap<>(); this.regionRawSizeMap = new ConcurrentHashMap<>(); this.consensusGroupCacheMap = new ConcurrentHashMap<>(); + this.consensusGroupHeartbeatSampledNodeMap = new ConcurrentHashMap<>(); this.confirmedConfigNodeMap = new ConcurrentHashMap<>(); this.topologyGraph = new HashMap<>(); this.topologyUpdated = new AtomicBoolean(false); @@ -175,6 +180,7 @@ private void initRegionGroupHeartbeatCache( Map> regionReplicaMap) { regionGroupCacheMap.clear(); consensusGroupCacheMap.clear(); + consensusGroupHeartbeatSampledNodeMap.clear(); regionReplicaMap.forEach( (database, regionReplicaSets) -> regionReplicaSets.forEach( @@ -192,6 +198,8 @@ private void initRegionGroupHeartbeatCache( .collect(Collectors.toSet()), isStrongConsistency)); consensusGroupCacheMap.put(regionGroupId, new ConsensusGroupCache()); + consensusGroupHeartbeatSampledNodeMap.put( + regionGroupId, ConcurrentHashMap.newKeySet()); })); } @@ -200,6 +208,7 @@ public void clearHeartbeatCache() { nodeCacheMap.clear(); regionGroupCacheMap.clear(); consensusGroupCacheMap.clear(); + consensusGroupHeartbeatSampledNodeMap.clear(); } /** @@ -302,6 +311,7 @@ public void createRegionGroupHeartbeatCache( regionGroupId, new RegionGroupCache(database, regionGroupId, dataNodeIds, isStrongConsistency)); consensusGroupCacheMap.put(regionGroupId, new ConsensusGroupCache()); + consensusGroupHeartbeatSampledNodeMap.put(regionGroupId, ConcurrentHashMap.newKeySet()); } /** @@ -364,6 +374,69 @@ public void cacheConsensusSample( .ifPresent(group -> group.cacheHeartbeatSample(sample)); } + public void cacheConsensusGroupHeartbeatSample( + TConsensusGroupId regionGroupId, + int nodeId, + boolean isLeader, + long logicalTimestamp, + boolean cacheLeader) { + consensusGroupHeartbeatSampledNodeMap + .computeIfAbsent(regionGroupId, empty -> ConcurrentHashMap.newKeySet()) + .add(nodeId); + if (cacheLeader && isLeader) { + cacheConsensusSample( + regionGroupId, new ConsensusGroupHeartbeatSample(logicalTimestamp, nodeId)); + } else if (isConsensusGroupHeartbeatFullySampled(regionGroupId) + && !Optional.ofNullable(consensusGroupCacheMap.get(regionGroupId)) + .map(AbstractLoadCache::hasHeartbeatSample) + .orElse(false)) { + cacheConsensusSample( + regionGroupId, + new ConsensusGroupHeartbeatSample( + logicalTimestamp, ConsensusGroupCache.UN_READY_LEADER_ID)); + } + } + + private boolean isConsensusGroupHeartbeatFullySampled(TConsensusGroupId regionGroupId) { + return Optional.ofNullable(regionGroupCacheMap.get(regionGroupId)) + .map(RegionGroupCache::getRegionLocations) + .map( + regionLocations -> + consensusGroupHeartbeatSampledNodeMap + .getOrDefault(regionGroupId, Collections.emptySet()) + .containsAll(regionLocations)) + .orElse(false); + } + + public void cacheDataNodeHeartbeatFailureSample(int nodeId, long sampleTimestamp) { + cacheUnreportedDataNodeRegionHeartbeatSamples(nodeId, Collections.emptySet(), sampleTimestamp); + } + + public void cacheUnreportedDataNodeRegionHeartbeatSamples( + int nodeId, Set reportedRegionGroupIds, long sampleTimestamp) { + getRegionGroupIdsByDataNodeId(nodeId) + .forEach( + regionGroupId -> { + if (reportedRegionGroupIds.contains(regionGroupId)) { + return; + } + cacheRegionHeartbeatSample( + regionGroupId, + nodeId, + new RegionHeartbeatSample(sampleTimestamp, RegionStatus.Unknown), + false); + cacheConsensusGroupHeartbeatSample( + regionGroupId, nodeId, false, sampleTimestamp, false); + }); + } + + private List getRegionGroupIdsByDataNodeId(int nodeId) { + return regionGroupCacheMap.entrySet().stream() + .filter(entry -> entry.getValue().getRegionLocations().contains(nodeId)) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + } + /** Update the NodeStatistics of all Nodes. */ public void updateNodeStatistics(boolean forceUpdate) { nodeCacheMap @@ -450,6 +523,10 @@ public Map> getCurrentRegionLocationMap( return regionGroupIdsMap; } + public List getAllRegionGroupIds() { + return new ArrayList<>(regionGroupCacheMap.keySet()); + } + /** * Get the RegionGroupStatistics of all RegionGroups. * @@ -496,6 +573,74 @@ public Map getCurrentConsensusGroup return consensusGroupStatisticsMap; } + public boolean isLoadWarmUpReady() { + return getLoadWarmUpUnreadyReasons().isEmpty(); + } + + public List getLoadWarmUpUnreadyReasons() { + List unreadyReasons = new ArrayList<>(); + List unreadyNodes = new ArrayList<>(); + nodeCacheMap.forEach( + (nodeId, nodeCache) -> { + if (nodeId == ConfigNodeHeartbeatCache.CURRENT_NODE_ID) { + return; + } + if (!nodeCache.hasHeartbeatSample() + || nodeCache.getCurrentStatistics().getStatisticsNanoTimestamp() == Long.MIN_VALUE) { + unreadyNodes.add(nodeId); + } + }); + addUnreadyReason(unreadyReasons, "nodes", unreadyNodes); + + List unreadyRegions = new ArrayList<>(); + List unreadyRegionGroups = new ArrayList<>(); + regionGroupCacheMap.forEach( + (regionGroupId, regionGroupCache) -> { + regionGroupCache + .getRegionLocations() + .forEach( + dataNodeId -> { + RegionCache regionCache = regionGroupCache.getRegionCache(dataNodeId); + if (regionCache == null || !regionCache.hasHeartbeatSample()) { + unreadyRegions.add(regionGroupId + "@" + dataNodeId); + } + }); + if (!regionGroupCache + .getCurrentStatistics() + .getRegionStatisticsMap() + .keySet() + .containsAll(regionGroupCache.getRegionLocations())) { + unreadyRegionGroups.add(regionGroupId); + } + }); + addUnreadyReason(unreadyReasons, "regions", unreadyRegions); + addUnreadyReason(unreadyReasons, "regionGroups", unreadyRegionGroups); + + List unreadyConsensusGroups = new ArrayList<>(); + consensusGroupCacheMap.forEach( + (consensusGroupId, consensusGroupCache) -> { + if (!consensusGroupCache.hasHeartbeatSample() + || consensusGroupCache.getCurrentStatistics().getStatisticsNanoTimestamp() == 0) { + unreadyConsensusGroups.add(consensusGroupId); + } + }); + addUnreadyReason(unreadyReasons, "consensusGroups", unreadyConsensusGroups); + return unreadyReasons; + } + + private void addUnreadyReason(List reasons, String entityName, List unreadyEntities) { + if (unreadyEntities.isEmpty()) { + return; + } + List entitiesToPrint = + unreadyEntities.subList(0, Math.min(MAX_UNREADY_ENTITY_PRINT, unreadyEntities.size())); + String suffix = + unreadyEntities.size() > MAX_UNREADY_ENTITY_PRINT + ? "...(" + (unreadyEntities.size() - MAX_UNREADY_ENTITY_PRINT) + " more)" + : ""; + reasons.add(entityName + "=" + entitiesToPrint + suffix); + } + /** * Safely get NodeStatus by NodeId. * @@ -714,6 +859,7 @@ public int countRegionWithSpecifiedStatus(TConsensusGroupType type, RegionStatus public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) { regionGroupCacheMap.remove(consensusGroupId); consensusGroupCacheMap.remove(consensusGroupId); + consensusGroupHeartbeatSampledNodeMap.remove(consensusGroupId); } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java index aa924dc29b585..7dcacc4fd805c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java @@ -41,7 +41,7 @@ public synchronized void updateCurrentStatistics(boolean forceUpdate) { synchronized (slidingWindow) { lastSample = (ConsensusGroupHeartbeatSample) getLastSample(); } - if (lastSample != null && lastSample.getLeaderId() != UN_READY_LEADER_ID) { + if (lastSample != null) { currentStatistics.set( new ConsensusGroupStatistics(System.nanoTime(), lastSample.getLeaderId())); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index 5be81256b5c39..c9989b5deaeb5 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -300,7 +300,7 @@ public SchemaPartitionResp getOrCreateSchemaPartition(final GetOrCreateSchemaPar assignedSchemaPartition = getLoadManager().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap); } catch (final NoAvailableRegionGroupException e) { - status = getConsensusManager().confirmLeader(); + status = confirmLeader(); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { // The allocation might fail due to leadership change resp.setStatus(status); @@ -445,7 +445,7 @@ public DataPartitionResp getOrCreateDataPartition(final GetOrCreateDataPartition assignedDataPartition = getLoadManager().allocateDataPartition(unassignedDataPartitionSlotsMap); } catch (DatabaseNotExistsException | NoAvailableRegionGroupException e) { - status = getConsensusManager().confirmLeader(); + status = confirmLeader(); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { // The allocation might fail due to leadership change resp.setStatus(status); @@ -543,7 +543,7 @@ public void markDataPartitionTableIntegrityCheckProcedureFinished() { } private TSStatus consensusWritePartitionResult(ConfigPhysicalPlan plan) { - TSStatus status = getConsensusManager().confirmLeader(); + TSStatus status = confirmLeader(); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { // Here we check the leadership second time // since the RegionGroup creating process might take some time @@ -1597,6 +1597,16 @@ public ScheduledExecutorService getRegionMaintainer() { return regionMaintainer; } + private TSStatus confirmLeader() { + TSStatus status = getConsensusManager().confirmLeader(); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + && !getLoadManager().isLoadReady()) { + return new TSStatus(TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode()) + .setMessage(getLoadManager().getLoadReadyReason()); + } + return status; + } + private ConsensusManager getConsensusManager() { return configManager.getConsensusManager(); } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java index fd62c31ae3678..6380fec06f7d2 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java @@ -292,4 +292,66 @@ public void testConsensusGroupCache() throws InterruptedException { new Pair<>(new ConsensusGroupStatistics(newLeaderId), null), differentConsensusGroupStatisticsMap.get(regionGroupId)); } + + @Test + public void testLoadWarmUpRequiresAllEntitySamples() { + LoadCache loadCache = new LoadCache(); + TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, 100); + Set dataNodeIds = Stream.of(11, 12).collect(Collectors.toSet()); + + dataNodeIds.forEach( + dataNodeId -> loadCache.createNodeHeartbeatCache(NodeType.DataNode, dataNodeId)); + loadCache.createRegionGroupHeartbeatCache("root.warmup", regionGroupId, dataNodeIds); + + Assert.assertFalse(loadCache.isLoadWarmUpReady()); + + dataNodeIds.forEach( + dataNodeId -> + loadCache.cacheDataNodeHeartbeatSample( + dataNodeId, new NodeHeartbeatSample(NodeStatus.Running))); + loadCache.updateNodeStatistics(false); + loadCache.cacheRegionHeartbeatSample( + regionGroupId, 11, new RegionHeartbeatSample(RegionStatus.Running), false); + loadCache.cacheConsensusGroupHeartbeatSample(regionGroupId, 11, true, 1, true); + loadCache.updateRegionGroupStatistics(); + loadCache.updateConsensusGroupStatistics(); + + Assert.assertFalse(loadCache.isLoadWarmUpReady()); + Assert.assertTrue(loadCache.getLoadWarmUpUnreadyReasons().toString().contains("regions=")); + + loadCache.cacheRegionHeartbeatSample( + regionGroupId, 12, new RegionHeartbeatSample(RegionStatus.Running), false); + loadCache.updateRegionGroupStatistics(); + + Assert.assertTrue( + loadCache.getLoadWarmUpUnreadyReasons().toString(), loadCache.isLoadWarmUpReady()); + } + + @Test + public void testConsensusGroupWarmUpAcceptsFullySampledWithoutLeader() { + LoadCache loadCache = new LoadCache(); + TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 101); + Set dataNodeIds = Stream.of(21, 22).collect(Collectors.toSet()); + + dataNodeIds.forEach( + dataNodeId -> loadCache.createNodeHeartbeatCache(NodeType.DataNode, dataNodeId)); + loadCache.createRegionGroupHeartbeatCache("root.warmup", regionGroupId, dataNodeIds); + dataNodeIds.forEach( + dataNodeId -> { + loadCache.cacheDataNodeHeartbeatSample( + dataNodeId, new NodeHeartbeatSample(NodeStatus.Running)); + loadCache.cacheRegionHeartbeatSample( + regionGroupId, dataNodeId, new RegionHeartbeatSample(RegionStatus.Running), false); + loadCache.cacheConsensusGroupHeartbeatSample(regionGroupId, dataNodeId, false, 1, true); + }); + loadCache.updateNodeStatistics(false); + loadCache.updateRegionGroupStatistics(); + loadCache.updateConsensusGroupStatistics(); + + Assert.assertTrue( + loadCache.getLoadWarmUpUnreadyReasons().toString(), loadCache.isLoadWarmUpReady()); + Assert.assertEquals( + ConsensusGroupStatistics.generateDefaultConsensusGroupStatistics(), + loadCache.getCurrentConsensusGroupStatisticsMap().get(regionGroupId)); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java index 1f0b09f0b8689..545ee026871e6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java @@ -401,6 +401,23 @@ private boolean updateConfigNodeLeader(TSStatus status) { } return true; } + if (status.getCode() == TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode()) { + if (!isFirstInitiated) { + logger.info( + "ConfigNode leader {} is warming up before serving DataNode {}, will wait and retry." + + " Reason: {}", + configNode, + config.getAddressAndPort(), + status.getMessage()); + } + try { + Thread.sleep(WAIT_CN_LEADER_ELECTION_INTERVAL_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn(DataNodeMiscMessages.UNEXPECTED_INTERRUPTION_CONNECT_CONFIG_NODE_BREAK); + } + return true; + } return false; } finally { isFirstInitiated = false; From a57680d254239d3b7f962f2600042d48ce537b52 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Wed, 3 Jun 2026 17:48:50 +0800 Subject: [PATCH 2/6] Refine ConfigNode leader warm-up readiness --- .../heartbeat/ConfigNodeHeartbeatHandler.java | 7 +- .../heartbeat/DataNodeHeartbeatHandler.java | 9 +- .../ConfigRegionStateMachine.java | 27 ++++-- .../confignode/manager/ConfigManager.java | 8 +- .../manager/consensus/ConsensusManager.java | 69 ++++++++------ .../confignode/manager/load/LoadManager.java | 91 ++++++------------- .../manager/load/cache/LoadCache.java | 49 ++-------- .../manager/partition/PartitionManager.java | 16 +--- .../manager/load/LoadManagerTest.java | 58 +++++++----- 9 files changed, 136 insertions(+), 198 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java index 96225e4fb7bdb..97ef4c56f2993 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java @@ -19,7 +19,6 @@ package org.apache.iotdb.confignode.client.async.handlers.heartbeat; -import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.cluster.NodeType; import org.apache.iotdb.confignode.manager.load.LoadManager; @@ -47,10 +46,8 @@ public void onComplete(TConfigNodeHeartbeatResp resp) { @Override public void onError(Exception e) { - if (ThriftClient.isConnectionBroken(e)) { - loadManager.forceUpdateNodeCache( - NodeType.ConfigNode, nodeId, new NodeHeartbeatSample(NodeStatus.Unknown)); - } + loadManager.forceUpdateNodeCache( + NodeType.ConfigNode, nodeId, new NodeHeartbeatSample(NodeStatus.Unknown)); loadManager.getLoadCache().resetHeartbeatProcessing(nodeId); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java index 18b8206cbb2e0..8b258f24138e4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java @@ -21,7 +21,6 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; -import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.cluster.NodeType; import org.apache.iotdb.commons.cluster.RegionStatus; @@ -177,11 +176,9 @@ public void onComplete(TDataNodeHeartbeatResp heartbeatResp) { @Override public void onError(Exception e) { - if (ThriftClient.isConnectionBroken(e)) { - loadManager.forceUpdateNodeCache( - NodeType.DataNode, nodeId, new NodeHeartbeatSample(NodeStatus.Unknown)); - loadManager.getLoadCache().cacheDataNodeHeartbeatFailureSample(nodeId, System.nanoTime()); - } + loadManager.forceUpdateNodeCache( + NodeType.DataNode, nodeId, new NodeHeartbeatSample(NodeStatus.Unknown)); + loadManager.getLoadCache().cacheDataNodeHeartbeatFailureSample(nodeId, System.nanoTime()); loadManager.getLoadCache().resetHeartbeatProcessing(nodeId); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java index 2b0dc1610e6c2..08d834f6c8654 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java @@ -64,6 +64,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -76,6 +77,7 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.CONFIG_NODE_RECOVER.getName()); private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); private final ConfigPlanExecutor executor; + private final AtomicBoolean leaderServicesReady; private ConfigManager configManager; /** Variables for {@link ConfigNode} Simple Consensus. */ @@ -98,6 +100,7 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev public ConfigRegionStateMachine(ConfigManager configManager, ConfigPlanExecutor executor) { this.executor = executor; + this.leaderServicesReady = new AtomicBoolean(false); this.configManager = configManager; this.currentNodeTEndPoint = new TEndPoint() @@ -231,6 +234,7 @@ public void loadSnapshot(final File latestSnapshotRootDir) { @Override public void notifyLeaderChanged(ConsensusGroupId groupId, int newLeaderId) { + leaderServicesReady.set(false); // We get currentNodeId here because the currentNodeId // couldn't initialize earlier than the ConfigRegionStateMachine int currentNodeId = ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(); @@ -246,6 +250,7 @@ public void notifyLeaderChanged(ConsensusGroupId groupId, int newLeaderId) { @Override public void notifyNotLeader() { + leaderServicesReady.set(false); // We get currentNodeId here because the currentNodeId // couldn't initialize earlier than the ConfigRegionStateMachine int currentNodeId = ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(); @@ -283,29 +288,29 @@ public void notifyNotLeader() { @Override public void notifyLeaderReady() { + leaderServicesReady.set(false); LOGGER.info( ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_BECOMES_CONFIG_REGION_LEADER, ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), currentNodeTEndPoint); - // Always start load services first and wait for its first full warm-up before serving. - long loadReadyEpoch = configManager.getLoadManager().startLoadServices(); + // Always start load services first. ConsensusManager gates external serving until warm-up. + configManager.getLoadManager().startLoadServices(); if (CONF.isEnableTopologyProbing()) { configManager.getLoadManager().startTopologyService(); } - threadPool.submit(() -> startLeaderServicesAfterLoadReady(loadReadyEpoch)); + threadPool.submit(this::startLeaderServicesAfterLoadReady); } - private void startLeaderServicesAfterLoadReady(long loadReadyEpoch) { - if (!configManager.getLoadManager().waitForLoadReady(loadReadyEpoch)) { + private void startLeaderServicesAfterLoadReady() { + if (!configManager.getLoadManager().isLoadReady()) { LOGGER.info( - ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_IS_NO_LONGER_THE_LEADER - + "skip starting leader services because load warm-up is interrupted", + "Current ConfigNode(nodeId: {}, ip: {}) starts leader services while load warm-up is" + + " still in progress.", ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), currentNodeTEndPoint); - return; } if (!configManager.getConsensusManager().isLeaderReady()) { LOGGER.info( @@ -355,12 +360,18 @@ private void startLeaderServicesAfterLoadReady(long loadReadyEpoch) { // Do check async because sync will be slow and block every other things. threadPool.submit(() -> configManager.getClusterManager().checkClusterId()); + leaderServicesReady.set(true); + LOGGER.info( ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_AS_CONFIG_REGION_LEADER_IS, ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), currentNodeTEndPoint); } + public boolean areLeaderServicesReady() { + return leaderServicesReady.get(); + } + @Override public void start() { if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index ea2ffc1c2c1cf..2db0255e35ac4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -1247,13 +1247,7 @@ protected TSStatus confirmLeader() { "ConsensusManager of target-ConfigNode is not initialized, " + "please make sure the target-ConfigNode has been started successfully."); } - TSStatus status = getConsensusManager().confirmLeader(); - if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() - && !getLoadManager().isLoadReady()) { - return new TSStatus(TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode()) - .setMessage(getLoadManager().getLoadReadyReason()); - } - return status; + return getConsensusManager().confirmLeader(); } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java index 8b4eeed5a1b58..e85f9692d0621 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java @@ -77,12 +77,14 @@ public class ConsensusManager { new ConfigRegionId(CONF.getConfigRegionId()); private final IManager configManager; + private final ConfigRegionStateMachine stateMachine; private IConsensus consensusImpl; private boolean isInitialized; public ConsensusManager(IManager configManager, ConfigRegionStateMachine stateMachine) { this.configManager = configManager; + this.stateMachine = stateMachine; setConsensusLayer(stateMachine); } @@ -424,39 +426,50 @@ public boolean isLeaderExist() { * NEED_REDIRECTION otherwise */ public TSStatus confirmLeader() { - TSStatus result = new TSStatus(); - if (isLeaderReady()) { - result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()); - } else { - result.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()); - if (isLeader()) { - long startTime = System.currentTimeMillis(); - while (System.currentTimeMillis() - startTime < MAX_WAIT_READY_TIME_MS) { - if (isLeaderReady()) { - result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()); - return result; - } - try { - Thread.sleep(RETRY_WAIT_TIME_MS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOGGER.warn( - ManagerMessages.UNEXPECTED_INTERRUPTION_DURING_WAITING_FOR_CONFIGNODE_LEADER_READY); - break; - } - } - result.setMessage( - "The current ConfigNode is leader but not ready yet, please try again later."); - } else { - result.setMessage( - "The current ConfigNode is not leader, please redirect to a new ConfigNode."); - } + if (!isLeader()) { + TSStatus result = new TSStatus(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()); + result.setMessage( + "The current ConfigNode is not leader, please redirect to a new ConfigNode."); TConfigNodeLocation leaderLocation = getLeaderLocation(); if (leaderLocation != null) { result.setRedirectNode(leaderLocation.getInternalEndPoint()); } + return result; } - return result; + + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < MAX_WAIT_READY_TIME_MS) { + if (isLeaderReady()) { + break; + } + try { + Thread.sleep(RETRY_WAIT_TIME_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn( + ManagerMessages.UNEXPECTED_INTERRUPTION_DURING_WAITING_FOR_CONFIGNODE_LEADER_READY); + break; + } + } + + if (!isLeaderReady()) { + return getLeaderWarmingUpStatus( + "The current ConfigNode is leader but consensus is not ready yet."); + } + if (!stateMachine.areLeaderServicesReady()) { + return getLeaderWarmingUpStatus( + "The current ConfigNode is leader but leader services are not ready yet."); + } + if (!configManager.getLoadManager().isLoadReady()) { + return getLeaderWarmingUpStatus(configManager.getLoadManager().getLoadReadyReason()); + } + + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } + + private TSStatus getLeaderWarmingUpStatus(String message) { + return new TSStatus(TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode()) + .setMessage(message); } public ConsensusGroupId getConsensusGroupId() { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java index 148a4bc8ae8a9..c6bf610612580 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java @@ -48,8 +48,6 @@ import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus; import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -57,7 +55,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; -import java.util.stream.Collectors; /** * The {@link LoadManager} at ConfigNodeGroup-Leader is active. It proactively implements the @@ -65,8 +62,7 @@ */ public class LoadManager { - private static final long LOAD_READY_CHECK_INTERVAL_MS = - Math.max(10, Math.min(100, StatisticsService.STATISTICS_UPDATE_INTERVAL / 10)); + private static final long FIRST_HEARTBEAT_READY_TOLERANCE_MS = TimeUnit.SECONDS.toMillis(30); protected final IManager configManager; @@ -84,7 +80,7 @@ public class LoadManager { private final EventService eventService; private final TopologyService topologyService; private final AtomicBoolean loadServicesStarted; - private final AtomicLong loadReadyEpoch; + private final AtomicLong loadReadyStartTimeMillis; private final AtomicBoolean loadReady; private volatile String loadReadyReason; @@ -104,7 +100,7 @@ public LoadManager(IManager configManager) { this.eventService.register(routeBalancer); this.eventService.register(topologyService); this.loadServicesStarted = new AtomicBoolean(false); - this.loadReadyEpoch = new AtomicLong(0); + this.loadReadyStartTimeMillis = new AtomicLong(0); this.loadReady = new AtomicBoolean(false); this.loadReadyReason = "ConfigNode leader load services are not started."; } @@ -163,9 +159,9 @@ public void reBalanceDataPartitionPolicy(String database) { partitionBalancer.reBalanceDataPartitionPolicy(database); } - public long startLoadServices() { - long epoch = loadReadyEpoch.incrementAndGet(); + public void startLoadServices() { loadReady.set(false); + loadReadyStartTimeMillis.set(System.currentTimeMillis()); loadReadyReason = "ConfigNode leader is waiting for cluster heartbeat sampling."; loadCache.initHeartbeatCache(configManager); loadServicesStarted.set(true); @@ -173,12 +169,11 @@ public long startLoadServices() { statisticsService.startLoadStatisticsService(); eventService.startEventService(); partitionBalancer.setupPartitionBalancer(); - return epoch; } public void stopLoadServices() { - loadReadyEpoch.incrementAndGet(); loadServicesStarted.set(false); + loadReadyStartTimeMillis.set(0); loadReady.set(false); loadReadyReason = "ConfigNode leader load services are stopped."; heartbeatService.stopHeartbeatService(); @@ -189,21 +184,6 @@ public void stopLoadServices() { routeBalancer.clearRegionPriority(); } - public boolean waitForLoadReady(long epoch) { - while (epoch == loadReadyEpoch.get() && !Thread.currentThread().isInterrupted()) { - if (tryUpdateLoadReady()) { - return true; - } - try { - TimeUnit.MILLISECONDS.sleep(LOAD_READY_CHECK_INTERVAL_MS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return false; - } - } - return false; - } - public boolean isLoadReady() { return loadReady.get() || tryUpdateLoadReady(); } @@ -222,55 +202,32 @@ private synchronized boolean tryUpdateLoadReady() { } loadCache.updateNodeStatistics(false); - loadCache.updateRegionGroupStatistics(); - loadCache.updateConsensusGroupStatistics(); eventService.checkAndBroadcastNodeStatisticsChangeEventIfNecessary(); - eventService.checkAndBroadcastRegionGroupStatisticsChangeEventIfNecessary(); - eventService.checkAndBroadcastConsensusGroupStatisticsChangeEventIfNecessary(); - - List unreadyReasons = loadCache.getLoadWarmUpUnreadyReasons(); - if (!unreadyReasons.isEmpty() - && unreadyReasons.stream().anyMatch(reason -> !reason.startsWith("consensusGroups="))) { - loadReadyReason = "ConfigNode leader is warming up LoadCache: " + unreadyReasons; - return false; - } - - routeBalancer.balanceRegionLeaderAndPriority(); - unreadyReasons = loadCache.getLoadWarmUpUnreadyReasons(); - if (!unreadyReasons.isEmpty()) { - loadReadyReason = "ConfigNode leader is warming up LoadCache: " + unreadyReasons; - return false; + List unreadyReasons = loadCache.getNodeHeartbeatUnreadyReasons(); + if (unreadyReasons.isEmpty()) { + loadReadyReason = "ConfigNode leader load services are ready."; + loadReady.set(true); + return true; } - List unreadyRegionPriorities = getUnreadyRegionPriorities(); - if (!unreadyRegionPriorities.isEmpty()) { + long elapsedMillis = System.currentTimeMillis() - loadReadyStartTimeMillis.get(); + if (elapsedMillis < FIRST_HEARTBEAT_READY_TOLERANCE_MS) { loadReadyReason = - "ConfigNode leader is warming up region priority: " - + unreadyRegionPriorities.subList(0, Math.min(10, unreadyRegionPriorities.size())) - + (unreadyRegionPriorities.size() > 10 - ? "...(" + (unreadyRegionPriorities.size() - 10) + " more)" - : ""); + "ConfigNode leader is waiting for first heartbeat from registered ConfigNodes/DataNodes: " + + unreadyReasons; return false; } - loadReadyReason = "ConfigNode leader load services are ready."; + loadReadyReason = + "ConfigNode leader load services are ready after waiting " + + FIRST_HEARTBEAT_READY_TOLERANCE_MS + + "ms for first heartbeat. Missing heartbeats: " + + unreadyReasons; loadReady.set(true); return true; } - private List getUnreadyRegionPriorities() { - List regionGroupIds = loadCache.getAllRegionGroupIds(); - if (regionGroupIds.isEmpty()) { - return Collections.emptyList(); - } - Map regionPriorityMap = - routeBalancer.getRegionPriorityMap(); - return regionGroupIds.stream() - .filter(regionGroupId -> !regionPriorityMap.containsKey(regionGroupId)) - .collect(Collectors.toCollection(ArrayList::new)); - } - public void startTopologyService() { topologyService.startTopologyService(); } @@ -595,6 +552,14 @@ public RouteBalancer getRouteBalancer() { return routeBalancer; } + @TestOnly + void markLoadServicesStartedForTest(long loadReadyStartTimeMillis) { + loadServicesStarted.set(true); + loadReady.set(false); + this.loadReadyStartTimeMillis.set(loadReadyStartTimeMillis); + loadReadyReason = "ConfigNode leader is waiting for cluster heartbeat sampling."; + } + @TestOnly public EventService getEventService() { return eventService; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java index 6be880dd08508..940804948aeea 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java @@ -44,7 +44,6 @@ import org.apache.iotdb.confignode.manager.load.cache.node.DataNodeHeartbeatCache; import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample; import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; -import org.apache.iotdb.confignode.manager.load.cache.region.RegionCache; import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupCache; import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics; import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample; @@ -573,58 +572,22 @@ public Map getCurrentConsensusGroup return consensusGroupStatisticsMap; } - public boolean isLoadWarmUpReady() { - return getLoadWarmUpUnreadyReasons().isEmpty(); - } - - public List getLoadWarmUpUnreadyReasons() { - List unreadyReasons = new ArrayList<>(); + public List getNodeHeartbeatUnreadyReasons() { List unreadyNodes = new ArrayList<>(); nodeCacheMap.forEach( (nodeId, nodeCache) -> { if (nodeId == ConfigNodeHeartbeatCache.CURRENT_NODE_ID) { return; } - if (!nodeCache.hasHeartbeatSample() - || nodeCache.getCurrentStatistics().getStatisticsNanoTimestamp() == Long.MIN_VALUE) { + if ((nodeCache instanceof ConfigNodeHeartbeatCache + || nodeCache instanceof DataNodeHeartbeatCache) + && !nodeCache.hasHeartbeatSample()) { unreadyNodes.add(nodeId); } }); + List unreadyReasons = new ArrayList<>(); + Collections.sort(unreadyNodes); addUnreadyReason(unreadyReasons, "nodes", unreadyNodes); - - List unreadyRegions = new ArrayList<>(); - List unreadyRegionGroups = new ArrayList<>(); - regionGroupCacheMap.forEach( - (regionGroupId, regionGroupCache) -> { - regionGroupCache - .getRegionLocations() - .forEach( - dataNodeId -> { - RegionCache regionCache = regionGroupCache.getRegionCache(dataNodeId); - if (regionCache == null || !regionCache.hasHeartbeatSample()) { - unreadyRegions.add(regionGroupId + "@" + dataNodeId); - } - }); - if (!regionGroupCache - .getCurrentStatistics() - .getRegionStatisticsMap() - .keySet() - .containsAll(regionGroupCache.getRegionLocations())) { - unreadyRegionGroups.add(regionGroupId); - } - }); - addUnreadyReason(unreadyReasons, "regions", unreadyRegions); - addUnreadyReason(unreadyReasons, "regionGroups", unreadyRegionGroups); - - List unreadyConsensusGroups = new ArrayList<>(); - consensusGroupCacheMap.forEach( - (consensusGroupId, consensusGroupCache) -> { - if (!consensusGroupCache.hasHeartbeatSample() - || consensusGroupCache.getCurrentStatistics().getStatisticsNanoTimestamp() == 0) { - unreadyConsensusGroups.add(consensusGroupId); - } - }); - addUnreadyReason(unreadyReasons, "consensusGroups", unreadyConsensusGroups); return unreadyReasons; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index c9989b5deaeb5..5be81256b5c39 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -300,7 +300,7 @@ public SchemaPartitionResp getOrCreateSchemaPartition(final GetOrCreateSchemaPar assignedSchemaPartition = getLoadManager().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap); } catch (final NoAvailableRegionGroupException e) { - status = confirmLeader(); + status = getConsensusManager().confirmLeader(); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { // The allocation might fail due to leadership change resp.setStatus(status); @@ -445,7 +445,7 @@ public DataPartitionResp getOrCreateDataPartition(final GetOrCreateDataPartition assignedDataPartition = getLoadManager().allocateDataPartition(unassignedDataPartitionSlotsMap); } catch (DatabaseNotExistsException | NoAvailableRegionGroupException e) { - status = confirmLeader(); + status = getConsensusManager().confirmLeader(); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { // The allocation might fail due to leadership change resp.setStatus(status); @@ -543,7 +543,7 @@ public void markDataPartitionTableIntegrityCheckProcedureFinished() { } private TSStatus consensusWritePartitionResult(ConfigPhysicalPlan plan) { - TSStatus status = confirmLeader(); + TSStatus status = getConsensusManager().confirmLeader(); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { // Here we check the leadership second time // since the RegionGroup creating process might take some time @@ -1597,16 +1597,6 @@ public ScheduledExecutorService getRegionMaintainer() { return regionMaintainer; } - private TSStatus confirmLeader() { - TSStatus status = getConsensusManager().confirmLeader(); - if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() - && !getLoadManager().isLoadReady()) { - return new TSStatus(TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode()) - .setMessage(getLoadManager().getLoadReadyReason()); - } - return status; - } - private ConsensusManager getConsensusManager() { return configManager.getConsensusManager(); } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java index 6380fec06f7d2..e2c51df9693be 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java @@ -48,6 +48,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -294,41 +295,34 @@ public void testConsensusGroupCache() throws InterruptedException { } @Test - public void testLoadWarmUpRequiresAllEntitySamples() { + public void testLoadWarmUpRequiresOnlyConfigNodeAndDataNodeSamples() { LoadCache loadCache = new LoadCache(); TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, 100); Set dataNodeIds = Stream.of(11, 12).collect(Collectors.toSet()); + loadCache.createNodeHeartbeatCache(NodeType.ConfigNode, 10); dataNodeIds.forEach( dataNodeId -> loadCache.createNodeHeartbeatCache(NodeType.DataNode, dataNodeId)); + loadCache.createNodeHeartbeatCache(NodeType.AINode, 13); loadCache.createRegionGroupHeartbeatCache("root.warmup", regionGroupId, dataNodeIds); - Assert.assertFalse(loadCache.isLoadWarmUpReady()); + Assert.assertEquals( + Collections.singletonList("nodes=[10, 11, 12]"), + loadCache.getNodeHeartbeatUnreadyReasons()); + + loadCache.cacheConfigNodeHeartbeatSample(10, new NodeHeartbeatSample(NodeStatus.Unknown)); dataNodeIds.forEach( dataNodeId -> loadCache.cacheDataNodeHeartbeatSample( dataNodeId, new NodeHeartbeatSample(NodeStatus.Running))); loadCache.updateNodeStatistics(false); - loadCache.cacheRegionHeartbeatSample( - regionGroupId, 11, new RegionHeartbeatSample(RegionStatus.Running), false); - loadCache.cacheConsensusGroupHeartbeatSample(regionGroupId, 11, true, 1, true); - loadCache.updateRegionGroupStatistics(); - loadCache.updateConsensusGroupStatistics(); - Assert.assertFalse(loadCache.isLoadWarmUpReady()); - Assert.assertTrue(loadCache.getLoadWarmUpUnreadyReasons().toString().contains("regions=")); - - loadCache.cacheRegionHeartbeatSample( - regionGroupId, 12, new RegionHeartbeatSample(RegionStatus.Running), false); - loadCache.updateRegionGroupStatistics(); - - Assert.assertTrue( - loadCache.getLoadWarmUpUnreadyReasons().toString(), loadCache.isLoadWarmUpReady()); + Assert.assertTrue(loadCache.getNodeHeartbeatUnreadyReasons().isEmpty()); } @Test - public void testConsensusGroupWarmUpAcceptsFullySampledWithoutLeader() { + public void testRegionAndConsensusGroupsDoNotBlockLoadWarmUp() { LoadCache loadCache = new LoadCache(); TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 101); Set dataNodeIds = Stream.of(21, 22).collect(Collectors.toSet()); @@ -340,18 +334,32 @@ public void testConsensusGroupWarmUpAcceptsFullySampledWithoutLeader() { dataNodeId -> { loadCache.cacheDataNodeHeartbeatSample( dataNodeId, new NodeHeartbeatSample(NodeStatus.Running)); - loadCache.cacheRegionHeartbeatSample( - regionGroupId, dataNodeId, new RegionHeartbeatSample(RegionStatus.Running), false); - loadCache.cacheConsensusGroupHeartbeatSample(regionGroupId, dataNodeId, false, 1, true); }); loadCache.updateNodeStatistics(false); loadCache.updateRegionGroupStatistics(); loadCache.updateConsensusGroupStatistics(); - Assert.assertTrue( - loadCache.getLoadWarmUpUnreadyReasons().toString(), loadCache.isLoadWarmUpReady()); - Assert.assertEquals( - ConsensusGroupStatistics.generateDefaultConsensusGroupStatistics(), - loadCache.getCurrentConsensusGroupStatisticsMap().get(regionGroupId)); + Assert.assertTrue(loadCache.getNodeHeartbeatUnreadyReasons().isEmpty()); + } + + @Test + public void testLoadWarmUpToleratesMissingFirstHeartbeatAfterThirtySeconds() { + LOAD_CACHE.clearHeartbeatCache(); + LOAD_CACHE.createNodeHeartbeatCache(NodeType.DataNode, 31); + + try { + LOAD_MANAGER.markLoadServicesStartedForTest(System.currentTimeMillis()); + + Assert.assertFalse(LOAD_MANAGER.isLoadReady()); + Assert.assertTrue(LOAD_MANAGER.getLoadReadyReason().contains("waiting for first heartbeat")); + + LOAD_MANAGER.markLoadServicesStartedForTest( + System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(31)); + + Assert.assertTrue(LOAD_MANAGER.isLoadReady()); + Assert.assertTrue(LOAD_MANAGER.getLoadReadyReason().contains("Missing heartbeats")); + } finally { + LOAD_MANAGER.stopLoadServices(); + } } } From 2501efbe2a4e28334c3be2fb9d4c4b18116212df Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Mon, 8 Jun 2026 10:54:44 +0800 Subject: [PATCH 3/6] Fix ConfigNode leader warm-up recovery --- .../ainode/iotdb/ainode/core/constant.py | 1 + .../ainode/iotdb/ainode/core/rpc/client.py | 11 +- .../heartbeat/DataNodeHeartbeatHandler.java | 8 +- .../ConfigRegionStateMachine.java | 200 +++++++++++------- .../confignode/manager/ConfigManager.java | 4 + .../confignode/manager/ProcedureManager.java | 13 +- .../manager/consensus/ConsensusManager.java | 10 +- .../manager/load/cache/LoadCache.java | 3 +- .../procedure/ProcedureExecutor.java | 13 +- .../iotdb/confignode/service/ConfigNode.java | 8 +- .../manager/load/LoadManagerTest.java | 18 ++ .../db/protocol/client/ConfigNodeClient.java | 28 ++- 12 files changed, 230 insertions(+), 87 deletions(-) diff --git a/iotdb-core/ainode/iotdb/ainode/core/constant.py b/iotdb-core/ainode/iotdb/ainode/core/constant.py index 4a2ee543d1f8d..1eb79085bcfa0 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/constant.py +++ b/iotdb-core/ainode/iotdb/ainode/core/constant.py @@ -80,6 +80,7 @@ class TSStatusCode(Enum): SUCCESS_STATUS = 200 REDIRECTION_RECOMMEND = 400 + CONFIG_NODE_LEADER_WARMING_UP = 1014 MODEL_EXISTED_ERROR = 1503 MODEL_NOT_EXIST_ERROR = 1504 CREATE_MODEL_ERROR = 1505 diff --git a/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py b/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py index ea6362ef080af..81bb81d50bb34 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py +++ b/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py @@ -66,6 +66,7 @@ def __init__(self, config_leader: TEndPoint): "Fail to connect to any config node. Please check status of ConfigNodes" ) self._RETRY_NUM = 10 + self._STARTUP_RETRY_NUM = 60 self._RETRY_INTERVAL_IN_S = 1 self._try_to_connect() @@ -163,6 +164,12 @@ def _update_config_node_leader(self, status: TSStatus) -> bool: else: self._config_leader = None return True + if status.code == TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.get_status_code(): + logger.info( + "ConfigNode leader is warming up before serving AINode, will wait and retry. Reason: {}", + status.message, + ) + return True return False def node_register( @@ -177,7 +184,7 @@ def node_register( versionInfo=version_info, ) - for _ in range(0, self._RETRY_NUM): + for _ in range(0, self._STARTUP_RETRY_NUM): try: resp = self._client.registerAINode(req) if not self._update_config_node_leader(resp.status): @@ -208,7 +215,7 @@ def node_restart( versionInfo=version_info, ) - for _ in range(0, self._RETRY_NUM): + for _ in range(0, self._STARTUP_RETRY_NUM): try: resp = self._client.restartAINode(req) if not self._update_config_node_leader(resp.status): diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java index 8b258f24138e4..3d5da9600a737 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java @@ -120,9 +120,11 @@ public void onComplete(TDataNodeHeartbeatResp heartbeatResp) { && SCHEMA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE) || (TConsensusGroupType.DataRegion.equals(regionGroupId.getType()) && DATA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE); - long logicalTimestamp = + boolean hasConsensusLogicalTimestamp = heartbeatResp.isSetConsensusLogicalTimeMap() - && heartbeatResp.getConsensusLogicalTimeMap().containsKey(regionGroupId) + && heartbeatResp.getConsensusLogicalTimeMap().containsKey(regionGroupId); + long logicalTimestamp = + hasConsensusLogicalTimestamp ? heartbeatResp.getConsensusLogicalTimeMap().get(regionGroupId) : heartbeatResp.getHeartbeatTimestamp(); loadManager @@ -132,7 +134,7 @@ public void onComplete(TDataNodeHeartbeatResp heartbeatResp) { nodeId, Boolean.TRUE.equals(isLeader), logicalTimestamp, - shouldCacheConsensusSample); + shouldCacheConsensusSample && hasConsensusLogicalTimestamp); }); loadManager .getLoadCache() diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java index 08d834f6c8654..1c758bf404658 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java @@ -65,6 +65,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -78,6 +79,8 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); private final ConfigPlanExecutor executor; private final AtomicBoolean leaderServicesReady; + private final AtomicLong leaderServicesEpoch; + private final Object leaderServicesLock; private ConfigManager configManager; /** Variables for {@link ConfigNode} Simple Consensus. */ @@ -101,6 +104,8 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev public ConfigRegionStateMachine(ConfigManager configManager, ConfigPlanExecutor executor) { this.executor = executor; this.leaderServicesReady = new AtomicBoolean(false); + this.leaderServicesEpoch = new AtomicLong(0); + this.leaderServicesLock = new Object(); this.configManager = configManager; this.currentNodeTEndPoint = new TEndPoint() @@ -234,11 +239,11 @@ public void loadSnapshot(final File latestSnapshotRootDir) { @Override public void notifyLeaderChanged(ConsensusGroupId groupId, int newLeaderId) { - leaderServicesReady.set(false); // We get currentNodeId here because the currentNodeId // couldn't initialize earlier than the ConfigRegionStateMachine int currentNodeId = ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(); if (currentNodeId != newLeaderId) { + invalidateLeaderServices(); LOGGER.info( ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_IS_NO_LONGER_THE_LEADER + "the new leader is [nodeId:{}]", @@ -250,7 +255,7 @@ public void notifyLeaderChanged(ConsensusGroupId groupId, int newLeaderId) { @Override public void notifyNotLeader() { - leaderServicesReady.set(false); + invalidateLeaderServices(); // We get currentNodeId here because the currentNodeId // couldn't initialize earlier than the ConfigRegionStateMachine int currentNodeId = ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(); @@ -259,25 +264,30 @@ public void notifyNotLeader() { + "start cleaning up related services", currentNodeId, currentNodeTEndPoint); - // Stop leader scheduling services - configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeMetaSync(); - configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeHeartbeat(); - configManager.getSubscriptionManager().getSubscriptionCoordinator().stopSubscriptionMetaSync(); - configManager.getLoadManager().stopTopologyService(); - configManager.getLoadManager().stopLoadServices(); - configManager.getProcedureManager().stopExecutor(); - configManager.getRetryFailedTasksThread().stopRetryFailedTasksService(); - configManager.getPartitionManager().stopRegionCleaner(); - configManager.getCQManager().stopCQScheduler(); - configManager.getClusterSchemaManager().clearSchemaQuotaCache(); - // Remove Metric after leader change - configManager.removeMetrics(); - - // Shutdown leader related service for config pipe - PipeConfigNodeAgent.runtime().notifyLeaderUnavailable(); - - // Clean receiver file dir - PipeConfigNodeAgent.receiver().cleanPipeReceiverDir(); + synchronized (leaderServicesLock) { + // Stop leader scheduling services + configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeMetaSync(); + configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeHeartbeat(); + configManager + .getSubscriptionManager() + .getSubscriptionCoordinator() + .stopSubscriptionMetaSync(); + configManager.getLoadManager().stopTopologyService(); + configManager.getLoadManager().stopLoadServices(); + configManager.getProcedureManager().stopExecutor(); + configManager.getRetryFailedTasksThread().stopRetryFailedTasksService(); + configManager.getPartitionManager().stopRegionCleaner(); + configManager.getCQManager().stopCQScheduler(); + configManager.getClusterSchemaManager().clearSchemaQuotaCache(); + // Remove Metric after leader change + configManager.removeMetrics(); + + // Shutdown leader related service for config pipe + PipeConfigNodeAgent.runtime().notifyLeaderUnavailable(); + + // Clean receiver file dir + PipeConfigNodeAgent.receiver().cleanPipeReceiverDir(); + } LOGGER.info( ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_IS_NO_LONGER_THE_LEADER @@ -288,7 +298,7 @@ public void notifyNotLeader() { @Override public void notifyLeaderReady() { - leaderServicesReady.set(false); + final long epoch = nextLeaderServicesEpoch(); LOGGER.info( ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_BECOMES_CONFIG_REGION_LEADER, ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), @@ -301,10 +311,10 @@ public void notifyLeaderReady() { configManager.getLoadManager().startTopologyService(); } - threadPool.submit(this::startLeaderServicesAfterLoadReady); + threadPool.submit(() -> startLeaderServicesAfterLoadReady(epoch)); } - private void startLeaderServicesAfterLoadReady() { + private void startLeaderServicesAfterLoadReady(final long epoch) { if (!configManager.getLoadManager().isLoadReady()) { LOGGER.info( "Current ConfigNode(nodeId: {}, ip: {}) starts leader services while load warm-up is" @@ -312,64 +322,110 @@ private void startLeaderServicesAfterLoadReady() { ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), currentNodeTEndPoint); } - if (!configManager.getConsensusManager().isLeaderReady()) { + synchronized (leaderServicesLock) { + if (!isCurrentLeaderServicesEpoch(epoch)) { + LOGGER.info( + ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_IS_NO_LONGER_THE_LEADER + + "skip starting leader services because the leader epoch is stale", + ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), + currentNodeTEndPoint); + return; + } + + // Start leader scheduling services + submitIfLeaderServicesEpochCurrent( + epoch, () -> configManager.getProcedureManager().getStore().getProcedureInfo().upgrade()); + configManager.getRetryFailedTasksThread().startRetryFailedTasksService(); + configManager.getPartitionManager().startRegionCleaner(); + // Add Metric after leader ready + configManager.addMetrics(); + + // Activate leader related service for config pipe + PipeConfigNodeAgent.runtime().notifyLeaderReady(); + + // we do cq recovery async for performance: + // cq recovery may be time-consuming, we use another thread to do it in + // make notifyLeaderChanged not blocked by it + submitIfLeaderServicesEpochCurrent( + epoch, () -> configManager.getCQManager().startCQScheduler()); + + submitIfLeaderServicesEpochCurrent( + epoch, + () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync()); + submitIfLeaderServicesEpochCurrent( + epoch, + () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat()); + submitIfLeaderServicesEpochCurrent( + epoch, + () -> + configManager + .getPipeManager() + .getPipeRuntimeCoordinator() + .onConfigRegionGroupLeaderChanged()); + + submitIfLeaderServicesEpochCurrent( + epoch, + () -> + configManager + .getSubscriptionManager() + .getSubscriptionCoordinator() + .startSubscriptionMetaSync()); + + // To adapt old version, we check cluster ID after state machine has been fully recovered. + // Do check async because sync will be slow and block every other things. + submitIfLeaderServicesEpochCurrent( + epoch, () -> configManager.getClusterManager().checkClusterId()); + + if (!isCurrentLeaderServicesEpoch(epoch)) { + return; + } + configManager + .getProcedureManager() + .startExecutor( + () -> { + if (isCurrentLeaderServicesEpoch(epoch)) { + leaderServicesReady.set(true); + } + }); + if (!leaderServicesReady.get()) { + return; + } + } + + if (isCurrentLeaderServicesEpoch(epoch)) { LOGGER.info( - ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_IS_NO_LONGER_THE_LEADER - + "skip starting leader services because consensus leader is no longer ready", + ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_AS_CONFIG_REGION_LEADER_IS, ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), currentNodeTEndPoint); - return; } + } - // Start leader scheduling services - configManager.getProcedureManager().startExecutor(); - threadPool.submit( - () -> configManager.getProcedureManager().getStore().getProcedureInfo().upgrade()); - configManager.getRetryFailedTasksThread().startRetryFailedTasksService(); - configManager.getPartitionManager().startRegionCleaner(); - // Add Metric after leader ready - configManager.addMetrics(); - - // Activate leader related service for config pipe - PipeConfigNodeAgent.runtime().notifyLeaderReady(); - - // we do cq recovery async for performance: - // cq recovery may be time-consuming, we use another thread to do it in - // make notifyLeaderChanged not blocked by it - threadPool.submit(() -> configManager.getCQManager().startCQScheduler()); - - threadPool.submit( - () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync()); - threadPool.submit( - () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat()); - threadPool.submit( - () -> - configManager - .getPipeManager() - .getPipeRuntimeCoordinator() - .onConfigRegionGroupLeaderChanged()); - - threadPool.submit( - () -> - configManager - .getSubscriptionManager() - .getSubscriptionCoordinator() - .startSubscriptionMetaSync()); + public boolean areLeaderServicesReady() { + return leaderServicesReady.get(); + } - // To adapt old version, we check cluster ID after state machine has been fully recovered. - // Do check async because sync will be slow and block every other things. - threadPool.submit(() -> configManager.getClusterManager().checkClusterId()); + private long nextLeaderServicesEpoch() { + leaderServicesReady.set(false); + return leaderServicesEpoch.incrementAndGet(); + } - leaderServicesReady.set(true); + private void invalidateLeaderServices() { + leaderServicesReady.set(false); + leaderServicesEpoch.incrementAndGet(); + } - LOGGER.info( - ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_AS_CONFIG_REGION_LEADER_IS, - ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), - currentNodeTEndPoint); + private boolean isCurrentLeaderServicesEpoch(final long epoch) { + return leaderServicesEpoch.get() == epoch + && configManager.getConsensusManager().isLeaderReady(); } - public boolean areLeaderServicesReady() { - return leaderServicesReady.get(); + private void submitIfLeaderServicesEpochCurrent(final long epoch, final Runnable task) { + threadPool.submit( + () -> { + if (isCurrentLeaderServicesEpoch(epoch)) { + task.run(); + } + }); } @Override diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 2db0255e35ac4..3f60b84a74e1c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -1247,6 +1247,10 @@ protected TSStatus confirmLeader() { "ConsensusManager of target-ConfigNode is not initialized, " + "please make sure the target-ConfigNode has been started successfully."); } + // Procedure recovery replays metadata writes before external load warm-up is complete. + if (procedureManager.isProcedureExecutionThread()) { + return getConsensusManager().confirmLeaderForInternalProcedure(); + } return getConsensusManager().confirmLeader(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 3f0ba82fa768c..9d0a7cd55f4e9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -226,14 +226,21 @@ public ProcedureManager(ConfigManager configManager, ProcedureInfo procedureInfo } public void startExecutor() { + startExecutor(null); + } + + public void startExecutor(final Runnable beforeStartingWorkers) { if (!executor.isRunning()) { executor.init(CONFIG_NODE_CONFIG.getProcedureCoreWorkerThreadsCount()); - executor.startWorkers(); executor.startCompletedCleaner( CONFIG_NODE_CONFIG.getProcedureCompletedCleanInterval(), CONFIG_NODE_CONFIG.getProcedureCompletedEvictTTL()); executor.addInternalProcedure(partitionTableCleaner); store.start(); + if (beforeStartingWorkers != null) { + beforeStartingWorkers.run(); + } + executor.startWorkers(); LOGGER.info(ManagerMessages.PROCEDUREMANAGER_IS_STARTED_SUCCESSFULLY); } } @@ -250,6 +257,10 @@ public void stopExecutor() { } } + public boolean isProcedureExecutionThread() { + return ProcedureExecutor.isProcedureExecutionThread(); + } + @TestOnly public TSStatus createManyDatabases() { this.executor.submitProcedure(new CreateManyDatabasesProcedure()); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java index e85f9692d0621..01957c0bcfb95 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java @@ -426,6 +426,14 @@ public boolean isLeaderExist() { * NEED_REDIRECTION otherwise */ public TSStatus confirmLeader() { + return confirmLeader(true); + } + + public TSStatus confirmLeaderForInternalProcedure() { + return confirmLeader(false); + } + + private TSStatus confirmLeader(final boolean checkLoadReady) { if (!isLeader()) { TSStatus result = new TSStatus(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()); result.setMessage( @@ -460,7 +468,7 @@ public TSStatus confirmLeader() { return getLeaderWarmingUpStatus( "The current ConfigNode is leader but leader services are not ready yet."); } - if (!configManager.getLoadManager().isLoadReady()) { + if (checkLoadReady && !configManager.getLoadManager().isLoadReady()) { return getLeaderWarmingUpStatus(configManager.getLoadManager().getLoadReadyReason()); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java index 940804948aeea..f49d32ad364ed 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java @@ -385,7 +385,8 @@ public void cacheConsensusGroupHeartbeatSample( if (cacheLeader && isLeader) { cacheConsensusSample( regionGroupId, new ConsensusGroupHeartbeatSample(logicalTimestamp, nodeId)); - } else if (isConsensusGroupHeartbeatFullySampled(regionGroupId) + } else if (cacheLeader + && isConsensusGroupHeartbeatFullySampled(regionGroupId) && !Optional.ofNullable(consensusGroupCacheMap.get(regionGroupId)) .map(AbstractLoadCache::hasHeartbeatSample) .orElse(false)) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java index 11bb7f382d41e..82afea3859fd4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java @@ -56,6 +56,8 @@ public class ProcedureExecutor { private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class); + private static final ThreadLocal PROCEDURE_EXECUTION_CONTEXT = + ThreadLocal.withInitial(() -> false); private final ConcurrentHashMap> completed = new ConcurrentHashMap<>(); @@ -96,6 +98,10 @@ public ProcedureExecutor(final Env environment, final IProcedureStore store this(environment, store, new SimpleProcedureScheduler()); } + public static boolean isProcedureExecutionThread() { + return PROCEDURE_EXECUTION_CONTEXT.get(); + } + public void init(int numThreads) { this.corePoolSize = numThreads; this.maxPoolSize = 10 * numThreads; @@ -784,7 +790,12 @@ public void run() { this.activeProcedure.set(procedure); activeExecutorCount.incrementAndGet(); startTime.set(System.currentTimeMillis()); - executeProcedure(procedure); + PROCEDURE_EXECUTION_CONTEXT.set(true); + try { + executeProcedure(procedure); + } finally { + PROCEDURE_EXECUTION_CONTEXT.remove(); + } activeExecutorCount.decrementAndGet(); LOG.trace( "Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get()); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java index 037f138286a18..da766541786cb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java @@ -89,7 +89,7 @@ public class ConfigNode extends ServerCommandLine implements ConfigNodeMBean { private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); - private static final int STARTUP_RETRY_NUM = 10; + private static final int STARTUP_RETRY_NUM = 20; private static final long STARTUP_RETRY_INTERVAL_IN_MS = TimeUnit.SECONDS.toMillis(3); private static final int SCHEDULE_WAITING_RETRY_NUM = (int) (COMMON_CONFIG.getCnConnectionTimeoutInMS() / STARTUP_RETRY_INTERVAL_IN_MS); @@ -414,6 +414,12 @@ private void sendRegisterConfigNodeRequest() throws StartupException, IOExceptio } else if (status.getCode() == TSStatusCode.INTERNAL_REQUEST_RETRY_ERROR.getStatusCode()) { LOGGER.warn( ConfigNodeMessages.THE_RESULT_OF_REGISTER_SELF_CONFIGNODE_IS_RETRY, status, retry); + } else if (status.getCode() == TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode()) { + LOGGER.info( + "ConfigNode leader is warming up before serving the registering ConfigNode, will wait" + + " and retry. Status: {}, retry: {}", + status, + retry); } else { throw new StartupException(status.getMessage()); } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java index e2c51df9693be..9f355e8ada5df 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java @@ -342,6 +342,24 @@ public void testRegionAndConsensusGroupsDoNotBlockLoadWarmUp() { Assert.assertTrue(loadCache.getNodeHeartbeatUnreadyReasons().isEmpty()); } + @Test + public void testDisabledConsensusCachingDoesNotBlockLaterLogicalSample() { + LoadCache loadCache = new LoadCache(); + TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 102); + int dataNodeId = 41; + + loadCache.createRegionGroupHeartbeatCache( + "root.warmup", regionGroupId, Collections.singleton(dataNodeId)); + loadCache.cacheConsensusGroupHeartbeatSample( + regionGroupId, dataNodeId, false, System.nanoTime(), false); + loadCache.cacheConsensusGroupHeartbeatSample(regionGroupId, dataNodeId, true, 1, true); + loadCache.updateConsensusGroupStatistics(); + + Assert.assertEquals( + dataNodeId, + loadCache.getCurrentConsensusGroupStatisticsMap().get(regionGroupId).getLeaderId()); + } + @Test public void testLoadWarmUpToleratesMissingFirstHeartbeatAfterThirtySeconds() { LOAD_CACHE.clearHeartbeatCache(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java index 545ee026871e6..de69996e8af64 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java @@ -222,6 +222,7 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie "Failed to connect to ConfigNode %s from DataNode %s when executing %s, Exception:"; private static final long RETRY_INTERVAL_MS = 1000L; private static final long WAIT_CN_LEADER_ELECTION_INTERVAL_MS = 2000L; + private static final long REGISTER_LEADER_WARMING_UP_RETRY_TIMEOUT_MS = 60_000L; private static final String UNSUPPORTED_INVOCATION = "This method is not supported for invocation by DataNode"; @@ -424,6 +425,10 @@ private boolean updateConfigNodeLeader(TSStatus status) { } } + private boolean isConfigNodeLeaderWarmingUp(TSStatus status) { + return status.getCode() == TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode(); + } + /** * The frame of execute RPC, include logic of retry and exception handling. * @@ -495,20 +500,33 @@ public TGetClusterIdResp getClusterId() throws TException { @Override public TDataNodeRegisterResp registerDataNode(TDataNodeRegisterReq req) throws TException { - for (int i = 0; i < RETRY_NUM; i++) { + boolean leaderWarmingUpObserved = false; + long leaderWarmingUpRetryDeadline = 0; + for (int i = 0; + i < RETRY_NUM + || (leaderWarmingUpObserved + && System.currentTimeMillis() < leaderWarmingUpRetryDeadline); + i++) { try { TDataNodeRegisterResp resp = client.registerDataNode(req); if (!updateConfigNodeLeader(resp.status)) { return resp; } + if (isConfigNodeLeaderWarmingUp(resp.status) && !leaderWarmingUpObserved) { + leaderWarmingUpObserved = true; + leaderWarmingUpRetryDeadline = + System.currentTimeMillis() + REGISTER_LEADER_WARMING_UP_RETRY_TIMEOUT_MS; + } // set latest config node list - List newConfigNodes = new ArrayList<>(); - for (TConfigNodeLocation configNodeLocation : resp.getConfigNodeList()) { - newConfigNodes.add(configNodeLocation.getInternalEndPoint()); + if (resp.isSetConfigNodeList()) { + List newConfigNodes = new ArrayList<>(); + for (TConfigNodeLocation configNodeLocation : resp.getConfigNodeList()) { + newConfigNodes.add(configNodeLocation.getInternalEndPoint()); + } + configNodes = newConfigNodes; } - configNodes = newConfigNodes; } catch (TException e) { String message = String.format( From c0840dac7fd2e4b3e7a21f0f84b2054e4dd16f37 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Tue, 9 Jun 2026 10:02:10 +0800 Subject: [PATCH 4/6] Simplify consensus leader warm-up sampling --- .../heartbeat/DataNodeHeartbeatHandler.java | 23 +++++----- .../manager/load/cache/LoadCache.java | 46 ------------------- .../manager/load/LoadManagerTest.java | 18 -------- 3 files changed, 11 insertions(+), 76 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java index 3d5da9600a737..0e03de61da120 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java @@ -27,6 +27,7 @@ import org.apache.iotdb.confignode.conf.ConfigNodeConfig; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.manager.load.LoadManager; +import org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupHeartbeatSample; import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample; import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample; import org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.PipeRuntimeCoordinator; @@ -123,18 +124,16 @@ public void onComplete(TDataNodeHeartbeatResp heartbeatResp) { boolean hasConsensusLogicalTimestamp = heartbeatResp.isSetConsensusLogicalTimeMap() && heartbeatResp.getConsensusLogicalTimeMap().containsKey(regionGroupId); - long logicalTimestamp = - hasConsensusLogicalTimestamp - ? heartbeatResp.getConsensusLogicalTimeMap().get(regionGroupId) - : heartbeatResp.getHeartbeatTimestamp(); - loadManager - .getLoadCache() - .cacheConsensusGroupHeartbeatSample( - regionGroupId, - nodeId, - Boolean.TRUE.equals(isLeader), - logicalTimestamp, - shouldCacheConsensusSample && hasConsensusLogicalTimestamp); + if (shouldCacheConsensusSample + && hasConsensusLogicalTimestamp + && Boolean.TRUE.equals(isLeader)) { + loadManager + .getLoadCache() + .cacheConsensusSample( + regionGroupId, + new ConsensusGroupHeartbeatSample( + heartbeatResp.getConsensusLogicalTimeMap().get(regionGroupId), nodeId)); + } }); loadManager .getLoadCache() diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java index f49d32ad364ed..e53760bcec49a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java @@ -99,8 +99,6 @@ public class LoadCache { private final Map> regionRawSizeMap; // Map private final Map consensusGroupCacheMap; - // Map> - private final Map> consensusGroupHeartbeatSampledNodeMap; // Map private final Map> confirmedConfigNodeMap; private Map> topologyGraph; @@ -113,7 +111,6 @@ public LoadCache() { this.regionSizeMap = new ConcurrentHashMap<>(); this.regionRawSizeMap = new ConcurrentHashMap<>(); this.consensusGroupCacheMap = new ConcurrentHashMap<>(); - this.consensusGroupHeartbeatSampledNodeMap = new ConcurrentHashMap<>(); this.confirmedConfigNodeMap = new ConcurrentHashMap<>(); this.topologyGraph = new HashMap<>(); this.topologyUpdated = new AtomicBoolean(false); @@ -179,7 +176,6 @@ private void initRegionGroupHeartbeatCache( Map> regionReplicaMap) { regionGroupCacheMap.clear(); consensusGroupCacheMap.clear(); - consensusGroupHeartbeatSampledNodeMap.clear(); regionReplicaMap.forEach( (database, regionReplicaSets) -> regionReplicaSets.forEach( @@ -197,8 +193,6 @@ private void initRegionGroupHeartbeatCache( .collect(Collectors.toSet()), isStrongConsistency)); consensusGroupCacheMap.put(regionGroupId, new ConsensusGroupCache()); - consensusGroupHeartbeatSampledNodeMap.put( - regionGroupId, ConcurrentHashMap.newKeySet()); })); } @@ -207,7 +201,6 @@ public void clearHeartbeatCache() { nodeCacheMap.clear(); regionGroupCacheMap.clear(); consensusGroupCacheMap.clear(); - consensusGroupHeartbeatSampledNodeMap.clear(); } /** @@ -310,7 +303,6 @@ public void createRegionGroupHeartbeatCache( regionGroupId, new RegionGroupCache(database, regionGroupId, dataNodeIds, isStrongConsistency)); consensusGroupCacheMap.put(regionGroupId, new ConsensusGroupCache()); - consensusGroupHeartbeatSampledNodeMap.put(regionGroupId, ConcurrentHashMap.newKeySet()); } /** @@ -373,41 +365,6 @@ public void cacheConsensusSample( .ifPresent(group -> group.cacheHeartbeatSample(sample)); } - public void cacheConsensusGroupHeartbeatSample( - TConsensusGroupId regionGroupId, - int nodeId, - boolean isLeader, - long logicalTimestamp, - boolean cacheLeader) { - consensusGroupHeartbeatSampledNodeMap - .computeIfAbsent(regionGroupId, empty -> ConcurrentHashMap.newKeySet()) - .add(nodeId); - if (cacheLeader && isLeader) { - cacheConsensusSample( - regionGroupId, new ConsensusGroupHeartbeatSample(logicalTimestamp, nodeId)); - } else if (cacheLeader - && isConsensusGroupHeartbeatFullySampled(regionGroupId) - && !Optional.ofNullable(consensusGroupCacheMap.get(regionGroupId)) - .map(AbstractLoadCache::hasHeartbeatSample) - .orElse(false)) { - cacheConsensusSample( - regionGroupId, - new ConsensusGroupHeartbeatSample( - logicalTimestamp, ConsensusGroupCache.UN_READY_LEADER_ID)); - } - } - - private boolean isConsensusGroupHeartbeatFullySampled(TConsensusGroupId regionGroupId) { - return Optional.ofNullable(regionGroupCacheMap.get(regionGroupId)) - .map(RegionGroupCache::getRegionLocations) - .map( - regionLocations -> - consensusGroupHeartbeatSampledNodeMap - .getOrDefault(regionGroupId, Collections.emptySet()) - .containsAll(regionLocations)) - .orElse(false); - } - public void cacheDataNodeHeartbeatFailureSample(int nodeId, long sampleTimestamp) { cacheUnreportedDataNodeRegionHeartbeatSamples(nodeId, Collections.emptySet(), sampleTimestamp); } @@ -425,8 +382,6 @@ public void cacheUnreportedDataNodeRegionHeartbeatSamples( nodeId, new RegionHeartbeatSample(sampleTimestamp, RegionStatus.Unknown), false); - cacheConsensusGroupHeartbeatSample( - regionGroupId, nodeId, false, sampleTimestamp, false); }); } @@ -823,7 +778,6 @@ public int countRegionWithSpecifiedStatus(TConsensusGroupType type, RegionStatus public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) { regionGroupCacheMap.remove(consensusGroupId); consensusGroupCacheMap.remove(consensusGroupId); - consensusGroupHeartbeatSampledNodeMap.remove(consensusGroupId); } /** diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java index 9f355e8ada5df..e2c51df9693be 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java @@ -342,24 +342,6 @@ public void testRegionAndConsensusGroupsDoNotBlockLoadWarmUp() { Assert.assertTrue(loadCache.getNodeHeartbeatUnreadyReasons().isEmpty()); } - @Test - public void testDisabledConsensusCachingDoesNotBlockLaterLogicalSample() { - LoadCache loadCache = new LoadCache(); - TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 102); - int dataNodeId = 41; - - loadCache.createRegionGroupHeartbeatCache( - "root.warmup", regionGroupId, Collections.singleton(dataNodeId)); - loadCache.cacheConsensusGroupHeartbeatSample( - regionGroupId, dataNodeId, false, System.nanoTime(), false); - loadCache.cacheConsensusGroupHeartbeatSample(regionGroupId, dataNodeId, true, 1, true); - loadCache.updateConsensusGroupStatistics(); - - Assert.assertEquals( - dataNodeId, - loadCache.getCurrentConsensusGroupStatisticsMap().get(regionGroupId).getLeaderId()); - } - @Test public void testLoadWarmUpToleratesMissingFirstHeartbeatAfterThirtySeconds() { LOAD_CACHE.clearHeartbeatCache(); From de7eac98a5b833098c19a82749fe179990513c99 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Tue, 9 Jun 2026 10:51:47 +0800 Subject: [PATCH 5/6] Refine ConfigNode leader warm-up flow --- .../heartbeat/AINodeHeartbeatHandler.java | 2 +- .../heartbeat/ConfigNodeHeartbeatHandler.java | 7 +- .../heartbeat/DataNodeHeartbeatHandler.java | 17 +++-- .../ConfigRegionStateMachine.java | 45 ++++++++++--- .../manager/load/cache/LoadCache.java | 65 +++++++------------ .../load/service/HeartbeatService.java | 8 +++ 6 files changed, 86 insertions(+), 58 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/AINodeHeartbeatHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/AINodeHeartbeatHandler.java index 9d8e0b6e8474f..03e6c0bfe3111 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/AINodeHeartbeatHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/AINodeHeartbeatHandler.java @@ -50,7 +50,7 @@ public void onComplete(TAIHeartbeatResp aiHeartbeatResp) { public void onError(Exception e) { if (ThriftClient.isConnectionBroken(e)) { loadManager.forceUpdateNodeCache( - NodeType.DataNode, nodeId, new NodeHeartbeatSample(NodeStatus.Unknown)); + NodeType.AINode, nodeId, new NodeHeartbeatSample(NodeStatus.Unknown)); } loadManager.getLoadCache().resetHeartbeatProcessing(nodeId); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java index 97ef4c56f2993..96225e4fb7bdb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/ConfigNodeHeartbeatHandler.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.client.async.handlers.heartbeat; +import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.cluster.NodeType; import org.apache.iotdb.confignode.manager.load.LoadManager; @@ -46,8 +47,10 @@ public void onComplete(TConfigNodeHeartbeatResp resp) { @Override public void onError(Exception e) { - loadManager.forceUpdateNodeCache( - NodeType.ConfigNode, nodeId, new NodeHeartbeatSample(NodeStatus.Unknown)); + if (ThriftClient.isConnectionBroken(e)) { + loadManager.forceUpdateNodeCache( + NodeType.ConfigNode, nodeId, new NodeHeartbeatSample(NodeStatus.Unknown)); + } loadManager.getLoadCache().resetHeartbeatProcessing(nodeId); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java index 0e03de61da120..97fc92b095aa2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java @@ -21,6 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.cluster.NodeType; import org.apache.iotdb.commons.cluster.RegionStatus; @@ -38,6 +39,7 @@ import java.util.Collections; import java.util.Map; +import java.util.Set; import java.util.function.Consumer; public class DataNodeHeartbeatHandler implements AsyncMethodCallback { @@ -51,6 +53,7 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback regionGroupIdsOnDataNode; private final Map deviceNum; private final Map timeSeriesNum; @@ -64,6 +67,7 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback regionGroupIdsOnDataNode, Map deviceNum, Map timeSeriesNum, Map regionDisk, @@ -73,6 +77,7 @@ public DataNodeHeartbeatHandler( this.nodeId = nodeId; this.loadManager = loadManager; + this.regionGroupIdsOnDataNode = regionGroupIdsOnDataNode; this.deviceNum = deviceNum; this.timeSeriesNum = timeSeriesNum; this.regionDisk = regionDisk; @@ -138,7 +143,10 @@ public void onComplete(TDataNodeHeartbeatResp heartbeatResp) { loadManager .getLoadCache() .cacheUnreportedDataNodeRegionHeartbeatSamples( - nodeId, judgedLeaders.keySet(), heartbeatResp.getHeartbeatTimestamp()); + nodeId, + regionGroupIdsOnDataNode, + judgedLeaders.keySet(), + heartbeatResp.getHeartbeatTimestamp()); if (heartbeatResp.getRegionDeviceUsageMap() != null) { deviceNum.putAll(heartbeatResp.getRegionDeviceUsageMap()); @@ -177,9 +185,10 @@ public void onComplete(TDataNodeHeartbeatResp heartbeatResp) { @Override public void onError(Exception e) { - loadManager.forceUpdateNodeCache( - NodeType.DataNode, nodeId, new NodeHeartbeatSample(NodeStatus.Unknown)); - loadManager.getLoadCache().cacheDataNodeHeartbeatFailureSample(nodeId, System.nanoTime()); + if (ThriftClient.isConnectionBroken(e)) { + loadManager.forceUpdateNodeCache( + NodeType.DataNode, nodeId, new NodeHeartbeatSample(NodeStatus.Unknown)); + } loadManager.getLoadCache().resetHeartbeatProcessing(nodeId); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java index 1c758bf404658..397eb8c908e68 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java @@ -77,6 +77,9 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev private static final ExecutorService threadPool = IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.CONFIG_NODE_RECOVER.getName()); private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); + private static final long WAIT_LOAD_READY_TIMEOUT_MS = + CommonDescriptor.getInstance().getConfig().getCnConnectionTimeoutInMS() / 2; + private static final long WAIT_LOAD_READY_INTERVAL_MS = 100; private final ConfigPlanExecutor executor; private final AtomicBoolean leaderServicesReady; private final AtomicLong leaderServicesEpoch; @@ -311,17 +314,10 @@ public void notifyLeaderReady() { configManager.getLoadManager().startTopologyService(); } - threadPool.submit(() -> startLeaderServicesAfterLoadReady(epoch)); + threadPool.submit(() -> startLeaderServicesAndWaitLoadReady(epoch)); } - private void startLeaderServicesAfterLoadReady(final long epoch) { - if (!configManager.getLoadManager().isLoadReady()) { - LOGGER.info( - "Current ConfigNode(nodeId: {}, ip: {}) starts leader services while load warm-up is" - + " still in progress.", - ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), - currentNodeTEndPoint); - } + private void startLeaderServicesAndWaitLoadReady(final long epoch) { synchronized (leaderServicesLock) { if (!isCurrentLeaderServicesEpoch(epoch)) { LOGGER.info( @@ -392,6 +388,19 @@ private void startLeaderServicesAfterLoadReady(final long epoch) { } } + boolean loadReady = waitForLoadReady(epoch); + if (!isCurrentLeaderServicesEpoch(epoch)) { + return; + } + if (!loadReady) { + LOGGER.info( + "Current ConfigNode(nodeId: {}, ip: {}) finished starting leader services while load" + + " warm-up is still in progress: {}", + ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), + currentNodeTEndPoint, + configManager.getLoadManager().getLoadReadyReason()); + } + if (isCurrentLeaderServicesEpoch(epoch)) { LOGGER.info( ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_AS_CONFIG_REGION_LEADER_IS, @@ -400,6 +409,24 @@ private void startLeaderServicesAfterLoadReady(final long epoch) { } } + private boolean waitForLoadReady(final long epoch) { + long startTime = System.currentTimeMillis(); + while (isCurrentLeaderServicesEpoch(epoch) + && System.currentTimeMillis() - startTime < WAIT_LOAD_READY_TIMEOUT_MS) { + if (configManager.getLoadManager().isLoadReady()) { + return true; + } + try { + TimeUnit.MILLISECONDS.sleep(WAIT_LOAD_READY_INTERVAL_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("Unexpected interruption while waiting for ConfigNode leader load warm-up.", e); + return false; + } + } + return isCurrentLeaderServicesEpoch(epoch) && configManager.getLoadManager().isLoadReady(); + } + public boolean areLeaderServicesReady() { return leaderServicesReady.get(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java index e53760bcec49a..c58e02559483d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java @@ -365,31 +365,22 @@ public void cacheConsensusSample( .ifPresent(group -> group.cacheHeartbeatSample(sample)); } - public void cacheDataNodeHeartbeatFailureSample(int nodeId, long sampleTimestamp) { - cacheUnreportedDataNodeRegionHeartbeatSamples(nodeId, Collections.emptySet(), sampleTimestamp); - } - public void cacheUnreportedDataNodeRegionHeartbeatSamples( - int nodeId, Set reportedRegionGroupIds, long sampleTimestamp) { - getRegionGroupIdsByDataNodeId(nodeId) - .forEach( - regionGroupId -> { - if (reportedRegionGroupIds.contains(regionGroupId)) { - return; - } - cacheRegionHeartbeatSample( - regionGroupId, - nodeId, - new RegionHeartbeatSample(sampleTimestamp, RegionStatus.Unknown), - false); - }); - } - - private List getRegionGroupIdsByDataNodeId(int nodeId) { - return regionGroupCacheMap.entrySet().stream() - .filter(entry -> entry.getValue().getRegionLocations().contains(nodeId)) - .map(Map.Entry::getKey) - .collect(Collectors.toList()); + int nodeId, + Set regionGroupIdsOnDataNode, + Set reportedRegionGroupIds, + long sampleTimestamp) { + regionGroupIdsOnDataNode.forEach( + regionGroupId -> { + if (reportedRegionGroupIds.contains(regionGroupId)) { + return; + } + cacheRegionHeartbeatSample( + regionGroupId, + nodeId, + new RegionHeartbeatSample(sampleTimestamp, RegionStatus.Unknown), + false); + }); } /** Update the NodeStatistics of all Nodes. */ @@ -478,10 +469,6 @@ public Map> getCurrentRegionLocationMap( return regionGroupIdsMap; } - public List getAllRegionGroupIds() { - return new ArrayList<>(regionGroupCacheMap.keySet()); - } - /** * Get the RegionGroupStatistics of all RegionGroups. * @@ -541,23 +528,17 @@ public List getNodeHeartbeatUnreadyReasons() { unreadyNodes.add(nodeId); } }); - List unreadyReasons = new ArrayList<>(); - Collections.sort(unreadyNodes); - addUnreadyReason(unreadyReasons, "nodes", unreadyNodes); - return unreadyReasons; - } - - private void addUnreadyReason(List reasons, String entityName, List unreadyEntities) { - if (unreadyEntities.isEmpty()) { - return; + if (unreadyNodes.isEmpty()) { + return Collections.emptyList(); } - List entitiesToPrint = - unreadyEntities.subList(0, Math.min(MAX_UNREADY_ENTITY_PRINT, unreadyEntities.size())); + Collections.sort(unreadyNodes); + List nodesToPrint = + unreadyNodes.subList(0, Math.min(MAX_UNREADY_ENTITY_PRINT, unreadyNodes.size())); String suffix = - unreadyEntities.size() > MAX_UNREADY_ENTITY_PRINT - ? "...(" + (unreadyEntities.size() - MAX_UNREADY_ENTITY_PRINT) + " more)" + unreadyNodes.size() > MAX_UNREADY_ENTITY_PRINT + ? "...(" + (unreadyNodes.size() - MAX_UNREADY_ENTITY_PRINT) + " more)" : ""; - reasons.add(entityName + "=" + entitiesToPrint + suffix); + return Collections.singletonList("nodes=" + nodesToPrint + suffix); } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java index a491b3960c3c3..cc5a2e2303db2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java @@ -22,6 +22,7 @@ import org.apache.iotdb.ainode.rpc.thrift.TAIHeartbeatReq; import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; @@ -261,6 +262,7 @@ private void pingRegisteredDataNodes( new DataNodeHeartbeatHandler( dataNodeId, configManager.getLoadManager(), + getRegionGroupIds(dataNodeId), configManager.getClusterQuotaManager().getDeviceNum(), configManager.getClusterQuotaManager().getTimeSeriesNum(), configManager.getClusterQuotaManager().getRegionDisk(), @@ -275,6 +277,12 @@ private void pingRegisteredDataNodes( } } + private Set getRegionGroupIds(int dataNodeId) { + return configManager.getPartitionManager().getAllReplicaSets(dataNodeId).stream() + .map(replicaSet -> replicaSet.getRegionId()) + .collect(Collectors.toSet()); + } + /** * Send heartbeat requests to all the Registered AINodes. * From f54b4847f090444d3002e12f3c1d6ad70c74955d Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Tue, 9 Jun 2026 12:37:35 +0800 Subject: [PATCH 6/6] Clean up leader warm-up heartbeat flow --- .../heartbeat/DataNodeHeartbeatHandler.java | 119 +++++++++++------- .../ConfigRegionStateMachine.java | 95 +++++++------- .../manager/consensus/ConsensusManager.java | 37 +++--- .../manager/load/cache/LoadCache.java | 18 --- .../load/service/HeartbeatService.java | 8 -- 5 files changed, 140 insertions(+), 137 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java index 97fc92b095aa2..52053cdab4564 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java @@ -39,7 +39,6 @@ import java.util.Collections; import java.util.Map; -import java.util.Set; import java.util.function.Consumer; public class DataNodeHeartbeatHandler implements AsyncMethodCallback { @@ -53,7 +52,6 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback regionGroupIdsOnDataNode; private final Map deviceNum; private final Map timeSeriesNum; @@ -67,7 +65,6 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback regionGroupIdsOnDataNode, Map deviceNum, Map timeSeriesNum, Map regionDisk, @@ -77,7 +74,6 @@ public DataNodeHeartbeatHandler( this.nodeId = nodeId; this.loadManager = loadManager; - this.regionGroupIdsOnDataNode = regionGroupIdsOnDataNode; this.deviceNum = deviceNum; this.timeSeriesNum = timeSeriesNum; this.regionDisk = regionDisk; @@ -88,11 +84,21 @@ public DataNodeHeartbeatHandler( @Override public void onComplete(TDataNodeHeartbeatResp heartbeatResp) { - // Update NodeCache + cacheNodeHeartbeatSample(heartbeatResp); + cacheRegionGroupHeartbeatSamples(heartbeatResp); + cacheUsageSamples(heartbeatResp); + cachePipeHeartbeat(heartbeatResp); + cacheConfirmedConfigNodeEndPoints(heartbeatResp); + cacheRegionSizeSamples(heartbeatResp); + } + + private void cacheNodeHeartbeatSample(TDataNodeHeartbeatResp heartbeatResp) { loadManager .getLoadCache() .cacheDataNodeHeartbeatSample(nodeId, new NodeHeartbeatSample(heartbeatResp)); + } + private void cacheRegionGroupHeartbeatSamples(TDataNodeHeartbeatResp heartbeatResp) { RegionStatus regionStatus = RegionStatus.valueOf(heartbeatResp.getStatus()); Map judgedLeaders = @@ -101,53 +107,63 @@ public void onComplete(TDataNodeHeartbeatResp heartbeatResp) { : Collections.emptyMap(); judgedLeaders.forEach( (regionGroupId, isLeader) -> { - - // Do not allow regions to inherit the Removing state from datanode - RegionStatus nextRegionStatus = regionStatus; - if (nextRegionStatus == RegionStatus.Removing) { - nextRegionStatus = - loadManager.getLoadCache().getRegionCacheLastSampleStatus(regionGroupId, nodeId); - } - - // Update RegionGroupCache - loadManager - .getLoadCache() - .cacheRegionHeartbeatSample( - regionGroupId, - nodeId, - new RegionHeartbeatSample( - heartbeatResp.getHeartbeatTimestamp(), - // Region will inherit DataNode's status - nextRegionStatus), - false); - - boolean shouldCacheConsensusSample = - (TConsensusGroupType.SchemaRegion.equals(regionGroupId.getType()) - && SCHEMA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE) - || (TConsensusGroupType.DataRegion.equals(regionGroupId.getType()) - && DATA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE); - boolean hasConsensusLogicalTimestamp = - heartbeatResp.isSetConsensusLogicalTimeMap() - && heartbeatResp.getConsensusLogicalTimeMap().containsKey(regionGroupId); - if (shouldCacheConsensusSample - && hasConsensusLogicalTimestamp - && Boolean.TRUE.equals(isLeader)) { - loadManager - .getLoadCache() - .cacheConsensusSample( - regionGroupId, - new ConsensusGroupHeartbeatSample( - heartbeatResp.getConsensusLogicalTimeMap().get(regionGroupId), nodeId)); - } + cacheRegionHeartbeatSample(heartbeatResp, regionStatus, regionGroupId); + cacheConsensusSampleIfNeeded(heartbeatResp, regionGroupId, isLeader); }); + } + + private void cacheRegionHeartbeatSample( + TDataNodeHeartbeatResp heartbeatResp, + RegionStatus dataNodeRegionStatus, + TConsensusGroupId regionGroupId) { loadManager .getLoadCache() - .cacheUnreportedDataNodeRegionHeartbeatSamples( + .cacheRegionHeartbeatSample( + regionGroupId, nodeId, - regionGroupIdsOnDataNode, - judgedLeaders.keySet(), - heartbeatResp.getHeartbeatTimestamp()); + new RegionHeartbeatSample( + heartbeatResp.getHeartbeatTimestamp(), + getRegionHeartbeatStatus(regionGroupId, dataNodeRegionStatus)), + false); + } + private RegionStatus getRegionHeartbeatStatus( + TConsensusGroupId regionGroupId, RegionStatus dataNodeRegionStatus) { + return dataNodeRegionStatus == RegionStatus.Removing + ? loadManager.getLoadCache().getRegionCacheLastSampleStatus(regionGroupId, nodeId) + : dataNodeRegionStatus; + } + + private void cacheConsensusSampleIfNeeded( + TDataNodeHeartbeatResp heartbeatResp, TConsensusGroupId regionGroupId, Boolean isLeader) { + if (!Boolean.TRUE.equals(isLeader) + || !shouldCacheConsensusSample(regionGroupId) + || !hasConsensusLogicalTimestamp(heartbeatResp, regionGroupId)) { + return; + } + + loadManager + .getLoadCache() + .cacheConsensusSample( + regionGroupId, + new ConsensusGroupHeartbeatSample( + heartbeatResp.getConsensusLogicalTimeMap().get(regionGroupId), nodeId)); + } + + private boolean shouldCacheConsensusSample(TConsensusGroupId regionGroupId) { + return (TConsensusGroupType.SchemaRegion.equals(regionGroupId.getType()) + && SCHEMA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE) + || (TConsensusGroupType.DataRegion.equals(regionGroupId.getType()) + && DATA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE); + } + + private boolean hasConsensusLogicalTimestamp( + TDataNodeHeartbeatResp heartbeatResp, TConsensusGroupId regionGroupId) { + return heartbeatResp.isSetConsensusLogicalTimeMap() + && heartbeatResp.getConsensusLogicalTimeMap().containsKey(regionGroupId); + } + + private void cacheUsageSamples(TDataNodeHeartbeatResp heartbeatResp) { if (heartbeatResp.getRegionDeviceUsageMap() != null) { deviceNum.putAll(heartbeatResp.getRegionDeviceUsageMap()); deviceUsageRespProcess.accept(heartbeatResp.getRegionDeviceUsageMap()); @@ -159,6 +175,9 @@ public void onComplete(TDataNodeHeartbeatResp heartbeatResp) { if (heartbeatResp.getRegionDisk() != null) { regionDisk.putAll(heartbeatResp.getRegionDisk()); } + } + + private void cachePipeHeartbeat(TDataNodeHeartbeatResp heartbeatResp) { if (heartbeatResp.getPipeMetaList() != null) { pipeRuntimeCoordinator.parseHeartbeat( nodeId, @@ -167,12 +186,18 @@ public void onComplete(TDataNodeHeartbeatResp heartbeatResp) { heartbeatResp.getPipeRemainingEventCountList(), heartbeatResp.getPipeRemainingTimeList()); } + } + + private void cacheConfirmedConfigNodeEndPoints(TDataNodeHeartbeatResp heartbeatResp) { if (heartbeatResp.isSetConfirmedConfigNodeEndPoints()) { loadManager .getLoadCache() .updateConfirmedConfigNodeEndPoints( nodeId, heartbeatResp.getConfirmedConfigNodeEndPoints()); } + } + + private void cacheRegionSizeSamples(TDataNodeHeartbeatResp heartbeatResp) { if (heartbeatResp.isSetRegionDisk()) { loadManager.getLoadCache().updateRegionSizeMap(nodeId, heartbeatResp.getRegionDisk()); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java index 397eb8c908e68..e0429ebf46520 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java @@ -60,7 +60,6 @@ import java.nio.file.StandardCopyOption; import java.util.Arrays; import java.util.Comparator; -import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -101,8 +100,8 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev private static final long LOG_FILE_MAX_SIZE = CONF.getConfigNodeSimpleConsensusLogSegmentSizeMax(); private final TEndPoint currentNodeTEndPoint; - private static Pattern LOG_INPROGRESS_PATTERN = Pattern.compile("\\d+"); - private static Pattern LOG_PATTERN = Pattern.compile("(?<=_)(\\d+)$"); + private static final Pattern LOG_INPROGRESS_PATTERN = Pattern.compile("\\d+"); + private static final Pattern LOG_PATTERN = Pattern.compile("(?<=_)(\\d+)$"); public ConfigRegionStateMachine(ConfigManager configManager, ConfigPlanExecutor executor) { this.executor = executor; @@ -126,9 +125,9 @@ public void setConfigManager(ConfigManager configManager) { @Override public TSStatus write(IConsensusRequest request) { - return Optional.ofNullable(request) - .map(o -> write((ConfigPhysicalPlan) request)) - .orElseGet(() -> new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())); + return request == null + ? new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) + : write((ConfigPhysicalPlan) request); } /** Transmit {@link ConfigPhysicalPlan} to {@link ConfigPlanExecutor} */ @@ -314,10 +313,26 @@ public void notifyLeaderReady() { configManager.getLoadManager().startTopologyService(); } - threadPool.submit(() -> startLeaderServicesAndWaitLoadReady(epoch)); + threadPool.submit(() -> startLeaderServicesAndWaitForLoadReady(epoch)); } - private void startLeaderServicesAndWaitLoadReady(final long epoch) { + private void startLeaderServicesAndWaitForLoadReady(final long epoch) { + if (!startLeaderServices(epoch)) { + return; + } + + boolean loadReady = waitForLoadReady(epoch); + if (!isCurrentLeaderServicesEpoch(epoch)) { + return; + } + logLoadWarmUpIfNeeded(loadReady); + LOGGER.info( + ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_AS_CONFIG_REGION_LEADER_IS, + ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), + currentNodeTEndPoint); + } + + private boolean startLeaderServices(final long epoch) { synchronized (leaderServicesLock) { if (!isCurrentLeaderServicesEpoch(epoch)) { LOGGER.info( @@ -325,7 +340,7 @@ private void startLeaderServicesAndWaitLoadReady(final long epoch) { + "skip starting leader services because the leader epoch is stale", ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), currentNodeTEndPoint); - return; + return false; } // Start leader scheduling services @@ -373,25 +388,23 @@ private void startLeaderServicesAndWaitLoadReady(final long epoch) { epoch, () -> configManager.getClusterManager().checkClusterId()); if (!isCurrentLeaderServicesEpoch(epoch)) { - return; + return false; } configManager .getProcedureManager() - .startExecutor( - () -> { - if (isCurrentLeaderServicesEpoch(epoch)) { - leaderServicesReady.set(true); - } - }); - if (!leaderServicesReady.get()) { - return; - } + .startExecutor(() -> markLeaderServicesReadyIfEpochCurrent(epoch)); + markLeaderServicesReadyIfEpochCurrent(epoch); + return leaderServicesReady.get(); } + } - boolean loadReady = waitForLoadReady(epoch); - if (!isCurrentLeaderServicesEpoch(epoch)) { - return; + private void markLeaderServicesReadyIfEpochCurrent(final long epoch) { + if (isCurrentLeaderServicesEpoch(epoch)) { + leaderServicesReady.set(true); } + } + + private void logLoadWarmUpIfNeeded(final boolean loadReady) { if (!loadReady) { LOGGER.info( "Current ConfigNode(nodeId: {}, ip: {}) finished starting leader services while load" @@ -400,13 +413,6 @@ private void startLeaderServicesAndWaitLoadReady(final long epoch) { currentNodeTEndPoint, configManager.getLoadManager().getLoadReadyReason()); } - - if (isCurrentLeaderServicesEpoch(epoch)) { - LOGGER.info( - ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_AS_CONFIG_REGION_LEADER_IS, - ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), - currentNodeTEndPoint); - } } private boolean waitForLoadReady(final long epoch) { @@ -416,17 +422,24 @@ private boolean waitForLoadReady(final long epoch) { if (configManager.getLoadManager().isLoadReady()) { return true; } - try { - TimeUnit.MILLISECONDS.sleep(WAIT_LOAD_READY_INTERVAL_MS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOGGER.warn("Unexpected interruption while waiting for ConfigNode leader load warm-up.", e); + if (!sleepForLoadReady()) { return false; } } return isCurrentLeaderServicesEpoch(epoch) && configManager.getLoadManager().isLoadReady(); } + private boolean sleepForLoadReady() { + try { + TimeUnit.MILLISECONDS.sleep(WAIT_LOAD_READY_INTERVAL_MS); + return true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("Unexpected interruption while waiting for ConfigNode leader load warm-up.", e); + return false; + } + } + public boolean areLeaderServicesReady() { return leaderServicesReady.get(); } @@ -528,7 +541,7 @@ private void initStandAloneConfigNode() { dir.mkdirs(); String[] list = new File(CURRENT_FILE_DIR).list(); if (list != null && list.length != 0) { - Arrays.sort(list, new FileComparator()); + Arrays.sort(list, Comparator.comparingLong(ConfigRegionStateMachine::parseEndIndex)); for (String logFileName : list) { File logFile = SystemFileFactory.INSTANCE.getFile(CURRENT_FILE_DIR + File.separator + logFileName); @@ -612,17 +625,7 @@ private void createLogFile(int startIndex) { } } - static class FileComparator implements Comparator { - - @Override - public int compare(String filename1, String filename2) { - long id1 = parseEndIndex(filename1); - long id2 = parseEndIndex(filename2); - return Long.compare(id1, id2); - } - } - - static long parseEndIndex(String filename) { + private static long parseEndIndex(String filename) { if (filename.startsWith("log_inprogress_")) { Matcher matcher = LOG_INPROGRESS_PATTERN.matcher(filename); if (matcher.find()) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java index 01957c0bcfb95..86fd9d499269f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java @@ -429,10 +429,6 @@ public TSStatus confirmLeader() { return confirmLeader(true); } - public TSStatus confirmLeaderForInternalProcedure() { - return confirmLeader(false); - } - private TSStatus confirmLeader(final boolean checkLoadReady) { if (!isLeader()) { TSStatus result = new TSStatus(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()); @@ -445,20 +441,7 @@ private TSStatus confirmLeader(final boolean checkLoadReady) { return result; } - long startTime = System.currentTimeMillis(); - while (System.currentTimeMillis() - startTime < MAX_WAIT_READY_TIME_MS) { - if (isLeaderReady()) { - break; - } - try { - Thread.sleep(RETRY_WAIT_TIME_MS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOGGER.warn( - ManagerMessages.UNEXPECTED_INTERRUPTION_DURING_WAITING_FOR_CONFIGNODE_LEADER_READY); - break; - } - } + waitForLeaderReady(); if (!isLeaderReady()) { return getLeaderWarmingUpStatus( @@ -475,6 +458,24 @@ private TSStatus confirmLeader(final boolean checkLoadReady) { return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } + public TSStatus confirmLeaderForInternalProcedure() { + return confirmLeader(false); + } + + private void waitForLeaderReady() { + long startTime = System.currentTimeMillis(); + while (!isLeaderReady() && System.currentTimeMillis() - startTime < MAX_WAIT_READY_TIME_MS) { + try { + Thread.sleep(RETRY_WAIT_TIME_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn( + ManagerMessages.UNEXPECTED_INTERRUPTION_DURING_WAITING_FOR_CONFIGNODE_LEADER_READY); + return; + } + } + } + private TSStatus getLeaderWarmingUpStatus(String message) { return new TSStatus(TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode()) .setMessage(message); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java index c58e02559483d..3416246811edc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java @@ -365,24 +365,6 @@ public void cacheConsensusSample( .ifPresent(group -> group.cacheHeartbeatSample(sample)); } - public void cacheUnreportedDataNodeRegionHeartbeatSamples( - int nodeId, - Set regionGroupIdsOnDataNode, - Set reportedRegionGroupIds, - long sampleTimestamp) { - regionGroupIdsOnDataNode.forEach( - regionGroupId -> { - if (reportedRegionGroupIds.contains(regionGroupId)) { - return; - } - cacheRegionHeartbeatSample( - regionGroupId, - nodeId, - new RegionHeartbeatSample(sampleTimestamp, RegionStatus.Unknown), - false); - }); - } - /** Update the NodeStatistics of all Nodes. */ public void updateNodeStatistics(boolean forceUpdate) { nodeCacheMap diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java index cc5a2e2303db2..a491b3960c3c3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java @@ -22,7 +22,6 @@ import org.apache.iotdb.ainode.rpc.thrift.TAIHeartbeatReq; import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; @@ -262,7 +261,6 @@ private void pingRegisteredDataNodes( new DataNodeHeartbeatHandler( dataNodeId, configManager.getLoadManager(), - getRegionGroupIds(dataNodeId), configManager.getClusterQuotaManager().getDeviceNum(), configManager.getClusterQuotaManager().getTimeSeriesNum(), configManager.getClusterQuotaManager().getRegionDisk(), @@ -277,12 +275,6 @@ private void pingRegisteredDataNodes( } } - private Set getRegionGroupIds(int dataNodeId) { - return configManager.getPartitionManager().getAllReplicaSets(dataNodeId).stream() - .map(replicaSet -> replicaSet.getRegionId()) - .collect(Collectors.toSet()); - } - /** * Send heartbeat requests to all the Registered AINodes. *