From 7ca1f89cf005086c20d414a670e7d147fdd409e8 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 14 Jan 2026 16:12:23 -0700
Subject: [PATCH 1/3] feat: add support for timestamp_seconds expression

Adds native Comet support for Spark's timestamp_seconds (SecondsToTimestamp)
function, which converts seconds since the Unix epoch to a timestamp. Supports
Int32, Int64, and Float64 inputs. NaN and infinite float values return null,
matching Spark behavior.

Closes #3111

Co-Authored-By: Claude Opus 4.5
---
 native/spark-expr/src/comet_scalar_funcs.rs   |   5 +-
 native/spark-expr/src/datetime_funcs/mod.rs   |   2 +
 .../datetime_funcs/seconds_to_timestamp.rs    | 126 ++++++++++++++++++
 native/spark-expr/src/lib.rs                  |   5 +-
 .../apache/comet/serde/QueryPlanSerde.scala   |   1 +
 .../org/apache/comet/serde/datetime.scala     |   5 +-
 .../comet/CometTemporalExpressionSuite.scala  |  42 ++++++
 7 files changed, 182 insertions(+), 4 deletions(-)
 create mode 100644 native/spark-expr/src/datetime_funcs/seconds_to_timestamp.rs

diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs
index 8384a4646a..914af6f066 100644
--- a/native/spark-expr/src/comet_scalar_funcs.rs
+++ b/native/spark-expr/src/comet_scalar_funcs.rs
@@ -22,8 +22,8 @@ use crate::math_funcs::modulo_expr::spark_modulo;
 use crate::{
     spark_array_repeat, spark_ceil, spark_decimal_div, spark_decimal_integral_div, spark_floor,
     spark_isnan, spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad,
-    spark_unhex, spark_unscaled_value, EvalMode, SparkBitwiseCount, SparkDateTrunc, SparkSizeFunc,
-    SparkStringSpace,
+    spark_unhex, spark_unscaled_value, EvalMode, SparkBitwiseCount, SparkDateTrunc,
+    SparkSecondsToTimestamp, SparkSizeFunc, SparkStringSpace,
 };
 use arrow::datatypes::DataType;
 use datafusion::common::{DataFusionError, Result as DataFusionResult};
