Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ public enum TSStatusCode {
CAN_NOT_CONNECT_AINODE(1011),
NO_AVAILABLE_REPLICA(1012),
NO_AVAILABLE_AINODE(1013),
CONFIG_NODE_LEADER_WARMING_UP(1014),

// Sync, Load TsFile
LOAD_FILE_ERROR(1100),
Expand Down
1 change: 1 addition & 0 deletions iotdb-core/ainode/iotdb/ainode/core/constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
class TSStatusCode(Enum):
SUCCESS_STATUS = 200
REDIRECTION_RECOMMEND = 400
CONFIG_NODE_LEADER_WARMING_UP = 1014
MODEL_EXISTED_ERROR = 1503
MODEL_NOT_EXIST_ERROR = 1504
CREATE_MODEL_ERROR = 1505
Expand Down
11 changes: 9 additions & 2 deletions iotdb-core/ainode/iotdb/ainode/core/rpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def __init__(self, config_leader: TEndPoint):
"Fail to connect to any config node. Please check status of ConfigNodes"
)
self._RETRY_NUM = 10
self._STARTUP_RETRY_NUM = 60
self._RETRY_INTERVAL_IN_S = 1

self._try_to_connect()
Expand Down Expand Up @@ -163,6 +164,12 @@ def _update_config_node_leader(self, status: TSStatus) -> bool:
else:
self._config_leader = None
return True
if status.code == TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.get_status_code():
logger.info(
"ConfigNode leader is warming up before serving AINode, will wait and retry. Reason: {}",
status.message,
)
return True
return False

