From a190659ae2fea77f53078d127296af4cc9058e28 Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Fri, 20 Mar 2026 15:35:37 +0800 Subject: [PATCH] fix: pick deletion event for historical resend --- .../iotconsensusv2/IoTConsensusV2AsyncSink.java | 10 +++++++++- ...eHistoricalDataRegionTsFileAndDeletionSource.java | 12 ++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java index 3001c40b16fb..7cdeef9e826a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java @@ -236,7 +236,15 @@ public synchronized void removeEventFromBuffer(EnrichedEvent event) { while (!current.equalsInIoTConsensusV2(event) && iterator.hasNext()) { current = iterator.next(); } - iterator.remove(); + if (current.equalsInIoTConsensusV2(event)) { + iterator.remove(); + } else { + LOGGER.warn( + "IoTConsensusV2-ConsensusGroup-{}: event-{} not found in transferBuffer, skip removing. queue size = {}", + consensusGroupId, + event, + transferBuffer.size()); + } // update replicate progress currentReplicateProgress = Math.max(currentReplicateProgress, event.getReplicateIndexForIoTV2()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java index 809b40f6eba0..32fcece41158 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java @@ -930,6 +930,18 @@ private Event supplyDeletionEvent(final DeletionResource deletionResource) { skipIfNoPrivileges, false); + // if using IoTV2, assign a replicateIndex for this historical deletion event + if (DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2 + && IoTConsensusV2Processor.isShouldReplicate(event)) { + event.setReplicateIndexForIoTV2( + ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(pipeName)); + LOGGER.debug( + "[{}]Set {} for historical deletion event {}", + pipeName, + event.getReplicateIndexForIoTV2(), + event); + } + if (sloppyPattern || isDbNameCoveredByPattern) { event.skipParsingPattern(); }