diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs
index 6647e01cc8..7dfe28ffdd 100644
--- a/native/spark-expr/src/comet_scalar_funcs.rs
+++ b/native/spark-expr/src/comet_scalar_funcs.rs
@@ -23,7 +23,7 @@ use crate::{
     spark_array_repeat, spark_ceil, spark_decimal_div, spark_decimal_integral_div, spark_floor,
     spark_isnan, spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad,
     spark_unhex, spark_unscaled_value, EvalMode, SparkBitwiseCount, SparkContains, SparkDateDiff,
-    SparkDateTrunc, SparkSizeFunc, SparkStringSpace,
+    SparkDateTrunc, SparkSecondsToTimestamp, SparkSizeFunc, SparkStringSpace,
 };
 use arrow::datatypes::DataType;
 use datafusion::common::{DataFusionError, Result as DataFusionResult};
@@ -195,6 +195,7 @@ fn all_scalar_functions() -> Vec<Arc<ScalarUDF>> {
         Arc::new(ScalarUDF::new_from_impl(SparkContains::default())),
         Arc::new(ScalarUDF::new_from_impl(SparkDateDiff::default())),
         Arc::new(ScalarUDF::new_from_impl(SparkDateTrunc::default())),
+        Arc::new(ScalarUDF::new_from_impl(SparkSecondsToTimestamp::default())),
         Arc::new(ScalarUDF::new_from_impl(SparkStringSpace::default())),
         Arc::new(ScalarUDF::new_from_impl(SparkSizeFunc::default())),
     ]
diff --git a/native/spark-expr/src/datetime_funcs/mod.rs b/native/spark-expr/src/datetime_funcs/mod.rs
index 1832711479..940ab62dc8 100644
--- a/native/spark-expr/src/datetime_funcs/mod.rs
+++ b/native/spark-expr/src/datetime_funcs/mod.rs
@@ -18,6 +18,7 @@
 mod date_diff;
 mod date_trunc;
 mod extract_date_part;
+mod seconds_to_timestamp;
 mod timestamp_trunc;
 mod unix_timestamp;
 
@@ -26,5 +27,6 @@ pub use date_trunc::SparkDateTrunc;
 pub use extract_date_part::SparkHour;
 pub use extract_date_part::SparkMinute;
 pub use extract_date_part::SparkSecond;
+pub use seconds_to_timestamp::SparkSecondsToTimestamp;
 pub use timestamp_trunc::TimestampTruncExpr;
 pub use unix_timestamp::SparkUnixTimestamp;
diff --git a/native/spark-expr/src/datetime_funcs/seconds_to_timestamp.rs b/native/spark-expr/src/datetime_funcs/seconds_to_timestamp.rs
new file mode 100644
index 0000000000..0e2a5a2f97
--- /dev/null
+++ b/native/spark-expr/src/datetime_funcs/seconds_to_timestamp.rs
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{Array, Float64Array, Int32Array, Int64Array, TimestampMicrosecondArray};
+use arrow::datatypes::{DataType, TimeUnit};
+use datafusion::common::{utils::take_function_args, DataFusionError, Result};
+use datafusion::logical_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
+};
+use std::any::Any;
+use std::sync::Arc;
+
+const MICROS_PER_SECOND: i64 = 1_000_000;
+
+/// Spark-compatible seconds_to_timestamp (timestamp_seconds) function.
+/// Converts seconds since the Unix epoch to a timestamp.
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct SparkSecondsToTimestamp {
+    signature: Signature,
+    aliases: Vec<String>,
+}
+
+impl SparkSecondsToTimestamp {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::one_of(
+                vec![
+                    TypeSignature::Exact(vec![DataType::Int32]),
+                    TypeSignature::Exact(vec![DataType::Int64]),
+                    TypeSignature::Exact(vec![DataType::Float64]),
+                ],
+                Volatility::Immutable,
+            ),
+            aliases: vec!["timestamp_seconds".to_string()],
+        }
+    }
+}
+
+impl Default for SparkSecondsToTimestamp {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ScalarUDFImpl for SparkSecondsToTimestamp {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "seconds_to_timestamp"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
+        Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        let [seconds] = take_function_args(self.name(), args.args)?;
+
+        let arr = seconds.into_array(1)?;
+
+        // Handle Int32 input
+        if let Some(int_array) = arr.as_any().downcast_ref::<Int32Array>() {
+            let result: TimestampMicrosecondArray = int_array
+                .iter()
+                .map(|opt| opt.map(|s| (s as i64) * MICROS_PER_SECOND))
+                .collect();
+            return Ok(ColumnarValue::Array(Arc::new(result)));
+        }
+
+        // Handle Int64 input
+        if let Some(int_array) = arr.as_any().downcast_ref::<Int64Array>() {
+            let result: TimestampMicrosecondArray = int_array
+                .iter()
+                .map(|opt| opt.and_then(|s| s.checked_mul(MICROS_PER_SECOND)))
+                .collect();
+            return Ok(ColumnarValue::Array(Arc::new(result)));
+        }
+
+        // Handle Float64 input
+        if let Some(float_array) = arr.as_any().downcast_ref::<Float64Array>() {
+            let result: TimestampMicrosecondArray = float_array
+                .iter()
+                .map(|opt| {
+                    opt.and_then(|s| {
+                        if s.is_nan() || s.is_infinite() {
+                            None // NaN and Infinity return null, matching Spark behavior
+                        } else {
+                            let micros = s * (MICROS_PER_SECOND as f64);
+                            Some(micros as i64)
+                        }
+                    })
+                })
+                .collect();
+            return Ok(ColumnarValue::Array(Arc::new(result)));
+        }
+
+        Err(DataFusionError::Execution(format!(
+            "seconds_to_timestamp expects Int32, Int64 or Float64 input, got {:?}",
+            arr.data_type()
+        )))
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+}
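A standalone sketch (not part of the patch) of the conversion rules the UDF above implements, runnable with plain Rust and no dependencies; the input values are illustrative:

    const MICROS_PER_SECOND: i64 = 1_000_000;

    fn main() {
        // Integer path: checked_mul turns i64 overflow into None (a null
        // timestamp) instead of wrapping.
        assert_eq!(
            1_640_995_200_i64.checked_mul(MICROS_PER_SECOND),
            Some(1_640_995_200_000_000)
        );
        assert_eq!((-86_400_i64).checked_mul(MICROS_PER_SECOND), Some(-86_400_000_000));
        assert_eq!(i64::MAX.checked_mul(MICROS_PER_SECOND), None);

        // Float path: NaN/Infinity are mapped to null before scaling; finite
        // values are scaled to microseconds and truncated toward zero. Rust's
        // float-to-int cast saturates, so a huge finite double clamps to
        // i64::MAX rather than wrapping.
        assert_eq!((1.5_f64 * MICROS_PER_SECOND as f64) as i64, 1_500_000);
        assert_eq!(1e30_f64 as i64, i64::MAX);
    }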
diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs
index 52cf2a2ac3..8ab9746ccb 100644
--- a/native/spark-expr/src/lib.rs
+++ b/native/spark-expr/src/lib.rs
@@ -72,8 +72,8 @@ pub use comet_scalar_funcs::{
 };
 pub use csv_funcs::*;
 pub use datetime_funcs::{
-    SparkDateDiff, SparkDateTrunc, SparkHour, SparkMinute, SparkSecond, SparkUnixTimestamp,
-    TimestampTruncExpr,
+    SparkDateDiff, SparkDateTrunc, SparkHour, SparkMinute, SparkSecond, SparkSecondsToTimestamp,
+    SparkUnixTimestamp, TimestampTruncExpr,
 };
 pub use error::{SparkError, SparkResult};
 pub use hash_funcs::*;
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 3b1ab3642a..3fbf005017 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -199,6 +199,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[Hour] -> CometHour,
     classOf[Minute] -> CometMinute,
     classOf[Second] -> CometSecond,
+    classOf[SecondsToTimestamp] -> CometSecondsToTimestamp,
     classOf[TruncDate] -> CometTruncDate,
     classOf[TruncTimestamp] -> CometTruncTimestamp,
     classOf[UnixTimestamp] -> CometUnixTimestamp,
diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala
index a623146916..ae5fc266f2 100644
--- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala
@@ -21,7 +21,7 @@ package org.apache.comet.serde
 
 import java.util.Locale
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, Minute, Month, Quarter, Second, SecondsToTimestamp, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
 import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -310,6 +310,9 @@ object CometDateAdd extends CometScalarFunction[DateAdd]("date_add")
 
 object CometDateSub extends CometScalarFunction[DateSub]("date_sub")
 
+object CometSecondsToTimestamp
+    extends CometScalarFunction[SecondsToTimestamp]("seconds_to_timestamp")
+
 object CometLastDay extends CometScalarFunction[LastDay]("last_day")
 
 object CometDateDiff extends CometScalarFunction[DateDiff]("date_diff")
diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/timestamp_seconds.sql b/spark/src/test/resources/sql-tests/expressions/datetime/timestamp_seconds.sql
new file mode 100644
index 0000000000..504d9fc38d
--- /dev/null
+++ b/spark/src/test/resources/sql-tests/expressions/datetime/timestamp_seconds.sql
@@ -0,0 +1,48 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- Config: spark.sql.session.timeZone=UTC
+-- ConfigMatrix: parquet.enable.dictionary=false,true
+
+statement
+CREATE TABLE test_timestamp_seconds(c0 bigint) USING parquet
+
+statement
+INSERT INTO test_timestamp_seconds VALUES (0), (1640995200), (-86400), (4102444800), (-2208988800), (NULL)
+
+-- column argument
+query
+SELECT c0, timestamp_seconds(c0) FROM test_timestamp_seconds
+
+-- literal arguments
+query
+SELECT timestamp_seconds(0)
+
+query
+SELECT timestamp_seconds(1640995200)
+
+-- negative value (before the epoch)
+query
+SELECT timestamp_seconds(-86400)
+
+-- fractional seconds (cast to double)
+query
+SELECT timestamp_seconds(CAST(1640995200.123 AS DOUBLE))
+
+-- null handling
+query
+SELECT timestamp_seconds(NULL)
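As a sanity check on the literal values used in the test above, a small sketch (assuming the chrono crate, which the patch itself does not use) of the UTC timestamps those seconds correspond to:

    use chrono::DateTime;

    fn main() {
        // Each pair is (seconds since the Unix epoch, expected UTC timestamp).
        for (secs, expected) in [
            (0_i64, "1970-01-01 00:00:00 UTC"),
            (1_640_995_200, "2022-01-01 00:00:00 UTC"),
            (-86_400, "1969-12-31 00:00:00 UTC"),
            (4_102_444_800, "2100-01-01 00:00:00 UTC"),
            (-2_208_988_800, "1900-01-01 00:00:00 UTC"),
        ] {
            // DateTime::from_timestamp returns Option<DateTime<Utc>>.
            assert_eq!(DateTime::from_timestamp(secs, 0).unwrap().to_string(), expected);
        }
    }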