Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 81 additions & 8 deletions docs/docs/spark/sql-write.md
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ INSERT INTO t BY NAME SELECT 3 AS a, '3' AS b, 3 AS c;

## COPY INTO

`COPY INTO` provides a SQL command for bulk loading data files into Paimon tables and exporting table data to files. Supported formats: **CSV** and **JSON**.
`COPY INTO` provides a SQL command for bulk loading data files into Paimon tables and exporting table data to files. Supported formats: **CSV**, **JSON**, and **Parquet**.

### CSV Import

Expand Down Expand Up @@ -383,6 +383,37 @@ FILE_FORMAT = (TYPE = JSON, MULTI_LINE = TRUE);

JSON columns are matched **by column name** (not by position), so source field order does not matter.

### Parquet Import

```sql
COPY INTO table_name [(col1, col2, ...)]
FROM 'source_path'
FILE_FORMAT = (TYPE = PARQUET [, option = value, ...])
[PATTERN = 'regex']
[FORCE = TRUE|FALSE]
[ON_ERROR = ABORT_STATEMENT]
```

**Basic import:**

```sql
COPY INTO my_db.my_table
FROM '/data/parquet_files/'
FILE_FORMAT = (TYPE = PARQUET);
```

**Import with PATTERN:**

```sql
COPY INTO my_db.events
FROM '/data/lake/'
FILE_FORMAT = (TYPE = PARQUET)
PATTERN = '.*\.parquet'
FORCE = FALSE;
```

Parquet columns are matched **by column name** (not by position). Extra columns in the source files are ignored; missing columns become NULL.

### Write CSV Files

```sql
Expand Down Expand Up @@ -419,15 +450,42 @@ FILE_FORMAT = (TYPE = JSON)
OVERWRITE = TRUE;
```

### Write Parquet Files

```sql
COPY INTO 'target_path'
FROM table_name
FILE_FORMAT = (TYPE = PARQUET [, option = value, ...])
[OVERWRITE = TRUE|FALSE]
```

**Basic Parquet export:**

```sql
COPY INTO '/export/data_backup/'
FROM my_db.events
FILE_FORMAT = (TYPE = PARQUET)
OVERWRITE = TRUE;
```

**Export with compression:**

```sql
COPY INTO '/export/data_compressed/'
FROM my_db.events
FILE_FORMAT = (TYPE = PARQUET, COMPRESSION = GZIP)
OVERWRITE = TRUE;
```

### FILE_FORMAT Options

`FILE_FORMAT` is required and must include `TYPE = CSV` or `TYPE = JSON`.
`FILE_FORMAT` is required and must include `TYPE = CSV`, `TYPE = JSON`, or `TYPE = PARQUET`.

**CSV import options:**

| Option | Description | Default |
|--------|-------------|---------|
| TYPE | File format type. `CSV` or `JSON`. | (required) |
| TYPE | File format type. `CSV`, `JSON`, or `PARQUET`. | (required) |
| FIELD_DELIMITER | Column delimiter character. | `,` |
| SKIP_HEADER | Skip the first line as header. Only `0` or `1`. | `0` |
| QUOTE | Quote character for enclosing fields. | `"` |
Expand All @@ -440,17 +498,24 @@ OVERWRITE = TRUE;

| Option | Description | Default |
|--------|-------------|---------|
| TYPE | File format type. `CSV` or `JSON`. | (required) |
| TYPE | File format type. `CSV`, `JSON`, or `PARQUET`. | (required) |
| MULTI_LINE | Parse multi-line JSON (e.g. JSON arrays or pretty-printed objects). | `FALSE` |
| NULL_IF | List of string values to interpret as NULL. | (none) |
| EMPTY_FIELD_AS_NULL | Treat empty string values as NULL. | `FALSE` |
| COMPRESSION | Compression codec (e.g. `GZIP`). | `NONE` |

**Parquet import options:**

| Option | Description | Default |
|--------|-------------|---------|
| TYPE | File format type. `CSV`, `JSON`, or `PARQUET`. | (required) |
| COMPRESSION | Compression codec. Usually auto-detected; rarely needed for import. | (auto) |

**CSV write options:**

| Option | Description | Default |
|--------|-------------|---------|
| TYPE | File format type. `CSV` or `JSON`. | (required) |
| TYPE | File format type. `CSV`, `JSON`, or `PARQUET`. | (required) |
| FIELD_DELIMITER | Column delimiter character. | `,` |
| HEADER | Write column names as the first line. `TRUE` or `FALSE`. | `FALSE` |
| QUOTE | Quote character for enclosing fields. | `"` |
Expand All @@ -461,11 +526,18 @@ OVERWRITE = TRUE;

| Option | Description | Default |
|--------|-------------|---------|
| TYPE | File format type. `CSV` or `JSON`. | (required) |
| TYPE | File format type. `CSV`, `JSON`, or `PARQUET`. | (required) |
| DATE_FORMAT | Custom date format pattern. | Spark default |
| TIMESTAMP_FORMAT | Custom timestamp format pattern. | Spark default |
| COMPRESSION | Compression codec (e.g. `GZIP`). | `NONE` |

**Parquet write options:**

| Option | Description | Default |
|--------|-------------|---------|
| TYPE | File format type. `CSV`, `JSON`, or `PARQUET`. | (required) |
| COMPRESSION | Compression codec (`SNAPPY`, `GZIP`, `NONE`, etc.). | `SNAPPY` |

### Import Options

