@@ -46,6 +46,9 @@
import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.SnapshotManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.ArrayList;
@@ -68,6 +71,8 @@
/** Default implementation of {@link FileStoreScan}. */
public abstract class AbstractFileStoreScan implements FileStoreScan {

private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStoreScan.class);

private final ManifestsReader manifestsReader;
private final SnapshotManager snapshotManager;
private final ManifestFile.Factory manifestFileFactory;
@@ -277,9 +282,6 @@ public Plan plan() {
List<ManifestFileMeta> manifests = manifestsResult.filteredManifests;

Iterator<ManifestEntry> iterator = readManifestEntries(manifests, false);
if (supportsLimitPushManifestEntries()) {
iterator = limitPushManifestEntries(iterator);
}

List<ManifestEntry> files = ListUtils.toList(iterator);
if (postFilterManifestEntriesEnabled()) {
@@ -289,6 +291,10 @@ public Plan plan() {
List<ManifestEntry> result = files;

long scanDuration = (System.nanoTime() - started) / 1_000_000;
LOG.info(
"File store scan plan completed in {} ms. Files size : {}",
scanDuration,
result.size());
if (scanMetrics != null) {
long allDataFiles =
manifestsResult.allManifests.stream()
@@ -33,7 +33,9 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@@ -92,9 +94,21 @@ public boolean supportsLimitPushManifestEntries() {
}

@Override
protected Iterator<ManifestEntry> limitPushManifestEntries(Iterator<ManifestEntry> entries) {
protected boolean postFilterManifestEntriesEnabled() {
return supportsLimitPushManifestEntries();
}

@Override
protected List<ManifestEntry> postFilterManifestEntries(List<ManifestEntry> entries) {
checkArgument(limit != null && limit > 0 && !deletionVectorsEnabled);
return new LimitAwareManifestEntryIterator(entries, limit);
// Use LimitAwareManifestEntryIterator for limit pushdown
Iterator<ManifestEntry> iterator =
new LimitAwareManifestEntryIterator(entries.iterator(), limit);
List<ManifestEntry> result = new ArrayList<>();
while (iterator.hasNext()) {
result.add(iterator.next());
}
return result;
}
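
To make the drain above concrete, here is a small standalone sketch (hypothetical class name and row counts; it assumes LimitAwareManifestEntryIterator stops returning entries once the accumulated file row count reaches the limit, and it replaces real ManifestEntry objects with plain row counts):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LimitAwareDrainExample {
    public static void main(String[] args) {
        List<Long> fileRowCounts = Arrays.asList(120L, 80L, 300L, 500L);
        long limit = 200;

        List<Long> kept = new ArrayList<>();
        long seen = 0;
        for (long rows : fileRowCounts) {
            kept.add(rows);
            seen += rows;
            if (seen >= limit) {
                break; // 120 + 80 = 200 >= 200: the last two files are never planned
            }
        }
        System.out.println(kept); // [120, 80]
    }
}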

/** Note: Keep this thread-safe. */
@@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fileindex.FileIndexPredicate;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FilteredManifestEntry;
@@ -37,16 +38,19 @@
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.MergeEngine.AGGREGATE;
@@ -55,6 +59,8 @@
/** {@link FileStoreScan} for {@link KeyValueFileStore}. */
public class KeyValueFileStoreScan extends AbstractFileStoreScan {

private static final Logger LOG = LoggerFactory.getLogger(KeyValueFileStoreScan.class);

private final SimpleStatsEvolutions fieldKeyStatsConverters;
private final SimpleStatsEvolutions fieldValueStatsConverters;
private final BucketSelectConverter bucketSelectConverter;
@@ -203,29 +209,162 @@ private boolean isValueFilterEnabled() {
}
}

/**
* Check if limit pushdown is supported for PK tables.
*
* <p>Not supported when merge engine is PARTIAL_UPDATE/AGGREGATE (need merge) or deletion
* vectors are enabled (can't count deleted rows). For DEDUPLICATE/FIRST_ROW, per-bucket checks
* (no overlapping, no delete rows) are done in applyLimitPushdownForBucket.
*/
@Override
public boolean supportsLimitPushManifestEntries() {
if (mergeEngine == PARTIAL_UPDATE || mergeEngine == AGGREGATE) {
return false;
}

return limit != null && limit > 0 && !deletionVectorsEnabled;
}
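
As a toy illustration of why the guard above excludes merging engines (class name and data are hypothetical): when rows with the same key are spread across files, summing per-file row counts overstates the merged result size, so a limit computed from file stats could drop files that are still needed.

import java.util.HashMap;
import java.util.Map;

public class MergeEngineRowCountExample {
    public static void main(String[] args) {
        // Two files, each holding one row for key "k1" with a different column set.
        String[][] file1 = {{"k1", "a=1"}};
        String[][] file2 = {{"k1", "b=2"}};
        long summedRowCount = file1.length + file2.length; // 2

        // Simulate a partial-update style merge: rows with the same key collapse into one.
        Map<String, String> merged = new HashMap<>();
        for (String[][] file : new String[][][] {file1, file2}) {
            for (String[] row : file) {
                merged.merge(row[0], row[1], (oldV, newV) -> oldV + "," + newV);
            }
        }
        // File stats alone cannot tell how many rows survive the merge.
        System.out.println(summedRowCount + " file rows -> " + merged.size() + " merged row(s)");
    }
}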

/**
* Apply limit pushdown for a single bucket. Returns files to include, or null if unsafe.
*
* <p>Returns null if files overlap (LSM level 0 or different levels) or have delete rows. For
* non-overlapping files with no delete rows, accumulates row counts until limit is reached.
*
* @param bucketEntries files in the same bucket
* @param limit the limit to apply
* @return files to include, or null if we can't safely push down limit
*/
@Nullable
private List<ManifestEntry> applyLimitPushdownForBucket(
List<ManifestEntry> bucketEntries, long limit) {
// Check if this bucket has overlapping files (LSM property)
boolean hasOverlapping = !noOverlapping(bucketEntries);

if (hasOverlapping) {
// For buckets with overlapping, we can't safely push down limit because files
// need to be merged and we can't accurately calculate the merged row count.
return null;
}

// For buckets without overlapping and with merge engines that don't require
// merge (DEDUPLICATE or FIRST_ROW), we can safely accumulate row count
// and stop when limit is reached, but only if files have no delete rows.
List<ManifestEntry> result = new ArrayList<>();
long accumulatedRowCount = 0;

for (ManifestEntry entry : bucketEntries) {
long fileRowCount = entry.file().rowCount();
// Check if file has delete rows - if so, we can't accurately calculate
// the merged row count, so we need to stop limit pushdown
boolean hasDeleteRows =
entry.file().deleteRowCount().map(count -> count > 0L).orElse(false);

if (hasDeleteRows) {
// If file has delete rows, we can't accurately calculate merged row count
// without reading the actual data. Can't safely push down limit.
return null;
}

// File has no delete rows, no overlapping, and merge engine doesn't require merge.
// Safe to count rows.
result.add(entry);
accumulatedRowCount += fileRowCount;
if (accumulatedRowCount >= limit) {
break;
}
}

return result;
}
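
A worked scenario for the per-bucket accumulation above (standalone toy class with made-up row counts; it mirrors only the counting logic, not the real ManifestEntry API): non-overlapping files are accumulated until the limit is reached, while any file with delete rows aborts the pushdown for the whole bucket.

import java.util.ArrayList;
import java.util.List;

public class BucketLimitPushdownExample {
    public static void main(String[] args) {
        long limit = 1000;
        // Each entry is {rowCount, deleteRowCount}.
        long[][] cleanBucket = {{400, 0}, {350, 0}, {500, 0}, {900, 0}};
        long[][] dirtyBucket = {{400, 0}, {350, 25}};

        System.out.println(pick(cleanBucket, limit)); // [400, 350, 500] -> 1250 rows >= 1000
        System.out.println(pick(dirtyBucket, limit)); // null -> keep all files of the bucket
    }

    static List<Long> pick(long[][] files, long limit) {
        List<Long> result = new ArrayList<>();
        long accumulated = 0;
        for (long[] file : files) {
            if (file[1] > 0) {
                return null; // delete rows: the merged row count cannot be derived from stats
            }
            result.add(file[0]);
            accumulated += file[0];
            if (accumulated >= limit) {
                break;
            }
        }
        return result;
    }
}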

@Override
protected boolean postFilterManifestEntriesEnabled() {
return valueFilter != null && scanMode == ScanMode.ALL;
return (valueFilter != null && scanMode == ScanMode.ALL)
|| supportsLimitPushManifestEntries();
}

@Override
protected List<ManifestEntry> postFilterManifestEntries(List<ManifestEntry> files) {
// We group files by bucket here, and filter them by the whole bucket filter.
// Why do this: because in primary key table, we can't just filter the value
// by the stat in files (see `PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`),
// but we can do this by filter the whole bucket files
return files.stream()
.collect(
Collectors.groupingBy(
// we use LinkedHashMap to avoid disorder
file -> Pair.of(file.partition(), file.bucket()),
LinkedHashMap::new,
Collectors.toList()))
.values()
.stream()
.map(this::doFilterWholeBucketByStats)
.flatMap(Collection::stream)
.collect(Collectors.toList());
long startTime = System.nanoTime();
Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> buckets = groupByBucket(files);

// Apply filter if valueFilter is enabled, otherwise use identity function
Function<List<ManifestEntry>, List<ManifestEntry>> bucketProcessor =
(valueFilter != null && scanMode == ScanMode.ALL)
? this::doFilterWholeBucketByStats
: Function.identity();

// Apply filter (if enabled) and limit pushdown (if enabled)
boolean limitEnabled = supportsLimitPushManifestEntries();
List<ManifestEntry> result =
applyLimitPushdownToBuckets(buckets, bucketProcessor, limitEnabled);

if (limitEnabled) {
long duration = (System.nanoTime() - startTime) / 1_000_000;
LOG.info(
"Limit pushdown for PK table completed in {} ms. Limit: {}, InputFiles: {}, OutputFiles: {}, "
+ "MergeEngine: {}, ScanMode: {}, DeletionVectorsEnabled: {}",
duration,
limit,
files.size(),
result.size(),
mergeEngine,
scanMode,
deletionVectorsEnabled);
}

return result;
}

/**
* Apply limit pushdown to buckets with an optional bucket processor (e.g., filtering).
*
* <p>This method processes buckets in order, applying the bucket processor first, then applying
* limit pushdown if enabled. It stops early when the limit is reached.
*
* @param buckets buckets grouped by (partition, bucket)
* @param bucketProcessor processor to apply to each bucket before limit pushdown
* @param limitEnabled whether to apply limit pushdown on top of the bucket processor
* @return processed entries (filtered and limited if limit is enabled)
*/
private List<ManifestEntry> applyLimitPushdownToBuckets(
Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> buckets,
Function<List<ManifestEntry>, List<ManifestEntry>> bucketProcessor,
boolean limitEnabled) {
List<ManifestEntry> result = new ArrayList<>();
long accumulatedRowCount = 0;

for (List<ManifestEntry> bucketEntries : buckets.values()) {
// Apply bucket processor (e.g., filtering)
List<ManifestEntry> processed = bucketProcessor.apply(bucketEntries);

if (limitEnabled) {
// Apply limit pushdown if enabled
if (accumulatedRowCount >= limit) {
// Already reached limit, stop processing remaining buckets
break;
}

long remainingLimit = limit - accumulatedRowCount;
List<ManifestEntry> processedBucket =
applyLimitPushdownForBucket(processed, remainingLimit);
if (processedBucket == null) {
// Can't safely push down limit for this bucket, include all processed entries
result.addAll(processed);
} else {
result.addAll(processedBucket);
for (ManifestEntry entry : processedBucket) {
long fileRowCount = entry.file().rowCount();
accumulatedRowCount += fileRowCount;
}
}
} else {
// No limit pushdown, just add processed entries
result.addAll(processed);
}
}

return result;
}
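
And a hand-traced cross-bucket scenario for the loop above (hypothetical class and numbers): an unsafe bucket keeps all of its files without advancing the accumulated row count, a safe bucket is trimmed against the limit, and later buckets are skipped once the limit is reached.

import java.util.ArrayList;
import java.util.List;

public class CrossBucketLimitExample {
    public static void main(String[] args) {
        long limit = 500;
        long accumulated = 0;
        List<String> planned = new ArrayList<>();

        // Bucket A: unsafe (overlapping files) -> keep both files, accumulated stays 0.
        planned.add("A-1 (200 rows)");
        planned.add("A-2 (200 rows)");

        // Bucket B: safe -> accumulate until the still-untouched limit (500) is reached.
        long[] bucketB = {300, 400, 100};
        for (long rows : bucketB) {
            planned.add("B (" + rows + " rows)");
            accumulated += rows;
            if (accumulated >= limit) {
                break; // 300 + 400 = 700 >= 500, the 100-row file is dropped
            }
        }

        // A bucket C would not be visited at all: accumulated (700) already exceeds the limit.
        System.out.println(planned);
    }
}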

private List<ManifestEntry> doFilterWholeBucketByStats(List<ManifestEntry> entries) {
@@ -316,4 +455,16 @@ private static boolean noOverlapping(List<ManifestEntry> entries) {

return true;
}

/** Group manifest entries by (partition, bucket) while preserving order. */
private Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> groupByBucket(
List<ManifestEntry> entries) {
return entries.stream()
.collect(
Collectors.groupingBy(
// we use LinkedHashMap to avoid disorder
file -> Pair.of(file.partition(), file.bucket()),
LinkedHashMap::new,
Collectors.toList()));
}
}
@@ -31,6 +31,9 @@
import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult;
import org.apache.paimon.types.DataType;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -40,6 +43,8 @@
/** {@link TableScan} implementation for batch planning. */
public class DataTableBatchScan extends AbstractDataTableScan {

private static final Logger LOG = LoggerFactory.getLogger(DataTableBatchScan.class);

private StartingScanner startingScanner;
private boolean hasNext;

@@ -134,6 +139,7 @@ private Optional<StartingScanner.Result> applyPushDownLimit() {
long scannedRowCount = 0;
SnapshotReader.Plan plan = ((ScannedResult) result).plan();
List<DataSplit> splits = plan.dataSplits();
LOG.info("Applying limit pushdown. Original splits count: {}", splits.size());
if (splits.isEmpty()) {
return Optional.of(result);
}
@@ -147,6 +153,11 @@ private Optional<StartingScanner.Result> applyPushDownLimit() {
if (scannedRowCount >= pushDownLimit) {
SnapshotReader.Plan newPlan =
new PlanImpl(plan.watermark(), plan.snapshotId(), limitedSplits);
LOG.info(
"Limit pushdown applied successfully. Original splits: {}, Limited splits: {}, Pushdown limit: {}",
splits.size(),
limitedSplits.size(),
pushDownLimit);
return Optional.of(new ScannedResult(newPlan));
}
}
@@ -409,9 +409,11 @@ private List<DataSplit> generateSplits(
isStreaming
? splitGenerator.splitForStreaming(bucketFiles)
: splitGenerator.splitForBatch(bucketFiles);

// Calculate bucketPath once per bucket to avoid repeated computation
String bucketPath = pathFactory.bucketPath(partition, bucket).toString();
for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
List<DataFileMeta> dataFiles = splitGroup.files;
String bucketPath = pathFactory.bucketPath(partition, bucket).toString();
builder.withDataFiles(dataFiles)
.rawConvertible(splitGroup.rawConvertible)
.withBucketPath(bucketPath);