From 3a99062ee93cd1e08d3640721ea56ea7210a9825 Mon Sep 17 00:00:00 2001 From: "hongli.wwj" Date: Wed, 27 May 2026 15:16:24 +0800 Subject: [PATCH] [Flink] Fix ReadOperator to stop reading after LIMIT on dedicated split path. --- .../flink/source/operator/ReadOperator.java | 15 +- .../paimon/flink/BatchFileStoreITCase.java | 17 ++ .../operator/DedicatedSplitReadLimitTest.java | 163 ++++++++++++++++++ .../source/operator/OperatorSourceTest.java | 8 +- 4 files changed, 191 insertions(+), 12 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/DedicatedSplitReadLimitTest.java diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java index a9b9767041e6..30b2ab0d62e4 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java @@ -22,6 +22,7 @@ import org.apache.paimon.disk.IOManager; import org.apache.paimon.flink.FlinkRowData; import org.apache.paimon.flink.NestedProjectedRowData; +import org.apache.paimon.flink.source.RecordLimiter; import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.Split; @@ -68,6 +69,7 @@ public class ReadOperator extends AbstractStreamOperator private transient long emitEventTimeLag = FileStoreSourceReaderMetrics.UNDEFINED; private transient long idleStartTime = FileStoreSourceReaderMetrics.ACTIVE; private transient Counter numRecordsIn; + @Nullable private transient RecordLimiter recordLimiter; @Nullable private final Long limit; public ReadOperator( @@ -98,6 +100,7 @@ public void open() throws Exception { .getIOManager() .getSpillingDirectoriesPaths()); this.read = readSupplier.get().withIOManager(ioManager); + this.recordLimiter = RecordLimiter.create(limit); this.reuseRow = new FlinkRowData(null); this.reuseRecord = new StreamRecord<>(null); this.idlingStarted(); @@ -122,7 +125,7 @@ public void processElement(StreamRecord record) throws Exception { boolean firstRecord = true; try (CloseableIterator iterator = read.createReader(split).toCloseableIterator()) { - while (iterator.hasNext()) { + while (!reachLimit() && iterator.hasNext()) { emitEventTimeLag = System.currentTimeMillis() - eventTime; // each Split is already counted as one input record, @@ -133,10 +136,6 @@ public void processElement(StreamRecord record) throws Exception { numRecordsIn.inc(); } - if (reachLimit()) { - return; - } - reuseRow.replace(iterator.next()); if (nestedProjectedRowData == null) { reuseRecord.replace(reuseRow); @@ -145,6 +144,10 @@ public void processElement(StreamRecord record) throws Exception { reuseRecord.replace(nestedProjectedRowData); } output.collect(reuseRecord); + + if (recordLimiter != null) { + recordLimiter.increment(); + } } } // start idle when data sending is completed @@ -160,7 +163,7 @@ public void close() throws Exception { } private boolean reachLimit() { - if (limit != null && numRecordsIn.getCount() > limit) { + if (recordLimiter != null && recordLimiter.reachLimit()) { LOG.info("Reader {} reach the limit record {}.", this, limit); return true; } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index 93ef7fbe4d2e..eabb9221db83 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -1415,6 +1415,23 @@ public void testBatchReadSourceWithSnapshot() { .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222)); } + @Test + public void testDedicatedPathLimitTenOnManyRows() { + StringBuilder insert = new StringBuilder("INSERT INTO T VALUES "); + for (int i = 1; i <= 100; i++) { + if (i > 1) { + insert.append(", "); + } + insert.append(String.format("(%d, %d, %d)", i, i * 10, i * 100)); + } + batchSql(insert.toString()); + + assertThat( + batchSql( + "SELECT * FROM T /*+ OPTIONS('scan.dedicated-split-generation'='true') */ LIMIT 10")) + .hasSize(10); + } + @Test public void testBatchReadSourceWithoutSnapshot() { assertThat( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/DedicatedSplitReadLimitTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/DedicatedSplitReadLimitTest.java new file mode 100644 index 000000000000..9ef7757dfaaa --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/DedicatedSplitReadLimitTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source.operator; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.metrics.MetricRegistry; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.types.DataTypes; + +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalSerializers; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.RowType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests {@link ReadOperator} limit on the dedicated split read path. */ +public class DedicatedSplitReadLimitTest { + + private static final int LIMIT = 10; + + @TempDir Path tempDir; + + private Table table; + + @BeforeEach + public void before() + throws Catalog.TableAlreadyExistException, Catalog.DatabaseNotExistException, + Catalog.TableNotExistException, Catalog.DatabaseAlreadyExistException { + Catalog catalog = + CatalogFactory.createCatalog( + CatalogContext.create(new org.apache.paimon.fs.Path(tempDir.toUri()))); + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.INT()) + .column("c", DataTypes.INT()) + .primaryKey("a") + .option("bucket", "1") + .build(); + Identifier identifier = Identifier.create("default", "t"); + catalog.createDatabase("default", false); + catalog.createTable(identifier, schema, false); + this.table = catalog.getTable(identifier); + } + + @Test + public void testReadOperatorStopsAfterLimit() throws Exception { + BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder(); + BatchTableWrite write = writeBuilder.newWrite(); + for (int i = 0; i < 100; i++) { + write.write(GenericRow.of(i, i, i)); + } + BatchTableCommit commit = writeBuilder.newCommit(); + commit.commit(write.prepareCommit()); + write.close(); + commit.close(); + + ReadBatchCountingRead countingRead = + new ReadBatchCountingRead(table.newReadBuilder().newRead()); + ReadOperator readOperator = new ReadOperator(() -> countingRead, null, (long) LIMIT); + + OneInputStreamOperatorTestHarness harness = + new OneInputStreamOperatorTestHarness<>(readOperator); + harness.setup( + InternalSerializers.create( + RowType.of(new IntType(), new IntType(), new IntType()))); + harness.open(); + for (Split split : table.newReadBuilder().newScan().plan().splits()) { + harness.processElement(new StreamRecord<>(split)); + } + + assertThat(harness.getOutput()).hasSize(LIMIT); + assertThat(countingRead.readBatchInvocations()).isEqualTo(1); + } + + private static class ReadBatchCountingRead implements TableRead { + + private final TableRead delegate; + private final AtomicInteger readBatchInvocations = new AtomicInteger(); + + private ReadBatchCountingRead(TableRead delegate) { + this.delegate = delegate; + } + + int readBatchInvocations() { + return readBatchInvocations.get(); + } + + @Override + public TableRead withMetricRegistry(MetricRegistry registry) { + delegate.withMetricRegistry(registry); + return this; + } + + @Override + public TableRead executeFilter() { + delegate.executeFilter(); + return this; + } + + @Override + public TableRead withIOManager(IOManager ioManager) { + delegate.withIOManager(ioManager); + return this; + } + + @Override + public RecordReader createReader(Split split) throws IOException { + RecordReader reader = delegate.createReader(split); + return new RecordReader() { + @Override + public RecordIterator readBatch() throws IOException { + readBatchInvocations.incrementAndGet(); + return reader.readBatch(); + } + + @Override + public void close() throws IOException { + reader.close(); + } + }; + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java index e4ab4ec15799..9c57f27b866d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java @@ -238,15 +238,11 @@ public void testReadOperatorWithLimit() throws Exception { } ArrayList values = new ArrayList<>(harness.getOutput()); - // In ReadOperator each Split is already counted as one input record. But in this case it - // will not happen. - // So in this case the result values's size if 3 even if the limit is 2. - // The IT case see BatchFileStoreITCase#testBatchReadSourceWithSnapshot. + // ReadOperator limit is enforced on emitted records. assertThat(values) .containsExactlyInAnyOrder( new StreamRecord<>(GenericRowData.of(1, 1, 1)), - new StreamRecord<>(GenericRowData.of(2, 2, 2)), - new StreamRecord<>(GenericRowData.of(3, 3, 3))); + new StreamRecord<>(GenericRowData.of(2, 2, 2))); } @Test