[Flink] Fix ReadOperator to stop reading after LIMIT on dedicated split path.#7991
wwj6591812 wants to merge 1 commit into
leaves12138 left a comment
LGTM from code review.
Moving the limiter state to the operator level and checking it before iterator.hasNext() fixes the dedicated split path correctly: once the emitted-record limit is reached, the short-circuit prevents RecordReaderIterator.advanceIfNeeded() from opening more batches. The updated OperatorSourceTest expectation also matches the intended semantics that the limit is enforced on emitted rows, not on the input split counter.
I tried to run the focused Flink tests locally, but this environment could not resolve the current 1.5-SNAPSHOT artifacts (paimon-bundle, paimon-service-client, paimon-service-runtime), so this approval is based on code review only.
@@ -68,6 +69,7 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
    private transient long emitEventTimeLag = FileStoreSourceReaderMetrics.UNDEFINED;
    private transient long idleStartTime = FileStoreSourceReaderMetrics.ACTIVE;
    private transient Counter numRecordsIn;
Thanks for the review and for raising this.
We would prefer to keep numRecordsIn. It is used for Flink I/O metrics, not for limit enforcement anymore. In open(), we bind it to getNumRecordsInCounter(), and in processElement() we increment it so emitted rows are reflected in the standard numRecordsIn metric.
After this change, limit enforcement is handled only by RecordLimiter. The old use of numRecordsIn in reachLimit() was part of the bug, so we moved that responsibility out.
If we remove it, the existing numRecordsIn metric on this operator would no longer be updated.
Thanks again for your review.
@JingsongLi
Thank you very much for the review and for approving the change. Your understanding of the short-circuit before hasNext() matches what we intended. We appreciate that your approval is based on code review. We will keep an eye on CI results and share them here if anything comes up. Thanks again for your time and review.
Problem
With `scan.dedicated-split-generation=true`, `LIMIT N` returns correct results but scans the entire ORC file. Flink UI shows the Limit stage taking ~19s for 10 records.
A related core-layer fix is in #7994: `ApplyBitmapIndexRecordReader` does not signal reader exhaustion after the bitmap selection is consumed, so `RecordReaderIterator` keeps calling `readBatch()` until EOF. That PR fixes the root cause for all `toCloseableIterator()`/`forEachRemaining()` callers. This PR (#7991) fixes the same symptom on the Flink dedicated split path by stopping `ReadOperator` from calling `hasNext()` after the limit is reached.
Root Cause --- Short Summary
When `scan.dedicated-split-generation=true`, `ReadOperator.processElement()` uses `while (iterator.hasNext())` with `reachLimit()` checked inside the loop body. That does not stop the extra `hasNext()` after N rows, because `hasNext()` is evaluated in the `while` condition before the body runs, and therefore before `reachLimit()`. Even after 10 rows are emitted, the loop enters a new iteration and calls `hasNext()` first, which triggers `RecordReaderIterator.advanceIfNeeded()` and repeatedly calls `RecordReader.readBatch()` until EOF.
The old `reachLimit()` also could not reliably stop at N: it used `numRecordsIn > limit` (not `>=`) and skipped `numRecordsIn.inc()` on the first row, so after emitting 10 rows `numRecordsIn` was still 9 and `reachLimit()` returned false anyway.
Although `ApplyBitmapIndexRecordReader` (from `ReadBuilder.withLimit(N)`) stops yielding rows after N, the iterator layer keeps calling `readBatch()`. The FLIP-27 path avoids this via `RecordLimiter` in `FileStoreSourceSplitReader.fetch()`. This PR applies the same pattern in `ReadOperator`.
Root Cause --- Detailed explanation
1. Read path when `dedicated-split-generation=true`
Flink SQL `LIMIT N` is pushed down to Paimon in two places:
- `ReadBuilder.withLimit(N)`: may wrap the storage reader (e.g. `FileIndexEvaluator` → `ApplyBitmapIndexRecordReader`). Because `file-index.read.enabled` defaults to `true`, a query with only `LIMIT N` (no filter) is pushed down as a bitmap selection, and the ORC reader is wrapped by `ApplyBitmapIndexRecordReader`.
- `ReadOperator(limit=N)`: reads splits on the dedicated path (`MonitorSource` → shuffle → `ReadOperator`).
When the wrapper returns `null` after row N, `RecordReaderIterator` treats it as batch exhaustion and keeps calling `readBatch()` until EOF.
The slow case is in `ReadOperator.processElement()`, which reads via:
2. What the old loop did
Previously, `ReadOperator.processElement()` used:
Execution order on each iteration: (A) → (B) → (C) → (D).
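As a rough illustration of the loop shape described in this PR, here is a minimal, self-contained model. It is a sketch under stated assumptions and not the actual `ReadOperator` code: `OldLoopSketch`, `CountingIterator` and `run` are hypothetical names, the position of the limit check inside the body is assumed, and the source is modeled as yielding exactly N rows (as the limit-wrapped reader does). It only shows that `hasNext()` is evaluated an 11th time and that the counter lags the emitted rows by one.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy model of the old loop shape. This is not the actual ReadOperator code. */
class OldLoopSketch {

    /** Wraps an iterator and counts how often hasNext() is invoked. */
    static final class CountingIterator implements Iterator<Integer> {
        private final Iterator<Integer> delegate;
        int hasNextCalls = 0;

        CountingIterator(Iterator<Integer> delegate) {
            this.delegate = delegate;
        }

        @Override
        public boolean hasNext() {
            hasNextCalls++; // in the real operator this call may trigger readBatch()
            return delegate.hasNext();
        }

        @Override
        public Integer next() {
            return delegate.next();
        }
    }

    /** Returns {emitted rows, final counter value, number of hasNext() calls}. */
    static long[] run(int rowsYielded, long limit) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < rowsYielded; i++) {
            rows.add(i);
        }
        CountingIterator it = new CountingIterator(rows.iterator());

        long numRecordsIn = 0;
        long emitted = 0;
        boolean firstRecord = true;
        while (it.hasNext()) {          // evaluated before anything in the body
            if (numRecordsIn > limit) { // old-style check: '>' rather than '>='
                break;
            }
            if (firstRecord) {
                firstRecord = false;    // the first row is not counted
            } else {
                numRecordsIn++;
            }
            it.next();
            emitted++;
        }
        return new long[] {emitted, numRecordsIn, it.hasNextCalls};
    }

    public static void main(String[] args) {
        long[] r = run(10, 10);
        // prints: emitted=10 numRecordsIn=9 hasNextCalls=11
        System.out.println(
                "emitted=" + r[0] + " numRecordsIn=" + r[1] + " hasNextCalls=" + r[2]);
    }
}
```

In this model the 11th `hasNext()` is cheap because the toy source is a plain list; on the real dedicated split path that same call is where the remaining batches get read.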
3. Why `reachLimit()` inside the loop does not prevent the extra `hasNext()`
A common question: after 10 rows are emitted and `numRecordsIn.inc()` has run, shouldn't `reachLimit()` return true and stop the loop?
No, for two reasons:
① `hasNext()` runs before `reachLimit()`
The limit check is in the loop body, but `hasNext()` is in the `while` condition. After the 10th row is emitted, the loop starts iteration 11:
So even if `reachLimit()` would eventually return true, `hasNext()` has already been called and the expensive scan may have started.
② The old `reachLimit()` condition was wrong for this purpose
For `limit = 10`, after emitting 10 rows:
| `numRecordsIn` after `inc()` | `numRecordsIn > 10` |
| --- | --- |
| 9 | false |
After the 10th row, `numRecordsIn` is 9, not 10, so `reachLimit()` is still false. The loop proceeds to call `hasNext()` again.
4. Why one extra `hasNext()` scans the whole file
`RecordReaderIterator.advanceIfNeeded()` does:
With `withLimit(N)`, `ApplyBitmapIndexFileRecordIterator.next()` returns `null` once `position > last`. `RecordReaderIterator` treats that as batch exhaustion and keeps calling `readBatch()` until EOF.
Arthas on `LIMIT 10`:
This matches the ~19s Limit stage in Flink UI.
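The draining behavior described in this section can be modeled with a small, self-contained sketch. It is an approximation for illustration only: `DrainSketch`, `FakeReader`, `DrainingIterator` and `demo` are hypothetical names and are not Paimon classes, and the batch count of 5 is arbitrary. The assumption is that every batch after the first yields no rows (the limit selection is already used up), so a single extra `hasNext()` walks all remaining batches.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.NoSuchElementException;

/** Toy model of an iterator that keeps reading batches until it finds a row or hits EOF. */
class DrainSketch {

    /** Hypothetical reader: only the first batch contains rows, later batches are empty. */
    static final class FakeReader {
        private final Deque<Deque<Integer>> batches = new ArrayDeque<>();
        int readBatchCalls = 0;

        FakeReader(int totalBatches, int rowsInFirstBatch) {
            for (int b = 0; b < totalBatches; b++) {
                Deque<Integer> batch = new ArrayDeque<>();
                if (b == 0) {
                    for (int r = 0; r < rowsInFirstBatch; r++) {
                        batch.add(r);
                    }
                }
                batches.add(batch);
            }
        }

        /** Returns the next batch, or null at end of file. */
        Deque<Integer> readBatch() {
            readBatchCalls++;
            return batches.poll();
        }
    }

    /** Hypothetical iterator with an advance-if-needed step. */
    static final class DrainingIterator {
        private final FakeReader reader;
        private Deque<Integer> current;
        private Integer next;
        private boolean advanced = false;

        DrainingIterator(FakeReader reader) {
            this.reader = reader;
            this.current = reader.readBatch();
        }

        boolean hasNext() {
            advanceIfNeeded();
            return next != null;
        }

        Integer next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            advanced = false;
            return next;
        }

        private void advanceIfNeeded() {
            if (advanced) {
                return;
            }
            advanced = true;
            next = null;
            while (current != null) {
                next = current.poll();        // null: this batch has no more rows
                if (next != null) {
                    return;
                }
                current = reader.readBatch(); // try the next batch; null means EOF
            }
        }
    }

    /** Returns {readBatch calls after N rows, readBatch calls after one extra hasNext()}. */
    static int[] demo(int totalBatches, int limit) {
        FakeReader reader = new FakeReader(totalBatches, limit);
        DrainingIterator it = new DrainingIterator(reader);
        for (int i = 0; i < limit; i++) {
            it.next();
        }
        int before = reader.readBatchCalls;
        it.hasNext(); // the one extra call
        return new int[] {before, reader.readBatchCalls};
    }

    public static void main(String[] args) {
        int[] r = demo(5, 10);
        // prints: before=1 after=6
        System.out.println("before=" + r[0] + " after=" + r[1]);
    }
}
```

All additional `readBatch()` calls happen inside that one `hasNext()`, which is the same shape as the single long `advanceIfNeeded` call reported by Arthas.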
5. Why the non-dedicated path does not hit this
`FileStoreSourceSplitReader.fetch()` checks the limit before `readBatch()`:
`ReadOperator` had no equivalent guard before `hasNext()`.
Evidence
Flink UI: Limit operator ~19s, 10 records.

Arthas:
trace RecordReaderIterator.advanceIfNeeded shows a single call (~18.7s) containing 96 invocations of RecordReader.readBatch() (and 96 next()/releaseBatch() pairs), confirming the iterator loops until EOF after the limit bitmap stops yielding rows.
Fix
Align `ReadOperator` with `FileStoreSourceSplitReader`:
- `RecordLimiter`: increment after each emit (1 row = 1 count), with `counter >= limit`.
- `hasNext()` is guarded by an `&&` short-circuit:
When `recordLimiter.reachLimit()` is true, `iterator.hasNext()` is not evaluated, so `RecordReaderIterator.advanceIfNeeded()` is never called and the full-file ORC scan is avoided.
Testing
- `DedicatedSplitReadLimitTest`: 100-row split, `LIMIT 10` → 10 rows, `readBatch` called once
- `OperatorSourceTest.testReadOperatorWithLimit`: limit=2 → exactly 2 rows
- `BatchFileStoreITCase.testDedicatedPathLimitTenOnManyRows`: 100 rows INSERT, `LIMIT 10` → 10 rows
mvn test -pl paimon-flink/paimon-flink-common \
  -Dtest=DedicatedSplitReadLimitTest,OperatorSourceTest#testReadOperatorWithLimit,BatchFileStoreITCase#testDedicatedPathLimitTenOnManyRows
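For readers who want to see the guard and the "`readBatch` called once" expectation in isolation, the following is a minimal model of the idea. It is a sketch, not the code in this PR: `GuardedLoopSketch`, `SimpleLimiter`, `BatchSource`, `RowIterator` and `read` are hypothetical names, and splitting the 100-row split into 10 batches of 10 rows is an assumption made only so that batch reads can be counted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Toy model of a limit check that short-circuits before hasNext(). */
class GuardedLoopSketch {

    /** Hypothetical limiter: counts emitted rows and trips at count >= limit. */
    static final class SimpleLimiter {
        private final long limit;
        private long count = 0;

        SimpleLimiter(long limit) {
            this.limit = limit;
        }

        boolean reachLimit() {
            return count >= limit;
        }

        void increment() {
            count++;
        }
    }

    /** Hypothetical batch source that counts readBatch() calls. */
    static final class BatchSource {
        private final Iterator<List<Integer>> batches;
        int readBatchCalls = 0;

        BatchSource(int numBatches, int rowsPerBatch) {
            List<List<Integer>> all = new ArrayList<>();
            for (int b = 0; b < numBatches; b++) {
                List<Integer> batch = new ArrayList<>();
                for (int r = 0; r < rowsPerBatch; r++) {
                    batch.add(b * rowsPerBatch + r);
                }
                all.add(batch);
            }
            this.batches = all.iterator();
        }

        /** Returns the next batch, or null at end of input. */
        Iterator<Integer> readBatch() {
            readBatchCalls++;
            return batches.hasNext() ? batches.next().iterator() : null;
        }
    }

    /** Row iterator that pulls a new batch whenever the current one is used up. */
    static final class RowIterator implements Iterator<Integer> {
        private final BatchSource source;
        private Iterator<Integer> current;

        RowIterator(BatchSource source) {
            this.source = source;
            this.current = source.readBatch();
        }

        @Override
        public boolean hasNext() {
            while (current != null && !current.hasNext()) {
                current = source.readBatch();
            }
            return current != null;
        }

        @Override
        public Integer next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return current.next();
        }
    }

    /** Returns {emitted rows, readBatch() calls}. */
    static int[] read(int numBatches, int rowsPerBatch, long limit) {
        BatchSource source = new BatchSource(numBatches, rowsPerBatch);
        RowIterator it = new RowIterator(source);
        SimpleLimiter limiter = new SimpleLimiter(limit);
        int emitted = 0;
        // The limiter is checked first; once it trips, hasNext() is never evaluated.
        while (!limiter.reachLimit() && it.hasNext()) {
            it.next();
            emitted++;
            limiter.increment(); // one emitted row = one count
        }
        return new int[] {emitted, source.readBatchCalls};
    }

    public static void main(String[] args) {
        int[] r = read(10, 10, 10);
        // prints: emitted=10 readBatchCalls=1
        System.out.println("emitted=" + r[0] + " readBatchCalls=" + r[1]);
    }
}
```

Putting the limiter on the left of `&&` is what matters here: Java evaluates the right operand only when the left one is true, so reaching the limit ends the loop without touching the iterator again.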