35 commits
3a9f6fa
Adopt PR #3349's per-partition scan logic to CometNativeScan. Add DPP.
mbutrovich Feb 8, 2026
9a3f747
Fix encryption.
mbutrovich Feb 8, 2026
9af450d
Make format.
mbutrovich Feb 8, 2026
d9c4903
Fix Spark 4 DPP API?
mbutrovich Feb 8, 2026
f572220
New plans.
mbutrovich Feb 8, 2026
b32660e
make format
mbutrovich Feb 8, 2026
ee0806e
Update CometScanRuleSuite.
mbutrovich Feb 8, 2026
d444046
Update the DPP config for Comet.
mbutrovich Feb 9, 2026
20178dd
Merge branch 'main' into cometnativescan-dpp
mbutrovich Feb 9, 2026
59d7b41
Fix after upmerge.
mbutrovich Feb 9, 2026
259fc99
Update plans.
mbutrovich Feb 9, 2026
a804920
Add test to reproduce Unexpected subquery plan type: org.apache.spark…
mbutrovich Feb 9, 2026
4352c04
Add failing Iceberg test too.
mbutrovich Feb 9, 2026
7f0004f
Handle SubqueryExec in addition to SubqueryBroadcastExec. Add relevan…
mbutrovich Feb 9, 2026
959d517
Handle SubqueryExec in addition to SubqueryBroadcastExec. Add relevan…
mbutrovich Feb 9, 2026
69b3559
Handle SubqueryExec in addition to SubqueryBroadcastExec. Add relevan…
mbutrovich Feb 9, 2026
b7eedd1
Fix format.
mbutrovich Feb 9, 2026
e9dab0e
clean up tables in new tests
mbutrovich Feb 9, 2026
b9b5bb8
update spark 3.5.8 diff
mbutrovich Feb 10, 2026
0e71307
add bucketed DPP scan support
mbutrovich Feb 10, 2026
fa6fd1a
update spark 4.0 shim
mbutrovich Feb 10, 2026
ed50be2
make format
mbutrovich Feb 10, 2026
48ba80a
fix shims
mbutrovich Feb 10, 2026
cd6539c
fix shims
mbutrovich Feb 10, 2026
9b00cc2
fix canonicalization?
mbutrovich Feb 10, 2026
704bd2d
Update diffs
mbutrovich Feb 10, 2026
916c37e
Try again with canonicalization
mbutrovich Feb 10, 2026
b2d1540
Try again with canonicalization
mbutrovich Feb 10, 2026
d142fa3
Update diffs.
mbutrovich Feb 10, 2026
288a248
Attempt to fix "SPARK-30291: AQE should catch the exceptions when doi…
mbutrovich Feb 10, 2026
10f9e42
fix "DPP with native_datafusion scan - join with dynamic partition pr…
mbutrovich Feb 10, 2026
bcfa289
Merge branch 'main' into cometnativescan-dpp
mbutrovich Feb 10, 2026
daab410
Merge branch 'main' into cometnativescan-dpp
mbutrovich Feb 10, 2026
47993d5
Rename NativePlanDataInjector
mbutrovich Feb 10, 2026
e25798a
minor cleanup
mbutrovich Feb 10, 2026
19 changes: 12 additions & 7 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -137,6 +137,18 @@ object CometConf extends ShimCometConf {
.checkValues(Set(SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT, SCAN_AUTO))
.createWithEnvVarOrDefault("COMET_PARQUET_SCAN_IMPL", SCAN_AUTO)

val COMET_DPP_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.scan.dpp.enabled")
.category(CATEGORY_SCAN)
.doc(
"Whether to enable Dynamic Partition Pruning (DPP) for native_datafusion scans. " +
"When enabled, queries with DPP use CometNativeScanExec which supports runtime " +
"partition filtering. When disabled, DPP queries fall back to Spark. " +
"This config only affects native_datafusion scans; other scan modes always " +
"fall back to Spark for DPP queries.")
.booleanConf
.createWithDefault(true)

val COMET_ICEBERG_NATIVE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.scan.icebergNative.enabled")
.category(CATEGORY_SCAN)
@@ -530,13 +542,6 @@ object CometConf extends ShimCometConf {
.doubleConf
.createWithDefault(1.0)

val COMET_DPP_FALLBACK_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.dppFallback.enabled")
.category(CATEGORY_EXEC)
.doc("Whether to fall back to Spark for queries that use DPP.")
.booleanConf
.createWithDefault(true)

val COMET_DEBUG_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.debug.enabled")
.category(CATEGORY_EXEC)
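Net effect of this file's change: the exec-level `spark.comet.dppFallback.enabled` flag is removed and replaced by the scan-level `spark.comet.scan.dpp.enabled` flag, which defaults to true and only affects `native_datafusion` scans. A minimal usage sketch follows; the config key comes from the diff above, while the Comet plugin setup, table names, and columns are illustrative assumptions.

```scala
// Sketch: enabling DPP-aware native scans in a Spark session.
// "spark.comet.scan.dpp.enabled" comes from the diff above; the Comet plugin
// configuration is assumed to be in place, and the tables/columns are made up.
import org.apache.spark.sql.SparkSession

object CometDppExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("comet-dpp-sketch")
      .config("spark.comet.scan.dpp.enabled", "true") // default is already true
      .getOrCreate()

    // A join on a partition column; with DPP enabled the partitioned side can be
    // pruned at runtime by CometNativeScanExec instead of falling back to Spark.
    spark.sql(
      """SELECT f.value
        |FROM fact f JOIN dim d ON f.part = d.part
        |WHERE d.filter_col = 1""".stripMargin).show()

    spark.stop()
  }
}
```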
26 changes: 16 additions & 10 deletions dev/diffs/3.5.8.diff
@@ -410,28 +410,31 @@ index c4fb4fa943c..a04b23870a8 100644
assert(exchanges.size == 2)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index f33432ddb6f..42eb9fd1cb7 100644
index f33432ddb6f..665d414c61f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression}
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._
import org.apache.spark.sql.catalyst.plans.ExistenceJoin
+import org.apache.spark.sql.comet.CometScanExec
+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, InMemoryTableWithV2FilterCatalog}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive._
@@ -262,6 +263,9 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -262,6 +263,12 @@ abstract class DynamicPartitionPruningSuiteBase
case s: BatchScanExec => s.runtimeFilters.collect {
case d: DynamicPruningExpression => d.child
}
+ case s: CometScanExec => s.partitionFilters.collect {
+ case d: DynamicPruningExpression => d.child
+ }
+ case s: CometNativeScanExec => s.partitionFilters.collect {
+ case d: DynamicPruningExpression => d.child
+ }
case _ => Nil
}
}
@@ -1027,7 +1031,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1027,7 +1034,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

@@ -441,7 +444,7 @@ index f33432ddb6f..42eb9fd1cb7 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
withTable("large", "dimTwo", "dimThree") {
@@ -1215,7 +1220,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1215,7 +1223,8 @@ abstract class DynamicPartitionPruningSuiteBase
}

test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
@@ -451,7 +454,7 @@ index f33432ddb6f..42eb9fd1cb7 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df = sql(
@@ -1423,7 +1429,8 @@ abstract class DynamicPartitionPruningSuiteBase
@@ -1423,7 +1432,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}

@@ -461,7 +464,7 @@ index f33432ddb6f..42eb9fd1cb7 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
""" WITH v as (
@@ -1698,7 +1705,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
@@ -1698,7 +1708,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
* Check the static scan metrics with and without DPP
*/
test("static scan metrics",
@@ -471,7 +474,7 @@ index f33432ddb6f..42eb9fd1cb7 100644
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
@@ -1729,6 +1737,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
@@ -1729,6 +1740,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
case s: BatchScanExec =>
// we use f1 col for v2 tables due to schema pruning
s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))
@@ -3200,7 +3203,7 @@ index c63c748953f..7edca9c93a6 100644
implicit val formats = new DefaultFormats {
override def dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala
index 52abd248f3a..7a199931a08 100644
index 52abd248f3a..b4e096cae24 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/DynamicPartitionPruningHiveScanSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive
@@ -3211,12 +3214,15 @@ index 52abd248f3a..7a199931a08 100644
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
import org.apache.spark.sql.hive.execution.HiveTableScanExec
@@ -35,6 +36,9 @@ abstract class DynamicPartitionPruningHiveScanSuiteBase
@@ -35,6 +36,12 @@ abstract class DynamicPartitionPruningHiveScanSuiteBase
case s: FileSourceScanExec => s.partitionFilters.collect {
case d: DynamicPruningExpression => d.child
}
+ case s: CometScanExec => s.partitionFilters.collect {
+ case d: DynamicPruningExpression => d.child
+ }
+ case s: CometNativeScanExec => s.partitionFilters.collect {
+ case d: DynamicPruningExpression => d.child
+ }
case h: HiveTableScanExec => h.partitionPruningPred.collect {
case d: DynamicPruningExpression => d.child
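Both test-suite patches in this diff follow the same pattern: the helper that walks a plan and collects runtime pruning expressions is taught to also match Comet's scan nodes (`CometScanExec`, `CometNativeScanExec`) via their `partitionFilters`. A simplified, self-contained sketch of that collector pattern is shown below using only stock Spark node types; the Comet cases added by the patch are analogous but need the Comet classes on the classpath.

```scala
// Simplified sketch of the pruning-expression collector that the suite patches
// extend. Only stock Spark node types are matched here; the patches add the
// analogous CometScanExec / CometNativeScanExec cases on top of this shape.
import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression}
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

object DppPlanChecks {
  def collectDynamicPruningExpressions(plan: SparkPlan): Seq[Expression] = {
    plan.collect {
      case s: FileSourceScanExec =>
        s.partitionFilters.collect { case d: DynamicPruningExpression => d.child }
      case s: BatchScanExec =>
        s.runtimeFilters.collect { case d: DynamicPruningExpression => d.child }
    }.flatten
  }
}
```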
46 changes: 26 additions & 20 deletions native/core/src/execution/planner.rs
@@ -965,20 +965,29 @@ impl PhysicalPlanner {
))
}
OpStruct::NativeScan(scan) => {
let data_schema = convert_spark_types_to_arrow_schema(scan.data_schema.as_slice());
let common = scan
.common
.as_ref()
.ok_or_else(|| GeneralError("NativeScan missing common data".to_string()))?;

let data_schema =
convert_spark_types_to_arrow_schema(common.data_schema.as_slice());
let required_schema: SchemaRef =
convert_spark_types_to_arrow_schema(scan.required_schema.as_slice());
convert_spark_types_to_arrow_schema(common.required_schema.as_slice());
let partition_schema: SchemaRef =
convert_spark_types_to_arrow_schema(scan.partition_schema.as_slice());
let projection_vector: Vec<usize> = scan
convert_spark_types_to_arrow_schema(common.partition_schema.as_slice());
let projection_vector: Vec<usize> = common
.projection_vector
.iter()
.map(|offset| *offset as usize)
.collect();

// Check if this partition has any files (bucketed scan with bucket pruning may have empty partitions)
let partition_files = &scan.file_partitions[self.partition as usize];
// Get this partition's files (injected at execution time)
let partition_files = scan.file_partition.as_ref().ok_or_else(|| {
GeneralError("NativeScan missing file_partition data".to_string())
})?;

// Bucketed scan with bucket pruning may produce empty partitions
if partition_files.partitioned_file.is_empty() {
let empty_exec = Arc::new(EmptyExec::new(required_schema));
return Ok((
@@ -988,19 +997,19 @@ impl PhysicalPlanner {
}

// Convert the Spark expressions to Physical expressions
let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = scan
let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = common
.data_filters
.iter()
.map(|expr| self.create_expr(expr, Arc::clone(&required_schema)))
.collect();

let default_values: Option<HashMap<usize, ScalarValue>> = if !scan
let default_values: Option<HashMap<usize, ScalarValue>> = if !common
.default_values
.is_empty()
{
// We have default values. Extract the two lists (same length) of values and
// indexes in the schema, and then create a HashMap to use in the SchemaMapper.
let default_values: Result<Vec<ScalarValue>, DataFusionError> = scan
let default_values: Result<Vec<ScalarValue>, DataFusionError> = common
.default_values
.iter()
.map(|expr| {
Expand All @@ -1015,7 +1024,7 @@ impl PhysicalPlanner {
})
.collect();
let default_values = default_values?;
let default_values_indexes: Vec<usize> = scan
let default_values_indexes: Vec<usize> = common
.default_values_indexes
.iter()
.map(|offset| *offset as usize)
@@ -1037,7 +1046,7 @@ impl PhysicalPlanner {
.map(|f| f.file_path.clone())
.expect("partition should have files after empty check");

let object_store_options: HashMap<String, String> = scan
let object_store_options: HashMap<String, String> = common
.object_store_options
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
@@ -1048,10 +1057,7 @@ impl PhysicalPlanner {
&object_store_options,
)?;

// Comet serializes all partitions' PartitionedFiles, but we only want to read this
// Spark partition's PartitionedFiles
let files =
self.get_partitioned_files(&scan.file_partitions[self.partition as usize])?;
let files = self.get_partitioned_files(partition_files)?;
let file_groups: Vec<Vec<PartitionedFile>> = vec![files];
let partition_fields: Vec<Field> = partition_schema
.fields()
@@ -1060,7 +1066,7 @@ impl PhysicalPlanner {
Field::new(field.name(), field.data_type().clone(), field.is_nullable())
})
.collect_vec();
let scan = init_datasource_exec(
let datasource_exec = init_datasource_exec(
required_schema,
Some(data_schema),
Some(partition_schema),
@@ -1070,14 +1076,14 @@ impl PhysicalPlanner {
Some(projection_vector),
Some(data_filters?),
default_values,
scan.session_timezone.as_str(),
scan.case_sensitive,
common.session_timezone.as_str(),
common.case_sensitive,
self.session_ctx(),
scan.encryption_enabled,
common.encryption_enabled,
)?;
Ok((
vec![],
Arc::new(SparkPlan::new(spark_plan.plan_id, scan, vec![])),
Arc::new(SparkPlan::new(spark_plan.plan_id, datasource_exec, vec![])),
))
}
OpStruct::CsvScan(scan) => {
29 changes: 20 additions & 9 deletions native/proto/src/proto/operator.proto
@@ -85,7 +85,8 @@ message Scan {
bool arrow_ffi_safe = 3;
}

message NativeScan {
// Common data shared across all partitions for NativeScan (sent once via commonBytes)
message NativeScanCommon {
repeated spark.spark_expression.DataType fields = 1;
// The source of the scan (e.g. file scan, broadcast exchange, shuffle, etc). This
// is purely for informational purposes when viewing native query plans in
@@ -95,20 +96,30 @@ message NativeScan {
repeated SparkStructField data_schema = 4;
repeated SparkStructField partition_schema = 5;
repeated spark.spark_expression.Expr data_filters = 6;
repeated SparkFilePartition file_partitions = 7;
repeated int64 projection_vector = 8;
string session_timezone = 9;
repeated spark.spark_expression.Expr default_values = 10;
repeated int64 default_values_indexes = 11;
bool case_sensitive = 12;
repeated int64 projection_vector = 7;
string session_timezone = 8;
repeated spark.spark_expression.Expr default_values = 9;
repeated int64 default_values_indexes = 10;
bool case_sensitive = 11;
// Options for configuring object stores such as AWS S3, GCS, etc. The key-value pairs are taken
// from Hadoop configuration for compatibility with Hadoop FileSystem implementations of object
// stores.
// The configuration values have hadoop. or spark.hadoop. prefix trimmed. For instance, the
// configuration value "spark.hadoop.fs.s3a.access.key" will be stored as "fs.s3a.access.key" in
// the map.
map<string, string> object_store_options = 13;
bool encryption_enabled = 14;
map<string, string> object_store_options = 12;
bool encryption_enabled = 13;

// Unique identifier for this scan, used to match planning data at execution time
string scan_id = 14;
}

message NativeScan {
// Common data shared across partitions
NativeScanCommon common = 1;

// This partition's files only (injected at execution time by NativeScanDataInjector)
SparkFilePartition file_partition = 2;
}

message CsvScan {
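This message split is the heart of the protocol change: fields that are identical for every task move into `NativeScanCommon` and are serialized once, while each task's `NativeScan` carries only its own `SparkFilePartition`. The sketch below mirrors that shape with plain Scala case classes to make the sharing explicit; the stub types and field subset are illustrative stand-ins, not the generated protobuf classes.

```scala
// Illustrative stand-ins mirroring the proto split: one shared "common" blob per
// scan, plus a per-task file partition injected at execution time. These are not
// the generated protobuf classes; field names and types are simplified.
case class PartitionedFileStub(filePath: String, start: Long, length: Long)
case class FilePartitionStub(files: Seq[PartitionedFileStub])

case class NativeScanCommonStub(
    scanId: String,              // matches planning data at execution time
    requiredSchema: Seq[String], // field names only, for the sketch
    sessionTimezone: String,
    caseSensitive: Boolean)

case class NativeScanStub(
    common: NativeScanCommonStub,     // shared across all partitions
    filePartition: FilePartitionStub) // this task's files only

object NativeScanSplitSketch extends App {
  val common = NativeScanCommonStub("scan-1", Seq("id", "value"), "UTC", caseSensitive = false)
  val perTask = Seq(
    FilePartitionStub(Seq(PartitionedFileStub("part=1/a.parquet", 0, 1024))),
    FilePartitionStub(Seq(PartitionedFileStub("part=2/b.parquet", 0, 2048))))

  // Each task gets the same common data plus only its own files, instead of
  // every task receiving every partition's file list as before the change.
  val perTaskPlans = perTask.map(fp => NativeScanStub(common, fp))
  perTaskPlans.foreach(p => println(s"${p.common.scanId}: ${p.filePartition.files.map(_.filePath)}"))
}
```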
@@ -180,9 +180,8 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
// spotless:on
private def transform(plan: SparkPlan): SparkPlan = {
def convertNode(op: SparkPlan): SparkPlan = op match {
// Fully native scan for V1
case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_DATAFUSION =>
convertToComet(scan, CometNativeScan).getOrElse(scan)
// CometNativeScanExec is created directly by CometScanRule and handles its own execution
// No conversion needed here - it passes through unchanged

// Fully native Iceberg scan for V2 (iceberg-rust path)
// Only handle scans with native metadata; SupportsComet scans fall through to isCometScan