@@ -193,6 +193,7 @@ fn all_scalar_functions() -> Vec<Arc<ScalarUDF>> {
     vec![
         Arc::new(ScalarUDF::new_from_impl(SparkBitwiseCount::default())),
         Arc::new(ScalarUDF::new_from_impl(SparkDateTrunc::default())),
+        Arc::new(ScalarUDF::new_from_impl(SparkSecondsToTimestamp::default())),
         Arc::new(ScalarUDF::new_from_impl(SparkStringSpace::default())),
         Arc::new(ScalarUDF::new_from_impl(SparkSizeFunc::default())),
     ]
diff --git a/native/spark-expr/src/datetime_funcs/mod.rs b/native/spark-expr/src/datetime_funcs/mod.rs
index ef8041e5fe..99a04d7ee5 100644
--- a/native/spark-expr/src/datetime_funcs/mod.rs
+++ b/native/spark-expr/src/datetime_funcs/mod.rs
@@ -17,10 +17,12 @@
 
 mod date_trunc;
 mod extract_date_part;
+mod seconds_to_timestamp;
 mod timestamp_trunc;
 
 pub use date_trunc::SparkDateTrunc;
 pub use extract_date_part::SparkHour;
 pub use extract_date_part::SparkMinute;
 pub use extract_date_part::SparkSecond;
+pub use seconds_to_timestamp::SparkSecondsToTimestamp;
 pub use timestamp_trunc::TimestampTruncExpr;
diff --git a/native/spark-expr/src/datetime_funcs/seconds_to_timestamp.rs b/native/spark-expr/src/datetime_funcs/seconds_to_timestamp.rs
new file mode 100644
index 0000000000..0e2a5a2f97
--- /dev/null
+++ b/native/spark-expr/src/datetime_funcs/seconds_to_timestamp.rs
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{Array, Float64Array, Int32Array, Int64Array, TimestampMicrosecondArray};
+use arrow::datatypes::{DataType, TimeUnit};
+use datafusion::common::{utils::take_function_args, DataFusionError, Result};
+use datafusion::logical_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
+};
+use std::any::Any;
+use std::sync::Arc;
+
+const MICROS_PER_SECOND: i64 = 1_000_000;
+
+/// Spark-compatible seconds_to_timestamp (timestamp_seconds) function.
+/// Converts seconds since Unix epoch to a timestamp.
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct SparkSecondsToTimestamp {
+    signature: Signature,
+    aliases: Vec<String>,
+}
+
+impl SparkSecondsToTimestamp {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::one_of(
+                vec![
+                    TypeSignature::Exact(vec![DataType::Int32]),
+                    TypeSignature::Exact(vec![DataType::Int64]),
+                    TypeSignature::Exact(vec![DataType::Float64]),
+                ],
+                Volatility::Immutable,
+            ),
+            aliases: vec!["timestamp_seconds".to_string()],
+        }
+    }
+}
+
+impl Default for SparkSecondsToTimestamp {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ScalarUDFImpl for SparkSecondsToTimestamp {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "seconds_to_timestamp"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
+        Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        let [seconds] = take_function_args(self.name(), args.args)?;
+
+        let arr = seconds.into_array(1)?;
+
+        // Handle Int32 input
+        if let Some(int_array) = arr.as_any().downcast_ref::<Int32Array>() {
+            let result: TimestampMicrosecondArray = int_array
+                .iter()
+                .map(|opt| opt.map(|s| (s as i64) * MICROS_PER_SECOND))
+                .collect();
+            return Ok(ColumnarValue::Array(Arc::new(result)));
+        }
+
+        // Handle Int64 input
+        if let Some(int_array) = arr.as_any().downcast_ref::<Int64Array>() {
+            let result: TimestampMicrosecondArray = int_array
+                .iter()
+                .map(|opt| opt.and_then(|s| s.checked_mul(MICROS_PER_SECOND)))
+                .collect();
+            return Ok(ColumnarValue::Array(Arc::new(result)));
+        }
+
+        // Handle Float64 input
+        if let Some(float_array) = arr.as_any().downcast_ref::<Float64Array>() {
+            let result: TimestampMicrosecondArray = float_array
+                .iter()
+                .map(|opt| {
+                    opt.and_then(|s| {
+                        if s.is_nan() || s.is_infinite() {
+                            None // NaN and Infinite return null per Spark behavior
+                        } else {
+                            let micros = s * (MICROS_PER_SECOND as f64);
+                            Some(micros as i64)
+                        }
+                    })
+                })
+                .collect();
+            return Ok(ColumnarValue::Array(Arc::new(result)));
+        }
+
+        Err(DataFusionError::Execution(format!(
+            "seconds_to_timestamp expects Int32, Int64 or Float64 input, got {:?}",
+            arr.data_type()
+        )))
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+}
diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs
index f26fd911d8..a2f71050f5 100644
--- a/native/spark-expr/src/lib.rs
+++ b/native/spark-expr/src/lib.rs
@@ -69,7 +69,10 @@ pub use comet_scalar_funcs::{
     create_comet_physical_fun,
     create_comet_physical_fun_with_eval_mode, register_all_comet_functions,
 };
-pub use datetime_funcs::{SparkDateTrunc, SparkHour, SparkMinute, SparkSecond, TimestampTruncExpr};
+pub use datetime_funcs::{
+    SparkDateTrunc, SparkHour, SparkMinute, SparkSecond, SparkSecondsToTimestamp,
+    TimestampTruncExpr,
+};
 pub use error::{SparkError, SparkResult};
 pub use hash_funcs::*;
 pub use json_funcs::{FromJson, ToJson};
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index e50b1d80e6..fa75b217c6 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -190,6 +190,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[Hour] -> CometHour,
     classOf[Minute] -> CometMinute,
     classOf[Second] -> CometSecond,
+    classOf[SecondsToTimestamp] -> CometSecondsToTimestamp,
     classOf[TruncDate] -> CometTruncDate,
     classOf[TruncTimestamp] -> CometTruncTimestamp,
     classOf[Year] -> CometYear,
diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala
index ef2b0f793c..04f8c93466 100644
--- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala
@@ -21,7 +21,7 @@ package org.apache.comet.serde
 
 import java.util.Locale
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, WeekDay, WeekOfYear, Year}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, Literal, Minute, Month, Quarter, Second, SecondsToTimestamp, TruncDate, TruncTimestamp, WeekDay, WeekOfYear, Year}
 import org.apache.spark.sql.types.{DateType, IntegerType}
 import org.apache.spark.unsafe.types.UTF8String
