diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index 284a2ef19579..fdefcd0d350b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -46,6 +46,9 @@ import org.apache.paimon.utils.Range; import org.apache.paimon.utils.SnapshotManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import java.util.ArrayList; @@ -68,6 +71,8 @@ /** Default implementation of {@link FileStoreScan}. */ public abstract class AbstractFileStoreScan implements FileStoreScan { + private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStoreScan.class); + private final ManifestsReader manifestsReader; private final SnapshotManager snapshotManager; private final ManifestFile.Factory manifestFileFactory; @@ -277,9 +282,6 @@ public Plan plan() { List manifests = manifestsResult.filteredManifests; Iterator iterator = readManifestEntries(manifests, false); - if (supportsLimitPushManifestEntries()) { - iterator = limitPushManifestEntries(iterator); - } List files = ListUtils.toList(iterator); if (postFilterManifestEntriesEnabled()) { @@ -289,6 +291,10 @@ public Plan plan() { List result = files; long scanDuration = (System.nanoTime() - started) / 1_000_000; + LOG.info( + "File store scan plan completed in {} ms. Files size : {}", + scanDuration, + result.size()); if (scanMetrics != null) { long allDataFiles = manifestsResult.allManifests.stream() diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index 189270616954..bf7504a5fb97 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -33,7 +33,9 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -92,9 +94,21 @@ public boolean supportsLimitPushManifestEntries() { } @Override - protected Iterator limitPushManifestEntries(Iterator entries) { + protected boolean postFilterManifestEntriesEnabled() { + return supportsLimitPushManifestEntries(); + } + + @Override + protected List postFilterManifestEntries(List entries) { checkArgument(limit != null && limit > 0 && !deletionVectorsEnabled); - return new LimitAwareManifestEntryIterator(entries, limit); + // Use LimitAwareManifestEntryIterator for limit pushdown + Iterator iterator = + new LimitAwareManifestEntryIterator(entries.iterator(), limit); + List result = new ArrayList<>(); + while (iterator.hasNext()) { + result.add(iterator.next()); + } + return result; } /** Note: Keep this thread-safe. 
*/ diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index ef6fd1e52b8e..2ac05b7385c7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions.ChangelogProducer; import org.apache.paimon.CoreOptions.MergeEngine; import org.apache.paimon.KeyValueFileStore; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.fileindex.FileIndexPredicate; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.FilteredManifestEntry; @@ -37,16 +38,19 @@ import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.MergeEngine.AGGREGATE; @@ -55,6 +59,8 @@ /** {@link FileStoreScan} for {@link KeyValueFileStore}. */ public class KeyValueFileStoreScan extends AbstractFileStoreScan { + private static final Logger LOG = LoggerFactory.getLogger(KeyValueFileStoreScan.class); + private final SimpleStatsEvolutions fieldKeyStatsConverters; private final SimpleStatsEvolutions fieldValueStatsConverters; private final BucketSelectConverter bucketSelectConverter; @@ -203,29 +209,162 @@ private boolean isValueFilterEnabled() { } } + /** + * Check if limit pushdown is supported for PK tables. + * + *
<p>
Not supported when merge engine is PARTIAL_UPDATE/AGGREGATE (need merge) or deletion + * vectors are enabled (can't count deleted rows). For DEDUPLICATE/FIRST_ROW, per-bucket checks + * (no overlapping, no delete rows) are done in applyLimitPushdownForBucket. + */ + @Override + public boolean supportsLimitPushManifestEntries() { + if (mergeEngine == PARTIAL_UPDATE || mergeEngine == AGGREGATE) { + return false; + } + + return limit != null && limit > 0 && !deletionVectorsEnabled; + } + + /** + * Apply limit pushdown for a single bucket. Returns files to include, or null if unsafe. + * + *
<p>
Returns null if files overlap (LSM level 0 or different levels) or have delete rows. For + * non-overlapping files with no delete rows, accumulates row counts until limit is reached. + * + * @param bucketEntries files in the same bucket + * @param limit the limit to apply + * @return files to include, or null if we can't safely push down limit + */ + @Nullable + private List applyLimitPushdownForBucket( + List bucketEntries, long limit) { + // Check if this bucket has overlapping files (LSM property) + boolean hasOverlapping = !noOverlapping(bucketEntries); + + if (hasOverlapping) { + // For buckets with overlapping, we can't safely push down limit because files + // need to be merged and we can't accurately calculate the merged row count. + return null; + } + + // For buckets without overlapping and with merge engines that don't require + // merge (DEDUPLICATE or FIRST_ROW), we can safely accumulate row count + // and stop when limit is reached, but only if files have no delete rows. + List result = new ArrayList<>(); + long accumulatedRowCount = 0; + + for (ManifestEntry entry : bucketEntries) { + long fileRowCount = entry.file().rowCount(); + // Check if file has delete rows - if so, we can't accurately calculate + // the merged row count, so we need to stop limit pushdown + boolean hasDeleteRows = + entry.file().deleteRowCount().map(count -> count > 0L).orElse(false); + + if (hasDeleteRows) { + // If file has delete rows, we can't accurately calculate merged row count + // without reading the actual data. Can't safely push down limit. + return null; + } + + // File has no delete rows, no overlapping, and merge engine doesn't require merge. + // Safe to count rows. + result.add(entry); + accumulatedRowCount += fileRowCount; + if (accumulatedRowCount >= limit) { + break; + } + } + + return result; + } + @Override protected boolean postFilterManifestEntriesEnabled() { - return valueFilter != null && scanMode == ScanMode.ALL; + return (valueFilter != null && scanMode == ScanMode.ALL) + || supportsLimitPushManifestEntries(); } @Override protected List postFilterManifestEntries(List files) { - // We group files by bucket here, and filter them by the whole bucket filter. - // Why do this: because in primary key table, we can't just filter the value - // by the stat in files (see `PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`), - // but we can do this by filter the whole bucket files - return files.stream() - .collect( - Collectors.groupingBy( - // we use LinkedHashMap to avoid disorder - file -> Pair.of(file.partition(), file.bucket()), - LinkedHashMap::new, - Collectors.toList())) - .values() - .stream() - .map(this::doFilterWholeBucketByStats) - .flatMap(Collection::stream) - .collect(Collectors.toList()); + long startTime = System.nanoTime(); + Map, List> buckets = groupByBucket(files); + + // Apply filter if valueFilter is enabled, otherwise use identity function + Function, List> bucketProcessor = + (valueFilter != null && scanMode == ScanMode.ALL) + ? this::doFilterWholeBucketByStats + : Function.identity(); + + // Apply filter (if enabled) and limit pushdown (if enabled) + boolean limitEnabled = supportsLimitPushManifestEntries(); + List result = + applyLimitPushdownToBuckets(buckets, bucketProcessor, limitEnabled); + + if (limitEnabled) { + long duration = (System.nanoTime() - startTime) / 1_000_000; + LOG.info( + "Limit pushdown for PK table completed in {} ms. 
Limit: {}, InputFiles: {}, OutputFiles: {}, " + + "MergeEngine: {}, ScanMode: {}, DeletionVectorsEnabled: {}", + duration, + limit, + files.size(), + result.size(), + mergeEngine, + scanMode, + deletionVectorsEnabled); + } + + return result; + } + + /** + * Apply limit pushdown to buckets with an optional bucket processor (e.g., filtering). + * + *
<p>
This method processes buckets in order, applying the bucket processor first, then applying + * limit pushdown if enabled. It stops early when the limit is reached. + * + * @param buckets buckets grouped by (partition, bucket) + * @param bucketProcessor processor to apply to each bucket before limit pushdown + * @return processed entries (filtered and limited if limit is enabled) + */ + private List applyLimitPushdownToBuckets( + Map, List> buckets, + Function, List> bucketProcessor, + boolean limitEnabled) { + List result = new ArrayList<>(); + long accumulatedRowCount = 0; + + for (List bucketEntries : buckets.values()) { + // Apply bucket processor (e.g., filtering) + List processed = bucketProcessor.apply(bucketEntries); + + if (limitEnabled) { + // Apply limit pushdown if enabled + if (accumulatedRowCount >= limit) { + // Already reached limit, stop processing remaining buckets + break; + } + + long remainingLimit = limit - accumulatedRowCount; + List processedBucket = + applyLimitPushdownForBucket(processed, remainingLimit); + if (processedBucket == null) { + // Can't safely push down limit for this bucket, include all processed entries + result.addAll(processed); + } else { + result.addAll(processedBucket); + for (ManifestEntry entry : processedBucket) { + long fileRowCount = entry.file().rowCount(); + accumulatedRowCount += fileRowCount; + } + } + } else { + // No limit pushdown, just add processed entries + result.addAll(processed); + } + } + + return result; } private List doFilterWholeBucketByStats(List entries) { @@ -316,4 +455,16 @@ private static boolean noOverlapping(List entries) { return true; } + + /** Group manifest entries by (partition, bucket) while preserving order. */ + private Map, List> groupByBucket( + List entries) { + return entries.stream() + .collect( + Collectors.groupingBy( + // we use LinkedHashMap to avoid disorder + file -> Pair.of(file.partition(), file.bucket()), + LinkedHashMap::new, + Collectors.toList())); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java index 4419b3890ef5..41efbe25222a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java @@ -31,6 +31,9 @@ import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult; import org.apache.paimon.types.DataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -40,6 +43,8 @@ /** {@link TableScan} implementation for batch planning. */ public class DataTableBatchScan extends AbstractDataTableScan { + private static final Logger LOG = LoggerFactory.getLogger(DataTableBatchScan.class); + private StartingScanner startingScanner; private boolean hasNext; @@ -134,6 +139,7 @@ private Optional applyPushDownLimit() { long scannedRowCount = 0; SnapshotReader.Plan plan = ((ScannedResult) result).plan(); List splits = plan.dataSplits(); + LOG.info("Applying limit pushdown. Original splits count: {}", splits.size()); if (splits.isEmpty()) { return Optional.of(result); } @@ -147,6 +153,11 @@ private Optional applyPushDownLimit() { if (scannedRowCount >= pushDownLimit) { SnapshotReader.Plan newPlan = new PlanImpl(plan.watermark(), plan.snapshotId(), limitedSplits); + LOG.info( + "Limit pushdown applied successfully. 
Original splits: {}, Limited splits: {}, Pushdown limit: {}", + splits.size(), + limitedSplits.size(), + pushDownLimit); return Optional.of(new ScannedResult(newPlan)); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index e036710e8ef9..3021eeda3e2b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -409,9 +409,11 @@ private List generateSplits( isStreaming ? splitGenerator.splitForStreaming(bucketFiles) : splitGenerator.splitForBatch(bucketFiles); + + // Calculate bucketPath once per bucket to avoid repeated computation + String bucketPath = pathFactory.bucketPath(partition, bucket).toString(); for (SplitGenerator.SplitGroup splitGroup : splitGroups) { List dataFiles = splitGroup.files; - String bucketPath = pathFactory.bucketPath(partition, bucket).toString(); builder.withDataFiles(dataFiles) .rawConvertible(splitGroup.rawConvertible) .withBucketPath(bucketPath); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java index 4f3d5c1c24dd..f975cf570df7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java @@ -18,6 +18,7 @@ package org.apache.paimon.operation; +import org.apache.paimon.CoreOptions; import org.apache.paimon.KeyValue; import org.apache.paimon.Snapshot; import org.apache.paimon.TestFileStore; @@ -30,9 +31,10 @@ import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.SchemaUtils; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.SnapshotManager; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -59,7 +61,6 @@ public class KeyValueFileStoreScanTest { private TestKeyValueGenerator gen; @TempDir java.nio.file.Path tempDir; private TestFileStore store; - private SnapshotManager snapshotManager; @BeforeEach public void beforeEach() throws Exception { @@ -76,7 +77,6 @@ public void beforeEach() throws Exception { DeduplicateMergeFunction.factory(), null) .build(); - snapshotManager = store.snapshotManager(); SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri())); @@ -271,6 +271,276 @@ public void testDropStatsInPlan() throws Exception { } } + @Test + public void testLimitPushdownWithoutValueFilter() throws Exception { + // Write multiple files to test limit pushdown + List data1 = generateData(50); + writeData(data1); + List data2 = generateData(50); + writeData(data2); + List data3 = generateData(50); + Snapshot snapshot = writeData(data3); + + // Without limit, should read all files + KeyValueFileStoreScan scanWithoutLimit = store.newScan(); + scanWithoutLimit.withSnapshot(snapshot.id()); + List filesWithoutLimit = scanWithoutLimit.plan().files(); + int totalFiles = filesWithoutLimit.size(); + assertThat(totalFiles).isGreaterThan(0); + + // With limit, should read fewer files (limit pushdown should work) + KeyValueFileStoreScan 
scanWithLimit = store.newScan(); + scanWithLimit.withSnapshot(snapshot.id()).withLimit(10); + List filesWithLimit = scanWithLimit.plan().files(); + // Limit pushdown should reduce the number of files read + assertThat(filesWithLimit.size()).isLessThanOrEqualTo(totalFiles); + assertThat(filesWithLimit.size()).isGreaterThan(0); + } + + @Test + public void testLimitPushdownWithValueFilter() throws Exception { + // Write data with different item values + List data1 = generateData(50, 0, 100L); + writeData(data1); + List data2 = generateData(50, 0, 200L); + writeData(data2); + List data3 = generateData(50, 0, 300L); + Snapshot snapshot = writeData(data3); + + // Without valueFilter, limit pushdown should work + KeyValueFileStoreScan scanWithoutFilter = store.newScan(); + scanWithoutFilter.withSnapshot(snapshot.id()).withLimit(10); + List filesWithoutFilter = scanWithoutFilter.plan().files(); + int totalFilesWithoutFilter = filesWithoutFilter.size(); + assertThat(totalFilesWithoutFilter).isGreaterThan(0); + + // With valueFilter, limit pushdown should still work. + KeyValueFileStoreScan scanWithFilter = store.newScan(); + scanWithFilter.withSnapshot(snapshot.id()); + scanWithFilter.withValueFilter( + new PredicateBuilder(TestKeyValueGenerator.DEFAULT_ROW_TYPE) + .between(4, 100L, 200L)); + scanWithFilter.withLimit(10); + List filesWithFilter = scanWithFilter.plan().files(); + + // Limit pushdown should work with valueFilter + // The number of files should be less than or equal to the total files after filtering + assertThat(filesWithFilter.size()).isGreaterThan(0); + assertThat(filesWithFilter.size()).isLessThanOrEqualTo(totalFilesWithoutFilter); + } + + @Test + public void testLimitPushdownWithKeyFilter() throws Exception { + // Write data with different shop IDs + List data = generateData(200); + Snapshot snapshot = writeData(data); + + // With keyFilter, limit pushdown should still work (keyFilter doesn't affect limit + // pushdown) + KeyValueFileStoreScan scan = store.newScan(); + scan.withSnapshot(snapshot.id()); + scan.withKeyFilter( + new PredicateBuilder(RowType.of(new IntType(false))) + .equal(0, data.get(0).key().getInt(0))); + scan.withLimit(5); + List files = scan.plan().files(); + assertThat(files.size()).isGreaterThan(0); + } + + @Test + public void testLimitPushdownMultipleBuckets() throws Exception { + // Write data to multiple buckets to test limit pushdown across buckets + List data1 = generateData(30); + writeData(data1); + List data2 = generateData(30); + writeData(data2); + List data3 = generateData(30); + Snapshot snapshot = writeData(data3); + + // Without limit, should read all files + KeyValueFileStoreScan scanWithoutLimit = store.newScan(); + scanWithoutLimit.withSnapshot(snapshot.id()); + List filesWithoutLimit = scanWithoutLimit.plan().files(); + int totalFiles = filesWithoutLimit.size(); + assertThat(totalFiles).isGreaterThan(0); + + // With limit, should read fewer files (limit pushdown should work across buckets) + KeyValueFileStoreScan scanWithLimit = store.newScan(); + scanWithLimit.withSnapshot(snapshot.id()).withLimit(20); + List filesWithLimit = scanWithLimit.plan().files(); + // Limit pushdown should reduce the number of files read + assertThat(filesWithLimit.size()).isLessThanOrEqualTo(totalFiles); + assertThat(filesWithLimit.size()).isGreaterThan(0); + } + + @Test + public void testLimitPushdownWithSmallLimit() throws Exception { + // Test limit pushdown with a very small limit + List data1 = generateData(100); + writeData(data1); + List data2 = 
generateData(100); + writeData(data2); + Snapshot snapshot = writeData(data2); + + KeyValueFileStoreScan scan = store.newScan(); + scan.withSnapshot(snapshot.id()).withLimit(1); + List files = scan.plan().files(); + // Should read at least one file, but fewer than all files + assertThat(files.size()).isGreaterThan(0); + } + + @Test + public void testLimitPushdownWithLargeLimit() throws Exception { + // Test limit pushdown with a large limit (larger than total rows) + List data1 = generateData(50); + writeData(data1); + List data2 = generateData(50); + Snapshot snapshot = writeData(data2); + + KeyValueFileStoreScan scanWithoutLimit = store.newScan(); + scanWithoutLimit.withSnapshot(snapshot.id()); + List filesWithoutLimit = scanWithoutLimit.plan().files(); + int totalFiles = filesWithoutLimit.size(); + + KeyValueFileStoreScan scanWithLimit = store.newScan(); + scanWithLimit.withSnapshot(snapshot.id()).withLimit(10000); + List filesWithLimit = scanWithLimit.plan().files(); + // With a large limit, should read all files + assertThat(filesWithLimit.size()).isEqualTo(totalFiles); + } + + @Test + public void testLimitPushdownWithPartialUpdateMergeEngine() throws Exception { + // Test that limit pushdown is disabled for PARTIAL_UPDATE merge engine + // Create a store with PARTIAL_UPDATE merge engine by setting it in schema options + SchemaManager schemaManager = + new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri())); + Schema schema = + new Schema( + TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(), + TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(), + TestKeyValueGenerator.getPrimaryKeys( + TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED), + Collections.singletonMap(CoreOptions.MERGE_ENGINE.key(), "partial-update"), + null); + TableSchema tableSchema = SchemaUtils.forceCommit(schemaManager, schema); + + TestFileStore storePartialUpdate = + new TestFileStore.Builder( + "avro", + tempDir.toString(), + NUM_BUCKETS, + TestKeyValueGenerator.DEFAULT_PART_TYPE, + TestKeyValueGenerator.KEY_TYPE, + TestKeyValueGenerator.DEFAULT_ROW_TYPE, + TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR, + DeduplicateMergeFunction.factory(), + tableSchema) + .build(); + + List data1 = generateData(50); + writeData(data1, storePartialUpdate); + List data2 = generateData(50); + Snapshot snapshot = writeData(data2, storePartialUpdate); + + KeyValueFileStoreScan scan = storePartialUpdate.newScan(); + scan.withSnapshot(snapshot.id()).withLimit(10); + // supportsLimitPushManifestEntries should return false for PARTIAL_UPDATE + assertThat(scan.supportsLimitPushManifestEntries()).isFalse(); + + // Should read all files since limit pushdown is disabled + KeyValueFileStoreScan scanWithoutLimit = storePartialUpdate.newScan(); + scanWithoutLimit.withSnapshot(snapshot.id()); + List filesWithoutLimit = scanWithoutLimit.plan().files(); + int totalFiles = filesWithoutLimit.size(); + + List filesWithLimit = scan.plan().files(); + assertThat(filesWithLimit.size()).isEqualTo(totalFiles); + } + + @Test + public void testLimitPushdownWithAggregateMergeEngine() throws Exception { + // Test that limit pushdown is disabled for AGGREGATE merge engine + SchemaManager schemaManager = + new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri())); + Schema schema = + new Schema( + TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(), + TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(), + TestKeyValueGenerator.getPrimaryKeys( + TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED), + 
Collections.singletonMap(CoreOptions.MERGE_ENGINE.key(), "aggregation"), + null); + TableSchema tableSchema = SchemaUtils.forceCommit(schemaManager, schema); + + TestFileStore storeAggregate = + new TestFileStore.Builder( + "avro", + tempDir.toString(), + NUM_BUCKETS, + TestKeyValueGenerator.DEFAULT_PART_TYPE, + TestKeyValueGenerator.KEY_TYPE, + TestKeyValueGenerator.DEFAULT_ROW_TYPE, + TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR, + DeduplicateMergeFunction.factory(), + tableSchema) + .build(); + + List data1 = generateData(50); + writeData(data1, storeAggregate); + List data2 = generateData(50); + Snapshot snapshot = writeData(data2, storeAggregate); + + KeyValueFileStoreScan scan = storeAggregate.newScan(); + scan.withSnapshot(snapshot.id()).withLimit(10); + // supportsLimitPushManifestEntries should return false for AGGREGATE + assertThat(scan.supportsLimitPushManifestEntries()).isFalse(); + + // Should read all files since limit pushdown is disabled + KeyValueFileStoreScan scanWithoutLimit = storeAggregate.newScan(); + scanWithoutLimit.withSnapshot(snapshot.id()); + List filesWithoutLimit = scanWithoutLimit.plan().files(); + int totalFiles = filesWithoutLimit.size(); + + List filesWithLimit = scan.plan().files(); + assertThat(filesWithLimit.size()).isEqualTo(totalFiles); + } + + @Test + public void testLimitPushdownWithDeletionVectors() throws Exception { + // Test that limit pushdown is disabled when deletion vectors are enabled + SchemaManager schemaManager = + new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri())); + Schema schema = + new Schema( + TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(), + TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(), + TestKeyValueGenerator.getPrimaryKeys( + TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED), + Collections.singletonMap( + CoreOptions.DELETION_VECTORS_ENABLED.key(), "true"), + null); + TableSchema tableSchema = SchemaUtils.forceCommit(schemaManager, schema); + + TestFileStore storeWithDV = + new TestFileStore.Builder( + "avro", + tempDir.toString(), + NUM_BUCKETS, + TestKeyValueGenerator.DEFAULT_PART_TYPE, + TestKeyValueGenerator.KEY_TYPE, + TestKeyValueGenerator.DEFAULT_ROW_TYPE, + TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR, + DeduplicateMergeFunction.factory(), + tableSchema) + .build(); + + KeyValueFileStoreScan scan = storeWithDV.newScan(); + scan.withLimit(10); + // supportsLimitPushManifestEntries should return false when deletion vectors are enabled + assertThat(scan.supportsLimitPushManifestEntries()).isFalse(); + } + private void runTestExactMatch( FileStoreScan scan, Long expectedSnapshotId, Map expected) throws Exception { @@ -307,10 +577,6 @@ private List generateData(int numRecords) { return data; } - private List generateData(int numRecords, int hr) { - return generateData(numRecords, hr, null); - } - private List generateData(int numRecords, int hr, Long itemId) { List data = new ArrayList<>(); for (int i = 0; i < numRecords; i++) { @@ -320,7 +586,11 @@ private List generateData(int numRecords, int hr, Long itemId) { } private Snapshot writeData(List kvs) throws Exception { - List snapshots = store.commitData(kvs, gen::getPartition, this::getBucket); + return writeData(kvs, store); + } + + private Snapshot writeData(List kvs, TestFileStore testStore) throws Exception { + List snapshots = testStore.commitData(kvs, gen::getPartition, this::getBucket); return snapshots.get(snapshots.size() - 1); } diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java index e1426fc8b56d..c0af9130a11a 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java @@ -1579,4 +1579,129 @@ private void assertResult(int numProducers) { } } } + + @Test + public void testLimitPushdownWithTimeFilter() throws Exception { + // This test verifies that limit pushdown works correctly when valueFilter + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); + tEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse")); + tEnv.executeSql("USE CATALOG testCatalog"); + tEnv.executeSql( + "CREATE TABLE T (" + + "id INT, " + + "name STRING, " + + "ts TIMESTAMP(3), " + + "PRIMARY KEY (id) NOT ENFORCED" + + ")"); + + // Insert data with different timestamps + tEnv.executeSql( + "INSERT INTO T VALUES " + + "(1, 'a', TIMESTAMP '2024-01-01 10:00:00'), " + + "(2, 'b', TIMESTAMP '2024-01-01 11:00:00'), " + + "(3, 'c', TIMESTAMP '2024-01-01 12:00:00'), " + + "(4, 'd', TIMESTAMP '2024-01-01 13:00:00'), " + + "(5, 'e', TIMESTAMP '2024-01-01 14:00:00')") + .await(); + + // Without filter, limit pushdown should work + try (CloseableIterator iter = tEnv.executeSql("SELECT * FROM T LIMIT 3").collect()) { + List allRows = new ArrayList<>(); + iter.forEachRemaining(allRows::add); + assertThat(allRows.size()).isEqualTo(3); + } + + // Test limit pushdown with time filter (4 rows match, LIMIT 3) + try (CloseableIterator iter = + tEnv.executeSql( + "SELECT * FROM T WHERE ts >= TIMESTAMP '2024-01-01 11:00:00' LIMIT 3") + .collect()) { + List filteredRows = new ArrayList<>(); + iter.forEachRemaining(filteredRows::add); + assertThat(filteredRows.size()).isGreaterThanOrEqualTo(3); + assertThat(filteredRows.size()).isLessThanOrEqualTo(4); + for (Row row : filteredRows) { + java.time.LocalDateTime ts = (java.time.LocalDateTime) row.getField(2); + java.time.LocalDateTime filterTime = + java.time.LocalDateTime.parse("2024-01-01T11:00:00"); + assertThat(ts).isAfterOrEqualTo(filterTime); + } + } + + // Test with more restrictive filter (3 rows match, LIMIT 2) + try (CloseableIterator iter = + tEnv.executeSql( + "SELECT * FROM T WHERE ts >= TIMESTAMP '2024-01-01 12:00:00' LIMIT 2") + .collect()) { + List filteredRows2 = new ArrayList<>(); + iter.forEachRemaining(filteredRows2::add); + assertThat(filteredRows2.size()).isGreaterThanOrEqualTo(2); + assertThat(filteredRows2.size()).isLessThanOrEqualTo(3); + for (Row row : filteredRows2) { + java.time.LocalDateTime ts = (java.time.LocalDateTime) row.getField(2); + java.time.LocalDateTime filterTime = + java.time.LocalDateTime.parse("2024-01-01T12:00:00"); + assertThat(ts).isAfterOrEqualTo(filterTime); + } + } + } + + @Test + public void testLimitPushdownBasic() throws Exception { + // Test basic limit pushdown + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); + tEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse")); + tEnv.executeSql("USE CATALOG testCatalog"); + tEnv.executeSql( + "CREATE TABLE T (" + + "id INT, " + + "name STRING, " + + "PRIMARY KEY (id) NOT ENFORCED" + + ")"); + + tEnv.executeSql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')").await(); + tEnv.executeSql("INSERT INTO T VALUES (4, 'd'), (5, 
'e'), (6, 'f')").await(); + tEnv.executeSql("INSERT INTO T VALUES (7, 'g'), (8, 'h'), (9, 'i')").await(); + + try (CloseableIterator iter = tEnv.executeSql("SELECT * FROM T LIMIT 5").collect()) { + List rows = new ArrayList<>(); + iter.forEachRemaining(rows::add); + + assertThat(rows.size()).isEqualTo(5); + } + } + + @Test + public void testLimitPushdownWithDeletionVector() throws Exception { + // Test limit pushdown is disabled when deletion vector is enabled + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); + tEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse")); + tEnv.executeSql("USE CATALOG testCatalog"); + tEnv.executeSql( + "CREATE TABLE T (" + + "id INT, " + + "name STRING, " + + "PRIMARY KEY (id) NOT ENFORCED" + + ") WITH (" + + "'deletion-vectors.enabled' = 'true'" + + ")"); + + tEnv.executeSql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')").await(); + tEnv.executeSql("INSERT INTO T VALUES (4, 'd'), (5, 'e'), (6, 'f')").await(); + + tEnv.executeSql("DELETE FROM T WHERE id = 2").await(); + + // Limit pushdown should be disabled when deletion vector is enabled + // because we can't accurately calculate row count after applying deletion vectors + try (CloseableIterator iter = tEnv.executeSql("SELECT * FROM T LIMIT 3").collect()) { + List rows = new ArrayList<>(); + iter.forEachRemaining(rows::add); + + assertThat(rows.size()).isEqualTo(3); + + for (Row row : rows) { + assertThat(row.getField(0)).isNotEqualTo(2); + } + } + } }
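
The core technique in this patch is the per-bucket row-count accumulation in KeyValueFileStoreScan (applyLimitPushdownForBucket / applyLimitPushdownToBuckets): within a bucket whose files do not overlap and carry no delete rows, files are taken in order until their summed row counts reach the limit, and any bucket that cannot be limited safely is kept whole. The following is a minimal sketch of that idea in plain Java; the class, method, and FileMeta type names are illustrative assumptions in place of Paimon's ManifestEntry/DataFileMeta, not the patched code itself.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.OptionalLong;

    final class LimitPushdownSketch {

        // Hypothetical stand-in for the per-file metadata read from a manifest entry.
        static final class FileMeta {
            final long rowCount;
            final OptionalLong deleteRowCount;
            final int level;

            FileMeta(long rowCount, OptionalLong deleteRowCount, int level) {
                this.rowCount = rowCount;
                this.deleteRowCount = deleteRowCount;
                this.level = level;
            }
        }

        /**
         * Returns a prefix of {@code bucketFiles} that covers {@code limit} rows when possible,
         * or null when the bucket cannot be limited safely (overlapping files or delete rows).
         */
        static List<FileMeta> limitBucket(List<FileMeta> bucketFiles, long limit) {
            if (hasOverlapping(bucketFiles)) {
                return null; // files must be merged, so the merged row count is unknown
            }
            List<FileMeta> result = new ArrayList<>();
            long accumulated = 0;
            for (FileMeta file : bucketFiles) {
                if (file.deleteRowCount.orElse(0L) > 0) {
                    return null; // delete rows make the merged row count inaccurate
                }
                result.add(file);
                accumulated += file.rowCount;
                if (accumulated >= limit) {
                    break; // enough rows collected, skip the remaining files of this bucket
                }
            }
            return result;
        }

        // Overlap check mirroring the LSM property the patch relies on: level-0 files and files
        // on different levels may contain overlapping key ranges.
        static boolean hasOverlapping(List<FileMeta> files) {
            int firstLevel = files.isEmpty() ? -1 : files.get(0).level;
            for (FileMeta file : files) {
                if (file.level == 0 || file.level != firstLevel) {
                    return true;
                }
            }
            return false;
        }

        public static void main(String[] args) {
            List<FileMeta> bucket = new ArrayList<>();
            bucket.add(new FileMeta(100, OptionalLong.empty(), 5));
            bucket.add(new FileMeta(100, OptionalLong.empty(), 5));
            bucket.add(new FileMeta(100, OptionalLong.empty(), 5));
            // With limit 150 only the first two files are kept (100 + 100 >= 150).
            System.out.println(limitBucket(bucket, 150).size()); // prints 2
        }
    }

Returning null rather than an empty list is the "cannot limit this bucket safely" signal: the caller then falls back to keeping every file of the bucket, which preserves correctness at the cost of reading more data, matching what applyLimitPushdownToBuckets does above.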
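
The append-only path keeps using LimitAwareManifestEntryIterator, which postFilterManifestEntries now simply drains into a list. The sketch below shows the general shape of such a limit-aware iterator as a generic wrapper; the class name and the ToLongFunction parameter are illustrative assumptions, not Paimon API.

    import java.util.Iterator;
    import java.util.NoSuchElementException;
    import java.util.function.ToLongFunction;

    /** Yields elements from a delegate until the rows of the returned elements cover the limit. */
    final class LimitAwareIteratorSketch<T> implements Iterator<T> {

        private final Iterator<T> delegate;
        private final ToLongFunction<T> rowCount;
        private final long limit;
        private long accumulated;

        LimitAwareIteratorSketch(Iterator<T> delegate, ToLongFunction<T> rowCount, long limit) {
            this.delegate = delegate;
            this.rowCount = rowCount;
            this.limit = limit;
        }

        @Override
        public boolean hasNext() {
            // Stop as soon as the already returned elements can satisfy the limit.
            return accumulated < limit && delegate.hasNext();
        }

        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            T element = delegate.next();
            accumulated += rowCount.applyAsLong(element);
            return element;
        }

        public static void main(String[] args) {
            // Each element stands for a file and is its own row count: three files of 40 rows.
            Iterator<Long> files = java.util.Arrays.asList(40L, 40L, 40L).iterator();
            LimitAwareIteratorSketch<Long> limited =
                    new LimitAwareIteratorSketch<>(files, Long::longValue, 50L);
            while (limited.hasNext()) {
                System.out.println(limited.next()); // prints 40 twice, then stops (40 + 40 >= 50)
            }
        }
    }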