@@ -31,6 +31,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

import org.apache.comet.CometConf
import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator, SchemaGenOptions}

class CometParquetWriterSuite extends CometTestBase {
@@ -377,6 +378,276 @@ class CometParquetWriterSuite extends CometTestBase {
    }
  }

  // NATIVE COMET WRITER TESTS WHICH FAIL IN SPARK
  // https://github.com/apache/datafusion-comet/issues/3417
  ignore("Spark compat: empty file should be skipped while write to file") {
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTempPath { path =>
        assertCometNativeWriterCaptured {
          spark.range(100).repartition(10).where("id = 50").write.parquet(path.toString)
        }
        val partFiles = path
          .listFiles()
          .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
        assert(partFiles.length === 2)
      }
    }
  }

  // https://github.com/apache/datafusion-comet/issues/3418
  ignore("Spark compat: SPARK-33901 ctas should not change table's schema") {
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTable("t1", "t2") {
        sql("CREATE TABLE t1(i CHAR(5), c VARCHAR(4)) USING parquet")
        assertCometNativeWriterCaptured {
          sql("CREATE TABLE t2 USING parquet AS SELECT * FROM t1")
        }
        checkAnswer(
          sql("desc t2").selectExpr("data_type").where("data_type like '%char%'"),
          Seq(Row("char(5)"), Row("varchar(4)")))
      }
    }
  }

  // https://github.com/apache/datafusion-comet/issues/3419
  ignore("Spark compat: SPARK-37160 CREATE TABLE AS SELECT with CHAR_AS_VARCHAR") {
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTable("t1", "t2") {
        sql("CREATE TABLE t1(col CHAR(5)) USING parquet")
        withSQLConf(SQLConf.CHAR_AS_VARCHAR.key -> "true") {
          assertCometNativeWriterCaptured {
            sql("CREATE TABLE t2 USING parquet AS SELECT * FROM t1")
          }
          checkAnswer(
            sql("desc t2").selectExpr("data_type").where("data_type like '%char%'"),
            Seq(Row("varchar(5)")))
        }
      }
    }
  }

  // https://github.com/apache/datafusion-comet/issues/3420
  ignore("Spark compat: SPARK-29174 Support LOCAL in INSERT OVERWRITE DIRECTORY") {
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTempPath { dir =>
        val path = dir.toURI.getPath
        withTable("tab1", "tab2") {
          sql(s"""create table tab1 (a int) using parquet location '$path'""")
          assertCometNativeWriterCaptured {
            sql("insert into tab1 values(1)")
          }
          checkAnswer(sql("select * from tab1"), Seq(Row(1)))
          sql("create table tab2 (a int) using parquet")
          assertCometNativeWriterCaptured {
            sql("insert into tab2 values(2)")
          }
          assertCometNativeWriterCaptured {
            sql(s"""insert overwrite local directory '$path' using parquet select * from tab2""")
          }
          sql("refresh table tab1")
          checkAnswer(sql("select * from tab1"), Seq(Row(2)))
        }
      }
    }
  }

  // https://github.com/apache/datafusion-comet/issues/3421
  ignore("Spark compat: SPARK-38336 INSERT INTO with default columns positive tests") {
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTable("t") {
        sql("create table t(i boolean, s bigint) using parquet")
        assertCometNativeWriterCaptured {
          sql("insert into t(i) values(true)")
        }
        checkAnswer(spark.table("t"), Row(true, null))
      }
    }
  }

  // https://github.com/apache/datafusion-comet/issues/3422
  ignore("Spark compat: SPARK-38811 INSERT INTO on ALTER TABLE ADD COLUMNS positive tests") {
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTable("t") {
        sql("create table t(i boolean) using parquet")
        sql("alter table t add column s string default concat('abc', 'def')")
        assertCometNativeWriterCaptured {
          sql("insert into t values(true, default)")
        }
        checkAnswer(spark.table("t"), Row(true, "abcdef"))
      }
    }
  }

  // https://github.com/apache/datafusion-comet/issues/3423
  ignore("Spark compat: SPARK-43071 INSERT INTO from non-projection queries") {
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTable("t1", "t2") {
        sql("create table t1(i boolean, s bigint default 42) using parquet")
        assertCometNativeWriterCaptured {
          sql("insert into t1 values (true, 41), (false, default)")
        }
        sql("create table t2(i boolean, s bigint) using parquet")
        assertCometNativeWriterCaptured {
          sql("insert into t2 select * from t1 order by s")
        }
        checkAnswer(spark.table("t2"), Seq(Row(true, 41), Row(false, 42)))
      }
    }
  }

  // https://github.com/apache/datafusion-comet/issues/3424
  ignore("Spark compat: Insert overwrite table command should output correct schema basic") {
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTable("tbl", "tbl2") {
        withView("view1") {
          val df = spark.range(10).toDF("id")
          assertCometNativeWriterCaptured {
            df.write.format("parquet").saveAsTable("tbl")
          }
          spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
          spark.sql("CREATE TABLE tbl2(ID long) USING parquet")
          assertCometNativeWriterCaptured {
            spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1")
          }
          checkAnswer(spark.table("tbl2"), (0 until 10).map(Row(_)))
        }
      }
    }
  }

  // https://github.com/apache/datafusion-comet/issues/3425
  ignore("Spark compat: parquet timestamp conversion") {
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTempPath { dir =>
        assertCometNativeWriterCaptured {
          spark
            .range(1)
            .selectExpr("current_timestamp() as ts")
            .write
            .parquet(dir.toString + "/spark")
        }
        val result = spark.read.parquet(dir.toString + "/spark").collect()
        assert(result.length == 1)
      }
    }
  }

  // https://github.com/apache/datafusion-comet/issues/3426
  ignore("Spark compat: INSERT INTO TABLE - complex type but different names") {
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTable("tab1", "tab2") {
        sql("""CREATE TABLE tab1 (s struct<a: string, b: string>) USING parquet""")
        sql("""CREATE TABLE tab2 (s struct<c: string, d: string>) USING parquet""")
        assertCometNativeWriterCaptured {
          sql("INSERT INTO tab1 VALUES (named_struct('a', 'x', 'b', 'y'))")
        }
        assertCometNativeWriterCaptured {
          sql("INSERT INTO tab2 SELECT * FROM tab1")
        }
        checkAnswer(spark.table("tab2"), Row(Row("x", "y")))
      }
    }
  }

  // https://github.com/apache/datafusion-comet/issues/3427
  ignore("Spark compat: Write Spark version into Parquet metadata") {
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTempPath { dir =>
        assertCometNativeWriterCaptured {
          spark.range(1).repartition(1).write.parquet(dir.getAbsolutePath)
        }
        val files = dir.listFiles().filter(_.getName.endsWith(".parquet"))
        assert(files.nonEmpty, "Expected parquet files to be written")
      }
    }
  }

  // https://github.com/apache/datafusion-comet/issues/3428
  ignore("Spark compat: write path implements onTaskCommit API correctly") {
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTempDir { dir =>
        val path = dir.getCanonicalPath
        assertCometNativeWriterCaptured {
          spark.range(10).repartition(10).write.mode("overwrite").parquet(path)
        }
        val files = new File(path).listFiles().filter(_.getName.startsWith("part-"))
        assert(files.length > 0, "Expected part files to be written")
      }
    }
  }

  // COMET NATIVE WRITER Spark 4.x test failures
  // https://github.com/apache/datafusion-comet/issues/3429
  ignore("Spark compat: ctas with union") {
    assume(isSpark40Plus)
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTable("t") {
        assertCometNativeWriterCaptured {
          sql("CREATE TABLE t USING parquet AS SELECT 1 AS c UNION ALL SELECT 2")
        }
        checkAnswer(spark.table("t"), Seq(Row(1), Row(2)))
      }
    }
  }

  // https://github.com/apache/datafusion-comet/issues/3430
  ignore("Spark compat: SPARK-48817 test multi insert") {
    assume(isSpark40Plus)
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTable("t1", "t2") {
        sql("CREATE TABLE t1(a INT) USING parquet")
        sql("CREATE TABLE t2(a INT) USING parquet")
        assertCometNativeWriterCaptured {
          sql("FROM (SELECT 1 AS a) src INSERT INTO t1 SELECT a INSERT INTO t2 SELECT a")
comphead (Contributor):

is it valid sql?

coderfender (Contributor, Author), Feb 12, 2026:

Thank you for the comment @comphead. I was equally surprised when I followed the approach of the Spark test that we are porting into the Comet test suites. Multi-table inserts are supported in Spark (the syntax is carried over from Hive). Additional details here: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Syntax.1
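
For reference, a minimal sketch of that Hive-derived multi-insert form, in which a single FROM clause feeds several INSERT branches and each branch may carry its own filter (the table and column names below are hypothetical):

  // Hive-style multi-insert: one source scan feeding multiple target tables.
  // Hypothetical tables src_table, t_pos, t_neg and column col.
  sql("""
    FROM src_table
    INSERT INTO t_pos SELECT col WHERE col > 0
    INSERT INTO t_neg SELECT col WHERE col <= 0
  """)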

coderfender (Contributor, Author):

Corresponding Spark test:

  test("SPARK-48817: test multi inserts") {
    withTable("t1", "t2", "t3") {
      createTable("t1", Seq("i"), Seq("int"))
      createTable("t2", Seq("i"), Seq("int"))
      createTable("t3", Seq("i"), Seq("int"))
      sql(s"INSERT INTO t1 VALUES (1), (2), (3)")
      val df = sql(
        """
          |FROM (select /*+ REPARTITION(3) */ i from t1)
          |INSERT INTO t2 SELECT i
          |INSERT INTO t3 SELECT i
          |""".stripMargin
      )

Not sure if I am missing anything here.

        }
        checkAnswer(spark.table("t1"), Row(1))
        checkAnswer(spark.table("t2"), Row(1))
      }
    }
  }

  private def createTestData(inputDir: File): String = {
    val inputPath = new File(inputDir, "input.parquet").getAbsolutePath
    val schema = FuzzDataGenerator.generateSchema(
@@ -466,6 +737,11 @@ class CometParquetWriterSuite extends CometTestBase {
s"Expected exactly one CometNativeWriteExec in the plan, but found $nativeWriteCount:\n${plan.treeString}")
}

  private def assertCometNativeWriterCaptured(op: => Unit): Unit = {
    val plan = captureWritePlan(_ => op, "")
    assertHasCometNativeWriteExec(plan)
  }

coderfender (Contributor, Author):

@comphead, I wrote this simple util to match our other test patterns and to verify that the Comet native writer is being used.
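
A minimal usage sketch, assuming a write to some output path (the outputPath value below is hypothetical):

  // Runs the write action, captures the executed plan via captureWritePlan,
  // and asserts that exactly one CometNativeWriteExec appears in it.
  assertCometNativeWriterCaptured {
    spark.range(10).write.parquet(outputPath)
  }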

  private def writeWithCometNativeWriteExec(
      inputPath: String,
      outputPath: String,