6 changes: 3 additions & 3 deletions .github/workflows/iceberg_spark_test.yml
@@ -69,9 +69,9 @@ jobs:
~/.cargo/registry
~/.cargo/git
native/target
- key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
+ key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}
restore-keys: |
- ${{ runner.os }}-cargo-ci-
+ ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-

- name: Build native library
# Use CI profile for faster builds (no LTO) and to share cache with pr_build_linux.yml.
@@ -88,7 +88,7 @@ jobs:
~/.cargo/registry
~/.cargo/git
native/target
- key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
+ key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}

- name: Upload native library
uses: actions/upload-artifact@v6
12 changes: 6 additions & 6 deletions .github/workflows/pr_build_linux.yml
@@ -84,9 +84,9 @@ jobs:
~/.cargo/registry
~/.cargo/git
native/target
- key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
+ key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}
restore-keys: |
- ${{ runner.os }}-cargo-ci-
+ ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-

- name: Build native library (CI profile)
run: |
@@ -112,7 +112,7 @@ jobs:
~/.cargo/registry
~/.cargo/git
native/target
- key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
+ key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}

# Run Rust tests (runs in parallel with build-native, uses debug builds)
linux-test-rust:
@@ -138,9 +138,9 @@ jobs:
~/.cargo/git
native/target
# Note: Java version intentionally excluded - Rust target is JDK-independent
- key: ${{ runner.os }}-cargo-debug-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
+ key: ${{ runner.os }}-cargo-debug-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}
restore-keys: |
- ${{ runner.os }}-cargo-debug-
+ ${{ runner.os }}-cargo-debug-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-

- name: Rust test steps
uses: ./.github/actions/rust-test
@@ -153,7 +153,7 @@ jobs:
~/.cargo/registry
~/.cargo/git
native/target
- key: ${{ runner.os }}-cargo-debug-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
+ key: ${{ runner.os }}-cargo-debug-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}

linux-test:
needs: build-native
6 changes: 3 additions & 3 deletions .github/workflows/pr_build_macos.yml
@@ -84,9 +84,9 @@ jobs:
~/.cargo/registry
~/.cargo/git
native/target
- key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
+ key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}
restore-keys: |
- ${{ runner.os }}-cargo-ci-
+ ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-

- name: Build native library (CI profile)
run: |
@@ -112,7 +112,7 @@ jobs:
~/.cargo/registry
~/.cargo/git
native/target
- key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
+ key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}

macos-aarch64-test:
needs: build-native
6 changes: 3 additions & 3 deletions .github/workflows/spark_sql_test.yml
@@ -75,9 +75,9 @@ jobs:
~/.cargo/registry
~/.cargo/git
native/target
- key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
+ key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}
restore-keys: |
- ${{ runner.os }}-cargo-ci-
+ ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-

- name: Build native library (CI profile)
run: |
@@ -101,7 +101,7 @@ jobs:
~/.cargo/registry
~/.cargo/git
native/target
- key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}
+ key: ${{ runner.os }}-cargo-ci-${{ hashFiles('native/**/Cargo.lock', 'native/**/Cargo.toml') }}-${{ hashFiles('native/**/*.rs') }}

spark-sql-test:
needs: build-native
112 changes: 102 additions & 10 deletions native/core/src/parquet/cast_column.rs
@@ -15,12 +15,13 @@
// specific language governing permissions and limitations
// under the License.
use arrow::{
- array::{ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray},
+ array::{make_array, ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray},
compute::CastOptions,
datatypes::{DataType, FieldRef, Schema, TimeUnit},
record_batch::RecordBatch,
};

use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions};
use datafusion::common::format::DEFAULT_CAST_OPTIONS;
use datafusion::common::Result as DataFusionResult;
use datafusion::common::ScalarValue;
@@ -33,6 +34,59 @@ use std::{
sync::Arc,
};

/// Returns true if two DataTypes are structurally equivalent (same data layout)
/// but may differ in field names within nested types.
fn types_differ_only_in_field_names(physical: &DataType, logical: &DataType) -> bool {
match (physical, logical) {
(DataType::List(pf), DataType::List(lf)) => {
pf.is_nullable() == lf.is_nullable()
&& (pf.data_type() == lf.data_type()
|| types_differ_only_in_field_names(pf.data_type(), lf.data_type()))
}
(DataType::LargeList(pf), DataType::LargeList(lf)) => {
pf.is_nullable() == lf.is_nullable()
&& (pf.data_type() == lf.data_type()
|| types_differ_only_in_field_names(pf.data_type(), lf.data_type()))
}
(DataType::Map(pf, p_sorted), DataType::Map(lf, l_sorted)) => {
p_sorted == l_sorted
&& pf.is_nullable() == lf.is_nullable()
&& (pf.data_type() == lf.data_type()
|| types_differ_only_in_field_names(pf.data_type(), lf.data_type()))
}
(DataType::Struct(pfields), DataType::Struct(lfields)) => {
// For Struct types, field names are semantically meaningful (they
// identify different columns), so we require name equality here.
// This is unlike List/Map wrapper field names ("item" vs
// "element"), which are purely cosmetic.
pfields.len() == lfields.len()
&& pfields.iter().zip(lfields.iter()).all(|(pf, lf)| {
pf.name() == lf.name()
&& pf.is_nullable() == lf.is_nullable()
&& (pf.data_type() == lf.data_type()
|| types_differ_only_in_field_names(pf.data_type(), lf.data_type()))
})
}
_ => false,
}
}

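The wrapper-name mismatch this predicate accepts is easy to reproduce with arrow directly. Here is a minimal, self-contained sketch (field names illustrative, assuming the arrow crate) of why PartialEq and Arrow's equals_datatype disagree on such types:

    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field};

    fn main() {
        // Same layout; only the list element's wrapper field name differs.
        let physical = DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));
        let logical = DataType::List(Arc::new(Field::new("element", DataType::Int32, true)));

        // PartialEq compares nested field names, so these are unequal...
        assert_ne!(physical, logical);
        // ...while equals_datatype ignores nested field names entirely.
        assert!(physical.equals_datatype(&logical));
    }

The predicate above is meant to return true for exactly this pair, while returning false for structs whose fields were genuinely renamed.
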
/// Recursively relabel an array so its DataType matches `target_type`.
/// This only changes metadata (field names, nullability flags in nested fields);
/// it does NOT change the underlying buffer data.
fn relabel_array(array: ArrayRef, target_type: &DataType) -> ArrayRef {
if array.data_type() == target_type {
return array;
}
let data = array.to_data();
let new_data = data
.into_builder()
.data_type(target_type.clone())
.build()
.expect("relabel_array: data layout must be compatible");
make_array(new_data)
}

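As a rough illustration of this metadata-only rewrite (a sketch assuming a current arrow-rs API; the data and names are made up), the relabeling swaps the DataType on an array's existing buffers without copying or converting any values:

    use std::sync::Arc;

    use arrow::array::{make_array, Array, ArrayRef, Int32Array, ListArray};
    use arrow::buffer::{OffsetBuffer, ScalarBuffer};
    use arrow::datatypes::{DataType, Field};

    fn main() {
        // Physical side: Parquet readers commonly name the list element "item".
        let values = Int32Array::from(vec![1, 2, 3]);
        let item = Arc::new(Field::new("item", DataType::Int32, true));
        let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 2, 3]));
        let array: ArrayRef = Arc::new(ListArray::new(item, offsets, Arc::new(values), None));

        // Logical side: Spark's schema names the same element "element".
        let target = DataType::List(Arc::new(Field::new("element", DataType::Int32, true)));

        // Rebuild the ArrayData with the target DataType; buffers are reused as-is.
        let data = array.to_data().into_builder().data_type(target.clone());
        let relabeled = make_array(data.build().unwrap());
        assert_eq!(relabeled.data_type(), &target);
    }
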
/// Casts a Timestamp(Microsecond) array to Timestamp(Millisecond) by dividing values by 1000.
/// Preserves the timezone from the target type.
fn cast_timestamp_micros_to_millis_array(
@@ -79,6 +133,9 @@ pub struct CometCastColumnExpr {
target_field: FieldRef,
/// Options forwarded to [`cast_column`].
cast_options: CastOptions<'static>,
/// Spark parquet options for complex nested type conversions.
/// When present, enables `spark_parquet_convert` as a fallback.
parquet_options: Option<SparkParquetOptions>,
}

// Manually derive `PartialEq`/`Hash` as `Arc<dyn PhysicalExpr>` does not
@@ -89,6 +146,7 @@ impl PartialEq for CometCastColumnExpr {
&& self.input_physical_field.eq(&other.input_physical_field)
&& self.target_field.eq(&other.target_field)
&& self.cast_options.eq(&other.cast_options)
&& self.parquet_options.eq(&other.parquet_options)
}
}

@@ -98,6 +156,7 @@ impl Hash for CometCastColumnExpr {
self.input_physical_field.hash(state);
self.target_field.hash(state);
self.cast_options.hash(state);
self.parquet_options.hash(state);
}
}

@@ -114,8 +173,15 @@ impl CometCastColumnExpr {
input_physical_field: physical_field,
target_field,
cast_options: cast_options.unwrap_or(DEFAULT_CAST_OPTIONS),
parquet_options: None,
}
}

/// Set Spark parquet options to enable complex nested type conversions.
pub fn with_parquet_options(mut self, options: SparkParquetOptions) -> Self {
self.parquet_options = Some(options);
self
}
}

impl Display for CometCastColumnExpr {
@@ -145,18 +211,17 @@ impl PhysicalExpr for CometCastColumnExpr {
fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult<ColumnarValue> {
let value = self.expr.evaluate(batch)?;

- if value
- .data_type()
- .equals_datatype(self.target_field.data_type())
- {
+ // Use == (PartialEq) instead of equals_datatype because equals_datatype
+ // ignores field names in nested types (Struct, List, Map). We need to detect
+ // when field names differ (e.g., Struct("a","b") vs Struct("c","d")) so that
+ // we can apply spark_parquet_convert for field-name-based selection.
+ if value.data_type() == *self.target_field.data_type() {
return Ok(value);
}

let input_physical_field = self.input_physical_field.data_type();
let target_field = self.target_field.data_type();

- // dbg!(&input_physical_field, &target_field, &value);

// Handle specific type conversions with custom casts
match (input_physical_field, target_field) {
// Timestamp(Microsecond) -> Timestamp(Millisecond)
@@ -174,7 +239,30 @@
}
_ => Ok(value),
},
- _ => Ok(value),
+ // Nested types that differ only in field names (e.g., List element named
+ // "item" vs "element", or Map entries named "key_value" vs "entries").
+ // Re-label the array so the DataType metadata matches the logical schema.
+ (physical, logical)
+ if physical != logical && types_differ_only_in_field_names(physical, logical) =>
+ {
+ match value {
+ ColumnarValue::Array(array) => {
+ let relabeled = relabel_array(array, logical);
+ Ok(ColumnarValue::Array(relabeled))
+ }
+ other => Ok(other),
+ }
+ }
+ // Fallback: use spark_parquet_convert for complex nested type conversions
+ // (e.g., List<Struct{a,b,c}> → List<Struct{a,c}>, Map field selection, etc.)
+ _ => {
+ if let Some(parquet_options) = &self.parquet_options {
+ let converted = spark_parquet_convert(value, target_field, parquet_options)?;
+ Ok(converted)
+ } else {
+ Ok(value)
+ }
+ }
}
}

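The guard comment above is the behavioral heart of this change. A standalone sketch of the distinction it describes (struct field names illustrative, assuming the arrow crate):

    use arrow::datatypes::{DataType, Field, Fields};

    fn main() {
        let physical = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]));
        let logical = DataType::Struct(Fields::from(vec![
            Field::new("c", DataType::Int32, true),
            Field::new("d", DataType::Utf8, true),
        ]));

        // equals_datatype ignores nested field names, so the old guard treated
        // these as equal and returned the value without any conversion.
        assert!(physical.equals_datatype(&logical));
        // PartialEq sees the renamed fields, so evaluate() now falls through to
        // the match arms (and, when configured, to spark_parquet_convert).
        assert_ne!(physical, logical);
    }
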
@@ -192,12 +280,16 @@
) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
assert_eq!(children.len(), 1);
let child = children.pop().expect("CastColumnExpr child");
- Ok(Arc::new(Self::new(
+ let mut new_expr = Self::new(
child,
Arc::clone(&self.input_physical_field),
Arc::clone(&self.target_field),
Some(self.cast_options.clone()),
- )))
+ );
+ if let Some(opts) = &self.parquet_options {
+ new_expr = new_expr.with_parquet_options(opts.clone());
+ }
+ Ok(Arc::new(new_expr))
}

fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
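For reference, the Timestamp(Microsecond) -> Timestamp(Millisecond) path that evaluate() dispatches to is, per its doc comment, a divide-by-1000 on the raw values. A standalone sketch of that arithmetic (example values made up, nulls preserved):

    use arrow::array::{Array, TimestampMicrosecondArray, TimestampMillisecondArray};

    fn main() {
        // 1_700_000_123_456 microseconds since the epoch -> 1_700_000_123 ms.
        let micros = TimestampMicrosecondArray::from(vec![Some(1_700_000_123_456), None]);
        let millis: TimestampMillisecondArray =
            micros.iter().map(|v| v.map(|us| us / 1000)).collect();
        assert_eq!(millis.value(0), 1_700_000_123);
        assert!(millis.is_null(1));
    }

The real implementation additionally preserves the timezone from the target type, which this sketch omits.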