diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/ApplyBitmapIndexFileRecordIterator.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/ApplyBitmapIndexFileRecordIterator.java index eec931d3e98f..240d92fb5fb3 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/ApplyBitmapIndexFileRecordIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/ApplyBitmapIndexFileRecordIterator.java @@ -26,6 +26,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; /** * A {@link FileRecordIterator} wraps a {@link FileRecordIterator} and {@link BitmapIndexResult}. @@ -35,12 +36,16 @@ public class ApplyBitmapIndexFileRecordIterator implements FileRecordIterator iterator; private final RoaringBitmap32 bitmap; private final int last; + private final AtomicBoolean exhausted; - public ApplyBitmapIndexFileRecordIterator( - FileRecordIterator iterator, BitmapIndexResult fileIndexResult) { + ApplyBitmapIndexFileRecordIterator( + FileRecordIterator iterator, + BitmapIndexResult fileIndexResult, + AtomicBoolean exhausted) { this.iterator = iterator; this.bitmap = fileIndexResult.get(); this.last = bitmap.last(); + this.exhausted = exhausted; } @Override @@ -63,9 +68,13 @@ public InternalRow next() throws IOException { } int position = (int) returnedPosition(); if (position > last) { + exhausted.set(true); return null; } if (bitmap.contains(position)) { + if (position >= last) { + exhausted.set(true); + } return next; } } diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/ApplyBitmapIndexRecordReader.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/ApplyBitmapIndexRecordReader.java index 3b1207c8bd6e..9b17bcc3cc1e 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/ApplyBitmapIndexRecordReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/ApplyBitmapIndexRecordReader.java @@ -26,6 +26,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; /** A {@link RecordReader} which apply {@link BitmapIndexResult} to filter record. */ public class ApplyBitmapIndexRecordReader implements FileRecordReader { @@ -34,6 +35,8 @@ public class ApplyBitmapIndexRecordReader implements FileRecordReader reader, BitmapIndexResult fileIndexResult) { this.reader = reader; @@ -43,12 +46,16 @@ public ApplyBitmapIndexRecordReader( @Nullable @Override public FileRecordIterator readBatch() throws IOException { + if (exhausted.get()) { + return null; + } + FileRecordIterator batch = reader.readBatch(); if (batch == null) { return null; } - return new ApplyBitmapIndexFileRecordIterator(batch, fileIndexResult); + return new ApplyBitmapIndexFileRecordIterator(batch, fileIndexResult, exhausted); } @Override diff --git a/paimon-common/src/test/java/org/apache/paimon/fileindex/bitmap/ApplyBitmapIndexRecordReaderTest.java b/paimon-common/src/test/java/org/apache/paimon/fileindex/bitmap/ApplyBitmapIndexRecordReaderTest.java new file mode 100644 index 000000000000..2fbdc2709c7d --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/fileindex/bitmap/ApplyBitmapIndexRecordReaderTest.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fileindex.bitmap; + +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fs.Path; +import org.apache.paimon.reader.FileRecordIterator; +import org.apache.paimon.reader.FileRecordReader; +import org.apache.paimon.utils.CloseableIterator; +import org.apache.paimon.utils.RoaringBitmap32; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link ApplyBitmapIndexRecordReader}. */ +public class ApplyBitmapIndexRecordReaderTest { + + private static final Path DUMMY_PATH = new Path("/dummy"); + + @Test + public void testLimitStopsAfterFirstBatchWithPreSelectedUnderlying() throws Exception { + // Simulates DataFileRecordReader#readBatchInternal: iterator.selection(selection) + int limit = 10; + int batchSize = 20; + int totalRows = 100; + RoaringBitmap32 limitBitmap = RoaringBitmap32.bitmapOfRange(0, totalRows).limit(limit); + BitmapIndexResult selection = new BitmapIndexResult(() -> limitBitmap); + PreSelectedCountingFileRecordReader underlying = + new PreSelectedCountingFileRecordReader(totalRows, batchSize, limitBitmap); + ApplyBitmapIndexRecordReader reader = + new ApplyBitmapIndexRecordReader(underlying, selection); + + List rows = readViaCloseableIterator(reader); + + assertThat(rows).containsExactlyElementsOf(range(limit)); + assertThat(underlying.readBatchCount()).isEqualTo(1); + } + + @Test + public void testSparseBitmapWithPreSelectedUnderlying() throws Exception { + int batchSize = 5; + int totalRows = 20; + RoaringBitmap32 sparseBitmap = RoaringBitmap32.bitmapOf(0, 2, 4, 6, 8); + BitmapIndexResult selection = new BitmapIndexResult(() -> sparseBitmap); + PreSelectedCountingFileRecordReader underlying = + new PreSelectedCountingFileRecordReader(totalRows, batchSize, sparseBitmap); + ApplyBitmapIndexRecordReader reader = + new ApplyBitmapIndexRecordReader(underlying, selection); + + List rows = readViaCloseableIterator(reader); + + assertThat(rows).containsExactly(0, 2, 4, 6, 8); + assertThat(underlying.readBatchCount()).isEqualTo(2); + } + + @Test + public void testLimitStopsAfterFirstBatch() throws Exception { + int limit = 10; + int batchSize = 20; + int totalRows = 100; + CountingFileRecordReader underlying = new CountingFileRecordReader(totalRows, batchSize); + BitmapIndexResult selection = + new BitmapIndexResult( + () -> RoaringBitmap32.bitmapOfRange(0, totalRows).limit(limit)); + ApplyBitmapIndexRecordReader reader = + new ApplyBitmapIndexRecordReader(underlying, selection); + + List rows = readViaCloseableIterator(reader); + + assertThat(rows).containsExactlyElementsOf(range(limit)); + assertThat(underlying.readBatchCount()).isEqualTo(1); + } + + @Test + public void testForEachRemainingStopsAfterLimit() throws Exception { + int limit = 10; + int batchSize = 20; + int totalRows = 100; + CountingFileRecordReader underlying = new CountingFileRecordReader(totalRows, batchSize); + BitmapIndexResult selection = + new BitmapIndexResult( + () -> RoaringBitmap32.bitmapOfRange(0, totalRows).limit(limit)); + ApplyBitmapIndexRecordReader reader = + new ApplyBitmapIndexRecordReader(underlying, selection); + + List rows = new ArrayList<>(); + reader.forEachRemaining(row -> rows.add(row.getInt(0))); + + assertThat(rows).containsExactlyElementsOf(range(limit)); + assertThat(underlying.readBatchCount()).isEqualTo(1); + } + + @Test + public void testSparseBitmapStillStopsAtLastPosition() throws Exception { + int batchSize = 5; + int totalRows = 20; + CountingFileRecordReader underlying = new CountingFileRecordReader(totalRows, batchSize); + BitmapIndexResult selection = + new BitmapIndexResult(() -> RoaringBitmap32.bitmapOf(0, 2, 4, 6, 8)); + ApplyBitmapIndexRecordReader reader = + new ApplyBitmapIndexRecordReader(underlying, selection); + + List rows = readViaCloseableIterator(reader); + + assertThat(rows).containsExactly(0, 2, 4, 6, 8); + assertThat(underlying.readBatchCount()).isEqualTo(2); + } + + private static List readViaCloseableIterator(ApplyBitmapIndexRecordReader reader) + throws Exception { + List rows = new ArrayList<>(); + try (CloseableIterator iterator = reader.toCloseableIterator()) { + while (iterator.hasNext()) { + rows.add(iterator.next().getInt(0)); + } + } + return rows; + } + + private static List range(int endExclusive) { + List result = new ArrayList<>(endExclusive); + for (int i = 0; i < endExclusive; i++) { + result.add(i); + } + return result; + } + + /** + * Simulates {@code DataFileRecordReader#readBatchInternal}, which applies {@code + * iterator.selection(selection)} before wrapping with {@link ApplyBitmapIndexRecordReader}. + */ + private static class PreSelectedCountingFileRecordReader + implements FileRecordReader { + + private final int totalRows; + private final int batchSize; + private final RoaringBitmap32 selectionBitmap; + private final AtomicInteger readBatchCount = new AtomicInteger(0); + private int nextBatchStart; + + private PreSelectedCountingFileRecordReader( + int totalRows, int batchSize, RoaringBitmap32 selectionBitmap) { + this.totalRows = totalRows; + this.batchSize = batchSize; + this.selectionBitmap = selectionBitmap; + } + + int readBatchCount() { + return readBatchCount.get(); + } + + @Override + public FileRecordIterator readBatch() { + readBatchCount.incrementAndGet(); + if (nextBatchStart >= totalRows) { + return null; + } + int batchStart = nextBatchStart; + int batchEnd = Math.min(nextBatchStart + batchSize, totalRows); + nextBatchStart = batchEnd; + return new PositionFileRecordIterator(batchStart, batchEnd).selection(selectionBitmap); + } + + @Override + public void close() {} + } + + private static class CountingFileRecordReader implements FileRecordReader { + + private final int totalRows; + private final int batchSize; + private final AtomicInteger readBatchCount = new AtomicInteger(0); + private int nextBatchStart; + + private CountingFileRecordReader(int totalRows, int batchSize) { + this.totalRows = totalRows; + this.batchSize = batchSize; + } + + int readBatchCount() { + return readBatchCount.get(); + } + + @Override + public FileRecordIterator readBatch() { + readBatchCount.incrementAndGet(); + if (nextBatchStart >= totalRows) { + return null; + } + int batchStart = nextBatchStart; + int batchEnd = Math.min(nextBatchStart + batchSize, totalRows); + nextBatchStart = batchEnd; + return new PositionFileRecordIterator(batchStart, batchEnd); + } + + @Override + public void close() {} + } + + private static class PositionFileRecordIterator implements FileRecordIterator { + + private final int end; + private int nextPosition; + private int returnedPosition = -1; + + private PositionFileRecordIterator(int start, int end) { + this.nextPosition = start; + this.end = end; + } + + @Override + public InternalRow next() { + if (nextPosition >= end) { + return null; + } + returnedPosition = nextPosition; + return GenericRow.of(nextPosition++); + } + + @Override + public long returnedPosition() { + return returnedPosition; + } + + @Override + public Path filePath() { + return DUMMY_PATH; + } + + @Override + public void releaseBatch() {} + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java index e1bd365b2380..1ffd4669d4ec 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java @@ -77,6 +77,7 @@ import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BranchMergeHandler; +import org.apache.paimon.utils.CloseableIterator; import org.apache.paimon.utils.RoaringBitmap32; import org.apache.commons.math3.random.RandomDataGenerator; @@ -1258,6 +1259,43 @@ public void testLimitPushDown() throws Exception { Thread.sleep(1_000); } + @Test + public void testLimitWithCloseableIterator() throws Exception { + RowType rowType = RowType.builder().field("id", DataTypes.INT()).build(); + Consumer configure = + options -> { + options.set(FILE_FORMAT, FILE_FORMAT_PARQUET); + options.set(WRITE_ONLY, true); + options.set(SOURCE_SPLIT_TARGET_SIZE, MemorySize.ofMebiBytes(256)); + }; + FileStoreTable table = createUnawareBucketFileStoreTable(rowType, configure); + + int rowCount = 5000; + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + for (int i = 0; i < rowCount; i++) { + write.write(GenericRow.of(i)); + } + commit.commit(0, write.prepareCommit(true, 0)); + write.close(); + commit.close(); + + int limit = 10; + TableScan.Plan plan = table.newScan().withLimit(limit).plan(); + RecordReader reader = + table.newRead().withLimit(limit).createReader(plan.splits()); + AtomicInteger count = new AtomicInteger(0); + try (CloseableIterator iterator = reader.toCloseableIterator()) { + while (iterator.hasNext()) { + iterator.next(); + count.incrementAndGet(); + } + } + assertThat(count.get()).isEqualTo(limit); + + Thread.sleep(1_000); + } + @Test public void testWithShardAppendTable() throws Exception { FileStoreTable table = createFileStoreTable(conf -> conf.set(BUCKET, -1)); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index 93ef7fbe4d2e..a3cbc264b96c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -1415,6 +1415,38 @@ public void testBatchReadSourceWithSnapshot() { .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222)); } + @Test + public void testDedicatedPathLimitTenOnManyRows() { + sql("CREATE TABLE limit_many_rows (a INT, b INT, c INT)"); + StringBuilder insertValues = new StringBuilder(); + for (int i = 1; i <= 100; i++) { + if (i > 1) { + insertValues.append(", "); + } + insertValues.append(String.format("(%d, %d, %d)", i, i * 10, i * 100)); + } + batchSql("INSERT INTO limit_many_rows VALUES " + insertValues); + + List result = + batchSql( + "SELECT * FROM limit_many_rows " + + "/*+ OPTIONS('scan.dedicated-split-generation'='true') */ " + + "LIMIT 10"); + assertThat(result).hasSize(10); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of(1, 10, 100), + Row.of(2, 20, 200), + Row.of(3, 30, 300), + Row.of(4, 40, 400), + Row.of(5, 50, 500), + Row.of(6, 60, 600), + Row.of(7, 70, 700), + Row.of(8, 80, 800), + Row.of(9, 90, 900), + Row.of(10, 100, 1000)); + } + @Test public void testBatchReadSourceWithoutSnapshot() { assertThat(