diff --git a/.github/workflows/iceberg_spark_test.yml b/.github/workflows/iceberg_spark_test.yml
index faa59a0da6..d171956166 100644
--- a/.github/workflows/iceberg_spark_test.yml
+++ b/.github/workflows/iceberg_spark_test.yml
@@ -69,9 +69,9 @@ jobs:
             ~/.cargo/registry
             ~/.cargo/git
             native/target
-          key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
+          key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}
           restore-keys: |
-            ${{ runner.os }}-cargo-ci-
+            ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-
 
       - name: Build native library
         # Use CI profile for faster builds (no LTO) and to share cache with pr_build_linux.yml.
@@ -88,7 +88,7 @@ jobs:
             ~/.cargo/registry
             ~/.cargo/git
             native/target
-          key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
+          key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}
 
       - name: Upload native library
         uses: actions/upload-artifact@v6
diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml
index 59ded46203..ae433b7db7 100644
--- a/.github/workflows/pr_build_linux.yml
+++ b/.github/workflows/pr_build_linux.yml
@@ -84,9 +84,9 @@ jobs:
             ~/.cargo/registry
             ~/.cargo/git
             native/target
-          key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
+          key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}
           restore-keys: |
-            ${{ runner.os }}-cargo-ci-
+            ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-
 
       - name: Build native library (CI profile)
         run: |
@@ -112,7 +112,7 @@ jobs:
             ~/.cargo/registry
             ~/.cargo/git
             native/target
-          key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
+          key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}
 
   # Run Rust tests (runs in parallel with build-native, uses debug builds)
   linux-test-rust:
@@ -138,9 +138,9 @@ jobs:
             ~/.cargo/git
             native/target
           # Note: Java version intentionally excluded - Rust target is JDK-independent
-          key: ${{ runner.os }}-cargo-debug-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
+          key: ${{ runner.os }}-cargo-debug-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}
           restore-keys: |
-            ${{ runner.os }}-cargo-debug-
+            ${{ runner.os }}-cargo-debug-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-
 
       - name: Rust test steps
         uses: ./.github/actions/rust-test
@@ -153,7 +153,7 @@ jobs:
             ~/.cargo/registry
             ~/.cargo/git
             native/target
-          key: ${{ runner.os }}-cargo-debug-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
+          key: ${{ runner.os }}-cargo-debug-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}
 
   linux-test:
     needs: build-native
diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml
index b37c7ccb4d..9c4064f5d3 100644
--- a/.github/workflows/pr_build_macos.yml
+++ b/.github/workflows/pr_build_macos.yml
@@ -84,9 +84,9 @@ jobs:
             ~/.cargo/registry
             ~/.cargo/git
             native/target
-          key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
+          key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}
           restore-keys: |
-            ${{ runner.os }}-cargo-ci-
+            ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-
 
       - name: Build native library (CI profile)
         run: |
@@ -112,7 +112,7 @@ jobs:
             ~/.cargo/registry
             ~/.cargo/git
             native/target
-          key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
+          key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}
 
   macos-aarch64-test:
     needs: build-native
diff --git a/.github/workflows/spark_sql_test.yml b/.github/workflows/spark_sql_test.yml
index 610baa9f2d..1332704701 100644
--- a/.github/workflows/spark_sql_test.yml
+++ b/.github/workflows/spark_sql_test.yml
@@ -75,9 +75,9 @@ jobs:
             ~/.cargo/registry
             ~/.cargo/git
             native/target
-          key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
+          key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}
           restore-keys: |
-            ${{ runner.os }}-cargo-ci-
+            ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-
 
       - name: Build native library (CI profile)
         run: |
@@ -101,7 +101,7 @@ jobs:
             ~/.cargo/registry
             ~/.cargo/git
             native/target
-          key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
+          key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}
 
   spark-sql-test:
     needs: build-native
diff --git a/native/core/src/parquet/cast_column.rs b/native/core/src/parquet/cast_column.rs
index b03cf209f4..a44166a70b 100644
--- a/native/core/src/parquet/cast_column.rs
+++ b/native/core/src/parquet/cast_column.rs
@@ -15,12 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 use arrow::{
-    array::{ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray},
+    array::{make_array, ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray},
     compute::CastOptions,
     datatypes::{DataType, FieldRef, Schema, TimeUnit},
     record_batch::RecordBatch,
 };
 
+use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions};
 use datafusion::common::format::DEFAULT_CAST_OPTIONS;
 use datafusion::common::Result as DataFusionResult;
 use datafusion::common::ScalarValue;