| Option | Description | Default |
Expand All @@ -486,7 +558,8 @@ When an explicit column list is provided (e.g., `COPY INTO t (col1, col2) FROM .

- **CSV**: Columns are mapped **positionally** to the specified column list.
- **JSON**: Columns are matched **by name** to the specified column list.
- The number of source columns must match the column list length (CSV). For JSON, missing fields in the source become NULL.
- **Parquet**: Columns are matched **by name** to the specified column list.
- The number of source columns must match the column list length (CSV). For JSON and Parquet, missing fields in the source become NULL.
- Columns not in the list are filled with their **DEFAULT value** (if defined in the table schema) or **NULL**.
- Non-nullable columns without a default value that are not in the list will cause an error.

Expand Down Expand Up @@ -524,7 +597,7 @@ By default (`FORCE = FALSE`), COPY INTO tracks which files have been successfull

### Limitations

- Only **CSV** and **JSON** formats are supported.
- Only **CSV**, **JSON**, and **Parquet** formats are supported.
- Writing files only supports `FROM table_name`; `FROM (SELECT ...)` is not supported.
- `ON_ERROR = CONTINUE` is not supported; any parse or cast error aborts the entire command.
- `SINGLE = TRUE` (single-file output) is not supported.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ nonReserved
| COPY | INTO | FROM | FILE_FORMAT | PATTERN | FORCE | ON_ERROR | ABORT_STATEMENT | OVERWRITE
| CSV
| JSON
| PARQUET
;

ALTER: 'ALTER';
Expand Down Expand Up @@ -248,6 +249,7 @@ ABORT_STATEMENT: 'ABORT_STATEMENT';
OVERWRITE: 'OVERWRITE';
CSV: 'CSV';
JSON: 'JSON';
PARQUET: 'PARQUET';

PLUS: '+';
MINUS: '-';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ sealed trait FileFormatType
object FileFormatType {
case object CSV extends FileFormatType
case object JSON extends FileFormatType
case object PARQUET extends FileFormatType
case class Unsupported(name: String) extends FileFormatType
}

Expand Down Expand Up @@ -53,6 +54,15 @@ case class CopyFileFormat(formatType: FileFormatType, options: Map[String, Strin
case _ =>
}
}
case FileFormatType.PARQUET =>
mapped.remove("mode")
options.foreach {
case (k, v) =>
k match {
case "COMPRESSION" => mapped("compression") = v
case _ =>
}
}
case _ =>
}
mapped.toMap
Expand Down Expand Up @@ -83,6 +93,14 @@ case class CopyFileFormat(formatType: FileFormatType, options: Map[String, Strin
case _ =>
}
}
case FileFormatType.PARQUET =>
options.foreach {
case (k, v) =>
k match {
case "COMPRESSION" => mapped("compression") = v
case _ =>
}
}
case _ =>
}
mapped.toMap
Expand All @@ -108,6 +126,7 @@ case class CopyFileFormat(formatType: FileFormatType, options: Map[String, Strin
}
val validKeys = formatType match {
case FileFormatType.JSON => CopyFileFormat.VALID_JSON_IMPORT_KEYS
case FileFormatType.PARQUET => CopyFileFormat.VALID_PARQUET_IMPORT_KEYS
case _ => CopyFileFormat.VALID_CSV_IMPORT_KEYS
}
val invalid = options.keys.filterNot(validKeys.contains)
Expand All @@ -132,6 +151,7 @@ case class CopyFileFormat(formatType: FileFormatType, options: Map[String, Strin
validateFormatType()
val validKeys = formatType match {
case FileFormatType.JSON => CopyFileFormat.VALID_JSON_EXPORT_KEYS
case FileFormatType.PARQUET => CopyFileFormat.VALID_PARQUET_EXPORT_KEYS
case _ => CopyFileFormat.VALID_CSV_EXPORT_KEYS
}
val invalid = options.keys.filterNot(validKeys.contains)
Expand All @@ -145,9 +165,10 @@ case class CopyFileFormat(formatType: FileFormatType, options: Map[String, Strin
formatType match {
case FileFormatType.CSV =>
case FileFormatType.JSON =>
case FileFormatType.PARQUET =>
case FileFormatType.Unsupported(name) =>
throw new IllegalArgumentException(
s"Unsupported file format type: $name. Supported types: CSV, JSON")
s"Unsupported file format type: $name. Supported types: CSV, JSON, PARQUET")
}
}
}
Expand Down Expand Up @@ -185,13 +206,22 @@ object CopyFileFormat {
"TIMESTAMP_FORMAT"
)

val VALID_PARQUET_IMPORT_KEYS: Set[String] = Set(
"COMPRESSION"
)

val VALID_PARQUET_EXPORT_KEYS: Set[String] = Set(
"COMPRESSION"
)

// Unit Separator (U+001F) used to encode multi-value lists in a single string
val LIST_SEPARATOR: String = "\u001f"

def parseFormatType(typeStr: String): FileFormatType = {
typeStr.toUpperCase match {
case "CSV" => FileFormatType.CSV
case "JSON" => FileFormatType.JSON
case "PARQUET" => FileFormatType.PARQUET
case other => FileFormatType.Unsupported(other)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ case class CopyIntoLocationExec(
fileFormat.formatType match {
case FileFormatType.JSON =>
df.write.options(writerOptions).mode(saveMode).json(targetPath)
case FileFormatType.PARQUET =>
df.write.options(writerOptions).mode(saveMode).parquet(targetPath)
case _ =>
df.write.options(writerOptions).mode(saveMode).csv(targetPath)
}
Expand Down
Loading
Loading