From 7feff85c90302e8679627400cbfd027d55676ce8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 9 Feb 2026 08:15:33 -0700 Subject: [PATCH 1/4] feat: pass spark.comet.datafusion.* configs through to DataFusion session Spark configs with the prefix spark.comet.datafusion.* are now passed through to DataFusion's SessionConfig. The prefix spark.comet. is stripped so that e.g. spark.comet.datafusion.sql_parser.parse_float_as_decimal becomes datafusion.sql_parser.parse_float_as_decimal in DataFusion. Co-Authored-By: Claude Opus 4.6 --- native/core/src/execution/jni_api.rs | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 146e0feb8e..a3ec08f36c 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -246,6 +246,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( local_dirs_vec, max_temp_directory_size, task_cpus as usize, + &spark_config, )?; let plan_creation_time = start.elapsed(); @@ -300,6 +301,7 @@ fn prepare_datafusion_session_context( local_dirs: Vec<String>, max_temp_directory_size: u64, task_cpus: usize, + spark_config: &HashMap<String, String>, ) -> CometResult<SessionContext> { let paths = local_dirs.into_iter().map(PathBuf::from).collect(); let disk_manager = DiskManagerBuilder::default() @@ -308,10 +310,7 @@ let mut rt_config = RuntimeEnvBuilder::new().with_disk_manager_builder(disk_manager); rt_config = rt_config.with_memory_pool(memory_pool); - // Get Datafusion configuration from Spark Execution context - // can be configured in Comet Spark JVM using Spark --conf parameters - // e.g: spark-shell --conf spark.datafusion.sql_parser.parse_float_as_decimal=true - let session_config = SessionConfig::new() + let mut session_config = SessionConfig::new() .with_target_partitions(task_cpus) // This DataFusion context is within the scope of an executing Spark Task. 
We want to set // its internal parallelism to the number of CPUs allocated to Spark Tasks. This can be @@ -328,6 +327,17 @@ fn prepare_datafusion_session_context( &ScalarValue::Float64(Some(1.1)), ); + // Pass through DataFusion configs from Spark. + // e.g: spark-shell --conf spark.comet.datafusion.sql_parser.parse_float_as_decimal=true + // becomes datafusion.sql_parser.parse_float_as_decimal=true + const SPARK_COMET_DF_PREFIX: &str = "spark.comet.datafusion."; + for (key, value) in spark_config { + if let Some(df_key) = key.strip_prefix(SPARK_COMET_DF_PREFIX) { + let df_key = format!("datafusion.{df_key}"); + session_config = session_config.set_str(&df_key, value); + } + } + let runtime = rt_config.build()?; let mut session_ctx = SessionContext::new_with_config_rt(session_config, Arc::new(runtime)); From 66d247f8ed3f19b1044720a69766a06ed1513641 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 12 Feb 2026 11:55:50 -0700 Subject: [PATCH 2/4] add new config --- .../src/main/scala/org/apache/comet/CometConf.scala | 9 +++++++++ .../scala/org/apache/comet/CometExecIterator.scala | 11 +++++++++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 522ccbc94c..b0629e0223 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -818,6 +818,15 @@ object CometConf extends ShimCometConf { .bytesConf(ByteUnit.BYTE) .createWithDefault(100L * 1024 * 1024 * 1024) // 100 GB + val COMET_RESPECT_DATAFUSION_CONFIGS: ConfigEntry[Boolean] = + conf(s"$COMET_EXEC_CONFIG_PREFIX.respectDataFusionConfigs") + .category(CATEGORY_TESTING) + .doc("Development and testing configuration option to allow DataFusion configs set in " + + "Spark configuration settings starting with `spark.comet.datafusion.` to be passed " + + "into native execution.") + .booleanConf + .createWithDefault(false) + 
val COMET_STRICT_TESTING: ConfigEntry[Boolean] = conf(s"$COMET_PREFIX.testing.strict") .category(CATEGORY_TESTING) .doc("Experimental option to enable strict testing, which will fail tests that could be " + diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index 3156eb3873..c334113cca 100644 --- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -269,8 +269,15 @@ object CometExecIterator extends Logging { def serializeCometSQLConfs(): Array[Byte] = { val builder = ConfigMap.newBuilder() - cometSqlConfs.foreach { case (k, v) => - builder.putEntries(k, v) + cometSqlConfs.foreach { + case (k, v) => + if (k.startsWith(s"${CometConf.COMET_EXEC_CONFIG_PREFIX}.datafusion.")) { + if (CometConf.COMET_RESPECT_DATAFUSION_CONFIGS.get(SQLConf.get)) { + builder.putEntries(k, v) + } + } else { + builder.putEntries(k, v) + } } builder.build().toByteArray } From 96ed1bb7004c58e541bfb4e8338c2ee8553796aa Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 12 Feb 2026 11:59:01 -0700 Subject: [PATCH 3/4] format --- .../src/main/scala/org/apache/comet/CometConf.scala | 13 +++++++------ .../scala/org/apache/comet/CometExecIterator.scala | 13 ++++++------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index b0629e0223..654997a511 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -820,12 +820,13 @@ object CometConf extends ShimCometConf { val COMET_RESPECT_DATAFUSION_CONFIGS: ConfigEntry[Boolean] = conf(s"$COMET_EXEC_CONFIG_PREFIX.respectDataFusionConfigs") - .category(CATEGORY_TESTING) - .doc("Development and testing configuration option to allow DataFusion configs set in " + - "Spark configuration settings 
starting with `spark.comet.datafusion.` to be passed " + - "into native execution.") - .booleanConf - .createWithDefault(false) + .category(CATEGORY_TESTING) + .doc( + "Development and testing configuration option to allow DataFusion configs set in " + + "Spark configuration settings starting with `spark.comet.datafusion.` to be passed " + + "into native execution.") + .booleanConf + .createWithDefault(false) val COMET_STRICT_TESTING: ConfigEntry[Boolean] = conf(s"$COMET_PREFIX.testing.strict") .category(CATEGORY_TESTING) diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index c334113cca..2528f5ad69 100644 --- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -269,15 +269,14 @@ object CometExecIterator extends Logging { def serializeCometSQLConfs(): Array[Byte] = { val builder = ConfigMap.newBuilder() - cometSqlConfs.foreach { - case (k, v) => - if (k.startsWith(s"${CometConf.COMET_EXEC_CONFIG_PREFIX}.datafusion.")) { - if (CometConf.COMET_RESPECT_DATAFUSION_CONFIGS.get(SQLConf.get)) { - builder.putEntries(k, v) - } - } else { + cometSqlConfs.foreach { case (k, v) => + if (k.startsWith(s"${CometConf.COMET_EXEC_CONFIG_PREFIX}.datafusion.")) { + if (CometConf.COMET_RESPECT_DATAFUSION_CONFIGS.get(SQLConf.get)) { builder.putEntries(k, v) } + } else { + builder.putEntries(k, v) + } } builder.build().toByteArray } From 77636c10f701bc9b7140bb4a3aba09a8c16ff7c9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 12 Feb 2026 11:59:58 -0700 Subject: [PATCH 4/4] fix --- spark/src/main/scala/org/apache/comet/CometExecIterator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index 2528f5ad69..d27f88b496 100644 --- 
a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -270,7 +270,7 @@ object CometExecIterator extends Logging { def serializeCometSQLConfs(): Array[Byte] = { val builder = ConfigMap.newBuilder() cometSqlConfs.foreach { case (k, v) => - if (k.startsWith(s"${CometConf.COMET_EXEC_CONFIG_PREFIX}.datafusion.")) { + if (k.startsWith(s"${CometConf.COMET_PREFIX}.datafusion.")) { if (CometConf.COMET_RESPECT_DATAFUSION_CONFIGS.get(SQLConf.get)) { builder.putEntries(k, v) }