@@ -132,7 +132,7 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
.newBuilder()
.setOutputPath(outputPath)
.setCompression(codec)
- .addAllColumnNames(cmd.query.output.map(_.name).asJava)
+ .addAllColumnNames(cmd.outputColumnNames.asJava)

Contributor Author:

Operator {
  plan_id: 42
  parquet_writer: ParquetWriter {
    output_path: "file:/.../spark-warehouse/t"
    compression: SNAPPY
    column_names: ["i", "s"]
  }
......

// Note: work_dir, job_id, and task_attempt_id will be set at execution time
// in CometNativeWriteExec, as they depend on the Spark task context
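
A note on the column-name change above: outputColumnNames is defined on Spark's DataWritingCommand and carries the target table's column names, which is what appears as column_names in the serialized ParquetWriter shown in the review comment. A minimal sketch of the distinction, reusing cmd from the hunk above (the concrete values assume the two-column table t(i, s) used in the tests below):

// Sketch only, not the serde code itself.
// Names of the target table's columns, independent of how the insert query
// was written; for table t this would be Seq("i", "s").
val tableColumnNames: Seq[String] = cmd.outputColumnNames

// Attribute names produced by the resolved insert query; these are not
// guaranteed to match the table schema, e.g. for INSERT ... VALUES into a
// table whose columns were added via ALTER TABLE ADD COLUMNS with defaults.
val queryAttributeNames: Seq[String] = cmd.query.output.map(_.name)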

@@ -201,7 +201,7 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
throw new SparkException(s"Could not instantiate FileCommitProtocol: ${e.getMessage}")
}

- CometNativeWriteExec(nativeOp, childPlan, outputPath, committer, jobId)
+ CometNativeWriteExec(nativeOp, childPlan, outputPath, committer, jobId, cmd.catalogTable)
}

private def parseCompressionCodec(cmd: InsertIntoHadoopFsRelationCommand) = {
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
+ import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -63,7 +64,8 @@ case class CometNativeWriteExec(
child: SparkPlan,
outputPath: String,
committer: Option[FileCommitProtocol] = None,
- jobTrackerID: String = Utils.createTempDir().getName)
+ jobTrackerID: String = Utils.createTempDir().getName,
+ catalogTable: Option[CatalogTable] = None)
extends CometNativeExec
with UnaryExecNode {

@@ -135,6 +137,11 @@ case class CometNativeWriteExec(
}
}

+ // Refresh the catalog table cache so subsequent reads see the new data
+ catalogTable.foreach { ct =>

Contributor Author:
This was a different issue: while running the test, I realized the table needs to be refreshed to get the new data.

+   session.catalog.refreshTable(ct.identifier.quotedString)
+ }

// Return empty RDD as write operations don't return data
sparkContext.emptyRDD[InternalRow]
}
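
For readers unfamiliar with the refresh above: session.catalog.refreshTable invalidates Spark's cached metadata and cached file listing for the table, so a scan planned after the native write re-lists the newly written files. A minimal, self-contained sketch of the same idea using the public API (the table name and query mirror the tests below and are illustrative, not taken from the operator code):

// A native write adds data files outside Spark's usual insert code path,
// so a previously cached relation for "t" may hold a stale file index.
spark.sql("INSERT INTO t VALUES (true, DEFAULT)")

// Drop cached metadata and the cached file listing for the table.
spark.catalog.refreshTable("t")

// Reads planned after the refresh pick up the freshly written files.
spark.table("t").show()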
@@ -377,6 +377,121 @@ class CometParquetWriterSuite extends CometTestBase {
}
}

private def withNativeWriteConf(f: => Unit): Unit = {
withSQLConf(
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
f
}
}

// SPARK-38811 INSERT INTO on columns added with ALTER TABLE ADD COLUMNS: Positive tests
// Mirrors the Spark InsertSuite test to validate Comet native writer compatibility.

test("SPARK-38811: simple default value with concat expression") {
withNativeWriteConf {
withTable("t") {
sql("create table t(i boolean) using parquet")
sql("alter table t add column s string default concat('abc', 'def')")
sql("insert into t values(true, default)")
checkAnswer(spark.table("t"), Row(true, "abcdef"))
}
}
}

test("SPARK-38811: multiple trailing default values") {
withNativeWriteConf {
withTable("t") {
sql("create table t(i int) using parquet")
sql("alter table t add column s bigint default 42")
sql("alter table t add column x bigint default 43")
sql("insert into t(i) values(1)")
checkAnswer(spark.table("t"), Row(1, 42, 43))
}
}
}

test("SPARK-38811: multiple trailing defaults via add columns") {
withNativeWriteConf {
withTable("t") {
sql("create table t(i int) using parquet")
sql("alter table t add columns s bigint default 42, x bigint default 43")
sql("insert into t(i) values(1)")
checkAnswer(spark.table("t"), Row(1, 42, 43))
}
}
}

test("SPARK-38811: default with nullable column (no default)") {
withNativeWriteConf {
withTable("t") {
sql("create table t(i int) using parquet")
sql("alter table t add column s bigint default 42")
sql("alter table t add column x bigint")
sql("insert into t(i) values(1)")
checkAnswer(spark.table("t"), Row(1, 42, null))
}
}
}

test("SPARK-38811: expression default (41 + 1)") {
withNativeWriteConf {
withTable("t") {
sql("create table t(i boolean) using parquet")
sql("alter table t add column s bigint default 41 + 1")
sql("insert into t(i) values(default)")
checkAnswer(spark.table("t"), Row(null, 42))
}
}
}

test("SPARK-38811: explicit defaults in multiple positions") {
withNativeWriteConf {
withTable("t") {
sql("create table t(i boolean default false) using parquet")
sql("alter table t add column s bigint default 42")
sql("insert into t values(false, default), (default, 42)")
checkAnswer(spark.table("t"), Seq(Row(false, 42), Row(false, 42)))
}
}
}

test("SPARK-38811: default with alias over VALUES") {
withNativeWriteConf {
withTable("t") {
sql("create table t(i boolean) using parquet")
sql("alter table t add column s bigint default 42")
sql("insert into t select * from values (false, default) as tab(col, other)")
checkAnswer(spark.table("t"), Row(false, 42))
}
}
}

test("SPARK-38811: default value in wrong order evaluates to NULL") {
withNativeWriteConf {
withTable("t") {
sql("create table t(i boolean) using parquet")
sql("alter table t add column s bigint default 42")
sql("insert into t values (default, 43)")
checkAnswer(spark.table("t"), Row(null, 43))
}
}
}

// INSERT INTO ... SELECT with native write config fails due to pre-existing
// catalog refresh issue tracked separately. Skipping these variants.
ignore("SPARK-38811: default via SELECT statement") {
withNativeWriteConf {
withTable("t") {
sql("create table t(i boolean) using parquet")
sql("alter table t add column s bigint default 42")
sql("insert into t select false, default")
checkAnswer(spark.table("t"), Row(false, 42))
}
}
}

private def createTestData(inputDir: File): String = {
val inputPath = new File(inputDir, "input.parquet").getAbsolutePath
val schema = FuzzDataGenerator.generateSchema(