def node_register(
Expand All @@ -177,7 +184,7 @@ def node_register(
versionInfo=version_info,
)

for _ in range(0, self._RETRY_NUM):
for _ in range(0, self._STARTUP_RETRY_NUM):
try:
resp = self._client.registerAINode(req)
if not self._update_config_node_leader(resp.status):
Expand Down Expand Up @@ -208,7 +215,7 @@ def node_restart(
versionInfo=version_info,
)

for _ in range(0, self._RETRY_NUM):
for _ in range(0, self._STARTUP_RETRY_NUM):
try:
resp = self._client.restartAINode(req)
if not self._update_config_node_leader(resp.status):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.iotdb.confignode.client.async.handlers.heartbeat;

import org.apache.iotdb.commons.client.ThriftClient;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.NodeType;
import org.apache.iotdb.confignode.manager.load.LoadManager;
Expand Down Expand Up @@ -47,10 +46,8 @@ public void onComplete(TConfigNodeHeartbeatResp resp) {

@Override
public void onError(Exception e) {
if (ThriftClient.isConnectionBroken(e)) {
loadManager.forceUpdateNodeCache(
NodeType.ConfigNode, nodeId, new NodeHeartbeatSample(NodeStatus.Unknown));
}
loadManager.forceUpdateNodeCache(
NodeType.ConfigNode, nodeId, new NodeHeartbeatSample(NodeStatus.Unknown));
loadManager.getLoadCache().resetHeartbeatProcessing(nodeId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@

package org.apache.iotdb.confignode.client.async.handlers.heartbeat;

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.commons.client.ThriftClient;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.NodeType;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupHeartbeatSample;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
import org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.PipeRuntimeCoordinator;
Expand All @@ -36,6 +35,7 @@

import org.apache.thrift.async.AsyncMethodCallback;

import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;

Expand Down Expand Up @@ -81,7 +81,7 @@
}

@Override
public void onComplete(TDataNodeHeartbeatResp heartbeatResp) {

Check failure on line 84 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 17 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ6IQxPF6wWHSYqa09KV&open=AZ6IQxPF6wWHSYqa09KV&pullRequest=17821
// Update NodeCache
loadManager
.getLoadCache()
Expand All @@ -89,46 +89,57 @@

RegionStatus regionStatus = RegionStatus.valueOf(heartbeatResp.getStatus());

heartbeatResp
.getJudgedLeaders()
.forEach(
(regionGroupId, isLeader) -> {

// Do not allow regions to inherit the Removing state from datanode
RegionStatus nextRegionStatus = regionStatus;
if (nextRegionStatus == RegionStatus.Removing) {
nextRegionStatus =
loadManager
.getLoadCache()
.getRegionCacheLastSampleStatus(regionGroupId, nodeId);
}

// Update RegionGroupCache
loadManager
.getLoadCache()
.cacheRegionHeartbeatSample(
regionGroupId,
nodeId,
new RegionHeartbeatSample(
heartbeatResp.getHeartbeatTimestamp(),
// Region will inherit DataNode's status
nextRegionStatus),
false);

if (((TConsensusGroupType.SchemaRegion.equals(regionGroupId.getType())
&& SCHEMA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE)
|| (TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
&& DATA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE))
&& Boolean.TRUE.equals(isLeader)) {
// Update ConsensusGroupCache when necessary
loadManager
.getLoadCache()
.cacheConsensusSample(
regionGroupId,
new ConsensusGroupHeartbeatSample(
heartbeatResp.getConsensusLogicalTimeMap().get(regionGroupId), nodeId));
}
});
Map<TConsensusGroupId, Boolean> judgedLeaders =
heartbeatResp.isSetJudgedLeaders()
? heartbeatResp.getJudgedLeaders()
: Collections.emptyMap();
judgedLeaders.forEach(
(regionGroupId, isLeader) -> {

// Do not allow regions to inherit the Removing state from datanode
RegionStatus nextRegionStatus = regionStatus;
if (nextRegionStatus == RegionStatus.Removing) {
nextRegionStatus =
loadManager.getLoadCache().getRegionCacheLastSampleStatus(regionGroupId, nodeId);
}

// Update RegionGroupCache
loadManager
.getLoadCache()
.cacheRegionHeartbeatSample(
regionGroupId,
nodeId,
new RegionHeartbeatSample(
heartbeatResp.getHeartbeatTimestamp(),
// Region will inherit DataNode's status
nextRegionStatus),
false);

boolean shouldCacheConsensusSample =
(TConsensusGroupType.SchemaRegion.equals(regionGroupId.getType())
&& SCHEMA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE)
|| (TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
&& DATA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE);
boolean hasConsensusLogicalTimestamp =
heartbeatResp.isSetConsensusLogicalTimeMap()
&& heartbeatResp.getConsensusLogicalTimeMap().containsKey(regionGroupId);
long logicalTimestamp =
hasConsensusLogicalTimestamp
? heartbeatResp.getConsensusLogicalTimeMap().get(regionGroupId)
: heartbeatResp.getHeartbeatTimestamp();
loadManager
.getLoadCache()
.cacheConsensusGroupHeartbeatSample(
regionGroupId,
nodeId,
Boolean.TRUE.equals(isLeader),
logicalTimestamp,
shouldCacheConsensusSample && hasConsensusLogicalTimestamp);
});
loadManager
.getLoadCache()
.cacheUnreportedDataNodeRegionHeartbeatSamples(
nodeId, judgedLeaders.keySet(), heartbeatResp.getHeartbeatTimestamp());

if (heartbeatResp.getRegionDeviceUsageMap() != null) {
deviceNum.putAll(heartbeatResp.getRegionDeviceUsageMap());
Expand Down Expand Up @@ -167,10 +178,9 @@

@Override
public void onError(Exception e) {
if (ThriftClient.isConnectionBroken(e)) {
loadManager.forceUpdateNodeCache(
NodeType.DataNode, nodeId, new NodeHeartbeatSample(NodeStatus.Unknown));
}
loadManager.forceUpdateNodeCache(
NodeType.DataNode, nodeId, new NodeHeartbeatSample(NodeStatus.Unknown));
loadManager.getLoadCache().cacheDataNodeHeartbeatFailureSample(nodeId, System.nanoTime());
loadManager.getLoadCache().resetHeartbeatProcessing(nodeId);
}
}
Loading
Loading