diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index 035c648132f1a..dad3ef44e23d1 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -209,6 +209,7 @@ public enum TSStatusCode { CAN_NOT_CONNECT_AINODE(1011), NO_AVAILABLE_REPLICA(1012), NO_AVAILABLE_AINODE(1013), + CONFIG_NODE_LEADER_WARMING_UP(1014), // Sync, Load TsFile LOAD_FILE_ERROR(1100), diff --git a/iotdb-core/ainode/iotdb/ainode/core/constant.py b/iotdb-core/ainode/iotdb/ainode/core/constant.py index 4a2ee543d1f8d..1eb79085bcfa0 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/constant.py +++ b/iotdb-core/ainode/iotdb/ainode/core/constant.py @@ -80,6 +80,7 @@ class TSStatusCode(Enum): SUCCESS_STATUS = 200 REDIRECTION_RECOMMEND = 400 + CONFIG_NODE_LEADER_WARMING_UP = 1014 MODEL_EXISTED_ERROR = 1503 MODEL_NOT_EXIST_ERROR = 1504 CREATE_MODEL_ERROR = 1505 diff --git a/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py b/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py index ea6362ef080af..81bb81d50bb34 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py +++ b/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py @@ -66,6 +66,7 @@ def __init__(self, config_leader: TEndPoint): "Fail to connect to any config node. Please check status of ConfigNodes" ) self._RETRY_NUM = 10 + self._STARTUP_RETRY_NUM = 60 self._RETRY_INTERVAL_IN_S = 1 self._try_to_connect() @@ -163,6 +164,12 @@ def _update_config_node_leader(self, status: TSStatus) -> bool: else: self._config_leader = None return True + if status.code == TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.get_status_code(): + logger.info( + "ConfigNode leader is warming up before serving AINode, will wait and retry. Reason: {}", + status.message, + ) + return True return False def node_register( @@ -177,7 +184,7 @@ def node_register( versionInfo=version_info, ) - for _ in range(0, self._RETRY_NUM): + for _ in range(0, self._STARTUP_RETRY_NUM): try: resp = self._client.registerAINode(req) if not self._update_config_node_leader(resp.status): @@ -208,7 +215,7 @@ def node_restart( versionInfo=version_info, ) - for _ in range(0, self._RETRY_NUM): + for _ in range(0, self._STARTUP_RETRY_NUM): try: resp = self._client.restartAINode(req) if not self._update_config_node_leader(resp.status): diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/AINodeHeartbeatHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/AINodeHeartbeatHandler.java index 9d8e0b6e8474f..03e6c0bfe3111 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/AINodeHeartbeatHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/AINodeHeartbeatHandler.java @@ -50,7 +50,7 @@ public void onComplete(TAIHeartbeatResp aiHeartbeatResp) { public void onError(Exception e) { if (ThriftClient.isConnectionBroken(e)) { loadManager.forceUpdateNodeCache( - NodeType.DataNode, nodeId, new NodeHeartbeatSample(NodeStatus.Unknown)); + NodeType.AINode, nodeId, new NodeHeartbeatSample(NodeStatus.Unknown)); } loadManager.getLoadCache().resetHeartbeatProcessing(nodeId); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java index e7a31b1dc73eb..52053cdab4564 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.client.async.handlers.heartbeat; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.cluster.NodeStatus; @@ -36,6 +37,7 @@ import org.apache.thrift.async.AsyncMethodCallback; +import java.util.Collections; import java.util.Map; import java.util.function.Consumer; @@ -82,54 +84,86 @@ public DataNodeHeartbeatHandler( @Override public void onComplete(TDataNodeHeartbeatResp heartbeatResp) { - // Update NodeCache + cacheNodeHeartbeatSample(heartbeatResp); + cacheRegionGroupHeartbeatSamples(heartbeatResp); + cacheUsageSamples(heartbeatResp); + cachePipeHeartbeat(heartbeatResp); + cacheConfirmedConfigNodeEndPoints(heartbeatResp); + cacheRegionSizeSamples(heartbeatResp); + } + + private void cacheNodeHeartbeatSample(TDataNodeHeartbeatResp heartbeatResp) { loadManager .getLoadCache() .cacheDataNodeHeartbeatSample(nodeId, new NodeHeartbeatSample(heartbeatResp)); + } + private void cacheRegionGroupHeartbeatSamples(TDataNodeHeartbeatResp heartbeatResp) { RegionStatus regionStatus = RegionStatus.valueOf(heartbeatResp.getStatus()); - heartbeatResp - .getJudgedLeaders() - .forEach( - (regionGroupId, isLeader) -> { - - // Do not allow regions to inherit the Removing state from datanode - RegionStatus nextRegionStatus = regionStatus; - if (nextRegionStatus == RegionStatus.Removing) { - nextRegionStatus = - loadManager - .getLoadCache() - .getRegionCacheLastSampleStatus(regionGroupId, nodeId); - } - - // Update RegionGroupCache - loadManager - .getLoadCache() - .cacheRegionHeartbeatSample( - regionGroupId, - nodeId, - new RegionHeartbeatSample( - heartbeatResp.getHeartbeatTimestamp(), - // Region will inherit DataNode's status - nextRegionStatus), - false); - - if (((TConsensusGroupType.SchemaRegion.equals(regionGroupId.getType()) - && SCHEMA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE) - || (TConsensusGroupType.DataRegion.equals(regionGroupId.getType()) - && DATA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE)) - && Boolean.TRUE.equals(isLeader)) { - // Update ConsensusGroupCache when necessary - loadManager - .getLoadCache() - .cacheConsensusSample( - regionGroupId, - new ConsensusGroupHeartbeatSample( - heartbeatResp.getConsensusLogicalTimeMap().get(regionGroupId), nodeId)); - } - }); + Map judgedLeaders = + heartbeatResp.isSetJudgedLeaders() + ? heartbeatResp.getJudgedLeaders() + : Collections.emptyMap(); + judgedLeaders.forEach( + (regionGroupId, isLeader) -> { + cacheRegionHeartbeatSample(heartbeatResp, regionStatus, regionGroupId); + cacheConsensusSampleIfNeeded(heartbeatResp, regionGroupId, isLeader); + }); + } + + private void cacheRegionHeartbeatSample( + TDataNodeHeartbeatResp heartbeatResp, + RegionStatus dataNodeRegionStatus, + TConsensusGroupId regionGroupId) { + loadManager + .getLoadCache() + .cacheRegionHeartbeatSample( + regionGroupId, + nodeId, + new RegionHeartbeatSample( + heartbeatResp.getHeartbeatTimestamp(), + getRegionHeartbeatStatus(regionGroupId, dataNodeRegionStatus)), + false); + } + + private RegionStatus getRegionHeartbeatStatus( + TConsensusGroupId regionGroupId, RegionStatus dataNodeRegionStatus) { + return dataNodeRegionStatus == RegionStatus.Removing + ? loadManager.getLoadCache().getRegionCacheLastSampleStatus(regionGroupId, nodeId) + : dataNodeRegionStatus; + } + + private void cacheConsensusSampleIfNeeded( + TDataNodeHeartbeatResp heartbeatResp, TConsensusGroupId regionGroupId, Boolean isLeader) { + if (!Boolean.TRUE.equals(isLeader) + || !shouldCacheConsensusSample(regionGroupId) + || !hasConsensusLogicalTimestamp(heartbeatResp, regionGroupId)) { + return; + } + + loadManager + .getLoadCache() + .cacheConsensusSample( + regionGroupId, + new ConsensusGroupHeartbeatSample( + heartbeatResp.getConsensusLogicalTimeMap().get(regionGroupId), nodeId)); + } + + private boolean shouldCacheConsensusSample(TConsensusGroupId regionGroupId) { + return (TConsensusGroupType.SchemaRegion.equals(regionGroupId.getType()) + && SCHEMA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE) + || (TConsensusGroupType.DataRegion.equals(regionGroupId.getType()) + && DATA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE); + } + + private boolean hasConsensusLogicalTimestamp( + TDataNodeHeartbeatResp heartbeatResp, TConsensusGroupId regionGroupId) { + return heartbeatResp.isSetConsensusLogicalTimeMap() + && heartbeatResp.getConsensusLogicalTimeMap().containsKey(regionGroupId); + } + private void cacheUsageSamples(TDataNodeHeartbeatResp heartbeatResp) { if (heartbeatResp.getRegionDeviceUsageMap() != null) { deviceNum.putAll(heartbeatResp.getRegionDeviceUsageMap()); deviceUsageRespProcess.accept(heartbeatResp.getRegionDeviceUsageMap()); @@ -141,6 +175,9 @@ public void onComplete(TDataNodeHeartbeatResp heartbeatResp) { if (heartbeatResp.getRegionDisk() != null) { regionDisk.putAll(heartbeatResp.getRegionDisk()); } + } + + private void cachePipeHeartbeat(TDataNodeHeartbeatResp heartbeatResp) { if (heartbeatResp.getPipeMetaList() != null) { pipeRuntimeCoordinator.parseHeartbeat( nodeId, @@ -149,12 +186,18 @@ public void onComplete(TDataNodeHeartbeatResp heartbeatResp) { heartbeatResp.getPipeRemainingEventCountList(), heartbeatResp.getPipeRemainingTimeList()); } + } + + private void cacheConfirmedConfigNodeEndPoints(TDataNodeHeartbeatResp heartbeatResp) { if (heartbeatResp.isSetConfirmedConfigNodeEndPoints()) { loadManager .getLoadCache() .updateConfirmedConfigNodeEndPoints( nodeId, heartbeatResp.getConfirmedConfigNodeEndPoints()); } + } + + private void cacheRegionSizeSamples(TDataNodeHeartbeatResp heartbeatResp) { if (heartbeatResp.isSetRegionDisk()) { loadManager.getLoadCache().updateRegionSizeMap(nodeId, heartbeatResp.getRegionDisk()); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java index fe687d17556f7..e0429ebf46520 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java @@ -60,10 +60,11 @@ import java.nio.file.StandardCopyOption; import java.util.Arrays; import java.util.Comparator; -import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -75,7 +76,13 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev private static final ExecutorService threadPool = IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.CONFIG_NODE_RECOVER.getName()); private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); + private static final long WAIT_LOAD_READY_TIMEOUT_MS = + CommonDescriptor.getInstance().getConfig().getCnConnectionTimeoutInMS() / 2; + private static final long WAIT_LOAD_READY_INTERVAL_MS = 100; private final ConfigPlanExecutor executor; + private final AtomicBoolean leaderServicesReady; + private final AtomicLong leaderServicesEpoch; + private final Object leaderServicesLock; private ConfigManager configManager; /** Variables for {@link ConfigNode} Simple Consensus. */ @@ -93,11 +100,14 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev private static final long LOG_FILE_MAX_SIZE = CONF.getConfigNodeSimpleConsensusLogSegmentSizeMax(); private final TEndPoint currentNodeTEndPoint; - private static Pattern LOG_INPROGRESS_PATTERN = Pattern.compile("\\d+"); - private static Pattern LOG_PATTERN = Pattern.compile("(?<=_)(\\d+)$"); + private static final Pattern LOG_INPROGRESS_PATTERN = Pattern.compile("\\d+"); + private static final Pattern LOG_PATTERN = Pattern.compile("(?<=_)(\\d+)$"); public ConfigRegionStateMachine(ConfigManager configManager, ConfigPlanExecutor executor) { this.executor = executor; + this.leaderServicesReady = new AtomicBoolean(false); + this.leaderServicesEpoch = new AtomicLong(0); + this.leaderServicesLock = new Object(); this.configManager = configManager; this.currentNodeTEndPoint = new TEndPoint() @@ -115,9 +125,9 @@ public void setConfigManager(ConfigManager configManager) { @Override public TSStatus write(IConsensusRequest request) { - return Optional.ofNullable(request) - .map(o -> write((ConfigPhysicalPlan) request)) - .orElseGet(() -> new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())); + return request == null + ? new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) + : write((ConfigPhysicalPlan) request); } /** Transmit {@link ConfigPhysicalPlan} to {@link ConfigPlanExecutor} */ @@ -235,6 +245,7 @@ public void notifyLeaderChanged(ConsensusGroupId groupId, int newLeaderId) { // couldn't initialize earlier than the ConfigRegionStateMachine int currentNodeId = ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(); if (currentNodeId != newLeaderId) { + invalidateLeaderServices(); LOGGER.info( ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_IS_NO_LONGER_THE_LEADER + "the new leader is [nodeId:{}]", @@ -246,6 +257,7 @@ public void notifyLeaderChanged(ConsensusGroupId groupId, int newLeaderId) { @Override public void notifyNotLeader() { + invalidateLeaderServices(); // We get currentNodeId here because the currentNodeId // couldn't initialize earlier than the ConfigRegionStateMachine int currentNodeId = ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(); @@ -254,25 +266,30 @@ public void notifyNotLeader() { + "start cleaning up related services", currentNodeId, currentNodeTEndPoint); - // Stop leader scheduling services - configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeMetaSync(); - configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeHeartbeat(); - configManager.getSubscriptionManager().getSubscriptionCoordinator().stopSubscriptionMetaSync(); - configManager.getLoadManager().stopTopologyService(); - configManager.getLoadManager().stopLoadServices(); - configManager.getProcedureManager().stopExecutor(); - configManager.getRetryFailedTasksThread().stopRetryFailedTasksService(); - configManager.getPartitionManager().stopRegionCleaner(); - configManager.getCQManager().stopCQScheduler(); - configManager.getClusterSchemaManager().clearSchemaQuotaCache(); - // Remove Metric after leader change - configManager.removeMetrics(); - - // Shutdown leader related service for config pipe - PipeConfigNodeAgent.runtime().notifyLeaderUnavailable(); - - // Clean receiver file dir - PipeConfigNodeAgent.receiver().cleanPipeReceiverDir(); + synchronized (leaderServicesLock) { + // Stop leader scheduling services + configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeMetaSync(); + configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeHeartbeat(); + configManager + .getSubscriptionManager() + .getSubscriptionCoordinator() + .stopSubscriptionMetaSync(); + configManager.getLoadManager().stopTopologyService(); + configManager.getLoadManager().stopLoadServices(); + configManager.getProcedureManager().stopExecutor(); + configManager.getRetryFailedTasksThread().stopRetryFailedTasksService(); + configManager.getPartitionManager().stopRegionCleaner(); + configManager.getCQManager().stopCQScheduler(); + configManager.getClusterSchemaManager().clearSchemaQuotaCache(); + // Remove Metric after leader change + configManager.removeMetrics(); + + // Shutdown leader related service for config pipe + PipeConfigNodeAgent.runtime().notifyLeaderUnavailable(); + + // Clean receiver file dir + PipeConfigNodeAgent.receiver().cleanPipeReceiverDir(); + } LOGGER.info( ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_IS_NO_LONGER_THE_LEADER @@ -283,63 +300,174 @@ public void notifyNotLeader() { @Override public void notifyLeaderReady() { + final long epoch = nextLeaderServicesEpoch(); LOGGER.info( ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_BECOMES_CONFIG_REGION_LEADER, ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), currentNodeTEndPoint); - // Always start load services first + // Always start load services first. ConsensusManager gates external serving until warm-up. configManager.getLoadManager().startLoadServices(); if (CONF.isEnableTopologyProbing()) { configManager.getLoadManager().startTopologyService(); } - // Start leader scheduling services - configManager.getProcedureManager().startExecutor(); - threadPool.submit( - () -> configManager.getProcedureManager().getStore().getProcedureInfo().upgrade()); - configManager.getRetryFailedTasksThread().startRetryFailedTasksService(); - configManager.getPartitionManager().startRegionCleaner(); - // Add Metric after leader ready - configManager.addMetrics(); - - // Activate leader related service for config pipe - PipeConfigNodeAgent.runtime().notifyLeaderReady(); - - // we do cq recovery async for performance: - // cq recovery may be time-consuming, we use another thread to do it in - // make notifyLeaderChanged not blocked by it - threadPool.submit(() -> configManager.getCQManager().startCQScheduler()); - - threadPool.submit( - () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync()); - threadPool.submit( - () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat()); - threadPool.submit( - () -> - configManager - .getPipeManager() - .getPipeRuntimeCoordinator() - .onConfigRegionGroupLeaderChanged()); - - threadPool.submit( - () -> - configManager - .getSubscriptionManager() - .getSubscriptionCoordinator() - .startSubscriptionMetaSync()); + threadPool.submit(() -> startLeaderServicesAndWaitForLoadReady(epoch)); + } - // To adapt old version, we check cluster ID after state machine has been fully recovered. - // Do check async because sync will be slow and block every other things. - threadPool.submit(() -> configManager.getClusterManager().checkClusterId()); + private void startLeaderServicesAndWaitForLoadReady(final long epoch) { + if (!startLeaderServices(epoch)) { + return; + } + boolean loadReady = waitForLoadReady(epoch); + if (!isCurrentLeaderServicesEpoch(epoch)) { + return; + } + logLoadWarmUpIfNeeded(loadReady); LOGGER.info( ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_AS_CONFIG_REGION_LEADER_IS, ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), currentNodeTEndPoint); } + private boolean startLeaderServices(final long epoch) { + synchronized (leaderServicesLock) { + if (!isCurrentLeaderServicesEpoch(epoch)) { + LOGGER.info( + ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_IS_NO_LONGER_THE_LEADER + + "skip starting leader services because the leader epoch is stale", + ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), + currentNodeTEndPoint); + return false; + } + + // Start leader scheduling services + submitIfLeaderServicesEpochCurrent( + epoch, () -> configManager.getProcedureManager().getStore().getProcedureInfo().upgrade()); + configManager.getRetryFailedTasksThread().startRetryFailedTasksService(); + configManager.getPartitionManager().startRegionCleaner(); + // Add Metric after leader ready + configManager.addMetrics(); + + // Activate leader related service for config pipe + PipeConfigNodeAgent.runtime().notifyLeaderReady(); + + // we do cq recovery async for performance: + // cq recovery may be time-consuming, we use another thread to do it in + // make notifyLeaderChanged not blocked by it + submitIfLeaderServicesEpochCurrent( + epoch, () -> configManager.getCQManager().startCQScheduler()); + + submitIfLeaderServicesEpochCurrent( + epoch, + () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync()); + submitIfLeaderServicesEpochCurrent( + epoch, + () -> configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat()); + submitIfLeaderServicesEpochCurrent( + epoch, + () -> + configManager + .getPipeManager() + .getPipeRuntimeCoordinator() + .onConfigRegionGroupLeaderChanged()); + + submitIfLeaderServicesEpochCurrent( + epoch, + () -> + configManager + .getSubscriptionManager() + .getSubscriptionCoordinator() + .startSubscriptionMetaSync()); + + // To adapt old version, we check cluster ID after state machine has been fully recovered. + // Do check async because sync will be slow and block every other things. + submitIfLeaderServicesEpochCurrent( + epoch, () -> configManager.getClusterManager().checkClusterId()); + + if (!isCurrentLeaderServicesEpoch(epoch)) { + return false; + } + configManager + .getProcedureManager() + .startExecutor(() -> markLeaderServicesReadyIfEpochCurrent(epoch)); + markLeaderServicesReadyIfEpochCurrent(epoch); + return leaderServicesReady.get(); + } + } + + private void markLeaderServicesReadyIfEpochCurrent(final long epoch) { + if (isCurrentLeaderServicesEpoch(epoch)) { + leaderServicesReady.set(true); + } + } + + private void logLoadWarmUpIfNeeded(final boolean loadReady) { + if (!loadReady) { + LOGGER.info( + "Current ConfigNode(nodeId: {}, ip: {}) finished starting leader services while load" + + " warm-up is still in progress: {}", + ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), + currentNodeTEndPoint, + configManager.getLoadManager().getLoadReadyReason()); + } + } + + private boolean waitForLoadReady(final long epoch) { + long startTime = System.currentTimeMillis(); + while (isCurrentLeaderServicesEpoch(epoch) + && System.currentTimeMillis() - startTime < WAIT_LOAD_READY_TIMEOUT_MS) { + if (configManager.getLoadManager().isLoadReady()) { + return true; + } + if (!sleepForLoadReady()) { + return false; + } + } + return isCurrentLeaderServicesEpoch(epoch) && configManager.getLoadManager().isLoadReady(); + } + + private boolean sleepForLoadReady() { + try { + TimeUnit.MILLISECONDS.sleep(WAIT_LOAD_READY_INTERVAL_MS); + return true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("Unexpected interruption while waiting for ConfigNode leader load warm-up.", e); + return false; + } + } + + public boolean areLeaderServicesReady() { + return leaderServicesReady.get(); + } + + private long nextLeaderServicesEpoch() { + leaderServicesReady.set(false); + return leaderServicesEpoch.incrementAndGet(); + } + + private void invalidateLeaderServices() { + leaderServicesReady.set(false); + leaderServicesEpoch.incrementAndGet(); + } + + private boolean isCurrentLeaderServicesEpoch(final long epoch) { + return leaderServicesEpoch.get() == epoch + && configManager.getConsensusManager().isLeaderReady(); + } + + private void submitIfLeaderServicesEpochCurrent(final long epoch, final Runnable task) { + threadPool.submit( + () -> { + if (isCurrentLeaderServicesEpoch(epoch)) { + task.run(); + } + }); + } + @Override public void start() { if (ConsensusFactory.SIMPLE_CONSENSUS.equals(CONF.getConfigNodeConsensusProtocolClass())) { @@ -413,7 +541,7 @@ private void initStandAloneConfigNode() { dir.mkdirs(); String[] list = new File(CURRENT_FILE_DIR).list(); if (list != null && list.length != 0) { - Arrays.sort(list, new FileComparator()); + Arrays.sort(list, Comparator.comparingLong(ConfigRegionStateMachine::parseEndIndex)); for (String logFileName : list) { File logFile = SystemFileFactory.INSTANCE.getFile(CURRENT_FILE_DIR + File.separator + logFileName); @@ -497,17 +625,7 @@ private void createLogFile(int startIndex) { } } - static class FileComparator implements Comparator { - - @Override - public int compare(String filename1, String filename2) { - long id1 = parseEndIndex(filename1); - long id2 = parseEndIndex(filename2); - return Long.compare(id1, id2); - } - } - - static long parseEndIndex(String filename) { + private static long parseEndIndex(String filename) { if (filename.startsWith("log_inprogress_")) { Matcher matcher = LOG_INPROGRESS_PATTERN.matcher(filename); if (matcher.find()) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 2db0255e35ac4..3f60b84a74e1c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -1247,6 +1247,10 @@ protected TSStatus confirmLeader() { "ConsensusManager of target-ConfigNode is not initialized, " + "please make sure the target-ConfigNode has been started successfully."); } + // Procedure recovery replays metadata writes before external load warm-up is complete. + if (procedureManager.isProcedureExecutionThread()) { + return getConsensusManager().confirmLeaderForInternalProcedure(); + } return getConsensusManager().confirmLeader(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 3f0ba82fa768c..9d0a7cd55f4e9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -226,14 +226,21 @@ public ProcedureManager(ConfigManager configManager, ProcedureInfo procedureInfo } public void startExecutor() { + startExecutor(null); + } + + public void startExecutor(final Runnable beforeStartingWorkers) { if (!executor.isRunning()) { executor.init(CONFIG_NODE_CONFIG.getProcedureCoreWorkerThreadsCount()); - executor.startWorkers(); executor.startCompletedCleaner( CONFIG_NODE_CONFIG.getProcedureCompletedCleanInterval(), CONFIG_NODE_CONFIG.getProcedureCompletedEvictTTL()); executor.addInternalProcedure(partitionTableCleaner); store.start(); + if (beforeStartingWorkers != null) { + beforeStartingWorkers.run(); + } + executor.startWorkers(); LOGGER.info(ManagerMessages.PROCEDUREMANAGER_IS_STARTED_SUCCESSFULLY); } } @@ -250,6 +257,10 @@ public void stopExecutor() { } } + public boolean isProcedureExecutionThread() { + return ProcedureExecutor.isProcedureExecutionThread(); + } + @TestOnly public TSStatus createManyDatabases() { this.executor.submitProcedure(new CreateManyDatabasesProcedure()); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java index 8b4eeed5a1b58..86fd9d499269f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java @@ -77,12 +77,14 @@ public class ConsensusManager { new ConfigRegionId(CONF.getConfigRegionId()); private final IManager configManager; + private final ConfigRegionStateMachine stateMachine; private IConsensus consensusImpl; private boolean isInitialized; public ConsensusManager(IManager configManager, ConfigRegionStateMachine stateMachine) { this.configManager = configManager; + this.stateMachine = stateMachine; setConsensusLayer(stateMachine); } @@ -424,39 +426,59 @@ public boolean isLeaderExist() { * NEED_REDIRECTION otherwise */ public TSStatus confirmLeader() { - TSStatus result = new TSStatus(); - if (isLeaderReady()) { - result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()); - } else { - result.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()); - if (isLeader()) { - long startTime = System.currentTimeMillis(); - while (System.currentTimeMillis() - startTime < MAX_WAIT_READY_TIME_MS) { - if (isLeaderReady()) { - result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()); - return result; - } - try { - Thread.sleep(RETRY_WAIT_TIME_MS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOGGER.warn( - ManagerMessages.UNEXPECTED_INTERRUPTION_DURING_WAITING_FOR_CONFIGNODE_LEADER_READY); - break; - } - } - result.setMessage( - "The current ConfigNode is leader but not ready yet, please try again later."); - } else { - result.setMessage( - "The current ConfigNode is not leader, please redirect to a new ConfigNode."); - } + return confirmLeader(true); + } + + private TSStatus confirmLeader(final boolean checkLoadReady) { + if (!isLeader()) { + TSStatus result = new TSStatus(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()); + result.setMessage( + "The current ConfigNode is not leader, please redirect to a new ConfigNode."); TConfigNodeLocation leaderLocation = getLeaderLocation(); if (leaderLocation != null) { result.setRedirectNode(leaderLocation.getInternalEndPoint()); } + return result; + } + + waitForLeaderReady(); + + if (!isLeaderReady()) { + return getLeaderWarmingUpStatus( + "The current ConfigNode is leader but consensus is not ready yet."); + } + if (!stateMachine.areLeaderServicesReady()) { + return getLeaderWarmingUpStatus( + "The current ConfigNode is leader but leader services are not ready yet."); + } + if (checkLoadReady && !configManager.getLoadManager().isLoadReady()) { + return getLeaderWarmingUpStatus(configManager.getLoadManager().getLoadReadyReason()); + } + + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } + + public TSStatus confirmLeaderForInternalProcedure() { + return confirmLeader(false); + } + + private void waitForLeaderReady() { + long startTime = System.currentTimeMillis(); + while (!isLeaderReady() && System.currentTimeMillis() - startTime < MAX_WAIT_READY_TIME_MS) { + try { + Thread.sleep(RETRY_WAIT_TIME_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn( + ManagerMessages.UNEXPECTED_INTERRUPTION_DURING_WAITING_FOR_CONFIGNODE_LEADER_READY); + return; + } } - return result; + } + + private TSStatus getLeaderWarmingUpStatus(String message) { + return new TSStatus(TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode()) + .setMessage(message); } public ConsensusGroupId getConsensusGroupId() { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java index e97f32bdbda85..c6bf610612580 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java @@ -50,7 +50,10 @@ import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; /** @@ -59,6 +62,8 @@ */ public class LoadManager { + private static final long FIRST_HEARTBEAT_READY_TOLERANCE_MS = TimeUnit.SECONDS.toMillis(30); + protected final IManager configManager; /** Balancers. */ @@ -74,6 +79,10 @@ public class LoadManager { private final StatisticsService statisticsService; private final EventService eventService; private final TopologyService topologyService; + private final AtomicBoolean loadServicesStarted; + private final AtomicLong loadReadyStartTimeMillis; + private final AtomicBoolean loadReady; + private volatile String loadReadyReason; public LoadManager(IManager configManager) { this.configManager = configManager; @@ -90,6 +99,10 @@ public LoadManager(IManager configManager) { this.eventService.register(configManager.getPipeManager().getPipeRuntimeCoordinator()); this.eventService.register(routeBalancer); this.eventService.register(topologyService); + this.loadServicesStarted = new AtomicBoolean(false); + this.loadReadyStartTimeMillis = new AtomicLong(0); + this.loadReady = new AtomicBoolean(false); + this.loadReadyReason = "ConfigNode leader load services are not started."; } protected void setHeartbeatService(IManager configManager, LoadCache loadCache) { @@ -147,7 +160,11 @@ public void reBalanceDataPartitionPolicy(String database) { } public void startLoadServices() { + loadReady.set(false); + loadReadyStartTimeMillis.set(System.currentTimeMillis()); + loadReadyReason = "ConfigNode leader is waiting for cluster heartbeat sampling."; loadCache.initHeartbeatCache(configManager); + loadServicesStarted.set(true); heartbeatService.startHeartbeatService(); statisticsService.startLoadStatisticsService(); eventService.startEventService(); @@ -155,6 +172,10 @@ public void startLoadServices() { } public void stopLoadServices() { + loadServicesStarted.set(false); + loadReadyStartTimeMillis.set(0); + loadReady.set(false); + loadReadyReason = "ConfigNode leader load services are stopped."; heartbeatService.stopHeartbeatService(); statisticsService.stopLoadStatisticsService(); eventService.stopEventService(); @@ -163,6 +184,50 @@ public void stopLoadServices() { routeBalancer.clearRegionPriority(); } + public boolean isLoadReady() { + return loadReady.get() || tryUpdateLoadReady(); + } + + public String getLoadReadyReason() { + return loadReadyReason; + } + + private synchronized boolean tryUpdateLoadReady() { + if (loadReady.get()) { + return true; + } + if (!loadServicesStarted.get()) { + loadReadyReason = "ConfigNode leader load services are not started."; + return false; + } + + loadCache.updateNodeStatistics(false); + eventService.checkAndBroadcastNodeStatisticsChangeEventIfNecessary(); + + List unreadyReasons = loadCache.getNodeHeartbeatUnreadyReasons(); + if (unreadyReasons.isEmpty()) { + loadReadyReason = "ConfigNode leader load services are ready."; + loadReady.set(true); + return true; + } + + long elapsedMillis = System.currentTimeMillis() - loadReadyStartTimeMillis.get(); + if (elapsedMillis < FIRST_HEARTBEAT_READY_TOLERANCE_MS) { + loadReadyReason = + "ConfigNode leader is waiting for first heartbeat from registered ConfigNodes/DataNodes: " + + unreadyReasons; + return false; + } + + loadReadyReason = + "ConfigNode leader load services are ready after waiting " + + FIRST_HEARTBEAT_READY_TOLERANCE_MS + + "ms for first heartbeat. Missing heartbeats: " + + unreadyReasons; + loadReady.set(true); + return true; + } + public void startTopologyService() { topologyService.startTopologyService(); } @@ -487,6 +552,14 @@ public RouteBalancer getRouteBalancer() { return routeBalancer; } + @TestOnly + void markLoadServicesStartedForTest(long loadReadyStartTimeMillis) { + loadServicesStarted.set(true); + loadReady.set(false); + this.loadReadyStartTimeMillis.set(loadReadyStartTimeMillis); + loadReadyReason = "ConfigNode leader is waiting for cluster heartbeat sampling."; + } + @TestOnly public EventService getEventService() { return eventService; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java index d61a004352033..e5ab6445f98e7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java @@ -97,6 +97,10 @@ public AbstractHeartbeatSample getLastSample() { return slidingWindow.isEmpty() ? null : slidingWindow.get(slidingWindow.size() - 1); } + public boolean hasHeartbeatSample() { + return getLastSample() != null; + } + /** * Update currentStatistics based on the latest heartbeat sample that cached in the slidingWindow. */ diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java index 818171c89bcc8..3416246811edc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java @@ -82,6 +82,7 @@ public class LoadCache { Math.max( ProcedureManager.PROCEDURE_WAIT_TIME_OUT - TimeUnit.SECONDS.toMillis(2), TimeUnit.SECONDS.toMillis(10)); + private static final int MAX_UNREADY_ENTITY_PRINT = 10; private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); @@ -496,6 +497,32 @@ public Map getCurrentConsensusGroup return consensusGroupStatisticsMap; } + public List getNodeHeartbeatUnreadyReasons() { + List unreadyNodes = new ArrayList<>(); + nodeCacheMap.forEach( + (nodeId, nodeCache) -> { + if (nodeId == ConfigNodeHeartbeatCache.CURRENT_NODE_ID) { + return; + } + if ((nodeCache instanceof ConfigNodeHeartbeatCache + || nodeCache instanceof DataNodeHeartbeatCache) + && !nodeCache.hasHeartbeatSample()) { + unreadyNodes.add(nodeId); + } + }); + if (unreadyNodes.isEmpty()) { + return Collections.emptyList(); + } + Collections.sort(unreadyNodes); + List nodesToPrint = + unreadyNodes.subList(0, Math.min(MAX_UNREADY_ENTITY_PRINT, unreadyNodes.size())); + String suffix = + unreadyNodes.size() > MAX_UNREADY_ENTITY_PRINT + ? "...(" + (unreadyNodes.size() - MAX_UNREADY_ENTITY_PRINT) + " more)" + : ""; + return Collections.singletonList("nodes=" + nodesToPrint + suffix); + } + /** * Safely get NodeStatus by NodeId. * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java index aa924dc29b585..7dcacc4fd805c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java @@ -41,7 +41,7 @@ public synchronized void updateCurrentStatistics(boolean forceUpdate) { synchronized (slidingWindow) { lastSample = (ConsensusGroupHeartbeatSample) getLastSample(); } - if (lastSample != null && lastSample.getLeaderId() != UN_READY_LEADER_ID) { + if (lastSample != null) { currentStatistics.set( new ConsensusGroupStatistics(System.nanoTime(), lastSample.getLeaderId())); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java index 11bb7f382d41e..82afea3859fd4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java @@ -56,6 +56,8 @@ public class ProcedureExecutor { private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class); + private static final ThreadLocal PROCEDURE_EXECUTION_CONTEXT = + ThreadLocal.withInitial(() -> false); private final ConcurrentHashMap> completed = new ConcurrentHashMap<>(); @@ -96,6 +98,10 @@ public ProcedureExecutor(final Env environment, final IProcedureStore store this(environment, store, new SimpleProcedureScheduler()); } + public static boolean isProcedureExecutionThread() { + return PROCEDURE_EXECUTION_CONTEXT.get(); + } + public void init(int numThreads) { this.corePoolSize = numThreads; this.maxPoolSize = 10 * numThreads; @@ -784,7 +790,12 @@ public void run() { this.activeProcedure.set(procedure); activeExecutorCount.incrementAndGet(); startTime.set(System.currentTimeMillis()); - executeProcedure(procedure); + PROCEDURE_EXECUTION_CONTEXT.set(true); + try { + executeProcedure(procedure); + } finally { + PROCEDURE_EXECUTION_CONTEXT.remove(); + } activeExecutorCount.decrementAndGet(); LOG.trace( "Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get()); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java index 037f138286a18..da766541786cb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java @@ -89,7 +89,7 @@ public class ConfigNode extends ServerCommandLine implements ConfigNodeMBean { private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); - private static final int STARTUP_RETRY_NUM = 10; + private static final int STARTUP_RETRY_NUM = 20; private static final long STARTUP_RETRY_INTERVAL_IN_MS = TimeUnit.SECONDS.toMillis(3); private static final int SCHEDULE_WAITING_RETRY_NUM = (int) (COMMON_CONFIG.getCnConnectionTimeoutInMS() / STARTUP_RETRY_INTERVAL_IN_MS); @@ -414,6 +414,12 @@ private void sendRegisterConfigNodeRequest() throws StartupException, IOExceptio } else if (status.getCode() == TSStatusCode.INTERNAL_REQUEST_RETRY_ERROR.getStatusCode()) { LOGGER.warn( ConfigNodeMessages.THE_RESULT_OF_REGISTER_SELF_CONFIGNODE_IS_RETRY, status, retry); + } else if (status.getCode() == TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode()) { + LOGGER.info( + "ConfigNode leader is warming up before serving the registering ConfigNode, will wait" + + " and retry. Status: {}, retry: {}", + status, + retry); } else { throw new StartupException(status.getMessage()); } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java index fd62c31ae3678..e2c51df9693be 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java @@ -48,6 +48,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -292,4 +293,73 @@ public void testConsensusGroupCache() throws InterruptedException { new Pair<>(new ConsensusGroupStatistics(newLeaderId), null), differentConsensusGroupStatisticsMap.get(regionGroupId)); } + + @Test + public void testLoadWarmUpRequiresOnlyConfigNodeAndDataNodeSamples() { + LoadCache loadCache = new LoadCache(); + TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, 100); + Set dataNodeIds = Stream.of(11, 12).collect(Collectors.toSet()); + + loadCache.createNodeHeartbeatCache(NodeType.ConfigNode, 10); + dataNodeIds.forEach( + dataNodeId -> loadCache.createNodeHeartbeatCache(NodeType.DataNode, dataNodeId)); + loadCache.createNodeHeartbeatCache(NodeType.AINode, 13); + loadCache.createRegionGroupHeartbeatCache("root.warmup", regionGroupId, dataNodeIds); + + Assert.assertEquals( + Collections.singletonList("nodes=[10, 11, 12]"), + loadCache.getNodeHeartbeatUnreadyReasons()); + + loadCache.cacheConfigNodeHeartbeatSample(10, new NodeHeartbeatSample(NodeStatus.Unknown)); + + dataNodeIds.forEach( + dataNodeId -> + loadCache.cacheDataNodeHeartbeatSample( + dataNodeId, new NodeHeartbeatSample(NodeStatus.Running))); + loadCache.updateNodeStatistics(false); + + Assert.assertTrue(loadCache.getNodeHeartbeatUnreadyReasons().isEmpty()); + } + + @Test + public void testRegionAndConsensusGroupsDoNotBlockLoadWarmUp() { + LoadCache loadCache = new LoadCache(); + TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 101); + Set dataNodeIds = Stream.of(21, 22).collect(Collectors.toSet()); + + dataNodeIds.forEach( + dataNodeId -> loadCache.createNodeHeartbeatCache(NodeType.DataNode, dataNodeId)); + loadCache.createRegionGroupHeartbeatCache("root.warmup", regionGroupId, dataNodeIds); + dataNodeIds.forEach( + dataNodeId -> { + loadCache.cacheDataNodeHeartbeatSample( + dataNodeId, new NodeHeartbeatSample(NodeStatus.Running)); + }); + loadCache.updateNodeStatistics(false); + loadCache.updateRegionGroupStatistics(); + loadCache.updateConsensusGroupStatistics(); + + Assert.assertTrue(loadCache.getNodeHeartbeatUnreadyReasons().isEmpty()); + } + + @Test + public void testLoadWarmUpToleratesMissingFirstHeartbeatAfterThirtySeconds() { + LOAD_CACHE.clearHeartbeatCache(); + LOAD_CACHE.createNodeHeartbeatCache(NodeType.DataNode, 31); + + try { + LOAD_MANAGER.markLoadServicesStartedForTest(System.currentTimeMillis()); + + Assert.assertFalse(LOAD_MANAGER.isLoadReady()); + Assert.assertTrue(LOAD_MANAGER.getLoadReadyReason().contains("waiting for first heartbeat")); + + LOAD_MANAGER.markLoadServicesStartedForTest( + System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(31)); + + Assert.assertTrue(LOAD_MANAGER.isLoadReady()); + Assert.assertTrue(LOAD_MANAGER.getLoadReadyReason().contains("Missing heartbeats")); + } finally { + LOAD_MANAGER.stopLoadServices(); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java index 1f0b09f0b8689..de69996e8af64 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java @@ -222,6 +222,7 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie "Failed to connect to ConfigNode %s from DataNode %s when executing %s, Exception:"; private static final long RETRY_INTERVAL_MS = 1000L; private static final long WAIT_CN_LEADER_ELECTION_INTERVAL_MS = 2000L; + private static final long REGISTER_LEADER_WARMING_UP_RETRY_TIMEOUT_MS = 60_000L; private static final String UNSUPPORTED_INVOCATION = "This method is not supported for invocation by DataNode"; @@ -401,12 +402,33 @@ private boolean updateConfigNodeLeader(TSStatus status) { } return true; } + if (status.getCode() == TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode()) { + if (!isFirstInitiated) { + logger.info( + "ConfigNode leader {} is warming up before serving DataNode {}, will wait and retry." + + " Reason: {}", + configNode, + config.getAddressAndPort(), + status.getMessage()); + } + try { + Thread.sleep(WAIT_CN_LEADER_ELECTION_INTERVAL_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn(DataNodeMiscMessages.UNEXPECTED_INTERRUPTION_CONNECT_CONFIG_NODE_BREAK); + } + return true; + } return false; } finally { isFirstInitiated = false; } } + private boolean isConfigNodeLeaderWarmingUp(TSStatus status) { + return status.getCode() == TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode(); + } + /** * The frame of execute RPC, include logic of retry and exception handling. * @@ -478,20 +500,33 @@ public TGetClusterIdResp getClusterId() throws TException { @Override public TDataNodeRegisterResp registerDataNode(TDataNodeRegisterReq req) throws TException { - for (int i = 0; i < RETRY_NUM; i++) { + boolean leaderWarmingUpObserved = false; + long leaderWarmingUpRetryDeadline = 0; + for (int i = 0; + i < RETRY_NUM + || (leaderWarmingUpObserved + && System.currentTimeMillis() < leaderWarmingUpRetryDeadline); + i++) { try { TDataNodeRegisterResp resp = client.registerDataNode(req); if (!updateConfigNodeLeader(resp.status)) { return resp; } + if (isConfigNodeLeaderWarmingUp(resp.status) && !leaderWarmingUpObserved) { + leaderWarmingUpObserved = true; + leaderWarmingUpRetryDeadline = + System.currentTimeMillis() + REGISTER_LEADER_WARMING_UP_RETRY_TIMEOUT_MS; + } // set latest config node list - List newConfigNodes = new ArrayList<>(); - for (TConfigNodeLocation configNodeLocation : resp.getConfigNodeList()) { - newConfigNodes.add(configNodeLocation.getInternalEndPoint()); + if (resp.isSetConfigNodeList()) { + List newConfigNodes = new ArrayList<>(); + for (TConfigNodeLocation configNodeLocation : resp.getConfigNodeList()) { + newConfigNodes.add(configNodeLocation.getInternalEndPoint()); + } + configNodes = newConfigNodes; } - configNodes = newConfigNodes; } catch (TException e) { String message = String.format(