From ec42eb9a8cc12421b084c7c05620591acca1fbd3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 9 Feb 2026 12:47:24 -0700 Subject: [PATCH 1/3] fix: unignore input_file_name Spark SQL tests for native_datafusion The native_datafusion scan now correctly falls back to Spark's FileSourceScanExec when metadata columns (like input_file_name) are present, so the 3 input_file_name tests no longer need to be ignored. For ExtractPythonUDFsSuite, the issue was that the test's collect pattern didn't match CometNativeScanExec. Fixed by adding CometNativeScanExec to the collect and dataFilters match blocks. Closes #3312 Co-Authored-By: Claude Opus 4.6 --- dev/diffs/3.5.8.diff | 129 +++---------------------------------------- 1 file changed, 9 insertions(+), 120 deletions(-) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index beef445490..ac2ea0d8c1 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -238,20 +238,6 @@ index e5494726695..00937f025c2 100644 } test("A cached table preserves the partitioning and ordering of its cached SparkPlan") { -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala -index 9e8d77c53f3..855e3ada7d1 100644 ---- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala -+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala -@@ -790,7 +790,8 @@ class ColumnExpressionSuite extends QueryTest with SharedSparkSession { - } - } - -- test("input_file_name, input_file_block_start, input_file_block_length - FileScanRDD") { -+ test("input_file_name, input_file_block_start, input_file_block_length - FileScanRDD", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) { - withTempPath { dir => - val data = sparkContext.parallelize(0 to 10).toDF("id") - data.write.parquet(dir.getCanonicalPath) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index 6f3090d8908..c08a60fb0c2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -588,57 +574,6 @@ index 93275487f29..510e3087e0f 100644 }.flatten assert(filters.contains(GreaterThan(scan.logicalPlan.output.head, Literal(5L)))) } -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala -new file mode 100644 -index 00000000000..1ee842b6f62 ---- /dev/null -+++ b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala -@@ -0,0 +1,45 @@ -+/* -+ * Licensed to the Apache Software Foundation (ASF) under one or more -+ * contributor license agreements. See the NOTICE file distributed with -+ * this work for additional information regarding copyright ownership. -+ * The ASF licenses this file to You under the Apache License, Version 2.0 -+ * (the "License"); you may not use this file except in compliance with -+ * the License. You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -+ * See the License for the specific language governing permissions and -+ * limitations under the License. 
-+ */ -+ -+package org.apache.spark.sql -+ -+import org.scalactic.source.Position -+import org.scalatest.Tag -+ -+import org.apache.spark.sql.test.SQLTestUtils -+ -+/** -+ * Tests with this tag will be ignored when Comet is enabled (e.g., via `ENABLE_COMET`). -+ */ -+case class IgnoreComet(reason: String) extends Tag("DisableComet") -+case class IgnoreCometNativeIcebergCompat(reason: String) extends Tag("DisableComet") -+case class IgnoreCometNativeDataFusion(reason: String) extends Tag("DisableComet") -+case class IgnoreCometNativeScan(reason: String) extends Tag("DisableComet") -+ -+/** -+ * Helper trait that disables Comet for all tests regardless of default config values. -+ */ -+trait IgnoreCometSuite extends SQLTestUtils { -+ override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit -+ pos: Position): Unit = { -+ if (isCometEnabled) { -+ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun) -+ } else { -+ super.test(testName, testTags: _*)(testFun) -+ } -+ } -+} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala index 7af826583bd..3c3def1eb67 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala @@ -1084,20 +1019,6 @@ index 04702201f82..5ee11f83ecf 100644 } assert(exchanges.size === 1) } -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala -index 9f8e979e3fb..3bc9dab8023 100644 ---- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala -+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala -@@ -87,7 +87,8 @@ class UDFSuite extends QueryTest with SharedSparkSession { - spark.catalog.dropTempView("tmp_table") - } - -- test("SPARK-8005 input_file_name") { -+ test("SPARK-8005 input_file_name", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) { - withTempPath { dir => - val data = sparkContext.parallelize(0 to 10, 2).toDF("id") - data.write.parquet(dir.getCanonicalPath) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index d269290e616..13726a31e07 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -2504,42 +2425,32 @@ index 5cdbdc27b32..307fba16578 100644 spark.range(10).selectExpr("id", "id % 3 as p") .write.partitionBy("p").saveAsTable("testDataForScan") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala -index 0ab8691801d..7b81f3a8f6d 100644 +index 0ab8691801d..b18a5bea944 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala -@@ -17,7 +17,9 @@ - +@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.python -+import org.apache.spark.sql.IgnoreCometNativeDataFusion import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython, BatchEvalPython, Limit, LocalLimit} +import org.apache.spark.sql.comet._ import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, SparkPlanTest} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan 
-@@ -93,7 +95,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { - assert(arrowEvalNodes.size == 2) - } - -- test("Python UDF should not break column pruning/filter pushdown -- Parquet V1") { -+ test("Python UDF should not break column pruning/filter pushdown -- Parquet V1", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) { - withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { - withTempPath { f => - spark.range(10).select($"id".as("a"), $"id".as("b")) -@@ -108,6 +111,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -108,6 +109,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan + case scan: CometScanExec => scan ++ case scan: CometNativeScanExec => scan } assert(scanNodes.length == 1) assert(scanNodes.head.output.map(_.name) == Seq("a")) -@@ -120,11 +124,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -120,11 +123,18 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: FileSourceScanExec => scan + case scan: CometScanExec => scan ++ case scan: CometNativeScanExec => scan } assert(scanNodes.length == 1) // $"a" is not null and $"a" > 1 @@ -2548,13 +2459,14 @@ index 0ab8691801d..7b81f3a8f6d 100644 + val dataFilters = scanNodes.head match { + case scan: FileSourceScanExec => scan.dataFilters + case scan: CometScanExec => scan.dataFilters ++ case scan: CometNativeScanExec => scan.dataFilters + } + assert(dataFilters.length == 2) + assert(dataFilters.flatMap(_.references.map(_.name)).distinct == Seq("a")) } } } -@@ -145,6 +154,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -145,6 +155,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: BatchScanExec => scan @@ -2562,7 +2474,7 @@ index 0ab8691801d..7b81f3a8f6d 100644 } assert(scanNodes.length == 1) assert(scanNodes.head.output.map(_.name) == Seq("a")) -@@ -157,6 +167,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { +@@ -157,6 +168,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession { val scanNodes = query.queryExecution.executedPlan.collect { case scan: BatchScanExec => scan @@ -3243,29 +3155,6 @@ index de3b1ffccf0..2a76d127093 100644 override def beforeEach(): Unit = { super.beforeEach() -diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala -index f3be79f9022..b4b1ea8dbc4 100644 ---- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala -+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala -@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn - import org.apache.hadoop.io.{LongWritable, Writable} - - import org.apache.spark.{SparkException, SparkFiles, TestUtils} --import org.apache.spark.sql.{AnalysisException, QueryTest, Row} -+import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion, QueryTest, Row} - import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode - import 
org.apache.spark.sql.catalyst.plans.logical.Project - import org.apache.spark.sql.execution.WholeStageCodegenExec -@@ -448,7 +448,8 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { - } - } - -- test("SPARK-11522 select input_file_name from non-parquet table") { -+ test("SPARK-11522 select input_file_name from non-parquet table", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) { - - withTempDir { tempDir => - diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 6160c3e5f6c..0956d7d9edc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala From 188cd8658d8216a07910076ee4ab63dcbb0fd6a9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 9 Feb 2026 17:24:03 -0700 Subject: [PATCH 2/3] fix: restore IgnoreComet.scala in 3.5.8 Spark SQL test diff The previous commit accidentally removed the IgnoreComet.scala file creation from the diff, causing 94 compilation errors when applied to Spark 3.5.8. Co-Authored-By: Claude Opus 4.6 --- dev/diffs/3.5.8.diff | 51 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index ac2ea0d8c1..d2d72e9d68 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -574,6 +574,57 @@ index 93275487f29..510e3087e0f 100644 }.flatten assert(filters.contains(GreaterThan(scan.logicalPlan.output.head, Literal(5L)))) } +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala +new file mode 100644 +index 00000000000..1ee842b6f62 +--- /dev/null ++++ b/sql/core/src/test/scala/org/apache/spark/sql/IgnoreComet.scala +@@ -0,0 +1,45 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.spark.sql ++ ++import org.scalactic.source.Position ++import org.scalatest.Tag ++ ++import org.apache.spark.sql.test.SQLTestUtils ++ ++/** ++ * Tests with this tag will be ignored when Comet is enabled (e.g., via `ENABLE_COMET`). ++ */ ++case class IgnoreComet(reason: String) extends Tag("DisableComet") ++case class IgnoreCometNativeIcebergCompat(reason: String) extends Tag("DisableComet") ++case class IgnoreCometNativeDataFusion(reason: String) extends Tag("DisableComet") ++case class IgnoreCometNativeScan(reason: String) extends Tag("DisableComet") ++ ++/** ++ * Helper trait that disables Comet for all tests regardless of default config values. 
++ */ ++trait IgnoreCometSuite extends SQLTestUtils { ++ override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit ++ pos: Position): Unit = { ++ if (isCometEnabled) { ++ ignore(testName + " (disabled when Comet is on)", testTags: _*)(testFun) ++ } else { ++ super.test(testName, testTags: _*)(testFun) ++ } ++ } ++} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala index 7af826583bd..3c3def1eb67 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala From ab357d180ba5c753068d58da4031ac97f5cf04b8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 10 Feb 2026 12:05:56 -0700 Subject: [PATCH 3/3] fix: fall back scan when plan uses input_file_name expressions CometScanExec does not populate InputFileBlockHolder (the thread-local that Spark's FileScanRDD sets), so input_file_name(), input_file_block_start(), and input_file_block_length() return empty or default values when Comet replaces the scan. Detect these expressions in the plan and fall back to Spark's FileSourceScanExec. Co-Authored-By: Claude Opus 4.6 --- .../apache/comet/rules/CometScanRule.scala | 27 +++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index acdf2afc0d..3b727f07e4 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -28,7 +28,7 @@ import scala.jdk.CollectionConverters._ import org.apache.hadoop.conf.Configuration import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, GenericInternalRow, PlanExpression} +import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, GenericInternalRow, InputFileBlockLength, InputFileBlockStart, InputFileName, PlanExpression} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.{sideBySide, ArrayBasedMapData, GenericArrayData, MetadataColumnHelper} import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues @@ -110,7 +110,9 @@ case class CometScanRule(session: SparkSession) metadataTableSuffix.exists(suffix => scanExec.table.name().endsWith(suffix)) } - def transformScan(plan: SparkPlan): SparkPlan = plan match { + val fullPlan = plan + + def transformScan(scanNode: SparkPlan): SparkPlan = scanNode match { case scan if !CometConf.COMET_NATIVE_SCAN_ENABLED.get(conf) => withInfo(scan, "Comet Scan is not enabled") @@ -119,7 +121,7 @@ case class CometScanRule(session: SparkSession) // data source V1 case scanExec: FileSourceScanExec => - transformV1Scan(scanExec) + transformV1Scan(fullPlan, scanExec) // data source V2 case scanExec: BatchScanExec => @@ -135,7 +137,7 @@ case class CometScanRule(session: SparkSession) } } - private def transformV1Scan(scanExec: FileSourceScanExec): SparkPlan = { + private def transformV1Scan(plan: SparkPlan, scanExec: FileSourceScanExec): SparkPlan = { if (COMET_DPP_FALLBACK_ENABLED.get() && scanExec.partitionFilters.exists(isDynamicPruningFilter)) { @@ -170,7 +172,7 @@ case class CometScanRule(session: SparkSession) nativeIcebergCompatScan(session, scanExec, r, hadoopConf) .getOrElse(scanExec) case SCAN_NATIVE_DATAFUSION => - 
nativeDataFusionScan(session, scanExec, r, hadoopConf).getOrElse(scanExec) + nativeDataFusionScan(plan, session, scanExec, r, hadoopConf).getOrElse(scanExec) case SCAN_NATIVE_ICEBERG_COMPAT => nativeIcebergCompatScan(session, scanExec, r, hadoopConf).getOrElse(scanExec) } @@ -181,6 +183,7 @@ case class CometScanRule(session: SparkSession) } private def nativeDataFusionScan( + plan: SparkPlan, session: SparkSession, scanExec: FileSourceScanExec, r: HadoopFsRelation, @@ -196,6 +199,20 @@ case class CometScanRule(session: SparkSession) withInfo(scanExec, "Native DataFusion scan does not support metadata columns") return None } + // input_file_name, input_file_block_start, and input_file_block_length read from + // InputFileBlockHolder, a thread-local set by Spark's FileScanRDD. The native DataFusion + // scan does not use FileScanRDD, so these expressions would return empty/default values. + if (plan.exists(node => + node.expressions.exists(_.exists { + case _: InputFileName | _: InputFileBlockStart | _: InputFileBlockLength => true + case _ => false + }))) { + withInfo( + scanExec, + "Native DataFusion scan is not compatible with input_file_name, " + + "input_file_block_start, or input_file_block_length") + return None + } if (ShimFileFormat.findRowIndexColumnIndexInSchema(scanExec.requiredSchema) >= 0) { withInfo(scanExec, "Native DataFusion scan does not support row index generation") return None