diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java index 4055398ddb7ec..790fd637d616a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java @@ -37,6 +37,11 @@ public enum CnToDnSyncRequestType { DELETE_OLD_REGION_PEER, RESET_PEER_LIST, + // Data Partition Table Maintenance + COLLECT_EARLIEST_TIMESLOTS, + GENERATE_DATA_PARTITION_TABLE, + GENERATE_DATA_PARTITION_TABLE_HEART_BEAT, + // PartitionCache INVALIDATE_PARTITION_CACHE, INVALIDATE_PERMISSION_CACHE, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java index d63d5a74f6095..9f5729ef06dfd 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java @@ -32,6 +32,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq; import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq; import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq; +import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq; import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq; import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq; import org.apache.iotdb.mpp.rpc.thrift.TKillQueryInstanceReq; @@ -139,6 +140,15 @@ private void buildActionMap() { actionMapBuilder.put( CnToDnSyncRequestType.SHOW_APPLIED_CONFIGURATIONS, (req, client) -> client.showAppliedConfigurations()); + actionMapBuilder.put( + 
CnToDnSyncRequestType.COLLECT_EARLIEST_TIMESLOTS, + (req, client) -> client.getEarliestTimeslots()); + actionMapBuilder.put( + CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE, + (req, client) -> client.generateDataPartitionTable((TGenerateDataPartitionTableReq) req)); + actionMapBuilder.put( + CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE_HEART_BEAT, + (req, client) -> client.generateDataPartitionTableHeartbeat()); actionMap = actionMapBuilder.build(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index 3abb322d08472..59b318a4b11e9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -319,6 +319,8 @@ public class ConfigNodeConfig { private long forceWalPeriodForConfigNodeSimpleInMs = 100; + private long partitionTableRecoverWaitAllDnUpTimeoutInMs = 60000; + public ConfigNodeConfig() { // empty constructor } @@ -1286,4 +1288,13 @@ public long getFailureDetectorPhiAcceptablePauseInMs() { public void setFailureDetectorPhiAcceptablePauseInMs(long failureDetectorPhiAcceptablePauseInMs) { this.failureDetectorPhiAcceptablePauseInMs = failureDetectorPhiAcceptablePauseInMs; } + + public long getPartitionTableRecoverWaitAllDnUpTimeoutInMs() { + return partitionTableRecoverWaitAllDnUpTimeoutInMs; + } + + public void setPartitionTableRecoverWaitAllDnUpTimeoutInMs( + long partitionTableRecoverWaitAllDnUpTimeoutInMs) { + this.partitionTableRecoverWaitAllDnUpTimeoutInMs = partitionTableRecoverWaitAllDnUpTimeoutInMs; + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index 77790dae1a903..9f8206f5dd78b 100644 --- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -322,6 +322,12 @@ private void loadProperties(TrimProperties properties) throws BadNodeUrlExceptio "failure_detector_phi_acceptable_pause_in_ms", String.valueOf(conf.getFailureDetectorPhiAcceptablePauseInMs())))); + conf.setPartitionTableRecoverWaitAllDnUpTimeoutInMs( + Long.parseLong( + properties.getProperty( + "partition_table_recover_wait_all_dn_up_timeout_ms", + String.valueOf(conf.getPartitionTableRecoverWaitAllDnUpTimeoutInMs())))); + String leaderDistributionPolicy = properties.getProperty("leader_distribution_policy", conf.getLeaderDistributionPolicy()); if (AbstractLeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 646aaf66daf4f..1a69044d37d3d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -67,6 +67,7 @@ import org.apache.iotdb.confignode.procedure.impl.node.RemoveAINodeProcedure; import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure; import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodesProcedure; +import org.apache.iotdb.confignode.procedure.impl.partition.DataPartitionTableIntegrityCheckProcedure; import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure; import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure; import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleLeaderChangeProcedure; @@ -1374,6 +1375,16 @@ public TSStatus createRegionGroups( } } + /** Used 
to repair the lost data partition table */ + public TSStatus dataPartitionTableIntegrityCheck() { + DataPartitionTableIntegrityCheckProcedure procedure; + synchronized (this) { + procedure = new DataPartitionTableIntegrityCheckProcedure(); + executor.submitProcedure(procedure); + } + return waitingProcedureFinished(procedure); + } + /** * Generate {@link CreateTriggerProcedure} and wait until it finished. * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/ConfigNodeProcedureEnv.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/ConfigNodeProcedureEnv.java new file mode 100644 index 0000000000000..c1ebd7ffccde1 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/ConfigNodeProcedureEnv.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.impl.partition; + +import org.apache.iotdb.confignode.manager.ConfigManager; + +/** + * Environment object for ConfigNode procedures. Provides access to ConfigManager and other + * necessary components. 
+ */ +public class ConfigNodeProcedureEnv { + + private final ConfigManager configManager; + + public ConfigNodeProcedureEnv(ConfigManager configManager) { + this.configManager = configManager; + } + + public ConfigManager getConfigManager() { + return configManager; + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java new file mode 100644 index 0000000000000..0ff1ec91acdb0 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java @@ -0,0 +1,954 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.confignode.procedure.impl.partition; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.enums.DataPartitionTableGeneratorState; +import org.apache.iotdb.commons.partition.DataPartitionTable; +import org.apache.iotdb.commons.partition.DatabaseScopedDataPartitionTable; +import org.apache.iotdb.commons.partition.SeriesPartitionTable; +import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.iotdb.confignode.client.sync.CnToDnSyncRequestType; +import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool; +import org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan; +import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan; +import org.apache.iotdb.confignode.manager.node.NodeManager; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.procedure.exception.ProcedureException; +import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure; +import org.apache.iotdb.confignode.procedure.state.DataPartitionTableIntegrityCheckProcedureState; +import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; +import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp; +import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq; +import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableResp; +import org.apache.iotdb.mpp.rpc.thrift.TGetEarliestTimeslotsResp; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.transport.TIOStreamTransport; +import 
org.apache.thrift.transport.TTransport; +import org.apache.tsfile.utils.PublicBAOS; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Procedure for checking and restoring data partition table integrity. This procedure scans all + * DataNodes to detect missing data partitions and restores the DataPartitionTable on the ConfigNode + * Leader. + */ +public class DataPartitionTableIntegrityCheckProcedure + extends StateMachineProcedure< + ConfigNodeProcedureEnv, DataPartitionTableIntegrityCheckProcedureState> { + + private static final Logger LOG = + LoggerFactory.getLogger(DataPartitionTableIntegrityCheckProcedure.class); + + // how many times will retry after rpc request failed + private static final int MAX_RETRY_COUNT = 3; + + // how long to start a heartbeat request, the unit is ms + private static final long HEART_BEAT_REQUEST_INTERVAL = 10000; + + // how long to check all datanode are alive, the unit is ms + private static final long CHECK_ALL_DATANODE_IS_ALIVE_INTERVAL = 10000; + + NodeManager dataNodeManager; + private List allDataNodes = new ArrayList<>(); + + // ============Need serialize BEGIN=============/ + /** Collected earliest timeslots from DataNodes: database -> earliest timeslot */ + private Map earliestTimeslots = new ConcurrentHashMap<>(); + + /** DataPartitionTables collected from DataNodes: dataNodeId -> */ + private Map> dataPartitionTables = + new ConcurrentHashMap<>(); + + /** + * Collect all database names that those database lost data partition, the 
string in the Set + * collection is database name + */ + private Set databasesWithLostDataPartition = new HashSet<>(); + + /** + * Final merged DataPartitionTable for every database Map key(String): + * database name + */ + private Map finalDataPartitionTables; + + private static Set skipDataNodes = + Collections.newSetFromMap(new ConcurrentHashMap<>()); + private static Set failedDataNodes = + Collections.newSetFromMap(new ConcurrentHashMap<>()); + + // ============Need serialize END=============/ + + public DataPartitionTableIntegrityCheckProcedure() { + super(); + } + + @Override + protected Flow executeFromState( + final ConfigNodeProcedureEnv env, final DataPartitionTableIntegrityCheckProcedureState state) + throws InterruptedException { + try { + // Ensure to get the real-time DataNodes in the current cluster at every step + dataNodeManager = env.getConfigManager().getNodeManager(); + allDataNodes = dataNodeManager.getRegisteredDataNodes(); + + switch (state) { + case COLLECT_EARLIEST_TIMESLOTS: + failedDataNodes = new HashSet<>(); + return collectEarliestTimeslots(); + case ANALYZE_MISSING_PARTITIONS: + databasesWithLostDataPartition = new HashSet<>(); + return analyzeMissingPartitions(env); + case REQUEST_PARTITION_TABLES: + return requestPartitionTables(); + case REQUEST_PARTITION_TABLES_HEART_BEAT: + return requestPartitionTablesHeartBeat(); + case MERGE_PARTITION_TABLES: + finalDataPartitionTables = new HashMap<>(); + return mergePartitionTables(env); + case WRITE_PARTITION_TABLE_TO_CONSENSUS: + return writePartitionTableToConsensus(env); + default: + throw new ProcedureException("Unknown state: " + state); + } + } catch (Exception e) { + LOG.error("[DataPartitionIntegrity] Error executing state {}: {}", state, e.getMessage(), e); + setFailure("DataPartitionTableIntegrityCheckProcedure", e); + return Flow.NO_MORE_STATE; + } + } + + @Override + protected void rollbackState( + final ConfigNodeProcedureEnv env, final 
DataPartitionTableIntegrityCheckProcedureState state) + throws IOException, InterruptedException, ProcedureException { + // Cleanup resources + switch (state) { + case COLLECT_EARLIEST_TIMESLOTS: + earliestTimeslots.clear(); + break; + case ANALYZE_MISSING_PARTITIONS: + databasesWithLostDataPartition.clear(); + break; + case REQUEST_PARTITION_TABLES: + case REQUEST_PARTITION_TABLES_HEART_BEAT: + dataPartitionTables.clear(); + break; + case MERGE_PARTITION_TABLES: + finalDataPartitionTables.clear(); + break; + default: + allDataNodes.clear(); + earliestTimeslots.clear(); + dataPartitionTables.clear(); + finalDataPartitionTables.clear(); + throw new ProcedureException("Unknown state for rollback: " + state); + } + } + + @Override + protected DataPartitionTableIntegrityCheckProcedureState getState(final int stateId) { + return DataPartitionTableIntegrityCheckProcedureState.values()[stateId]; + } + + @Override + protected int getStateId(final DataPartitionTableIntegrityCheckProcedureState state) { + return state.ordinal(); + } + + @Override + protected DataPartitionTableIntegrityCheckProcedureState getInitialState() { + skipDataNodes = new HashSet<>(); + failedDataNodes = new HashSet<>(); + return DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS; + } + + /** + * Collect earliest timeslot information from all DataNodes. Each DataNode returns a Map where key is database name and value is the earliest timeslot id. 
+ */ + private Flow collectEarliestTimeslots() { + if (LOG.isDebugEnabled()) { + LOG.debug("Collecting earliest timeslots from all DataNodes..."); + } + + if (allDataNodes.isEmpty()) { + LOG.error( + "[DataPartitionIntegrity] No DataNodes registered, no way to collect earliest timeslots, waiting for them to go up"); + sleep( + CHECK_ALL_DATANODE_IS_ALIVE_INTERVAL, + "[DataPartitionIntegrity] Error waiting for DataNode startup due to thread interruption."); + setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); + return Flow.HAS_MORE_STATE; + } + + // Collect earliest timeslots from all DataNodes + allDataNodes.removeAll(skipDataNodes); + for (TDataNodeConfiguration dataNode : allDataNodes) { + try { + TGetEarliestTimeslotsResp resp = + (TGetEarliestTimeslotsResp) + SyncDataNodeClientPool.getInstance() + .sendSyncRequestToDataNodeWithGivenRetry( + dataNode.getLocation().getInternalEndPoint(), + null, + CnToDnSyncRequestType.COLLECT_EARLIEST_TIMESLOTS, + MAX_RETRY_COUNT); + if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + failedDataNodes.add(dataNode); + LOG.error( + "[DataPartitionIntegrity] Failed to collected earliest timeslots from the DataNode[id={}], response status is {}", + dataNode.getLocation().getDataNodeId(), + resp.getStatus()); + continue; + } + + Map nodeTimeslots = resp.getDatabaseToEarliestTimeslot(); + + // Merge with existing timeslots (take minimum) + for (Map.Entry entry : nodeTimeslots.entrySet()) { + earliestTimeslots.merge(entry.getKey(), entry.getValue(), Math::min); + } + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Collected earliest timeslots from the DataNode[id={}]: {}", + dataNode.getLocation().getDataNodeId(), + nodeTimeslots); + } + } catch (Exception e) { + LOG.error( + "[DataPartitionIntegrity] Failed to collect earliest timeslots from the DataNode[id={}]: {}", + dataNode.getLocation().getDataNodeId(), + e.getMessage(), + e); + failedDataNodes.add(dataNode); + } 
+ } + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Collected earliest timeslots from {} DataNodes: {}, the number of successful DataNodes is {}", + allDataNodes.size(), + earliestTimeslots, + allDataNodes.size() - failedDataNodes.size()); + } + + if (failedDataNodes.size() == allDataNodes.size()) { + setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); + } else { + setNextState(DataPartitionTableIntegrityCheckProcedureState.ANALYZE_MISSING_PARTITIONS); + } + return Flow.HAS_MORE_STATE; + } + + /** + * Analyze which data partitions are missing based on earliest timeslots. Identify data partitions + * of databases need to be repaired. + */ + private Flow analyzeMissingPartitions(final ConfigNodeProcedureEnv env) { + if (LOG.isDebugEnabled()) { + LOG.debug("Analyzing missing data partitions..."); + } + + if (earliestTimeslots.isEmpty()) { + LOG.warn( + "[DataPartitionIntegrity] No missing data partitions detected, nothing needs to be repaired, terminating procedure"); + return Flow.NO_MORE_STATE; + } + + // Find all databases that have lost data partition tables + for (Map.Entry entry : earliestTimeslots.entrySet()) { + String database = entry.getKey(); + long earliestTimeslot = entry.getValue(); + + // Get current DataPartitionTable from ConfigManager + Map>>> + localDataPartitionTable = getLocalDataPartitionTable(env, database); + + // Check if ConfigNode has a data partition that is associated with the earliestTimeslot + if (localDataPartitionTable == null + || localDataPartitionTable.isEmpty() + || localDataPartitionTable.get(database) == null + || localDataPartitionTable.get(database).isEmpty()) { + databasesWithLostDataPartition.add(database); + LOG.warn( + "[DataPartitionIntegrity] No data partition table related to database {} was found from the ConfigNode, and this issue needs to be repaired", + database); + continue; + } + + Map>> + seriesPartitionMap = localDataPartitionTable.get(database); + for (Map.Entry>> + 
seriesPartitionEntry : seriesPartitionMap.entrySet()) { + Map> tTimePartitionSlotListMap = + seriesPartitionEntry.getValue(); + + if (tTimePartitionSlotListMap.isEmpty()) { + continue; + } + + TTimePartitionSlot localEarliestSlot = + tTimePartitionSlotListMap.keySet().stream() + .min(Comparator.comparingLong(TTimePartitionSlot::getStartTime)) + .orElse(null); + + if (localEarliestSlot.getStartTime() + > TimePartitionUtils.getStartTimeByPartitionId(earliestTimeslot)) { + databasesWithLostDataPartition.add(database); + LOG.warn( + "[DataPartitionIntegrity] Database {} has lost timeslot {} in its data table partition, and this issue needs to be repaired", + database, + earliestTimeslot); + } + } + } + + if (databasesWithLostDataPartition.isEmpty()) { + LOG.info( + "[DataPartitionIntegrity] No databases have lost data partitions, terminating procedure"); + return Flow.NO_MORE_STATE; + } + + LOG.info( + "[DataPartitionIntegrity] Identified {} databases have lost data partitions, will request DataPartitionTable generation from {} DataNodes", + databasesWithLostDataPartition.size(), + allDataNodes.size() - failedDataNodes.size()); + setNextState(DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES); + return Flow.HAS_MORE_STATE; + } + + private Map>>> + getLocalDataPartitionTable(final ConfigNodeProcedureEnv env, final String database) { + Map> schemaPartitionTable = + env.getConfigManager() + .getSchemaPartition(Collections.singletonMap(database, Collections.emptyList())) + .getSchemaPartitionTable(); + + // Construct request for getting data partition + final Map> partitionSlotsMap = new HashMap<>(); + schemaPartitionTable.forEach( + (key, value) -> { + Map slotListMap = new HashMap<>(); + value + .keySet() + .forEach( + slot -> + slotListMap.put( + slot, new TTimeSlotList(Collections.emptyList(), true, true))); + partitionSlotsMap.put(key, slotListMap); + }); + final GetDataPartitionPlan getDataPartitionPlan = new 
GetDataPartitionPlan(partitionSlotsMap); + return env.getConfigManager().getDataPartition(getDataPartitionPlan).getDataPartitionTable(); + } + + /** + * Request DataPartitionTable generation from target DataNodes. Each DataNode scans its tsfile + * resources and generates a DataPartitionTable. + */ + private Flow requestPartitionTables() { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Requesting DataPartitionTable generation from {} DataNodes...", allDataNodes.size()); + } + + if (allDataNodes.isEmpty()) { + LOG.error( + "[DataPartitionIntegrity] No DataNodes registered, no way to requested DataPartitionTable generation, terminating procedure"); + sleep( + CHECK_ALL_DATANODE_IS_ALIVE_INTERVAL, + "[DataPartitionIntegrity] Error waiting for DataNode startup due to thread interruption."); + setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); + return Flow.HAS_MORE_STATE; + } + + allDataNodes.removeAll(skipDataNodes); + allDataNodes.removeAll(failedDataNodes); + for (TDataNodeConfiguration dataNode : allDataNodes) { + int dataNodeId = dataNode.getLocation().getDataNodeId(); + if (!dataPartitionTables.containsKey(dataNodeId)) { + try { + TGenerateDataPartitionTableReq req = new TGenerateDataPartitionTableReq(); + req.setDatabases(databasesWithLostDataPartition); + TGenerateDataPartitionTableResp resp = + (TGenerateDataPartitionTableResp) + SyncDataNodeClientPool.getInstance() + .sendSyncRequestToDataNodeWithGivenRetry( + dataNode.getLocation().getInternalEndPoint(), + req, + CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE, + MAX_RETRY_COUNT); + if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + failedDataNodes.add(dataNode); + LOG.error( + "[DataPartitionIntegrity] Failed to request DataPartitionTable generation from the DataNode[id={}], response status is {}", + dataNode.getLocation().getDataNodeId(), + resp.getStatus()); + } + } catch (Exception e) { + failedDataNodes.add(dataNode); + LOG.error( 
+ "[DataPartitionIntegrity] Failed to request DataPartitionTable generation from DataNode[id={}]: {}", + dataNodeId, + e.getMessage(), + e); + } + } + } + + if (failedDataNodes.size() == allDataNodes.size()) { + setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); + return Flow.HAS_MORE_STATE; + } + + setNextState( + DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES_HEART_BEAT); + return Flow.HAS_MORE_STATE; + } + + private Flow requestPartitionTablesHeartBeat() { + if (LOG.isDebugEnabled()) { + LOG.debug("Checking DataPartitionTable generation completion status..."); + } + + int completeCount = 0; + for (TDataNodeConfiguration dataNode : allDataNodes) { + int dataNodeId = dataNode.getLocation().getDataNodeId(); + + if (!dataPartitionTables.containsKey(dataNodeId)) { + try { + TGenerateDataPartitionTableHeartbeatResp resp = + (TGenerateDataPartitionTableHeartbeatResp) + SyncDataNodeClientPool.getInstance() + .sendSyncRequestToDataNodeWithGivenRetry( + dataNode.getLocation().getInternalEndPoint(), + null, + CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE_HEART_BEAT, + MAX_RETRY_COUNT); + DataPartitionTableGeneratorState state = + DataPartitionTableGeneratorState.getStateByCode(resp.getErrorCode()); + + if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOG.error( + "[DataPartitionIntegrity] Failed to request DataPartitionTable generation heart beat from the DataNode[id={}], state is {}, response status is {}", + dataNode.getLocation().getDataNodeId(), + state, + resp.getStatus()); + continue; + } + + switch (state) { + case SUCCESS: + List byteBufferList = resp.getDatabaseScopedDataPartitionTables(); + List databaseScopedDataPartitionTableList = + deserializeDatabaseScopedTableList(byteBufferList); + dataPartitionTables.put(dataNodeId, databaseScopedDataPartitionTableList); + LOG.info( + "[DataPartitionIntegrity] DataNode {} completed DataPartitionTable generation, 
terminating heart beat", + dataNodeId); + completeCount++; + break; + case IN_PROGRESS: + LOG.info( + "[DataPartitionIntegrity] DataNode {} still generating DataPartitionTable", + dataNodeId); + break; + default: + failedDataNodes.add(dataNode); + LOG.error( + "[DataPartitionIntegrity] DataNode {} returned unknown error code: {}", + dataNodeId, + resp.getErrorCode()); + break; + } + } catch (Exception e) { + LOG.error( + "[DataPartitionIntegrity] Error checking DataPartitionTable status from DataNode {}: {}, terminating heart beat", + dataNodeId, + e.getMessage(), + e); + completeCount++; + } + } else { + completeCount++; + } + } + + if (completeCount >= allDataNodes.size()) { + setNextState(DataPartitionTableIntegrityCheckProcedureState.MERGE_PARTITION_TABLES); + return Flow.HAS_MORE_STATE; + } + + // Don't find any one data partition table generation task on all registered DataNodes, go back + // to the REQUEST_PARTITION_TABLES step and re-execute + if (failedDataNodes.size() == allDataNodes.size()) { + setNextState(DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES); + return Flow.HAS_MORE_STATE; + } + + sleep( + HEART_BEAT_REQUEST_INTERVAL, + "[DataPartitionIntegrity] Error checking DataPartitionTable status due to thread interruption."); + setNextState( + DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES_HEART_BEAT); + return Flow.HAS_MORE_STATE; + } + + private static void sleep(long intervalTime, String logMessage) { + try { + Thread.sleep(intervalTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error(logMessage); + } + } + + /** Merge DataPartitionTables from all DataNodes into a final table. 
*/ + private Flow mergePartitionTables(final ConfigNodeProcedureEnv env) { + if (LOG.isDebugEnabled()) { + LOG.debug("Merging DataPartitionTables from {} DataNodes...", dataPartitionTables.size()); + } + + if (dataPartitionTables.isEmpty()) { + LOG.error( + "[DataPartitionIntegrity] No DataPartitionTables to merge, dataPartitionTables is empty"); + setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); + return Flow.HAS_MORE_STATE; + } + + for (String database : databasesWithLostDataPartition) { + Map finalDataPartitionMap = new HashMap<>(); + + // Get current DataPartitionTable from ConfigManager + Map>>> + localDataPartitionTableMap = getLocalDataPartitionTable(env, database); + + // Check if ConfigNode has a data partition that is associated with the earliestTimeslot + if (localDataPartitionTableMap == null + || localDataPartitionTableMap.isEmpty() + || localDataPartitionTableMap.get(database) == null + || localDataPartitionTableMap.get(database).isEmpty()) { + LOG.warn( + "[DataPartitionIntegrity] No data partition table related to database {} was found from the ConfigNode, use data partition table of DataNode directly", + database); + continue; + } + + localDataPartitionTableMap + .values() + .forEach( + map -> + map.forEach( + (tSeriesPartitionSlot, seriesPartitionTableMap) -> { + if (tSeriesPartitionSlot == null + || seriesPartitionTableMap == null + || seriesPartitionTableMap.isEmpty()) { + return; + } + finalDataPartitionMap.computeIfAbsent( + tSeriesPartitionSlot, + k -> new SeriesPartitionTable(seriesPartitionTableMap)); + })); + + dataPartitionTables.forEach( + (k, v) -> + v.forEach( + databaseScopedDataPartitionTable -> { + if (!databaseScopedDataPartitionTable.getDatabase().equals(database)) { + return; + } + finalDataPartitionTables.put( + database, + new DataPartitionTable(finalDataPartitionMap) + .merge(databaseScopedDataPartitionTable.getDataPartitionTable())); + })); + } + + LOG.info("[DataPartitionIntegrity] 
DataPartitionTables merge completed successfully"); + setNextState(DataPartitionTableIntegrityCheckProcedureState.WRITE_PARTITION_TABLE_TO_CONSENSUS); + return Flow.HAS_MORE_STATE; + } + + /** Write the final DataPartitionTable to consensus log. */ + private Flow writePartitionTableToConsensus(final ConfigNodeProcedureEnv env) { + if (LOG.isDebugEnabled()) { + LOG.debug("Writing DataPartitionTable to consensus log..."); + } + + if (databasesWithLostDataPartition.isEmpty()) { + LOG.error("[DataPartitionIntegrity] No database lost data partition table"); + setFailure( + "DataPartitionTableIntegrityCheckProcedure", + new ProcedureException("No database lost data partition table for consensus write")); + return getFlow(); + } + + if (finalDataPartitionTables.isEmpty()) { + LOG.error("[DataPartitionIntegrity] DataPartitionTable to write to consensus"); + setFailure( + "DataPartitionTableIntegrityCheckProcedure", + new ProcedureException("No DataPartitionTable available for consensus write")); + return getFlow(); + } + + int failedCnt = 0; + final int maxRetryCountForConsensus = 3; + while (failedCnt < maxRetryCountForConsensus) { + try { + CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan(); + Map assignedDataPartition = new HashMap<>(); + for (String database : databasesWithLostDataPartition) { + assignedDataPartition.put(database, finalDataPartitionTables.get(database)); + } + createPlan.setAssignedDataPartition(assignedDataPartition); + TSStatus tsStatus = env.getConfigManager().getConsensusManager().write(createPlan); + + if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOG.info( + "[DataPartitionIntegrity] DataPartitionTable successfully written to consensus log"); + break; + } else { + LOG.error("[DataPartitionIntegrity] Failed to write DataPartitionTable to consensus log"); + setFailure( + "DataPartitionTableIntegrityCheckProcedure", + new ProcedureException("Failed to write DataPartitionTable to consensus log")); + } + } 
catch (Exception e) { + LOG.error("[DataPartitionIntegrity] Error writing DataPartitionTable to consensus log", e); + setFailure("DataPartitionTableIntegrityCheckProcedure", e); + } + failedCnt++; + } + + return getFlow(); + } + + /** + * Determine whether there are still DataNode nodes with failed execution of a certain step in + * this round. If such nodes exist, calculate the skipDataNodes and exclude these nodes when + * requesting the list of DataNode nodes in the cluster for the next round; if no such nodes + * exist, it means the procedure has been completed + */ + private Flow getFlow() { + if (!failedDataNodes.isEmpty()) { + allDataNodes.removeAll(failedDataNodes); + skipDataNodes = new HashSet<>(allDataNodes); + setNextState(DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS); + return Flow.HAS_MORE_STATE; + } else { + skipDataNodes.clear(); + return Flow.NO_MORE_STATE; + } + } + + @Override + public void serialize(final DataOutputStream stream) throws IOException { + super.serialize(stream); + + // Serialize earliestTimeslots + stream.writeInt(earliestTimeslots.size()); + for (Map.Entry entry : earliestTimeslots.entrySet()) { + ReadWriteIOUtils.write(entry.getKey(), stream); + stream.writeLong(entry.getValue()); + } + + // Serialize dataPartitionTables count + stream.writeInt(dataPartitionTables.size()); + for (Map.Entry> entry : + dataPartitionTables.entrySet()) { + stream.writeInt(entry.getKey()); + + List tableList = entry.getValue(); + stream.writeInt(tableList.size()); + + for (DatabaseScopedDataPartitionTable table : tableList) { + try (final PublicBAOS publicBAOS = new PublicBAOS(); + final DataOutputStream tmpStream = new DataOutputStream(publicBAOS)) { + + TTransport transport = new TIOStreamTransport(tmpStream); + TBinaryProtocol protocol = new TBinaryProtocol(transport); + + table.serialize(tmpStream, protocol); + + byte[] buf = publicBAOS.getBuf(); + int size = publicBAOS.size(); + ReadWriteIOUtils.write(size, stream); 
+ stream.write(buf, 0, size); + } catch (IOException | TException e) { + LOG.error( + "[DataPartitionIntegrity] {} serialize failed for dataNodeId: {}", + this.getClass().getSimpleName(), + entry.getKey(), + e); + throw new IOException("Failed to serialize dataPartitionTables", e); + } + } + } + + stream.writeInt(databasesWithLostDataPartition.size()); + for (String database : databasesWithLostDataPartition) { + ReadWriteIOUtils.write(database, stream); + } + + if (finalDataPartitionTables != null && !finalDataPartitionTables.isEmpty()) { + stream.writeInt(finalDataPartitionTables.size()); + + for (Map.Entry entry : finalDataPartitionTables.entrySet()) { + ReadWriteIOUtils.write(entry.getKey(), stream); + + try (final PublicBAOS publicBAOS = new PublicBAOS(); + final DataOutputStream tmpStream = new DataOutputStream(publicBAOS)) { + TTransport transport = new TIOStreamTransport(tmpStream); + TBinaryProtocol protocol = new TBinaryProtocol(transport); + + entry.getValue().serialize(tmpStream, protocol); + + byte[] buf = publicBAOS.getBuf(); + int size = publicBAOS.size(); + ReadWriteIOUtils.write(size, stream); + stream.write(buf, 0, size); + } catch (IOException | TException e) { + LOG.error( + "[DataPartitionIntegrity] {} serialize finalDataPartitionTables failed", + this.getClass().getSimpleName(), + e); + throw new IOException("Failed to serialize finalDataPartitionTables", e); + } + } + } else { + stream.writeInt(0); + } + + stream.writeInt(skipDataNodes.size()); + for (TDataNodeConfiguration skipDataNode : skipDataNodes) { + try (final PublicBAOS publicBAOS = new PublicBAOS(); + final DataOutputStream tmpStream = new DataOutputStream(publicBAOS)) { + TTransport transport = new TIOStreamTransport(tmpStream); + TBinaryProtocol protocol = new TBinaryProtocol(transport); + skipDataNode.write(protocol); + + byte[] buf = publicBAOS.getBuf(); + int size = publicBAOS.size(); + ReadWriteIOUtils.write(size, stream); + stream.write(buf, 0, size); + } catch (TException e) 
{ + LOG.error("[DataPartitionIntegrity] Failed to serialize skipDataNode", e); + throw new IOException("Failed to serialize skipDataNode", e); + } + } + + stream.writeInt(failedDataNodes.size()); + for (TDataNodeConfiguration failedDataNode : failedDataNodes) { + try (final PublicBAOS publicBAOS = new PublicBAOS(); + final DataOutputStream tmpStream = new DataOutputStream(publicBAOS)) { + TTransport transport = new TIOStreamTransport(tmpStream); + TBinaryProtocol protocol = new TBinaryProtocol(transport); + failedDataNode.write(protocol); + + byte[] buf = publicBAOS.getBuf(); + int size = publicBAOS.size(); + ReadWriteIOUtils.write(size, stream); + stream.write(buf, 0, size); + } catch (TException e) { + LOG.error("[DataPartitionIntegrity] Failed to serialize failedDataNode", e); + throw new IOException("Failed to serialize failedDataNode", e); + } + } + } + + @Override + public void deserialize(final ByteBuffer byteBuffer) { + super.deserialize(byteBuffer); + + // Deserialize earliestTimeslots + int earliestTimeslotsSize = byteBuffer.getInt(); + earliestTimeslots = new ConcurrentHashMap<>(); + for (int i = 0; i < earliestTimeslotsSize; i++) { + String database = ReadWriteIOUtils.readString(byteBuffer); + long timeslot = byteBuffer.getLong(); + earliestTimeslots.put(database, timeslot); + } + + // Deserialize dataPartitionTables count + int dataPartitionTablesSize = byteBuffer.getInt(); + dataPartitionTables = new ConcurrentHashMap<>(); + for (int i = 0; i < dataPartitionTablesSize; i++) { + int dataNodeId = byteBuffer.getInt(); + int listSize = byteBuffer.getInt(); + + List tableList = new ArrayList<>(listSize); + + for (int j = 0; j < listSize; j++) { + int dataSize = byteBuffer.getInt(); + byte[] bytes = new byte[dataSize]; + byteBuffer.get(bytes); + + try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + DataInputStream dis = new DataInputStream(bais)) { + + TTransport transport = new TIOStreamTransport(dis); + TBinaryProtocol protocol = new 
TBinaryProtocol(transport); + + DatabaseScopedDataPartitionTable table = + DatabaseScopedDataPartitionTable.deserialize(dis, protocol); + tableList.add(table); + + } catch (IOException | TException e) { + LOG.error( + "[DataPartitionIntegrity] {} deserialize failed for dataNodeId: {}", + this.getClass().getSimpleName(), + dataNodeId, + e); + throw new RuntimeException("Failed to deserialize dataPartitionTables", e); + } + } + + dataPartitionTables.put(dataNodeId, tableList); + } + + int databasesWithLostDataPartitionSize = byteBuffer.getInt(); + for (int i = 0; i < databasesWithLostDataPartitionSize; i++) { + String database = ReadWriteIOUtils.readString(byteBuffer); + databasesWithLostDataPartition.add(database); + } + + // Deserialize finalDataPartitionTable size + int finalDataPartitionTablesSize = byteBuffer.getInt(); + finalDataPartitionTables = new ConcurrentHashMap<>(); + + for (int i = 0; i < finalDataPartitionTablesSize; i++) { + String database = ReadWriteIOUtils.readString(byteBuffer); + + int dataSize = byteBuffer.getInt(); + byte[] bytes = new byte[dataSize]; + byteBuffer.get(bytes); + + try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + DataInputStream dis = new DataInputStream(bais)) { + + TTransport transport = new TIOStreamTransport(dis); + TBinaryProtocol protocol = new TBinaryProtocol(transport); + + DataPartitionTable dataPartitionTable = new DataPartitionTable(); + dataPartitionTable.deserialize(dis, protocol); + + finalDataPartitionTables.put(database, dataPartitionTable); + + } catch (IOException | TException e) { + LOG.error( + "[DataPartitionIntegrity] {} deserialize finalDataPartitionTables failed", + this.getClass().getSimpleName(), + e); + throw new RuntimeException("Failed to deserialize finalDataPartitionTables", e); + } + } + + skipDataNodes = new HashSet<>(); + int skipDataNodesSize = byteBuffer.getInt(); + for (int i = 0; i < skipDataNodesSize; i++) { + int size = byteBuffer.getInt(); + byte[] bytes = new 
byte[size]; + byteBuffer.get(bytes); + + try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) { + TTransport transport = new TIOStreamTransport(bais); + TBinaryProtocol protocol = new TBinaryProtocol(transport); + + TDataNodeConfiguration dataNode = new TDataNodeConfiguration(); + dataNode.read(protocol); + skipDataNodes.add(dataNode); + } catch (TException | IOException e) { + LOG.error("[DataPartitionIntegrity] Failed to deserialize skipDataNode", e); + throw new RuntimeException(e); + } + } + + failedDataNodes = new HashSet<>(); + int failedDataNodesSize = byteBuffer.getInt(); + for (int i = 0; i < failedDataNodesSize; i++) { + int size = byteBuffer.getInt(); + byte[] bytes = new byte[size]; + byteBuffer.get(bytes); + + try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) { + TTransport transport = new TIOStreamTransport(bais); + TBinaryProtocol protocol = new TBinaryProtocol(transport); + + TDataNodeConfiguration dataNode = new TDataNodeConfiguration(); + dataNode.read(protocol); + failedDataNodes.add(dataNode); + } catch (TException | IOException e) { + LOG.error("[DataPartitionIntegrity] Failed to deserialize failedDataNode", e); + throw new RuntimeException(e); + } + } + } + + private List deserializeDatabaseScopedTableList( + List dataList) { + if (dataList == null || dataList.isEmpty()) { + return Collections.emptyList(); + } + + List result = new ArrayList<>(dataList.size()); + + for (ByteBuffer data : dataList) { + if (data == null || data.remaining() == 0) { + LOG.warn("[DataPartitionIntegrity] Skipping empty ByteBuffer during deserialization"); + continue; + } + + try { + DatabaseScopedDataPartitionTable table = DatabaseScopedDataPartitionTable.deserialize(data); + result.add(table); + } catch (Exception e) { + LOG.error( + "[DataPartitionIntegrity] Failed to deserialize DatabaseScopedDataPartitionTable", e); + } + } + + return result; + } +} diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DataPartitionTableIntegrityCheckProcedureState.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DataPartitionTableIntegrityCheckProcedureState.java new file mode 100644 index 0000000000000..bf302db755bac --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DataPartitionTableIntegrityCheckProcedureState.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.confignode.procedure.state; + +public enum DataPartitionTableIntegrityCheckProcedureState { + /** Collect earliest timeslot information from all DataNodes */ + COLLECT_EARLIEST_TIMESLOTS, + /** Analyze missing data partitions */ + ANALYZE_MISSING_PARTITIONS, + /** Request DataPartitionTable generation from DataNodes */ + REQUEST_PARTITION_TABLES, + /** Round robin get DataPartitionTable generation result from DataNodes */ + REQUEST_PARTITION_TABLES_HEART_BEAT, + /** Merge DataPartitionTables from all DataNodes */ + MERGE_PARTITION_TABLES, + /** Write final DataPartitionTable to raft log */ + WRITE_PARTITION_TABLE_TO_CONSENSUS +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java index dd15558608718..140fffa852ccc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java @@ -26,6 +26,7 @@ import org.apache.iotdb.confignode.procedure.impl.node.RemoveAINodeProcedure; import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure; import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodesProcedure; +import org.apache.iotdb.confignode.procedure.impl.partition.DataPartitionTableIntegrityCheckProcedure; import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure; import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure; import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleLeaderChangeProcedure; @@ -404,6 +405,9 @@ public Procedure create(ByteBuffer buffer) throws IOException { case ADD_NEVER_FINISH_SUB_PROCEDURE_PROCEDURE: procedure = new AddNeverFinishSubProcedureProcedure(); break; + case 
DATA_PARTITION_TABLE_INTEGRITY_CHECK_PROCEDURE: + procedure = new DataPartitionTableIntegrityCheckProcedure(); + break; default: LOGGER.error("Unknown Procedure type: {}", typeCode); throw new IOException("Unknown Procedure type: " + typeCode); @@ -554,6 +558,8 @@ public static ProcedureType getProcedureType(final Procedure procedure) { return ProcedureType.NEVER_FINISH_PROCEDURE; } else if (procedure instanceof AddNeverFinishSubProcedureProcedure) { return ProcedureType.ADD_NEVER_FINISH_SUB_PROCEDURE_PROCEDURE; + } else if (procedure instanceof DataPartitionTableIntegrityCheckProcedure) { + return ProcedureType.DATA_PARTITION_TABLE_INTEGRITY_CHECK_PROCEDURE; } throw new UnsupportedOperationException( "Procedure type " + procedure.getClass() + " is not supported"); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java index 820a90f7ebfb9..839c8ace0984d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java @@ -172,7 +172,10 @@ public enum ProcedureType { @TestOnly NEVER_FINISH_PROCEDURE((short) 30000), @TestOnly - ADD_NEVER_FINISH_SUB_PROCEDURE_PROCEDURE((short) 30001); + ADD_NEVER_FINISH_SUB_PROCEDURE_PROCEDURE((short) 30001), + + /** Data Partition Table Integrity Check */ + DATA_PARTITION_TABLE_INTEGRITY_CHECK_PROCEDURE((short) 1600); private final short typeCode; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java index f20f77095d97a..3a6c93b12ec2f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java +++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java @@ -24,6 +24,8 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.ServerCommandLine; import org.apache.iotdb.commons.client.ClientManagerMetrics; +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadModule; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.ThreadPoolMetrics; @@ -79,6 +81,9 @@ import java.util.Arrays; import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; public class ConfigNode extends ServerCommandLine implements ConfigNodeMBean { @@ -110,6 +115,11 @@ public class ConfigNode extends ServerCommandLine implements ConfigNodeMBean { private int exitStatusCode = 0; + private Future dataPartitionTableCheckFuture; + + private ExecutorService dataPartitionTableCheckExecutor = + IoTDBThreadPoolFactory.newSingleThreadExecutor("DATA_PARTITION_TABLE_CHECK"); + public ConfigNode() { super("ConfigNode"); // We do not init anything here, so that we can re-initialize the instance in IT. 
@@ -147,6 +157,15 @@ protected void start() throws IoTDBException { } active(); LOGGER.info("IoTDB started"); + if (dataPartitionTableCheckFuture != null) { + try { + dataPartitionTableCheckFuture.get(); + } catch (ExecutionException | InterruptedException e) { + LOGGER.error("Data partition table check task execute failed", e); + } finally { + dataPartitionTableCheckExecutor.shutdownNow(); + } + } } @Override @@ -175,7 +194,7 @@ public void active() { int configNodeId = CONF.getConfigNodeId(); configManager.initConsensusManager(); upgrade(); - waitForLeaderElected(); + TConfigNodeLocation leaderNodeLocation = waitForLeaderElected(); setUpMetricService(); // Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure // that the external service is not provided until ConfigNode is fully available @@ -203,6 +222,40 @@ public void active() { } loadSecretKey(); loadHardwareCode(); + + /* After the ConfigNode leader election, a leader switch may occur, which could cause the procedure not to be created. This can happen if the original leader has not yet executed the procedure creation, while the other followers have already finished starting up. Therefore, having the original leader (before the leader switch) initiate the process ensures that only one procedure will be created. 
*/ + if (leaderNodeLocation.getConfigNodeId() == configNodeId) { + dataPartitionTableCheckFuture = + dataPartitionTableCheckExecutor.submit( + () -> { + LOGGER.info( + "[DataPartitionIntegrity] Prepare to start dataPartitionTableIntegrityCheck after all datanodes started up"); + Thread.sleep(CONF.getPartitionTableRecoverWaitAllDnUpTimeoutInMs()); + + while (true) { + List dnList = + configManager + .getLoadManager() + .filterDataNodeThroughStatus(NodeStatus.Running); + if (dnList != null && !dnList.isEmpty()) { + LOGGER.info("Starting dataPartitionTableIntegrityCheck..."); + TSStatus status = + configManager.getProcedureManager().dataPartitionTableIntegrityCheck(); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.error( + "Data partition table integrity check failed! Current status code is {}, status message is {}", + status.getCode(), + status.getMessage()); + } + break; + } else { + LOGGER.info("No running datanodes found, waiting..."); + Thread.sleep(5000); + } + } + return null; + }); + } return; } else { saveSecretKey(); @@ -469,7 +522,7 @@ protected ConfigNodeRPCServiceProcessor getConfigNodeRPCServiceProcessor() { return new ConfigNodeRPCServiceProcessor(configManager); } - private void waitForLeaderElected() { + private TConfigNodeLocation waitForLeaderElected() { while (!configManager.getConsensusManager().isLeaderExist()) { LOGGER.info("Leader has not been elected yet, wait for 1 second"); try { @@ -479,6 +532,7 @@ private void waitForLeaderElected() { LOGGER.warn("Unexpected interruption during waiting for leader election."); } } + return configManager.getConsensusManager().getLeaderLocation(); } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 722e04edb306a..69eab2d79cf28 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1239,6 +1239,11 @@ public class IoTDBConfig { private int maxSubTaskNumForInformationTableScan = 4; + /* Need use these parameters when repair data partition table */ + private int partitionTableRecoverWorkerNum = 10; + // Rate limit set to 10 MB/s + private int partitionTableRecoverMaxReadMBsPerSecond = 10; + IoTDBConfig() {} public int getMaxLogEntriesNumPerBatch() { @@ -4435,4 +4440,21 @@ public int getMaxSubTaskNumForInformationTableScan() { public void setMaxSubTaskNumForInformationTableScan(int maxSubTaskNumForInformationTableScan) { this.maxSubTaskNumForInformationTableScan = maxSubTaskNumForInformationTableScan; } + + public int getPartitionTableRecoverWorkerNum() { + return partitionTableRecoverWorkerNum; + } + + public void setPartitionTableRecoverWorkerNum(int partitionTableRecoverWorkerNum) { + this.partitionTableRecoverWorkerNum = partitionTableRecoverWorkerNum; + } + + public int getPartitionTableRecoverMaxReadMBsPerSecond() { + return partitionTableRecoverMaxReadMBsPerSecond; + } + + public void setPartitionTableRecoverMaxReadMBsPerSecond( + int partitionTableRecoverMaxReadMBsPerSecond) { + this.partitionTableRecoverMaxReadMBsPerSecond = partitionTableRecoverMaxReadMBsPerSecond; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 588e121135c93..5fad7697b9317 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -1139,6 +1139,17 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException // update trusted_uri_pattern loadTrustedUriPattern(properties); + conf.setPartitionTableRecoverWorkerNum( + Integer.parseInt( + properties.getProperty( + "partition_table_recover_worker_num", + 
String.valueOf(conf.getPartitionTableRecoverWorkerNum())))); + conf.setPartitionTableRecoverMaxReadMBsPerSecond( + Integer.parseInt( + properties.getProperty( + "partition_table_recover_max_read_megabytes_per_second", + String.valueOf(conf.getPartitionTableRecoverMaxReadMBsPerSecond())))); + conf.setIncludeNullValueInWriteThroughputMetric( Boolean.parseBoolean( properties.getProperty( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/partition/DataPartitionTableGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/partition/DataPartitionTableGenerator.java new file mode 100644 index 0000000000000..1b43cb2bc5446 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/partition/DataPartitionTableGenerator.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.partition; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.partition.DataPartitionTable; +import org.apache.iotdb.commons.partition.SeriesPartitionTable; +import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; +import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.storageengine.StorageEngine; +import org.apache.iotdb.db.storageengine.dataregion.DataRegion; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + +import com.google.common.util.concurrent.RateLimiter; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Generator for DataPartitionTable by scanning tsfile resources. This class scans the data + * directory structure and builds a complete DataPartitionTable based on existing tsfiles. 
+ */ +public class DataPartitionTableGenerator { + + private static final Logger LOG = LoggerFactory.getLogger(DataPartitionTableGenerator.class); + + // Task status + private volatile TaskStatus status = TaskStatus.NOT_STARTED; + private volatile String errorMessage; + private Map databasePartitionTableMap = new ConcurrentHashMap<>(); + + // Progress tracking + private final AtomicInteger processedTimePartitions = new AtomicInteger(0); + private final AtomicInteger failedTimePartitions = new AtomicInteger(0); + private long totalTimePartitions = 0; + + // Configuration + private final ExecutorService executor; + private final Set databases; + private final int seriesSlotNum; + private final String seriesPartitionExecutorClass; + + private final RateLimiter limiter = + RateLimiter.create( + (long) + IoTDBDescriptor.getInstance() + .getConfig() + .getPartitionTableRecoverMaxReadMBsPerSecond() + * 1024 + * 1024); + + public static final Set IGNORE_DATABASE = + new HashSet() { + { + add("root.__audit"); + add("root.__system"); + } + }; + + public DataPartitionTableGenerator( + ExecutorService executor, + Set databases, + int seriesSlotNum, + String seriesPartitionExecutorClass) { + this.executor = executor; + this.databases = databases; + this.seriesSlotNum = seriesSlotNum; + this.seriesPartitionExecutorClass = seriesPartitionExecutorClass; + } + + public Map getDatabasePartitionTableMap() { + return databasePartitionTableMap; + } + + public enum TaskStatus { + NOT_STARTED, + IN_PROGRESS, + COMPLETED, + FAILED + } + + /** Start generating DataPartitionTable asynchronously. 
*/ + public CompletableFuture startGeneration() { + if (status != TaskStatus.NOT_STARTED) { + throw new IllegalStateException("Task is already started or completed"); + } + + status = TaskStatus.IN_PROGRESS; + return CompletableFuture.runAsync(this::generateDataPartitionTableByMemory); + } + + private void generateDataPartitionTableByMemory() { + List> futures = new ArrayList<>(); + + SeriesPartitionExecutor seriesPartitionExecutor = + SeriesPartitionExecutor.getSeriesPartitionExecutor( + seriesPartitionExecutorClass, seriesSlotNum); + + try { + totalTimePartitions = + StorageEngine.getInstance().getAllDataRegions().stream() + .mapToLong( + dataRegion -> + (dataRegion == null) + ? 0 + : dataRegion.getTsFileManager().getTimePartitions().size()) + .sum(); + for (DataRegion dataRegion : StorageEngine.getInstance().getAllDataRegions()) { + CompletableFuture regionFuture = + CompletableFuture.runAsync( + () -> { + try { + TsFileManager tsFileManager = dataRegion.getTsFileManager(); + String databaseName = dataRegion.getDatabaseName(); + if (!databases.contains(databaseName) + || IGNORE_DATABASE.contains(databaseName)) { + return; + } + + Map dataPartitionMap = + new ConcurrentHashMap<>(); + + tsFileManager.readLock(); + List seqTsFileList = tsFileManager.getTsFileList(true); + List unseqTsFileList = tsFileManager.getTsFileList(false); + tsFileManager.readUnlock(); + + constructDataPartitionMap( + seqTsFileList, seriesPartitionExecutor, dataPartitionMap); + constructDataPartitionMap( + unseqTsFileList, seriesPartitionExecutor, dataPartitionMap); + + if (dataPartitionMap.isEmpty()) { + LOG.error("Failed to generate DataPartitionTable, dataPartitionMap is empty"); + status = TaskStatus.FAILED; + errorMessage = "DataPartitionMap is empty after processing resource file"; + return; + } + + DataPartitionTable dataPartitionTable = + new DataPartitionTable(dataPartitionMap); + + databasePartitionTableMap.compute( + databaseName, + (k, v) -> { + if (v == null) { + return new 
DataPartitionTable(dataPartitionMap); + } + v.merge(dataPartitionTable); + return v; + }); + } catch (Exception e) { + LOG.error("Error processing data region: {}", dataRegion.getDatabaseName(), e); + failedTimePartitions.incrementAndGet(); + errorMessage = "Failed to process data region: " + e.getMessage(); + } + }, + executor); + futures.add(regionFuture); + } + + // Wait for all tasks to complete + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + + status = TaskStatus.COMPLETED; + LOG.info( + "DataPartitionTable generation completed successfully. Processed: {}, Failed: {}", + processedTimePartitions.get(), + failedTimePartitions.get()); + } catch (Exception e) { + LOG.error("Failed to generate DataPartitionTable", e); + status = TaskStatus.FAILED; + errorMessage = "Generation failed: " + e.getMessage(); + } + } + + private void constructDataPartitionMap( + List seqTsFileList, + SeriesPartitionExecutor seriesPartitionExecutor, + Map dataPartitionMap) { + Set timeSlotIds = Collections.newSetFromMap(new ConcurrentHashMap<>()); + + for (TsFileResource tsFileResource : seqTsFileList) { + long timeSlotId = tsFileResource.getTsFileID().timePartitionId; + try { + Set devices = tsFileResource.getDevices(limiter); + int regionId = tsFileResource.getTsFileID().regionId; + + TConsensusGroupId consensusGroupId = new TConsensusGroupId(); + consensusGroupId.setId(regionId); + consensusGroupId.setType(TConsensusGroupType.DataRegion); + + for (IDeviceID deviceId : devices) { + TSeriesPartitionSlot seriesSlotId = + seriesPartitionExecutor.getSeriesPartitionSlot(deviceId); + TTimePartitionSlot timePartitionSlot = + new TTimePartitionSlot(TimePartitionUtils.getStartTimeByPartitionId(timeSlotId)); + dataPartitionMap + .computeIfAbsent( + seriesSlotId, empty -> newSeriesPartitionTable(consensusGroupId, timeSlotId)) + .putDataPartition(timePartitionSlot, consensusGroupId); + } + if (!timeSlotIds.contains(timeSlotId)) { + timeSlotIds.add(timeSlotId); + 
processedTimePartitions.incrementAndGet(); + } + } catch (Exception e) { + if (!timeSlotIds.contains(timeSlotId)) { + timeSlotIds.add(timeSlotId); + failedTimePartitions.incrementAndGet(); + } + LOG.error("Failed to process tsfile {}, {}", tsFileResource.getTsFileID(), e.getMessage()); + } + } + + timeSlotIds.clear(); + } + + private static SeriesPartitionTable newSeriesPartitionTable( + TConsensusGroupId consensusGroupId, long timeSlotId) { + SeriesPartitionTable seriesPartitionTable = new SeriesPartitionTable(); + TTimePartitionSlot timePartitionSlot = + new TTimePartitionSlot(TimePartitionUtils.getStartTimeByPartitionId(timeSlotId)); + seriesPartitionTable.putDataPartition(timePartitionSlot, consensusGroupId); + return seriesPartitionTable; + } + + // Getters + public TaskStatus getStatus() { + return status; + } + + public String getErrorMessage() { + return errorMessage; + } + + public double getProgress() { + if (totalTimePartitions == 0) { + return 0.0; + } + return (double) (processedTimePartitions.get() + failedTimePartitions.get()) + / totalTimePartitions; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java index 9c44de9f5fdca..881e823ef2d67 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java @@ -55,7 +55,10 @@ public enum OperationType { WRITE_AUDIT_LOG("writeAuditLog"), PREPARE_STATEMENT("prepareStatement"), EXECUTE_PREPARED_STATEMENT("executePreparedStatement"), - DEALLOCATE_PREPARED_STATEMENT("deallocatePreparedStatement"); + DEALLOCATE_PREPARED_STATEMENT("deallocatePreparedStatement"), + GET_EARLIEST_TIMESLOTS("getEarliestTimeslots"), + GENERATE_DATA_PARTITION_TABLE("generateDataPartitionTable"), + 
CHECK_DATA_PARTITION_TABLE_STATUS("checkDataPartitionTableStatus"); private final String name; OperationType(String name) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 42929be741819..e2bfe4f6ad9cd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -61,8 +61,11 @@ import org.apache.iotdb.commons.consensus.SchemaRegionId; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.ProgressIndexType; +import org.apache.iotdb.commons.enums.DataPartitionTableGeneratorState; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.partition.DataPartitionTable; +import org.apache.iotdb.commons.partition.DatabaseScopedDataPartitionTable; import org.apache.iotdb.commons.path.ExtendedPartialPath; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; @@ -102,6 +105,7 @@ import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.partition.DataPartitionTableGenerator; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; import org.apache.iotdb.db.protocol.client.cn.DnToCnInternalServiceAsyncRequestManager; @@ -193,6 +197,7 @@ import org.apache.iotdb.db.service.externalservice.ExternalServiceManagementService; import org.apache.iotdb.db.service.metrics.FileMetrics; import 
org.apache.iotdb.db.storageengine.StorageEngine; +import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.db.storageengine.dataregion.compaction.repair.RepairTaskStatus; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; @@ -201,6 +206,8 @@ import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager; import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeThrottleQuotaManager; import org.apache.iotdb.db.subscription.agent.SubscriptionAgent; @@ -260,6 +267,10 @@ import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerReq; import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerResp; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceInfoResp; +import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp; +import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq; +import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableResp; +import org.apache.iotdb.mpp.rpc.thrift.TGetEarliestTimeslotsResp; import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq; import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq; import org.apache.iotdb.mpp.rpc.thrift.TInvalidateColumnCacheReq; @@ -317,11 +328,16 @@ import com.google.common.collect.ImmutableList; import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.transport.TIOStreamTransport; +import 
org.apache.thrift.transport.TTransport; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.exception.NotImplementedException; +import org.apache.tsfile.external.commons.lang3.StringUtils; import org.apache.tsfile.read.common.TimeRange; import org.apache.tsfile.read.common.block.TsBlock; import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.apache.tsfile.write.record.Tablet; @@ -347,6 +363,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -370,7 +387,6 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException; public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface { - private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeInternalRPCServiceImpl.class); @@ -3117,4 +3133,300 @@ public TSStatus writeAuditLog(TAuditLogReq req) { public void handleClientExit() { // Do nothing } + + // ==================================================== + // Data Partition Table Integrity Check Implementation + // ==================================================== + + private volatile DataPartitionTableGenerator currentGenerator; + private volatile CompletableFuture currentGeneratorFuture; + private volatile long currentTaskId = 0; + + @Override + public TGetEarliestTimeslotsResp getEarliestTimeslots() { + TGetEarliestTimeslotsResp resp = new TGetEarliestTimeslotsResp(); + + try { + Map earliestTimeslots = new ConcurrentHashMap<>(); + processDataRegionForEarliestTimeslots(earliestTimeslots); + + resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + resp.setDatabaseToEarliestTimeslot(earliestTimeslots); + + 
LOGGER.info("Retrieved earliest timeslots for {} databases", earliestTimeslots.size()); + } catch (Exception e) { + LOGGER.error("Failed to get earliest timeslots", e); + resp.setStatus( + onIoTDBException( + e, + OperationType.GET_EARLIEST_TIMESLOTS, + TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())); + } + + return resp; + } + + @Override + public TGenerateDataPartitionTableResp generateDataPartitionTable( + TGenerateDataPartitionTableReq req) { + TGenerateDataPartitionTableResp resp = new TGenerateDataPartitionTableResp(); + + try { + // Check if there's already a task in the progress + if (currentGenerator != null + && currentGenerator.getStatus() == DataPartitionTableGenerator.TaskStatus.IN_PROGRESS) { + resp.setErrorCode(DataPartitionTableGeneratorState.IN_PROGRESS.getCode()); + resp.setMessage( + String.format( + "DataPartitionTable generation is already in the progress: %.1f%%", + currentGenerator.getProgress() * 100)); + resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); + return resp; + } + + // Create generator for all data directories + int seriesSlotNum = IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum(); + String seriesPartitionExecutorClass = + IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(); + + final ExecutorService partitionTableRecoverExecutor = + new WrappedThreadPoolExecutor( + 0, + IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum(), + 0L, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>( + IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum()), + new IoTThreadFactory(ThreadName.DATA_PARTITION_RECOVER_PARALLEL_POOL.getName()), + ThreadName.DATA_PARTITION_RECOVER_PARALLEL_POOL.getName(), + new ThreadPoolExecutor.CallerRunsPolicy()); + + currentGenerator = + new DataPartitionTableGenerator( + partitionTableRecoverExecutor, + req.getDatabases(), + seriesSlotNum, + seriesPartitionExecutorClass); + currentTaskId = 
System.currentTimeMillis(); + + // Start generation synchronously for now to return the data partition table immediately + currentGeneratorFuture = currentGenerator.startGeneration(); + parseGenerationStatus(resp); + } catch (Exception e) { + LOGGER.error("Failed to generate DataPartitionTable", e); + resp.setStatus( + onIoTDBException( + e, + OperationType.GENERATE_DATA_PARTITION_TABLE, + TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())); + } + + return resp; + } + + @Override + public TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartbeat() { + TGenerateDataPartitionTableHeartbeatResp resp = new TGenerateDataPartitionTableHeartbeatResp(); + // Must be lower than the RPC request timeout, in milliseconds + final long timeoutMs = 50000; + // Set default value + resp.setDatabaseScopedDataPartitionTables(Collections.emptyList()); + try { + // To resolve this situation that the DataNode is registered and didn't request + // generateDataPartitionTable interface yet. + if (currentGeneratorFuture == null || currentGenerator == null) { + resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode()); + resp.setMessage("No DataPartitionTable generation task found"); + resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); + return resp; + } + + currentGeneratorFuture.get(timeoutMs, TimeUnit.MILLISECONDS); + + parseGenerationStatus(resp); + if (currentGenerator.getStatus().equals(DataPartitionTableGenerator.TaskStatus.COMPLETED)) { + boolean success = false; + List databaseScopedDataPartitionTableList = + new ArrayList<>(); + Map dataPartitionTableMap = + currentGenerator.getDatabasePartitionTableMap(); + if (!dataPartitionTableMap.isEmpty()) { + for (Map.Entry entry : dataPartitionTableMap.entrySet()) { + String database = entry.getKey(); + DataPartitionTable dataPartitionTable = entry.getValue(); + if (!StringUtils.isEmpty(database) && dataPartitionTable != null) { + DatabaseScopedDataPartitionTable 
databaseScopedDataPartitionTable = + new DatabaseScopedDataPartitionTable(database, dataPartitionTable); + databaseScopedDataPartitionTableList.add(databaseScopedDataPartitionTable); + success = true; + } + } + } + + if (success) { + List result = + serializeDatabaseScopedTableList(databaseScopedDataPartitionTableList); + resp.setDatabaseScopedDataPartitionTables(result); + + // Clear current generator + currentGenerator = null; + } + } + } catch (Exception e) { + LOGGER.error("Failed to check DataPartitionTable generation status", e); + resp.setStatus( + onIoTDBException( + e, + OperationType.CHECK_DATA_PARTITION_TABLE_STATUS, + TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())); + } + return resp; + } + + private void parseGenerationStatus(Object resp) { + if (currentGenerator == null) { + return; + } + + switch (currentGenerator.getStatus()) { + case IN_PROGRESS: + setResponseFields( + resp, + DataPartitionTableGeneratorState.IN_PROGRESS.getCode(), + String.format( + "DataPartitionTable generation in progress: %.1f%%", + currentGenerator.getProgress() * 100), + RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + LOGGER.info( + String.format( + "DataPartitionTable generation with task ID: %s in progress: %.1f%%", + currentTaskId, currentGenerator.getProgress() * 100)); + break; + case COMPLETED: + setResponseFields( + resp, + DataPartitionTableGeneratorState.SUCCESS.getCode(), + "DataPartitionTable generation completed successfully", + RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + LOGGER.info("DataPartitionTable generation completed with task ID: {}", currentTaskId); + break; + case FAILED: + setResponseFields( + resp, + DataPartitionTableGeneratorState.FAILED.getCode(), + "DataPartitionTable generation failed: " + currentGenerator.getErrorMessage(), + RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); + LOGGER.info("DataPartitionTable generation failed with task ID: {}", currentTaskId); + break; + default: + setResponseFields( + resp, + 
DataPartitionTableGeneratorState.UNKNOWN.getCode(), + "Unknown task status: " + currentGenerator.getStatus(), + RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR)); + LOGGER.info("DataPartitionTable generation failed with task ID: {}", currentTaskId); + break; + } + } + + private void setResponseFields(Object resp, int errorCode, String message, TSStatus status) { + if (resp instanceof TGenerateDataPartitionTableResp) { + ((TGenerateDataPartitionTableResp) resp).setErrorCode(errorCode); + ((TGenerateDataPartitionTableResp) resp).setMessage(message); + ((TGenerateDataPartitionTableResp) resp).setStatus(status); + } else if (resp instanceof TGenerateDataPartitionTableHeartbeatResp) { + ((TGenerateDataPartitionTableHeartbeatResp) resp).setErrorCode(errorCode); + ((TGenerateDataPartitionTableHeartbeatResp) resp).setMessage(message); + ((TGenerateDataPartitionTableHeartbeatResp) resp).setStatus(status); + } + } + + /** + * Scan the seq and unseq directory on every data region, then compute the earliest time slot id + * of database + */ + private void processDataRegionForEarliestTimeslots(Map earliestTimeslots) { + final Set ignoreDatabase = + new HashSet() { + { + add("root.__audit"); + add("root.__system"); + } + }; + List> futures = new ArrayList<>(); + final ExecutorService findEarliestTimeSlotExecutor = + new WrappedThreadPoolExecutor( + 0, + IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum(), + 0L, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>( + IoTDBDescriptor.getInstance().getConfig().getPartitionTableRecoverWorkerNum()), + new IoTThreadFactory(ThreadName.FIND_EARLIEST_TIME_SLOT_PARALLEL_POOL.getName()), + ThreadName.FIND_EARLIEST_TIME_SLOT_PARALLEL_POOL.getName(), + new ThreadPoolExecutor.CallerRunsPolicy()); + + for (DataRegion dataRegion : StorageEngine.getInstance().getAllDataRegions()) { + CompletableFuture regionFuture = + CompletableFuture.runAsync( + () -> { + TsFileManager tsFileManager = dataRegion.getTsFileManager(); + 
String databaseName = dataRegion.getDatabaseName(); +                  if (ignoreDatabase.contains(databaseName)) { +                    return; +                  } + +                  Set timePartitionIds = tsFileManager.getTimePartitions(); +                  if (timePartitionIds.isEmpty()) { return; } final long earliestTimeSlotId = Collections.min(timePartitionIds); +                  earliestTimeslots.compute( +                      databaseName, +                      (k, v) -> v == null ? earliestTimeSlotId : Math.min(earliestTimeSlotId, v)); +                }, +                findEarliestTimeSlotExecutor); +      futures.add(regionFuture); +    } + +    // Wait for all tasks to complete +    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); +    LOGGER.info("Process data directory for earliestTimeslots completed successfully"); +  } + +  private long findEarliestTimeslotInFiles( +      List seqTsFileList, long earliestTimeSlotId) { +    for (TsFileResource tsFileResource : seqTsFileList) { +      long timeSlotId = tsFileResource.getTsFileID().timePartitionId; +      earliestTimeSlotId = +          earliestTimeSlotId == Long.MIN_VALUE +              ? timeSlotId +              : Math.min(earliestTimeSlotId, timeSlotId); +    } + +    return earliestTimeSlotId; +  } + +  private List serializeDatabaseScopedTableList( +      List list) { +    if (list == null || list.isEmpty()) { +      return Collections.emptyList(); +    } + +    List result = new ArrayList<>(list.size()); + +    for (DatabaseScopedDataPartitionTable table : list) { +      try (PublicBAOS baos = new PublicBAOS(); +          DataOutputStream oos = new DataOutputStream(baos)) { +        TTransport transport = new TIOStreamTransport(oos); +        TBinaryProtocol protocol = new TBinaryProtocol(transport); +        table.serialize(oos, protocol); +        result.add(ByteBuffer.wrap(baos.getBuf(), 0, baos.size())); +      } catch (IOException | TException e) { +        LOGGER.error( +            "Failed to serialize DatabaseScopedDataPartitionTable for database: {}", +            table.getDatabase(), +            e); +      } +    } + +    return result; +  } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java index 1a85753de5c5c..bc81a6b28a851 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java @@ -50,6 +50,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.TimeIndexLevel; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; +import com.google.common.util.concurrent.RateLimiter; import org.apache.tsfile.file.metadata.IChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.ITimeSeriesMetadata; @@ -677,6 +678,10 @@ public Set getDevices() { return timeIndex.getDevices(file.getPath(), this); } + public Set getDevices(RateLimiter limiter) { + return timeIndex.getDevices(file.getPath(), this, limiter); + } + public ArrayDeviceTimeIndex buildDeviceTimeIndex(IDeviceID.Deserializer deserializer) throws IOException { readLock(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java index fb29083a7373c..caca8e9fdba44 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java @@ -26,6 +26,7 @@ import org.apache.iotdb.db.exception.load.PartitionViolationException; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import com.google.common.util.concurrent.RateLimiter; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.IDeviceID.Deserializer; import 
org.apache.tsfile.utils.FilePathUtils; @@ -171,6 +172,12 @@ public Set getDevices(String tsFilePath, TsFileResource tsFileResourc return deviceToIndex.keySet(); } + @Override + public Set getDevices( + String tsFilePath, TsFileResource tsFileResource, RateLimiter limiter) { + return deviceToIndex.keySet(); + } + public Map getDeviceToIndex() { return deviceToIndex; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java index e4a812012a8e3..2cf89a02626aa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java @@ -21,10 +21,12 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.utils.CommonDateTimeUtils; +import org.apache.iotdb.commons.utils.IOUtils; import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.db.exception.load.PartitionViolationException; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import com.google.common.util.concurrent.RateLimiter; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.fileSystem.FSFactoryProducer; import org.apache.tsfile.utils.FilePathUtils; @@ -120,6 +122,45 @@ public Set getDevices(String tsFilePath, TsFileResource tsFileResourc } } + @Override + public Set getDevices( + String tsFilePath, TsFileResource tsFileResource, RateLimiter limiter) { + tsFileResource.readLock(); + try { + try (IOUtils.RatelimitedInputStream inputStream = + new IOUtils.RatelimitedInputStream( + FSFactoryProducer.getFSFactory() + .getBufferedInputStream(tsFilePath + TsFileResource.RESOURCE_SUFFIX), + limiter)) { + // The first byte is VERSION_NUMBER, second byte is 
timeIndexType. + byte[] bytes = ReadWriteIOUtils.readBytes(inputStream, 2); + + if (bytes[1] == ARRAY_DEVICE_TIME_INDEX_TYPE) { + return ArrayDeviceTimeIndex.getDevices(inputStream); + } else { + return PlainDeviceTimeIndex.getDevices(inputStream); + } + } + } catch (NoSuchFileException e) { + // deleted by ttl + if (tsFileResource.isDeleted()) { + return Collections.emptySet(); + } else { + logger.error( + "Can't read file {} from disk ", tsFilePath + TsFileResource.RESOURCE_SUFFIX, e); + throw new RuntimeException( + "Can't read file " + tsFilePath + TsFileResource.RESOURCE_SUFFIX + " from disk"); + } + } catch (Exception e) { + logger.error( + "Failed to get devices from tsfile: {}", tsFilePath + TsFileResource.RESOURCE_SUFFIX, e); + throw new RuntimeException( + "Failed to get devices from tsfile: " + tsFilePath + TsFileResource.RESOURCE_SUFFIX); + } finally { + tsFileResource.readUnlock(); + } + } + @Override public boolean endTimeEmpty() { return endTime == Long.MIN_VALUE; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java index d705a2417d7c6..114a207d75794 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java @@ -23,6 +23,7 @@ import org.apache.iotdb.db.exception.load.PartitionViolationException; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import com.google.common.util.concurrent.RateLimiter; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -74,6 +75,13 @@ ITimeIndex deserialize(InputStream inputStream, IDeviceID.Deserializer deseriali */ Set getDevices(String 
tsFilePath, TsFileResource tsFileResource); + /** + * get devices in TimeIndex and limit files reading rate + * + * @return device names + */ + Set getDevices(String tsFilePath, TsFileResource tsFileResource, RateLimiter limiter); + /** * @return whether end time is empty (Long.MIN_VALUE) */ diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index d89566013fd3b..341f72cfb0e54 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -742,6 +742,24 @@ failure_detector_phi_acceptable_pause_in_ms=10000 # Datatype: double(percentage) disk_space_warning_threshold=0.05 +# Purpose: for data partition repair +# The number of threads used for parallel scanning in the partition table recovery +# effectiveMode: restart +# Datatype: Integer +partition_table_recover_worker_num=10 + +# Purpose: for data partition repair +# Limit the number of bytes read per second from a file, the unit is MB +# effectiveMode: restart +# Datatype: Integer +partition_table_recover_max_read_megabytes_per_second=10 + +# Purpose: for data partition repair +# Set a timeout to wait for all datanodes complete startup, the unit is ms +# effectiveMode: restart +# Datatype: Integer +partition_table_recover_wait_all_dn_up_timeout_ms=60000 + #################### ### Memory Control Configuration #################### diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index 05b9672c2aae2..81f2aa7156cf7 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -203,6 
+203,8 @@ public enum ThreadName { TABLE_SIZE_INDEX_RECORD("TableSizeIndexRecord"), BINARY_ALLOCATOR_SAMPLE_EVICTOR("BinaryAllocator-SampleEvictor"), BINARY_ALLOCATOR_AUTO_RELEASER("BinaryAllocator-Auto-Releaser"), + FIND_EARLIEST_TIME_SLOT_PARALLEL_POOL("FindEarliestTimeSlot-Parallel-Pool"), + DATA_PARTITION_RECOVER_PARALLEL_POOL("DataPartitionRecover-Parallel-Pool"), // the unknown thread name is used for metrics UNKNOWN("UNKNOWN"); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/DataPartitionTableGeneratorState.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/DataPartitionTableGeneratorState.java new file mode 100644 index 0000000000000..93cca687799fc --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/DataPartitionTableGeneratorState.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.commons.enums; + +public enum DataPartitionTableGeneratorState { + SUCCESS(0), + FAILED(1), + IN_PROGRESS(2), + UNKNOWN(-1); + + private final int code; + + DataPartitionTableGeneratorState(int code) { + this.code = code; + } + + public int getCode() { + return code; + } + + /** + * get DataPartitionTableGeneratorState by code + * + * @param code code + * @return DataPartitionTableGeneratorState + */ + public static DataPartitionTableGeneratorState getStateByCode(int code) { + for (DataPartitionTableGeneratorState state : DataPartitionTableGeneratorState.values()) { + if (state.code == code) { + return state; + } + } + return UNKNOWN; + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java index 91346f0c69c85..d154f1813e1b7 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java @@ -282,6 +282,48 @@ public Set autoCleanPartitionTable( return removedTimePartitionSlots; } + /** + * Merge a complete DataPartitionTable from the partition tables received from multiple DataNodes + * (supports cross-database merging, which is exactly the logic implemented in the current PR) + * + * @param sourceMap Map + * @return The complete merged partition table + */ + public DataPartitionTable merge(Map sourceMap) { + DataPartitionTable merged = new DataPartitionTable(this.dataPartitionMap); + for (DataPartitionTable table : sourceMap.values()) { + for (Map.Entry entry : + table.dataPartitionMap.entrySet()) { + TSeriesPartitionSlot slot = entry.getKey(); + SeriesPartitionTable seriesTable = entry.getValue(); + merged + .dataPartitionMap + .computeIfAbsent(slot, k -> new SeriesPartitionTable()) + .merge(seriesTable); + } + } + return merged; 
+ } + + /** + * Support single table merging Merge another DataPartitionTable into the current object (used for + * incremental merging) + */ + public DataPartitionTable merge(DataPartitionTable sourcePartitionTable) { + DataPartitionTable merged = new DataPartitionTable(this.dataPartitionMap); + if (sourcePartitionTable == null) { + return merged; + } + for (Map.Entry entry : + sourcePartitionTable.dataPartitionMap.entrySet()) { + merged + .dataPartitionMap + .computeIfAbsent(entry.getKey(), k -> new SeriesPartitionTable()) + .merge(entry.getValue()); + } + return merged; + } + public void serialize(OutputStream outputStream, TProtocol protocol) throws IOException, TException { ReadWriteIOUtils.write(dataPartitionMap.size(), outputStream); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DatabaseScopedDataPartitionTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DatabaseScopedDataPartitionTable.java new file mode 100644 index 0000000000000..a47f4024eac88 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DatabaseScopedDataPartitionTable.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.partition; + +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TProtocol; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.Objects; + +public class DatabaseScopedDataPartitionTable { + private final String database; + private DataPartitionTable dataPartitionTable; + + public DatabaseScopedDataPartitionTable(String database, DataPartitionTable dataPartitionTable) { + this.database = database; + this.dataPartitionTable = dataPartitionTable; + } + + public String getDatabase() { + return database; + } + + public DataPartitionTable getDataPartitionTable() { + return dataPartitionTable; + } + + public void serialize(OutputStream outputStream, TProtocol protocol) + throws IOException, TException { + ReadWriteIOUtils.write(database, outputStream); + + ReadWriteIOUtils.write(dataPartitionTable != null, outputStream); + + if (dataPartitionTable != null) { + dataPartitionTable.serialize(outputStream, protocol); + } + } + + public static DatabaseScopedDataPartitionTable deserialize(ByteBuffer buffer) { + String database = ReadWriteIOUtils.readString(buffer); + + boolean hasDataPartitionTable = ReadWriteIOUtils.readBool(buffer); + + DataPartitionTable dataPartitionTable = null; + if (hasDataPartitionTable) { + dataPartitionTable = new DataPartitionTable(); + dataPartitionTable.deserialize(buffer); + } + + return new DatabaseScopedDataPartitionTable(database, dataPartitionTable); + } + + public static DatabaseScopedDataPartitionTable deserialize( + InputStream inputStream, TProtocol protocol) throws IOException, TException { + String database = ReadWriteIOUtils.readString(inputStream); + + boolean hasDataPartitionTable = ReadWriteIOUtils.readBool(inputStream); + + 
DataPartitionTable dataPartitionTable = null; +    if (hasDataPartitionTable) { +      dataPartitionTable = new DataPartitionTable(); +      dataPartitionTable.deserialize(inputStream, protocol); +    } + +    return new DatabaseScopedDataPartitionTable(database, dataPartitionTable); +  } + +  @Override +  public boolean equals(Object o) { +    if (this == o) return true; +    if (o == null || getClass() != o.getClass()) return false; +    DatabaseScopedDataPartitionTable that = (DatabaseScopedDataPartitionTable) o; +    return Objects.equals(database, that.database) +        && Objects.equals(dataPartitionTable, that.dataPartitionTable); +  } + +  @Override +  public int hashCode() { +    return Objects.hash(database, dataPartitionTable); +  } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java index f46344566dc32..915e3df4e329a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java @@ -73,7 +73,13 @@ public Map> getSeriesPartitionMap() } public void putDataPartition(TTimePartitionSlot timePartitionSlot, TConsensusGroupId groupId) { - seriesPartitionMap.computeIfAbsent(timePartitionSlot, empty -> new Vector<>()).add(groupId); + List groupList = + seriesPartitionMap.computeIfAbsent(timePartitionSlot, empty -> new Vector<>()); + synchronized (groupList) { + if (!groupList.contains(groupId)) { + groupList.add(groupId); + } + } } /** @@ -270,6 +276,14 @@ public List autoCleanPartitionTable( return removedTimePartitions; } + public void merge(SeriesPartitionTable sourceMap) { + if (sourceMap == null) return; + sourceMap.seriesPartitionMap.forEach( + (timeSlot, groups) -> { + groups.forEach(groupId -> putDataPartition(timeSlot, groupId)); + }); + } + + public void
serialize(OutputStream outputStream, TProtocol protocol) throws IOException, TException { ReadWriteIOUtils.write(seriesPartitionMap.size(), outputStream); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java index 8b63d29bd786e..d234c6be30518 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/IOUtils.java @@ -25,10 +25,12 @@ import org.apache.iotdb.commons.path.PartialPath; import com.google.common.base.Supplier; +import com.google.common.util.concurrent.RateLimiter; import java.io.DataInputStream; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.Map; @@ -289,4 +291,37 @@ public static Optional retryNoException( } return Optional.empty(); } + + public static class RatelimitedInputStream extends InputStream { + private RateLimiter rateLimiter; + private InputStream inner; + + public RatelimitedInputStream(InputStream inner, RateLimiter limiter) { + this.inner = inner; + this.rateLimiter = limiter; + } + + @Override + public int read() throws IOException { + rateLimiter.acquire(1); + return inner.read(); + } + + @Override + public int read(byte[] b) throws IOException { + rateLimiter.acquire(b.length); + return inner.read(b); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + rateLimiter.acquire(len); + return inner.read(b, off, len); + } + + @Override + public void close() throws IOException { + inner.close(); + } + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java index eb53cdb2798dd..250a347d1496b 100644 --- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java @@ -122,6 +122,10 @@ public static long getTimePartitionIdWithoutOverflow(long time) { return partitionId.longValue(); } + public static long getStartTimeByPartitionId(long partitionId) { + return (partitionId * timePartitionInterval) + timePartitionOrigin; + } + public static boolean satisfyPartitionId(long startTime, long endTime, long partitionId) { long startPartition = originMayCauseOverflow diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index cca7110f28d40..84d4c2a1c0b11 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -678,6 +678,36 @@ struct TAuditLogReq { 11: required i32 cnId } +/** +* BEGIN: Data Partition Table Integrity Check Structures +**/ + +struct TGetEarliestTimeslotsResp { + 1: required common.TSStatus status + 2: optional map databaseToEarliestTimeslot +} + +struct TGenerateDataPartitionTableReq { + 1: required set databases +} + +struct TGenerateDataPartitionTableResp { + 1: required common.TSStatus status + 2: required i32 errorCode + 3: optional string message +} + +struct TGenerateDataPartitionTableHeartbeatResp { + 1: required common.TSStatus status + 2: required i32 errorCode + 3: optional string message + 4: optional list databaseScopedDataPartitionTables +} + +/** +* END: Data Partition Table Integrity Check Structures +**/ + /** * BEGIN: Used for EXPLAIN ANALYZE **/ @@ -1276,6 +1306,30 @@ service IDataNodeRPCService { * Write an audit log entry to the DataNode's AuditEventLogger */ common.TSStatus writeAuditLog(TAuditLogReq req); + + /** + * BEGIN: Data Partition Table Integrity Check + **/ + + /** + * Get earliest timeslot information from DataNode + * 
Returns map of database name to earliest timeslot id + */ + TGetEarliestTimeslotsResp getEarliestTimeslots() + + /** + * Request DataNode to generate DataPartitionTable by scanning tsfile resources + */ + TGenerateDataPartitionTableResp generateDataPartitionTable(TGenerateDataPartitionTableReq req) + + /** + * Check the status of DataPartitionTable generation task + */ + TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartbeat() + + /** + * END: Data Partition Table Integrity Check + **/ } service MPPDataExchangeService {