@@ -33,6 +34,59 @@ use std::{
     sync::Arc,
 };
 
+/// Returns true if two DataTypes are structurally equivalent (same data layout)
+/// but may differ in field names within nested types.
+fn types_differ_only_in_field_names(physical: &DataType, logical: &DataType) -> bool {
+    match (physical, logical) {
+        (DataType::List(pf), DataType::List(lf)) => {
+            pf.is_nullable() == lf.is_nullable()
+                && (pf.data_type() == lf.data_type()
+                    || types_differ_only_in_field_names(pf.data_type(), lf.data_type()))
+        }
+        (DataType::LargeList(pf), DataType::LargeList(lf)) => {
+            pf.is_nullable() == lf.is_nullable()
+                && (pf.data_type() == lf.data_type()
+                    || types_differ_only_in_field_names(pf.data_type(), lf.data_type()))
+        }
+        (DataType::Map(pf, p_sorted), DataType::Map(lf, l_sorted)) => {
+            p_sorted == l_sorted
+                && pf.is_nullable() == lf.is_nullable()
+                && (pf.data_type() == lf.data_type()
+                    || types_differ_only_in_field_names(pf.data_type(), lf.data_type()))
+        }
+        (DataType::Struct(pfields), DataType::Struct(lfields)) => {
+            // For Struct types, field names are semantically meaningful (they
+            // identify different columns), so we require name equality here.
+            // This distinguishes them from List/Map wrapper field names ("item" vs
+            // "element"), which are purely cosmetic.
+            pfields.len() == lfields.len()
+                && pfields.iter().zip(lfields.iter()).all(|(pf, lf)| {
+                    pf.name() == lf.name()
+                        && pf.is_nullable() == lf.is_nullable()
+                        && (pf.data_type() == lf.data_type()
+                            || types_differ_only_in_field_names(pf.data_type(), lf.data_type()))
+                })
+        }
+        _ => false,
+    }
+}
+
+/// Recursively relabels an array so its DataType matches `target_type`.
+/// This only changes metadata (field names, nullability flags in nested fields);
+/// it does NOT change the underlying buffer data.
+fn relabel_array(array: ArrayRef, target_type: &DataType) -> ArrayRef {
+    if array.data_type() == target_type {
+        return array;
+    }
+    let data = array.to_data();
+    let new_data = data
+        .into_builder()
+        .data_type(target_type.clone())
+        .build()
+        .expect("relabel_array: data layout must be compatible");
+    make_array(new_data)
+}
+
 /// Casts a Timestamp(Microsecond) array to Timestamp(Millisecond) by dividing values by 1000.
 /// Preserves the timezone from the target type.
 fn cast_timestamp_micros_to_millis_array(
@@ -79,6 +133,9 @@ pub struct CometCastColumnExpr {
     target_field: FieldRef,
     /// Options forwarded to [`cast_column`].
     cast_options: CastOptions<'static>,
+    /// Spark parquet options for complex nested type conversions.
+    /// When present, enables `spark_parquet_convert` as a fallback.
+    parquet_options: Option<SparkParquetOptions>,
 }
 
 // Manually derive `PartialEq`/`Hash` as `Arc` does not
@@ -89,6 +146,7 @@ impl PartialEq for CometCastColumnExpr {
             && self.input_physical_field.eq(&other.input_physical_field)
             && self.target_field.eq(&other.target_field)
             && self.cast_options.eq(&other.cast_options)
+            && self.parquet_options.eq(&other.parquet_options)
     }
 }
 
@@ -98,6 +156,7 @@ impl Hash for CometCastColumnExpr {
         self.input_physical_field.hash(state);
         self.target_field.hash(state);
         self.cast_options.hash(state);
+        self.parquet_options.hash(state);
     }
 }
 
@@ -114,8 +173,15 @@ impl CometCastColumnExpr {
             input_physical_field: physical_field,
             target_field,
             cast_options: cast_options.unwrap_or(DEFAULT_CAST_OPTIONS),
+            parquet_options: None,
         }
     }
