Merged

31 commits
682dd8b  incremental-compaction-mode (cecemei, Feb 26, 2026)
86ee6b8  style (cecemei, Feb 27, 2026)
bdc28ac  pending (cecemei, Feb 27, 2026)
9a09e43  format (cecemei, Feb 27, 2026)
14294a2  Merge branch 'master' into compact2.2 (cecemei, Feb 28, 2026)
3a0bd79  build (cecemei, Feb 28, 2026)
3eac326  UncompactedInputSpec (cecemei, Feb 28, 2026)
ba090ba  test (cecemei, Feb 28, 2026)
64473a7  uncompacted (cecemei, Feb 28, 2026)
407e328  format (cecemei, Feb 28, 2026)
df25187  format2 (cecemei, Feb 28, 2026)
0134e15  format (cecemei, Feb 28, 2026)
c57b7f6  fix (cecemei, Feb 28, 2026)
1d19c85  review (cecemei, Mar 3, 2026)
a7951be  Update indexing-service/src/main/java/org/apache/druid/indexing/commo… (cecemei, Mar 3, 2026)
4e888b7  Update indexing-service/src/main/java/org/apache/druid/indexing/commo… (cecemei, Mar 3, 2026)
2ed63ae  set (cecemei, Mar 3, 2026)
ed32beb  test (cecemei, Mar 4, 2026)
aaf5ad4  review (cecemei, Mar 4, 2026)
cd49144  minor (cecemei, Mar 5, 2026)
f732719  bug (cecemei, Mar 5, 2026)
e703f29  checkstyle (cecemei, Mar 5, 2026)
1a055ba  format (cecemei, Mar 5, 2026)
2b0b871  Merge remote-tracking branch 'origin/master' into compact2.2 (cecemei, Mar 6, 2026)
fb628ee  Update server/src/main/java/org/apache/druid/metadata/IndexerSQLMetad… (cecemei, Mar 9, 2026)
3d091c9  Merge remote-tracking branch 'origin/master' into compact2.2 (cecemei, Mar 9, 2026)
0131c04  review (cecemei, Mar 9, 2026)
826110c  review (cecemei, Mar 9, 2026)
2852f17  Merge branch 'master' into compact2.2 (cecemei, Mar 10, 2026)
45b16b6  Merge branch 'master' into compact2.2 (cecemei, Mar 11, 2026)
4ceeb51  build (cecemei, Mar 11, 2026)
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/MarkSegmentToUpgradeAction.java (new file)
@@ -0,0 +1,102 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.indexing.common.actions;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.timeline.DataSegment;

import java.util.Map;
import java.util.Set;

/**
 * Task action that records segments as being upgraded in the metadata store.
 * <p>
 * This action is used during compaction to track which segments are being replaced.
 * It validates that all segments to be upgraded are covered by
 * {@link ReplaceTaskLock}s before inserting them into the upgrade segments table.
 * <p>
 * The action fails if any of the upgrade segments does not have a corresponding
 * replace lock, ensuring that only properly locked segments can be marked for
 * upgrade. On success, it returns the number of segments inserted into the
 * upgrade segments table.
 */
public class MarkSegmentToUpgradeAction implements TaskAction<Integer>
{
  private final String dataSource;
  private final Set<DataSegment> upgradeSegments;

  /**
   * @param dataSource      the datasource containing the segments to upgrade
   * @param upgradeSegments the set of segments to be recorded as upgraded
   */
  @JsonCreator
  public MarkSegmentToUpgradeAction(
      @JsonProperty("dataSource") String dataSource,
      @JsonProperty("upgradeSegments") Set<DataSegment> upgradeSegments
  )
  {
    this.dataSource = dataSource;
    this.upgradeSegments = upgradeSegments;
  }

  @JsonProperty
  public String getDataSource()
  {
    return dataSource;
  }

  @JsonProperty
  public Set<DataSegment> getUpgradeSegments()
  {
    return upgradeSegments;
  }

  @Override
  public TypeReference<Integer> getReturnTypeReference()
  {
    return new TypeReference<>()
    {
    };
  }

  @Override
  public Integer perform(Task task, TaskActionToolbox toolbox)
  {
    final String datasource = task.getDataSource();
    final Map<DataSegment, ReplaceTaskLock> segmentToReplaceLock
        = TaskLocks.findReplaceLocksCoveringSegments(datasource, toolbox.getTaskLockbox(), upgradeSegments);

    if (segmentToReplaceLock.size() < upgradeSegments.size()) {
      throw InvalidInput.exception(
          "Segments to upgrade must be covered by a REPLACE lock. Only [%d] out of [%d] segments are covered.",
          segmentToReplaceLock.size(),
          upgradeSegments.size()
      );
    }

    return toolbox.getIndexerMetadataStorageCoordinator()
                  .insertIntoUpgradeSegmentsTable(segmentToReplaceLock);
  }
}
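A minimal sketch (not part of this diff) of how task code might submit this action, assuming a TaskToolbox is in scope as in CompactionTask; the helper that gathers the segments is hypothetical:

// `alreadyCompactedSegments()` is a hypothetical helper, used here only for illustration.
final Set<DataSegment> segmentsToUpgrade = alreadyCompactedSegments();
final Integer numUpgraded = toolbox.getTaskActionClient().submit(
    new MarkSegmentToUpgradeAction(getDataSource(), segmentsToUpgrade)
);
// Fails with InvalidInput if any segment lacks a covering REPLACE lock;
// otherwise returns the number of rows inserted into the upgrade segments table.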
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
@@ -37,6 +37,7 @@
@JsonSubTypes.Type(name = "segmentTransactionalInsert", value = SegmentTransactionalInsertAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalAppend", value = SegmentTransactionalAppendAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class),
@JsonSubTypes.Type(name = "markSegmentsToUpgrade", value = MarkSegmentToUpgradeAction.class),
@JsonSubTypes.Type(name = "retrieveSegmentsById", value = RetrieveSegmentsByIdAction.class),
@JsonSubTypes.Type(name = "retrieveUpgradedFromSegmentIds", value = RetrieveUpgradedFromSegmentIdsAction.class),
@JsonSubTypes.Type(name = "retrieveUpgradedToSegmentIds", value = RetrieveUpgradedToSegmentIdsAction.class),
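As a hedged aside (the mapper setup below is assumed, not part of this diff), the registration above is what lets Jackson resolve the "markSegmentsToUpgrade" type name:

// Round-trip sketch using Druid's DefaultObjectMapper (assumption).
ObjectMapper mapper = new DefaultObjectMapper();
String json = "{\"type\":\"markSegmentsToUpgrade\",\"dataSource\":\"wiki\",\"upgradeSegments\":[]}";
TaskAction<?> action = mapper.readValue(json, TaskAction.class);
// action is a MarkSegmentToUpgradeAction with an empty upgrade set.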
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionInputSpec.java
@@ -34,6 +34,7 @@
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@Type(name = CompactionIntervalSpec.TYPE, value = CompactionIntervalSpec.class),
@Type(name = MinorCompactionInputSpec.TYPE, value = MinorCompactionInputSpec.class),
@Type(name = SpecificSegmentsSpec.TYPE, value = SpecificSegmentsSpec.class)
})
public interface CompactionInputSpec
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java
@@ -24,9 +24,9 @@
import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.joda.time.Interval;

import java.util.Map;

@@ -47,7 +47,7 @@ public interface CompactionRunner
*/
TaskStatus runCompactionTasks(
CompactionTask compactionTask,
Map<Interval, DataSchema> intervalDataSchemaMap,
Map<QuerySegmentSpec, DataSchema> inputSchemas,
TaskToolbox taskToolbox
) throws Exception;

@@ -59,7 +59,7 @@ TaskStatus runCompactionTasks(
*/
CompactionConfigValidationResult validateCompactionTask(
CompactionTask compactionTask,
Map<Interval, DataSchema> intervalToDataSchemaMap
Map<QuerySegmentSpec, DataSchema> inputSchemas
);

}
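To illustrate the revised contract (this sketch is hypothetical, not from the diff), a runner now receives one schema per segment group rather than one per interval:

// Each entry pairs the segments to read (QuerySegmentSpec) with the schema
// describing how to write the compacted output; submitSubTask is a hypothetical helper.
for (Map.Entry<QuerySegmentSpec, DataSchema> entry : inputSchemas.entrySet()) {
  submitSubTask(entry.getKey(), entry.getValue());
}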
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -55,6 +55,7 @@
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.MarkSegmentToUpgradeAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
@@ -77,7 +78,11 @@
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.Order;
import org.apache.druid.query.OrderBy;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.AggregateProjectionMetadata;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.Metadata;
@@ -102,6 +107,7 @@
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
@@ -127,6 +133,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

/**
* The client representation of this task is {@link ClientCompactionTaskQuery}. JSON
@@ -518,7 +525,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
emitMetric(toolbox.getEmitter(), "ingest/count", 1);

final Map<Interval, DataSchema> intervalDataSchemas = createDataSchemasForIntervals(
final Map<QuerySegmentSpec, DataSchema> inputSchemas = createInputDataSchemas(
toolbox,
getTaskLockHelper().getLockGranularityToUse(),
segmentProvider,
@@ -534,22 +541,26 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
registerResourceCloserOnAbnormalExit(compactionRunner.getCurrentSubTaskHolder());
CompactionConfigValidationResult supportsCompactionConfig = compactionRunner.validateCompactionTask(
this,
intervalDataSchemas
inputSchemas
);
if (!supportsCompactionConfig.isValid()) {
throw InvalidInput.exception("Compaction spec not supported. Reason[%s].", supportsCompactionConfig.getReason());
}
return compactionRunner.runCompactionTasks(this, intervalDataSchemas, toolbox);
return compactionRunner.runCompactionTasks(this, inputSchemas, toolbox);
}

/**
* Generate dataschema for segments in each interval.
* Creates input data schemas for compaction by grouping segments and generating {@link DataSchema}s.
* When segment granularity is not specified, preserves original granularity and creates a schema
* for each unified interval. When segment granularity is specified, creates a single schema for all
* segments. For minor compaction, validates that all segments are completely within the target
* interval and submits already-compacted segments via {@link MarkSegmentToUpgradeAction} for direct upgrade.
*
* @throws IOException if an exception occurs whie retrieving used segments to
* determine schemas.
* @return map from {@link QuerySegmentSpec} to {@link DataSchema} for each group of segments to compact
* @throws IOException if an exception occurs while retrieving segments
*/
@VisibleForTesting
static Map<Interval, DataSchema> createDataSchemasForIntervals(
static Map<QuerySegmentSpec, DataSchema> createInputDataSchemas(
final TaskToolbox toolbox,
final LockGranularity lockGranularityInUse,
final SegmentProvider segmentProvider,
@@ -572,17 +583,45 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals(
return Collections.emptyMap();
}

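// Illustrative example (not from the PR): with interval 2024-01-01/2024-01-03,
// a segment spanning 2024-01-02/2024-01-04 extends past the interval end and
// fails the check below, while one spanning 2024-01-01/2024-01-02 passes.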
if (segmentProvider.minorCompaction) {
Iterable<DataSegment> segmentsNotCompletelyWithin =
Iterables.filter(timelineSegments, s -> !segmentProvider.interval.contains(s.getInterval()));
if (segmentsNotCompletelyWithin.iterator().hasNext()) {
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.INVALID_INPUT)
.build(
"Minor compaction does not allow segments that are not completely within interval[%s]",
segmentProvider.interval
);
}
}

if (granularitySpec == null || granularitySpec.getSegmentGranularity() == null) {
Map<Interval, DataSchema> intervalDataSchemaMap = new HashMap<>();
Map<QuerySegmentSpec, DataSchema> inputSchemas = new HashMap<>();
// In minor compaction, segments that are already compacted are upgraded directly instead of being rewritten; this is supported in MSQ
Set<DataSegment> segmentsToUpgrade = new HashSet<>();

// original granularity
final Map<Interval, List<DataSegment>> intervalToSegments = new TreeMap<>(
Comparators.intervalsByStartThenEnd()
);

for (final DataSegment dataSegment : timelineSegments) {
intervalToSegments.computeIfAbsent(dataSegment.getInterval(), k -> new ArrayList<>())
.add(dataSegment);
if (segmentProvider.shouldUpgradeSegment(dataSegment)) {
segmentsToUpgrade.add(dataSegment);
} else {
intervalToSegments.computeIfAbsent(dataSegment.getInterval(), k -> new ArrayList<>())
.add(dataSegment);
}
}
if (!segmentsToUpgrade.isEmpty()) {
log.info(
"Marking [%d]segments to upgrade, showing the first 10 segments:%s",
segmentsToUpgrade.size(),
segmentsToUpgrade.stream().map(DataSegment::getId).map(SegmentId::toString).limit(10L)
);
toolbox.getTaskActionClient()
.submit(new MarkSegmentToUpgradeAction(segmentProvider.dataSource, segmentsToUpgrade));
}

// unify overlapping intervals to ensure that overlapping segments are compacted with the same indexSpec
@@ -627,18 +666,45 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals(
projections,
needMultiValuedColumns
);
intervalDataSchemaMap.put(interval, dataSchema);
final QuerySegmentSpec querySegmentSpec;
if (segmentProvider.minorCompaction) {
querySegmentSpec = new MultipleSpecificSegmentSpec(segmentsToCompact.stream()
.map(DataSegment::toDescriptor)
.collect(Collectors.toList()));
} else {
querySegmentSpec = new MultipleIntervalSegmentSpec(List.of(interval));
}
inputSchemas.put(querySegmentSpec, dataSchema);
}
return intervalDataSchemaMap;
return inputSchemas;
} else {
// given segment granularity
Set<DataSegment> segmentsToUpgrade = new HashSet<>();
Iterables.addAll(segmentsToUpgrade, Iterables.filter(
timelineSegments,
segmentProvider::shouldUpgradeSegment
));
if (!segmentsToUpgrade.isEmpty()) {
log.info(
"Marking [%d]segments to upgrade, showing the first 10 segments:%s",
segmentsToUpgrade.size(),
segmentsToUpgrade.stream().map(DataSegment::getId).map(SegmentId::toString).limit(10L)
);
toolbox.getTaskActionClient()
.submit(new MarkSegmentToUpgradeAction(segmentProvider.dataSource, segmentsToUpgrade));
}

final Iterable<DataSegment> segmentsToCompact = Iterables.filter(
timelineSegments,
s -> !segmentProvider.shouldUpgradeSegment(s)
);
final DataSchema dataSchema = createDataSchema(
toolbox.getEmitter(),
metricBuilder,
segmentProvider.dataSource,
JodaUtils.umbrellaInterval(Iterables.transform(timelineSegments, DataSegment::getInterval)),
lazyFetchSegments(
timelineSegments,
segmentsToCompact,
toolbox.getSegmentCacheManager()
),
dimensionsSpec,
Expand All @@ -648,7 +714,15 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals(
projections,
needMultiValuedColumns
);
return Collections.singletonMap(segmentProvider.interval, dataSchema);
final QuerySegmentSpec querySegmentSpec;
if (segmentProvider.minorCompaction) {
querySegmentSpec = new MultipleSpecificSegmentSpec(StreamSupport.stream(segmentsToCompact.spliterator(), false)
.map(DataSegment::toDescriptor)
.collect(Collectors.toList()));
} else {
querySegmentSpec = new MultipleIntervalSegmentSpec(List.of(segmentProvider.interval));
}
return Map.of(querySegmentSpec, dataSchema);
}
}
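For reference, a hedged illustration (the interval, version, and partition number below are assumed values) of the two key shapes this method can return:

// Minor compaction: the key pins exact segments by descriptor.
QuerySegmentSpec minorKey = new MultipleSpecificSegmentSpec(
    List.of(new SegmentDescriptor(Intervals.of("2024-01-01/P1D"), "v1", 0))
);
// Interval-based compaction: the key covers whole intervals.
QuerySegmentSpec intervalKey = new MultipleIntervalSegmentSpec(
    List.of(Intervals.of("2024-01-01/P1D"))
);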

Expand All @@ -658,8 +732,7 @@ private static Iterable<DataSegment> retrieveRelevantTimelineHolders(
LockGranularity lockGranularityInUse
) throws IOException
{
final List<DataSegment> usedSegments =
segmentProvider.findSegments(toolbox.getTaskActionClient());
final List<DataSegment> usedSegments = segmentProvider.findSegments(toolbox.getTaskActionClient());
segmentProvider.checkSegments(lockGranularityInUse, usedSegments);
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = SegmentTimeline
.forSegments(usedSegments)
@@ -1218,11 +1291,30 @@ static class SegmentProvider
private final CompactionInputSpec inputSpec;
private final Interval interval;

private final boolean minorCompaction;
private final Set<SegmentDescriptor> uncompactedSegments;

SegmentProvider(String dataSource, CompactionInputSpec inputSpec)
{
this.dataSource = Preconditions.checkNotNull(dataSource);
this.inputSpec = inputSpec;
this.interval = inputSpec.findInterval(dataSource);
if (inputSpec instanceof MinorCompactionInputSpec) {
minorCompaction = true;
uncompactedSegments = Set.copyOf(((MinorCompactionInputSpec) inputSpec).getUncompactedSegments());
} else {
minorCompaction = false;
uncompactedSegments = null;
}
}

private boolean shouldUpgradeSegment(DataSegment s)
{
if (minorCompaction) {
return !uncompactedSegments.contains(s.toDescriptor()) && this.interval.contains(s.getInterval());
} else {
return false;
}
}
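// Illustrative example (not part of the PR): for a minor compaction over
// 2024-01-01/2024-01-02, a segment listed in uncompactedSegments is
// recompacted (returns false here), while a segment fully inside the
// interval and absent from that set is upgraded directly (returns true)
// via MarkSegmentToUpgradeAction.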

List<DataSegment> findSegments(TaskActionClient actionClient) throws IOException