diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff
index beef445490..d2d72e9d68 100644
--- a/dev/diffs/3.5.8.diff
+++ b/dev/diffs/3.5.8.diff
@@ -238,20 +238,6 @@ index e5494726695..00937f025c2 100644
    }
  
    test("A cached table preserves the partitioning and ordering of its cached SparkPlan") {
-diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
-index 9e8d77c53f3..855e3ada7d1 100644
---- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
-+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
-@@ -790,7 +790,8 @@ class ColumnExpressionSuite extends QueryTest with SharedSparkSession {
-     }
-   }
- 
--  test("input_file_name, input_file_block_start, input_file_block_length - FileScanRDD") {
-+  test("input_file_name, input_file_block_start, input_file_block_length - FileScanRDD",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) {
-     withTempPath { dir =>
-       val data = sparkContext.parallelize(0 to 10).toDF("id")
-       data.write.parquet(dir.getCanonicalPath)
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
 index 6f3090d8908..c08a60fb0c2 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -1084,20 +1070,6 @@ index 04702201f82..5ee11f83ecf 100644
      }
      assert(exchanges.size === 1)
    }
-diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
-index 9f8e979e3fb..3bc9dab8023 100644
---- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
-+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
-@@ -87,7 +87,8 @@ class UDFSuite extends QueryTest with SharedSparkSession {
-     spark.catalog.dropTempView("tmp_table")
-   }
- 
--  test("SPARK-8005 input_file_name") {
-+  test("SPARK-8005 input_file_name",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) {
-     withTempPath { dir =>
-       val data = sparkContext.parallelize(0 to 10, 2).toDF("id")
-       data.write.parquet(dir.getCanonicalPath)
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
 index d269290e616..13726a31e07 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -2504,42 +2476,32 @@ index 5cdbdc27b32..307fba16578 100644
      spark.range(10).selectExpr("id", "id % 3 as p")
        .write.partitionBy("p").saveAsTable("testDataForScan")
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
-index 0ab8691801d..7b81f3a8f6d 100644
+index 0ab8691801d..b18a5bea944 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
-@@ -17,7 +17,9 @@
- 
+@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.python
  package org.apache.spark.sql.execution.python
  
-+import org.apache.spark.sql.IgnoreCometNativeDataFusion
  import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython, BatchEvalPython, Limit, LocalLimit}
 +import org.apache.spark.sql.comet._
  import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, SparkPlanTest}
  import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
  import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-@@ -93,7 +95,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
-     assert(arrowEvalNodes.size == 2)
-   }
- 
--  test("Python UDF should not break column pruning/filter pushdown -- Parquet V1") {
-+  test("Python UDF should not break column pruning/filter pushdown -- Parquet V1",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) {
-     withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
-       withTempPath { f =>
-         spark.range(10).select($"id".as("a"), $"id".as("b"))
-@@ -108,6 +111,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
+@@ -108,6 +109,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
  
          val scanNodes = query.queryExecution.executedPlan.collect {
            case scan: FileSourceScanExec => scan
 +          case scan: CometScanExec => scan
++          case scan: CometNativeScanExec => scan
          }
          assert(scanNodes.length == 1)
          assert(scanNodes.head.output.map(_.name) == Seq("a"))
-@@ -120,11 +124,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
+@@ -120,11 +123,18 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
  
          val scanNodes = query.queryExecution.executedPlan.collect {
            case scan: FileSourceScanExec => scan
 +          case scan: CometScanExec => scan
++          case scan: CometNativeScanExec => scan
          }
          assert(scanNodes.length == 1)
          // $"a" is not null and $"a" > 1
@@ -2548,13 +2510,14 @@ index 0ab8691801d..7b81f3a8f6d 100644
 +        val dataFilters = scanNodes.head match {
 +          case scan: FileSourceScanExec => scan.dataFilters
 +          case scan: CometScanExec => scan.dataFilters
++          case scan: CometNativeScanExec => scan.dataFilters
 +        }
 +        assert(dataFilters.length == 2)
 +        assert(dataFilters.flatMap(_.references.map(_.name)).distinct == Seq("a"))
        }
      }
    }
-@@ -145,6 +154,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
+@@ -145,6 +155,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
  
          val scanNodes = query.queryExecution.executedPlan.collect {
            case scan: BatchScanExec => scan
@@ -2562,7 +2525,7 @@ index 0ab8691801d..7b81f3a8f6d 100644
          }
          assert(scanNodes.length == 1)
          assert(scanNodes.head.output.map(_.name) == Seq("a"))
-@@ -157,6 +167,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
+@@ -157,6 +168,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
  
          val scanNodes = query.queryExecution.executedPlan.collect {
            case scan: BatchScanExec => scan
@@ -3243,29 +3206,6 @@ index de3b1ffccf0..2a76d127093 100644
  
    override def beforeEach(): Unit = {
      super.beforeEach()
-diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
-index f3be79f9022..b4b1ea8dbc4 100644
---- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
-+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
-@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn
- import org.apache.hadoop.io.{LongWritable, Writable}
- 
- import org.apache.spark.{SparkException, SparkFiles, TestUtils}
--import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
-+import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion, QueryTest, Row}
- import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
- import org.apache.spark.sql.catalyst.plans.logical.Project
- import org.apache.spark.sql.execution.WholeStageCodegenExec
-@@ -448,7 +448,8 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
-     }
-   }
- 
--  test("SPARK-11522 select input_file_name from non-parquet table") {
-+  test("SPARK-11522 select input_file_name from non-parquet table",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) {
- 
-     withTempDir { tempDir =>
- 
 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
 index 6160c3e5f6c..0956d7d9edc 100644
 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index acdf2afc0d..3b727f07e4 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -28,7 +28,7 @@ import scala.jdk.CollectionConverters._
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, GenericInternalRow, PlanExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, GenericInternalRow, InputFileBlockLength, InputFileBlockStart, InputFileName, PlanExpression}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.{sideBySide, ArrayBasedMapData, GenericArrayData, MetadataColumnHelper}
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues
@@ -110,7 +110,9 @@ case class CometScanRule(session: SparkSession)
       metadataTableSuffix.exists(suffix => scanExec.table.name().endsWith(suffix))
     }
 
-    def transformScan(plan: SparkPlan): SparkPlan = plan match {
+    val fullPlan = plan
+
+    def transformScan(scanNode: SparkPlan): SparkPlan = scanNode match {
       case scan if !CometConf.COMET_NATIVE_SCAN_ENABLED.get(conf) =>
         withInfo(scan, "Comet Scan is not enabled")
 
@@ -119,7 +121,7 @@ case class CometScanRule(session: SparkSession)
 
       // data source V1
       case scanExec: FileSourceScanExec =>
-        transformV1Scan(scanExec)
+        transformV1Scan(fullPlan, scanExec)
 
       // data source V2
       case scanExec: BatchScanExec =>
@@ -135,7 +137,7 @@ case class CometScanRule(session: SparkSession)
     }
   }
 
-  private def transformV1Scan(scanExec: FileSourceScanExec): SparkPlan = {
+  private def transformV1Scan(plan: SparkPlan, scanExec: FileSourceScanExec): SparkPlan = {
 
     if (COMET_DPP_FALLBACK_ENABLED.get() &&
       scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
@@ -170,7 +172,7 @@ case class CometScanRule(session: SparkSession)
           nativeIcebergCompatScan(session, scanExec, r, hadoopConf)
             .getOrElse(scanExec)
         case SCAN_NATIVE_DATAFUSION =>
-          nativeDataFusionScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
+          nativeDataFusionScan(plan, session, scanExec, r, hadoopConf).getOrElse(scanExec)
         case SCAN_NATIVE_ICEBERG_COMPAT =>
           nativeIcebergCompatScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
       }
@@ -181,6 +183,7 @@ case class CometScanRule(session: SparkSession)
   }
 
   private def nativeDataFusionScan(
+      plan: SparkPlan,
       session: SparkSession,
       scanExec: FileSourceScanExec,
       r: HadoopFsRelation,
@@ -196,6 +199,20 @@ case class CometScanRule(session: SparkSession)
       withInfo(scanExec, "Native DataFusion scan does not support metadata columns")
       return None
     }
+    // input_file_name, input_file_block_start, and input_file_block_length read from
+    // InputFileBlockHolder, a thread-local set by Spark's FileScanRDD. The native DataFusion
+    // scan does not use FileScanRDD, so these expressions would return empty/default values.
+    if (plan.exists(node =>
+        node.expressions.exists(_.exists {
+          case _: InputFileName | _: InputFileBlockStart | _: InputFileBlockLength => true
+          case _ => false
+        }))) {
+      withInfo(
+        scanExec,
+        "Native DataFusion scan is not compatible with input_file_name, " +
+          "input_file_block_start, or input_file_block_length")
+      return None
+    }
     if (ShimFileFormat.findRowIndexColumnIndexInSchema(scanExec.requiredSchema) >= 0) {
       withInfo(scanExec, "Native DataFusion scan does not support row index generation")
       return None
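
Note (not part of the patch): the fallback added to CometScanRule above hinges on a single plan-level check. The sketch below restates that check as a standalone helper, assuming only that spark-sql 3.5.x is on the classpath; the object and method names here are illustrative and do not exist in Comet.

import org.apache.spark.sql.catalyst.expressions.{InputFileBlockLength, InputFileBlockStart, InputFileName}
import org.apache.spark.sql.execution.SparkPlan

object InputFileExpressionCheck {
  // True if any operator in the physical plan references input_file_name,
  // input_file_block_start, or input_file_block_length anywhere in its expression trees.
  // The patch uses this traversal to keep the Spark scan whenever such expressions are
  // present, because they read a thread-local that only Spark's FileScanRDD populates.
  def containsInputFileExpressions(plan: SparkPlan): Boolean =
    plan.exists(node =>
      node.expressions.exists(_.exists {
        case _: InputFileName | _: InputFileBlockStart | _: InputFileBlockLength => true
        case _ => false
      }))
}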