[IoTV2]: Pick deletion event for historical resend#17329
Merged
jt2594838 merged 1 commit intoapache:masterfrom Mar 20, 2026
Merged
[IoTV2]: Pick deletion event for historical resend#17329jt2594838 merged 1 commit intoapache:masterfrom
jt2594838 merged 1 commit intoapache:masterfrom
Conversation
Collaborator
Pengzna
commented
Mar 20, 2026
- Historical deletion events missing replicateIndex, silently dropped by Processor.
- RemoveEventFromBuffer removes wrong event when target not found.
Contributor
There was a problem hiding this comment.
Pull request overview
This PR addresses IoTConsensusV2 historical resend correctness by ensuring historical deletion events participate in IoTV2 replication tracking, and by preventing accidental removal of the wrong buffered event in the async sink.
Changes:
- Assign
replicateIndexForIoTV2to historical deletion events so they aren’t dropped byIoTConsensusV2Processor. - Fix
IoTConsensusV2AsyncSink.removeEventFromBufferto avoid removing an unrelated event when the target event is not found.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java |
Assigns IoTV2 replicateIndex for historical deletion events to ensure they can be forwarded by the IoTV2 processor. |
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java |
Prevents removing the wrong element from transferBuffer when the requested event is not present. |
Comments suppressed due to low confidence (1)
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2AsyncSink.java:252
removeEventFromBuffernow correctly avoids removing the wrong element when the target event isn’t found, but it still updatescurrentReplicateProgressand (more importantly) callsevent.decreaseReferenceCount(...)even in the not-found branch. If the event is still intransferBuffer(e.g., equals mismatch) this can prematurely release resources while the queued event remains; if it was already removed earlier it can also drive the reference count negative. Consider only decreasing the connector-held reference (and possibly updating progress) when the event was actually removed, or alternatively remove/decrease based on the exact buffered element found.
if (current.equalsInIoTConsensusV2(event)) {
iterator.remove();
} else {
LOGGER.warn(
"IoTConsensusV2-ConsensusGroup-{}: event-{} not found in transferBuffer, skip removing. queue size = {}",
consensusGroupId,
event,
transferBuffer.size());
}
// update replicate progress
currentReplicateProgress =
Math.max(currentReplicateProgress, event.getReplicateIndexForIoTV2());
// decrease reference count
event.decreaseReferenceCount(IoTConsensusV2AsyncSink.class.getName(), true);
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.