Changes from all 22 commits:
35ebdf5  Add opt-in DataFusion physical optimizer for native execution (andygrove, Feb 8, 2026)
5085794  Pass spark.comet.datafusion.* configs through to DataFusion session (andygrove, Feb 8, 2026)
6631f8d  improve config docs (andygrove, Feb 8, 2026)
ffb23f8  optimize (andygrove, Feb 8, 2026)
0a5ed95  Log physical plan before and after DataFusion optimization (andygrove, Feb 8, 2026)
a56d264  Run make format (andygrove, Feb 8, 2026)
89ce7c7  mark feature as experimental (andygrove, Feb 8, 2026)
c955b02  Skip EnsureCooperative optimizer to fix infinite busy loop (andygrove, Feb 8, 2026)
e3dc369  Skip DataFusion optimizer rules incompatible with Comet execution model (andygrove, Feb 8, 2026)
0b8771b  Re-enable metrics when native physical optimizer is active (andygrove, Feb 8, 2026)
00e6c45  Fix clippy clone_on_ref_ptr lint (andygrove, Feb 8, 2026)
5ad5436  Skip FilterPushdown, CoalesceBatches, and ProjectionPushdown optimize… (andygrove, Feb 8, 2026)
a53c2c9  Configure physical optimizer rules at session creation and add logging (andygrove, Feb 8, 2026)
edbc3fd  Add physical common subexpression elimination optimizer rule (andygrove, Feb 8, 2026)
2e9f34e  Log before/after plans on optimizer change and isolate CSE rule for t… (andygrove, Feb 8, 2026)
bb5a304  Fix false positive plan-change detection in physical CSE optimizer (andygrove, Feb 8, 2026)
c413e5b  Log before/after plans for physical optimization on partition 0 only (andygrove, Feb 9, 2026)
d7e41d9  Add debug logging to CSE rule and remove temp logging from jni_api (andygrove, Feb 9, 2026)
fc2bab0  Extend physical CSE optimizer to handle AggregateExec (andygrove, Feb 9, 2026)
b1b46bf  Add logging/timing to CSE rule and filter redundant sub-CSEs (andygrove, Feb 9, 2026)
c5d92f6  Use debug logging and clean up comments in physical CSE rule (andygrove, Feb 9, 2026)
7a1fe5e  Merge remote-tracking branch 'apache/main' into native-physical-optim… (andygrove, Feb 10, 2026)
common/src/main/scala/org/apache/comet/CometConf.scala (25 changes: 17 additions & 8 deletions)

@@ -122,16 +122,14 @@ object CometConf extends ShimCometConf {
   val SCAN_AUTO = "auto"

   val COMET_NATIVE_SCAN_IMPL: ConfigEntry[String] = conf("spark.comet.scan.impl")
-    .category(CATEGORY_SCAN)
+    .category(CATEGORY_PARQUET)
     .doc(
-      "The implementation of Comet Native Scan to use. Available modes are " +
+      "The implementation of Comet's Parquet scan to use. Available scans are " +
         s"`$SCAN_NATIVE_DATAFUSION`, and `$SCAN_NATIVE_ICEBERG_COMPAT`. " +
-        s"`$SCAN_NATIVE_DATAFUSION` is a fully native implementation of scan based on " +
-        "DataFusion. " +
-        s"`$SCAN_NATIVE_ICEBERG_COMPAT` is the recommended native implementation that " +
-        "exposes apis to read parquet columns natively and supports complex types. " +
-        s"`$SCAN_AUTO` (default) chooses the best scan.")
-    .internal()
+        s"`$SCAN_NATIVE_DATAFUSION` is a fully native implementation, and " +
+        s"`$SCAN_NATIVE_ICEBERG_COMPAT` is a hybrid implementation that supports some " +
+        "additional features, such as row indexes and field ids. " +
+        s"`$SCAN_AUTO` (default) chooses the best available scan based on the scan schema.")
     .stringConf
     .transform(_.toLowerCase(Locale.ROOT))
     .checkValues(Set(SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT, SCAN_AUTO))

@@ -549,6 +547,17 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(false)

+  val COMET_NATIVE_PHYSICAL_OPTIMIZER_ENABLED: ConfigEntry[Boolean] =
+    conf("spark.comet.exec.nativePhysicalOptimizer.enabled")
+      .category(CATEGORY_TESTING)
+      .doc(
+        "When enabled, Comet will run DataFusion's physical optimizer rules on the " +
+          "native query plan before execution. This can improve performance through " +
+          "additional optimizations such as projection pushdown, coalesce batches, " +
+          "filter pushdown, and limit pushdown. This feature is highly experimental.")
+      .booleanConf
+      .createWithDefault(true)
+
   val COMET_EXTENDED_EXPLAIN_FORMAT_VERBOSE = "verbose"
   val COMET_EXTENDED_EXPLAIN_FORMAT_FALLBACK = "fallback"
dev/benchmarks/comet-tpch.sh (3 changes: 3 additions & 0 deletions)

@@ -42,6 +42,9 @@ $SPARK_HOME/bin/spark-submit \
     --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
     --conf spark.comet.scan.impl=native_datafusion \
     --conf spark.comet.exec.replaceSortMergeJoin=true \
+    --conf spark.comet.scan.impl=native_datafusion \
+    --conf spark.comet.datafusion.execution.parquet.pushdown_filters=true \
+    --conf spark.comet.exec.nativePhysicalOptimizer.enabled=true \
    --conf spark.comet.expression.Cast.allowIncompatible=true \
     --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
     --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \
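
The `spark.comet.datafusion.execution.parquet.pushdown_filters` line above relies on the config pass-through added in jni_api.rs below: any `spark.comet.datafusion.*` key set on the Spark side is forwarded to the native DataFusion session with the prefix rewritten to `datafusion.*`. A minimal sketch of that key mapping; the helper name `to_datafusion_key` is hypothetical, and the real code inlines this loop in `prepare_datafusion_session_context`:

    fn to_datafusion_key(spark_key: &str) -> Option<String> {
        // "spark.comet.datafusion.X" maps to "datafusion.X"; other keys are ignored.
        spark_key
            .strip_prefix("spark.comet.datafusion.")
            .map(|rest| format!("datafusion.{rest}"))
    }

    fn main() {
        assert_eq!(
            to_datafusion_key("spark.comet.datafusion.execution.parquet.pushdown_filters").as_deref(),
            Some("datafusion.execution.parquet.pushdown_filters")
        );
        // Keys outside the prefix are left to Comet's own config handling.
        assert_eq!(to_datafusion_key("spark.comet.exec.nativePhysicalOptimizer.enabled"), None);
    }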
native/core/src/execution/jni_api.rs (102 changes: 94 additions & 8 deletions)

@@ -83,11 +83,11 @@ use crate::execution::tracing::{log_memory_usage, trace_begin, trace_end, with_trace};

 use crate::execution::spark_config::{
     SparkConfig, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_MAX_TEMP_DIRECTORY_SIZE,
-    COMET_TRACING_ENABLED,
+    COMET_NATIVE_PHYSICAL_OPTIMIZER_ENABLED, COMET_TRACING_ENABLED,
 };
 use crate::parquet::encryption_support::{CometEncryptionFactory, ENCRYPTION_FACTORY_ID};
 use datafusion_comet_proto::spark_operator::operator::OpStruct;
-use log::info;
+use log::{debug, info};
 use once_cell::sync::Lazy;
 #[cfg(feature = "jemalloc")]
 use tikv_jemalloc_ctl::{epoch, stats};

@@ -153,6 +153,8 @@ struct ExecutionContext {
     pub memory_pool_config: MemoryPoolConfig,
     /// Whether to log memory usage on each call to execute_plan
     pub tracing_enabled: bool,
+    /// Whether to run DataFusion's physical optimizer on the native plan
+    pub native_physical_optimizer_enabled: bool,
 }

 /// Accept serialized query plan and return the address of the native query plan.
@@ -192,6 +194,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
     let tracing_enabled = spark_config.get_bool(COMET_TRACING_ENABLED);
     let max_temp_directory_size =
         spark_config.get_u64(COMET_MAX_TEMP_DIRECTORY_SIZE, 100 * 1024 * 1024 * 1024);
+    let native_physical_optimizer_enabled =
+        spark_config.get_bool(COMET_NATIVE_PHYSICAL_OPTIMIZER_ENABLED);

     with_trace("createPlan", tracing_enabled, || {
         // Init JVM classes
@@ -246,6 +250,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
             local_dirs_vec,
             max_temp_directory_size,
             task_cpus as usize,
+            &spark_config,
         )?;

         let plan_creation_time = start.elapsed();
@@ -286,6 +291,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
             explain_native,
             memory_pool_config,
             tracing_enabled,
+            native_physical_optimizer_enabled,
         });

         Ok(Box::into_raw(exec_context) as i64)
@@ -300,6 +306,7 @@ fn prepare_datafusion_session_context(
     local_dirs: Vec<String>,
     max_temp_directory_size: u64,
     task_cpus: usize,
+    spark_config: &HashMap<String, String>,
 ) -> CometResult<SessionContext> {
     let paths = local_dirs.into_iter().map(PathBuf::from).collect();
     let disk_manager = DiskManagerBuilder::default()
@@ -308,10 +315,7 @@ fn prepare_datafusion_session_context(
     let mut rt_config = RuntimeEnvBuilder::new().with_disk_manager_builder(disk_manager);
     rt_config = rt_config.with_memory_pool(memory_pool);

-    // Get Datafusion configuration from Spark Execution context
-    // can be configured in Comet Spark JVM using Spark --conf parameters
-    // e.g: spark-shell --conf spark.datafusion.sql_parser.parse_float_as_decimal=true
-    let session_config = SessionConfig::new()
+    let mut session_config = SessionConfig::new()
         .with_target_partitions(task_cpus)
         // This DataFusion context is within the scope of an executing Spark Task. We want to set
         // its internal parallelism to the number of CPUs allocated to Spark Tasks. This can be
@@ -328,9 +332,55 @@ fn prepare_datafusion_session_context(
         &ScalarValue::Float64(Some(1.1)),
     );

+    // Pass through DataFusion configs from Spark.
+    // e.g: spark-shell --conf spark.comet.datafusion.sql_parser.parse_float_as_decimal=true
+    // becomes datafusion.sql_parser.parse_float_as_decimal=true
+    const SPARK_COMET_DF_PREFIX: &str = "spark.comet.datafusion.";
+    for (key, value) in spark_config {
+        if let Some(df_key) = key.strip_prefix(SPARK_COMET_DF_PREFIX) {
+            let df_key = format!("datafusion.{df_key}");
+            session_config = session_config.set_str(&df_key, value);
+        }
+    }
+
     let runtime = rt_config.build()?;

-    let mut session_ctx = SessionContext::new_with_config_rt(session_config, Arc::new(runtime));
+    // Only include physical optimizer rules that are compatible with
+    // Comet's execution model. Spark handles distribution, sorting,
+    // filter/projection pushdown, and join selection externally.
+    use datafusion::execution::SessionStateBuilder;
+    use datafusion::physical_optimizer::{
+        // aggregate_statistics::AggregateStatistics,
+        // combine_partial_final_agg::CombinePartialFinalAggregate,
+        // limit_pushdown::LimitPushdown,
+        // limit_pushdown_past_window::LimitPushPastWindows,
+        // limited_distinct_aggregation::LimitedDistinctAggregation,
+        // topk_aggregation::TopKAggregation,
+        // update_aggr_exprs::OptimizeAggregateOrder,
+        PhysicalOptimizerRule,
+    };
+
+    use crate::execution::physical_cse::PhysicalCommonSubexprEliminate;
+
+    let physical_optimizer_rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
+        // Arc::new(AggregateStatistics::new()),
+        // Arc::new(LimitedDistinctAggregation::new()),
+        // Arc::new(CombinePartialFinalAggregate::new()),
+        // Arc::new(OptimizeAggregateOrder::new()),
+        // Arc::new(TopKAggregation::new()),
+        // Arc::new(LimitPushPastWindows::new()),
+        // Arc::new(LimitPushdown::new()),
+        Arc::new(PhysicalCommonSubexprEliminate::new()),
+    ];
+
+    let state = SessionStateBuilder::new()
+        .with_config(session_config)
+        .with_runtime_env(Arc::new(runtime))
+        .with_default_features()
+        .with_physical_optimizer_rules(physical_optimizer_rules)
+        .build();
+
+    let mut session_ctx = SessionContext::new_with_state(state);

     datafusion::functions_nested::register_all(&mut session_ctx)?;
     register_datafusion_spark_function(&session_ctx);
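
One design point worth calling out: `with_physical_optimizer_rules` supplies the session's entire physical rule list, so the commented-out entries above are DataFusion defaults that are deliberately not registered, and only the new CSE rule runs. A small sketch for confirming that at runtime, using only calls that appear in this diff (`SessionContext::state` and `physical_optimizers`); the helper name `log_active_rules` is hypothetical:

    use datafusion::prelude::SessionContext;

    /// Print the physical optimizer rules registered in a session. With the
    /// builder above, this should list only the Comet CSE rule rather than
    /// DataFusion's default rule set.
    fn log_active_rules(ctx: &SessionContext) {
        for rule in ctx.state().physical_optimizers() {
            println!("active physical optimizer rule: {}", rule.name());
        }
    }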
@@ -495,9 +545,45 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
         let physical_plan_time = start.elapsed();

         exec_context.plan_creation_time += physical_plan_time;
-        exec_context.root_op = Some(Arc::clone(&root_op));
         exec_context.scans = scans;

+        let root_op = if exec_context.native_physical_optimizer_enabled {
+            let state = exec_context.session_ctx.state();
+            let optimizers = state.physical_optimizers();
+            let config = Arc::clone(state.config_options());
+            let mut optimized_plan = Arc::clone(&root_op.native_plan);
+
+            if exec_context.explain_native {
+                let before =
+                    DisplayableExecutionPlan::new(optimized_plan.as_ref()).indent(true);
+                info!("Comet native plan before DataFusion optimization:\n{before}");
+            }
+
+            let opt_start = std::time::Instant::now();
+            for optimizer in optimizers {
+                optimized_plan = optimizer.optimize(optimized_plan, &config)?;
+            }
+            debug!("Comet physical optimization completed in {:?}", opt_start.elapsed());
+
+            if exec_context.explain_native {
+                let after =
+                    DisplayableExecutionPlan::new(optimized_plan.as_ref()).indent(true);
+                info!("Comet native plan after DataFusion optimization:\n{after}");
+            }
+
+            // Keep the original SparkPlan tree structure (for metrics)
+            // but replace the root's native plan with the optimized version
+            Arc::new(SparkPlan::new(
+                root_op.plan_id,
+                optimized_plan,
+                root_op.children.clone(),
+            ))
+        } else {
+            root_op
+        };
+
+        exec_context.root_op = Some(Arc::clone(&root_op));
+
         if exec_context.explain_native {
             let formatted_plan_str =
                 DisplayableExecutionPlan::new(root_op.native_plan.as_ref()).indent(true);
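
For context on the extension point: `PhysicalCommonSubexprEliminate` (defined in the new physical_cse.rs, which this diff does not show) must implement DataFusion's `PhysicalOptimizerRule` trait, the same interface the loop in `executePlan` drives above. A minimal no-op sketch of that trait surface; the `NoopRule` struct is illustrative only:

    use std::sync::Arc;

    use datafusion::config::ConfigOptions;
    use datafusion::error::Result;
    use datafusion::physical_optimizer::PhysicalOptimizerRule;
    use datafusion::physical_plan::ExecutionPlan;

    #[derive(Debug, Default)]
    struct NoopRule;

    impl PhysicalOptimizerRule for NoopRule {
        fn optimize(
            &self,
            plan: Arc<dyn ExecutionPlan>,
            _config: &ConfigOptions,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            // A real rule rewrites the plan tree here; returning the input
            // unchanged is a valid (if useless) implementation.
            Ok(plan)
        }

        fn name(&self) -> &str {
            "noop"
        }

        fn schema_check(&self) -> bool {
            // Ask DataFusion to verify the rule did not change the plan schema.
            true
        }
    }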
native/core/src/execution/mod.rs (1 change: 1 addition & 0 deletions)

@@ -21,6 +21,7 @@ pub mod expressions;
 pub mod jni_api;
 pub(crate) mod metrics;
 pub mod operators;
+pub(crate) mod physical_cse;
 pub(crate) mod planner;
 pub mod serde;
 pub mod shuffle;
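
The physical_cse module body is not part of this diff, but the idea behind physical common subexpression elimination is simple: if the same subexpression (say `a + b`) appears several times in a projection or aggregate, compute it once in a child projection and reference the result. A rough, illustrative sketch of the detection half, keying subexpressions on their display strings; `duplicate_subexprs` is a hypothetical helper, and the real rule needs a more robust identity check (see the "Fix false positive plan-change detection" commit):

    use std::collections::HashMap;
    use std::sync::Arc;

    use datafusion::physical_plan::PhysicalExpr;

    /// Count repeated subexpressions across an operator's expression list.
    /// Any entry with count > 1 is a CSE candidate that could be computed
    /// once in a child ProjectionExec and reused by the parent.
    fn duplicate_subexprs(exprs: &[Arc<dyn PhysicalExpr>]) -> HashMap<String, usize> {
        let mut counts: HashMap<String, usize> = HashMap::new();
        for expr in exprs {
            // Walk each expression tree iteratively, counting every node.
            let mut stack = vec![Arc::clone(expr)];
            while let Some(e) = stack.pop() {
                *counts.entry(e.to_string()).or_insert(0) += 1;
                stack.extend(e.children().into_iter().map(Arc::clone));
            }
        }
        counts.retain(|_, n| *n > 1);
        counts
    }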