feat: Enable native Parquet writer in Spark SQL tests #3351
Open
coderfender wants to merge 18 commits into apache:main from coderfender:enable_spark_tests_parquet_writer
Commits (18, all authored by coderfender):
- 717bf59 enable_spark_tests_comet_native_writer
- 4a6457d enable_spark_tests_comet_native_writer
- 9691bac enable_spark_tests_comet_native_writer
- cac97dd enable_spark_tests_comet_native_writer
- 2aae622 enable_spark_tests_comet_native_writer_fix_spark_4
- dedba28 rebase_main
- a2e63c3 rebase_main
- 48eb856 rebase_main
- 95bfe04 enable_spark_tests_comet_native_writer_fix_spark_4
- ba8e297 enable_spark_tests_comet_native_writer_fix_spark_4
- 9e5f3ca enable_spark_tests_comet_native_writer_fix_spark_rebase_main
- f515ee0 enable_spark_tests_comet_native_writer_fix_spark_rebase_main
- dca1664 enable_spark_tests_comet_native_writer_fix_spark_rebase_main
- d0ef401 enable_spark_tests_comet_native_writer_fix_spark_rebase_main
- b71f8b2 enable_spark_tests_comet_native_writer_fix_spark_rebase_main
- 7bb4e36 refactor_boolean_cast_ops_add_benchmarks
- eb50466 Update spark/src/test/scala/org/apache/comet/parquet/CometParquetWrit…
- 30a00de refactor_boolean_cast_ops_add_benchmarks_rebase_main
Changes in spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala:

@@ -31,6 +31,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

import org.apache.comet.CometConf
import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator, SchemaGenOptions}

class CometParquetWriterSuite extends CometTestBase {

@@ -377,6 +378,276 @@ class CometParquetWriterSuite extends CometTestBase {
    }
  }

  // NATIVE COMET WRITER TESTS WHICH FAIL IN SPARK
  // https://github.com/apache/datafusion-comet/issues/3417
  ignore("Spark compat: empty file should be skipped while write to file") {
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTempPath { path =>
        assertCometNativeWriterCaptured {
          spark.range(100).repartition(10).where("id = 50").write.parquet(path.toString)
        }
        val partFiles = path
          .listFiles()
          .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
        assert(partFiles.length === 2)
      }
    }
  }

  // https://github.com/apache/datafusion-comet/issues/3418
  ignore("Spark compat: SPARK-33901 ctas should not change table's schema") {
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTable("t1", "t2") {
        sql("CREATE TABLE t1(i CHAR(5), c VARCHAR(4)) USING parquet")
        assertCometNativeWriterCaptured {
          sql("CREATE TABLE t2 USING parquet AS SELECT * FROM t1")
        }
        checkAnswer(
          sql("desc t2").selectExpr("data_type").where("data_type like '%char%'"),
          Seq(Row("char(5)"), Row("varchar(4)")))
      }
    }
  }

  // https://github.com/apache/datafusion-comet/issues/3419
  ignore("Spark compat: SPARK-37160 CREATE TABLE AS SELECT with CHAR_AS_VARCHAR") {
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTable("t1", "t2") {
        sql("CREATE TABLE t1(col CHAR(5)) USING parquet")
        withSQLConf(SQLConf.CHAR_AS_VARCHAR.key -> "true") {
          assertCometNativeWriterCaptured {
            sql("CREATE TABLE t2 USING parquet AS SELECT * FROM t1")
          }
          checkAnswer(
            sql("desc t2").selectExpr("data_type").where("data_type like '%char%'"),
            Seq(Row("varchar(5)")))
        }
      }
    }
  }

  // https://github.com/apache/datafusion-comet/issues/3420
  ignore("Spark compat: SPARK-29174 Support LOCAL in INSERT OVERWRITE DIRECTORY") {
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTempPath { dir =>
        val path = dir.toURI.getPath
        withTable("tab1", "tab2") {
          sql(s"""create table tab1 (a int) using parquet location '$path'""")
          assertCometNativeWriterCaptured {
            sql("insert into tab1 values(1)")
          }
          checkAnswer(sql("select * from tab1"), Seq(Row(1)))
          sql("create table tab2 (a int) using parquet")
          assertCometNativeWriterCaptured {
            sql("insert into tab2 values(2)")
          }
          assertCometNativeWriterCaptured {
            sql(s"""insert overwrite local directory '$path' using parquet select * from tab2""")
          }
          sql("refresh table tab1")
          checkAnswer(sql("select * from tab1"), Seq(Row(2)))
        }
      }
    }
  }

  // https://github.com/apache/datafusion-comet/issues/3421
  ignore("Spark compat: SPARK-38336 INSERT INTO with default columns positive tests") {
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTable("t") {
        sql("create table t(i boolean, s bigint) using parquet")
        assertCometNativeWriterCaptured {
          sql("insert into t(i) values(true)")
        }
        checkAnswer(spark.table("t"), Row(true, null))
      }
    }
  }

  // https://github.com/apache/datafusion-comet/issues/3422
  ignore("Spark compat: SPARK-38811 INSERT INTO on ALTER TABLE ADD COLUMNS positive tests") {
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTable("t") {
        sql("create table t(i boolean) using parquet")
        sql("alter table t add column s string default concat('abc', 'def')")
        assertCometNativeWriterCaptured {
          sql("insert into t values(true, default)")
        }
        checkAnswer(spark.table("t"), Row(true, "abcdef"))
      }
    }
  }

  // https://github.com/apache/datafusion-comet/issues/3423
  ignore("Spark compat: SPARK-43071 INSERT INTO from non-projection queries") {
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTable("t1", "t2") {
        sql("create table t1(i boolean, s bigint default 42) using parquet")
        assertCometNativeWriterCaptured {
          sql("insert into t1 values (true, 41), (false, default)")
        }
        sql("create table t2(i boolean, s bigint) using parquet")
        assertCometNativeWriterCaptured {
          sql("insert into t2 select * from t1 order by s")
        }
        checkAnswer(spark.table("t2"), Seq(Row(true, 41), Row(false, 42)))
      }
    }
  }

  // https://github.com/apache/datafusion-comet/issues/3424
  ignore("Spark compat: Insert overwrite table command should output correct schema basic") {
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTable("tbl", "tbl2") {
        withView("view1") {
          val df = spark.range(10).toDF("id")
          assertCometNativeWriterCaptured {
            df.write.format("parquet").saveAsTable("tbl")
          }
          spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
          spark.sql("CREATE TABLE tbl2(ID long) USING parquet")
          assertCometNativeWriterCaptured {
            spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1")
          }
          checkAnswer(spark.table("tbl2"), (0 until 10).map(Row(_)))
        }
      }
    }
  }

  // https://github.com/apache/datafusion-comet/issues/3425
  ignore("Spark compat: parquet timestamp conversion") {
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTempPath { dir =>
        assertCometNativeWriterCaptured {
          spark
            .range(1)
            .selectExpr("current_timestamp() as ts")
            .write
            .parquet(dir.toString + "/spark")
        }
        val result = spark.read.parquet(dir.toString + "/spark").collect()
        assert(result.length == 1)
      }
    }
  }

  // https://github.com/apache/datafusion-comet/issues/3426
  ignore("Spark compat: INSERT INTO TABLE - complex type but different names") {
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTable("tab1", "tab2") {
        sql("""CREATE TABLE tab1 (s struct<a: string, b: string>) USING parquet""")
        sql("""CREATE TABLE tab2 (s struct<c: string, d: string>) USING parquet""")
        assertCometNativeWriterCaptured {
          sql("INSERT INTO tab1 VALUES (named_struct('a', 'x', 'b', 'y'))")
        }
        assertCometNativeWriterCaptured {
          sql("INSERT INTO tab2 SELECT * FROM tab1")
        }
        checkAnswer(spark.table("tab2"), Row(Row("x", "y")))
      }
    }
  }

  // https://github.com/apache/datafusion-comet/issues/3427
  ignore("Spark compat: Write Spark version into Parquet metadata") {
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTempPath { dir =>
        assertCometNativeWriterCaptured {
          spark.range(1).repartition(1).write.parquet(dir.getAbsolutePath)
        }
        val files = dir.listFiles().filter(_.getName.endsWith(".parquet"))
        assert(files.nonEmpty, "Expected parquet files to be written")
      }
    }
  }

  // https://github.com/apache/datafusion-comet/issues/3428
  ignore("Spark compat: write path implements onTaskCommit API correctly") {
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTempDir { dir =>
        val path = dir.getCanonicalPath
        assertCometNativeWriterCaptured {
          spark.range(10).repartition(10).write.mode("overwrite").parquet(path)
        }
        val files = new File(path).listFiles().filter(_.getName.startsWith("part-"))
        assert(files.length > 0, "Expected part files to be written")
      }
    }
  }

  // COMET NATIVE WRITER Spark 4.x test failures
  // https://github.com/apache/datafusion-comet/issues/3429
  ignore("Spark compat: ctas with union") {
    assume(isSpark40Plus)
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTable("t") {
        assertCometNativeWriterCaptured {
          sql("CREATE TABLE t USING parquet AS SELECT 1 AS c UNION ALL SELECT 2")
        }
        checkAnswer(spark.table("t"), Seq(Row(1), Row(2)))
      }
    }
  }

  // https://github.com/apache/datafusion-comet/issues/3430
  ignore("Spark compat: SPARK-48817 test multi insert") {
    assume(isSpark40Plus)
    withSQLConf(
      CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
      withTable("t1", "t2") {
        sql("CREATE TABLE t1(a INT) USING parquet")
        sql("CREATE TABLE t2(a INT) USING parquet")
        assertCometNativeWriterCaptured {
          sql("FROM (SELECT 1 AS a) src INSERT INTO t1 SELECT a INSERT INTO t2 SELECT a")
        }
        checkAnswer(spark.table("t1"), Row(1))
        checkAnswer(spark.table("t2"), Row(1))
      }
    }
  }

  private def createTestData(inputDir: File): String = {
    val inputPath = new File(inputDir, "input.parquet").getAbsolutePath
    val schema = FuzzDataGenerator.generateSchema(

@@ -466,6 +737,11 @@ class CometParquetWriterSuite extends CometTestBase {
      s"Expected exactly one CometNativeWriteExec in the plan, but found $nativeWriteCount:\n${plan.treeString}")
  }

  private def assertCometNativeWriterCaptured(op: => Unit): Unit = {
    val plan = captureWritePlan(_ => op, "")
    assertHasCometNativeWriteExec(plan)
  }

  private def writeWithCometNativeWriteExec(
      inputPath: String,
      outputPath: String,
Comment from coderfender (Contributor, Author) on the new helper:

@comphead, I wrote this simple util to match our other test patterns and to verify that the Comet native writer is actually being used.
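For illustration, this is how the ported tests above use the helper. The snippet is a minimal sketch, not part of the diff: withTempPath, spark, and assertCometNativeWriterCaptured come from the suite itself, while the particular write action is illustrative.

  // Wraps a write action; the helper captures the executed plan and
  // asserts that exactly one CometNativeWriteExec node appears in it.
  withTempPath { path =>
    assertCometNativeWriterCaptured {
      spark.range(10).write.parquet(path.toString)
    }
  }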
Comment from comphead:

Is it valid SQL?
Reply from coderfender (Author):

Thank you for the comment @comphead. I was surprised as well when I followed the approach of the Spark test we are porting into the Comet test suite. Multi-table inserts are supported in Spark (the syntax is carried over from Hive). Additional details here: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Syntax.1
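For reference, the general shape of a Hive-style multi-insert as Spark's parser accepts it; a minimal sketch against hypothetical tables t1 and t2, mirroring the ported SPARK-48817 test above, not part of the diff:

  // One source scan fans out into several INSERT branches:
  //   FROM from_statement
  //   INSERT INTO|OVERWRITE TABLE t1 select_statement1
  //   [INSERT INTO|OVERWRITE TABLE t2 select_statement2] ...
  spark.sql("CREATE TABLE t1(a INT) USING parquet")
  spark.sql("CREATE TABLE t2(a INT) USING parquet")
  spark.sql(
    """FROM (SELECT 1 AS a) src
      |INSERT INTO t1 SELECT a
      |INSERT INTO t2 SELECT a""".stripMargin)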
Follow-up from coderfender:

Corresponding Spark test: (the referenced snippet failed to load in this view.)

Not sure if I am missing anything here.
Related links:
- https://github.com/apache/spark/blob/v3.5.1/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4#L444
- reata/sqllineage#596