Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions native/spark-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ harness = false
name = "padding"
harness = false

[[bench]]
name = "timestamp_trunc"
harness = false

[[test]]
name = "test_udf_registration"
path = "tests/spark_expr_reg.rs"
131 changes: 131 additions & 0 deletions native/spark-expr/benches/timestamp_trunc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{Array, RecordBatch, TimestampMicrosecondArray};
use arrow::datatypes::{Field, Schema};
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::common::ScalarValue;
use datafusion::physical_expr::expressions::{Column, Literal};
use datafusion::physical_expr::PhysicalExpr;
use datafusion_comet_spark_expr::TimestampTruncExpr;
use std::hint::black_box;
use std::sync::Arc;

const NUM_ROWS: usize = 10000;

/// Builds the benchmark input: a one-column (`ts`) record batch holding `NUM_ROWS`
/// microsecond timestamps tagged with `timezone`.
///
/// Row `i` contains `i * 1_000_000_001` microseconds since the epoch, except that every
/// 100th row (starting with row 0) is null, i.e. 1% of the data.
fn create_timestamp_batch(timezone: &str) -> RecordBatch {
    // Consecutive rows are about 1 billion microseconds (~16.7 minutes) apart, so the
    // 10000 rows cover roughly 115 days.
    let values: Vec<Option<i64>> = (0..NUM_ROWS)
        .map(|row| {
            if row % 100 == 0 {
                None
            } else {
                Some(row as i64 * 1_000_000_001)
            }
        })
        .collect();

    let ts_array = TimestampMicrosecondArray::from(values).with_timezone(timezone);

    // The field is nullable because the generated data contains nulls.
    let ts_field = Field::new("ts", ts_array.data_type().clone(), true);
    let schema = Arc::new(Schema::new(vec![ts_field]));

    RecordBatch::try_new(schema, vec![Arc::new(ts_array)]).unwrap()
}

/// Wraps a column reference (`name` at position `index` in the batch schema) as a
/// shareable physical expression.
fn make_col(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
    let column = Column::new(name, index);
    Arc::new(column)
}

/// Wraps the truncation `format` string as a non-null UTF-8 literal physical expression.
fn make_format_literal(format: &str) -> Arc<dyn PhysicalExpr> {
    let value = ScalarValue::Utf8(Some(format.to_string()));
    Arc::new(Literal::new(value))
}

/// Registers one Criterion benchmark per entry of `formats` inside the group `group_name`.
///
/// A single input batch is generated for `timezone` and shared by every benchmark in the
/// group; each benchmark evaluates a `TimestampTruncExpr` built for its format against that
/// batch. The benchmark id is the format string itself.
fn bench_timestamp_trunc(
    c: &mut Criterion,
    group_name: &str,
    timezone: &str,
    formats: &[&str],
) {
    let batch = create_timestamp_batch(timezone);
    let ts_col = make_col("ts", 0);

    let mut group = c.benchmark_group(group_name);

    for &format in formats {
        // The expression is built outside the timed closure so only `evaluate` is measured.
        let expr = TimestampTruncExpr::new(
            Arc::clone(&ts_col),
            make_format_literal(format),
            timezone.to_string(),
        );

        group.bench_function(format, |b| {
            b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap()))
        });
    }

    group.finish();
}

/// Benchmarks every truncation format against UTC timestamps
/// (group `timestamp_trunc_utc`).
fn benchmark_utc(c: &mut Criterion) {
    let formats = [
        "YEAR",
        "QUARTER",
        "MONTH",
        "WEEK",
        "DAY",
        "HOUR",
        "MINUTE",
        "SECOND",
        "MILLISECOND",
        "MICROSECOND",
    ];

    bench_timestamp_trunc(c, "timestamp_trunc_utc", "UTC", &formats);
}

/// Benchmarks a subset of formats against `America/New_York` timestamps
/// (group `timestamp_trunc_new_york`).
fn benchmark_new_york(c: &mut Criterion) {
    // Test key formats that showed regression in Spark benchmarks
    let formats = ["YEAR", "MONTH", "DAY", "SECOND"];

    bench_timestamp_trunc(
        c,
        "timestamp_trunc_new_york",
        "America/New_York",
        &formats,
    );
}

/// Criterion configuration used by the `benches` group below; returns the library defaults.
fn config() -> Criterion {
Criterion::default()
}

