140 changes: 16 additions & 124 deletions dev/diffs/4.0.1.diff
@@ -332,7 +332,7 @@ index 1f8c5822e7d..b7de4e28813 100644
WITH t(c1) AS (SELECT replace(listagg(DISTINCT col1 COLLATE unicode_rtrim) COLLATE utf8_binary, ' ', '') FROM (VALUES ('xbc '), ('xbc '), ('a'), ('xbc'))) SELECT len(c1), regexp_count(c1, 'a'), regexp_count(c1, 'xbc') FROM t
-- !query schema
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 0f42502f1d9..f616024a9c2 100644
index 0f42502f1d9..146682eb9d8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeConstants
@@ -354,18 +354,7 @@ index 0f42502f1d9..f616024a9c2 100644
}

test("A cached table preserves the partitioning and ordering of its cached SparkPlan") {
@@ -1626,7 +1627,9 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
}
}

- test("SPARK-35332: Make cache plan disable configs configurable - check AQE") {
+ test("SPARK-35332: Make cache plan disable configs configurable - check AQE",
+ IgnoreComet("TODO: ignore for first stage of 4.0 " +
+ "https://github.com/apache/datafusion-comet/issues/1948")) {
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "2",
SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
@@ -1661,7 +1664,12 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
@@ -1661,7 +1662,12 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
_.nodeName.contains("AdaptiveSparkPlan"))
val aqePlanRoot = findNodeInSparkPlanInfo(inMemoryScanNode.get,
_.nodeName.contains("ResultQueryStage"))
@@ -380,7 +369,7 @@ index 0f42502f1d9..f616024a9c2 100644

withTempView("t0", "t1", "t2") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 9db406ff12f..abbc91f5c11 100644
index 9db406ff12f..245e4caa319 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.errors.DataTypeErrors.toSQLId
@@ -401,17 +390,6 @@ index 9db406ff12f..abbc91f5c11 100644
}
assert(exchangePlans.length == 1)
}
@@ -2241,7 +2241,9 @@ class DataFrameAggregateSuite extends QueryTest
}
}

