add minor compaction for Overlord based compaction supervisors using the MSQ engine #19059
cecemei merged 31 commits into apache:master
Conversation
```java
final DataSegment segment = new DataSegment(
    "foo",
    Intervals.of("2023-01-0" + i + "/2023-01-0" + (i + 1)),
    "2023-01-0" + i,
    ImmutableMap.of("path", "a-" + i),
    ImmutableList.of("dim1"),
    ImmutableList.of("m1"),
    new DimensionRangeShardSpec(List.of("dim1"), null, null, i - 1, 8),
    9,
    100
);
```
Check notice — Code scanning / CodeQL: Deprecated method or constructor invocation (Note, test)
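The CodeQL note above flags a deprecated constructor call in the test. A common way to address this is to route construction through a builder; the following is a generic, self-contained sketch of that pattern, not Druid's actual `DataSegment.builder()` API — the class and field names here are illustrative.

```java
// Illustrative sketch: a builder that hides a deprecated constructor so callers
// never invoke it directly. Not Druid's DataSegment; names are hypothetical.
class Segment
{
    final String dataSource;
    final String version;
    final int binaryVersion;
    final long size;

    /** @deprecated prefer {@link Builder}. */
    @Deprecated
    Segment(String dataSource, String version, int binaryVersion, long size)
    {
        this.dataSource = dataSource;
        this.version = version;
        this.binaryVersion = binaryVersion;
        this.size = size;
    }

    static class Builder
    {
        private String dataSource;
        private String version;
        private int binaryVersion = 9;  // sensible default, set once here
        private long size;

        Builder dataSource(String d) { this.dataSource = d; return this; }
        Builder version(String v) { this.version = v; return this; }
        Builder size(long s) { this.size = s; return this; }

        @SuppressWarnings("deprecation")
        Segment build() { return new Segment(dataSource, version, binaryVersion, size); }
    }
}
```

With this shape, only `build()` touches the deprecated constructor, so the suppression lives in exactly one place.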
capistrant left a comment:
This is awesome. Left some minor comments.
server/src/main/java/org/apache/druid/server/compaction/CompactionMode.java (outdated, resolved)
server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java (outdated, resolved)
server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java (outdated, resolved)
```java
@@ -282,18 +283,20 @@ private boolean startJobIfPendingAndReady(
}

// Check if the job is already running, completed or skipped
```

nit: this comment might need a refresh to reflect how the status handling has changed versus the former code. I was confused about why PENDING and COMPLETE resulted in a throw until I reviewed deriveCompactionStatus.

There's no logic change here; I just made the code more defensive and updated the comment. IMHO, CompactionStatus is not the best class to use here.

Actually, I guess there is some logic change: the policy check has moved to CompactionConfigBasedJobTemplate, since creating the job needs to know the compaction mode (which is decided by the policy). Still, computeCompactionStatus only returns the running/skipped/pending state.

Should we update the javadoc to call out how this method now offers special handling for incremental compaction that includes upgrading already-compacted segments?
...ing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentUpgradeAction.java (outdated, resolved)
...e/src/test/java/org/apache/druid/indexing/common/actions/MarkSegmentToUpgradeActionTest.java (resolved)
processing/src/main/java/org/apache/druid/timeline/partition/LinearShardSpec.java (outdated, resolved)
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java (outdated, resolved)
kfaraz left a comment:
Leaving a partial review for the supporting changes. Will review the core changes in CompactionTask and IndexerSQLMetadataStorageCoordinator shortly.
server/src/main/java/org/apache/druid/server/compaction/CompactionMode.java (outdated, resolved)
...-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java (outdated, resolved; 4 threads)
server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java (outdated, resolved; 2 threads)
server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java (outdated, resolved)
...ervice/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java (outdated, resolved; 2 threads)
...rvice/src/main/java/org/apache/druid/indexing/common/actions/MarkSegmentToUpgradeAction.java (outdated, resolved; 2 threads)
...ng-service/src/main/java/org/apache/druid/indexing/common/task/MinorCompactionInputSpec.java (outdated, resolved)
indexing-service/src/main/java/org/apache/druid/indexing/common/task/UncompactedInputSpec.java (outdated, resolved; 2 threads)
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java (outdated, resolved)
```java
final ShardSpec shardSpec = oldSegment.getShardSpec().withPartitionNum(partitionNum);
// Create upgraded segment with the correct interval, version and shard spec
String lockVersion = upgradeSegmentToLockVersion.get(oldSegment.getId().toString());
DataSegment dataSegment = DataSegment.builder(oldSegment)
```

Use the final shardSpec to build this DataSegment.

Not sure I understand; the builder is already using the final shardSpec.

Yes, but the number of core partitions is updated later in the code flow, right? That would effectively change the shard spec. Even though the number of core partitions is not used to determine the segment ID, it would still be cleaner to update the inner DataSegment object (or maybe construct it lazily) so that it uses the correct shard spec.
...-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java (outdated, resolved)
...-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java (resolved)
...-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java (outdated, resolved)
…n/task/MinorCompactionInputSpec.java Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>
…n/actions/MarkSegmentToUpgradeAction.java Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java (outdated, resolved)
```java
kafkaServer.deleteTopic(topic1);
cluster.callApi().postSupervisor(supervisor1.createSuspendedSpec());

long totalUsed = overlord.latchableEmitter().getMetricValues(
```

This need not give the correct value of total used segments currently present in the cache. This metric is emitted after every cache sync, and as such the total aggregate would be higher than the actual number of segments.

The last value (not the aggregate) should be the number of used segments, right?

Yes, but there can be race conditions. There might be some delay before the cache is synced with the metadata store. It is better to wait for the actual event you want.
```java
waitForAllCompactionTasksToFinish();

// wait for new segments to be updated in the cache
overlord.latchableEmitter().waitForEvent(
```

If the intent is to wait for the next cache sync, do the following instead:

```java
overlord.latchableEmitter().waitForNextEvent(
    event -> event.hasMetricName("segment/metadataCache/sync/time")
)
```

Wouldn't that be the same as waiting for the published segments to show up in the sync?

Yes, this check itself is correct. The problem is the value of totalUsed that this check depends on.
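The advice in this thread — block on the specific event you care about rather than polling a value that may be stale — can be sketched with a plain `CountDownLatch`. The class and event names below are illustrative; this is not Druid's LatchableEmitter API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Minimal sketch of "wait for the actual event": the test thread blocks until an
// event matching the predicate is emitted, avoiding the race between publishing
// segments and the next metadata-cache sync.
class EventWaiter
{
    private final Predicate<String> matcher;
    private final CountDownLatch latch = new CountDownLatch(1);

    EventWaiter(Predicate<String> matcher)
    {
        this.matcher = matcher;
    }

    /** Called by the emitting side for every event. */
    void onEvent(String metricName)
    {
        if (matcher.test(metricName)) {
            latch.countDown();
        }
    }

    /** Called by the test; returns false if the event never arrives in time. */
    boolean await(long timeout, TimeUnit unit) throws InterruptedException
    {
        return latch.await(timeout, unit);
    }
}
```

A test would register the waiter before triggering the work, then assert that `await` returns true, rather than reading whatever the last emitted metric value happened to be.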
...-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java (resolved)
...-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java (outdated, resolved)
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java (outdated, resolved; 2 threads)
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java (resolved; 2 threads)
…ataStorageCoordinator.java Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>
Description
This PR introduces minor compaction support in the MSQ engine to Apache Druid, enabling compaction of only the uncompacted segments within an interval while upgrading already-compacted segments for consistency.
Key Changes
MinorCompactionInputSpec
A new CompactionInputSpec implementation specifically for minor compaction.
MarkSegmentToUpgradeAction
A new task action that updates segment metadata without rewriting data, modifying partition numbers and shard specifications of already-compacted segments.
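The metadata-only upgrade idea can be made concrete with a small, self-contained sketch. `SegmentMeta` below is a stand-in for Druid's DataSegment, and the lock-version map mirrors the `upgradeSegmentToLockVersion` lookup seen in the diff above; all names are illustrative, not the actual implementation.

```java
import java.util.Map;

// Illustrative sketch of a metadata-only segment upgrade: the version and
// partition number change, but the load spec (the pointer to the data files)
// is carried over unchanged, so no data is rewritten.
record SegmentMeta(String id, String version, int partitionNum, String loadSpec)
{
    static SegmentMeta upgrade(SegmentMeta old, Map<String, String> segmentIdToLockVersion, int newPartitionNum)
    {
        String lockVersion = segmentIdToLockVersion.get(old.id());
        // Same loadSpec: only the metadata row changes in the metadata store.
        return new SegmentMeta(old.id(), lockVersion, newPartitionNum, old.loadSpec());
    }
}
```

The key invariant is that the upgraded record points at exactly the same data as the original.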
ShardSpec Interface Enhancements
Extended with copy methods including withPartitionNum() and withCorePartitions(), plus an isNumChunkSupported() check.
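The "with" methods follow the usual copy-on-write pattern for immutable values: each call returns a new instance rather than mutating the receiver. A minimal sketch of that pattern, assuming a simplified two-field shard spec (this is not Druid's ShardSpec interface):

```java
// Illustrative copy-on-write "with" methods on an immutable value type.
record SimpleShardSpec(int partitionNum, int corePartitions)
{
    // Returns a new instance; the original is never mutated.
    SimpleShardSpec withPartitionNum(int newPartitionNum)
    {
        return new SimpleShardSpec(newPartitionNum, corePartitions);
    }

    SimpleShardSpec withCorePartitions(int newCorePartitions)
    {
        return new SimpleShardSpec(partitionNum, newCorePartitions);
    }
}
```

Because the original instance is untouched, shard specs can be shared freely across threads and segment copies.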
Policy-Level Control
Introduced a minUncompactedBytesPercentForFullCompaction threshold in MostFragmentedIntervalFirstPolicy that determines whether incremental or full compaction applies, based on the ratio of uncompacted to total segment bytes.
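The threshold decision described above can be sketched as a simple percentage check. The class, enum, and method names here are hypothetical illustrations of the described behavior, not the actual policy code:

```java
// Hypothetical sketch of the mode decision: if uncompacted bytes make up at
// least the configured percentage of the interval's total bytes, do a full
// compaction; otherwise only rewrite the uncompacted segments (minor compaction).
public class CompactionModeChooser
{
    public enum Mode { FULL, INCREMENTAL }

    public static Mode choose(long uncompactedBytes, long totalBytes, double minUncompactedBytesPercentForFullCompaction)
    {
        if (totalBytes <= 0) {
            // Nothing to compare against; treat as incremental (no full rewrite).
            return Mode.INCREMENTAL;
        }
        double uncompactedPercent = 100.0 * uncompactedBytes / totalBytes;
        return uncompactedPercent >= minUncompactedBytesPercentForFullCompaction
               ? Mode.FULL
               : Mode.INCREMENTAL;
    }
}
```

For example, with a 50% threshold, an interval where 90 of 100 bytes are uncompacted would get a full compaction, while one with 10 of 100 would only have its new segments rewritten.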
Builder Pattern Implementation
Replaced verbose constructors in UserCompactionTaskQueryTuningConfig with a builder pattern for improved readability.
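A compact sketch of the builder shape this refers to, with illustrative field names only (the real UserCompactionTaskQueryTuningConfig has many more parameters, which is exactly why the builder helps):

```java
// Generic builder sketch: defaults live in one place, and call sites name only
// the parameters they care about instead of passing a long positional list.
class TuningConfig
{
    private final int maxNumTasks;
    private final long maxBytesInMemory;

    private TuningConfig(Builder b)
    {
        this.maxNumTasks = b.maxNumTasks;
        this.maxBytesInMemory = b.maxBytesInMemory;
    }

    int getMaxNumTasks() { return maxNumTasks; }
    long getMaxBytesInMemory() { return maxBytesInMemory; }

    static Builder builder() { return new Builder(); }

    static class Builder
    {
        private int maxNumTasks = 1;        // illustrative default
        private long maxBytesInMemory = -1; // illustrative default

        Builder maxNumTasks(int n) { this.maxNumTasks = n; return this; }
        Builder maxBytesInMemory(long bytes) { this.maxBytesInMemory = bytes; return this; }
        TuningConfig build() { return new TuningConfig(this); }
    }
}
```

Usage reads as `TuningConfig.builder().maxNumTasks(4).build()`, which stays legible as the parameter count grows.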
Release Notes
This feature enables compacting newly ingested segments while upgrading already-compacted data, configured through percentage thresholds in compaction policies. Support is currently limited to Overlord-based compaction supervisors using the MSQ engine.