diff --git a/docs/docs/spark/sql-write.md b/docs/docs/spark/sql-write.md index 449d8871edbe..56899796eab5 100644 --- a/docs/docs/spark/sql-write.md +++ b/docs/docs/spark/sql-write.md @@ -314,7 +314,7 @@ INSERT INTO t BY NAME SELECT 3 AS a, '3' AS b, 3 AS c; ## COPY INTO -`COPY INTO` provides a SQL command for bulk loading data files into Paimon tables and exporting table data to files. Supported formats: **CSV** and **JSON**. +`COPY INTO` provides a SQL command for bulk loading data files into Paimon tables and exporting table data to files. Supported formats: **CSV**, **JSON**, and **Parquet**. ### CSV Import @@ -383,6 +383,37 @@ FILE_FORMAT = (TYPE = JSON, MULTI_LINE = TRUE); JSON columns are matched **by column name** (not by position), so source field order does not matter. +### Parquet Import + +```sql +COPY INTO table_name [(col1, col2, ...)] +FROM 'source_path' +FILE_FORMAT = (TYPE = PARQUET [, option = value, ...]) +[PATTERN = 'regex'] +[FORCE = TRUE|FALSE] +[ON_ERROR = ABORT_STATEMENT] +``` + +**Basic import:** + +```sql +COPY INTO my_db.my_table +FROM '/data/parquet_files/' +FILE_FORMAT = (TYPE = PARQUET); +``` + +**Import with PATTERN:** + +```sql +COPY INTO my_db.events +FROM '/data/lake/' +FILE_FORMAT = (TYPE = PARQUET) +PATTERN = '.*\.parquet' +FORCE = FALSE; +``` + +Parquet columns are matched **by column name** (not by position). Extra columns in the source files are ignored; missing columns become NULL. 
+ ### Write CSV Files ```sql @@ -419,15 +450,42 @@ FILE_FORMAT = (TYPE = JSON) OVERWRITE = TRUE; ``` +### Write Parquet Files + +```sql +COPY INTO 'target_path' +FROM table_name +FILE_FORMAT = (TYPE = PARQUET [, option = value, ...]) +[OVERWRITE = TRUE|FALSE] +``` + +**Basic Parquet export:** + +```sql +COPY INTO '/export/data_backup/' +FROM my_db.events +FILE_FORMAT = (TYPE = PARQUET) +OVERWRITE = TRUE; +``` + +**Export with compression:** + +```sql +COPY INTO '/export/data_compressed/' +FROM my_db.events +FILE_FORMAT = (TYPE = PARQUET, COMPRESSION = GZIP) +OVERWRITE = TRUE; +``` + ### FILE_FORMAT Options -`FILE_FORMAT` is required and must include `TYPE = CSV` or `TYPE = JSON`. +`FILE_FORMAT` is required and must include `TYPE = CSV`, `TYPE = JSON`, or `TYPE = PARQUET`. **CSV import options:** | Option | Description | Default | |--------|-------------|---------| -| TYPE | File format type. `CSV` or `JSON`. | (required) | +| TYPE | File format type. `CSV`, `JSON`, or `PARQUET`. | (required) | | FIELD_DELIMITER | Column delimiter character. | `,` | | SKIP_HEADER | Skip the first line as header. Only `0` or `1`. | `0` | | QUOTE | Quote character for enclosing fields. | `"` | @@ -440,17 +498,24 @@ OVERWRITE = TRUE; | Option | Description | Default | |--------|-------------|---------| -| TYPE | File format type. `CSV` or `JSON`. | (required) | +| TYPE | File format type. `CSV`, `JSON`, or `PARQUET`. | (required) | | MULTI_LINE | Parse multi-line JSON (e.g. JSON arrays or pretty-printed objects). | `FALSE` | | NULL_IF | List of string values to interpret as NULL. | (none) | | EMPTY_FIELD_AS_NULL | Treat empty string values as NULL. | `FALSE` | | COMPRESSION | Compression codec (e.g. `GZIP`). | `NONE` | +**Parquet import options:** + +| Option | Description | Default | +|--------|-------------|---------| +| TYPE | File format type. `CSV`, `JSON`, or `PARQUET`. | (required) | +| COMPRESSION | Compression codec. Usually auto-detected; rarely needed for import. 
| (auto) | + **CSV write options:** | Option | Description | Default | |--------|-------------|---------| -| TYPE | File format type. `CSV` or `JSON`. | (required) | +| TYPE | File format type. `CSV`, `JSON`, or `PARQUET`. | (required) | | FIELD_DELIMITER | Column delimiter character. | `,` | | HEADER | Write column names as the first line. `TRUE` or `FALSE`. | `FALSE` | | QUOTE | Quote character for enclosing fields. | `"` | @@ -461,11 +526,18 @@ OVERWRITE = TRUE; | Option | Description | Default | |--------|-------------|---------| -| TYPE | File format type. `CSV` or `JSON`. | (required) | +| TYPE | File format type. `CSV`, `JSON`, or `PARQUET`. | (required) | | DATE_FORMAT | Custom date format pattern. | Spark default | | TIMESTAMP_FORMAT | Custom timestamp format pattern. | Spark default | | COMPRESSION | Compression codec (e.g. `GZIP`). | `NONE` | +**Parquet write options:** + +| Option | Description | Default | +|--------|-------------|---------| +| TYPE | File format type. `CSV`, `JSON`, or `PARQUET`. | (required) | +| COMPRESSION | Compression codec (`SNAPPY`, `GZIP`, `NONE`, etc.). | `SNAPPY` | + ### Import Options | Option | Description | Default | @@ -486,7 +558,8 @@ When an explicit column list is provided (e.g., `COPY INTO t (col1, col2) FROM . - **CSV**: Columns are mapped **positionally** to the specified column list. - **JSON**: Columns are matched **by name** to the specified column list. -- The number of source columns must match the column list length (CSV). For JSON, missing fields in the source become NULL. +- **Parquet**: Columns are matched **by name** to the specified column list. +- The number of source columns must match the column list length (CSV). For JSON and Parquet, missing fields in the source become NULL. - Columns not in the list are filled with their **DEFAULT value** (if defined in the table schema) or **NULL**. - Non-nullable columns without a default value that are not in the list will cause an error. 
@@ -524,7 +597,7 @@ By default (`FORCE = FALSE`), COPY INTO tracks which files have been successfull ### Limitations -- Only **CSV** and **JSON** formats are supported. +- Only **CSV**, **JSON**, and **Parquet** formats are supported. - Writing files only supports `FROM table_name`; `FROM (SELECT ...)` is not supported. - `ON_ERROR = CONTINUE` is not supported; any parse or cast error aborts the entire command. - `SINGLE = TRUE` (single-file output) is not supported. diff --git a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 index f87884634af8..1bbbca478bc8 100644 --- a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 +++ b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 @@ -206,6 +206,7 @@ nonReserved | COPY | INTO | FROM | FILE_FORMAT | PATTERN | FORCE | ON_ERROR | ABORT_STATEMENT | OVERWRITE | CSV | JSON + | PARQUET ; ALTER: 'ALTER'; @@ -248,6 +249,7 @@ ABORT_STATEMENT: 'ABORT_STATEMENT'; OVERWRITE: 'OVERWRITE'; CSV: 'CSV'; JSON: 'JSON'; +PARQUET: 'PARQUET'; PLUS: '+'; MINUS: '-'; diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala index ae5d56eaea5f..4730307193f6 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala @@ -23,6 +23,7 @@ sealed trait FileFormatType object FileFormatType { case object CSV extends FileFormatType case object JSON extends 
FileFormatType + case object PARQUET extends FileFormatType case class Unsupported(name: String) extends FileFormatType } @@ -53,6 +54,15 @@ case class CopyFileFormat(formatType: FileFormatType, options: Map[String, Strin case _ => } } + case FileFormatType.PARQUET => + mapped.remove("mode") + options.foreach { + case (k, v) => + k match { + case "COMPRESSION" => mapped("compression") = v + case _ => + } + } case _ => } mapped.toMap @@ -83,6 +93,14 @@ case class CopyFileFormat(formatType: FileFormatType, options: Map[String, Strin case _ => } } + case FileFormatType.PARQUET => + options.foreach { + case (k, v) => + k match { + case "COMPRESSION" => mapped("compression") = v + case _ => + } + } case _ => } mapped.toMap @@ -108,6 +126,7 @@ case class CopyFileFormat(formatType: FileFormatType, options: Map[String, Strin } val validKeys = formatType match { case FileFormatType.JSON => CopyFileFormat.VALID_JSON_IMPORT_KEYS + case FileFormatType.PARQUET => CopyFileFormat.VALID_PARQUET_IMPORT_KEYS case _ => CopyFileFormat.VALID_CSV_IMPORT_KEYS } val invalid = options.keys.filterNot(validKeys.contains) @@ -132,6 +151,7 @@ case class CopyFileFormat(formatType: FileFormatType, options: Map[String, Strin validateFormatType() val validKeys = formatType match { case FileFormatType.JSON => CopyFileFormat.VALID_JSON_EXPORT_KEYS + case FileFormatType.PARQUET => CopyFileFormat.VALID_PARQUET_EXPORT_KEYS case _ => CopyFileFormat.VALID_CSV_EXPORT_KEYS } val invalid = options.keys.filterNot(validKeys.contains) @@ -145,9 +165,10 @@ case class CopyFileFormat(formatType: FileFormatType, options: Map[String, Strin formatType match { case FileFormatType.CSV => case FileFormatType.JSON => + case FileFormatType.PARQUET => case FileFormatType.Unsupported(name) => throw new IllegalArgumentException( - s"Unsupported file format type: $name. Supported types: CSV, JSON") + s"Unsupported file format type: $name. 
Supported types: CSV, JSON, PARQUET") } } } @@ -185,6 +206,14 @@ object CopyFileFormat { "TIMESTAMP_FORMAT" ) + val VALID_PARQUET_IMPORT_KEYS: Set[String] = Set( + "COMPRESSION" + ) + + val VALID_PARQUET_EXPORT_KEYS: Set[String] = Set( + "COMPRESSION" + ) + // Unit Separator (U+001F) used to encode multi-value lists in a single string val LIST_SEPARATOR: String = "\u001f" @@ -192,6 +221,7 @@ object CopyFileFormat { typeStr.toUpperCase match { case "CSV" => FileFormatType.CSV case "JSON" => FileFormatType.JSON + case "PARQUET" => FileFormatType.PARQUET case other => FileFormatType.Unsupported(other) } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala index 69387a5071bf..45471bab8458 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala @@ -54,6 +54,8 @@ case class CopyIntoLocationExec( fileFormat.formatType match { case FileFormatType.JSON => df.write.options(writerOptions).mode(saveMode).json(targetPath) + case FileFormatType.PARQUET => + df.write.options(writerOptions).mode(saveMode).parquet(targetPath) case _ => df.write.options(writerOptions).mode(saveMode).csv(targetPath) } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala index 4763d8c784ad..5ef05b4188a9 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala @@ -26,6 +26,7 @@ import 
org.apache.paimon.table.FileStoreTable import org.apache.paimon.types.DataField import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.spark.internal.Logging import org.apache.spark.sql.{Column, DataFrame, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute @@ -48,7 +49,8 @@ case class CopyIntoTableExec( pattern: Option[String], force: Boolean, out: Seq[Attribute]) - extends PaimonLeafV2CommandExec { + extends PaimonLeafV2CommandExec + with Logging { override def output: Seq[Attribute] = out @@ -72,9 +74,168 @@ case class CopyIntoTableExec( } val filePaths = filesToLoad.map(_.getPath.toString) - val stringSchema = buildStringSchema(targetColumns) val readerOptions = fileFormat.toSparkReaderOptions + fileFormat.formatType match { + case FileFormatType.PARQUET => + runParquetImport( + paimonTable, + filePaths, + targetColumns, + writableColumns, + fields, + filesToLoad, + skippedFiles, + readerOptions) + case _ => + runTextImport( + paimonTable, + filePaths, + targetColumns, + writableColumns, + fields, + filesToLoad, + skippedFiles, + readerOptions) + } + } + + /** + * Parquet import pipeline. Unlike CSV/JSON which read as strings then cast, Parquet files already + * have typed columns, so the flow is: + * 1. Read source Parquet with native types + * 2. Project and cast columns to match target table schema (by column name, not position) + * 3. Validate that no non-null values become null after casting (detect type incompatibility) + * 4. Write to Paimon table + * 5. 
Record load history for idempotent re-runs (FORCE=FALSE dedup)
+   */
+  private def runParquetImport(
+      paimonTable: FileStoreTable,
+      filePaths: Array[String],
+      targetColumns: Seq[String],
+      writableColumns: Seq[String],
+      fields: Seq[DataField],
+      filesToLoad: Array[FileStatus],
+      skippedFiles: Array[FileStatus],
+      readerOptions: Map[String, String]): Seq[InternalRow] = {
+    val rawDf = spark.read.options(readerOptions).parquet(filePaths: _*)
+
+    val selectedDf = buildParquetDataFrame(rawDf, targetColumns, writableColumns, fields)
+    // Validation collects from its own query and throws before anything is written to the table.
+    validateParquetCast(rawDf, targetColumns, writableColumns, fields)
+
+    val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident)
+    selectedDf.write.format("paimon").mode("append").insertInto(tableName)
+
+    // rawDf is a lazy scan over exactly these files with these reader options, so it is reused
+    // for the per-file row counts instead of building a second identical reader (which would
+    // list the files and infer the Parquet schema a second time).
+    recordHistoryAndBuildResults(paimonTable, filesToLoad, skippedFiles, rawDf)
+  }
+
+  /**
+   * Build the projection DataFrame for Parquet import. Maps source columns to target table columns
+   * by name using the session resolver (case-insensitive unless `spark.sql.caseSensitive` is
+   * enabled). 
For each writable column:
+   *   - If it's in targetColumns AND exists in source: cast source column to target type
+   *   - If it's in targetColumns but missing from source: fill with NULL
+   *   - If it's NOT in targetColumns (unmapped): fill with default value or NULL
+   */
+  private def buildParquetDataFrame(
+      rawDf: DataFrame,
+      targetColumns: Seq[String],
+      writableColumns: Seq[String],
+      fields: Seq[DataField]): DataFrame = {
+    val resolver = spark.sessionState.conf.resolver
+    val sourceColumns = rawDf.columns.toSeq
+
+    val selectExprs: Seq[Column] = writableColumns.map {
+      colName =>
+        // Look the target field up once; all three cases below need it.
+        val field = fields.find(_.name() == colName).get
+        if (targetColumns.exists(tc => resolver(tc, colName))) {
+          val sparkType =
+            org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
+          // Present in the source -> that source column; absent -> NULL. Either way the value
+          // is cast to the target column's Spark type and aliased to the target name.
+          val value = sourceColumns
+            .find(s => resolver(s, colName))
+            .map(s => col(s))
+            .getOrElse(lit(null))
+          value.cast(sparkType).as(colName)
+        } else {
+          resolveDefaultColumn(field, colName)
+        }
+    }
+    rawDf.select(selectExprs: _*)
+  }
+
+  /**
+   * Validate that casting Parquet source columns to target types does not silently lose data.
+   * Detection strategy: if a source value is non-null but becomes null after casting, the cast
+   * failed (e.g., a string "abc" cast to IntegerType → null). Aborts immediately on first failure. 
+   */
+  private def validateParquetCast(
+      rawDf: DataFrame,
+      targetColumns: Seq[String],
+      writableColumns: Seq[String],
+      fields: Seq[DataField]): Unit = {
+    val resolver = spark.sessionState.conf.resolver
+    val sourceColumns = rawDf.columns.toSeq
+
+    // (source column, temporary cast column) pairs to compare after the cast.
+    val castCheckCols = ArrayBuffer[(String, String)]()
+    var validationDf = rawDf
+
+    // Resolve each field by name (as buildParquetDataFrame does) rather than by position:
+    // nothing here guarantees that `fields` is index-aligned with `writableColumns`. A positional
+    // zip would check against the wrong target type if the order differs, and would silently
+    // skip columns if the two sequences differ in length.
+    writableColumns.foreach {
+      colName =>
+        if (targetColumns.exists(tc => resolver(tc, colName))) {
+          sourceColumns.find(s => resolver(s, colName)).foreach {
+            srcColName =>
+              val field = fields.find(_.name() == colName).get
+              val sparkType =
+                org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
+              val castColName = s"__pq_cv_$colName"
+              validationDf = validationDf.withColumn(castColName, col(srcColName).cast(sparkType))
+              castCheckCols += ((srcColName, castColName))
+          }
+        }
+    }
+
+    // NOTE(review): this only detects casts that Spark turns into NULL. Narrowing integral casts
+    // (e.g. BIGINT -> INT) presumably wrap instead of yielding NULL when ANSI mode is off, and
+    // with ANSI mode on an invalid cast presumably fails inside collect() with Spark's own
+    // error rather than the IllegalArgumentException below -- confirm against the Spark
+    // versions this module supports.
+    if (castCheckCols.nonEmpty) {
+      val badCastFilter = castCheckCols
+        .map { case (src, dst) => col(src).isNotNull && col(dst).isNull }
+        .reduce(_ || _)
+      // One offending row is enough to abort; limit(1) avoids collecting more than that.
+      val badRows = validationDf.filter(badCastFilter).limit(1).collect()
+      if (badRows.nonEmpty) {
+        // Identify which checked column is the failing one in the sampled row, for the message.
+        val example = castCheckCols.find {
+          case (src, dst) =>
+            val row = badRows(0)
+            val srcIdx = validationDf.schema.fieldIndex(src)
+            val dstIdx = validationDf.schema.fieldIndex(dst)
+            !row.isNullAt(srcIdx) && row.isNullAt(dstIdx)
+        }
+        throw new IllegalArgumentException(
+          s"ON_ERROR = ABORT_STATEMENT: Cast failure in column '${example.map(_._1).getOrElse("unknown")}'. Source data contains values that cannot be converted to the target type.")
+      }
+    }
+  }
+
+  /**
+   * Text-based (CSV/JSON) import pipeline. Reads all columns as strings first, then:
+   *   1. Rename positional columns (CSV) or keep named columns (JSON)
+   *   2. Fill unmapped columns with default values
+   *   3. Cast all string columns to target types with validation
+   *   4. Write to Paimon table
+   *   5. 
Record load history + */ + private def runTextImport( + paimonTable: FileStoreTable, + filePaths: Array[String], + targetColumns: Seq[String], + writableColumns: Seq[String], + fields: Seq[DataField], + filesToLoad: Array[FileStatus], + skippedFiles: Array[FileStatus], + readerOptions: Map[String, String]): Seq[InternalRow] = { + val stringSchema = buildStringSchema(targetColumns) + val sourceDf = readSourceData(filePaths, stringSchema, readerOptions) val finalDf = buildFinalDataFrame(sourceDf, targetColumns, writableColumns, fields) @@ -83,13 +244,13 @@ case class CopyIntoTableExec( val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident) castedDf.write.format("paimon").mode("append").insertInto(tableName) - recordHistoryAndBuildResults( - paimonTable, - filesToLoad, - skippedFiles, - filePaths, - stringSchema, - readerOptions) + val countDf = fileFormat.formatType match { + case FileFormatType.JSON => + spark.read.options(readerOptions).schema(stringSchema).json(filePaths: _*) + case _ => + spark.read.options(readerOptions).schema(stringSchema).csv(filePaths: _*) + } + recordHistoryAndBuildResults(paimonTable, filesToLoad, skippedFiles, countDf) } private def buildStringSchema(targetColumns: Seq[String]): StructType = { @@ -229,20 +390,7 @@ case class CopyIntoTableExec( if (targetColumns.contains(colName)) { col(colName) } else { - val field = fields.find(_.name() == colName).get - val defaultVal = field.defaultValue() - if (defaultVal != null) { - val sparkType = - org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`()) - try { - val parsed = spark.sessionState.sqlParser.parseExpression(defaultVal) - SparkShimLoader.shim.classicApi.column(parsed).cast(sparkType).as(colName) - } catch { - case _: Exception => lit(null).cast(sparkType).as(colName) - } - } else { - lit(null).as(colName) - } + resolveDefaultColumn(fields.find(_.name() == colName).get, colName) } } renamedDf.select(selectExprs: _*) @@ -294,25 +442,22 @@ case class 
CopyIntoTableExec( castedDf } + /** + * Record successfully loaded files to load history (for FORCE=FALSE idempotent dedup), and build + * the result rows showing per-file load status. Accepts a pre-built countDf that will be grouped + * by input_file_name() to get per-file row counts — this allows both Parquet and text paths to + * share the same logic. + */ private def recordHistoryAndBuildResults( paimonTable: FileStoreTable, filesToLoad: Array[FileStatus], skippedFiles: Array[FileStatus], - filePaths: Array[String], - stringSchema: StructType, - readerOptions: Map[String, String]): Seq[InternalRow] = { + countDf: DataFrame): Seq[InternalRow] = { val paimonPath = new org.apache.paimon.fs.Path(paimonTable.location().toString) val historyManager = new CopyLoadHistoryManager(paimonTable.fileIO(), paimonPath) val snapshotId = paimonTable.snapshotManager().latestSnapshotId() val loadedAt = System.currentTimeMillis() - val countDf = fileFormat.formatType match { - case FileFormatType.JSON => - spark.read.options(readerOptions).schema(stringSchema).json(filePaths: _*) - case _ => - spark.read.options(readerOptions).schema(stringSchema).csv(filePaths: _*) - } - val rowCounts = countDf .groupBy(input_file_name().as("file")) .count() @@ -361,4 +506,25 @@ case class CopyIntoTableExec( 0L) }.toSeq } + + /** Resolve the default value expression for a column not populated from source data. */ + private def resolveDefaultColumn(field: DataField, colName: String): Column = { + val defaultVal = field.defaultValue() + if (defaultVal != null) { + val sparkType = + org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`()) + try { + val parsed = spark.sessionState.sqlParser.parseExpression(defaultVal) + SparkShimLoader.shim.classicApi.column(parsed).cast(sparkType).as(colName) + } catch { + case e: Exception => + logWarning( + s"Failed to parse default value '$defaultVal' for column '$colName': " + + s"${e.getMessage}. 
Using null instead.") + lit(null).cast(sparkType).as(colName) + } + } else { + lit(null).as(colName) + } + } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala index aa764bd30e05..e0970362a0a5 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala @@ -234,7 +234,7 @@ class CopyIntoTestBase extends PaimonSparkTestBase { val e = intercept[IllegalArgumentException] { spark.sql(s"""COPY INTO $dbName0.copy_unsup |FROM '${dir.getAbsolutePath}' - |FILE_FORMAT = (TYPE = PARQUET) + |FILE_FORMAT = (TYPE = ORC) |""".stripMargin) } assert( @@ -1083,4 +1083,343 @@ class CopyIntoTestBase extends PaimonSparkTestBase { spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_case") } + + // ========== Parquet Tests ========== + + private def withParquetDir(testBody: File => Unit): Unit = { + val dir = Files.createTempDirectory("copy_into_parquet_test").toFile + try testBody(dir) + finally deleteRecursively(dir) + } + + private def createParquetFile( + dir: File, + name: String, + data: Seq[Row], + schema: org.apache.spark.sql.types.StructType): Unit = { + val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) + df.coalesce(1).write.parquet(new File(dir, name).getAbsolutePath) + } + + private def createParquetSingleFile( + dir: File, + fileName: String, + data: Seq[Row], + schema: org.apache.spark.sql.types.StructType): Unit = { + val tmpDir = new File(dir, s"_tmp_$fileName") + val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) + df.coalesce(1).write.parquet(tmpDir.getAbsolutePath) + val partFile = tmpDir.listFiles().find(_.getName.endsWith(".parquet")).get + partFile.renameTo(new File(dir, fileName)) + deleteRecursively(tmpDir) + } + + 
test("COPY INTO: basic Parquet import") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_basic") + spark.sql(s"CREATE TABLE $dbName0.copy_parquet_basic (id INT, name STRING, age INT)") + + withParquetDir { + dir => + import org.apache.spark.sql.types._ + val schema = StructType( + Seq( + StructField("id", IntegerType), + StructField("name", StringType), + StructField("age", IntegerType))) + createParquetFile(dir, "data", Seq(Row(1, "Alice", 30), Row(2, "Bob", 25)), schema) + + spark.sql(s"""COPY INTO $dbName0.copy_parquet_basic + |FROM '${dir.getAbsolutePath}/data' + |FILE_FORMAT = (TYPE = PARQUET) + |""".stripMargin) + + checkAnswer( + spark.sql(s"SELECT * FROM $dbName0.copy_parquet_basic ORDER BY id"), + Seq(Row(1, "Alice", 30), Row(2, "Bob", 25))) + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_basic") + } + + test("COPY INTO: Parquet column name matching ignores order") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_order") + spark.sql(s"CREATE TABLE $dbName0.copy_parquet_order (id INT, name STRING, age INT)") + + withParquetDir { + dir => + import org.apache.spark.sql.types._ + val schema = StructType( + Seq( + StructField("age", IntegerType), + StructField("name", StringType), + StructField("id", IntegerType))) + createParquetFile(dir, "data", Seq(Row(30, "Alice", 1), Row(25, "Bob", 2)), schema) + + spark.sql(s"""COPY INTO $dbName0.copy_parquet_order + |FROM '${dir.getAbsolutePath}/data' + |FILE_FORMAT = (TYPE = PARQUET) + |""".stripMargin) + + checkAnswer( + spark.sql(s"SELECT * FROM $dbName0.copy_parquet_order ORDER BY id"), + Seq(Row(1, "Alice", 30), Row(2, "Bob", 25))) + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_order") + } + + test("COPY INTO: Parquet with explicit column list") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_cols") + spark.sql(s"CREATE TABLE $dbName0.copy_parquet_cols (id INT, name STRING, age INT)") + + withParquetDir { + dir => + import org.apache.spark.sql.types._ + 
val schema = + StructType(Seq(StructField("id", IntegerType), StructField("name", StringType))) + createParquetFile(dir, "data", Seq(Row(1, "Alice"), Row(2, "Bob")), schema) + + spark.sql(s"""COPY INTO $dbName0.copy_parquet_cols (id, name) + |FROM '${dir.getAbsolutePath}/data' + |FILE_FORMAT = (TYPE = PARQUET) + |""".stripMargin) + + checkAnswer( + spark.sql(s"SELECT * FROM $dbName0.copy_parquet_cols ORDER BY id"), + Seq(Row(1, "Alice", null), Row(2, "Bob", null))) + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_cols") + } + + test("COPY INTO: Parquet export") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_export") + spark.sql(s"CREATE TABLE $dbName0.copy_parquet_export (id INT, name STRING)") + spark.sql(s"INSERT INTO $dbName0.copy_parquet_export VALUES (1, 'Alice'), (2, 'Bob')") + + withParquetDir { + dir => + val outputPath = new File(dir, "output").getAbsolutePath + spark.sql(s"""COPY INTO '$outputPath' + |FROM $dbName0.copy_parquet_export + |FILE_FORMAT = (TYPE = PARQUET) + |""".stripMargin) + + val readBack = spark.read.parquet(outputPath) + checkAnswer(readBack.orderBy("id"), Seq(Row(1, "Alice"), Row(2, "Bob"))) + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_export") + } + + test("COPY INTO: Parquet export with COMPRESSION") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_compress") + spark.sql(s"CREATE TABLE $dbName0.copy_parquet_compress (id INT, name STRING)") + spark.sql(s"INSERT INTO $dbName0.copy_parquet_compress VALUES (1, 'Alice'), (2, 'Bob')") + + withParquetDir { + dir => + val outputPath = new File(dir, "output").getAbsolutePath + spark.sql(s"""COPY INTO '$outputPath' + |FROM $dbName0.copy_parquet_compress + |FILE_FORMAT = (TYPE = PARQUET, COMPRESSION = GZIP) + |""".stripMargin) + + val readBack = spark.read.parquet(outputPath) + checkAnswer(readBack.orderBy("id"), Seq(Row(1, "Alice"), Row(2, "Bob"))) + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_compress") + } + + test("COPY 
INTO: Parquet export then import round-trip") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_rt_src") + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_rt_dst") + spark.sql(s"CREATE TABLE $dbName0.copy_parquet_rt_src (id INT, name STRING, score DOUBLE)") + spark.sql( + s"INSERT INTO $dbName0.copy_parquet_rt_src VALUES (1, 'Alice', 95.5), (2, 'Bob', 87.3)") + spark.sql(s"CREATE TABLE $dbName0.copy_parquet_rt_dst (id INT, name STRING, score DOUBLE)") + + withParquetDir { + dir => + val outputPath = new File(dir, "export").getAbsolutePath + spark.sql(s"""COPY INTO '$outputPath' + |FROM $dbName0.copy_parquet_rt_src + |FILE_FORMAT = (TYPE = PARQUET) + |""".stripMargin) + + spark.sql(s"""COPY INTO $dbName0.copy_parquet_rt_dst + |FROM '$outputPath' + |FILE_FORMAT = (TYPE = PARQUET) + |""".stripMargin) + + checkAnswer( + spark.sql(s"SELECT * FROM $dbName0.copy_parquet_rt_dst ORDER BY id"), + Seq(Row(1, "Alice", 95.5), Row(2, "Bob", 87.3))) + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_rt_src") + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_rt_dst") + } + + test("COPY INTO: Parquet extra fields are ignored") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_extra") + spark.sql(s"CREATE TABLE $dbName0.copy_parquet_extra (id INT, name STRING)") + + withParquetDir { + dir => + import org.apache.spark.sql.types._ + val schema = StructType( + Seq( + StructField("id", IntegerType), + StructField("name", StringType), + StructField("extra", StringType))) + createParquetFile( + dir, + "data", + Seq(Row(1, "Alice", "ignore"), Row(2, "Bob", "ignore")), + schema) + + spark.sql(s"""COPY INTO $dbName0.copy_parquet_extra + |FROM '${dir.getAbsolutePath}/data' + |FILE_FORMAT = (TYPE = PARQUET) + |""".stripMargin) + + checkAnswer( + spark.sql(s"SELECT * FROM $dbName0.copy_parquet_extra ORDER BY id"), + Seq(Row(1, "Alice"), Row(2, "Bob"))) + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_extra") + } + + test("COPY INTO: 
Parquet missing fields become null") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_missing") + spark.sql(s"CREATE TABLE $dbName0.copy_parquet_missing (id INT, name STRING, age INT)") + + withParquetDir { + dir => + import org.apache.spark.sql.types._ + val schema = + StructType(Seq(StructField("id", IntegerType), StructField("name", StringType))) + createParquetFile(dir, "data", Seq(Row(1, "Alice"), Row(2, "Bob")), schema) + + spark.sql(s"""COPY INTO $dbName0.copy_parquet_missing + |FROM '${dir.getAbsolutePath}/data' + |FILE_FORMAT = (TYPE = PARQUET) + |""".stripMargin) + + checkAnswer( + spark.sql(s"SELECT * FROM $dbName0.copy_parquet_missing ORDER BY id"), + Seq(Row(1, "Alice", null), Row(2, "Bob", null))) + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_missing") + } + + test("COPY INTO: Parquet FORCE=FALSE skips already-loaded files") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_force") + spark.sql(s"CREATE TABLE $dbName0.copy_parquet_force (id INT, name STRING)") + + withParquetDir { + dir => + import org.apache.spark.sql.types._ + val schema = + StructType(Seq(StructField("id", IntegerType), StructField("name", StringType))) + createParquetFile(dir, "data", Seq(Row(1, "Alice")), schema) + + spark.sql(s"""COPY INTO $dbName0.copy_parquet_force + |FROM '${dir.getAbsolutePath}/data' + |FILE_FORMAT = (TYPE = PARQUET) + |""".stripMargin) + + spark.sql(s"""COPY INTO $dbName0.copy_parquet_force + |FROM '${dir.getAbsolutePath}/data' + |FILE_FORMAT = (TYPE = PARQUET) + |FORCE = FALSE + |""".stripMargin) + + checkAnswer( + spark.sql(s"SELECT * FROM $dbName0.copy_parquet_force ORDER BY id"), + Seq(Row(1, "Alice"))) + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_force") + } + + test("COPY INTO: Parquet PATTERN filters files") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_pattern") + spark.sql(s"CREATE TABLE $dbName0.copy_parquet_pattern (id INT, name STRING)") + + withParquetDir { + dir => + import 
org.apache.spark.sql.types._ + val schema = + StructType(Seq(StructField("id", IntegerType), StructField("name", StringType))) + createParquetSingleFile(dir, "include_data.parquet", Seq(Row(1, "Alice")), schema) + createParquetSingleFile(dir, "exclude_data.parquet", Seq(Row(2, "Bob")), schema) + + spark.sql(s"""COPY INTO $dbName0.copy_parquet_pattern + |FROM '${dir.getAbsolutePath}' + |FILE_FORMAT = (TYPE = PARQUET) + |PATTERN = 'include.*' + |""".stripMargin) + + checkAnswer( + spark.sql(s"SELECT * FROM $dbName0.copy_parquet_pattern ORDER BY id"), + Seq(Row(1, "Alice"))) + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_pattern") + } + + test("COPY INTO: Parquet unsupported option errors") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_opt_err") + spark.sql(s"CREATE TABLE $dbName0.copy_parquet_opt_err (id INT, name STRING)") + + withParquetDir { + dir => + import org.apache.spark.sql.types._ + val schema = + StructType(Seq(StructField("id", IntegerType), StructField("name", StringType))) + createParquetFile(dir, "data", Seq(Row(1, "Alice")), schema) + + intercept[IllegalArgumentException] { + spark.sql(s"""COPY INTO $dbName0.copy_parquet_opt_err + |FROM '${dir.getAbsolutePath}/data' + |FILE_FORMAT = (TYPE = PARQUET, FIELD_DELIMITER = ',') + |""".stripMargin) + } + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_opt_err") + } + + test("COPY INTO: Parquet rows_loaded count is accurate") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_count") + spark.sql(s"CREATE TABLE $dbName0.copy_parquet_count (id INT, name STRING)") + + withParquetDir { + dir => + import org.apache.spark.sql.types._ + val schema = + StructType(Seq(StructField("id", IntegerType), StructField("name", StringType))) + createParquetFile( + dir, + "data", + Seq(Row(1, "Alice"), Row(2, "Bob"), Row(3, "Charlie")), + schema) + + val result = spark.sql(s"""COPY INTO $dbName0.copy_parquet_count + |FROM '${dir.getAbsolutePath}/data' + |FILE_FORMAT = (TYPE = 
PARQUET) + |""".stripMargin) + + val rows = result.collect() + val totalLoaded = rows.map(_.getLong(2)).sum + assert(totalLoaded == 3) + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_count") + } }