+
+    /// Sets Spark parquet options to enable complex nested type conversions.
+    pub fn with_parquet_options(mut self, options: SparkParquetOptions) -> Self {
+        self.parquet_options = Some(options);
+        self
+    }
 }
 
 impl Display for CometCastColumnExpr {
@@ -145,18 +211,17 @@ impl PhysicalExpr for CometCastColumnExpr {
     fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult<ColumnarValue> {
         let value = self.expr.evaluate(batch)?;
 
-        if value
-            .data_type()
-            .equals_datatype(self.target_field.data_type())
-        {
+        // Use == (PartialEq) instead of equals_datatype because equals_datatype
+        // ignores field names in nested types (Struct, List, Map). We need to detect
+        // when field names differ (e.g., Struct("a","b") vs Struct("c","d")) so that
+        // we can apply spark_parquet_convert for field-name-based selection.
+        if value.data_type() == *self.target_field.data_type() {
             return Ok(value);
         }
 
         let input_physical_field = self.input_physical_field.data_type();
         let target_field = self.target_field.data_type();
 
-        // dbg!(&input_physical_field, &target_field, &value);
-
         // Handle specific type conversions with custom casts
         match (input_physical_field, target_field) {
             // Timestamp(Microsecond) -> Timestamp(Millisecond)
@@ -174,7 +239,30 @@
                 }
                 _ => Ok(value),
             },
-            _ => Ok(value),
+            // Nested types that differ only in field names (e.g., a List element named
+            // "item" vs "element", or Map entries named "key_value" vs "entries").
+            // Re-label the array so the DataType metadata matches the logical schema.
+            (physical, logical)
+                if physical != logical && types_differ_only_in_field_names(physical, logical) =>
+            {
+                match value {
+                    ColumnarValue::Array(array) => {
+                        let relabeled = relabel_array(array, logical);
+                        Ok(ColumnarValue::Array(relabeled))
+                    }
+                    other => Ok(other),
+                }
+            }
+            // Fallback: use spark_parquet_convert for complex nested type conversions
+            // (e.g., List-to-List element type casts, Map field selection, etc.)
+            _ => {
+                if let Some(parquet_options) = &self.parquet_options {
+                    let converted = spark_parquet_convert(value, target_field, parquet_options)?;
+                    Ok(converted)
+                } else {
+                    Ok(value)
+                }
+            }
         }
     }
 
@@ -192,12 +280,16 @@
     ) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
         assert_eq!(children.len(), 1);
         let child = children.pop().expect("CastColumnExpr child");
-        Ok(Arc::new(Self::new(
+        let mut new_expr = Self::new(
             child,
             Arc::clone(&self.input_physical_field),
             Arc::clone(&self.target_field),
             Some(self.cast_options.clone()),
-        )))
+        );
+        if let Some(opts) = &self.parquet_options {
+            new_expr = new_expr.with_parquet_options(opts.clone());
+        }
+        Ok(Arc::new(new_expr))
     }
 
     fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs
index db1859f4d9..2f0ecb8e87 100644
--- a/native/core/src/parquet/schema_adapter.rs
+++ b/native/core/src/parquet/schema_adapter.rs
@@ -26,7 +26,7 @@
 use crate::parquet::cast_column::CometCastColumnExpr;
 use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions};
 use arrow::array::{ArrayRef, RecordBatch, RecordBatchOptions};
-use arrow::datatypes::{Schema, SchemaRef};
+use arrow::datatypes::{DataType, Schema, SchemaRef};
 use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion::common::{ColumnStatistics, Result as DataFusionResult};
 use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
