Changes from all commits (40 commits)
1c19b51
Work
Nov 26, 2025
062296f
Merge remote-tracking branch 'origin/main' into native_csv_read
Dec 2, 2025
0d9355f
Work
Dec 5, 2025
4479678
Merge remote-tracking branch 'origin/main' into native_csv_read
Dec 5, 2025
6c12812
Work
Dec 6, 2025
b601956
work
Dec 12, 2025
2ffd9cb
Merge remote-tracking branch 'origin/main' into native_csv_read
Dec 13, 2025
c685235
work
Dec 13, 2025
768b3e9
impl map_from_entries
Dec 14, 2025
c68c342
Revert "impl map_from_entries"
Dec 16, 2025
d887555
Merge branch 'apache:main' into main
kazantsev-maksim Dec 16, 2025
231aa90
Merge branch 'apache:main' into main
kazantsev-maksim Dec 17, 2025
2be0069
Merge remote-tracking branch 'origin/main' into native_csv_read
Dec 17, 2025
7ea16ee
work
Dec 17, 2025
c521006
work
Dec 19, 2025
9500bbb
Merge branch 'apache:main' into main
kazantsev-maksim Dec 24, 2025
9577481
Merge branch 'apache:main' into main
kazantsev-maksim Dec 28, 2025
8796a68
Merge remote-tracking branch 'origin/main' into native_csv_read
Dec 29, 2025
0f06936
WIP
Dec 30, 2025
033ba8b
WIP
Dec 30, 2025
dafa0de
WIP
Dec 30, 2025
1809df8
Work
Jan 1, 2026
3791557
Merge branch 'apache:main' into main
kazantsev-maksim Jan 2, 2026
7c2f082
Merge branch 'apache:main' into main
kazantsev-maksim Jan 3, 2026
609a605
Merge branch 'apache:main' into main
kazantsev-maksim Jan 6, 2026
d8c7760
Final approach
Jan 6, 2026
88aeb33
Fix workflows
Jan 6, 2026
b2a0c28
Fix fmt
Jan 6, 2026
ba98e37
Fix params
Jan 6, 2026
cd449c5
Fix tests
Jan 6, 2026
a1801c1
Fix rust fmt
Jan 6, 2026
65251eb
Fix fmt
Jan 6, 2026
a151b2c
Merge branch 'apache:main' into main
kazantsev-maksim Jan 7, 2026
ad3e7f5
Merge branch 'apache:main' into main
kazantsev-maksim Jan 10, 2026
da73c27
Merge remote-tracking branch 'origin/main' into native_csv_read
Jan 10, 2026
a21ae07
Fix tests
Jan 11, 2026
bb5debe
Run tpch
Jan 14, 2026
ea92e4b
Merge branch 'apache:main' into main
kazantsev-maksim Jan 14, 2026
2cc6a98
Merge remote-tracking branch 'origin/main' into native_csv_read
Jan 14, 2026
139ecff
Add spark options to tpcbench.py
Jan 15, 2026
3 changes: 3 additions & 0 deletions .github/workflows/pr_build_linux.yml
@@ -133,6 +133,9 @@ jobs:
org.apache.spark.sql.comet.ParquetEncryptionITCase
org.apache.comet.exec.CometNativeReaderSuite
org.apache.comet.CometIcebergNativeSuite
- name: "csv"
value: |
org.apache.comet.csv.CometCsvNativeReadSuite
- name: "exec"
value: |
org.apache.comet.exec.CometAggregateSuite
3 changes: 3 additions & 0 deletions .github/workflows/pr_build_macos.yml
@@ -96,6 +96,9 @@ jobs:
org.apache.spark.sql.comet.ParquetEncryptionITCase
org.apache.comet.exec.CometNativeReaderSuite
org.apache.comet.CometIcebergNativeSuite
- name: "csv"
value: |
org.apache.comet.csv.CometCsvNativeReadSuite
- name: "exec"
value: |
org.apache.comet.exec.CometAggregateSuite
10 changes: 10 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -143,6 +143,16 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_CSV_V2_NATIVE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.scan.csv.v2.enabled")
.category(CATEGORY_TESTING)
.doc(
"Whether to use the native Comet V2 CSV reader for improved performance. " +
"Default: false (uses standard Spark CSV reader) " +
"Experimental: Performance benefits are workload-dependent.")
.booleanConf
.createWithDefault(false)

val COMET_RESPECT_PARQUET_FILTER_PUSHDOWN: ConfigEntry[Boolean] =
conf("spark.comet.parquet.respectFilterPushdown")
.category(CATEGORY_PARQUET)
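As a usage sketch (an editorial illustration, not part of this diff): enabling the new flag and reading CSV in a way that satisfies the eligibility checks added in CometScanRule.scala further down (explicit schema, single-character delimiter, no corrupt-record column). The useV1SourceList setting is an assumption: the exec rule matches the DataSource V2 CSVScan, so csv likely has to be excluded from Spark's V1 source list for the native path to be considered. Paths and schema below are hypothetical, and Comet's usual plugin/extension configuration is assumed to be in place.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructType}

val spark = SparkSession.builder()
  .appName("comet-native-csv-sketch")
  // Assumption: route csv through DataSource V2 so a CSVScan (not the V1 reader) is planned.
  .config("spark.sql.sources.useV1SourceList", "avro,json,kafka,orc,parquet,text")
  .config("spark.comet.scan.csv.v2.enabled", "true")
  .getOrCreate()

// Explicit schema: inferSchema=true falls back to the regular Spark reader.
val schema = new StructType()
  .add("id", LongType)
  .add("name", StringType)

val df = spark.read
  .format("csv")
  .schema(schema)
  .option("header", "true")
  .option("delimiter", ",") // only single-character delimiters are supported natively
  .load("/path/to/data.csv") // hypothetical path

df.show()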
3 changes: 2 additions & 1 deletion dev/benchmarks/comet-tpch.sh
@@ -50,4 +50,5 @@ $SPARK_HOME/bin/spark-submit \
--data $TPCH_DATA \
--queries $TPCH_QUERIES \
--output . \
--iterations 1
--iterations 1 \
--format parquet
3 changes: 2 additions & 1 deletion dev/benchmarks/spark-tpch.sh
@@ -42,4 +42,5 @@ $SPARK_HOME/bin/spark-submit \
--data $TPCH_DATA \
--queries $TPCH_QUERIES \
--output . \
--iterations 1
--iterations 1 \
--format parquet
13 changes: 8 additions & 5 deletions dev/benchmarks/tpcbench.py
@@ -20,6 +20,7 @@
import json
from pyspark.sql import SparkSession
import time
from typing import Dict

# rename same columns aliases
# a, a, b, b -> a, a_1, b, b_1
@@ -37,7 +38,7 @@ def dedup_columns(df):
new_cols.append(f"{c}_{counts[c]}")
return df.toDF(*new_cols)

def main(benchmark: str, data_path: str, query_path: str, iterations: int, output: str, name: str, query_num: int = None, write_path: str = None):
def main(benchmark: str, data_path: str, query_path: str, iterations: int, output: str, name: str, format: str, query_num: int = None, write_path: str = None, options: Dict[str, str] = None):

# Initialize a SparkSession
spark = SparkSession.builder \
Expand All @@ -58,9 +59,9 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int, outpu
raise "invalid benchmark"

for table in table_names:
path = f"{data_path}/{table}.parquet"
path = f"{data_path}/{table}.{format}"
print(f"Registering table {table} using path {path}")
df = spark.read.parquet(path)
df = spark.read.format(format).options(**options).load(path)
df.createOrReplaceTempView(table)

conf_dict = {k: v for k, v in spark.sparkContext.getConf().getAll()}
@@ -146,15 +147,17 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int, outpu

if __name__ == "__main__":
parser = argparse.ArgumentParser(description="DataFusion benchmark derived from TPC-H / TPC-DS")
parser.add_argument("--benchmark", required=True, help="Benchmark to run (tpch or tpcds)")
parser.add_argument("--benchmark", required=True, default="tpch", help="Benchmark to run (tpch or tpcds)")
parser.add_argument("--data", required=True, help="Path to data files")
parser.add_argument("--queries", required=True, help="Path to query files")
parser.add_argument("--iterations", required=False, default="1", help="How many iterations to run")
parser.add_argument("--output", required=True, help="Path to write output")
parser.add_argument("--name", required=True, help="Prefix for result file e.g. spark/comet/gluten")
parser.add_argument("--query", required=False, type=int, help="Specific query number to run (1-based). If not specified, all queries will be run.")
parser.add_argument("--write", required=False, help="Path to save query results to, in Parquet format.")
parser.add_argument("--format", required=True, default="parquet", help="Input file format (parquet, csv, json)")
Review comment (PR author): It is necessary to add the ability to pass CSV reading options.

Review comment (PR author): For CSV:

tpcbench.py \
    --name spark \
    --benchmark tpch \
    --data $TPCH_DATA \
    --queries $TPCH_QUERIES \
    --output . \
    --iterations 1 \
    --format csv \
    --options '{"header": "true", "delimiter": ","}'

parser.add_argument("--options", type=json.loads, required=False, default={}, help='Spark options as JSON string, e.g., \'{"header": "true", "delimiter": ","}\'')
args = parser.parse_args()

main(args.benchmark, args.data, args.queries, int(args.iterations), args.output, args.name, args.query, args.write)
main(args.benchmark, args.data, args.queries, int(args.iterations), args.output, args.name, args.format, args.query, args.write, args.options)
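For reference, a minimal Scala sketch of the per-table read that tpcbench.py now performs when invoked with --format csv and --options; the spark session is assumed to exist (for example the one from the sketch above), and the data path, table, and option values are illustrative.

// Mirrors spark.read.format(format).options(**options).load(path) from tpcbench.py.
val dataPath = "/path/to/tpch-data" // hypothetical
val csvReadOptions = Map("header" -> "true", "delimiter" -> ",")

val lineitem = spark.read
  .format("csv")
  .options(csvReadOptions)
  .load(s"$dataPath/lineitem.csv")
lineitem.createOrReplaceTempView("lineitem")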

1 change: 1 addition & 0 deletions docs/source/user-guide/latest/configs.md
@@ -144,6 +144,7 @@ These settings can be used to determine which parts of the plan are accelerated
| `spark.comet.exec.onHeap.memoryPool` | The type of memory pool to be used for Comet native execution when running Spark in on-heap mode. Available pool types are `greedy`, `fair_spill`, `greedy_task_shared`, `fair_spill_task_shared`, `greedy_global`, `fair_spill_global`, and `unbounded`. | greedy_task_shared |
| `spark.comet.memoryOverhead` | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. | 1024 MiB |
| `spark.comet.parquet.write.enabled` | Whether to enable native Parquet write through Comet. When enabled, Comet will intercept Parquet write operations and execute them natively. This feature is highly experimental and only partially implemented. It should not be used in production. | false |
| `spark.comet.scan.csv.v2.enabled` | Whether to use the native Comet V2 CSV reader for improved performance. Default: false (uses standard Spark CSV reader) Experimental: Performance benefits are workload-dependent. | false |
| `spark.comet.sparkToColumnar.enabled` | Whether to enable Spark to Arrow columnar conversion. When this is turned on, Comet will convert operators in `spark.comet.sparkToColumnar.supportedOperatorList` into Arrow columnar format before processing. This is an experimental feature and has known issues with non-UTC timezones. | false |
| `spark.comet.sparkToColumnar.supportedOperatorList` | A comma-separated list of operators that will be converted to Arrow columnar format when `spark.comet.sparkToColumnar.enabled` is true. | Range,InMemoryTableScan,RDDScan |
| `spark.comet.testing.strict` | Experimental option to enable strict testing, which will fail tests that could be more comprehensive, such as checking for a specific fallback reason. It can be overridden by the environment variable `ENABLE_COMET_STRICT_TESTING`. | false |
86 changes: 86 additions & 0 deletions native/core/src/execution/operators/csv_scan.rs
@@ -0,0 +1,86 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::execution::operators::ExecutionError;
use arrow::datatypes::{Field, SchemaRef};
use datafusion::common::DataFusionError;
use datafusion::common::Result;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::CsvSource;
use datafusion_comet_proto::spark_operator::CsvOptions;
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::source::DataSourceExec;
use datafusion_datasource::PartitionedFile;
use itertools::Itertools;
use std::sync::Arc;

pub fn init_csv_datasource_exec(
object_store_url: ObjectStoreUrl,
file_groups: Vec<Vec<PartitionedFile>>,
data_schema: SchemaRef,
partition_schema: SchemaRef,
projection_vector: Vec<usize>,
csv_options: &CsvOptions,
) -> Result<Arc<DataSourceExec>, ExecutionError> {
let csv_source = build_csv_source(csv_options.clone());

let file_groups = file_groups
.iter()
.map(|files| FileGroup::new(files.clone()))
.collect();

let partition_fields = partition_schema
.fields()
.iter()
.map(|field| Field::new(field.name(), field.data_type().clone(), field.is_nullable()))
.collect_vec();

let file_scan_config: FileScanConfig =
FileScanConfigBuilder::new(object_store_url, data_schema, csv_source)
.with_file_groups(file_groups)
.with_table_partition_cols(partition_fields)
.with_projection_indices(Some(projection_vector))
.build();

Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan_config))))
}

fn build_csv_source(options: CsvOptions) -> Arc<CsvSource> {
let delimiter = string_to_u8(&options.delimiter, "delimiter").unwrap();
let quote = string_to_u8(&options.quote, "quote").unwrap();
let escape = string_to_u8(&options.escape, "escape").unwrap();
let terminator = string_to_u8(&options.terminator, "terminator").unwrap();
let comment = options
.comment
.map(|c| string_to_u8(&c, "comment").unwrap());
let csv_source = CsvSource::new(options.has_header, delimiter, quote)
.with_escape(Some(escape))
.with_comment(comment)
.with_terminator(Some(terminator))
.with_truncate_rows(options.truncated_rows);
Arc::new(csv_source)
}

fn string_to_u8(option: &str, option_name: &str) -> Result<u8> {
match option.as_bytes().first() {
Some(&ch) if ch.is_ascii() => Ok(ch),
_ => Err(DataFusionError::Configuration(format!(
"invalid {option_name} character '{option}': must be an ASCII character"
))),
}
}
2 changes: 2 additions & 0 deletions native/core/src/execution/operators/mod.rs
@@ -31,8 +31,10 @@ pub use expand::ExpandExec;
mod iceberg_scan;
mod parquet_writer;
pub use parquet_writer::ParquetWriterExec;
mod csv_scan;
pub mod projection;
mod scan;
pub use csv_scan::init_csv_datasource_exec;

/// Error returned during executing operators.
#[derive(thiserror::Error, Debug)]
40 changes: 40 additions & 0 deletions native/core/src/execution/planner.rs
@@ -21,6 +21,7 @@ pub mod expression_registry;
pub mod macros;
pub mod operator_registry;

use crate::execution::operators::init_csv_datasource_exec;
use crate::execution::operators::IcebergScanExec;
use crate::{
errors::ExpressionError,
@@ -95,6 +96,7 @@ use datafusion::physical_expr::window::WindowExpr;
use datafusion::physical_expr::LexOrdering;

use crate::parquet::parquet_exec::init_datasource_exec;

use arrow::array::{
new_empty_array, Array, ArrayRef, BinaryBuilder, BooleanArray, Date32Array, Decimal128Array,
Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, ListArray,
@@ -1120,6 +1122,44 @@ impl PhysicalPlanner {
Arc::new(SparkPlan::new(spark_plan.plan_id, scan, vec![])),
))
}
OpStruct::CsvScan(scan) => {
let data_schema = convert_spark_types_to_arrow_schema(scan.data_schema.as_slice());
let partition_schema =
convert_spark_types_to_arrow_schema(scan.partition_schema.as_slice());
let projection_vector: Vec<usize> =
scan.projection_vector.iter().map(|i| *i as usize).collect();
let object_store_options: HashMap<String, String> = scan
.object_store_options
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
let one_file = scan
.file_partitions
.first()
.and_then(|f| f.partitioned_file.first())
.map(|f| f.file_path.clone())
.ok_or(GeneralError("Failed to locate file".to_string()))?;
let (object_store_url, _) = prepare_object_store_with_configs(
self.session_ctx.runtime_env(),
one_file,
&object_store_options,
)?;
let files =
self.get_partitioned_files(&scan.file_partitions[self.partition as usize])?;
let file_groups: Vec<Vec<PartitionedFile>> = vec![files];
let scan = init_csv_datasource_exec(
object_store_url,
file_groups,
data_schema,
partition_schema,
projection_vector,
&scan.csv_options.clone().unwrap(),
)?;
Ok((
vec![],
Arc::new(SparkPlan::new(spark_plan.plan_id, scan, vec![])),
))
}
OpStruct::Scan(scan) => {
let data_types = scan.fields.iter().map(to_arrow_datatype).collect_vec();

2 changes: 2 additions & 0 deletions native/core/src/execution/planner/operator_registry.rs
@@ -60,6 +60,7 @@ pub enum OperatorType {
SortMergeJoin,
HashJoin,
Window,
CsvScan,
}

/// Global registry of operator builders
@@ -151,5 +152,6 @@ fn get_operator_type(spark_operator: &Operator) -> Option<OperatorType> {
OpStruct::HashJoin(_) => Some(OperatorType::HashJoin),
OpStruct::Window(_) => Some(OperatorType::Window),
OpStruct::Explode(_) => None, // Not yet in OperatorType enum
OpStruct::CsvScan(_) => Some(OperatorType::CsvScan),
}
}
20 changes: 20 additions & 0 deletions native/proto/src/proto/operator.proto
@@ -51,6 +51,7 @@ message Operator {
IcebergScan iceberg_scan = 112;
ParquetWriter parquet_writer = 113;
Explode explode = 114;
CsvScan csv_scan = 115;
}
}

@@ -110,6 +111,25 @@ message NativeScan {
bool encryption_enabled = 14;
}

message CsvScan {
repeated SparkStructField data_schema = 1;
repeated SparkStructField partition_schema = 2;
repeated int32 projection_vector = 3;
repeated SparkFilePartition file_partitions = 4;
map<string, string> object_store_options = 5;
CsvOptions csv_options = 6;
}

message CsvOptions {
bool has_header = 1;
string delimiter = 2;
string quote = 3;
string escape = 4;
optional string comment = 5;
string terminator = 7;
bool truncated_rows = 8;
}

message IcebergScan {
// Schema to read
repeated SparkStructField required_schema = 1;
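For illustration only (not part of the diff): assuming the standard protobuf-java code generation behind Comet's existing OperatorOuterClass, the JVM serde layer would populate the new CsvOptions message roughly as below; the builder method names follow protobuf's snake_case-to-camelCase convention and are an assumption here, as are the option values.

import org.apache.comet.serde.OperatorOuterClass

// Sketch: mirror the resolved Spark CSV options into the message consumed by csv_scan.rs.
// Each character option must be a single ASCII character (see string_to_u8 above).
val csvOptions = OperatorOuterClass.CsvOptions
  .newBuilder()
  .setHasHeader(true)
  .setDelimiter(",")
  .setQuote("\"")
  .setEscape("\\")
  .setTerminator("\n")
  .setTruncatedRows(true)
  // comment is an optional field; e.g. .setComment("#") when a comment character is configured
  .build()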
spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -51,9 +51,8 @@ import org.apache.comet.{CometConf, CometExplainInfo, ExtendedExplainInfo}
import org.apache.comet.CometConf.{COMET_SPARK_TO_ARROW_ENABLED, COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST}
import org.apache.comet.CometSparkSessionExtensions._
import org.apache.comet.rules.CometExecRule.allExecs
import org.apache.comet.serde.{CometOperatorSerde, Compatible, Incompatible, OperatorOuterClass, Unsupported}
import org.apache.comet.serde._
import org.apache.comet.serde.operator._
import org.apache.comet.serde.operator.CometDataWritingCommand

object CometExecRule {

@@ -191,6 +190,9 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
case scan: CometBatchScanExec if scan.nativeIcebergScanMetadata.isDefined =>
convertToComet(scan, CometIcebergNativeScan).getOrElse(scan)

case scan: CometBatchScanExec if scan.wrapped.scan.isInstanceOf[CSVScan] =>
convertToComet(scan, CometCsvNativeScanExec).getOrElse(scan)

// Comet JVM + native scan for V1 and V2
case op if isCometScan(op) =>
convertToComet(op, CometScanWrapper).getOrElse(op)
42 changes: 42 additions & 0 deletions spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -255,6 +256,47 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
withInfos(scanExec, fallbackReasons.toSet)
}

case scan: CSVScan if COMET_CSV_V2_NATIVE_ENABLED.get() =>
val fallbackReasons = new ListBuffer[String]()
val schemaSupported =
CometBatchScanExec.isSchemaSupported(scan.readDataSchema, fallbackReasons)
if (!schemaSupported) {
fallbackReasons += s"Schema ${scan.readDataSchema} is not supported"
}
val partitionSchemaSupported =
CometBatchScanExec.isSchemaSupported(scan.readPartitionSchema, fallbackReasons)
if (!partitionSchemaSupported) {
fallbackReasons += s"Partition schema ${scan.readPartitionSchema} is not supported"
}
val corruptedRecordsColumnName =
SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD)
val containsCorruptedRecordsColumn =
!scan.readDataSchema.fieldNames.contains(corruptedRecordsColumnName)
if (!containsCorruptedRecordsColumn) {
fallbackReasons += "Comet doesn't support the processing of corrupted records"
}
val isInferSchemaEnabled = scan.options.getBoolean("inferSchema", false)
if (isInferSchemaEnabled) {
fallbackReasons += "Comet doesn't support inferSchema=true option"
}
val delimiter =
Option(scan.options.get("delimiter"))
.orElse(Option(scan.options.get("sep")))
.getOrElse(",")
val isSingleCharacterDelimiter = delimiter.length == 1
if (!isSingleCharacterDelimiter) {
fallbackReasons +=
s"Comet supports only single-character delimiters, but got: '$delimiter'"
}
if (schemaSupported && partitionSchemaSupported && containsCorruptedRecordsColumn
&& !isInferSchemaEnabled && isSingleCharacterDelimiter) {
CometBatchScanExec(
scanExec.clone().asInstanceOf[BatchScanExec],
runtimeFilters = scanExec.runtimeFilters)
} else {
withInfos(scanExec, fallbackReasons.toSet)
}

// Iceberg scan - patched version implementing SupportsComet interface
case s: SupportsComet if !COMET_ICEBERG_NATIVE_ENABLED.get() =>
val fallbackReasons = new ListBuffer[String]()
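To make the eligibility checks above concrete, a hedged sketch (reusing the spark session and schema from the earlier CometConf sketch; paths are hypothetical) of reads that would fall back to the regular Spark CSV reader versus one that remains eligible for the native scan:

import org.apache.spark.sql.types.StringType

// Falls back: schema inference requested.
spark.read.format("csv").option("inferSchema", "true").option("header", "true").load("/data/t.csv")

// Falls back: multi-character separator.
spark.read.format("csv").schema(schema).option("sep", "||").load("/data/t.csv")

// Falls back: the corrupt-record column (default name _corrupt_record) is part of the read schema.
val withCorrupt = schema.add("_corrupt_record", StringType)
spark.read.format("csv").schema(withCorrupt).load("/data/t.csv")

// Eligible: explicit supported schema, single-character delimiter, no corrupt-record column.
spark.read.format("csv").schema(schema).option("header", "true").option("delimiter", ",").load("/data/t.csv")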