78 changes: 9 additions & 69 deletions dev/diffs/3.5.8.diff
@@ -238,20 +238,6 @@ index e5494726695..00937f025c2 100644
}

test("A cached table preserves the partitioning and ordering of its cached SparkPlan") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index 9e8d77c53f3..855e3ada7d1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -790,7 +790,8 @@ class ColumnExpressionSuite extends QueryTest with SharedSparkSession {
}
}

- test("input_file_name, input_file_block_start, input_file_block_length - FileScanRDD") {
+ test("input_file_name, input_file_block_start, input_file_block_length - FileScanRDD",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) {
withTempPath { dir =>
val data = sparkContext.parallelize(0 to 10).toDF("id")
data.write.parquet(dir.getCanonicalPath)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 6f3090d8908..c08a60fb0c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -1084,20 +1070,6 @@ index 04702201f82..5ee11f83ecf 100644
}
assert(exchanges.size === 1)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index 9f8e979e3fb..3bc9dab8023 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -87,7 +87,8 @@ class UDFSuite extends QueryTest with SharedSparkSession {
spark.catalog.dropTempView("tmp_table")
}

- test("SPARK-8005 input_file_name") {
+ test("SPARK-8005 input_file_name",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) {
withTempPath { dir =>
val data = sparkContext.parallelize(0 to 10, 2).toDF("id")
data.write.parquet(dir.getCanonicalPath)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index d269290e616..13726a31e07 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -2504,42 +2476,32 @@ index 5cdbdc27b32..307fba16578 100644
spark.range(10).selectExpr("id", "id % 3 as p")
.write.partitionBy("p").saveAsTable("testDataForScan")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
index 0ab8691801d..7b81f3a8f6d 100644
index 0ab8691801d..b18a5bea944 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
@@ -17,7 +17,9 @@

@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.python

+import org.apache.spark.sql.IgnoreCometNativeDataFusion
import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython, BatchEvalPython, Limit, LocalLimit}
+import org.apache.spark.sql.comet._
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, SparkPlanTest}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
@@ -93,7 +95,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
assert(arrowEvalNodes.size == 2)
}

- test("Python UDF should not break column pruning/filter pushdown -- Parquet V1") {
+ test("Python UDF should not break column pruning/filter pushdown -- Parquet V1",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) {
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
withTempPath { f =>
spark.range(10).select($"id".as("a"), $"id".as("b"))
@@ -108,6 +111,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
@@ -108,6 +109,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {

val scanNodes = query.queryExecution.executedPlan.collect {
case scan: FileSourceScanExec => scan
+ case scan: CometScanExec => scan
+ case scan: CometNativeScanExec => scan
}
assert(scanNodes.length == 1)
assert(scanNodes.head.output.map(_.name) == Seq("a"))
@@ -120,11 +124,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
@@ -120,11 +123,18 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {

val scanNodes = query.queryExecution.executedPlan.collect {
case scan: FileSourceScanExec => scan
+ case scan: CometScanExec => scan
+ case scan: CometNativeScanExec => scan
}
assert(scanNodes.length == 1)
// $"a" is not null and $"a" > 1
@@ -2548,21 +2510,22 @@ index 0ab8691801d..7b81f3a8f6d 100644
+ val dataFilters = scanNodes.head match {
+ case scan: FileSourceScanExec => scan.dataFilters
+ case scan: CometScanExec => scan.dataFilters
+ case scan: CometNativeScanExec => scan.dataFilters
+ }
+ assert(dataFilters.length == 2)
+ assert(dataFilters.flatMap(_.references.map(_.name)).distinct == Seq("a"))
}
}
}
@@ -145,6 +154,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
@@ -145,6 +155,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {

val scanNodes = query.queryExecution.executedPlan.collect {
case scan: BatchScanExec => scan
+ case scan: CometBatchScanExec => scan
}
assert(scanNodes.length == 1)
assert(scanNodes.head.output.map(_.name) == Seq("a"))
@@ -157,6 +167,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
@@ -157,6 +168,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {

val scanNodes = query.queryExecution.executedPlan.collect {
case scan: BatchScanExec => scan
@@ -3243,29 +3206,6 @@ index de3b1ffccf0..2a76d127093 100644

override def beforeEach(): Unit = {
super.beforeEach()
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index f3be79f9022..b4b1ea8dbc4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn
import org.apache.hadoop.io.{LongWritable, Writable}

import org.apache.spark.{SparkException, SparkFiles, TestUtils}
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, IgnoreCometNativeDataFusion, QueryTest, Row}
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.execution.WholeStageCodegenExec
@@ -448,7 +448,8 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
}
}

- test("SPARK-11522 select input_file_name from non-parquet table") {
+ test("SPARK-11522 select input_file_name from non-parquet table",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3312")) {

withTempDir { tempDir =>

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 6160c3e5f6c..0956d7d9edc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
27 changes: 22 additions & 5 deletions spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -28,7 +28,7 @@ import scala.jdk.CollectionConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, GenericInternalRow, PlanExpression}
import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, GenericInternalRow, InputFileBlockLength, InputFileBlockStart, InputFileName, PlanExpression}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.{sideBySide, ArrayBasedMapData, GenericArrayData, MetadataColumnHelper}
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues
@@ -110,7 +110,9 @@ case class CometScanRule(session: SparkSession)
metadataTableSuffix.exists(suffix => scanExec.table.name().endsWith(suffix))
}

def transformScan(plan: SparkPlan): SparkPlan = plan match {
val fullPlan = plan

def transformScan(scanNode: SparkPlan): SparkPlan = scanNode match {
case scan if !CometConf.COMET_NATIVE_SCAN_ENABLED.get(conf) =>
withInfo(scan, "Comet Scan is not enabled")

@@ -119,7 +121,7 @@

// data source V1
case scanExec: FileSourceScanExec =>
transformV1Scan(scanExec)
transformV1Scan(fullPlan, scanExec)

// data source V2
case scanExec: BatchScanExec =>
@@ -135,7 +137,7 @@
}
}

private def transformV1Scan(scanExec: FileSourceScanExec): SparkPlan = {
private def transformV1Scan(plan: SparkPlan, scanExec: FileSourceScanExec): SparkPlan = {

if (COMET_DPP_FALLBACK_ENABLED.get() &&
scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
@@ -170,7 +172,7 @@ case class CometScanRule(session: SparkSession)
nativeIcebergCompatScan(session, scanExec, r, hadoopConf)
.getOrElse(scanExec)
case SCAN_NATIVE_DATAFUSION =>
nativeDataFusionScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
nativeDataFusionScan(plan, session, scanExec, r, hadoopConf).getOrElse(scanExec)
case SCAN_NATIVE_ICEBERG_COMPAT =>
nativeIcebergCompatScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
}
@@ -181,6 +183,7 @@
}

private def nativeDataFusionScan(
plan: SparkPlan,
session: SparkSession,
scanExec: FileSourceScanExec,
r: HadoopFsRelation,
@@ -196,6 +199,20 @@
withInfo(scanExec, "Native DataFusion scan does not support metadata columns")
return None
}
// input_file_name, input_file_block_start, and input_file_block_length read from
// InputFileBlockHolder, a thread-local set by Spark's FileScanRDD. The native DataFusion
// scan does not use FileScanRDD, so these expressions would return empty/default values.
if (plan.exists(node =>
node.expressions.exists(_.exists {
case _: InputFileName | _: InputFileBlockStart | _: InputFileBlockLength => true
case _ => false
}))) {
withInfo(
scanExec,
"Native DataFusion scan is not compatible with input_file_name, " +
"input_file_block_start, or input_file_block_length")
return None
}
if (ShimFileFormat.findRowIndexColumnIndexInSchema(scanExec.requiredSchema) >= 0) {
withInfo(scanExec, "Native DataFusion scan does not support row index generation")
return None
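Note: the fallback check added to nativeDataFusionScan above follows a common Catalyst pattern: walk every operator in the physical plan and test each expression tree for the input-file expressions. The sketch below restates that inline check as a standalone Scala helper for readability; the object and method names are illustrative and are not part of the PR, which keeps the logic inline in CometScanRule.scala.

import org.apache.spark.sql.catalyst.expressions.{InputFileBlockLength, InputFileBlockStart, InputFileName}
import org.apache.spark.sql.execution.SparkPlan

// Illustrative sketch (not part of the PR): the same traversal as the inline check,
// factored into a helper.
object InputFileExpressionCheck {
  // Returns true when any operator in the physical plan references input_file_name,
  // input_file_block_start, or input_file_block_length. These expressions read
  // InputFileBlockHolder, a thread-local populated only by Spark's FileScanRDD,
  // so a native DataFusion scan must fall back to Spark when they appear.
  def referencesInputFileExpressions(plan: SparkPlan): Boolean =
    plan.exists { node =>
      node.expressions.exists(_.exists {
        case _: InputFileName | _: InputFileBlockStart | _: InputFileBlockLength => true
        case _ => false
      })
    }
}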