3 changes: 2 additions & 1 deletion native/spark-expr/src/comet_scalar_funcs.rs
@@ -23,7 +23,7 @@ use crate::{
     spark_array_repeat, spark_ceil, spark_decimal_div, spark_decimal_integral_div, spark_floor,
     spark_isnan, spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad,
     spark_unhex, spark_unscaled_value, EvalMode, SparkBitwiseCount, SparkContains, SparkDateDiff,
-    SparkDateTrunc, SparkSizeFunc, SparkStringSpace,
+    SparkDateTrunc, SparkSecondsToTimestamp, SparkSizeFunc, SparkStringSpace,
 };
 use arrow::datatypes::DataType;
 use datafusion::common::{DataFusionError, Result as DataFusionResult};
@@ -195,6 +195,7 @@ fn all_scalar_functions() -> Vec<Arc<ScalarUDF>> {
         Arc::new(ScalarUDF::new_from_impl(SparkContains::default())),
         Arc::new(ScalarUDF::new_from_impl(SparkDateDiff::default())),
         Arc::new(ScalarUDF::new_from_impl(SparkDateTrunc::default())),
+        Arc::new(ScalarUDF::new_from_impl(SparkSecondsToTimestamp::default())),
         Arc::new(ScalarUDF::new_from_impl(SparkStringSpace::default())),
         Arc::new(ScalarUDF::new_from_impl(SparkSizeFunc::default())),
     ]
2 changes: 2 additions & 0 deletions native/spark-expr/src/datetime_funcs/mod.rs
@@ -18,6 +18,7 @@
 mod date_diff;
 mod date_trunc;
 mod extract_date_part;
+mod seconds_to_timestamp;
 mod timestamp_trunc;
 mod unix_timestamp;

@@ -26,5 +27,6 @@ pub use date_trunc::SparkDateTrunc;
 pub use extract_date_part::SparkHour;
 pub use extract_date_part::SparkMinute;
 pub use extract_date_part::SparkSecond;
+pub use seconds_to_timestamp::SparkSecondsToTimestamp;
 pub use timestamp_trunc::TimestampTruncExpr;
 pub use unix_timestamp::SparkUnixTimestamp;
126 changes: 126 additions & 0 deletions native/spark-expr/src/datetime_funcs/seconds_to_timestamp.rs
@@ -0,0 +1,126 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{Array, Float64Array, Int32Array, Int64Array, TimestampMicrosecondArray};
use arrow::datatypes::{DataType, TimeUnit};
use datafusion::common::{utils::take_function_args, DataFusionError, Result};
use datafusion::logical_expr::{
    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
};
use std::any::Any;
use std::sync::Arc;

const MICROS_PER_SECOND: i64 = 1_000_000;

/// Spark-compatible seconds_to_timestamp (timestamp_seconds) function.
/// Converts seconds since Unix epoch to a timestamp.
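/// For example, 1_640_995_200 seconds becomes 1_640_995_200_000_000
/// microseconds, i.e. 2022-01-01T00:00:00Z.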
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkSecondsToTimestamp {
    signature: Signature,
    aliases: Vec<String>,
}

impl SparkSecondsToTimestamp {
    pub fn new() -> Self {
        Self {
            signature: Signature::one_of(
                vec![
                    TypeSignature::Exact(vec![DataType::Int32]),
                    TypeSignature::Exact(vec![DataType::Int64]),
                    TypeSignature::Exact(vec![DataType::Float64]),
                ],
                Volatility::Immutable,
            ),
            aliases: vec!["timestamp_seconds".to_string()],
        }
    }
}

impl Default for SparkSecondsToTimestamp {
    fn default() -> Self {
        Self::new()
    }
}

impl ScalarUDFImpl for SparkSecondsToTimestamp {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "seconds_to_timestamp"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
        Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
    }

    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        let num_rows = args.number_rows;
        let [seconds] = take_function_args(self.name(), args.args)?;

        // Expand scalar inputs to the batch length rather than a single row.
        let arr = seconds.into_array(num_rows)?;

        // Int32 input: widen to i64; i32 seconds cannot overflow i64 microseconds.
        if let Some(int_array) = arr.as_any().downcast_ref::<Int32Array>() {
            let result: TimestampMicrosecondArray = int_array
                .iter()
                .map(|opt| opt.map(|s| (s as i64) * MICROS_PER_SECOND))
                .collect();
            return Ok(ColumnarValue::Array(Arc::new(result)));
        }

        // Int64 input: checked_mul returns None on i64 overflow,
        // which propagates as NULL in the output array.
        if let Some(int_array) = arr.as_any().downcast_ref::<Int64Array>() {
            let result: TimestampMicrosecondArray = int_array
                .iter()
                .map(|opt| opt.and_then(|s| s.checked_mul(MICROS_PER_SECOND)))
                .collect();
            return Ok(ColumnarValue::Array(Arc::new(result)));
        }

        // Float64 input: NaN and infinity become NULL; finite values are
        // scaled to microseconds and truncated toward zero.
        if let Some(float_array) = arr.as_any().downcast_ref::<Float64Array>() {
            let result: TimestampMicrosecondArray = float_array
                .iter()
                .map(|opt| {
                    opt.and_then(|s| {
                        if s.is_nan() || s.is_infinite() {
                            None // NaN and Infinite return null per Spark behavior
                        } else {
                            let micros = s * (MICROS_PER_SECOND as f64);
                            // `as i64` saturates at i64::MIN/MAX when out of range.
                            Some(micros as i64)
                        }
                    })
                })
                .collect();
            return Ok(ColumnarValue::Array(Arc::new(result)));
        }

        Err(DataFusionError::Execution(format!(
            "seconds_to_timestamp expects Int32, Int64 or Float64 input, got {:?}",
            arr.data_type()
        )))
    }

    fn aliases(&self) -> &[String] {
        &self.aliases
    }
}
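For reference, a minimal standalone sketch of the conversion the UDF performs, applied directly to an arrow-rs Int64Array. This is illustration only, not part of the diff; the values are taken from the test file below.

use arrow::array::{Array, Int64Array, TimestampMicrosecondArray};

fn main() {
    // Epoch seconds: the epoch, 2022-01-01T00:00:00Z, one day before the epoch, NULL.
    let seconds = Int64Array::from(vec![Some(0), Some(1_640_995_200), Some(-86_400), None]);
    let micros: TimestampMicrosecondArray = seconds
        .iter()
        .map(|opt| opt.and_then(|s| s.checked_mul(1_000_000))) // i64 overflow -> NULL
        .collect();
    assert_eq!(micros.value(1), 1_640_995_200_000_000);
    assert!(micros.is_null(3));
}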
4 changes: 2 additions & 2 deletions native/spark-expr/src/lib.rs
@@ -72,8 +72,8 @@ pub use comet_scalar_funcs::{
 };
 pub use csv_funcs::*;
 pub use datetime_funcs::{
-    SparkDateDiff, SparkDateTrunc, SparkHour, SparkMinute, SparkSecond, SparkUnixTimestamp,
-    TimestampTruncExpr,
+    SparkDateDiff, SparkDateTrunc, SparkHour, SparkMinute, SparkSecond, SparkSecondsToTimestamp,
+    SparkUnixTimestamp, TimestampTruncExpr,
 };
 pub use error::{SparkError, SparkResult};
 pub use hash_funcs::*;
Expand Up @@ -199,6 +199,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
classOf[Hour] -> CometHour,
classOf[Minute] -> CometMinute,
classOf[Second] -> CometSecond,
classOf[SecondsToTimestamp] -> CometSecondsToTimestamp,
classOf[TruncDate] -> CometTruncDate,
classOf[TruncTimestamp] -> CometTruncTimestamp,
classOf[UnixTimestamp] -> CometUnixTimestamp,
5 changes: 4 additions & 1 deletion spark/src/main/scala/org/apache/comet/serde/datetime.scala
@@ -21,7 +21,7 @@ package org.apache.comet.serde
 
 import java.util.Locale
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, Minute, Month, Quarter, Second, SecondsToTimestamp, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
 import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String

@@ -310,6 +310,9 @@ object CometDateAdd extends CometScalarFunction[DateAdd]("date_add")
 
 object CometDateSub extends CometScalarFunction[DateSub]("date_sub")
 
+object CometSecondsToTimestamp
+    extends CometScalarFunction[SecondsToTimestamp]("seconds_to_timestamp")
+
 object CometLastDay extends CometScalarFunction[LastDay]("last_day")
 
 object CometDateDiff extends CometScalarFunction[DateDiff]("date_diff")
@@ -0,0 +1,48 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- Config: spark.sql.session.timeZone=UTC
-- ConfigMatrix: parquet.enable.dictionary=false,true

statement
CREATE TABLE test_timestamp_seconds(c0 bigint) USING parquet
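
-- Values: the epoch, 2022-01-01T00:00:00Z, one day before the epoch,
-- 2100-01-01T00:00:00Z, 1900-01-01T00:00:00Z, and NULL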

statement
INSERT INTO test_timestamp_seconds VALUES (0), (1640995200), (-86400), (4102444800), (-2208988800), (NULL)

-- column argument
query
SELECT c0, timestamp_seconds(c0) FROM test_timestamp_seconds

-- literal arguments
query
SELECT timestamp_seconds(0)

query
SELECT timestamp_seconds(1640995200)

-- negative value (before epoch)
query
SELECT timestamp_seconds(-86400)

-- decimal seconds (fractional)
query
SELECT timestamp_seconds(CAST(1640995200.123 AS DOUBLE))

-- null handling
query
SELECT timestamp_seconds(NULL)