diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 522ccbc94c..654997a511 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -818,6 +818,16 @@ object CometConf extends ShimCometConf {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(100L * 1024 * 1024 * 1024) // 100 GB
 
+  val COMET_RESPECT_DATAFUSION_CONFIGS: ConfigEntry[Boolean] =
+    conf(s"$COMET_EXEC_CONFIG_PREFIX.respectDataFusionConfigs")
+      .category(CATEGORY_TESTING)
+      .doc(
+        "Development and testing configuration option to allow DataFusion configs set in " +
+          "Spark configuration settings starting with `spark.comet.datafusion.` to be passed " +
+          "into native execution.")
+      .booleanConf
+      .createWithDefault(false)
+
   val COMET_STRICT_TESTING: ConfigEntry[Boolean] = conf(s"$COMET_PREFIX.testing.strict")
     .category(CATEGORY_TESTING)
     .doc("Experimental option to enable strict testing, which will fail tests that could be " +
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index 146e0feb8e..a3ec08f36c 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -246,6 +246,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
         local_dirs_vec,
         max_temp_directory_size,
         task_cpus as usize,
+        &spark_config,
     )?;
     let plan_creation_time = start.elapsed();
 
@@ -300,6 +301,7 @@ fn prepare_datafusion_session_context(
     local_dirs: Vec<String>,
     max_temp_directory_size: u64,
     task_cpus: usize,
+    spark_config: &HashMap<String, String>,
 ) -> CometResult<SessionContext> {
     let paths = local_dirs.into_iter().map(PathBuf::from).collect();
     let disk_manager = DiskManagerBuilder::default()
@@ -308,10 +310,7 @@ fn prepare_datafusion_session_context(
     let mut rt_config = RuntimeEnvBuilder::new().with_disk_manager_builder(disk_manager);
     rt_config = rt_config.with_memory_pool(memory_pool);
 
-    // Get Datafusion configuration from Spark Execution context
-    // can be configured in Comet Spark JVM using Spark --conf parameters
-    // e.g: spark-shell --conf spark.datafusion.sql_parser.parse_float_as_decimal=true
-    let session_config = SessionConfig::new()
+    let mut session_config = SessionConfig::new()
         .with_target_partitions(task_cpus)
         // This DataFusion context is within the scope of an executing Spark Task. We want to set
         // its internal parallelism to the number of CPUs allocated to Spark Tasks. This can be
@@ -328,6 +327,17 @@ fn prepare_datafusion_session_context(
             &ScalarValue::Float64(Some(1.1)),
         );
 
+    // Pass through DataFusion configs from Spark.
+    // e.g: spark-shell --conf spark.comet.datafusion.sql_parser.parse_float_as_decimal=true
+    // becomes datafusion.sql_parser.parse_float_as_decimal=true
+    const SPARK_COMET_DF_PREFIX: &str = "spark.comet.datafusion.";
+    for (key, value) in spark_config {
+        if let Some(df_key) = key.strip_prefix(SPARK_COMET_DF_PREFIX) {
+            let df_key = format!("datafusion.{df_key}");
+            session_config = session_config.set_str(&df_key, value);
+        }
+    }
+
     let runtime = rt_config.build()?;
 
     let mut session_ctx = SessionContext::new_with_config_rt(session_config, Arc::new(runtime));
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index 3156eb3873..d27f88b496 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -270,7 +270,13 @@ object CometExecIterator extends Logging {
   def serializeCometSQLConfs(): Array[Byte] = {
     val builder = ConfigMap.newBuilder()
     cometSqlConfs.foreach { case (k, v) =>
-      builder.putEntries(k, v)
+      if (k.startsWith(s"${CometConf.COMET_PREFIX}.datafusion.")) {
+        if (CometConf.COMET_RESPECT_DATAFUSION_CONFIGS.get(SQLConf.get)) {
+          builder.putEntries(k, v)
+        }
+      } else {
+        builder.putEntries(k, v)
+      }
     }
     builder.build().toByteArray
   }
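Reviewer note: a minimal end-to-end usage sketch of the new gate, not part of this diff. It assumes COMET_EXEC_CONFIG_PREFIX resolves to "spark.comet.exec", so the new entry is exposed as spark.comet.exec.respectDataFusionConfigs; the example application name is hypothetical.

    // Usage sketch only (assumptions noted above), Scala:
    import org.apache.spark.sql.SparkSession

    object RespectDataFusionConfigsExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("comet-datafusion-passthrough")
          // Gate added in this diff: when false (the default),
          // serializeCometSQLConfs drops spark.comet.datafusion.* entries
          // so they never reach native execution.
          .config("spark.comet.exec.respectDataFusionConfigs", "true")
          // With the gate enabled, jni_api.rs strips the spark.comet. prefix
          // and applies this on the DataFusion SessionConfig as
          // datafusion.sql_parser.parse_float_as_decimal=true.
          .config("spark.comet.datafusion.sql_parser.parse_float_as_decimal", "true")
          .getOrCreate()

        spark.sql("SELECT 1.1 AS x").show()
        spark.stop()
      }
    }

Keeping the pass-through behind a CATEGORY_TESTING flag means arbitrary DataFusion settings cannot leak into production plans by default; only deployments that opt in see the spark.comet.datafusion.* entries forwarded.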