Skip to content

Commit bdd89f2

Browse files
peter-tothdongjoon-hyun
authored andcommitted
[SPARK-56469][SQL][TESTS] BufferedRowsReader lifecycle hardening
### What changes were proposed in this pull request? Extracted from #55116. The test-framework reader in `InMemoryBaseTable` now tracks a closed flag and throws `IllegalStateException` for reads, double-closes, or metric fetches on a closed reader. This ensures future tests catch reader lifecycle bugs that were previously hidden by the noop `close()`. ### Why are the changes needed? Improve test framework. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing UTs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #55336 from peter-toth/SPARK-56469-bufferedrowsreader-lifecycle-hardening. Authored-by: Peter Toth <peter.toth@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent 53f6b15 commit bdd89f2

File tree

1 file changed

+14
-1
lines changed

1 file changed

+14
-1
lines changed

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -877,15 +877,21 @@ private class BufferedRowsReader(
877877

878878
private var index: Int = -1
879879
private var rowsRead: Long = 0
880+
private var closed: Boolean = false
881+
882+
private def checkNotClosed(op: String): Unit =
883+
if (closed) throw new IllegalStateException(s"$op called on a closed BufferedRowsReader")
880884

881885
override def next(): Boolean = {
886+
checkNotClosed("next()")
882887
index += 1
883888
val hasNext = index < partition.rows.length
884889
if (hasNext) rowsRead += 1
885890
hasNext
886891
}
887892

888893
override def get(): InternalRow = {
894+
checkNotClosed("get()")
889895
val originalRow = partition.rows(index)
890896
val values = new Array[Any](nonMetadataColumns.length)
891897
nonMetadataColumns.zipWithIndex.foreach { case (col, idx) =>
@@ -895,7 +901,13 @@ private class BufferedRowsReader(
895901
addMetadata(new GenericInternalRow(values))
896902
}
897903

898-
override def close(): Unit = {}
904+
// Intentionally strict: double-close throws rather than being idempotent (as Closeable permits).
905+
// This is test code whose purpose is to catch reader lifecycle bugs early; a silent no-op on
906+
// double-close would mask the very errors we want to detect.
907+
override def close(): Unit = {
908+
checkNotClosed("close()")
909+
closed = true
910+
}
899911

900912
private def extractFieldValue(
901913
field: StructField,
@@ -1041,6 +1053,7 @@ private class BufferedRowsReader(
10411053
}
10421054

10431055
override def currentMetricsValues(): Array[CustomTaskMetric] = {
1056+
checkNotClosed("currentMetricsValues()")
10441057
val metric = new CustomTaskMetric {
10451058
override def name(): String = "rows_read"
10461059
override def value(): Long = rowsRead

0 commit comments

Comments
 (0)