diff --git a/native/spark-expr/Cargo.toml b/native/spark-expr/Cargo.toml
index ea89c43204..3210429877 100644
--- a/native/spark-expr/Cargo.toml
+++ b/native/spark-expr/Cargo.toml
@@ -80,6 +80,10 @@ harness = false
 name = "padding"
 harness = false
 
+[[bench]]
+name = "timestamp_trunc"
+harness = false
+
 [[test]]
 name = "test_udf_registration"
 path = "tests/spark_expr_reg.rs"
diff --git a/native/spark-expr/benches/timestamp_trunc.rs b/native/spark-expr/benches/timestamp_trunc.rs
new file mode 100644
index 0000000000..2089a7d4ae
--- /dev/null
+++ b/native/spark-expr/benches/timestamp_trunc.rs
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Criterion benchmarks for `TimestampTruncExpr` on microsecond timestamp batches.
+
+use arrow::array::{Array, RecordBatch, TimestampMicrosecondArray};
+use arrow::datatypes::{Field, Schema};
+use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion::common::ScalarValue;
+use datafusion::physical_expr::expressions::{Column, Literal};
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion_comet_spark_expr::TimestampTruncExpr;
+use std::hint::black_box;
+use std::sync::Arc;
+
+/// Number of rows in the benchmark batch.
+const NUM_ROWS: usize = 10000;
+
+/// Builds a one-column batch (`ts`) of microsecond timestamps tagged with `timezone`.
+///
+/// Rows are 1_000_000_001 microseconds (~16.7 minutes) apart, so 10000 rows span
+/// about 115 days. Every 100th row (1% of the data) is null.
+fn create_timestamp_batch(timezone: &str) -> RecordBatch {
+    let values: Vec<Option<i64>> = (0..NUM_ROWS as i64)
+        .map(|i| if i % 100 == 0 { None } else { Some(i * 1_000_000_001) })
+        .collect();
+
+    let array = TimestampMicrosecondArray::from(values).with_timezone(timezone);
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        "ts",
+        array.data_type().clone(),
+        true,
+    )]));
+
+    RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
+}
+
+/// Creates a physical expression referencing column `name` at position `index`.
+fn make_col(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
+    Arc::new(Column::new(name, index))
+}
+
+/// Wraps a truncation format name in a UTF-8 string literal expression.
+fn make_format_literal(format: &str) -> Arc<dyn PhysicalExpr> {
+    Arc::new(Literal::new(ScalarValue::Utf8(Some(format.to_string()))))
+}
+
+/// Registers one benchmark per entry of `formats` under `group_name`, each evaluating
+/// a `TimestampTruncExpr` against a batch created for `timezone`.
+fn bench_formats(c: &mut Criterion, group_name: &str, timezone: &str, formats: &[&str]) {
+    let batch = create_timestamp_batch(timezone);
+    let ts_col = make_col("ts", 0);
+
+    let mut group = c.benchmark_group(group_name);
+
+    for &format in formats {
+        let expr = TimestampTruncExpr::new(
+            Arc::clone(&ts_col),
+            make_format_literal(format),
+            timezone.to_string(),
+        );
+
+        group.bench_function(format, |b| {
+            b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap()))
+        });
+    }
+
+    group.finish();
+}
+
+/// Benchmarks every truncation format on UTC timestamps.
+fn benchmark_utc(c: &mut Criterion) {
+    let formats = [
+        "YEAR",
+        "QUARTER",
+        "MONTH",
+        "WEEK",
+        "DAY",
+        "HOUR",
+        "MINUTE",
+        "SECOND",
+        "MILLISECOND",
+        "MICROSECOND",
+    ];
+
+    bench_formats(c, "timestamp_trunc_utc", "UTC", &formats);
+}
+
+/// Benchmarks a subset of formats on America/New_York timestamps.
+fn benchmark_new_york(c: &mut Criterion) {
+    // Test key formats that showed regression in Spark benchmarks
+    let formats = ["YEAR", "MONTH", "DAY", "SECOND"];
+
+    bench_formats(c, "timestamp_trunc_new_york", "America/New_York", &formats);
+}
+
+/// Criterion configuration for this benchmark binary (library defaults).
+fn config() -> Criterion {
+    Criterion::default()
+}
+
+criterion_group! {
+    name = benches;
+    config = config();
+    targets = benchmark_utc, benchmark_new_york
+}
+criterion_main!(benches);
diff --git a/native/spark-expr/src/kernels/temporal.rs b/native/spark-expr/src/kernels/temporal.rs
index 09e2c905c7..fb00196fe8 100644
--- a/native/spark-expr/src/kernels/temporal.rs
+++ b/native/spark-expr/src/kernels/temporal.rs
@@ -46,6 +46,69 @@ macro_rules! return_compute_error_with {
 // and the beginning of the Unix Epoch (1970-01-01)
 const DAYS_TO_UNIX_EPOCH: i32 = 719_163;
 
+// Microseconds per time unit - used for fast arithmetic-based truncation
+const MICROS_PER_MILLISECOND: i64 = 1_000;
+const MICROS_PER_SECOND: i64 = 1_000_000;
+const MICROS_PER_MINUTE: i64 = 60 * MICROS_PER_SECOND;
+const MICROS_PER_HOUR: i64 = 60 * MICROS_PER_MINUTE;
+
+/// Floors a microsecond timestamp to a multiple of `unit_size` microseconds,
+/// rounding toward negative infinity so pre-1970 values truncate correctly.
+///
+/// The result equals a wall-clock truncation only when the zone's UTC offset is a
+/// multiple of `unit_size`: offsets are whole seconds, but not always whole minutes
+/// or hours (e.g. Asia/Kolkata is UTC+05:30).
+#[inline]
+fn truncate_micros_to_unit(micros: i64, unit_size: i64) -> i64 {
+    // `rem_euclid` is never negative, so this floors toward negative infinity:
+    // e.g., -1 truncated to seconds is -1_000_000, not 0
+    micros - micros.rem_euclid(unit_size)
+}
+
+/// Truncate timestamp array using fast arithmetic where that is timezone-safe.
+/// Returns Ok(Some(array)) if the format was handled, Ok(None) if the caller must
+/// use the timezone-aware path (which also reports unsupported input types).
+fn try_truncate_timestamp_arithmetic<T>(
+    array: &PrimitiveArray<T>,
+    format: &str,
+) -> Result<Option<TimestampMicrosecondArray>, SparkError>
+where
+    T: ArrowTemporalType + ArrowNumericType,
+    i64: From<T::Native>,
+{
+    // The arithmetic assumes microsecond values with a timezone; other input types
+    // are left to the caller
+    let tz = match array.data_type() {
+        DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => tz,
+        _ => return Ok(None),
+    };
+    // UTC offsets are whole seconds, so sub-minute units never need the timezone.
+    // MINUTE and HOUR boundaries shift with the offset (e.g. UTC+05:30), so they
+    // are only computed arithmetically when the timezone is UTC itself.
+    let is_utc = matches!(tz.as_ref(), "UTC" | "+00:00");
+    let unit_size = match format {
+        "MICROSECOND" => {
+            // Microsecond truncation is a no-op for microsecond timestamps
+            // Just copy the values directly
+            let result: TimestampMicrosecondArray =
+                array.iter().map(|opt| opt.map(i64::from)).collect();
+            return Ok(Some(result));
+        }
+        "MILLISECOND" => MICROS_PER_MILLISECOND,
+        "SECOND" => MICROS_PER_SECOND,
+        "MINUTE" if is_utc => MICROS_PER_MINUTE,
+        "HOUR" if is_utc => MICROS_PER_HOUR,
+        _ => return Ok(None), // Format requires timezone-aware handling
+    };
+
+    let result: TimestampMicrosecondArray = array
+        .iter()
+        .map(|opt| opt.map(|v| truncate_micros_to_unit(i64::from(v), unit_size)))
+        .collect();
+
+    Ok(Some(result))
+}
+
 // Copied from arrow_arith/temporal.rs with modification to the output datatype
 // Transforms a array of NaiveDate to an array of Date32 after applying an operation
 fn as_datetime_with_op<A: ArrayAccessor<Item = T::Native>, T: ArrowTemporalType, F>(
@@ -530,66 +593,60 @@ where
     T: ArrowTemporalType + ArrowNumericType,
     i64: From<T::Native>,
 {
+    let format_upper = format.to_uppercase();
+
+    // Fast path: truncations that are independent of the timezone use simple
+    // arithmetic; the helper returns None when the timezone-aware path is needed
+    if let Some(result) = try_truncate_timestamp_arithmetic(array, &format_upper)? {
+        return Ok(result);
+    }
+
+    // Slow path: remaining truncations require timezone-aware DateTime operations
     let builder = TimestampMicrosecondBuilder::with_capacity(array.len());
     let iter = ArrayIter::new(array);
     match array.data_type() {
-        DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => {
-            match format.to_uppercase().as_str() {
-                "YEAR" | "YYYY" | "YY" => {
-                    as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
-                        as_micros_from_unix_epoch_utc(trunc_date_to_year(dt))
-                    })
-                }
-                "QUARTER" => {
-                    as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
-                        as_micros_from_unix_epoch_utc(trunc_date_to_quarter(dt))
-                    })
-                }
-                "MONTH" | "MON" | "MM" => {
-                    as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
-                        as_micros_from_unix_epoch_utc(trunc_date_to_month(dt))
-                    })
-                }
-                "WEEK" => {
-                    as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
-                        as_micros_from_unix_epoch_utc(trunc_date_to_week(dt))
-                    })
-                }
-                "DAY" | "DD" => {
-                    as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
-                        as_micros_from_unix_epoch_utc(trunc_date_to_day(dt))
-                    })
-                }
-                "HOUR" => {
-                    as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
-                        as_micros_from_unix_epoch_utc(trunc_date_to_hour(dt))
-                    })
-                }
-                "MINUTE" => {
-                    as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
-                        as_micros_from_unix_epoch_utc(trunc_date_to_minute(dt))
-                    })
-                }
-                "SECOND" => {
-                    as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
-                        as_micros_from_unix_epoch_utc(trunc_date_to_second(dt))
-                    })
-                }
-                "MILLISECOND" => {
-                    as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
-                        as_micros_from_unix_epoch_utc(trunc_date_to_ms(dt))
-                    })
-                }
-                "MICROSECOND" => {
-                    as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
-                        as_micros_from_unix_epoch_utc(trunc_date_to_microsec(dt))
-                    })
-                }
-                _ => Err(SparkError::Internal(format!(
-                    "Unsupported format: {format:?} for function 'timestamp_trunc'"
-                ))),
+        DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => match format_upper.as_str() {
+            "YEAR" | "YYYY" | "YY" => {
+                as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
+                    as_micros_from_unix_epoch_utc(trunc_date_to_year(dt))
+                })
             }