@@ -258,6 +258,9 @@ object CometDateAdd extends CometScalarFunction[DateAdd]("date_add")
 
 object CometDateSub extends CometScalarFunction[DateSub]("date_sub")
 
+object CometSecondsToTimestamp
+    extends CometScalarFunction[SecondsToTimestamp]("seconds_to_timestamp")
+
 object CometTruncDate extends CometExpressionSerde[TruncDate] {
 
   val supportedFormats: Seq[String] =
diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
index 9a23c76d82..1f2e8bdb6f 100644
--- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
@@ -122,4 +122,46 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH
         StructField("fmt", DataTypes.StringType, true)))
     FuzzDataGenerator.generateDataFrame(r, spark, schema, 1000, DataGenOptions())
   }
+
+  test("timestamp_seconds") {
+    import org.apache.spark.sql.Row
+
+    // Create test data with reasonable epoch seconds (1970 to 2100 range)
+    // to avoid overflow when multiplying by 1_000_000
+    val r = new Random(42)
+    val testData = (1 to 1000).map { _ =>
+      // Range: -2208988800 (1900) to 4102444800 (2100)
+      val epochSeconds = r.nextLong() % 4102444800L
+      Row(if (r.nextDouble() < 0.1) null else epochSeconds)
+    }
+    val schema = StructType(Seq(StructField("c0", DataTypes.LongType, true)))
+    val df =
+      spark.createDataFrame(spark.sparkContext.parallelize(testData), schema)
+    df.createOrReplaceTempView("tbl")
+
+    // Basic test with random long values (seconds since epoch)
+    withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
+      checkSparkAnswerAndOperator("SELECT c0, timestamp_seconds(c0) FROM tbl ORDER BY c0")
+    }
+
+    // Disable constant folding to ensure literal expressions are executed by Comet
+    withSQLConf(
+      SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
+        "org.apache.spark.sql.catalyst.optimizer.ConstantFolding",
+      SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
+      // Test epoch (0 = 1970-01-01 00:00:00)
+      checkSparkAnswerAndOperator("SELECT timestamp_seconds(0)")
+
+      // Test a known timestamp (1640995200 = 2022-01-01 00:00:00 UTC)
+      checkSparkAnswerAndOperator("SELECT timestamp_seconds(1640995200)")
+
+      // Test negative value (before epoch)
+      checkSparkAnswerAndOperator("SELECT timestamp_seconds(-86400)")
+
+      // Test with decimal seconds (fractional seconds) - cast to double
+      checkSparkAnswerAndOperator("SELECT timestamp_seconds(CAST(1640995200.123 AS DOUBLE))")
+
+      // Test null handling
+      checkSparkAnswerAndOperator("SELECT timestamp_seconds(NULL)")
+    }
+  }
 }

From bbfd11eca1bbab0a0c3dda95edd8302b92cddc11 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 14 Jan 2026 17:31:33 -0700
Subject: [PATCH 2/3] update docs

---
 docs/source/user-guide/latest/configs.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md
index 1a273ad033..0dc3f9a5c6 100644
--- a/docs/source/user-guide/latest/configs.md
+++ b/docs/source/user-guide/latest/configs.md
@@ -300,6 +300,7 @@ These settings can be used to determine which parts of the plan are accelerated
 | `spark.comet.expression.Round.enabled` | Enable Comet acceleration for `Round` | true |
 | `spark.comet.expression.ScalarSubquery.enabled` | Enable Comet acceleration for `ScalarSubquery` | true |
 | `spark.comet.expression.Second.enabled` | Enable Comet acceleration for `Second` | true |
+| `spark.comet.expression.SecondsToTimestamp.enabled` | Enable Comet acceleration for `SecondsToTimestamp` | true |
 | `spark.comet.expression.Sha1.enabled` | Enable Comet acceleration for `Sha1` | true |
 | `spark.comet.expression.Sha2.enabled` | Enable Comet acceleration for `Sha2` | true |
 | `spark.comet.expression.ShiftLeft.enabled` | Enable Comet acceleration for `ShiftLeft` | true |

