From 35ebdf55897bf732f57863e71fb1409d8264e564 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 8 Feb 2026 07:39:57 -0700 Subject: [PATCH 01/21] Add opt-in DataFusion physical optimizer for native execution Add spark.comet.exec.nativePhysicalOptimizer.enabled config (default false) that runs DataFusion's physical optimizer rules on the native plan before execution. This enables optimizations such as projection pushdown, coalesce batches, filter pushdown, and limit pushdown. Per-operator Spark SQL metrics are skipped when enabled since the optimizer creates a new plan tree that invalidates the SparkPlan-to-ExecutionPlan mapping. Co-Authored-By: Claude Opus 4.6 --- .../scala/org/apache/comet/CometConf.scala | 12 ++++++++ native/core/src/execution/jni_api.rs | 28 +++++++++++++++++-- native/core/src/execution/spark_config.rs | 2 ++ 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 522ccbc94c..eb60919ae1 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -549,6 +549,18 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(false) + val COMET_NATIVE_PHYSICAL_OPTIMIZER_ENABLED: ConfigEntry[Boolean] = + conf("spark.comet.exec.nativePhysicalOptimizer.enabled") + .category(CATEGORY_EXEC) + .doc( + "When enabled, Comet will run DataFusion's physical optimizer rules on the " + + "native query plan before execution. This can improve performance through " + + "additional optimizations such as projection pushdown, coalesce batches, " + + "filter pushdown, and limit pushdown. Note: per-operator Spark SQL metrics " + + "will not be available when this is enabled.") + .booleanConf + .createWithDefault(false) + val COMET_EXTENDED_EXPLAIN_FORMAT_VERBOSE = "verbose" val COMET_EXTENDED_EXPLAIN_FORMAT_FALLBACK = "fallback" diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 146e0feb8e..d88cbcc88b 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -83,7 +83,7 @@ use crate::execution::tracing::{log_memory_usage, trace_begin, trace_end, with_t use crate::execution::spark_config::{ SparkConfig, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_MAX_TEMP_DIRECTORY_SIZE, - COMET_TRACING_ENABLED, + COMET_NATIVE_PHYSICAL_OPTIMIZER_ENABLED, COMET_TRACING_ENABLED, }; use crate::parquet::encryption_support::{CometEncryptionFactory, ENCRYPTION_FACTORY_ID}; use datafusion_comet_proto::spark_operator::operator::OpStruct; @@ -153,6 +153,8 @@ struct ExecutionContext { pub memory_pool_config: MemoryPoolConfig, /// Whether to log memory usage on each call to execute_plan pub tracing_enabled: bool, + /// Whether to run DataFusion's physical optimizer on the native plan + pub native_physical_optimizer_enabled: bool, } /// Accept serialized query plan and return the address of the native query plan. 
@@ -192,6 +194,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( let tracing_enabled = spark_config.get_bool(COMET_TRACING_ENABLED); let max_temp_directory_size = spark_config.get_u64(COMET_MAX_TEMP_DIRECTORY_SIZE, 100 * 1024 * 1024 * 1024); + let native_physical_optimizer_enabled = + spark_config.get_bool(COMET_NATIVE_PHYSICAL_OPTIMIZER_ENABLED); with_trace("createPlan", tracing_enabled, || { // Init JVM classes @@ -286,6 +290,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( explain_native, memory_pool_config, tracing_enabled, + native_physical_optimizer_enabled, }); Ok(Box::into_raw(exec_context) as i64) @@ -495,9 +500,25 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( let physical_plan_time = start.elapsed(); exec_context.plan_creation_time += physical_plan_time; - exec_context.root_op = Some(Arc::clone(&root_op)); exec_context.scans = scans; + let root_op = if exec_context.native_physical_optimizer_enabled { + let state = exec_context.session_ctx.state(); + let optimizers = state.physical_optimizers(); + let config = state.config_options().clone(); + let mut optimized_plan = Arc::clone(&root_op.native_plan); + for optimizer in optimizers { + optimized_plan = optimizer.optimize(optimized_plan, &config)?; + } + // Create a flat SparkPlan with no children since the optimizer + // invalidates the SparkPlan-to-ExecutionPlan mapping used for metrics + Arc::new(SparkPlan::new(root_op.plan_id, optimized_plan, vec![])) + } else { + root_op + }; + + exec_context.root_op = Some(Arc::clone(&root_op)); + if exec_context.explain_native { let formatted_plan_str = DisplayableExecutionPlan::new(root_op.native_plan.as_ref()).indent(true); @@ -618,6 +639,9 @@ pub extern "system" fn Java_org_apache_comet_Native_releasePlan( /// Updates the metrics of the query plan. fn update_metrics(env: &mut JNIEnv, exec_context: &mut ExecutionContext) -> CometResult<()> { + if exec_context.native_physical_optimizer_enabled { + return Ok(()); + } if let Some(native_query) = &exec_context.root_op { let metrics = exec_context.metrics.as_obj(); update_comet_metric(env, metrics, native_query) diff --git a/native/core/src/execution/spark_config.rs b/native/core/src/execution/spark_config.rs index 60ebb2ff8b..0c4c326eb3 100644 --- a/native/core/src/execution/spark_config.rs +++ b/native/core/src/execution/spark_config.rs @@ -21,6 +21,8 @@ pub(crate) const COMET_TRACING_ENABLED: &str = "spark.comet.tracing.enabled"; pub(crate) const COMET_DEBUG_ENABLED: &str = "spark.comet.debug.enabled"; pub(crate) const COMET_EXPLAIN_NATIVE_ENABLED: &str = "spark.comet.explain.native.enabled"; pub(crate) const COMET_MAX_TEMP_DIRECTORY_SIZE: &str = "spark.comet.maxTempDirectorySize"; +pub(crate) const COMET_NATIVE_PHYSICAL_OPTIMIZER_ENABLED: &str = + "spark.comet.exec.nativePhysicalOptimizer.enabled"; pub(crate) trait SparkConfig { fn get_bool(&self, name: &str) -> bool; From 5085794fbb3962daa38b63c6305ea2d4b7829edb Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 8 Feb 2026 07:43:56 -0700 Subject: [PATCH 02/21] Pass spark.comet.datafusion.* configs through to DataFusion session Spark configs with the prefix spark.comet.datafusion.* are now passed through to DataFusion's SessionConfig. The prefix spark.comet. is stripped so that e.g. spark.comet.datafusion.sql_parser.parse_float_as_decimal becomes datafusion.sql_parser.parse_float_as_decimal in DataFusion. 
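For example, the flag added to dev/benchmarks/comet-tpch.sh later in this
series, spark.comet.datafusion.execution.parquet.pushdown_filters=true, is
forwarded to the DataFusion session as
datafusion.execution.parquet.pushdown_filters=true.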
Co-Authored-By: Claude Opus 4.6 --- native/core/src/execution/jni_api.rs | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index d88cbcc88b..a667d6927e 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -250,6 +250,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( local_dirs_vec, max_temp_directory_size, task_cpus as usize, + &spark_config, )?; let plan_creation_time = start.elapsed(); @@ -305,6 +306,7 @@ fn prepare_datafusion_session_context( local_dirs: Vec, max_temp_directory_size: u64, task_cpus: usize, + spark_config: &HashMap, ) -> CometResult { let paths = local_dirs.into_iter().map(PathBuf::from).collect(); let disk_manager = DiskManagerBuilder::default() @@ -313,10 +315,7 @@ fn prepare_datafusion_session_context( let mut rt_config = RuntimeEnvBuilder::new().with_disk_manager_builder(disk_manager); rt_config = rt_config.with_memory_pool(memory_pool); - // Get Datafusion configuration from Spark Execution context - // can be configured in Comet Spark JVM using Spark --conf parameters - // e.g: spark-shell --conf spark.datafusion.sql_parser.parse_float_as_decimal=true - let session_config = SessionConfig::new() + let mut session_config = SessionConfig::new() .with_target_partitions(task_cpus) // This DataFusion context is within the scope of an executing Spark Task. We want to set // its internal parallelism to the number of CPUs allocated to Spark Tasks. This can be @@ -333,6 +332,17 @@ fn prepare_datafusion_session_context( &ScalarValue::Float64(Some(1.1)), ); + // Pass through DataFusion configs from Spark. + // e.g: spark-shell --conf spark.comet.datafusion.sql_parser.parse_float_as_decimal=true + // becomes datafusion.sql_parser.parse_float_as_decimal=true + const SPARK_COMET_DF_PREFIX: &str = "spark.comet.datafusion."; + for (key, value) in spark_config { + if let Some(df_key) = key.strip_prefix(SPARK_COMET_DF_PREFIX) { + let df_key = format!("datafusion.{df_key}"); + session_config = session_config.set_str(&df_key, value); + } + } + let runtime = rt_config.build()?; let mut session_ctx = SessionContext::new_with_config_rt(session_config, Arc::new(runtime)); From 6631f8dfe86854741c241f8e36204cf53f5453f6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 8 Feb 2026 07:56:34 -0700 Subject: [PATCH 03/21] improve config docs --- .../main/scala/org/apache/comet/CometConf.scala | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index eb60919ae1..7d6fe34d26 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -122,16 +122,14 @@ object CometConf extends ShimCometConf { val SCAN_AUTO = "auto" val COMET_NATIVE_SCAN_IMPL: ConfigEntry[String] = conf("spark.comet.scan.impl") - .category(CATEGORY_SCAN) + .category(CATEGORY_PARQUET) .doc( - "The implementation of Comet Native Scan to use. Available modes are " + + "The implementation of Comet's Parquet scan to use. Available scans are " + s"`$SCAN_NATIVE_DATAFUSION`, and `$SCAN_NATIVE_ICEBERG_COMPAT`. " + - s"`$SCAN_NATIVE_DATAFUSION` is a fully native implementation of scan based on " + - "DataFusion. 
" + - s"`$SCAN_NATIVE_ICEBERG_COMPAT` is the recommended native implementation that " + - "exposes apis to read parquet columns natively and supports complex types. " + - s"`$SCAN_AUTO` (default) chooses the best scan.") - .internal() + s"`$SCAN_NATIVE_DATAFUSION` is a fully native implementation, and " + + s"`$SCAN_NATIVE_ICEBERG_COMPAT` is a hybrid implementation that supports some " + + "additional features, such as row indexes and field ids. " + + s"`$SCAN_AUTO` (default) chooses the best available scan based on the scan schema.") .stringConf .transform(_.toLowerCase(Locale.ROOT)) .checkValues(Set(SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT, SCAN_AUTO)) From ffb23f8315827178e2aab482649c55f237bc1681 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 8 Feb 2026 07:57:59 -0700 Subject: [PATCH 04/21] optimize --- dev/benchmarks/comet-tpch.sh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dev/benchmarks/comet-tpch.sh b/dev/benchmarks/comet-tpch.sh index f0709b7ef0..865d044ef8 100755 --- a/dev/benchmarks/comet-tpch.sh +++ b/dev/benchmarks/comet-tpch.sh @@ -41,6 +41,9 @@ $SPARK_HOME/bin/spark-submit \ --conf spark.plugins=org.apache.spark.CometPlugin \ --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \ --conf spark.comet.exec.replaceSortMergeJoin=true \ + --conf spark.comet.scan.impl=native_datafusion \ + --conf spark.comet.datafusion.execution.parquet.pushdown_filters=true \ + --conf spark.comet.exec.nativePhysicalOptimizer.enabled=true \ --conf spark.comet.expression.Cast.allowIncompatible=true \ --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \ --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \ From 0a5ed958a46893b0d9b920f6ea57acb49929ee2e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 8 Feb 2026 08:02:37 -0700 Subject: [PATCH 05/21] Log physical plan before and after DataFusion optimization When both the native physical optimizer and explain native are enabled, log the plan tree before and after running DataFusion optimizer rules so users can see exactly what changed. 
Co-Authored-By: Claude Opus 4.6 --- native/core/src/execution/jni_api.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index a667d6927e..2cd4dc748a 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -517,9 +517,23 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( let optimizers = state.physical_optimizers(); let config = state.config_options().clone(); let mut optimized_plan = Arc::clone(&root_op.native_plan); + + if exec_context.explain_native { + let before = DisplayableExecutionPlan::new(optimized_plan.as_ref()) + .indent(true); + info!("Comet native plan before DataFusion optimization:\n{before}"); + } + for optimizer in optimizers { optimized_plan = optimizer.optimize(optimized_plan, &config)?; } + + if exec_context.explain_native { + let after = DisplayableExecutionPlan::new(optimized_plan.as_ref()) + .indent(true); + info!("Comet native plan after DataFusion optimization:\n{after}"); + } + // Create a flat SparkPlan with no children since the optimizer // invalidates the SparkPlan-to-ExecutionPlan mapping used for metrics Arc::new(SparkPlan::new(root_op.plan_id, optimized_plan, vec![])) From a56d2641218f1f6b709cebb926d18765a78a2fb3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 8 Feb 2026 08:05:37 -0700 Subject: [PATCH 06/21] Run make format Co-Authored-By: Claude Opus 4.6 --- native/core/src/execution/jni_api.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 2cd4dc748a..9b3ee0c7ab 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -519,8 +519,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( let mut optimized_plan = Arc::clone(&root_op.native_plan); if exec_context.explain_native { - let before = DisplayableExecutionPlan::new(optimized_plan.as_ref()) - .indent(true); + let before = + DisplayableExecutionPlan::new(optimized_plan.as_ref()).indent(true); info!("Comet native plan before DataFusion optimization:\n{before}"); } @@ -529,8 +529,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( } if exec_context.explain_native { - let after = DisplayableExecutionPlan::new(optimized_plan.as_ref()) - .indent(true); + let after = + DisplayableExecutionPlan::new(optimized_plan.as_ref()).indent(true); info!("Comet native plan after DataFusion optimization:\n{after}"); } From 89ce7c7e67c2d98c50bbe1f4dc496d2ebcad52f5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 8 Feb 2026 08:13:19 -0700 Subject: [PATCH 07/21] mark feature as experimental --- common/src/main/scala/org/apache/comet/CometConf.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 7d6fe34d26..c9bcced9d0 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -549,15 +549,15 @@ object CometConf extends ShimCometConf { val COMET_NATIVE_PHYSICAL_OPTIMIZER_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.exec.nativePhysicalOptimizer.enabled") - .category(CATEGORY_EXEC) + .category(CATEGORY_TESTING) .doc( "When enabled, Comet will run DataFusion's physical optimizer rules on the " + "native query plan before execution. 
This can improve performance through " + "additional optimizations such as projection pushdown, coalesce batches, " + "filter pushdown, and limit pushdown. Note: per-operator Spark SQL metrics " + - "will not be available when this is enabled.") + "will not be available when this is enabled. This feature is highly experimental.") .booleanConf - .createWithDefault(false) + .createWithDefault(true) val COMET_EXTENDED_EXPLAIN_FORMAT_VERBOSE = "verbose" val COMET_EXTENDED_EXPLAIN_FORMAT_FALLBACK = "fallback" From c955b0233f67bc264b11572d1ce3c3eaacac1562 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 8 Feb 2026 09:20:46 -0700 Subject: [PATCH 08/21] Skip EnsureCooperative optimizer to fix infinite busy loop The EnsureCooperative rule inserts CooperativeExec nodes that consume Tokio's per-task coop budget. After ~128 batches the budget is exhausted and CooperativeExec returns Poll::Pending, but Comet's manual poll!() loop never yields back to block_on so the budget is never reset, causing the task to spin forever without producing output. Co-Authored-By: Claude Opus 4.6 --- native/core/src/execution/jni_api.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 9b3ee0c7ab..a9cd37cbb0 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -525,6 +525,15 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( } for optimizer in optimizers { + // EnsureCooperative inserts CooperativeExec nodes that use + // Tokio's per-task coop budget. This is incompatible with + // Comet's manual poll!() loop: once the budget is exhausted + // (~128 batches), CooperativeExec returns Poll::Pending but + // the budget is never reset (block_on never re-polls the + // top-level future), causing an infinite busy loop. + if optimizer.name() == "EnsureCooperative" { + continue; + } optimized_plan = optimizer.optimize(optimized_plan, &config)?; } From e3dc369f21e37771109190070be242ee46cb83a4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 8 Feb 2026 09:31:50 -0700 Subject: [PATCH 09/21] Skip DataFusion optimizer rules incompatible with Comet execution model Spark handles distribution and sorting via shuffles, and Comet always executes partition 0 within each Spark task. Several DataFusion physical optimizer rules assume full control over distribution and scheduling, which conflicts with this model: - EnforceDistribution: inserts RepartitionExec - EnforceSorting/OutputRequirements: adds sorting Spark already handles - JoinSelection: may change join types Comet doesn't support - SanityCheckPlan: rejects plans based on inapplicable distribution rules - EnsureCooperative: CooperativeExec causes infinite busy loop in Comet's manual poll!() loop The remaining rules (projection pushdown, coalesce batches, filter pushdown, limit pushdown, etc.) are safe and beneficial. 
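For illustration, a simplified sketch of the polling pattern that makes
EnsureCooperative unsafe here (not the actual Comet code; the stream and
function names are illustrative):

    use std::pin::Pin;
    use std::task::{Context, Poll};

    use futures::Stream;

    // Comet drives the stream by polling it in a loop rather than awaiting
    // it on a Tokio runtime task.
    fn next_item<S: Stream + Unpin>(
        stream: &mut S,
        cx: &mut Context<'_>,
    ) -> Option<S::Item> {
        loop {
            match Pin::new(&mut *stream).poll_next(cx) {
                Poll::Ready(item) => return item,
                // With CooperativeExec in the plan, Pending can mean "coop
                // budget exhausted" rather than "waiting on input". Nothing
                // here yields back to a runtime that would reset the budget,
                // so once it reaches zero this loop spins forever.
                Poll::Pending => continue,
            }
        }
    }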
Co-Authored-By: Claude Opus 4.6 --- native/core/src/execution/jni_api.rs | 29 +++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index a9cd37cbb0..bb3f7a7b82 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -524,14 +524,29 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( info!("Comet native plan before DataFusion optimization:\n{before}"); } + // Skip optimizer rules that are incompatible with Comet's + // execution model where Spark handles distribution/sorting + // via shuffles and Comet always executes partition 0. + let skip_rules = [ + // Inserts RepartitionExec which conflicts with Comet + // executing a single partition per Spark task + "EnforceDistribution", + // Adds sorting that Spark already handles externally + "EnforceSorting", + // Bookkeeping for EnforceSorting + "OutputRequirements", + // May change join implementations to types Comet + // doesn't support natively + "JoinSelection", + // Validates distribution/ordering properties that don't + // apply in Comet's Spark-managed execution model + "SanityCheckPlan", + // CooperativeExec uses Tokio's per-task coop budget + // which is never reset in Comet's manual poll!() loop + "EnsureCooperative", + ]; for optimizer in optimizers { - // EnsureCooperative inserts CooperativeExec nodes that use - // Tokio's per-task coop budget. This is incompatible with - // Comet's manual poll!() loop: once the budget is exhausted - // (~128 batches), CooperativeExec returns Poll::Pending but - // the budget is never reset (block_on never re-polls the - // top-level future), causing an infinite busy loop. - if optimizer.name() == "EnsureCooperative" { + if skip_rules.contains(&optimizer.name()) { continue; } optimized_plan = optimizer.optimize(optimized_plan, &config)?; From 0b8771b6916d7a201acce080e0a519a2ad373854 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 8 Feb 2026 09:38:50 -0700 Subject: [PATCH 10/21] Re-enable metrics when native physical optimizer is active Keep the original SparkPlan tree structure (with children) instead of flattening it, and remove the early return in update_metrics. The optimizer preserves the general plan shape, so the tree structure remains valid for metrics collection. The root node's native plan is replaced with the optimized version for execution while children retain their original references. Co-Authored-By: Claude Opus 4.6 --- .../src/main/scala/org/apache/comet/CometConf.scala | 3 +-- native/core/src/execution/jni_api.rs | 13 +++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index c9bcced9d0..7fa1482341 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -554,8 +554,7 @@ object CometConf extends ShimCometConf { "When enabled, Comet will run DataFusion's physical optimizer rules on the " + "native query plan before execution. This can improve performance through " + "additional optimizations such as projection pushdown, coalesce batches, " + - "filter pushdown, and limit pushdown. Note: per-operator Spark SQL metrics " + - "will not be available when this is enabled. This feature is highly experimental.") + "filter pushdown, and limit pushdown. 
This feature is highly experimental.") .booleanConf .createWithDefault(true) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index bb3f7a7b82..fed6710749 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -558,9 +558,13 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( info!("Comet native plan after DataFusion optimization:\n{after}"); } - // Create a flat SparkPlan with no children since the optimizer - // invalidates the SparkPlan-to-ExecutionPlan mapping used for metrics - Arc::new(SparkPlan::new(root_op.plan_id, optimized_plan, vec![])) + // Keep the original SparkPlan tree structure (for metrics) + // but replace the root's native plan with the optimized version + Arc::new(SparkPlan::new( + root_op.plan_id, + optimized_plan, + root_op.children.clone(), + )) } else { root_op }; @@ -687,9 +691,6 @@ pub extern "system" fn Java_org_apache_comet_Native_releasePlan( /// Updates the metrics of the query plan. fn update_metrics(env: &mut JNIEnv, exec_context: &mut ExecutionContext) -> CometResult<()> { - if exec_context.native_physical_optimizer_enabled { - return Ok(()); - } if let Some(native_query) = &exec_context.root_op { let metrics = exec_context.metrics.as_obj(); update_comet_metric(env, metrics, native_query) From 00e6c45857372c68f32066e3ddf9120dd958ad4d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 8 Feb 2026 09:56:55 -0700 Subject: [PATCH 11/21] Fix clippy clone_on_ref_ptr lint Co-Authored-By: Claude Opus 4.6 --- native/core/src/execution/jni_api.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index fed6710749..41a7eb0b58 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -515,7 +515,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( let root_op = if exec_context.native_physical_optimizer_enabled { let state = exec_context.session_ctx.state(); let optimizers = state.physical_optimizers(); - let config = state.config_options().clone(); + let config = Arc::clone(state.config_options()); let mut optimized_plan = Arc::clone(&root_op.native_plan); if exec_context.explain_native { From 5ad54364ba4ba46610ed117db34b81d9a9e9b8fc Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 8 Feb 2026 12:26:02 -0700 Subject: [PATCH 12/21] Skip FilterPushdown, CoalesceBatches, and ProjectionPushdown optimizer rules - Use starts_with matching to catch both FilterPushdown and FilterPushdown(Post) variants - Spark already handles projection and filter pushdown at the logical level so these are redundant and add overhead Co-Authored-By: Claude Opus 4.6 --- native/core/src/execution/jni_api.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 41a7eb0b58..6e68ddf8f5 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -544,9 +544,19 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( // CooperativeExec uses Tokio's per-task coop budget // which is never reset in Comet's manual poll!() loop "EnsureCooperative", + // Filter pushdown into parquet scans can cause + // performance regressions + "FilterPushdown", + "CoalesceBatches", + // Spark already handles projection pushdown at the + // logical level + "ProjectionPushdown", ]; for optimizer in optimizers 
{ - if skip_rules.contains(&optimizer.name()) { + if skip_rules + .iter() + .any(|r| optimizer.name().starts_with(r)) + { continue; } optimized_plan = optimizer.optimize(optimized_plan, &config)?; From a53c2c95578963c1b8f9f8c90101242c3b9238ed Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 8 Feb 2026 13:36:46 -0700 Subject: [PATCH 13/21] Configure physical optimizer rules at session creation and add logging Replace runtime name-based filtering with an explicit allow list of physical optimizer rules set via SessionStateBuilder. Only rules that are compatible with Comet's execution model are included. Add temporary info logging to detect which rules actually modify the plan. Co-Authored-By: Claude Opus 4.6 --- native/core/src/execution/jni_api.rs | 75 +++++++++++++++------------- 1 file changed, 40 insertions(+), 35 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 6e68ddf8f5..ff49b0d4e8 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -345,7 +345,39 @@ fn prepare_datafusion_session_context( let runtime = rt_config.build()?; - let mut session_ctx = SessionContext::new_with_config_rt(session_config, Arc::new(runtime)); + // Only include physical optimizer rules that are compatible with + // Comet's execution model. Spark handles distribution, sorting, + // filter/projection pushdown, and join selection externally. + use datafusion::physical_optimizer::{ + aggregate_statistics::AggregateStatistics, + combine_partial_final_agg::CombinePartialFinalAggregate, + limit_pushdown::LimitPushdown, + limit_pushdown_past_window::LimitPushPastWindows, + limited_distinct_aggregation::LimitedDistinctAggregation, + topk_aggregation::TopKAggregation, + update_aggr_exprs::OptimizeAggregateOrder, + PhysicalOptimizerRule, + }; + use datafusion::execution::SessionStateBuilder; + + let physical_optimizer_rules: Vec> = vec![ + Arc::new(AggregateStatistics::new()), + Arc::new(LimitedDistinctAggregation::new()), + Arc::new(CombinePartialFinalAggregate::new()), + Arc::new(OptimizeAggregateOrder::new()), + Arc::new(TopKAggregation::new()), + Arc::new(LimitPushPastWindows::new()), + Arc::new(LimitPushdown::new()), + ]; + + let state = SessionStateBuilder::new() + .with_config(session_config) + .with_runtime_env(Arc::new(runtime)) + .with_default_features() + .with_physical_optimizer_rules(physical_optimizer_rules) + .build(); + + let mut session_ctx = SessionContext::new_with_state(state); datafusion::functions_nested::register_all(&mut session_ctx)?; register_datafusion_spark_function(&session_ctx); @@ -524,42 +556,15 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( info!("Comet native plan before DataFusion optimization:\n{before}"); } - // Skip optimizer rules that are incompatible with Comet's - // execution model where Spark handles distribution/sorting - // via shuffles and Comet always executes partition 0. 
- let skip_rules = [ - // Inserts RepartitionExec which conflicts with Comet - // executing a single partition per Spark task - "EnforceDistribution", - // Adds sorting that Spark already handles externally - "EnforceSorting", - // Bookkeeping for EnforceSorting - "OutputRequirements", - // May change join implementations to types Comet - // doesn't support natively - "JoinSelection", - // Validates distribution/ordering properties that don't - // apply in Comet's Spark-managed execution model - "SanityCheckPlan", - // CooperativeExec uses Tokio's per-task coop budget - // which is never reset in Comet's manual poll!() loop - "EnsureCooperative", - // Filter pushdown into parquet scans can cause - // performance regressions - "FilterPushdown", - "CoalesceBatches", - // Spark already handles projection pushdown at the - // logical level - "ProjectionPushdown", - ]; for optimizer in optimizers { - if skip_rules - .iter() - .any(|r| optimizer.name().starts_with(r)) - { - continue; - } + let before_plan = Arc::clone(&optimized_plan); optimized_plan = optimizer.optimize(optimized_plan, &config)?; + if !Arc::ptr_eq(&before_plan, &optimized_plan) { + info!( + "Physical optimizer rule '{}' changed the plan", + optimizer.name() + ); + } } if exec_context.explain_native { From edbc3fdc3699510c5380a21e56cb4712d81cf144 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 8 Feb 2026 14:09:12 -0700 Subject: [PATCH 14/21] Add physical common subexpression elimination optimizer rule Introduce a PhysicalOptimizerRule that identifies repeated subexpressions within ProjectionExec nodes and rewrites the plan to compute them once via an intermediate projection, avoiding redundant evaluation. Co-Authored-By: Claude Opus 4.6 --- native/core/src/execution/jni_api.rs | 3 + native/core/src/execution/mod.rs | 1 + native/core/src/execution/physical_cse.rs | 333 ++++++++++++++++++++++ 3 files changed, 337 insertions(+) create mode 100644 native/core/src/execution/physical_cse.rs diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index ff49b0d4e8..0efb59d5a9 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -360,6 +360,8 @@ fn prepare_datafusion_session_context( }; use datafusion::execution::SessionStateBuilder; + use crate::execution::physical_cse::PhysicalCommonSubexprEliminate; + let physical_optimizer_rules: Vec> = vec![ Arc::new(AggregateStatistics::new()), Arc::new(LimitedDistinctAggregation::new()), @@ -368,6 +370,7 @@ fn prepare_datafusion_session_context( Arc::new(TopKAggregation::new()), Arc::new(LimitPushPastWindows::new()), Arc::new(LimitPushdown::new()), + Arc::new(PhysicalCommonSubexprEliminate::new()), ]; let state = SessionStateBuilder::new() diff --git a/native/core/src/execution/mod.rs b/native/core/src/execution/mod.rs index 85fc672461..1fae6a04f5 100644 --- a/native/core/src/execution/mod.rs +++ b/native/core/src/execution/mod.rs @@ -21,6 +21,7 @@ pub mod expressions; pub mod jni_api; pub(crate) mod metrics; pub mod operators; +pub(crate) mod physical_cse; pub(crate) mod planner; pub mod serde; pub mod shuffle; diff --git a/native/core/src/execution/physical_cse.rs b/native/core/src/execution/physical_cse.rs new file mode 100644 index 0000000000..e430d96a16 --- /dev/null +++ b/native/core/src/execution/physical_cse.rs @@ -0,0 +1,333 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Physical common subexpression elimination (CSE) optimizer rule. +//! +//! Identifies repeated subexpressions within a `ProjectionExec`'s expression +//! list and rewrites the plan to compute them once via an intermediate +//! projection. + +use std::collections::HashMap; +use std::hash::{Hash, Hasher}; +use std::sync::Arc; + +use datafusion::common::config::ConfigOptions; +use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion}; +use datafusion::common::Result; +use datafusion::physical_expr::expressions::{Column, Literal}; +use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion::physical_plan::projection::{ProjectionExec, ProjectionExpr}; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_expr_common::physical_expr::is_volatile; + +/// A wrapper around `Arc` that implements `Eq` and `Hash` +/// by delegating to the trait-object implementations on `dyn PhysicalExpr`. +struct ExprKey(Arc); + +impl PartialEq for ExprKey { + fn eq(&self, other: &Self) -> bool { + self.0.as_ref() == other.0.as_ref() + } +} + +impl Eq for ExprKey {} + +impl Hash for ExprKey { + fn hash(&self, state: &mut H) { + self.0.as_ref().hash(state); + } +} + +/// Physical optimizer rule that eliminates common subexpressions within +/// `ProjectionExec` nodes. +#[derive(Debug)] +pub struct PhysicalCommonSubexprEliminate; + +impl PhysicalCommonSubexprEliminate { + pub fn new() -> Self { + Self + } +} + +impl PhysicalOptimizerRule for PhysicalCommonSubexprEliminate { + fn optimize( + &self, + plan: Arc, + _config: &ConfigOptions, + ) -> Result> { + plan.transform_up(|node| { + if let Some(projection) = node.as_any().downcast_ref::() { + try_optimize_projection(projection) + } else { + Ok(Transformed::no(node)) + } + }) + .data() + } + + fn name(&self) -> &str { + "physical_common_subexpr_eliminate" + } + + fn schema_check(&self) -> bool { + true + } +} + +/// Returns `true` if the expression is trivial (a `Column` or `Literal`) +/// and therefore not worth extracting as a common subexpression. +fn is_trivial(expr: &Arc) -> bool { + expr.as_any().downcast_ref::().is_some() + || expr.as_any().downcast_ref::().is_some() +} + +/// Recursively collect all sub-expressions from `expr` and increment their +/// occurrence count in `counts`. +fn collect_subexprs( + expr: &Arc, + counts: &mut HashMap, +) { + if is_trivial(expr) || is_volatile(expr) { + return; + } + let key = ExprKey(Arc::clone(expr)); + *counts.entry(key).or_insert(0) += 1; + for child in expr.children() { + collect_subexprs(child, counts); + } +} + +/// Identify sub-expressions that appear 2+ times across the projection's +/// expression list. Returns a deduplicated vec. 
+fn find_common_subexprs( + exprs: &[ProjectionExpr], +) -> Vec> { + let mut counts: HashMap = HashMap::new(); + for proj_expr in exprs { + collect_subexprs(&proj_expr.expr, &mut counts); + } + // Collect expressions with count >= 2, preserving insertion order is not + // required since we assign deterministic names based on index. + counts + .into_iter() + .filter(|(_, count)| *count >= 2) + .map(|(key, _)| key.0) + .collect() +} + +/// Replace occurrences of any common subexpression in `expr` with a `Column` +/// reference into the intermediate projection's schema. +fn rewrite_expr( + expr: Arc, + cse_map: &HashMap, +) -> Result> { + expr.transform_down(|node| { + if is_trivial(&node) { + return Ok(Transformed::no(node)); + } + let lookup = ExprKey(Arc::clone(&node)); + if let Some((name, index)) = cse_map.get(&lookup) { + // Replace with a column reference and skip recursing into children + let col = Arc::new(Column::new(name, *index)) + as Arc; + Ok(Transformed::new(col, true, TreeNodeRecursion::Jump)) + } else { + Ok(Transformed::no(node)) + } + }) + .data() +} + +/// Attempt to optimize a single `ProjectionExec` by extracting common +/// subexpressions into an intermediate projection. +fn try_optimize_projection( + projection: &ProjectionExec, +) -> Result>> { + let proj_exprs = projection.expr(); + let common = find_common_subexprs(proj_exprs); + if common.is_empty() { + return Ok(Transformed::no( + Arc::new(projection.clone()) as Arc + )); + } + + let input = projection.input(); + let input_schema = input.schema(); + let num_input_cols = input_schema.fields().len(); + + // Build the intermediate projection: pass through all input columns, then + // append one column per common subexpression. + let mut intermediate_exprs: Vec = Vec::new(); + for (i, field) in input_schema.fields().iter().enumerate() { + intermediate_exprs.push(ProjectionExpr { + expr: Arc::new(Column::new(field.name(), i)), + alias: field.name().clone(), + }); + } + + // Map from common subexpression -> (cse_name, column_index_in_intermediate) + let mut cse_map: HashMap = HashMap::new(); + for (idx, cse_expr) in common.iter().enumerate() { + let cse_name = format!("__cse_{idx}"); + let col_index = num_input_cols + idx; + intermediate_exprs.push(ProjectionExpr { + expr: Arc::clone(cse_expr), + alias: cse_name.clone(), + }); + cse_map.insert(ExprKey(Arc::clone(cse_expr)), (cse_name, col_index)); + } + + let intermediate = + Arc::new(ProjectionExec::try_new(intermediate_exprs, Arc::clone(input))?) + as Arc; + + // Rewrite the top projection expressions to reference the intermediate. + let mut new_proj_exprs: Vec = Vec::new(); + for proj_expr in proj_exprs { + let rewritten = rewrite_expr(Arc::clone(&proj_expr.expr), &cse_map)?; + new_proj_exprs.push(ProjectionExpr { + expr: rewritten, + alias: proj_expr.alias.clone(), + }); + } + + let new_projection = + Arc::new(ProjectionExec::try_new(new_proj_exprs, intermediate)?) 
as Arc; + + Ok(Transformed::yes(new_projection)) +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion::physical_expr::expressions::{binary, col}; + use datafusion::physical_plan::empty::EmptyExec; + use datafusion::logical_expr::Operator; + + fn test_schema() -> Arc { + Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])) + } + + #[test] + fn test_cse_extracts_common_subexpr() -> Result<()> { + let schema = test_schema(); + let input: Arc = Arc::new(EmptyExec::new(schema.clone())); + + let a = col("a", &schema)?; + let b = col("b", &schema)?; + + // (a + b) * 2, (a + b) * 3 — both share (a + b) + let a_plus_b_1 = binary(Arc::clone(&a), Operator::Plus, Arc::clone(&b), &schema)?; + let a_plus_b_2 = binary(Arc::clone(&a), Operator::Plus, Arc::clone(&b), &schema)?; + + let two = Arc::new(Literal::new(datafusion::common::ScalarValue::Int32(Some(2)))); + let three = Arc::new(Literal::new(datafusion::common::ScalarValue::Int32(Some(3)))); + + let expr_x = binary(a_plus_b_1, Operator::Multiply, two, &schema)?; + let expr_y = binary(a_plus_b_2, Operator::Multiply, three, &schema)?; + + let projection = ProjectionExec::try_new( + vec![ + ProjectionExpr { + expr: expr_x, + alias: "x".to_string(), + }, + ProjectionExpr { + expr: expr_y, + alias: "y".to_string(), + }, + ], + input, + )?; + + let plan: Arc = Arc::new(projection); + let config = ConfigOptions::new(); + let rule = PhysicalCommonSubexprEliminate::new(); + let optimized = rule.optimize(plan, &config)?; + + // The optimized plan should be a ProjectionExec wrapping another ProjectionExec + let top = optimized + .as_any() + .downcast_ref::() + .expect("top should be ProjectionExec"); + let intermediate = top + .input() + .as_any() + .downcast_ref::() + .expect("intermediate should be ProjectionExec"); + + // Intermediate should have input columns + 1 CSE column + assert_eq!(intermediate.expr().len(), 3); // a, b, __cse_0 + + // The CSE column should be named __cse_0 + let cse_alias = &intermediate.expr()[2].alias; + assert_eq!(cse_alias, "__cse_0"); + + // Top projection should still produce 2 output columns + assert_eq!(top.expr().len(), 2); + assert_eq!(top.expr()[0].alias, "x"); + assert_eq!(top.expr()[1].alias, "y"); + + Ok(()) + } + + #[test] + fn test_no_cse_when_no_common_subexpr() -> Result<()> { + let schema = test_schema(); + let input: Arc = Arc::new(EmptyExec::new(schema.clone())); + + let a = col("a", &schema)?; + let b = col("b", &schema)?; + + let projection = ProjectionExec::try_new( + vec![ + ProjectionExpr { + expr: a, + alias: "a".to_string(), + }, + ProjectionExpr { + expr: b, + alias: "b".to_string(), + }, + ], + input, + )?; + + let plan: Arc = Arc::new(projection); + let config = ConfigOptions::new(); + let rule = PhysicalCommonSubexprEliminate::new(); + let optimized = rule.optimize(Arc::clone(&plan), &config)?; + + // No intermediate projection should be inserted + let top = optimized + .as_any() + .downcast_ref::() + .expect("should be ProjectionExec"); + // Input should be EmptyExec, not another ProjectionExec + assert!(top + .input() + .as_any() + .downcast_ref::() + .is_none()); + + Ok(()) + } +} From 2e9f34e8e615eef245117098bb3a6e557fee308d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 8 Feb 2026 15:02:12 -0700 Subject: [PATCH 15/21] Log before/after plans on optimizer change and isolate CSE rule for testing Comment out all other physical optimizer rules temporarily to test the 
new CSE rule in isolation. Add before/after plan logging when a rule changes the plan. Co-Authored-By: Claude Opus 4.6 --- native/core/src/execution/jni_api.rs | 36 ++++++++++++++++------------ 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 0efb59d5a9..2da8209253 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -349,13 +349,13 @@ fn prepare_datafusion_session_context( // Comet's execution model. Spark handles distribution, sorting, // filter/projection pushdown, and join selection externally. use datafusion::physical_optimizer::{ - aggregate_statistics::AggregateStatistics, - combine_partial_final_agg::CombinePartialFinalAggregate, - limit_pushdown::LimitPushdown, - limit_pushdown_past_window::LimitPushPastWindows, - limited_distinct_aggregation::LimitedDistinctAggregation, - topk_aggregation::TopKAggregation, - update_aggr_exprs::OptimizeAggregateOrder, + // aggregate_statistics::AggregateStatistics, + // combine_partial_final_agg::CombinePartialFinalAggregate, + // limit_pushdown::LimitPushdown, + // limit_pushdown_past_window::LimitPushPastWindows, + // limited_distinct_aggregation::LimitedDistinctAggregation, + // topk_aggregation::TopKAggregation, + // update_aggr_exprs::OptimizeAggregateOrder, PhysicalOptimizerRule, }; use datafusion::execution::SessionStateBuilder; @@ -363,13 +363,13 @@ fn prepare_datafusion_session_context( use crate::execution::physical_cse::PhysicalCommonSubexprEliminate; let physical_optimizer_rules: Vec> = vec![ - Arc::new(AggregateStatistics::new()), - Arc::new(LimitedDistinctAggregation::new()), - Arc::new(CombinePartialFinalAggregate::new()), - Arc::new(OptimizeAggregateOrder::new()), - Arc::new(TopKAggregation::new()), - Arc::new(LimitPushPastWindows::new()), - Arc::new(LimitPushdown::new()), + // Arc::new(AggregateStatistics::new()), + // Arc::new(LimitedDistinctAggregation::new()), + // Arc::new(CombinePartialFinalAggregate::new()), + // Arc::new(OptimizeAggregateOrder::new()), + // Arc::new(TopKAggregation::new()), + // Arc::new(LimitPushPastWindows::new()), + // Arc::new(LimitPushdown::new()), Arc::new(PhysicalCommonSubexprEliminate::new()), ]; @@ -563,8 +563,14 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( let before_plan = Arc::clone(&optimized_plan); optimized_plan = optimizer.optimize(optimized_plan, &config)?; if !Arc::ptr_eq(&before_plan, &optimized_plan) { + let before_display = + DisplayableExecutionPlan::new(before_plan.as_ref()).indent(true); + let after_display = + DisplayableExecutionPlan::new(optimized_plan.as_ref()).indent(true); info!( - "Physical optimizer rule '{}' changed the plan", + "Physical optimizer rule '{}' changed the plan\n\ + BEFORE:\n{before_display}\n\ + AFTER:\n{after_display}", optimizer.name() ); } From bb5a3049136badbdd2606abcd355ddeb3372c88b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 8 Feb 2026 15:10:05 -0700 Subject: [PATCH 16/21] Fix false positive plan-change detection in physical CSE optimizer Return the original Arc when no common subexpressions are found so Arc::ptr_eq correctly identifies unchanged plans. Also includes formatting changes from cargo fmt. 
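A minimal sketch of the false positive (assuming DataFusion's EmptyExec,
which the new unit tests also use; not part of this diff):

    use std::sync::Arc;

    use arrow::datatypes::Schema;
    use datafusion::physical_plan::{empty::EmptyExec, ExecutionPlan};

    fn main() {
        let schema = Arc::new(Schema::empty());
        // Rebuilding an identical node allocates a new Arc, so pointer
        // equality reports a "change" even though nothing was rewritten.
        let a: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(Arc::clone(&schema)));
        let b: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(schema));
        assert!(!Arc::ptr_eq(&a, &b));
        // Returning the original Arc preserves pointer identity.
        assert!(Arc::ptr_eq(&a, &Arc::clone(&a)));
    }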
Co-Authored-By: Claude Opus 4.6 --- native/core/src/execution/jni_api.rs | 2 +- native/core/src/execution/physical_cse.rs | 30 +++++++++++++---------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 2da8209253..30cef604be 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -348,6 +348,7 @@ fn prepare_datafusion_session_context( // Only include physical optimizer rules that are compatible with // Comet's execution model. Spark handles distribution, sorting, // filter/projection pushdown, and join selection externally. + use datafusion::execution::SessionStateBuilder; use datafusion::physical_optimizer::{ // aggregate_statistics::AggregateStatistics, // combine_partial_final_agg::CombinePartialFinalAggregate, @@ -358,7 +359,6 @@ fn prepare_datafusion_session_context( // update_aggr_exprs::OptimizeAggregateOrder, PhysicalOptimizerRule, }; - use datafusion::execution::SessionStateBuilder; use crate::execution::physical_cse::PhysicalCommonSubexprEliminate; diff --git a/native/core/src/execution/physical_cse.rs b/native/core/src/execution/physical_cse.rs index e430d96a16..fd5df60732 100644 --- a/native/core/src/execution/physical_cse.rs +++ b/native/core/src/execution/physical_cse.rs @@ -29,10 +29,10 @@ use datafusion::common::config::ConfigOptions; use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion}; use datafusion::common::Result; use datafusion::physical_expr::expressions::{Column, Literal}; +use datafusion::physical_expr_common::physical_expr::is_volatile; use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion::physical_plan::ExecutionPlan; -use datafusion::physical_expr_common::physical_expr::is_volatile; /// A wrapper around `Arc` that implements `Eq` and `Hash` /// by delegating to the trait-object implementations on `dyn PhysicalExpr`. @@ -70,8 +70,8 @@ impl PhysicalOptimizerRule for PhysicalCommonSubexprEliminate { _config: &ConfigOptions, ) -> Result> { plan.transform_up(|node| { - if let Some(projection) = node.as_any().downcast_ref::() { - try_optimize_projection(projection) + if node.as_any().downcast_ref::().is_some() { + try_optimize_projection(node) } else { Ok(Transformed::no(node)) } @@ -155,14 +155,13 @@ fn rewrite_expr( /// Attempt to optimize a single `ProjectionExec` by extracting common /// subexpressions into an intermediate projection. fn try_optimize_projection( - projection: &ProjectionExec, + node: Arc, ) -> Result>> { + let projection = node.as_any().downcast_ref::().unwrap(); let proj_exprs = projection.expr(); let common = find_common_subexprs(proj_exprs); if common.is_empty() { - return Ok(Transformed::no( - Arc::new(projection.clone()) as Arc - )); + return Ok(Transformed::no(node)); } let input = projection.input(); @@ -191,9 +190,10 @@ fn try_optimize_projection( cse_map.insert(ExprKey(Arc::clone(cse_expr)), (cse_name, col_index)); } - let intermediate = - Arc::new(ProjectionExec::try_new(intermediate_exprs, Arc::clone(input))?) - as Arc; + let intermediate = Arc::new(ProjectionExec::try_new( + intermediate_exprs, + Arc::clone(input), + )?) as Arc; // Rewrite the top projection expressions to reference the intermediate. 
let mut new_proj_exprs: Vec = Vec::new(); @@ -215,9 +215,9 @@ fn try_optimize_projection( mod tests { use super::*; use arrow::datatypes::{DataType, Field, Schema}; + use datafusion::logical_expr::Operator; use datafusion::physical_expr::expressions::{binary, col}; use datafusion::physical_plan::empty::EmptyExec; - use datafusion::logical_expr::Operator; fn test_schema() -> Arc { Arc::new(Schema::new(vec![ @@ -238,8 +238,12 @@ mod tests { let a_plus_b_1 = binary(Arc::clone(&a), Operator::Plus, Arc::clone(&b), &schema)?; let a_plus_b_2 = binary(Arc::clone(&a), Operator::Plus, Arc::clone(&b), &schema)?; - let two = Arc::new(Literal::new(datafusion::common::ScalarValue::Int32(Some(2)))); - let three = Arc::new(Literal::new(datafusion::common::ScalarValue::Int32(Some(3)))); + let two = Arc::new(Literal::new(datafusion::common::ScalarValue::Int32(Some( + 2, + )))); + let three = Arc::new(Literal::new(datafusion::common::ScalarValue::Int32(Some( + 3, + )))); let expr_x = binary(a_plus_b_1, Operator::Multiply, two, &schema)?; let expr_y = binary(a_plus_b_2, Operator::Multiply, three, &schema)?; From c413e5b56a67a3a1adb5f0f911b05c2a17f38d23 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 9 Feb 2026 06:37:48 -0700 Subject: [PATCH 17/21] Log before/after plans for physical optimization on partition 0 only Co-Authored-By: Claude Opus 4.6 --- native/core/src/execution/jni_api.rs | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 30cef604be..0da65df0b2 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -559,21 +559,20 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( info!("Comet native plan before DataFusion optimization:\n{before}"); } + if partition == 0 { + let before_display = + DisplayableExecutionPlan::new(optimized_plan.as_ref()).indent(true); + info!("Native plan BEFORE physical optimization:\n{before_display}"); + } + for optimizer in optimizers { - let before_plan = Arc::clone(&optimized_plan); optimized_plan = optimizer.optimize(optimized_plan, &config)?; - if !Arc::ptr_eq(&before_plan, &optimized_plan) { - let before_display = - DisplayableExecutionPlan::new(before_plan.as_ref()).indent(true); - let after_display = - DisplayableExecutionPlan::new(optimized_plan.as_ref()).indent(true); - info!( - "Physical optimizer rule '{}' changed the plan\n\ - BEFORE:\n{before_display}\n\ - AFTER:\n{after_display}", - optimizer.name() - ); - } + } + + if partition == 0 { + let after_display = + DisplayableExecutionPlan::new(optimized_plan.as_ref()).indent(true); + info!("Native plan AFTER physical optimization:\n{after_display}"); } if exec_context.explain_native { From d7e41d9ab4d1954bcf3f80d3afae52975f9f597b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 9 Feb 2026 07:00:26 -0700 Subject: [PATCH 18/21] Add debug logging to CSE rule and remove temp logging from jni_api Log plan tree node types, projection expressions, and identified common subexpressions to help diagnose why TPC-H Q1 is not being optimized. 
Co-Authored-By: Claude Opus 4.6 --- native/core/src/execution/jni_api.rs | 12 -------- native/core/src/execution/physical_cse.rs | 35 ++++++++++++++++++++++- 2 files changed, 34 insertions(+), 13 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 0da65df0b2..b623af7f8e 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -559,22 +559,10 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( info!("Comet native plan before DataFusion optimization:\n{before}"); } - if partition == 0 { - let before_display = - DisplayableExecutionPlan::new(optimized_plan.as_ref()).indent(true); - info!("Native plan BEFORE physical optimization:\n{before_display}"); - } - for optimizer in optimizers { optimized_plan = optimizer.optimize(optimized_plan, &config)?; } - if partition == 0 { - let after_display = - DisplayableExecutionPlan::new(optimized_plan.as_ref()).indent(true); - info!("Native plan AFTER physical optimization:\n{after_display}"); - } - if exec_context.explain_native { let after = DisplayableExecutionPlan::new(optimized_plan.as_ref()).indent(true); diff --git a/native/core/src/execution/physical_cse.rs b/native/core/src/execution/physical_cse.rs index fd5df60732..87853e1ccc 100644 --- a/native/core/src/execution/physical_cse.rs +++ b/native/core/src/execution/physical_cse.rs @@ -33,6 +33,7 @@ use datafusion::physical_expr_common::physical_expr::is_volatile; use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion::physical_plan::ExecutionPlan; +use log::info; /// A wrapper around `Arc` that implements `Eq` and `Hash` /// by delegating to the trait-object implementations on `dyn PhysicalExpr`. @@ -69,8 +70,17 @@ impl PhysicalOptimizerRule for PhysicalCommonSubexprEliminate { plan: Arc, _config: &ConfigOptions, ) -> Result> { + // Log the full plan tree with node types + log_plan_nodes(&plan, 0); + plan.transform_up(|node| { - if node.as_any().downcast_ref::().is_some() { + let node_name = node.name(); + let is_proj = node.as_any().downcast_ref::().is_some(); + info!( + "CSE visiting node: {node_name} (is_projection={is_proj})" + ); + + if is_proj { try_optimize_projection(node) } else { Ok(Transformed::no(node)) @@ -88,6 +98,15 @@ impl PhysicalOptimizerRule for PhysicalCommonSubexprEliminate { } } +/// Log the plan tree with indentation showing node types. +fn log_plan_nodes(plan: &Arc, depth: usize) { + let indent = " ".repeat(depth); + info!("{indent}{}", plan.name()); + for child in plan.children() { + log_plan_nodes(child, depth + 1); + } +} + /// Returns `true` if the expression is trivial (a `Column` or `Literal`) /// and therefore not worth extracting as a common subexpression. 
fn is_trivial(expr: &Arc) -> bool { @@ -159,7 +178,21 @@ fn try_optimize_projection( ) -> Result>> { let projection = node.as_any().downcast_ref::().unwrap(); let proj_exprs = projection.expr(); + + info!( + "CSE analyzing ProjectionExec with {} expressions:", + proj_exprs.len() + ); + for (i, pe) in proj_exprs.iter().enumerate() { + info!(" expr[{i}]: {} AS {}", pe.expr, pe.alias); + } + let common = find_common_subexprs(proj_exprs); + info!("CSE found {} common subexpressions", common.len()); + for (i, cse) in common.iter().enumerate() { + info!(" common[{i}]: {cse}"); + } + if common.is_empty() { return Ok(Transformed::no(node)); } From fc2bab0b3c93bc1fbcd93ab83e2fe2bb9102a0c5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 9 Feb 2026 07:26:02 -0700 Subject: [PATCH 19/21] Extend physical CSE optimizer to handle AggregateExec Scan aggregate function arguments for common subexpressions in first-stage (Partial/Single) aggregates, insert an intermediate ProjectionExec to pre-compute them, and rewrite the aggregate expressions to reference the new columns. Also removes debug logging added during development. Co-Authored-By: Claude Opus 4.6 --- native/core/src/execution/physical_cse.rs | 319 +++++++++++++++++++--- 1 file changed, 276 insertions(+), 43 deletions(-) diff --git a/native/core/src/execution/physical_cse.rs b/native/core/src/execution/physical_cse.rs index 87853e1ccc..dfdb184131 100644 --- a/native/core/src/execution/physical_cse.rs +++ b/native/core/src/execution/physical_cse.rs @@ -17,9 +17,9 @@ //! Physical common subexpression elimination (CSE) optimizer rule. //! -//! Identifies repeated subexpressions within a `ProjectionExec`'s expression -//! list and rewrites the plan to compute them once via an intermediate -//! projection. +//! Identifies repeated subexpressions within `ProjectionExec` and +//! `AggregateExec` nodes and rewrites the plan to compute them once via an +//! intermediate projection. use std::collections::HashMap; use std::hash::{Hash, Hasher}; @@ -28,12 +28,13 @@ use std::sync::Arc; use datafusion::common::config::ConfigOptions; use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion}; use datafusion::common::Result; +use datafusion::physical_expr::aggregate::AggregateFunctionExpr; use datafusion::physical_expr::expressions::{Column, Literal}; use datafusion::physical_expr_common::physical_expr::is_volatile; use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy}; use datafusion::physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion::physical_plan::ExecutionPlan; -use log::info; /// A wrapper around `Arc` that implements `Eq` and `Hash` /// by delegating to the trait-object implementations on `dyn PhysicalExpr`. @@ -54,7 +55,7 @@ impl Hash for ExprKey { } /// Physical optimizer rule that eliminates common subexpressions within -/// `ProjectionExec` nodes. +/// `ProjectionExec` and `AggregateExec` nodes. 
diff --git a/native/core/src/execution/physical_cse.rs b/native/core/src/execution/physical_cse.rs
index 87853e1ccc..dfdb184131 100644
--- a/native/core/src/execution/physical_cse.rs
+++ b/native/core/src/execution/physical_cse.rs
@@ -17,9 +17,9 @@
 //! Physical common subexpression elimination (CSE) optimizer rule.
 //!
-//! Identifies repeated subexpressions within a `ProjectionExec`'s expression
-//! list and rewrites the plan to compute them once via an intermediate
-//! projection.
+//! Identifies repeated subexpressions within `ProjectionExec` and
+//! `AggregateExec` nodes and rewrites the plan to compute them once via an
+//! intermediate projection.

 use std::collections::HashMap;
 use std::hash::{Hash, Hasher};
@@ -28,12 +28,13 @@ use std::sync::Arc;
 use datafusion::common::config::ConfigOptions;
 use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion};
 use datafusion::common::Result;
+use datafusion::physical_expr::aggregate::AggregateFunctionExpr;
 use datafusion::physical_expr::expressions::{Column, Literal};
 use datafusion::physical_expr_common::physical_expr::is_volatile;
 use datafusion::physical_optimizer::PhysicalOptimizerRule;
+use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
 use datafusion::physical_plan::projection::{ProjectionExec, ProjectionExpr};
 use datafusion::physical_plan::ExecutionPlan;
-use log::info;

 /// A wrapper around `Arc<dyn PhysicalExpr>` that implements `Eq` and `Hash`
 /// by delegating to the trait-object implementations on `dyn PhysicalExpr`.
@@ -54,7 +55,7 @@ impl Hash for ExprKey {
 }

 /// Physical optimizer rule that eliminates common subexpressions within
-/// `ProjectionExec` nodes.
+/// `ProjectionExec` and `AggregateExec` nodes.
 #[derive(Debug)]
 pub struct PhysicalCommonSubexprEliminate;

@@ -70,18 +71,11 @@ impl PhysicalOptimizerRule for PhysicalCommonSubexprEliminate {
         plan: Arc<dyn ExecutionPlan>,
         _config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        // Log the full plan tree with node types
-        log_plan_nodes(&plan, 0);
-
         plan.transform_up(|node| {
-            let node_name = node.name();
-            let is_proj = node.as_any().downcast_ref::<ProjectionExec>().is_some();
-            info!(
-                "CSE visiting node: {node_name} (is_projection={is_proj})"
-            );
-
-            if is_proj {
+            if node.as_any().downcast_ref::<ProjectionExec>().is_some() {
                 try_optimize_projection(node)
+            } else if node.as_any().downcast_ref::<AggregateExec>().is_some() {
+                try_optimize_aggregate(node)
             } else {
                 Ok(Transformed::no(node))
             }
@@ -98,15 +92,6 @@
     }
 }

-/// Log the plan tree with indentation showing node types.
-fn log_plan_nodes(plan: &Arc<dyn ExecutionPlan>, depth: usize) {
-    let indent = "  ".repeat(depth);
-    info!("{indent}{}", plan.name());
-    for child in plan.children() {
-        log_plan_nodes(child, depth + 1);
-    }
-}
-
 /// Returns `true` if the expression is trivial (a `Column` or `Literal`)
 /// and therefore not worth extracting as a common subexpression.
 fn is_trivial(expr: &Arc<dyn PhysicalExpr>) -> bool {
@@ -130,14 +115,14 @@
     }
 }

-/// Identify sub-expressions that appear 2+ times across the projection's
+/// Identify sub-expressions that appear 2+ times across the given
 /// expression list. Returns a deduplicated vec.
 fn find_common_subexprs(
-    exprs: &[ProjectionExpr],
+    exprs: &[Arc<dyn PhysicalExpr>],
 ) -> Vec<Arc<dyn PhysicalExpr>> {
     let mut counts: HashMap<ExprKey, usize> = HashMap::new();
-    for proj_expr in exprs {
-        collect_subexprs(&proj_expr.expr, &mut counts);
+    for expr in exprs {
+        collect_subexprs(expr, &mut counts);
     }
     // Collect expressions with count >= 2, preserving insertion order is not
     // required since we assign deterministic names based on index.
     counts
@@ -179,19 +164,9 @@
     let projection = node.as_any().downcast_ref::<ProjectionExec>().unwrap();
     let proj_exprs = projection.expr();

-    info!(
-        "CSE analyzing ProjectionExec with {} expressions:",
-        proj_exprs.len()
-    );
-    for (i, pe) in proj_exprs.iter().enumerate() {
-        info!("  expr[{i}]: {} AS {}", pe.expr, pe.alias);
-    }
-
-    let common = find_common_subexprs(proj_exprs);
-    info!("CSE found {} common subexpressions", common.len());
-    for (i, cse) in common.iter().enumerate() {
-        info!("  common[{i}]: {cse}");
-    }
+    let raw_exprs: Vec<Arc<dyn PhysicalExpr>> =
+        proj_exprs.iter().map(|pe| Arc::clone(&pe.expr)).collect();
+    let common = find_common_subexprs(&raw_exprs);

     if common.is_empty() {
         return Ok(Transformed::no(node));
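To make the counting step concrete: every sub-expression of every input expression is tallied, trivial nodes (columns, literals) are skipped, and anything reaching a count of two or more is reported. A hypothetical fragment in the style of the unit tests further down (the schema and names are invented for illustration):

    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, false),
        Field::new("c", DataType::Int64, false),
        Field::new("d", DataType::Int64, false),
    ]));
    let (a, b) = (col("a", &schema)?, col("b", &schema)?);
    let (c, d) = (col("c", &schema)?, col("d", &schema)?);
    // x = (a + b) * c and y = (a + b) - d share one non-trivial subtree: a + b.
    let x = binary(
        binary(Arc::clone(&a), Operator::Plus, Arc::clone(&b), &schema)?,
        Operator::Multiply,
        c,
        &schema,
    )?;
    let y = binary(
        binary(a, Operator::Plus, b, &schema)?,
        Operator::Minus,
        d,
        &schema,
    )?;
    let common = find_common_subexprs(&[x, y]);
    assert_eq!(common.len(), 1); // exactly a + b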
@@ -244,12 +219,140 @@
     Ok(Transformed::yes(new_projection))
 }

+/// Attempt to optimize a single `AggregateExec` by extracting common
+/// subexpressions from aggregate function arguments into an intermediate
+/// projection.
+fn try_optimize_aggregate(
+    node: Arc<dyn ExecutionPlan>,
+) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+    let agg_exec = node.as_any().downcast_ref::<AggregateExec>().unwrap();
+
+    // Only optimize first-stage aggregates where original column expressions
+    // are present. Final/FinalPartitioned reference partial outputs.
+    if !agg_exec.mode().is_first_stage() {
+        return Ok(Transformed::no(node));
+    }
+
+    // Collect all sub-expressions from aggregate function arguments.
+    let aggr_exprs = agg_exec.aggr_expr();
+    let all_args: Vec<Arc<dyn PhysicalExpr>> = aggr_exprs
+        .iter()
+        .flat_map(|agg_fn| agg_fn.expressions())
+        .collect();
+
+    let common = find_common_subexprs(&all_args);
+    if common.is_empty() {
+        return Ok(Transformed::no(node));
+    }
+
+    let input = agg_exec.input();
+    let input_schema = input.schema();
+    let num_input_cols = input_schema.fields().len();
+
+    // Build an intermediate projection: pass through all input columns, then
+    // append one column per common subexpression.
+    let mut intermediate_exprs: Vec<ProjectionExpr> = Vec::new();
+    for (i, field) in input_schema.fields().iter().enumerate() {
+        intermediate_exprs.push(ProjectionExpr {
+            expr: Arc::new(Column::new(field.name(), i)),
+            alias: field.name().clone(),
+        });
+    }
+
+    // Map from common subexpression -> (cse_name, column_index_in_intermediate)
+    let mut cse_map: HashMap<ExprKey, (String, usize)> = HashMap::new();
+    for (idx, cse_expr) in common.iter().enumerate() {
+        let cse_name = format!("__cse_{idx}");
+        let col_index = num_input_cols + idx;
+        intermediate_exprs.push(ProjectionExpr {
+            expr: Arc::clone(cse_expr),
+            alias: cse_name.clone(),
+        });
+        cse_map.insert(ExprKey(Arc::clone(cse_expr)), (cse_name, col_index));
+    }
+
+    let intermediate = Arc::new(ProjectionExec::try_new(
+        intermediate_exprs,
+        Arc::clone(input),
+    )?) as Arc<dyn ExecutionPlan>;
+    let intermediate_schema = intermediate.schema();
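+
+    // Illustration (hypothetical, mirroring the three-column schema used by
+    // the tests below): the pass-through columns keep their input indices,
+    // so a, b, c sit at indices 0..2 and each extracted expression lands at
+    // num_input_cols + idx, i.e. __cse_0 at index 3.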
+
+    // Rewrite each aggregate function's arguments to reference CSE columns.
+    let mut new_aggr_exprs: Vec<Arc<AggregateFunctionExpr>> = Vec::new();
+    for agg_fn in aggr_exprs {
+        let old_args = agg_fn.expressions();
+        let mut new_args = Vec::with_capacity(old_args.len());
+        for arg in &old_args {
+            new_args.push(rewrite_expr(Arc::clone(arg), &cse_map)?);
+        }
+        let order_by_exprs: Vec<Arc<dyn PhysicalExpr>> = agg_fn
+            .order_bys()
+            .iter()
+            .map(|sort_expr| Arc::clone(&sort_expr.expr))
+            .collect();
+        let new_agg_fn = agg_fn
+            .with_new_expressions(new_args, order_by_exprs)
+            .ok_or_else(|| {
+                datafusion::common::DataFusionError::Internal(format!(
+                    "Failed to rewrite aggregate expression: {}",
+                    agg_fn.name()
+                ))
+            })?;
+        new_aggr_exprs.push(Arc::new(new_agg_fn));
+    }
+
+    // Rewrite filter expressions if they reference common subexpressions.
+    let new_filters: Vec<Option<Arc<dyn PhysicalExpr>>> = agg_exec
+        .filter_expr()
+        .iter()
+        .map(|filter_opt| {
+            filter_opt
+                .as_ref()
+                .map(|f| rewrite_expr(Arc::clone(f), &cse_map))
+                .transpose()
+        })
+        .collect::<Result<_>>()?;
+
+    // Rewrite group-by expressions to reference CSE columns.
+    let old_group_by = agg_exec.group_expr();
+    let new_group_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
+        old_group_by
+            .expr()
+            .iter()
+            .map(|(expr, alias)| {
+                Ok((rewrite_expr(Arc::clone(expr), &cse_map)?, alias.clone()))
+            })
+            .collect::<Result<_>>()?;
+    let new_null_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
+        old_group_by
+            .null_expr()
+            .iter()
+            .map(|(expr, alias)| (Arc::clone(expr), alias.clone()))
+            .collect();
+    let new_group_by =
+        PhysicalGroupBy::new(new_group_exprs, new_null_exprs, old_group_by.groups().to_vec());
+
+    let new_agg = AggregateExec::try_new(
+        *agg_exec.mode(),
+        new_group_by,
+        new_aggr_exprs,
+        new_filters,
+        intermediate,
+        intermediate_schema,
+    )?;
+
+    Ok(Transformed::yes(Arc::new(new_agg) as Arc<dyn ExecutionPlan>))
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;

     use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion::functions_aggregate::sum::sum_udaf;
     use datafusion::logical_expr::Operator;
+    use datafusion::physical_expr::aggregate::AggregateExprBuilder;
     use datafusion::physical_expr::expressions::{binary, col};
+    use datafusion::physical_plan::aggregates::AggregateMode;
     use datafusion::physical_plan::empty::EmptyExec;

     fn test_schema() -> Arc<Schema> {
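The `is_first_stage()` guard near the top of `try_optimize_aggregate` is doing the mode split sketched below. This is a paraphrase of the upstream helper written out for reference, not copied from DataFusion:

    use datafusion::physical_plan::aggregates::AggregateMode;

    fn sees_original_input(mode: &AggregateMode) -> bool {
        match mode {
            // First-stage modes evaluate the user's argument expressions...
            AggregateMode::Partial
            | AggregateMode::Single
            | AggregateMode::SinglePartitioned => true,
            // ...while later stages consume partial aggregate state instead.
            AggregateMode::Final | AggregateMode::FinalPartitioned => false,
        }
    }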
AggregateExec"); + let intermediate = top_agg + .input() + .as_any() + .downcast_ref::() + .expect("intermediate should be ProjectionExec"); + + // Intermediate should have input columns (a, b, c) + 1 CSE column + assert_eq!(intermediate.expr().len(), 4); // a, b, c, __cse_0 + + // The CSE column should be named __cse_0 + let cse_alias = &intermediate.expr()[3].alias; + assert_eq!(cse_alias, "__cse_0"); + + // Aggregate should still produce 2 aggregate expressions + assert_eq!(top_agg.aggr_expr().len(), 2); + + Ok(()) + } + + #[test] + fn test_aggregate_cse_skips_final_mode() -> Result<()> { + // Final-mode aggregates should not be optimized + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int64, false), + Field::new("b", DataType::Int64, false), + ])); + let input: Arc = Arc::new(EmptyExec::new(Arc::clone(&schema))); + + let a = col("a", &schema)?; + + let agg1 = AggregateExprBuilder::new(sum_udaf(), vec![Arc::clone(&a)]) + .schema(Arc::clone(&schema)) + .alias("sum1") + .with_ignore_nulls(false) + .with_distinct(false) + .build()?; + + let agg2 = AggregateExprBuilder::new(sum_udaf(), vec![Arc::clone(&a)]) + .schema(Arc::clone(&schema)) + .alias("sum2") + .with_ignore_nulls(false) + .with_distinct(false) + .build()?; + + let group_by = PhysicalGroupBy::new_single(vec![]); + let aggregate = AggregateExec::try_new( + AggregateMode::Final, + group_by, + vec![Arc::new(agg1), Arc::new(agg2)], + vec![None, None], + input, + Arc::clone(&schema), + )?; + + let plan: Arc = Arc::new(aggregate); + let config = ConfigOptions::new(); + let rule = PhysicalCommonSubexprEliminate::new(); + let optimized = rule.optimize(plan, &config)?; + + // Should still be an AggregateExec with no intermediate ProjectionExec + let top_agg = optimized + .as_any() + .downcast_ref::() + .expect("should be AggregateExec"); + assert!( + top_agg + .input() + .as_any() + .downcast_ref::() + .is_none(), + "Final-mode aggregate should not have intermediate projection" + ); + + Ok(()) + } } From b1b46bf8699a91c1a8fd14070d9a0c34c377d25c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 9 Feb 2026 07:46:53 -0700 Subject: [PATCH 20/21] Add logging/timing to CSE rule and filter redundant sub-CSEs - Log when the rule rewrites a ProjectionExec or AggregateExec, including which expressions were extracted - Log wall-clock time for the CSE rule and the overall optimizer loop - Filter out redundant common subexpressions: if CSE `A` contains CSE `B` as a descendant, drop `B` since it will go unused after `A` is rewritten to a column reference Co-Authored-By: Claude Opus 4.6 --- native/core/src/execution/jni_api.rs | 3 + native/core/src/execution/physical_cse.rs | 85 +++++++++++++++++++---- 2 files changed, 74 insertions(+), 14 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index b623af7f8e..9c08cd4d3f 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -559,9 +559,12 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( info!("Comet native plan before DataFusion optimization:\n{before}"); } + let opt_start = std::time::Instant::now(); for optimizer in optimizers { optimized_plan = optimizer.optimize(optimized_plan, &config)?; } + let opt_elapsed = opt_start.elapsed(); + info!("Comet physical optimization completed in {opt_elapsed:?}"); if exec_context.explain_native { let after = diff --git a/native/core/src/execution/physical_cse.rs b/native/core/src/execution/physical_cse.rs index 
diff --git a/native/core/src/execution/physical_cse.rs b/native/core/src/execution/physical_cse.rs
index dfdb184131..faddf784ba 100644
--- a/native/core/src/execution/physical_cse.rs
+++ b/native/core/src/execution/physical_cse.rs
@@ -35,6 +35,7 @@ use datafusion::physical_optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
 use datafusion::physical_plan::projection::{ProjectionExec, ProjectionExpr};
 use datafusion::physical_plan::ExecutionPlan;
+use log::info;

 /// A wrapper around `Arc<dyn PhysicalExpr>` that implements `Eq` and `Hash`
 /// by delegating to the trait-object implementations on `dyn PhysicalExpr`.
@@ -71,16 +72,21 @@ impl PhysicalOptimizerRule for PhysicalCommonSubexprEliminate {
         plan: Arc<dyn ExecutionPlan>,
         _config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        plan.transform_up(|node| {
-            if node.as_any().downcast_ref::<ProjectionExec>().is_some() {
-                try_optimize_projection(node)
-            } else if node.as_any().downcast_ref::<AggregateExec>().is_some() {
-                try_optimize_aggregate(node)
-            } else {
-                Ok(Transformed::no(node))
-            }
-        })
-        .data()
+        let start = std::time::Instant::now();
+        let result = plan
+            .transform_up(|node| {
+                if node.as_any().downcast_ref::<ProjectionExec>().is_some() {
+                    try_optimize_projection(node)
+                } else if node.as_any().downcast_ref::<AggregateExec>().is_some() {
+                    try_optimize_aggregate(node)
+                } else {
+                    Ok(Transformed::no(node))
+                }
+            })
+            .data();
+        let elapsed = start.elapsed();
+        info!("Physical CSE optimizer completed in {elapsed:?}");
+        result
     }

     fn name(&self) -> &str {
@@ -116,7 +122,9 @@
     }
 }

 /// Identify sub-expressions that appear 2+ times across the given
-/// expression list. Returns a deduplicated vec.
+/// expression list. Returns a deduplicated vec containing only the
+/// largest common subexpressions (sub-expressions of an already-extracted
+/// CSE are removed since they would be unreferenced after rewriting).
 fn find_common_subexprs(
     exprs: &[Arc<dyn PhysicalExpr>],
 ) -> Vec<Arc<dyn PhysicalExpr>> {
@@ -124,15 +132,51 @@
     for expr in exprs {
         collect_subexprs(expr, &mut counts);
     }
-    // Collect expressions with count >= 2, preserving insertion order is not
-    // required since we assign deterministic names based on index.
-    counts
+    let common: Vec<Arc<dyn PhysicalExpr>> = counts
         .into_iter()
         .filter(|(_, count)| *count >= 2)
         .map(|(key, _)| key.0)
+        .collect();
+
+    // Filter out any CSE that is a strict sub-expression of another CSE,
+    // since after rewriting the larger CSE to a column reference its
+    // children will no longer be evaluated and the smaller CSE column
+    // would go unused.
+    let common_set: std::collections::HashSet<ExprKey> =
+        common.iter().map(|e| ExprKey(Arc::clone(e))).collect();
+
+    common
+        .into_iter()
+        .filter(|expr| {
+            // Check if any OTHER common subexpression contains this one
+            // as a descendant. If so, drop it.
+            !common_set.iter().any(|other| {
+                // skip self
+                if other.0.as_ref() == expr.as_ref() {
+                    return false;
+                }
+                contains_subexpr(&other.0, expr)
+            })
+        })
         .collect()
 }

+/// Returns true if `haystack` contains `needle` as a strict descendant.
+fn contains_subexpr(
+    haystack: &Arc<dyn PhysicalExpr>,
+    needle: &Arc<dyn PhysicalExpr>,
+) -> bool {
+    for child in haystack.children() {
+        if child.as_ref() == needle.as_ref() {
+            return true;
+        }
+        if contains_subexpr(child, needle) {
+            return true;
+        }
+    }
+    false
+}
+
 /// Replace occurrences of any common subexpression in `expr` with a `Column`
 /// reference into the intermediate projection's schema.
 fn rewrite_expr(
     expr: Arc<dyn PhysicalExpr>,
     cse_map: &HashMap<ExprKey, (String, usize)>,
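Both the self-skip and the descendant walk above rely on `dyn PhysicalExpr` comparing structurally rather than by pointer, which is also what lets two independently built copies of the same expression land in one `ExprKey` hash bucket. A hypothetical fragment in the style of the tests:

    let e1 = binary(col("a", &schema)?, Operator::Plus, col("b", &schema)?, &schema)?;
    let e2 = binary(col("a", &schema)?, Operator::Plus, col("b", &schema)?, &schema)?;
    assert!(e1.as_ref() == e2.as_ref()); // equal trees, distinct allocations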
@@ -216,6 +260,12 @@ fn try_optimize_projection(
     let new_projection = Arc::new(ProjectionExec::try_new(new_proj_exprs, intermediate)?)
         as Arc<dyn ExecutionPlan>;

+    info!(
+        "Physical CSE: rewrote ProjectionExec, extracted {} common subexpression(s): [{}]",
+        common.len(),
+        common.iter().map(|e| e.to_string()).collect::<Vec<_>>().join(", ")
+    );
+
     Ok(Transformed::yes(new_projection))
 }

@@ -341,6 +391,13 @@
         intermediate_schema,
     )?;

+    info!(
+        "Physical CSE: rewrote AggregateExec ({:?} mode), extracted {} common subexpression(s): [{}]",
+        agg_exec.mode(),
+        common.len(),
+        common.iter().map(|e| e.to_string()).collect::<Vec<_>>().join(", ")
+    );
+
     Ok(Transformed::yes(Arc::new(new_agg) as Arc<dyn ExecutionPlan>))
 }

From c5d92f6d9458a27888c5b7ecc77d364175a3503b Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 9 Feb 2026 07:56:52 -0700
Subject: [PATCH 21/21] Use debug logging and clean up comments in physical
 CSE rule

Co-Authored-By: Claude Opus 4.6
---
 native/core/src/execution/jni_api.rs      |  5 +-
 native/core/src/execution/physical_cse.rs | 83 +++++------------------
 2 files changed, 19 insertions(+), 69 deletions(-)

diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index 9c08cd4d3f..677fa5fcf8 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -87,7 +87,7 @@ use crate::execution::spark_config::{
 };
 use crate::parquet::encryption_support::{CometEncryptionFactory, ENCRYPTION_FACTORY_ID};
 use datafusion_comet_proto::spark_operator::operator::OpStruct;
-use log::info;
+use log::{debug, info};
 use once_cell::sync::Lazy;
 #[cfg(feature = "jemalloc")]
 use tikv_jemalloc_ctl::{epoch, stats};
@@ -563,8 +563,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
             for optimizer in optimizers {
                 optimized_plan = optimizer.optimize(optimized_plan, &config)?;
             }
-            let opt_elapsed = opt_start.elapsed();
-            info!("Comet physical optimization completed in {opt_elapsed:?}");
+            debug!("Comet physical optimization completed in {:?}", opt_start.elapsed());

             if exec_context.explain_native {
                 let after =
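One practical note on the demotion to `debug!`: with a typical `log` backend such as env_logger, these messages no longer appear under the default `info` filter, so something along the lines of `RUST_LOG=debug` (or a narrower module filter, depending on how the native library initializes logging) is needed to surface the optimizer timing again.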
diff --git a/native/core/src/execution/physical_cse.rs b/native/core/src/execution/physical_cse.rs
index faddf784ba..f37e5e2f7d 100644
--- a/native/core/src/execution/physical_cse.rs
+++ b/native/core/src/execution/physical_cse.rs
@@ -35,10 +35,10 @@ use datafusion::physical_optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
 use datafusion::physical_plan::projection::{ProjectionExec, ProjectionExpr};
 use datafusion::physical_plan::ExecutionPlan;
-use log::info;
+use log::debug;

-/// A wrapper around `Arc<dyn PhysicalExpr>` that implements `Eq` and `Hash`
-/// by delegating to the trait-object implementations on `dyn PhysicalExpr`.
+/// Needed because `Arc<dyn PhysicalExpr>` doesn't implement `Eq`/`Hash`
+/// directly — this delegates to the trait-object implementations.
 struct ExprKey(Arc<dyn PhysicalExpr>);

 impl PartialEq for ExprKey {
@@ -84,8 +84,7 @@ impl PhysicalOptimizerRule for PhysicalCommonSubexprEliminate {
             })
             .data();
-        let elapsed = start.elapsed();
-        info!("Physical CSE optimizer completed in {elapsed:?}");
+        debug!("Physical CSE optimizer completed in {:?}", start.elapsed());
         result
     }

@@ -98,15 +97,12 @@ impl PhysicalOptimizerRule for PhysicalCommonSubexprEliminate {
     }
 }

-/// Returns `true` if the expression is trivial (a `Column` or `Literal`)
-/// and therefore not worth extracting as a common subexpression.
+/// Columns and literals are too cheap to be worth extracting.
 fn is_trivial(expr: &Arc<dyn PhysicalExpr>) -> bool {
     expr.as_any().downcast_ref::<Column>().is_some()
         || expr.as_any().downcast_ref::<Literal>().is_some()
 }

-/// Recursively collect all sub-expressions from `expr` and increment their
-/// occurrence count in `counts`.
 fn collect_subexprs(
     expr: &Arc<dyn PhysicalExpr>,
     counts: &mut HashMap<ExprKey, usize>,
@@ -121,10 +117,6 @@
     }
 }

-/// Identify sub-expressions that appear 2+ times across the given
-/// expression list. Returns a deduplicated vec containing only the
-/// largest common subexpressions (sub-expressions of an already-extracted
-/// CSE are removed since they would be unreferenced after rewriting).
 fn find_common_subexprs(
     exprs: &[Arc<dyn PhysicalExpr>],
 ) -> Vec<Arc<dyn PhysicalExpr>> {
@@ -138,20 +130,16 @@
         .map(|(key, _)| key.0)
         .collect();

-    // Filter out any CSE that is a strict sub-expression of another CSE,
-    // since after rewriting the larger CSE to a column reference its
-    // children will no longer be evaluated and the smaller CSE column
-    // would go unused.
+    // After rewriting the larger CSE to a column reference, its children
+    // are no longer evaluated, so any smaller CSE nested inside it would
+    // produce an unused column in the intermediate projection.
     let common_set: std::collections::HashSet<ExprKey> =
         common.iter().map(|e| ExprKey(Arc::clone(e))).collect();

     common
         .into_iter()
         .filter(|expr| {
-            // Check if any OTHER common subexpression contains this one
-            // as a descendant. If so, drop it.
             !common_set.iter().any(|other| {
-                // skip self
                 if other.0.as_ref() == expr.as_ref() {
                     return false;
                 }
@@ -161,7 +149,6 @@
         .collect()
 }

-/// Returns true if `haystack` contains `needle` as a strict descendant.
 fn contains_subexpr(
     haystack: &Arc<dyn PhysicalExpr>,
     needle: &Arc<dyn PhysicalExpr>,
@@ -177,8 +164,8 @@
     false
 }

-/// Replace occurrences of any common subexpression in `expr` with a `Column`
-/// reference into the intermediate projection's schema.
+/// Replaces occurrences of any common subexpression in `expr` with a
+/// `Column` reference into the intermediate projection's schema.
 fn rewrite_expr(
     expr: Arc<dyn PhysicalExpr>,
     cse_map: &HashMap<ExprKey, (String, usize)>,
@@ -189,9 +176,9 @@
         }
         let lookup = ExprKey(Arc::clone(&node));
         if let Some((name, index)) = cse_map.get(&lookup) {
-            // Replace with a column reference and skip recursing into children
             let col = Arc::new(Column::new(name, *index)) as Arc<dyn PhysicalExpr>;
+            // Jump skips recursing into children that are now behind a column ref
             Ok(Transformed::new(col, true, TreeNodeRecursion::Jump))
         } else {
             Ok(Transformed::no(node))
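`TreeNodeRecursion::Jump` is the piece that makes the comment above true: once a node is replaced by its `__cse_N` column, the traversal must not descend into the subtree the column now stands for. The three control values on DataFusion's tree-node API, summarized from their variant names (a sketch, not the upstream documentation):

    // TreeNodeRecursion::Continue - keep visiting children as usual
    // TreeNodeRecursion::Jump     - prune the current subtree, continue elsewhere
    // TreeNodeRecursion::Stop     - abort the whole traversal immediately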
@@ -200,8 +187,6 @@
             .data()
 }

-/// Attempt to optimize a single `ProjectionExec` by extracting common
-/// subexpressions into an intermediate projection.
 fn try_optimize_projection(
     node: Arc<dyn ExecutionPlan>,
 ) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
@@ -220,8 +205,6 @@
     let input_schema = input.schema();
     let num_input_cols = input_schema.fields().len();

-    // Build the intermediate projection: pass through all input columns, then
-    // append one column per common subexpression.
     let mut intermediate_exprs: Vec<ProjectionExpr> = Vec::new();
     for (i, field) in input_schema.fields().iter().enumerate() {
         intermediate_exprs.push(ProjectionExpr {
@@ -230,7 +213,6 @@
         });
     }

-    // Map from common subexpression -> (cse_name, column_index_in_intermediate)
     let mut cse_map: HashMap<ExprKey, (String, usize)> = HashMap::new();
     for (idx, cse_expr) in common.iter().enumerate() {
         let cse_name = format!("__cse_{idx}");
@@ -247,7 +229,6 @@
         Arc::clone(input),
     )?) as Arc<dyn ExecutionPlan>;

-    // Rewrite the top projection expressions to reference the intermediate.
     let mut new_proj_exprs: Vec<ProjectionExpr> = Vec::new();
     for proj_expr in proj_exprs {
         let rewritten = rewrite_expr(Arc::clone(&proj_expr.expr), &cse_map)?;
@@ -260,7 +241,7 @@
     let new_projection = Arc::new(ProjectionExec::try_new(new_proj_exprs, intermediate)?)
         as Arc<dyn ExecutionPlan>;

-    info!(
+    debug!(
         "Physical CSE: rewrote ProjectionExec, extracted {} common subexpression(s): [{}]",
         common.len(),
         common.iter().map(|e| e.to_string()).collect::<Vec<_>>().join(", ")
@@ -269,21 +250,17 @@
     Ok(Transformed::yes(new_projection))
 }

-/// Attempt to optimize a single `AggregateExec` by extracting common
-/// subexpressions from aggregate function arguments into an intermediate
-/// projection.
 fn try_optimize_aggregate(
     node: Arc<dyn ExecutionPlan>,
 ) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
     let agg_exec = node.as_any().downcast_ref::<AggregateExec>().unwrap();

-    // Only optimize first-stage aggregates where original column expressions
-    // are present. Final/FinalPartitioned reference partial outputs.
+    // Final/FinalPartitioned aggregates reference partial outputs, not
+    // the original column expressions, so CSE doesn't apply.
     if !agg_exec.mode().is_first_stage() {
         return Ok(Transformed::no(node));
     }

-    // Collect all sub-expressions from aggregate function arguments.
     let aggr_exprs = agg_exec.aggr_expr();
     let all_args: Vec<Arc<dyn PhysicalExpr>> = aggr_exprs
         .iter()
@@ -299,8 +276,6 @@
     let input_schema = input.schema();
     let num_input_cols = input_schema.fields().len();

-    // Build an intermediate projection: pass through all input columns, then
-    // append one column per common subexpression.
     let mut intermediate_exprs: Vec<ProjectionExpr> = Vec::new();
     for (i, field) in input_schema.fields().iter().enumerate() {
         intermediate_exprs.push(ProjectionExpr {
@@ -309,7 +284,6 @@
         });
     }

-    // Map from common subexpression -> (cse_name, column_index_in_intermediate)
     let mut cse_map: HashMap<ExprKey, (String, usize)> = HashMap::new();
     for (idx, cse_expr) in common.iter().enumerate() {
         let cse_name = format!("__cse_{idx}");
@@ -327,7 +301,6 @@
     )?) as Arc<dyn ExecutionPlan>;
     let intermediate_schema = intermediate.schema();

-    // Rewrite each aggregate function's arguments to reference CSE columns.
     let mut new_aggr_exprs: Vec<Arc<AggregateFunctionExpr>> = Vec::new();
     for agg_fn in aggr_exprs {
         let old_args = agg_fn.expressions();
@@ -351,7 +324,6 @@
         new_aggr_exprs.push(Arc::new(new_agg_fn));
     }

-    // Rewrite filter expressions if they reference common subexpressions.
     let new_filters: Vec<Option<Arc<dyn PhysicalExpr>>> = agg_exec
         .filter_expr()
         .iter()
@@ -363,7 +335,6 @@
         })
         .collect::<Result<_>>()?;

-    // Rewrite group-by expressions to reference CSE columns.
     let old_group_by = agg_exec.group_expr();
     let new_group_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
         old_group_by
@@ -391,7 +362,7 @@
         intermediate_schema,
     )?;

-    info!(
+    debug!(
         "Physical CSE: rewrote AggregateExec ({:?} mode), extracted {} common subexpression(s): [{}]",
         agg_exec.mode(),
         common.len(),
@@ -460,7 +431,6 @@
         let rule = PhysicalCommonSubexprEliminate::new();
         let optimized = rule.optimize(plan, &config)?;

-        // The optimized plan should be a ProjectionExec wrapping another ProjectionExec
         let top = optimized
             .as_any()
             .downcast_ref::<ProjectionExec>()
@@ -471,14 +441,8 @@
             .downcast_ref::<ProjectionExec>()
             .expect("intermediate should be ProjectionExec");

-        // Intermediate should have input columns + 1 CSE column
         assert_eq!(intermediate.expr().len(), 3); // a, b, __cse_0
-
-        // The CSE column should be named __cse_0
-        let cse_alias = &intermediate.expr()[2].alias;
-        assert_eq!(cse_alias, "__cse_0");
-
-        // Top projection should still produce 2 output columns
+        assert_eq!(intermediate.expr()[2].alias, "__cse_0");
         assert_eq!(top.expr().len(), 2);
         assert_eq!(top.expr()[0].alias, "x");
         assert_eq!(top.expr()[1].alias, "y");
@@ -513,12 +477,10 @@
         let rule = PhysicalCommonSubexprEliminate::new();
         let optimized = rule.optimize(Arc::clone(&plan), &config)?;

-        // No intermediate projection should be inserted
         let top = optimized
             .as_any()
             .downcast_ref::<ProjectionExec>()
             .expect("should be ProjectionExec");
-        // Input should be EmptyExec, not another ProjectionExec
         assert!(top
             .input()
             .as_any()
             .downcast_ref::<EmptyExec>()
@@ -530,7 +492,6 @@
     #[test]
     fn test_aggregate_cse_extracts_common_subexpr() -> Result<()> {
-        // Schema: a INT64, b INT64, c INT64
         let schema = Arc::new(Schema::new(vec![
             Field::new("a", DataType::Int64, false),
             Field::new("b", DataType::Int64, false),
@@ -542,7 +503,6 @@
         let b = col("b", &schema)?;
         let c = col("c", &schema)?;

-        // Common subexpression: a + b
         let a_plus_b_1 = binary(Arc::clone(&a), Operator::Plus, Arc::clone(&b), &schema)?;
         let a_plus_b_2 = binary(Arc::clone(&a), Operator::Plus, Arc::clone(&b), &schema)?;

@@ -577,7 +537,6 @@
         let rule = PhysicalCommonSubexprEliminate::new();
         let optimized = rule.optimize(plan, &config)?;

-        // The optimized plan should be an AggregateExec wrapping a ProjectionExec
         let top_agg = optimized
             .as_any()
             .downcast_ref::<AggregateExec>()
@@ -588,14 +547,8 @@
             .downcast_ref::<ProjectionExec>()
             .expect("intermediate should be ProjectionExec");

-        // Intermediate should have input columns (a, b, c) + 1 CSE column
         assert_eq!(intermediate.expr().len(), 4); // a, b, c, __cse_0
-
-        // The CSE column should be named __cse_0
-        let cse_alias = &intermediate.expr()[3].alias;
-        assert_eq!(cse_alias, "__cse_0");
-
-        // Aggregate should still produce 2 aggregate expressions
+        assert_eq!(intermediate.expr()[3].alias, "__cse_0");
         assert_eq!(top_agg.aggr_expr().len(), 2);

         Ok(())
@@ -603,7 +556,6 @@
     #[test]
     fn test_aggregate_cse_skips_final_mode() -> Result<()> {
-        // Final-mode aggregates should not be optimized
         let schema = Arc::new(Schema::new(vec![
             Field::new("a", DataType::Int64, false),
             Field::new("b", DataType::Int64, false),
@@ -641,7 +593,6 @@
         let rule = PhysicalCommonSubexprEliminate::new();
         let optimized = rule.optimize(plan, &config)?;

-        // Should still be an AggregateExec with no intermediate ProjectionExec
         let top_agg = optimized
             .as_any()
             .downcast_ref::<AggregateExec>()