- test("SPARK-47430 Support GROUP BY MapType") {
+ test("SPARK-47430 Support GROUP BY MapType",
+ IgnoreComet("TODO: ignore for first stage of 4.0 " +
+ "https://github.com/apache/datafusion-comet/issues/1948")) {
def genMapData(dataType: String): String = {
s"""
|case when id % 4 == 0 then map()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index ed182322aec..1ae6afa686a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -1212,7 +1190,7 @@ index 0df7f806272..52d33d67328 100644

test("non-matching optional group") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 2e33f6505ab..e1e93ab3bad 100644
index 2e33f6505ab..47fa031add5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -23,10 +23,11 @@ import org.apache.spark.SparkRuntimeException
@@ -1250,14 +1228,7 @@ index 2e33f6505ab..e1e93ab3bad 100644
}
assert(exchanges.size === 1)
}
@@ -2674,22 +2681,32 @@ class SubquerySuite extends QueryTest
}
}

- test("SPARK-43402: FileSourceScanExec supports push down data filter with scalar subquery") {
+ test("SPARK-43402: FileSourceScanExec supports push down data filter with scalar subquery",
+ IgnoreComet("TODO: ignore for first stage of 4.0, " +
+ "https://github.com/apache/datafusion-comet/issues/1948")) {
@@ -2678,18 +2685,26 @@ class SubquerySuite extends QueryTest
def checkFileSourceScan(query: String, answer: Seq[Row]): Unit = {
val df = sql(query)
checkAnswer(df, answer)
@@ -1308,15 +1279,10 @@ index fee375db10a..8c2c24e2c5f 100644
val v = VariantBuilder.parseJson(s, false)
new VariantVal(v.getValue, v.getMetadata)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
index 11e9547dfc5..be9ae40ab3d 100644
index 11e9547dfc5..d3bb92ae7e3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
@@ -20,10 +20,11 @@ package org.apache.spark.sql.collation
import scala.jdk.CollectionConverters.MapHasAsJava

import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.{AnalysisException, IgnoreComet, Row}
@@ -24,6 +24,7 @@ import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.ExtendedAnalysisException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.CollationFactory
@@ -1334,44 +1300,22 @@ index 11e9547dfc5..be9ae40ab3d 100644
}.nonEmpty
)
}
@@ -1505,7 +1508,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
}
}

- test("hash join should be used for collated strings if sort merge join is not forced") {
+ test("hash join should be used for collated strings if sort merge join is not forced",
+ IgnoreComet("TODO: ignore for first stage of 4.0 " +
+ "https://github.com/apache/datafusion-comet/issues/1948")) {
val t1 = "T_1"
val t2 = "T_2"

@@ -1611,6 +1616,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
@@ -1611,6 +1614,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
} else {
assert(!collectFirst(queryPlan) {
case b: BroadcastHashJoinExec => b.leftKeys.head
+ case b: CometBroadcastHashJoinExec => b.leftKeys.head
}.head.isInstanceOf[ArrayTransform])
}
}
@@ -1676,6 +1682,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
@@ -1676,6 +1680,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
} else {
assert(!collectFirst(queryPlan) {
case b: BroadcastHashJoinExec => b.leftKeys.head
+ case b: CometBroadcastHashJoinExec => b.leftKeys.head
}.head.isInstanceOf[ArrayTransform])
}
}
@@ -1815,7 +1822,9 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
}
}

- test("rewrite with collationkey shouldn't disrupt multiple join conditions") {
+ test("rewrite with collationkey shouldn't disrupt multiple join conditions",
+ IgnoreComet("TODO: ignore for first stage of 4.0 " +
+ "https://github.com/apache/datafusion-comet/issues/1948")) {
val t1 = "T_1"
val t2 = "T_2"

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index 3eeed2e4175..9f21d547c1c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -1461,15 +1405,10 @@ index 2a0ab21ddb0..6030e7c2b9b 100644
} finally {
spark.listenerManager.unregister(listener)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index c73e8e16fbb..88cd0d47da3 100644
index c73e8e16fbb..26d0cddd34a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -20,10 +20,11 @@ import java.sql.Timestamp
import java.util.Collections

import org.apache.spark.SparkConf
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{DataFrame, IgnoreComet, Row}
@@ -24,6 +24,7 @@ import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Literal, TransformExpression}
import org.apache.spark.sql.catalyst.plans.physical
@@ -1503,17 +1442,6 @@ index c73e8e16fbb..88cd0d47da3 100644
})
}

@@ -370,7 +372,9 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
checkAnswer(df.sort("res"), Seq(Row(10.0), Row(15.5), Row(41.0)))
}

- test("SPARK-48655: order by on partition keys should not introduce additional shuffle") {
+ test("SPARK-48655: order by on partition keys should not introduce additional shuffle",
+ IgnoreComet("TODO: ignore for first stage of 4.0 " +
+ "https://github.com/apache/datafusion-comet/issues/1948")) {
val items_partitions = Array(identity("price"), identity("id"))
createTable(items, itemsColumns, items_partitions)
sql(s"INSERT INTO testcat.ns.$items VALUES " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
index f62e092138a..c0404bfe85e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
@@ -1583,15 +1511,12 @@ index 418ca3430bb..eb8267192f8 100644
withTempPath { path =>
val dir = path.getCanonicalPath
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/InsertSortForLimitAndOffsetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/InsertSortForLimitAndOffsetSuite.scala
index d1b11a74cf3..08087c80201 100644
index d1b11a74cf3..1950d363dba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/InsertSortForLimitAndOffsetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/InsertSortForLimitAndOffsetSuite.scala
@@ -17,8 +17,9 @@

package org.apache.spark.sql.execution
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution

-import org.apache.spark.sql.{Dataset, QueryTest}
+import org.apache.spark.sql.{Dataset, IgnoreComet, QueryTest}
import org.apache.spark.sql.{Dataset, QueryTest}
import org.apache.spark.sql.IntegratedUDFTestUtils._
+import org.apache.spark.sql.comet.CometCollectLimitExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -1606,39 +1531,6 @@ index d1b11a74cf3..08087c80201 100644
case _ => false
}.isDefined)
}
@@ -77,7 +78,9 @@ class InsertSortForLimitAndOffsetSuite extends QueryTest
assert(!hasLocalSort(physicalPlan))
}

- test("root LIMIT preserves data ordering with CollectLimitExec") {
+ test("root LIMIT preserves data ordering with CollectLimitExec",
+ IgnoreComet("TODO: ignore for first stage of 4.0 " +
+ "https://github.com/apache/datafusion-comet/issues/1948")) {
withSQLConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1") {
val df = spark.range(10).orderBy($"id" % 8).limit(2)
df.collect()
@@ -88,7 +91,9 @@ class InsertSortForLimitAndOffsetSuite extends QueryTest
}
}

- test("middle LIMIT preserves data ordering with the extra sort") {
+ test("middle LIMIT preserves data ordering with the extra sort",
+ IgnoreComet("TODO: ignore for first stage of 4.0 " +
+ "https://github.com/apache/datafusion-comet/issues/1948")) {
withSQLConf(
SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1",
// To trigger the bug, we have to disable the coalescing optimization. Otherwise we use only
@@ -117,7 +122,9 @@ class InsertSortForLimitAndOffsetSuite extends QueryTest
assert(!hasLocalSort(physicalPlan))
}

- test("middle OFFSET preserves data ordering with the extra sort") {
+ test("middle OFFSET preserves data ordering with the extra sort",
+ IgnoreComet("TODO: ignore for first stage of 4.0 " +
+ "https://github.com/apache/datafusion-comet/issues/1948")) {
val df = 1.to(10).map(v => v -> v).toDF("c1", "c2").orderBy($"c1" % 8)
verifySortAdded(df.offset(2))
verifySortAdded(df.filter($"c2" > rand()).offset(2))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
index 743ec41dbe7..9f30d6c8e04 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
@@ -3466,7 +3358,7 @@ index 86c4e49f6f6..2e639e5f38d 100644
val tblTargetName = "tbl_target"
val tblSourceQualified = s"default.$tblSourceName"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index f0f3f94b811..d64e4e54e22 100644
index f0f3f94b811..fb836730ecf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -27,13 +27,14 @@ import scala.jdk.CollectionConverters._
@@ -3538,7 +3430,7 @@ index f0f3f94b811..d64e4e54e22 100644
+ */
+ protected def enableCometAnsiMode: Boolean = {
+ val v = System.getenv("ENABLE_COMET_ANSI_MODE")
+ v != null && v.toBoolean
+ if (v != null) v.toBoolean else true
+ }
+
+ /**