From 03fb7a6a685b019d604b6028d706fa5ea00d316a Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Tue, 10 Feb 2026 12:16:56 -0700
Subject: [PATCH 3/3] test: migrate timestamp_seconds tests to SQL file-based
 approach

Co-Authored-By: Claude Opus 4.6
---
 .../datetime/timestamp_seconds.sql            | 48 +++++++++++++++++++
 .../comet/CometTemporalExpressionSuite.scala  | 42 ----------------
 2 files changed, 48 insertions(+), 42 deletions(-)
 create mode 100644 spark/src/test/resources/sql-tests/expressions/datetime/timestamp_seconds.sql

diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/timestamp_seconds.sql b/spark/src/test/resources/sql-tests/expressions/datetime/timestamp_seconds.sql
new file mode 100644
index 0000000000..504d9fc38d
--- /dev/null
+++ b/spark/src/test/resources/sql-tests/expressions/datetime/timestamp_seconds.sql
@@ -0,0 +1,48 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.
+-- The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- Config: spark.sql.session.timeZone=UTC
+-- ConfigMatrix: parquet.enable.dictionary=false,true
+
+statement
+CREATE TABLE test_timestamp_seconds(c0 bigint) USING parquet
+
+statement
+INSERT INTO test_timestamp_seconds VALUES (0), (1640995200), (-86400), (4102444800), (-2208988800), (NULL)
+
+-- column argument
+query
+SELECT c0, timestamp_seconds(c0) FROM test_timestamp_seconds
+
+-- literal arguments
+query
+SELECT timestamp_seconds(0)
+
+query
+SELECT timestamp_seconds(1640995200)
+
+-- negative value (before epoch)
+query
+SELECT timestamp_seconds(-86400)
+
+-- decimal seconds (fractional)
+query
+SELECT timestamp_seconds(CAST(1640995200.123 AS DOUBLE))
+
+-- null handling
+query
+SELECT timestamp_seconds(NULL)
diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
index 08d38704b8..1ae6926e05 100644
--- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
@@ -182,48 +182,6 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH
     FuzzDataGenerator.generateDataFrame(r, spark, schema, 1000, DataGenOptions())
   }
 
-  test("timestamp_seconds") {
-    import org.apache.spark.sql.Row
-
-    // Create test data with reasonable epoch seconds (1970 to 2100 range)
-    // to avoid overflow when multiplying by 1_000_000
-    val r = new Random(42)
-    val testData = (1 to 1000).map { _ =>
-      // Range: -2208988800 (1900) to 4102444800 (2100)
-      val epochSeconds = r.nextLong() % 4102444800L
-      Row(if (r.nextDouble() < 0.1) null else epochSeconds)
-    }
-    val schema = StructType(Seq(StructField("c0", DataTypes.LongType, true)))
-    val df = spark.createDataFrame(spark.sparkContext.parallelize(testData), schema)
-    df.createOrReplaceTempView("tbl")
-
-    // Basic test with random long values (seconds since epoch)
-    withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
-      checkSparkAnswerAndOperator("SELECT c0, timestamp_seconds(c0) FROM tbl ORDER BY c0")
-    }
-
-    // Disable constant folding to ensure literal expressions are executed by Comet
-    withSQLConf(
-      SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
-        "org.apache.spark.sql.catalyst.optimizer.ConstantFolding",
-      SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") {
-      // Test epoch (0 = 1970-01-01 00:00:00)
-      checkSparkAnswerAndOperator("SELECT timestamp_seconds(0)")
-
-      // Test a known timestamp (1640995200 = 2022-01-01 00:00:00 UTC)
-      checkSparkAnswerAndOperator("SELECT timestamp_seconds(1640995200)")
-
-      // Test negative value (before epoch)
-      checkSparkAnswerAndOperator("SELECT timestamp_seconds(-86400)")
-
-      // Test with decimal seconds (fractional seconds) - cast to double
-      checkSparkAnswerAndOperator("SELECT timestamp_seconds(CAST(1640995200.123 AS DOUBLE))")
-
-      // Test null handling
-      checkSparkAnswerAndOperator("SELECT timestamp_seconds(NULL)")
-    }
-  }
-
test("last_day") { val r = new Random(42) val schema = StructType(Seq(StructField("c0", DataTypes.DateType, true)))