-        }
+            "QUARTER" => {
+                as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
+                    as_micros_from_unix_epoch_utc(trunc_date_to_quarter(dt))
+                })
+            }
+            "MONTH" | "MON" | "MM" => {
+                as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
+                    as_micros_from_unix_epoch_utc(trunc_date_to_month(dt))
+                })
+            }
+            "WEEK" => {
+                as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
+                    as_micros_from_unix_epoch_utc(trunc_date_to_week(dt))
+                })
+            }
+            "DAY" | "DD" => {
+                as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
+                    as_micros_from_unix_epoch_utc(trunc_date_to_day(dt))
+                })
+            }
+            // MINUTE and HOUR reach here for non-UTC zones, whose offsets are not
+            // always whole minutes/hours, so truncate in local time
+            "HOUR" => {
+                as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
+                    as_micros_from_unix_epoch_utc(trunc_date_to_hour(dt))
+                })
+            }
+            "MINUTE" => {
+                as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
+                    as_micros_from_unix_epoch_utc(trunc_date_to_minute(dt))
+                })
+            }
+            _ => Err(SparkError::Internal(format!(
+                "Unsupported format: {format:?} for function 'timestamp_trunc'"
+            ))),
+        },
         dt => return_compute_error_with!(
             "Unsupported input type '{:?}' for function 'timestamp_trunc'",
             dt