From 45d3bdf1dcaa82814821ef8097bbe2e0fb83654c Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 10 Dec 2025 13:34:27 -0700
Subject: [PATCH 1/5] implement native operator registry

---
 native/core/src/execution/operators/mod.rs  |   1 +
 .../src/execution/operators/projection.rs   |  74 ++++++++++
 native/core/src/execution/planner.rs        |  38 +++---
 .../execution/planner/operator_registry.rs  | 127 ++++++++++++++++++
 native/core/src/execution/planner/traits.rs |  23 +++-
 5 files changed, 240 insertions(+), 23 deletions(-)
 create mode 100644 native/core/src/execution/operators/projection.rs
 create mode 100644 native/core/src/execution/planner/operator_registry.rs

diff --git a/native/core/src/execution/operators/mod.rs b/native/core/src/execution/operators/mod.rs
index b01f7857be..33b9be9434 100644
--- a/native/core/src/execution/operators/mod.rs
+++ b/native/core/src/execution/operators/mod.rs
@@ -31,6 +31,7 @@ pub use expand::ExpandExec;
 mod iceberg_scan;
 mod parquet_writer;
 pub use parquet_writer::ParquetWriterExec;
+pub mod projection;
 mod scan;
 
 /// Error returned during executing operators.
diff --git a/native/core/src/execution/operators/projection.rs b/native/core/src/execution/operators/projection.rs
new file mode 100644
index 0000000000..06d82dbf54
--- /dev/null
+++ b/native/core/src/execution/operators/projection.rs
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Projection operator builder
+
+use std::sync::Arc;
+
+use datafusion::physical_plan::projection::ProjectionExec;
+use datafusion_comet_proto::spark_operator::Operator;
+use jni::objects::GlobalRef;
+
+use crate::{
+    execution::{
+        operators::{ExecutionError, ScanExec},
+        planner::{traits::OperatorBuilder, PhysicalPlanner},
+        spark_plan::SparkPlan,
+    },
+    extract_op,
+};
+
+/// Builder for Projection operators
+pub struct ProjectionBuilder;
+
+impl OperatorBuilder for ProjectionBuilder {
+    fn build(
+        &self,
+        spark_plan: &Operator,
+        inputs: &mut Vec<Arc<GlobalRef>>,
+        partition_count: usize,
+        planner: &PhysicalPlanner,
+    ) -> Result<(Vec<ScanExec>, Arc<SparkPlan>), ExecutionError> {
+        let project = extract_op!(spark_plan, Projection);
+        let children = &spark_plan.children;
+
+        assert_eq!(children.len(), 1);
+        let (scans, child) = planner.create_plan(&children[0], inputs, partition_count)?;
+
+        // Create projection expressions
+        let exprs: Result<Vec<_>, _> = project
+            .project_list
+            .iter()
+            .enumerate()
+            .map(|(idx, expr)| {
+                planner
+                    .create_expr(expr, child.schema())
+                    .map(|r| (r, format!("col_{idx}")))
+            })
+            .collect();
+
+        let projection = Arc::new(ProjectionExec::try_new(
+            exprs?,
+            Arc::clone(&child.native_plan),
+        )?);
+
+        Ok((
+            scans,
+            Arc::new(SparkPlan::new(spark_plan.plan_id, projection, vec![child])),
+        ))
+    }
+}
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 269ded1e48..687aaac8b8 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -18,6 +18,7 @@
 //! Converts Spark physical plan to DataFusion physical plan
 
 pub mod expression_registry;
+pub mod operator_registry;
 pub mod traits;
 
 use crate::execution::operators::IcebergScanExec;
@@ -27,6 +28,7 @@ use crate::{
         expressions::subquery::Subquery,
         operators::{ExecutionError, ExpandExec, ParquetWriterExec, ScanExec},
         planner::expression_registry::ExpressionRegistry,
+        planner::operator_registry::OperatorRegistry,
         serde::to_arrow_datatype,
         shuffle::ShuffleWriterExec,
     },
@@ -861,29 +863,19 @@ impl PhysicalPlanner {
         inputs: &mut Vec<Arc<GlobalRef>>,
         partition_count: usize,
     ) -> Result<(Vec<ScanExec>, Arc<SparkPlan>), ExecutionError> {
+        // Try to use the modular registry first - this automatically handles any registered operator types
+        if OperatorRegistry::global().can_handle(spark_plan) {
+            return OperatorRegistry::global().create_plan(
+                spark_plan,
+                inputs,
+                partition_count,
+                self,
+            );
+        }
+
+        // Fall back to the original monolithic match for other operators
         let children = &spark_plan.children;
         match spark_plan.op_struct.as_ref().unwrap() {
-            OpStruct::Projection(project) => {
-                assert_eq!(children.len(), 1);
-                let (scans, child) = self.create_plan(&children[0], inputs, partition_count)?;
-                let exprs: PhyExprResult = project
-                    .project_list
-                    .iter()
-                    .enumerate()
-                    .map(|(idx, expr)| {
-                        self.create_expr(expr, child.schema())
-                            .map(|r| (r, format!("col_{idx}")))
-                    })
-                    .collect();
-                let projection = Arc::new(ProjectionExec::try_new(
-                    exprs?,
-                    Arc::clone(&child.native_plan),
-                )?);
-                Ok((
-                    scans,
-                    Arc::new(SparkPlan::new(spark_plan.plan_id, projection, vec![child])),
-                ))
-            }
             OpStruct::Filter(filter) => {
                 assert_eq!(children.len(), 1);
                 let (scans, child) = self.create_plan(&children[0], inputs, partition_count)?;
@@ -1634,6 +1626,10 @@ impl PhysicalPlanner {
                 Arc::new(SparkPlan::new(spark_plan.plan_id, window_agg, vec![child])),
             ))
         }
+            _ => Err(GeneralError(format!(
+                "Unsupported or unregistered operator type: {:?}",
+                spark_plan.op_struct
+            ))),
         }
     }
diff --git a/native/core/src/execution/planner/operator_registry.rs b/native/core/src/execution/planner/operator_registry.rs
new file mode 100644
index 0000000000..51dcacfbac
--- /dev/null
+++ b/native/core/src/execution/planner/operator_registry.rs
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Registry for operator builders using modular pattern
+
+use std::{
+    collections::HashMap,
+    sync::{Arc, OnceLock},
+};
+
+use datafusion_comet_proto::spark_operator::Operator;
+use jni::objects::GlobalRef;
+
+use super::{
+    traits::{OperatorBuilder, OperatorType},
+    PhysicalPlanner,
+};
+use crate::execution::{
+    operators::{ExecutionError, ScanExec},
+    spark_plan::SparkPlan,
+};
+
+/// Global registry of operator builders
+pub struct OperatorRegistry {
+    builders: HashMap<OperatorType, Box<dyn OperatorBuilder>>,
+}
+
+impl OperatorRegistry {
+    /// Create a new empty registry
+    fn new() -> Self {
+        Self {
+            builders: HashMap::new(),
+        }
+    }
+
+    /// Get the global singleton instance of the operator registry
+    pub fn global() -> &'static OperatorRegistry {
+        static REGISTRY: OnceLock<OperatorRegistry> = OnceLock::new();
+        REGISTRY.get_or_init(|| {
+            let mut registry = OperatorRegistry::new();
+            registry.register_all_operators();
+            registry
+        })
+    }
+
+    /// Check if the registry can handle a given operator
+    pub fn can_handle(&self, spark_operator: &Operator) -> bool {
+        get_operator_type(spark_operator)
+            .map(|op_type| self.builders.contains_key(&op_type))
+            .unwrap_or(false)
+    }
+
+    /// Create a Spark plan using the registered builder for this operator type
+    pub fn create_plan(
+        &self,
+        spark_operator: &Operator,
+        inputs: &mut Vec<Arc<GlobalRef>>,
+        partition_count: usize,
+        planner: &PhysicalPlanner,
+    ) -> Result<(Vec<ScanExec>, Arc<SparkPlan>), ExecutionError> {
+        let operator_type = get_operator_type(spark_operator).ok_or_else(|| {
+            ExecutionError::GeneralError(format!(
+                "Unsupported operator type: {:?}",
+                spark_operator.op_struct
+            ))
+        })?;
+
+        let builder = self.builders.get(&operator_type).ok_or_else(|| {
+            ExecutionError::GeneralError(format!(
+                "No builder registered for operator type: {:?}",
+                operator_type
+            ))
+        })?;
+
+        builder.build(spark_operator, inputs, partition_count, planner)
+    }
+
+    /// Register all operator builders
+    fn register_all_operators(&mut self) {
+        self.register_projection_operators();
+    }
+
+    /// Register projection operators
+    fn register_projection_operators(&mut self) {
+        use crate::execution::operators::projection::ProjectionBuilder;
+
+        self.builders
+            .insert(OperatorType::Projection, Box::new(ProjectionBuilder));
+    }
+}
+
+/// Extract the operator type from a Spark operator
+fn get_operator_type(spark_operator: &Operator) -> Option<OperatorType> {
+    use datafusion_comet_proto::spark_operator::operator::OpStruct;
+
+    match spark_operator.op_struct.as_ref()? {
+        OpStruct::Projection(_) => Some(OperatorType::Projection),
+        OpStruct::Filter(_) => Some(OperatorType::Filter),
+        OpStruct::HashAgg(_) => Some(OperatorType::HashAgg),
+        OpStruct::Limit(_) => Some(OperatorType::Limit),
+        OpStruct::Sort(_) => Some(OperatorType::Sort),
+        OpStruct::Scan(_) => Some(OperatorType::Scan),
+        OpStruct::NativeScan(_) => Some(OperatorType::NativeScan),
+        OpStruct::IcebergScan(_) => Some(OperatorType::IcebergScan),
+        OpStruct::ShuffleWriter(_) => Some(OperatorType::ShuffleWriter),
+        OpStruct::ParquetWriter(_) => Some(OperatorType::ParquetWriter),
+        OpStruct::Expand(_) => Some(OperatorType::Expand),
+        OpStruct::SortMergeJoin(_) => Some(OperatorType::SortMergeJoin),
+        OpStruct::HashJoin(_) => Some(OperatorType::HashJoin),
+        OpStruct::Window(_) => Some(OperatorType::Window),
+        OpStruct::Explode(_) => None, // Not yet in OperatorType enum
+    }
+}
diff --git a/native/core/src/execution/planner/traits.rs b/native/core/src/execution/planner/traits.rs
index 3f3467d0d0..1a6b1f5df6 100644
--- a/native/core/src/execution/planner/traits.rs
+++ b/native/core/src/execution/planner/traits.rs
@@ -48,6 +48,27 @@ macro_rules! extract_expr {
     };
 }
 
+/// Macro to extract a specific operator variant, panicking if called with wrong type.
+/// This should be used in operator builders where the registry guarantees the correct
+/// operator type has been routed to the builder.
+#[macro_export]
+macro_rules! extract_op {
+    ($spark_operator:expr, $variant:ident) => {
+        match $spark_operator
+            .op_struct
+            .as_ref()
+            .expect("operator struct must be present")
+        {
+            datafusion_comet_proto::spark_operator::operator::OpStruct::$variant(op) => op,
+            other => panic!(
+                "{} builder called with wrong operator type: {:?}",
+                stringify!($variant),
+                other
+            ),
+        }
+    };
+}
+
 /// Macro to generate binary expression builders with minimal boilerplate
 #[macro_export]
 macro_rules! binary_expr_builder {
@@ -114,7 +135,6 @@ pub trait ExpressionBuilder: Send + Sync {
 }
 
 /// Trait for building physical operators from Spark protobuf operators
-#[allow(dead_code)]
 pub trait OperatorBuilder: Send + Sync {
     /// Build a Spark plan from a protobuf operator
     fn build(
@@ -201,7 +221,6 @@ pub enum ExpressionType {
 
 /// Enum to identify different operator types for registry dispatch
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-#[allow(dead_code)]
 pub enum OperatorType {
     Scan,
     NativeScan,

From 5752f3eae8aea2f56c8879418b2398398ba8f33f Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 10 Dec 2025 13:51:57 -0700
Subject: [PATCH 2/5] refactor

---
 .../src/execution/expressions/arithmetic.rs  |   5 +-
 .../src/execution/operators/projection.rs    |   2 +-
 .../execution/planner/expression_registry.rs |  85 +++++++++++-
 .../execution/planner/operator_registry.rs   |  36 ++++-
 native/core/src/execution/planner/traits.rs  | 131 +-----------------
 5 files changed, 123 insertions(+), 136 deletions(-)

diff --git a/native/core/src/execution/expressions/arithmetic.rs b/native/core/src/execution/expressions/arithmetic.rs
index 71fe85ef52..a9749678db 100644
--- a/native/core/src/execution/expressions/arithmetic.rs
+++ b/native/core/src/execution/expressions/arithmetic.rs
@@ -23,7 +23,7 @@ macro_rules! arithmetic_expr_builder {
     ($builder_name:ident, $expr_type:ident, $operator:expr) => {
         pub struct $builder_name;
 
-        impl $crate::execution::planner::traits::ExpressionBuilder for $builder_name {
+        impl $crate::execution::planner::expression_registry::ExpressionBuilder for $builder_name {
             fn build(
                 &self,
                 spark_expr: &datafusion_comet_proto::spark_expression::Expr,
@@ -61,7 +61,8 @@ use crate::execution::{
     expressions::extract_expr,
     operators::ExecutionError,
     planner::{
-        from_protobuf_eval_mode, traits::ExpressionBuilder, BinaryExprOptions, PhysicalPlanner,
+        expression_registry::ExpressionBuilder, from_protobuf_eval_mode, BinaryExprOptions,
+        PhysicalPlanner,
     },
 };
diff --git a/native/core/src/execution/operators/projection.rs b/native/core/src/execution/operators/projection.rs
index 06d82dbf54..6ba1bb5d59 100644
--- a/native/core/src/execution/operators/projection.rs
+++ b/native/core/src/execution/operators/projection.rs
@@ -26,7 +26,7 @@ use jni::objects::GlobalRef;
 use crate::{
     execution::{
         operators::{ExecutionError, ScanExec},
-        planner::{traits::OperatorBuilder, PhysicalPlanner},
+        planner::{operator_registry::OperatorBuilder, PhysicalPlanner},
         spark_plan::SparkPlan,
     },
     extract_op,
diff --git a/native/core/src/execution/planner/expression_registry.rs b/native/core/src/execution/planner/expression_registry.rs
index f97cb984b1..227484ca87 100644
--- a/native/core/src/execution/planner/expression_registry.rs
+++ b/native/core/src/execution/planner/expression_registry.rs
@@ -25,7 +25,90 @@ use datafusion::physical_expr::PhysicalExpr;
 use datafusion_comet_proto::spark_expression::{expr::ExprStruct, Expr};
 
 use crate::execution::operators::ExecutionError;
-use crate::execution::planner::traits::{ExpressionBuilder, ExpressionType};
+
+/// Trait for building physical expressions from Spark protobuf expressions
+pub trait ExpressionBuilder: Send + Sync {
+    /// Build a DataFusion physical expression from a Spark protobuf expression
+    fn build(
+        &self,
+        spark_expr: &Expr,
+        input_schema: SchemaRef,
+        planner: &super::PhysicalPlanner,
+    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError>;
+}
+
+/// Enum to identify different expression types for registry dispatch
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum ExpressionType {
+    // Arithmetic expressions
+    Add,
+    Subtract,
+    Multiply,
+    Divide,
+    IntegralDivide,
+    Remainder,
+    UnaryMinus,
+
+    // Comparison expressions
+    Eq,
+    Neq,
+    Lt,
+    LtEq,
+    Gt,
+    GtEq,
+    EqNullSafe,
+    NeqNullSafe,
+
+    // Logical expressions
+    And,
+    Or,
+    Not,
+
+    // Null checks
+    IsNull,
+    IsNotNull,
+
+    // Bitwise operations
+    BitwiseAnd,
+    BitwiseOr,
+    BitwiseXor,
+    BitwiseShiftLeft,
+    BitwiseShiftRight,
+
+    // Other expressions
+    Bound,
+    Unbound,
+    Literal,
+    Cast,
+    CaseWhen,
+    In,
+    If,
+    Substring,
+    Like,
+    Rlike,
+    CheckOverflow,
+    ScalarFunc,
+    NormalizeNanAndZero,
+    Subquery,
+    BloomFilterMightContain,
+    CreateNamedStruct,
+    GetStructField,
+    ToJson,
+    ToPrettyString,
+    ListExtract,
+    GetArrayStructFields,
+    ArrayInsert,
+    Rand,
+    Randn,
+    SparkPartitionId,
+    MonotonicallyIncreasingId,
+
+    // Time functions
+    Hour,
+    Minute,
+    Second,
+    TruncTimestamp,
+}
 
 /// Registry for expression builders
 pub struct ExpressionRegistry {
diff --git a/native/core/src/execution/planner/operator_registry.rs b/native/core/src/execution/planner/operator_registry.rs
index 51dcacfbac..e4899280b7 100644
--- a/native/core/src/execution/planner/operator_registry.rs
+++ b/native/core/src/execution/planner/operator_registry.rs
@@ -25,15 +25,43 @@
 use std::{
     collections::HashMap,
     sync::{Arc, OnceLock},
 };
 
 use datafusion_comet_proto::spark_operator::Operator;
 use jni::objects::GlobalRef;
 
-use super::{
-    traits::{OperatorBuilder, OperatorType},
-    PhysicalPlanner,
-};
+use super::PhysicalPlanner;
 use crate::execution::{
     operators::{ExecutionError, ScanExec},
     spark_plan::SparkPlan,
 };
 
+/// Trait for building physical operators from Spark protobuf operators
+pub trait OperatorBuilder: Send + Sync {
+    /// Build a Spark plan from a protobuf operator
+    fn build(
+        &self,
+        spark_plan: &datafusion_comet_proto::spark_operator::Operator,
+        inputs: &mut Vec<Arc<GlobalRef>>,
+        partition_count: usize,
+        planner: &PhysicalPlanner,
+    ) -> Result<(Vec<ScanExec>, Arc<SparkPlan>), ExecutionError>;
+}
+
+/// Enum to identify different operator types for registry dispatch
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum OperatorType {
+    Scan,
+    NativeScan,
+    IcebergScan,
+    Projection,
+    Filter,
+    HashAgg,
+    Limit,
+    Sort,
+    ShuffleWriter,
+    ParquetWriter,
+    Expand,
+    SortMergeJoin,
+    HashJoin,
+    Window,
+}
+
 /// Global registry of operator builders
 pub struct OperatorRegistry {
     builders: HashMap<OperatorType, Box<dyn OperatorBuilder>>,
diff --git a/native/core/src/execution/planner/traits.rs b/native/core/src/execution/planner/traits.rs
index 1a6b1f5df6..9d9ccf35da 100644
--- a/native/core/src/execution/planner/traits.rs
+++ b/native/core/src/execution/planner/traits.rs
@@ -15,17 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Core traits for the modular planner framework
-
-use std::sync::Arc;
-
-use arrow::datatypes::SchemaRef;
-use datafusion::physical_expr::PhysicalExpr;
-use datafusion_comet_proto::spark_expression::Expr;
-use jni::objects::GlobalRef;
-
-use crate::execution::operators::ScanExec;
-use crate::execution::{operators::ExecutionError, spark_plan::SparkPlan};
+//! Core macros for the modular planner framework
 
 /// Macro to extract a specific expression variant, panicking if called with wrong type.
 /// This should be used in expression builders where the registry guarantees the correct
@@ -75,7 +65,7 @@ macro_rules! binary_expr_builder {
     ($builder_name:ident, $expr_type:ident, $operator:expr) => {
         pub struct $builder_name;
 
-        impl $crate::execution::planner::traits::ExpressionBuilder for $builder_name {
+        impl $crate::execution::planner::expression_registry::ExpressionBuilder for $builder_name {
             fn build(
                 &self,
                 spark_expr: &datafusion_comet_proto::spark_expression::Expr,
@@ -105,7 +95,7 @@ macro_rules! unary_expr_builder {
     ($builder_name:ident, $expr_type:ident, $expr_constructor:expr) => {
         pub struct $builder_name;
 
-        impl $crate::execution::planner::traits::ExpressionBuilder for $builder_name {
+        impl $crate::execution::planner::expression_registry::ExpressionBuilder for $builder_name {
             fn build(
                 &self,
                 spark_expr: &datafusion_comet_proto::spark_expression::Expr,
@@ -122,118 +112,3 @@ macro_rules! unary_expr_builder {
         }
     };
 }
-
-/// Trait for building physical expressions from Spark protobuf expressions
-pub trait ExpressionBuilder: Send + Sync {
-    /// Build a DataFusion physical expression from a Spark protobuf expression
-    fn build(
-        &self,
-        spark_expr: &Expr,
-        input_schema: SchemaRef,
-        planner: &super::PhysicalPlanner,
-    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError>;
-}
-
-/// Trait for building physical operators from Spark protobuf operators
-pub trait OperatorBuilder: Send + Sync {
-    /// Build a Spark plan from a protobuf operator
-    fn build(
-        &self,
-        spark_plan: &datafusion_comet_proto::spark_operator::Operator,
-        inputs: &mut Vec<Arc<GlobalRef>>,
-        partition_count: usize,
-        planner: &super::PhysicalPlanner,
-    ) -> Result<(Vec<ScanExec>, Arc<SparkPlan>), ExecutionError>;
-}
-
-/// Enum to identify different expression types for registry dispatch
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-pub enum ExpressionType {
-    // Arithmetic expressions
-    Add,
-    Subtract,
-    Multiply,
-    Divide,
-    IntegralDivide,
-    Remainder,
-    UnaryMinus,
-
-    // Comparison expressions
-    Eq,
-    Neq,
-    Lt,
-    LtEq,
-    Gt,
-    GtEq,
-    EqNullSafe,
-    NeqNullSafe,
-
-    // Logical expressions
-    And,
-    Or,
-    Not,
-
-    // Null checks
-    IsNull,
-    IsNotNull,
-
-    // Bitwise operations
-    BitwiseAnd,
-    BitwiseOr,
-    BitwiseXor,
-    BitwiseShiftLeft,
-    BitwiseShiftRight,
-
-    // Other expressions
-    Bound,
-    Unbound,
-    Literal,
-    Cast,
-    CaseWhen,
-    In,
-    If,
-    Substring,
-    Like,
-    Rlike,
-    CheckOverflow,
-    ScalarFunc,
-    NormalizeNanAndZero,
-    Subquery,
-    BloomFilterMightContain,
-    CreateNamedStruct,
-    GetStructField,
-    ToJson,
-    ToPrettyString,
-    ListExtract,
-    GetArrayStructFields,
-    ArrayInsert,
-    Rand,
-    Randn,
-    SparkPartitionId,
-    MonotonicallyIncreasingId,
-
-    // Time functions
-    Hour,
-    Minute,
-    Second,
-    TruncTimestamp,
-}
-
-/// Enum to identify different operator types for registry dispatch
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-pub enum OperatorType {
-    Scan,
-    NativeScan,
-    IcebergScan,
-    Projection,
-    Filter,
-    HashAgg,
-    Limit,
-    Sort,
-    ShuffleWriter,
-    ParquetWriter,
-    Expand,
-    SortMergeJoin,
-    HashJoin,
-    Window,
-}

From 406f3d444f4829a0df1b20a51a26a1bfead82957 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 10 Dec 2025 13:55:14 -0700
Subject: [PATCH 3/5] fmt

---
 native/core/src/execution/planner.rs                       | 2 +-
 native/core/src/execution/planner/{traits.rs => macros.rs} | 0
 2 files changed, 1 insertion(+), 1 deletion(-)
 rename native/core/src/execution/planner/{traits.rs => macros.rs} (100%)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 687aaac8b8..cc92310475 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -18,8 +18,8 @@
 //! Converts Spark physical plan to DataFusion physical plan
 
 pub mod expression_registry;
+pub mod macros;
 pub mod operator_registry;
-pub mod traits;
 
 use crate::execution::operators::IcebergScanExec;
diff --git a/native/core/src/execution/planner/traits.rs b/native/core/src/execution/planner/macros.rs
similarity index 100%
rename from native/core/src/execution/planner/traits.rs
rename to native/core/src/execution/planner/macros.rs

From 57c0a3f1bad27b7c09f15fb00c48f67f68d67a2c Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 10 Dec 2025 14:07:57 -0700
Subject: [PATCH 4/5] update docs

---
 .../adding_a_new_expression.md | 148 ++++++++++++-
 .../adding_a_new_operator.md   | 202 +++++++++++++++---
 2 files changed, 316 insertions(+), 34 deletions(-)

diff --git a/docs/source/contributor-guide/adding_a_new_expression.md b/docs/source/contributor-guide/adding_a_new_expression.md
index 74825f4301..33b69c8d34 100644
--- a/docs/source/contributor-guide/adding_a_new_expression.md
+++ b/docs/source/contributor-guide/adding_a_new_expression.md
@@ -262,19 +262,151 @@ message Add2 {
 
 ### Adding the Expression in Rust
 
-With the serialization complete, the next step is to implement the expression in Rust and ensure that the incoming plan can make use of it.
+With the serialization complete, the next step is to implement the expression in Rust. Comet now uses a modular expression registry pattern that provides better organization and type safety compared to monolithic match statements.
 
-How this works is somewhat dependent on the type of expression you're adding. Expression implementations live in the `native/spark-expr/src/` directory, organized by category (e.g., `math_funcs/`, `string_funcs/`, `array_funcs/`).
+#### File Organization
 
-#### Generally Adding a New Expression
+The expression-related code is organized as follows:
 
-If you're adding a new expression that requires custom protobuf serialization, you may need to:
+- `native/core/src/execution/planner/expression_registry.rs` - Contains `ExpressionBuilder` trait, `ExpressionType` enum, and `ExpressionRegistry`
+- `native/core/src/execution/planner/macros.rs` - Contains shared macros (`extract_expr!`, `binary_expr_builder!`, `unary_expr_builder!`)
+- `native/core/src/execution/expressions/` - Individual expression builder implementations organized by category
+- `native/spark-expr/src/` - Scalar function implementations organized by category (e.g., `math_funcs/`, `string_funcs/`)
 
-1. Add a new message to the protobuf definition in `native/proto/src/proto/expr.proto`
-2. Add a native expression handler in `expression_registry.rs` to deserialize the new protobuf message type and
-   create a native expression
+#### Option A: Using the Expression Registry (Recommended for Complex Expressions)
 
-For most expressions, you can skip this step if you're using the existing scalar function infrastructure.
+For expressions that need custom protobuf handling or complex logic, use the modular registry pattern.
+
+##### Create an ExpressionBuilder
+
+Create or update a file in `native/core/src/execution/expressions/` (e.g., `comparison.rs`, `arithmetic.rs`):
+
+```rust
+use std::sync::Arc;
+
+use arrow::datatypes::SchemaRef;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion_comet_proto::spark_expression::Expr;
+
+use crate::execution::{
+    operators::ExecutionError,
+    planner::{expression_registry::ExpressionBuilder, PhysicalPlanner},
+};
+use crate::extract_expr;
+
+/// Builder for YourNewExpression expressions
+pub struct YourExpressionBuilder;
+
+impl ExpressionBuilder for YourExpressionBuilder {
+    fn build(
+        &self,
+        spark_expr: &Expr,
+        input_schema: SchemaRef,
+        planner: &PhysicalPlanner,
+    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+        // Use extract_expr! macro for type-safe extraction
+        let expr = extract_expr!(spark_expr, YourNewExpression);
+
+        // Convert child expressions
+        let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
+        let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?;
+
+        // Create and return the DataFusion physical expression
+        Ok(Arc::new(YourDataFusionExpr::new(left, right)))
+    }
+}
+```
+
+For simple binary expressions, you can use the `binary_expr_builder!` macro:
+
+```rust
+use datafusion::logical_expr::Operator as DataFusionOperator;
+use crate::binary_expr_builder;
+
+// This generates a complete ExpressionBuilder implementation
+binary_expr_builder!(YourBinaryExprBuilder, YourBinaryExpr, DataFusionOperator::Plus);
+```
+
+##### Register the ExpressionBuilder
+
+Add the ExpressionType to the enum in `native/core/src/execution/planner/expression_registry.rs`:
+
+```rust
+/// Enum to identify different expression types for registry dispatch
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum ExpressionType {
+    // Arithmetic expressions
+    Add,
+    Subtract,
+    // ... existing expressions ...
+    YourNewExpression, // Add your expression type here
+}
+```
+
+Register your builder in the `ExpressionRegistry` implementation:
+
+```rust
+/// Register all expression builders
+fn register_all_expressions(&mut self) {
+    self.register_arithmetic_expressions();
+    self.register_comparison_expressions();
+    self.register_your_expressions(); // Add this line
+}
+
+/// Register your new expressions
+fn register_your_expressions(&mut self) {
+    use crate::execution::expressions::your_category::YourExpressionBuilder;
+
+    self.builders
+        .insert(ExpressionType::YourNewExpression, Box::new(YourExpressionBuilder));
+}
+```
+
+Update the `get_expression_type` function to map your protobuf expression:
+
+```rust
+fn get_expression_type(spark_expr: &Expr) -> Option<ExpressionType> {
+    use datafusion_comet_proto::spark_expression::expr::ExprStruct;
+
+    match spark_expr.expr_struct.as_ref()? {
+        ExprStruct::Add(_) => Some(ExpressionType::Add),
+        ExprStruct::Subtract(_) => Some(ExpressionType::Subtract),
+        // ... existing expressions ...
+        ExprStruct::YourNewExpression(_) => Some(ExpressionType::YourNewExpression),
+    }
+}
+```
+
+**Note**: See existing implementations in `native/core/src/execution/expressions/` for working examples, such as `arithmetic.rs`, `comparison.rs`, etc.
+
+#### Option B: Using Scalar Functions (Recommended for Simple Functions)
+
+For expressions that map directly to scalar functions, use the existing scalar function infrastructure. This approach is simpler for basic functions but less flexible than the registry pattern.
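For orientation, Option B ultimately bottoms out in an ordinary DataFusion scalar UDF. The sketch below is illustrative only: `PlusOne` is a hypothetical function rather than a Comet API, and the exact `ScalarUDFImpl` surface (for example `invoke_with_args`) varies across DataFusion versions:

```rust
use std::any::Any;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::DataType;
use datafusion::common::cast::as_int64_array;
use datafusion::common::Result;
use datafusion::logical_expr::{
    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
};

/// Hypothetical scalar function that adds one to an Int64 column; it only
/// illustrates the shape of the UDF that the scalar function path wires up.
#[derive(Debug)]
struct PlusOne {
    signature: Signature,
}

impl PlusOne {
    fn new() -> Self {
        Self {
            signature: Signature::exact(vec![DataType::Int64], Volatility::Immutable),
        }
    }
}

impl ScalarUDFImpl for PlusOne {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "plus_one"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Int64)
    }

    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        // Normalize scalar/array inputs to arrays, then add one element-wise.
        let arrays = ColumnarValue::values_to_arrays(&args.args)?;
        let input = as_int64_array(&arrays[0])?;
        let output: Int64Array = input.iter().map(|v| v.map(|x| x + 1)).collect();
        Ok(ColumnarValue::Array(Arc::new(output) as ArrayRef))
    }
}
```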
+
+**When to use the Registry Pattern (Option A):**
+
+- Complex expressions that need custom deserialization logic
+- Expressions with multiple variants or complex parameter handling
+- Binary/unary expressions that benefit from type-safe extraction
+- Expressions that need custom DataFusion physical expression implementations
+
+**When to use Scalar Functions (Option B):**
+
+- Simple functions that map directly to DataFusion scalar functions
+- Functions that don't need complex parameter handling
+- Functions where the existing `CometScalarFunction` pattern is sufficient
+
+**Benefits of the Registry Pattern:**
+
+- **Better Organization**: Each expression's logic is isolated
+- **Type Safety**: The `extract_expr!` macro ensures compile-time correctness
+- **Extensibility**: New expressions can be added without modifying core planner logic
+- **Code Reuse**: Macros like `binary_expr_builder!` reduce boilerplate
+- **Graceful Fallback**: Unregistered expressions automatically fall back to the monolithic match
+
+#### Option C: Fallback to Monolithic Match (Legacy)
+
+If you need to add an expression but prefer not to use the registry pattern, expressions that aren't registered will automatically fall back to the legacy monolithic match statement in `create_expr()`. However, the registry pattern (Option A) is strongly recommended for new expressions.
 
 #### Adding a New Scalar Function Expression
 
diff --git a/docs/source/contributor-guide/adding_a_new_operator.md b/docs/source/contributor-guide/adding_a_new_operator.md
index 4317943aa8..ee5987bece 100644
--- a/docs/source/contributor-guide/adding_a_new_operator.md
+++ b/docs/source/contributor-guide/adding_a_new_operator.md
@@ -326,51 +326,201 @@ Run `make` to update the user guide. The new configuration option will be added
 
 ### Step 5: Implement the Native Operator in Rust
 
-#### Update the Planner
+Comet now uses a modular operator registry pattern that provides better organization and type safety compared to monolithic match statements.
 
-In `native/core/src/execution/planner.rs`, add a match case in the operator deserialization logic to handle your new protobuf message:
+#### File Organization
+
+The operator-related code is organized as follows:
+
+- `native/core/src/execution/planner/operator_registry.rs` - Contains `OperatorBuilder` trait, `OperatorType` enum, and `OperatorRegistry`
+- `native/core/src/execution/planner/expression_registry.rs` - Contains `ExpressionBuilder` trait, `ExpressionType` enum, and `ExpressionRegistry`
+- `native/core/src/execution/planner/macros.rs` - Contains shared macros (`extract_op!`, `extract_expr!`, etc.)
+- `native/core/src/execution/operators/` - Individual operator builder implementations
+
+#### Option A: Using the Operator Registry (Recommended)
+
+The recommended approach is to create an `OperatorBuilder` and register it with the `OperatorRegistry`.
+
+**Note**: See `native/core/src/execution/operators/projection.rs` for a complete working example of `ProjectionBuilder` that follows this pattern.
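Before writing a builder, it may help to see how the planner hands control to the registry; the dispatch added in the first patch of this series reduces to the following (abridged from `create_plan` in `native/core/src/execution/planner.rs`):

```rust
// Registered operators are routed through the registry; anything the
// registry cannot handle falls through to the legacy monolithic match.
if OperatorRegistry::global().can_handle(spark_plan) {
    return OperatorRegistry::global().create_plan(spark_plan, inputs, partition_count, self);
}
```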
+
+##### Create an OperatorBuilder
+
+Create a new file in `native/core/src/execution/operators/` (e.g., `your_operator.rs`):
+
+```rust
+use std::sync::Arc;
+
+use datafusion::physical_plan::your_operator::YourDataFusionExec;
+use datafusion_comet_proto::spark_operator::Operator;
+use jni::objects::GlobalRef;
+
+use crate::{
+    execution::{
+        operators::{ExecutionError, ScanExec},
+        planner::{operator_registry::OperatorBuilder, PhysicalPlanner},
+        spark_plan::SparkPlan,
+    },
+    extract_op,
+};
+
+/// Builder for YourNewOperator operators
+pub struct YourOperatorBuilder;
+
+impl OperatorBuilder for YourOperatorBuilder {
+    fn build(
+        &self,
+        spark_plan: &Operator,
+        inputs: &mut Vec<Arc<GlobalRef>>,
+        partition_count: usize,
+        planner: &PhysicalPlanner,
+    ) -> Result<(Vec<ScanExec>, Arc<SparkPlan>), ExecutionError> {
+        // Use extract_op! macro for type-safe extraction
+        let your_op = extract_op!(spark_plan, YourNewOperator);
+        let children = &spark_plan.children;
+
+        assert_eq!(children.len(), 1); // Adjust based on your operator's arity
+        let (scans, child) = planner.create_plan(&children[0], inputs, partition_count)?;
+
+        // Convert protobuf fields to DataFusion components
+        let expressions: Result<Vec<_>, _> = your_op
+            .expressions
+            .iter()
+            .map(|expr| planner.create_expr(expr, child.schema()))
+            .collect();
+
+        // Create DataFusion operator
+        let datafusion_exec = Arc::new(YourDataFusionExec::try_new(
+            expressions?,
+            Arc::clone(&child.native_plan),
+        )?);
+
+        Ok((
+            scans,
+            Arc::new(SparkPlan::new(spark_plan.plan_id, datafusion_exec, vec![child])),
+        ))
+    }
+}
+```
+
+##### Register the OperatorBuilder
+
+Add the OperatorType to the enum in `native/core/src/execution/planner/operator_registry.rs`:
+
+```rust
+/// Enum to identify different operator types for registry dispatch
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum OperatorType {
+    Scan,
+    NativeScan,
+    IcebergScan,
+    Projection,
+    Filter,
+    HashAgg,
+    Limit,
+    Sort,
+    ShuffleWriter,
+    ParquetWriter,
+    Expand,
+    SortMergeJoin,
+    HashJoin,
+    Window,
+    YourNewOperator, // Add your operator type here
+}
+```
+
+Update the `get_operator_type` function in the same file:
+
+```rust
+fn get_operator_type(spark_operator: &Operator) -> Option<OperatorType> {
+    use datafusion_comet_proto::spark_operator::operator::OpStruct;
+
+    match spark_operator.op_struct.as_ref()? {
+        OpStruct::Projection(_) => Some(OperatorType::Projection),
+        OpStruct::Filter(_) => Some(OperatorType::Filter),
+        // ... existing operators ...
+        OpStruct::YourNewOperator(_) => Some(OperatorType::YourNewOperator),
+        OpStruct::Explode(_) => None, // Not yet in OperatorType enum
+    }
+}
+```
-Create the operator implementation, either in an existing file or a new file in `native/core/src/execution/operators/`:
+Register your builder in the `OperatorRegistry` implementation:
+
+```rust
-use datafusion::physical_plan::{ExecutionPlan, ...};
-use datafusion_comet_proto::spark_operator::YourNewOperator;
+/// Register all operator builders
+fn register_all_operators(&mut self) {
+    self.register_projection_operators();
+    self.register_your_operator(); // Add this line
+}
-pub fn create_your_operator_exec(
-    op: &YourNewOperator,
-    children: Vec<Arc<dyn ExecutionPlan>>,
-    session_ctx: &SessionContext,
-) -> Result<Arc<dyn ExecutionPlan>, ExecutionError> {
-    // Deserialize expressions and configuration
-    // Create and return the execution plan
+/// Register your new operator
+fn register_your_operator(&mut self) {
+    use crate::execution::operators::your_operator::YourOperatorBuilder;
-    // Option 1: Use existing DataFusion operator
-    // Ok(Arc::new(SomeDataFusionExec::try_new(...)?))
+    self.builders
+        .insert(OperatorType::YourNewOperator, Box::new(YourOperatorBuilder));
+}
+```
-    // Option 2: Implement custom operator (see ExpandExec for example)
-    // Ok(Arc::new(YourCustomExec::new(...)))
+##### Add the module declaration
+
+Add your new module to `native/core/src/execution/operators/mod.rs`:
+
+```rust
+pub mod your_operator;
+```
+
+#### Option B: Using the Fallback Match (Legacy)
+
+If you prefer not to use the registry pattern (or for complex operators that don't fit the pattern), you can still add a match case in the fallback logic in `native/core/src/execution/planner.rs`:
+
+```rust
+// In the create_plan method's fallback match statement:
+match spark_plan.op_struct.as_ref().unwrap() {
+    OpStruct::Filter(filter) => {
         // ... existing cases ...
     }
-    Some(OpStruct::YourNewOperator(your_op)) => {
-        create_your_operator_exec(your_op, children, session_ctx)
+    OpStruct::YourNewOperator(your_op) => {
+        create_your_operator_exec(your_op, children, partition_count)
     }
-    // ... other cases ...
+    _ => Err(GeneralError(format!(
+        "Unsupported or unregistered operator type: {:?}",
+        spark_plan.op_struct
+    ))),
 }
 ```
-#### Implement the Operator
+However, the registry approach (Option A) is strongly recommended because it provides:
 
-Create the operator implementation, either in an existing file or a new file in `native/core/src/execution/operators/`:
+- **Better organization**: Each operator's logic is isolated
+- **Type safety**: The `extract_op!` macro ensures compile-time correctness
+- **Extensibility**: New operators can be added without modifying core planner logic
+- **Consistency**: Follows the established pattern for expressions
+
+#### Custom Operator Implementation
+
+If your operator doesn't have a direct DataFusion equivalent, you'll need to implement a custom operator. See `native/core/src/execution/operators/expand.rs` for a complete example:
 
 ```rust
 use datafusion::physical_plan::{ExecutionPlan, ...};
-use datafusion_comet_proto::spark_operator::YourNewOperator;
-
-pub fn create_your_operator_exec(
-    op: &YourNewOperator,
-    children: Vec<Arc<dyn ExecutionPlan>>,
-    session_ctx: &SessionContext,
-) -> Result<Arc<dyn ExecutionPlan>, ExecutionError> {
-    // Deserialize expressions and configuration
-    // Create and return the execution plan
-    // Option 1: Use existing DataFusion operator
-    // Ok(Arc::new(SomeDataFusionExec::try_new(...)?))
+#[derive(Debug)]
+pub struct YourCustomExec {
+    // Your operator's fields
+}
-    // Option 2: Implement custom operator (see ExpandExec for example)
-    // Ok(Arc::new(YourCustomExec::new(...)))
+impl ExecutionPlan for YourCustomExec {
+    // Implement required methods
+    fn as_any(&self) -> &dyn Any { self }
+    fn schema(&self) -> SchemaRef { /* ... */ }
+    fn output_partitioning(&self) -> Partitioning { /* ... */ }
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { /* ... */ }
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { /* ... */ }
+    fn with_new_children(&self, children: Vec<Arc<dyn ExecutionPlan>>) -> Result<Arc<dyn ExecutionPlan>> { /* ... */ }
+    fn execute(&self, partition: usize, context: Arc<TaskContext>) -> Result<SendableRecordBatchStream> { /* ... */ }
 }
 ```
-For custom operators, you'll need to implement the `ExecutionPlan` trait. See `native/core/src/execution/operators/expand.rs` or `scan.rs` for examples.
-
 ### Step 6: Add Tests
 
 #### Scala Integration Tests

From e579fd9c66ffb3c99e4a68dbbaf3652f78253d8c Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 10 Dec 2025 15:49:59 -0700
Subject: [PATCH 5/5] revert docs

---
 .../adding_a_new_expression.md | 148 +-------------
 .../adding_a_new_operator.md   | 202 +++---------------
 2 files changed, 34 insertions(+), 316 deletions(-)

diff --git a/docs/source/contributor-guide/adding_a_new_expression.md b/docs/source/contributor-guide/adding_a_new_expression.md
index 33b69c8d34..74825f4301 100644
--- a/docs/source/contributor-guide/adding_a_new_expression.md
+++ b/docs/source/contributor-guide/adding_a_new_expression.md
@@ -262,151 +262,19 @@ message Add2 {
 
 ### Adding the Expression in Rust
 
-With the serialization complete, the next step is to implement the expression in Rust. Comet now uses a modular expression registry pattern that provides better organization and type safety compared to monolithic match statements.
+With the serialization complete, the next step is to implement the expression in Rust and ensure that the incoming plan can make use of it.
 
-#### File Organization
+How this works is somewhat dependent on the type of expression you're adding. Expression implementations live in the `native/spark-expr/src/` directory, organized by category (e.g., `math_funcs/`, `string_funcs/`, `array_funcs/`).
 
-The expression-related code is organized as follows:
+#### Generally Adding a New Expression
 
-- `native/core/src/execution/planner/expression_registry.rs` - Contains `ExpressionBuilder` trait, `ExpressionType` enum, and `ExpressionRegistry`
-- `native/core/src/execution/planner/macros.rs` - Contains shared macros (`extract_expr!`, `binary_expr_builder!`, `unary_expr_builder!`)
-- `native/core/src/execution/expressions/` - Individual expression builder implementations organized by category
-- `native/spark-expr/src/` - Scalar function implementations organized by category (e.g., `math_funcs/`, `string_funcs/`)
+If you're adding a new expression that requires custom protobuf serialization, you may need to:
 
-#### Option A: Using the Expression Registry (Recommended for Complex Expressions)
+1. Add a new message to the protobuf definition in `native/proto/src/proto/expr.proto`
+2. Add a native expression handler in `expression_registry.rs` to deserialize the new protobuf message type and
+   create a native expression
 
-For expressions that need custom protobuf handling or complex logic, use the modular registry pattern.
-
-##### Create an ExpressionBuilder
-
-Create or update a file in `native/core/src/execution/expressions/` (e.g., `comparison.rs`, `arithmetic.rs`):
-
-```rust
-use std::sync::Arc;
-
-use arrow::datatypes::SchemaRef;
-use datafusion::physical_expr::PhysicalExpr;
-use datafusion_comet_proto::spark_expression::Expr;
-
-use crate::execution::{
-    operators::ExecutionError,
-    planner::{expression_registry::ExpressionBuilder, PhysicalPlanner},
-};
-use crate::extract_expr;
-
-/// Builder for YourNewExpression expressions
-pub struct YourExpressionBuilder;
-
-impl ExpressionBuilder for YourExpressionBuilder {
-    fn build(
-        &self,
-        spark_expr: &Expr,
-        input_schema: SchemaRef,
-        planner: &PhysicalPlanner,
-    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
-        // Use extract_expr! macro for type-safe extraction
-        let expr = extract_expr!(spark_expr, YourNewExpression);
-
-        // Convert child expressions
-        let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
-        let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?;
-
-        // Create and return the DataFusion physical expression
-        Ok(Arc::new(YourDataFusionExpr::new(left, right)))
-    }
-}
-```
-
-For simple binary expressions, you can use the `binary_expr_builder!` macro:
-
-```rust
-use datafusion::logical_expr::Operator as DataFusionOperator;
-use crate::binary_expr_builder;
-
-// This generates a complete ExpressionBuilder implementation
-binary_expr_builder!(YourBinaryExprBuilder, YourBinaryExpr, DataFusionOperator::Plus);
-```
-
-##### Register the ExpressionBuilder
-
-Add the ExpressionType to the enum in `native/core/src/execution/planner/expression_registry.rs`:
-
-```rust
-/// Enum to identify different expression types for registry dispatch
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-pub enum ExpressionType {
-    // Arithmetic expressions
-    Add,
-    Subtract,
-    // ... existing expressions ...
-    YourNewExpression, // Add your expression type here
-}
-```
-
-Register your builder in the `ExpressionRegistry` implementation:
-
-```rust
-/// Register all expression builders
-fn register_all_expressions(&mut self) {
-    self.register_arithmetic_expressions();
-    self.register_comparison_expressions();
-    self.register_your_expressions(); // Add this line
-}
-
-/// Register your new expressions
-fn register_your_expressions(&mut self) {
-    use crate::execution::expressions::your_category::YourExpressionBuilder;
-
-    self.builders
-        .insert(ExpressionType::YourNewExpression, Box::new(YourExpressionBuilder));
-}
-```
-
-Update the `get_expression_type` function to map your protobuf expression:
-
-```rust
-fn get_expression_type(spark_expr: &Expr) -> Option<ExpressionType> {
-    use datafusion_comet_proto::spark_expression::expr::ExprStruct;
-
-    match spark_expr.expr_struct.as_ref()? {
-        ExprStruct::Add(_) => Some(ExpressionType::Add),
-        ExprStruct::Subtract(_) => Some(ExpressionType::Subtract),
-        // ... existing expressions ...
-        ExprStruct::YourNewExpression(_) => Some(ExpressionType::YourNewExpression),
-    }
-}
-```
-
-**Note**: See existing implementations in `native/core/src/execution/expressions/` for working examples, such as `arithmetic.rs`, `comparison.rs`, etc.
-
-#### Option B: Using Scalar Functions (Recommended for Simple Functions)
-
-For expressions that map directly to scalar functions, use the existing scalar function infrastructure. This approach is simpler for basic functions but less flexible than the registry pattern.
-
-**When to use the Registry Pattern (Option A):**
-
-- Complex expressions that need custom deserialization logic
-- Expressions with multiple variants or complex parameter handling
-- Binary/unary expressions that benefit from type-safe extraction
-- Expressions that need custom DataFusion physical expression implementations
-
-**When to use Scalar Functions (Option B):**
-
-- Simple functions that map directly to DataFusion scalar functions
-- Functions that don't need complex parameter handling
-- Functions where the existing `CometScalarFunction` pattern is sufficient
-
-**Benefits of the Registry Pattern:**
-
-- **Better Organization**: Each expression's logic is isolated
-- **Type Safety**: The `extract_expr!` macro ensures compile-time correctness
-- **Extensibility**: New expressions can be added without modifying core planner logic
-- **Code Reuse**: Macros like `binary_expr_builder!` reduce boilerplate
-- **Graceful Fallback**: Unregistered expressions automatically fall back to the monolithic match
-
-#### Option C: Fallback to Monolithic Match (Legacy)
-
-If you need to add an expression but prefer not to use the registry pattern, expressions that aren't registered will automatically fall back to the legacy monolithic match statement in `create_expr()`. However, the registry pattern (Option A) is strongly recommended for new expressions.
+For most expressions, you can skip this step if you're using the existing scalar function infrastructure.
 
 #### Adding a New Scalar Function Expression
 
diff --git a/docs/source/contributor-guide/adding_a_new_operator.md b/docs/source/contributor-guide/adding_a_new_operator.md
index ee5987bece..4317943aa8 100644
--- a/docs/source/contributor-guide/adding_a_new_operator.md
+++ b/docs/source/contributor-guide/adding_a_new_operator.md
@@ -326,201 +326,51 @@ Run `make` to update the user guide. The new configuration option will be added
 
 ### Step 5: Implement the Native Operator in Rust
 
-Comet now uses a modular operator registry pattern that provides better organization and type safety compared to monolithic match statements.
+#### Update the Planner
 
-#### File Organization
-
-The operator-related code is organized as follows:
-
-- `native/core/src/execution/planner/operator_registry.rs` - Contains `OperatorBuilder` trait, `OperatorType` enum, and `OperatorRegistry`
-- `native/core/src/execution/planner/expression_registry.rs` - Contains `ExpressionBuilder` trait, `ExpressionType` enum, and `ExpressionRegistry`
-- `native/core/src/execution/planner/macros.rs` - Contains shared macros (`extract_op!`, `extract_expr!`, etc.)
-- `native/core/src/execution/operators/` - Individual operator builder implementations
-
-#### Option A: Using the Operator Registry (Recommended)
-
-The recommended approach is to create an `OperatorBuilder` and register it with the `OperatorRegistry`.
-
-**Note**: See `native/core/src/execution/operators/projection.rs` for a complete working example of `ProjectionBuilder` that follows this pattern.
-
-##### Create an OperatorBuilder
-
-Create a new file in `native/core/src/execution/operators/` (e.g., `your_operator.rs`):
-
-```rust
-use std::sync::Arc;
-
-use datafusion::physical_plan::your_operator::YourDataFusionExec;
-use datafusion_comet_proto::spark_operator::Operator;
-use jni::objects::GlobalRef;
-
-use crate::{
-    execution::{
-        operators::{ExecutionError, ScanExec},
-        planner::{operator_registry::OperatorBuilder, PhysicalPlanner},
-        spark_plan::SparkPlan,
-    },
-    extract_op,
-};
-
-/// Builder for YourNewOperator operators
-pub struct YourOperatorBuilder;
-
-impl OperatorBuilder for YourOperatorBuilder {
-    fn build(
-        &self,
-        spark_plan: &Operator,
-        inputs: &mut Vec<Arc<GlobalRef>>,
-        partition_count: usize,
-        planner: &PhysicalPlanner,
-    ) -> Result<(Vec<ScanExec>, Arc<SparkPlan>), ExecutionError> {
-        // Use extract_op! macro for type-safe extraction
-        let your_op = extract_op!(spark_plan, YourNewOperator);
-        let children = &spark_plan.children;
-
-        assert_eq!(children.len(), 1); // Adjust based on your operator's arity
-        let (scans, child) = planner.create_plan(&children[0], inputs, partition_count)?;
-
-        // Convert protobuf fields to DataFusion components
-        let expressions: Result<Vec<_>, _> = your_op
-            .expressions
-            .iter()
-            .map(|expr| planner.create_expr(expr, child.schema()))
-            .collect();
-
-        // Create DataFusion operator
-        let datafusion_exec = Arc::new(YourDataFusionExec::try_new(
-            expressions?,
-            Arc::clone(&child.native_plan),
-        )?);
-
-        Ok((
-            scans,
-            Arc::new(SparkPlan::new(spark_plan.plan_id, datafusion_exec, vec![child])),
-        ))
-    }
-}
-```
-
-##### Register the OperatorBuilder
-
-Add the OperatorType to the enum in `native/core/src/execution/planner/operator_registry.rs`:
+In `native/core/src/execution/planner.rs`, add a match case in the operator deserialization logic to handle your new protobuf message:
 
 ```rust
-/// Enum to identify different operator types for registry dispatch
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-pub enum OperatorType {
-    Scan,
-    NativeScan,
-    IcebergScan,
-    Projection,
-    Filter,
-    HashAgg,
-    Limit,
-    Sort,
-    ShuffleWriter,
-    ParquetWriter,
-    Expand,
-    SortMergeJoin,
-    HashJoin,
-    Window,
-    YourNewOperator, // Add your operator type here
-}
-```
-
-Update the `get_operator_type` function in the same file:
-
-```rust
-fn get_operator_type(spark_operator: &Operator) -> Option<OperatorType> {
-    use datafusion_comet_proto::spark_operator::operator::OpStruct;
-
-    match spark_operator.op_struct.as_ref()? {
-        OpStruct::Projection(_) => Some(OperatorType::Projection),
-        OpStruct::Filter(_) => Some(OperatorType::Filter),
-        // ... existing operators ...
-        OpStruct::YourNewOperator(_) => Some(OperatorType::YourNewOperator),
-        OpStruct::Explode(_) => None, // Not yet in OperatorType enum
-    }
-}
-```
+use datafusion_comet_proto::spark_operator::operator::OpStruct;
-Register your builder in the `OperatorRegistry` implementation:
+// In the create_plan or similar method:
+match op.op_struct.as_ref() {
+    Some(OpStruct::Scan(scan)) => {
-```rust
-/// Register all operator builders
-fn register_all_operators(&mut self) {
-    self.register_projection_operators();
-    self.register_your_operator(); // Add this line
-}
+        // ... existing cases ...
+    }
+    Some(OpStruct::YourNewOperator(your_op)) => {
+        create_your_operator_exec(your_op, children, session_ctx)
+    }
+    // ... other cases ...
+}
+```
-/// Register your new operator
-fn register_your_operator(&mut self) {
-    use crate::execution::operators::your_operator::YourOperatorBuilder;
+#### Implement the Operator
-    self.builders
-        .insert(OperatorType::YourNewOperator, Box::new(YourOperatorBuilder));
-}
-```
+Create the operator implementation, either in an existing file or a new file in `native/core/src/execution/operators/`:
-##### Add the module declaration
-
-Add your new module to `native/core/src/execution/operators/mod.rs`:
-
-```rust
-pub mod your_operator;
-```
-
-#### Option B: Using the Fallback Match (Legacy)
-
-If you prefer not to use the registry pattern (or for complex operators that don't fit the pattern), you can still add a match case in the fallback logic in `native/core/src/execution/planner.rs`:
+```rust
+use datafusion::physical_plan::{ExecutionPlan, ...};
+use datafusion_comet_proto::spark_operator::YourNewOperator;
-```rust
-// In the create_plan method's fallback match statement:
-match spark_plan.op_struct.as_ref().unwrap() {
-    OpStruct::Filter(filter) => {
+pub fn create_your_operator_exec(
+    op: &YourNewOperator,
+    children: Vec<Arc<dyn ExecutionPlan>>,
+    session_ctx: &SessionContext,
+) -> Result<Arc<dyn ExecutionPlan>, ExecutionError> {
+    // Deserialize expressions and configuration
+    // Create and return the execution plan
         // ... existing cases ...
     }
-    OpStruct::YourNewOperator(your_op) => {
-        create_your_operator_exec(your_op, children, partition_count)
+    // Option 1: Use existing DataFusion operator
+    // Ok(Arc::new(SomeDataFusionExec::try_new(...)?))
     }
-    _ => Err(GeneralError(format!(
-        "Unsupported or unregistered operator type: {:?}",
-        spark_plan.op_struct
-    ))),
+    // Option 2: Implement custom operator (see ExpandExec for example)
+    // Ok(Arc::new(YourCustomExec::new(...)))
 }
 ```
-However, the registry approach (Option A) is strongly recommended because it provides:
+For custom operators, you'll need to implement the `ExecutionPlan` trait. See `native/core/src/execution/operators/expand.rs` or `scan.rs` for examples.
-- **Better organization**: Each operator's logic is isolated
-- **Type safety**: The `extract_op!` macro ensures compile-time correctness
-- **Extensibility**: New operators can be added without modifying core planner logic
-- **Consistency**: Follows the established pattern for expressions
-
-#### Custom Operator Implementation
-
-If your operator doesn't have a direct DataFusion equivalent, you'll need to implement a custom operator. See `native/core/src/execution/operators/expand.rs` for a complete example:
 
 ```rust
 use datafusion::physical_plan::{ExecutionPlan, ...};
-#[derive(Debug)]
-pub struct YourCustomExec {
-    // Your operator's fields
-}
-impl ExecutionPlan for YourCustomExec {
-    // Implement required methods
-    fn as_any(&self) -> &dyn Any { self }
-    fn schema(&self) -> SchemaRef { /* ... */ }
-    fn output_partitioning(&self) -> Partitioning { /* ... */ }
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { /* ... */ }
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { /* ... */ }
-    fn with_new_children(&self, children: Vec<Arc<dyn ExecutionPlan>>) -> Result<Arc<dyn ExecutionPlan>> { /* ... */ }
-    fn execute(&self, partition: usize, context: Arc<TaskContext>) -> Result<SendableRecordBatchStream> { /* ... */ }
 }
 ```
+For custom operators, you'll need to implement the `ExecutionPlan` trait. See `native/core/src/execution/operators/expand.rs` or `scan.rs` for examples.
+
 ### Step 6: Add Tests
 
 #### Scala Integration Tests