// Declares the `benches` group, which runs `benchmark_utc` and `benchmark_new_york`
// with the settings returned by `config()`, and hands that group to `criterion_main!`.
criterion_group! {
name = benches;
config = config();
targets = benchmark_utc, benchmark_new_york
}
criterion_main!(benches);
157 changes: 101 additions & 56 deletions native/spark-expr/src/kernels/temporal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,69 @@ macro_rules! return_compute_error_with {
// and the beginning of the Unix Epoch (1970-01-01)
const DAYS_TO_UNIX_EPOCH: i32 = 719_163;

// Microseconds per time unit - used for fast arithmetic-based truncation.
// Each larger unit is derived from the previous one so the values stay consistent.
const MICROS_PER_MILLISECOND: i64 = 1_000;
const MICROS_PER_SECOND: i64 = 1_000_000;
// 60 seconds
const MICROS_PER_MINUTE: i64 = 60 * MICROS_PER_SECOND;
// 60 minutes
const MICROS_PER_HOUR: i64 = 60 * MICROS_PER_MINUTE;

/// Rounds `micros` down (toward negative infinity) to a whole multiple of `unit_size`.
///
/// `micros` is a count of microseconds relative to the Unix epoch and `unit_size` is the
/// length of the target unit in microseconds (expected to be positive). The computation is
/// done purely on the raw epoch value: it matches a wall-clock truncation in some time zone
/// only when that zone's UTC offset is itself a whole multiple of `unit_size`.
///
/// For example, with `unit_size` of one second, `1_500_000` becomes `1_000_000` and `-1`
/// becomes `-1_000_000`.
#[inline]
fn truncate_micros_to_unit(micros: i64, unit_size: i64) -> i64 {
    // `%` truncates toward zero, so the remainder has the sign of `micros` (or is zero).
    let remainder = micros % unit_size;
    if remainder < 0 {
        // Pre-1970 value that is not already on a unit boundary: dropping the remainder
        // alone would round toward zero, so step back one more full unit to get floor
        // behaviour (e.g. -1 at second granularity must be -1_000_000, not 0).
        micros - remainder - unit_size
    } else {
        // Non-negative value, or a negative value sitting exactly on a boundary.
        micros - remainder
    }
}

/// Truncate timestamp array using fast arithmetic for sub-day formats.
/// Returns Ok(Some(array)) if the format was handled, Ok(None) if not applicable.
fn try_truncate_timestamp_arithmetic<T>(
array: &PrimitiveArray<T>,
format: &str,
) -> Result<Option<TimestampMicrosecondArray>, SparkError>
where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
{
let unit_size = match format {
"MICROSECOND" => {
// Microsecond truncation is a no-op for microsecond timestamps
// Just copy the values directly
let result: TimestampMicrosecondArray =
array.iter().map(|opt| opt.map(i64::from)).collect();
return Ok(Some(result));
}
"MILLISECOND" => MICROS_PER_MILLISECOND,
"SECOND" => MICROS_PER_SECOND,
"MINUTE" => MICROS_PER_MINUTE,
"HOUR" => MICROS_PER_HOUR,
_ => return Ok(None), // Format requires timezone-aware handling
};

let result: TimestampMicrosecondArray = array
.iter()
.map(|opt| opt.map(|v| truncate_micros_to_unit(i64::from(v), unit_size)))
.collect();

Ok(Some(result))
}

// Copied from arrow_arith/temporal.rs with modification to the output datatype
// Transforms a array of NaiveDate to an array of Date32 after applying an operation
fn as_datetime_with_op<A: ArrayAccessor<Item = T::Native>, T: ArrowTemporalType, F>(
Expand Down Expand Up @@ -530,66 +593,48 @@ where
T: ArrowTemporalType + ArrowNumericType,
i64: From<T::Native>,
{
let format_upper = format.to_uppercase();

// Fast path: sub-day truncations can use simple arithmetic
// These are timezone-independent since they only affect the time portion
if let Some(result) = try_truncate_timestamp_arithmetic(array, &format_upper)? {
return Ok(result);
}

// Slow path: date-level truncations require timezone-aware DateTime operations
let builder = TimestampMicrosecondBuilder::with_capacity(array.len());
let iter = ArrayIter::new(array);
match array.data_type() {
DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => {
match format.to_uppercase().as_str() {
"YEAR" | "YYYY" | "YY" => {
as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
as_micros_from_unix_epoch_utc(trunc_date_to_year(dt))
})
}
"QUARTER" => {
as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
as_micros_from_unix_epoch_utc(trunc_date_to_quarter(dt))
})
}
"MONTH" | "MON" | "MM" => {
as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
as_micros_from_unix_epoch_utc(trunc_date_to_month(dt))
})
}
"WEEK" => {
as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
as_micros_from_unix_epoch_utc(trunc_date_to_week(dt))
})
}
"DAY" | "DD" => {
as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
as_micros_from_unix_epoch_utc(trunc_date_to_day(dt))
})
}
"HOUR" => {
as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
as_micros_from_unix_epoch_utc(trunc_date_to_hour(dt))
})
}
"MINUTE" => {
as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
as_micros_from_unix_epoch_utc(trunc_date_to_minute(dt))
})
}
"SECOND" => {
as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
as_micros_from_unix_epoch_utc(trunc_date_to_second(dt))
})
}
"MILLISECOND" => {
as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
as_micros_from_unix_epoch_utc(trunc_date_to_ms(dt))
})
}
"MICROSECOND" => {
as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
as_micros_from_unix_epoch_utc(trunc_date_to_microsec(dt))
})
}
_ => Err(SparkError::Internal(format!(
"Unsupported format: {format:?} for function 'timestamp_trunc'"
))),
DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => match format_upper.as_str() {
"YEAR" | "YYYY" | "YY" => {
as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
as_micros_from_unix_epoch_utc(trunc_date_to_year(dt))
})
}
}
"QUARTER" => {
as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
as_micros_from_unix_epoch_utc(trunc_date_to_quarter(dt))
})
}
"MONTH" | "MON" | "MM" => {
as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
as_micros_from_unix_epoch_utc(trunc_date_to_month(dt))
})
}
"WEEK" => {
as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
as_micros_from_unix_epoch_utc(trunc_date_to_week(dt))
})
}
"DAY" | "DD" => {
as_timestamp_tz_with_op::<&PrimitiveArray<T>, T, _>(iter, builder, tz, |dt| {
as_micros_from_unix_epoch_utc(trunc_date_to_day(dt))
})
}
_ => Err(SparkError::Internal(format!(
"Unsupported format: {format:?} for function 'timestamp_trunc'"
))),
},
dt => return_compute_error_with!(
"Unsupported input type '{:?}' for function 'timestamp_trunc'",
dt
Expand Down
Loading