Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.NestedProjectedRowData;
import org.apache.paimon.flink.source.RecordLimiter;
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
Expand Down Expand Up @@ -68,6 +69,7 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
private transient long emitEventTimeLag = FileStoreSourceReaderMetrics.UNDEFINED;
private transient long idleStartTime = FileStoreSourceReaderMetrics.ACTIVE;
private transient Counter numRecordsIn;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove numRecordsIn?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review and for raising this.

We would prefer to keep numRecordsIn. It is used for Flink I/O metrics, not for limit enforcement anymore. In open(), we bind it to getNumRecordsInCounter(), and in processElement() we increment it so emitted rows are reflected in the standard numRecordsIn metric.

After this change, limit enforcement is handled only by RecordLimiter. The old use of numRecordsIn in reachLimit() was part of the bug, so we moved that responsibility out.

If we remove it, the existing numRecordsIn metric on this operator would no longer be updated.

Thanks again for your review.
@JingsongLi

@Nullable private transient RecordLimiter recordLimiter;
@Nullable private final Long limit;

public ReadOperator(
Expand Down Expand Up @@ -98,6 +100,7 @@ public void open() throws Exception {
.getIOManager()
.getSpillingDirectoriesPaths());
this.read = readSupplier.get().withIOManager(ioManager);
this.recordLimiter = RecordLimiter.create(limit);
this.reuseRow = new FlinkRowData(null);
this.reuseRecord = new StreamRecord<>(null);
this.idlingStarted();
Expand All @@ -122,7 +125,7 @@ public void processElement(StreamRecord<Split> record) throws Exception {
boolean firstRecord = true;
try (CloseableIterator<InternalRow> iterator =
read.createReader(split).toCloseableIterator()) {
while (iterator.hasNext()) {
while (!reachLimit() && iterator.hasNext()) {
emitEventTimeLag = System.currentTimeMillis() - eventTime;

// each Split is already counted as one input record,
Expand All @@ -133,10 +136,6 @@ public void processElement(StreamRecord<Split> record) throws Exception {
numRecordsIn.inc();
}

if (reachLimit()) {
return;
}

reuseRow.replace(iterator.next());
if (nestedProjectedRowData == null) {
reuseRecord.replace(reuseRow);
Expand All @@ -145,6 +144,10 @@ public void processElement(StreamRecord<Split> record) throws Exception {
reuseRecord.replace(nestedProjectedRowData);
}
output.collect(reuseRecord);

if (recordLimiter != null) {
recordLimiter.increment();
}
}
}
// start idle when data sending is completed
Expand All @@ -160,7 +163,7 @@ public void close() throws Exception {
}

private boolean reachLimit() {
if (limit != null && numRecordsIn.getCount() > limit) {
if (recordLimiter != null && recordLimiter.reachLimit()) {
LOG.info("Reader {} reach the limit record {}.", this, limit);
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1415,6 +1415,23 @@ public void testBatchReadSourceWithSnapshot() {
.containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222));
}

@Test
public void testDedicatedPathLimitTenOnManyRows() {
StringBuilder insert = new StringBuilder("INSERT INTO T VALUES ");
for (int i = 1; i <= 100; i++) {
if (i > 1) {
insert.append(", ");
}
insert.append(String.format("(%d, %d, %d)", i, i * 10, i * 100));
}
batchSql(insert.toString());

assertThat(
batchSql(
"SELECT * FROM T /*+ OPTIONS('scan.dedicated-split-generation'='true') */ LIMIT 10"))
.hasSize(10);
}

@Test
public void testBatchReadSourceWithoutSnapshot() {
assertThat(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.source.operator;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataTypes;

import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests {@link ReadOperator} limit on the dedicated split read path. */
public class DedicatedSplitReadLimitTest {

private static final int LIMIT = 10;

@TempDir Path tempDir;

private Table table;

@BeforeEach
public void before()
throws Catalog.TableAlreadyExistException, Catalog.DatabaseNotExistException,
Catalog.TableNotExistException, Catalog.DatabaseAlreadyExistException {
Catalog catalog =
CatalogFactory.createCatalog(
CatalogContext.create(new org.apache.paimon.fs.Path(tempDir.toUri())));
Schema schema =
Schema.newBuilder()
.column("a", DataTypes.INT())
.column("b", DataTypes.INT())
.column("c", DataTypes.INT())
.primaryKey("a")
.option("bucket", "1")
.build();
Identifier identifier = Identifier.create("default", "t");
catalog.createDatabase("default", false);
catalog.createTable(identifier, schema, false);
this.table = catalog.getTable(identifier);
}

@Test
public void testReadOperatorStopsAfterLimit() throws Exception {
BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
BatchTableWrite write = writeBuilder.newWrite();
for (int i = 0; i < 100; i++) {
write.write(GenericRow.of(i, i, i));
}
BatchTableCommit commit = writeBuilder.newCommit();
commit.commit(write.prepareCommit());
write.close();
commit.close();

ReadBatchCountingRead countingRead =
new ReadBatchCountingRead(table.newReadBuilder().newRead());
ReadOperator readOperator = new ReadOperator(() -> countingRead, null, (long) LIMIT);

OneInputStreamOperatorTestHarness<Split, RowData> harness =
new OneInputStreamOperatorTestHarness<>(readOperator);
harness.setup(
InternalSerializers.create(
RowType.of(new IntType(), new IntType(), new IntType())));
harness.open();
for (Split split : table.newReadBuilder().newScan().plan().splits()) {
harness.processElement(new StreamRecord<>(split));
}

assertThat(harness.getOutput()).hasSize(LIMIT);
assertThat(countingRead.readBatchInvocations()).isEqualTo(1);
}

private static class ReadBatchCountingRead implements TableRead {

private final TableRead delegate;
private final AtomicInteger readBatchInvocations = new AtomicInteger();

private ReadBatchCountingRead(TableRead delegate) {
this.delegate = delegate;
}

int readBatchInvocations() {
return readBatchInvocations.get();
}

@Override
public TableRead withMetricRegistry(MetricRegistry registry) {
delegate.withMetricRegistry(registry);
return this;
}

@Override
public TableRead executeFilter() {
delegate.executeFilter();
return this;
}

@Override
public TableRead withIOManager(IOManager ioManager) {
delegate.withIOManager(ioManager);
return this;
}

@Override
public RecordReader<InternalRow> createReader(Split split) throws IOException {
RecordReader<InternalRow> reader = delegate.createReader(split);
return new RecordReader<InternalRow>() {
@Override
public RecordIterator<InternalRow> readBatch() throws IOException {
readBatchInvocations.incrementAndGet();
return reader.readBatch();
}

@Override
public void close() throws IOException {
reader.close();
}
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -238,15 +238,11 @@ public void testReadOperatorWithLimit() throws Exception {
}
ArrayList<Object> values = new ArrayList<>(harness.getOutput());

// In ReadOperator each Split is already counted as one input record. But in this case it
// will not happen.
// So in this case the result values's size if 3 even if the limit is 2.
// The IT case see BatchFileStoreITCase#testBatchReadSourceWithSnapshot.
// ReadOperator limit is enforced on emitted records.
assertThat(values)
.containsExactlyInAnyOrder(
new StreamRecord<>(GenericRowData.of(1, 1, 1)),
new StreamRecord<>(GenericRowData.of(2, 2, 2)),
new StreamRecord<>(GenericRowData.of(3, 3, 3)));
new StreamRecord<>(GenericRowData.of(2, 2, 2)));
}

@Test
Expand Down