@@ -116,18 +116,69 @@ struct SparkPhysicalExprAdapter {
 
 impl PhysicalExprAdapter for SparkPhysicalExprAdapter {
     fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
-        // dbg!(&expr);
-
-        let expr = self.default_adapter.rewrite(expr)?;
-
-        //self.cast_datafusion_unsupported_expr(expr)
-
-        expr.transform(|e| self.replace_with_spark_cast(e)).data()
+        // First let the default adapter handle column remapping, missing columns,
+        // and simple scalar type casts. Then replace DataFusion's CastColumnExpr
+        // with Spark-compatible equivalents.
+        //
+        // The default adapter may fail for complex nested type casts (List, Map).
+        // In that case, fall back to wrapping everything ourselves.
+        let expr = match self.default_adapter.rewrite(Arc::clone(&expr)) {
+            Ok(rewritten) => {
+                // Replace DataFusion's CastColumnExpr with either:
+                // - CometCastColumnExpr (for Struct/List/Map, uses spark_parquet_convert)
+                // - Spark Cast (for simple scalar types)
+                rewritten
+                    .transform(|e| self.replace_with_spark_cast(e))
+                    .data()?
+            }
+            Err(_) => {
+                // Default adapter failed (likely a complex nested type cast).
+                // Handle all type mismatches ourselves using spark_parquet_convert.
+                self.wrap_all_type_mismatches(expr)?
+            }
+        };
+        Ok(expr)
     }
 }
 
 #[allow(dead_code)]
 impl SparkPhysicalExprAdapter {
+    /// Wraps ALL Column expressions that have type mismatches in a CometCastColumnExpr.
+    /// This is the fallback path when the default adapter fails (e.g., for complex
+    /// nested type casts like List or Map). Uses `spark_parquet_convert`
+    /// under the hood for the actual type conversion.
+    fn wrap_all_type_mismatches(
+        &self,
+        expr: Arc<dyn PhysicalExpr>,
+    ) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
+        expr.transform(|e| {
+            if let Some(column) = e.as_any().downcast_ref::<Column>() {
+                let col_idx = column.index();
+
+                let logical_field = self.logical_file_schema.fields().get(col_idx);
+                let physical_field = self.physical_file_schema.fields().get(col_idx);
+
+                if let (Some(logical_field), Some(physical_field)) = (logical_field, physical_field)
+                {
+                    if logical_field.data_type() != physical_field.data_type() {
+                        let cast_expr: Arc<dyn PhysicalExpr> = Arc::new(
+                            CometCastColumnExpr::new(
+                                Arc::clone(&e),
+                                Arc::clone(physical_field),
+                                Arc::clone(logical_field),
+                                None,
+                            )
+                            .with_parquet_options(self.parquet_options.clone()),
+                        );
+                        return Ok(Transformed::yes(cast_expr));
+                    }
+                }
+            }
+            Ok(Transformed::no(e))
+        })
+        .data()
+    }
+
     /// Replace CastColumnExpr (DataFusion's cast) with Spark's Cast expression.
     fn replace_with_spark_cast(
         &self,
@@ -140,9 +191,31 @@
             .downcast_ref::<CastColumnExpr>()
         {
             let child = Arc::clone(cast.expr());
-            let target_type = cast.target_field().data_type().clone();
+            let physical_type = cast.input_field().data_type();
+            let target_type = cast.target_field().data_type();
+
+            // For complex nested types (Struct, List, Map), use CometCastColumnExpr
+            // with spark_parquet_convert, which handles field-name-based selection,
+            // reordering, and nested type casting correctly.
+            if matches!(
+                (physical_type, target_type),
+                (DataType::Struct(_), DataType::Struct(_))
+                    | (DataType::List(_), DataType::List(_))
+                    | (DataType::Map(_, _), DataType::Map(_, _))
+            ) {
+                let comet_cast: Arc<dyn PhysicalExpr> = Arc::new(
+                    CometCastColumnExpr::new(
+                        child,
+                        Arc::clone(cast.input_field()),
+                        Arc::clone(cast.target_field()),
+                        None,
+                    )
+                    .with_parquet_options(self.parquet_options.clone()),
+                );
+                return Ok(Transformed::yes(comet_cast));
+            }
 
-            // Create Spark-compatible cast options
+            // For simple scalar type casts, use a Spark-compatible Cast expression
             let mut cast_options = SparkCastOptions::new(
                 self.parquet_options.eval_mode,
                 &self.parquet_options.timezone,
@@ -151,7 +224,7 @@
             cast_options.allow_cast_unsigned_ints =
                 self.parquet_options.allow_cast_unsigned_ints;
             cast_options.is_adapting_schema = true;
-            let spark_cast = Arc::new(Cast::new(child, target_type, cast_options));
+            let spark_cast = Arc::new(Cast::new(child, target_type.clone(), cast_options));
 
             return Ok(Transformed::yes(spark_cast as Arc<dyn PhysicalExpr>));
         }
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 206ac17260..1126300452 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -1252,6 +1252,21 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       // Test with pattern at end
       val queryEnd = sql(s"select id from $table where contains (name, 'Smith')")
       checkSparkAnswerAndOperator(queryEnd)
+
+      // Test with null haystack
+      sql(s"insert into $table values(6, null)")
+      checkSparkAnswerAndOperator(sql(s"select id, contains(name, 'Rose') from $table"))
+
+      // Test case sensitivity (should not match)
+      checkSparkAnswerAndOperator(sql(s"select id from $table where contains(name, 'james')"))
+    }
+  }
+
+  test("contains with both columns") {
+    withParquetTable(
+      Seq(("hello world", "world"), ("foo bar", "baz"), ("abc", ""), (null, "x"), ("test", null)),
+      "tbl") {
+      checkSparkAnswerAndOperator(sql("select contains(_1, _2) from tbl"))
     }
   }