From 88faeeaed3e7eb49768e9de6337ae2d7770f7529 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 14 Jan 2026 16:33:16 -0700 Subject: [PATCH 1/4] feat: add support for next_day expression Adds native Comet support for Spark's next_day function which returns the first date after a given date that falls on the specified day of the week. Supports full day names (Sunday, Monday, etc.) and abbreviations (Sun, Mon, etc.). Closes #3092 Co-Authored-By: Claude Opus 4.5 --- native/spark-expr/src/comet_scalar_funcs.rs | 5 +- native/spark-expr/src/datetime_funcs/mod.rs | 2 + .../spark-expr/src/datetime_funcs/next_day.rs | 217 ++++++++++++++++++ native/spark-expr/src/lib.rs | 4 +- .../apache/comet/serde/QueryPlanSerde.scala | 1 + .../org/apache/comet/serde/datetime.scala | 4 +- .../comet/CometTemporalExpressionSuite.scala | 43 ++++ 7 files changed, 272 insertions(+), 4 deletions(-) create mode 100644 native/spark-expr/src/datetime_funcs/next_day.rs diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs index 8384a4646a..aa798a3eb6 100644 --- a/native/spark-expr/src/comet_scalar_funcs.rs +++ b/native/spark-expr/src/comet_scalar_funcs.rs @@ -22,8 +22,8 @@ use crate::math_funcs::modulo_expr::spark_modulo; use crate::{ spark_array_repeat, spark_ceil, spark_decimal_div, spark_decimal_integral_div, spark_floor, spark_isnan, spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad, - spark_unhex, spark_unscaled_value, EvalMode, SparkBitwiseCount, SparkDateTrunc, SparkSizeFunc, - SparkStringSpace, + spark_unhex, spark_unscaled_value, EvalMode, SparkBitwiseCount, SparkDateTrunc, SparkNextDay, + SparkSizeFunc, SparkStringSpace, }; use arrow::datatypes::DataType; use datafusion::common::{DataFusionError, Result as DataFusionResult}; @@ -193,6 +193,7 @@ fn all_scalar_functions() -> Vec> { vec![ Arc::new(ScalarUDF::new_from_impl(SparkBitwiseCount::default())), Arc::new(ScalarUDF::new_from_impl(SparkDateTrunc::default())), + Arc::new(ScalarUDF::new_from_impl(SparkNextDay::default())), Arc::new(ScalarUDF::new_from_impl(SparkStringSpace::default())), Arc::new(ScalarUDF::new_from_impl(SparkSizeFunc::default())), ] diff --git a/native/spark-expr/src/datetime_funcs/mod.rs b/native/spark-expr/src/datetime_funcs/mod.rs index ef8041e5fe..1869585543 100644 --- a/native/spark-expr/src/datetime_funcs/mod.rs +++ b/native/spark-expr/src/datetime_funcs/mod.rs @@ -17,10 +17,12 @@ mod date_trunc; mod extract_date_part; +mod next_day; mod timestamp_trunc; pub use date_trunc::SparkDateTrunc; pub use extract_date_part::SparkHour; pub use extract_date_part::SparkMinute; pub use extract_date_part::SparkSecond; +pub use next_day::SparkNextDay; pub use timestamp_trunc::TimestampTruncExpr; diff --git a/native/spark-expr/src/datetime_funcs/next_day.rs b/native/spark-expr/src/datetime_funcs/next_day.rs new file mode 100644 index 0000000000..8462eee182 --- /dev/null +++ b/native/spark-expr/src/datetime_funcs/next_day.rs @@ -0,0 +1,217 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{Array, Date32Array, StringArray}; +use arrow::datatypes::DataType; +use chrono::{Datelike, NaiveDate, Weekday}; +use datafusion::common::{utils::take_function_args, DataFusionError, Result, ScalarValue}; +use datafusion::logical_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; +use std::any::Any; +use std::sync::Arc; + +/// Spark-compatible next_day function. +/// Returns the first date after the given start date that falls on the specified day of week. +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkNextDay { + signature: Signature, +} + +impl SparkNextDay { + pub fn new() -> Self { + Self { + signature: Signature::exact( + vec![DataType::Date32, DataType::Utf8], + Volatility::Immutable, + ), + } + } +} + +impl Default for SparkNextDay { + fn default() -> Self { + Self::new() + } +} + +/// Parse day of week string to chrono Weekday. +/// Supports full names (case-insensitive) and common abbreviations. +/// Returns None for invalid day names. +fn parse_day_of_week(day_str: &str) -> Option { + let day_lower = day_str.trim().to_lowercase(); + match day_lower.as_str() { + "sunday" | "sun" | "su" => Some(Weekday::Sun), + "monday" | "mon" | "mo" => Some(Weekday::Mon), + "tuesday" | "tue" | "tu" => Some(Weekday::Tue), + "wednesday" | "wed" | "we" => Some(Weekday::Wed), + "thursday" | "thu" | "th" => Some(Weekday::Thu), + "friday" | "fri" | "fr" => Some(Weekday::Fri), + "saturday" | "sat" | "sa" => Some(Weekday::Sat), + _ => None, + } +} + +/// Calculate the next date that falls on the target day of week. +/// The result is always after the start date (never the same day). +fn next_day_from_date(date: NaiveDate, target_day: Weekday) -> NaiveDate { + let current_day = date.weekday(); + let current_day_num = current_day.num_days_from_sunday(); // 0 = Sunday + let target_day_num = target_day.num_days_from_sunday(); + + // Calculate days to add (always at least 1) + let days_to_add = if target_day_num > current_day_num { + target_day_num - current_day_num + } else { + // target_day_num <= current_day_num, so we need to go to next week + 7 - current_day_num + target_day_num + }; + + date + chrono::Duration::days(days_to_add as i64) +} + +/// Convert Date32 (days since epoch) to NaiveDate +fn date32_to_naive_date(days: i32) -> Option { + NaiveDate::from_ymd_opt(1970, 1, 1) + .and_then(|epoch| epoch.checked_add_signed(chrono::Duration::days(days as i64))) +} + +/// Convert NaiveDate to Date32 (days since epoch) +fn naive_date_to_date32(date: NaiveDate) -> i32 { + let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); + (date - epoch).num_days() as i32 +} + +impl ScalarUDFImpl for SparkNextDay { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "next_day" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _: &[DataType]) -> Result { + Ok(DataType::Date32) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let [date_arg, day_arg] = take_function_args(self.name(), args.args)?; + + match (date_arg, day_arg) { + // Array date, scalar day of week + ( + ColumnarValue::Array(date_arr), + ColumnarValue::Scalar(ScalarValue::Utf8(Some(day_str))), + ) => { + let date_array = + date_arr + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Execution("next_day expects Date32 array".to_string()) + })?; + + let target_day = parse_day_of_week(&day_str); + + let result: Date32Array = date_array + .iter() + .map(|opt_date| { + match (opt_date, &target_day) { + (Some(days), Some(day)) => date32_to_naive_date(days) + .map(|date| naive_date_to_date32(next_day_from_date(date, *day))), + _ => None, // null date or invalid day name returns null + } + }) + .collect(); + + Ok(ColumnarValue::Array(Arc::new(result))) + } + // Array date, array day of week + (ColumnarValue::Array(date_arr), ColumnarValue::Array(day_arr)) => { + let date_array = + date_arr + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Execution("next_day expects Date32 array".to_string()) + })?; + let day_array = + day_arr + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Execution( + "next_day expects String array for day of week".to_string(), + ) + })?; + + let result: Date32Array = date_array + .iter() + .zip(day_array.iter()) + .map(|(opt_date, opt_day)| { + match (opt_date, opt_day) { + (Some(days), Some(day_str)) => { + let target_day = parse_day_of_week(day_str); + match target_day { + Some(day) => date32_to_naive_date(days).map(|date| { + naive_date_to_date32(next_day_from_date(date, day)) + }), + None => None, // Invalid day name returns null + } + } + _ => None, + } + }) + .collect(); + + Ok(ColumnarValue::Array(Arc::new(result))) + } + // Scalar date, scalar day of week + ( + ColumnarValue::Scalar(ScalarValue::Date32(opt_days)), + ColumnarValue::Scalar(ScalarValue::Utf8(opt_day_str)), + ) => { + let result = match (opt_days, opt_day_str) { + (Some(days), Some(day_str)) => { + let target_day = parse_day_of_week(&day_str); + match target_day { + Some(day) => date32_to_naive_date(days) + .map(|date| naive_date_to_date32(next_day_from_date(date, day))), + None => None, + } + } + _ => None, + }; + Ok(ColumnarValue::Scalar(ScalarValue::Date32(result))) + } + // Handle null day of week + (date_arg, ColumnarValue::Scalar(ScalarValue::Utf8(None))) => { + let arr = date_arg.into_array(1)?; + let null_result: Date32Array = (0..arr.len()).map(|_| None::).collect(); + Ok(ColumnarValue::Array(Arc::new(null_result))) + } + _ => Err(DataFusionError::Execution( + "next_day expects (Date32, Utf8) arguments".to_string(), + )), + } + } +} diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs index f26fd911d8..9315b9db36 100644 --- a/native/spark-expr/src/lib.rs +++ b/native/spark-expr/src/lib.rs @@ -69,7 +69,9 @@ pub use comet_scalar_funcs::{ create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, register_all_comet_functions, }; -pub use datetime_funcs::{SparkDateTrunc, SparkHour, SparkMinute, SparkSecond, TimestampTruncExpr}; +pub use datetime_funcs::{ + SparkDateTrunc, SparkHour, SparkMinute, SparkNextDay, SparkSecond, TimestampTruncExpr, +}; pub use error::{SparkError, SparkResult}; pub use hash_funcs::*; pub use json_funcs::{FromJson, ToJson}; diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index e50b1d80e6..d68028161a 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -189,6 +189,7 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[FromUnixTime] -> CometFromUnixTime, classOf[Hour] -> CometHour, classOf[Minute] -> CometMinute, + classOf[NextDay] -> CometNextDay, classOf[Second] -> CometSecond, classOf[TruncDate] -> CometTruncDate, classOf[TruncTimestamp] -> CometTruncTimestamp, diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index ef2b0f793c..5e137af93c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -21,7 +21,7 @@ package org.apache.comet.serde import java.util.Locale -import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, WeekDay, WeekOfYear, Year} +import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, Literal, Minute, Month, NextDay, Quarter, Second, TruncDate, TruncTimestamp, WeekDay, WeekOfYear, Year} import org.apache.spark.sql.types.{DateType, IntegerType} import org.apache.spark.unsafe.types.UTF8String @@ -258,6 +258,8 @@ object CometDateAdd extends CometScalarFunction[DateAdd]("date_add") object CometDateSub extends CometScalarFunction[DateSub]("date_sub") +object CometNextDay extends CometScalarFunction[NextDay]("next_day") + object CometTruncDate extends CometExpressionSerde[TruncDate] { val supportedFormats: Seq[String] = diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala index 9a23c76d82..78ce93e0ab 100644 --- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala @@ -122,4 +122,47 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH StructField("fmt", DataTypes.StringType, true))) FuzzDataGenerator.generateDataFrame(r, spark, schema, 1000, DataGenOptions()) } + + test("next_day") { + import org.apache.spark.sql.Row + + // Create test data with dates + val r = new Random(42) + val testData = (1 to 1000).map { _ => + // Generate dates in a reasonable range + val daysFromEpoch = r.nextInt(20000) - 5000 // ~1956 to ~2024 + // Convert days from epoch to java.sql.Date + val millis = daysFromEpoch.toLong * 24 * 60 * 60 * 1000 + Row(new java.sql.Date(millis)) + } + val schema = StructType(Seq(StructField("c0", DataTypes.DateType, false))) + val df = spark.createDataFrame(spark.sparkContext.parallelize(testData), schema) + df.createOrReplaceTempView("tbl") + + // Test with various day names + val dayNames = + Seq("Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday") + for (day <- dayNames) { + checkSparkAnswerAndOperator(s"SELECT c0, next_day(c0, '$day') FROM tbl ORDER BY c0") + } + + // Test with abbreviated day names + val abbreviations = Seq("Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat") + for (day <- abbreviations) { + checkSparkAnswerAndOperator(s"SELECT c0, next_day(c0, '$day') FROM tbl ORDER BY c0") + } + + // Disable constant folding to ensure literal expressions are executed by Comet + withSQLConf( + SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> + "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { + // Test with literal date + checkSparkAnswerAndOperator("SELECT next_day(DATE('2023-01-01'), 'Monday')") + checkSparkAnswerAndOperator("SELECT next_day(DATE('2023-01-01'), 'Sunday')") + + // Test null handling + checkSparkAnswerAndOperator("SELECT next_day(NULL, 'Monday')") + checkSparkAnswerAndOperator("SELECT next_day(DATE('2023-01-01'), NULL)") + } + } } From 990d16dc09bb395ae375a01b00af880ef05a1053 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 14 Jan 2026 17:39:45 -0700 Subject: [PATCH 2/4] docs --- docs/source/user-guide/latest/configs.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md index 1a273ad033..2ad596f2a3 100644 --- a/docs/source/user-guide/latest/configs.md +++ b/docs/source/user-guide/latest/configs.md @@ -286,6 +286,7 @@ These settings can be used to determine which parts of the plan are accelerated | `spark.comet.expression.Month.enabled` | Enable Comet acceleration for `Month` | true | | `spark.comet.expression.Multiply.enabled` | Enable Comet acceleration for `Multiply` | true | | `spark.comet.expression.Murmur3Hash.enabled` | Enable Comet acceleration for `Murmur3Hash` | true | +| `spark.comet.expression.NextDay.enabled` | Enable Comet acceleration for `NextDay` | true | | `spark.comet.expression.Not.enabled` | Enable Comet acceleration for `Not` | true | | `spark.comet.expression.OctetLength.enabled` | Enable Comet acceleration for `OctetLength` | true | | `spark.comet.expression.Or.enabled` | Enable Comet acceleration for `Or` | true | From 1c8e96e1c6f5c775ff66980c198578e0b7fd96d7 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 10 Feb 2026 11:42:17 -0700 Subject: [PATCH 3/4] test: migrate next_day tests to SQL file-based approach Co-Authored-By: Claude Opus 4.6 --- .../expressions/datetime/next_day.sql | 76 +++++++++++++++++++ .../comet/CometTemporalExpressionSuite.scala | 43 ----------- 2 files changed, 76 insertions(+), 43 deletions(-) create mode 100644 spark/src/test/resources/sql-tests/expressions/datetime/next_day.sql diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/next_day.sql b/spark/src/test/resources/sql-tests/expressions/datetime/next_day.sql new file mode 100644 index 0000000000..0effb36aa5 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/next_day.sql @@ -0,0 +1,76 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- ConfigMatrix: parquet.enable.dictionary=false,true + +statement +CREATE TABLE test_next_day(d date) USING parquet + +statement +INSERT INTO test_next_day VALUES (date('2023-01-01')), (date('2024-02-29')), (date('1969-12-31')), (date('2024-06-15')), (NULL) + +-- full day names +query +SELECT next_day(d, 'Sunday') FROM test_next_day + +query +SELECT next_day(d, 'Monday') FROM test_next_day + +query +SELECT next_day(d, 'Tuesday') FROM test_next_day + +query +SELECT next_day(d, 'Wednesday') FROM test_next_day + +query +SELECT next_day(d, 'Thursday') FROM test_next_day + +query +SELECT next_day(d, 'Friday') FROM test_next_day + +query +SELECT next_day(d, 'Saturday') FROM test_next_day + +-- abbreviated day names +query +SELECT next_day(d, 'Sun') FROM test_next_day + +query +SELECT next_day(d, 'Mon') FROM test_next_day + +query +SELECT next_day(d, 'Tue') FROM test_next_day + +query +SELECT next_day(d, 'Wed') FROM test_next_day + +query +SELECT next_day(d, 'Thu') FROM test_next_day + +query +SELECT next_day(d, 'Fri') FROM test_next_day + +query +SELECT next_day(d, 'Sat') FROM test_next_day + +-- literal arguments +query +SELECT next_day(date('2023-01-01'), 'Monday'), next_day(date('2023-01-01'), 'Sunday') + +-- null handling +query +SELECT next_day(NULL, 'Monday'), next_day(date('2023-01-01'), NULL) diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala index bcf5d89d2a..1ae6926e05 100644 --- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala @@ -182,49 +182,6 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH FuzzDataGenerator.generateDataFrame(r, spark, schema, 1000, DataGenOptions()) } - test("next_day") { - import org.apache.spark.sql.Row - - // Create test data with dates - val r = new Random(42) - val testData = (1 to 1000).map { _ => - // Generate dates in a reasonable range - val daysFromEpoch = r.nextInt(20000) - 5000 // ~1956 to ~2024 - // Convert days from epoch to java.sql.Date - val millis = daysFromEpoch.toLong * 24 * 60 * 60 * 1000 - Row(new java.sql.Date(millis)) - } - val schema = StructType(Seq(StructField("c0", DataTypes.DateType, false))) - val df = spark.createDataFrame(spark.sparkContext.parallelize(testData), schema) - df.createOrReplaceTempView("tbl") - - // Test with various day names - val dayNames = - Seq("Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday") - for (day <- dayNames) { - checkSparkAnswerAndOperator(s"SELECT c0, next_day(c0, '$day') FROM tbl ORDER BY c0") - } - - // Test with abbreviated day names - val abbreviations = Seq("Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat") - for (day <- abbreviations) { - checkSparkAnswerAndOperator(s"SELECT c0, next_day(c0, '$day') FROM tbl ORDER BY c0") - } - - // Disable constant folding to ensure literal expressions are executed by Comet - withSQLConf( - SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> - "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { - // Test with literal date - checkSparkAnswerAndOperator("SELECT next_day(DATE('2023-01-01'), 'Monday')") - checkSparkAnswerAndOperator("SELECT next_day(DATE('2023-01-01'), 'Sunday')") - - // Test null handling - checkSparkAnswerAndOperator("SELECT next_day(NULL, 'Monday')") - checkSparkAnswerAndOperator("SELECT next_day(DATE('2023-01-01'), NULL)") - } - } - test("last_day") { val r = new Random(42) val schema = StructType(Seq(StructField("c0", DataTypes.DateType, true))) From 703944e7cc949beb027362c19b3ebfdd060ad18d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 10 Feb 2026 12:10:07 -0700 Subject: [PATCH 4/4] refactor: use next_day implementation from datafusion-spark crate Replace our custom SparkNextDay with the upstream datafusion-spark version, which also handles LargeUtf8 and Utf8View string types. Co-Authored-By: Claude Opus 4.6 --- native/core/src/execution/jni_api.rs | 2 + native/spark-expr/src/comet_scalar_funcs.rs | 3 +- native/spark-expr/src/datetime_funcs/mod.rs | 2 - .../spark-expr/src/datetime_funcs/next_day.rs | 217 ------------------ native/spark-expr/src/lib.rs | 4 +- 5 files changed, 5 insertions(+), 223 deletions(-) delete mode 100644 native/spark-expr/src/datetime_funcs/next_day.rs diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 146e0feb8e..b1e48828f7 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -45,6 +45,7 @@ use datafusion_spark::function::bitwise::bitwise_not::SparkBitwiseNot; use datafusion_spark::function::datetime::date_add::SparkDateAdd; use datafusion_spark::function::datetime::date_sub::SparkDateSub; use datafusion_spark::function::datetime::last_day::SparkLastDay; +use datafusion_spark::function::datetime::next_day::SparkNextDay; use datafusion_spark::function::hash::sha1::SparkSha1; use datafusion_spark::function::hash::sha2::SparkSha2; use datafusion_spark::function::map::map_from_entries::MapFromEntries; @@ -349,6 +350,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkDateAdd::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkDateSub::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkLastDay::default())); + session_ctx.register_udf(ScalarUDF::new_from_impl(SparkNextDay::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkConcat::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default())); diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs index 359e4008ad..6647e01cc8 100644 --- a/native/spark-expr/src/comet_scalar_funcs.rs +++ b/native/spark-expr/src/comet_scalar_funcs.rs @@ -23,7 +23,7 @@ use crate::{ spark_array_repeat, spark_ceil, spark_decimal_div, spark_decimal_integral_div, spark_floor, spark_isnan, spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad, spark_unhex, spark_unscaled_value, EvalMode, SparkBitwiseCount, SparkContains, SparkDateDiff, - SparkDateTrunc, SparkNextDay, SparkSizeFunc, SparkStringSpace, + SparkDateTrunc, SparkSizeFunc, SparkStringSpace, }; use arrow::datatypes::DataType; use datafusion::common::{DataFusionError, Result as DataFusionResult}; @@ -195,7 +195,6 @@ fn all_scalar_functions() -> Vec> { Arc::new(ScalarUDF::new_from_impl(SparkContains::default())), Arc::new(ScalarUDF::new_from_impl(SparkDateDiff::default())), Arc::new(ScalarUDF::new_from_impl(SparkDateTrunc::default())), - Arc::new(ScalarUDF::new_from_impl(SparkNextDay::default())), Arc::new(ScalarUDF::new_from_impl(SparkStringSpace::default())), Arc::new(ScalarUDF::new_from_impl(SparkSizeFunc::default())), ] diff --git a/native/spark-expr/src/datetime_funcs/mod.rs b/native/spark-expr/src/datetime_funcs/mod.rs index f1910a82a8..1832711479 100644 --- a/native/spark-expr/src/datetime_funcs/mod.rs +++ b/native/spark-expr/src/datetime_funcs/mod.rs @@ -18,7 +18,6 @@ mod date_diff; mod date_trunc; mod extract_date_part; -mod next_day; mod timestamp_trunc; mod unix_timestamp; @@ -27,6 +26,5 @@ pub use date_trunc::SparkDateTrunc; pub use extract_date_part::SparkHour; pub use extract_date_part::SparkMinute; pub use extract_date_part::SparkSecond; -pub use next_day::SparkNextDay; pub use timestamp_trunc::TimestampTruncExpr; pub use unix_timestamp::SparkUnixTimestamp; diff --git a/native/spark-expr/src/datetime_funcs/next_day.rs b/native/spark-expr/src/datetime_funcs/next_day.rs deleted file mode 100644 index 8462eee182..0000000000 --- a/native/spark-expr/src/datetime_funcs/next_day.rs +++ /dev/null @@ -1,217 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use arrow::array::{Array, Date32Array, StringArray}; -use arrow::datatypes::DataType; -use chrono::{Datelike, NaiveDate, Weekday}; -use datafusion::common::{utils::take_function_args, DataFusionError, Result, ScalarValue}; -use datafusion::logical_expr::{ - ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, -}; -use std::any::Any; -use std::sync::Arc; - -/// Spark-compatible next_day function. -/// Returns the first date after the given start date that falls on the specified day of week. -#[derive(Debug, PartialEq, Eq, Hash)] -pub struct SparkNextDay { - signature: Signature, -} - -impl SparkNextDay { - pub fn new() -> Self { - Self { - signature: Signature::exact( - vec![DataType::Date32, DataType::Utf8], - Volatility::Immutable, - ), - } - } -} - -impl Default for SparkNextDay { - fn default() -> Self { - Self::new() - } -} - -/// Parse day of week string to chrono Weekday. -/// Supports full names (case-insensitive) and common abbreviations. -/// Returns None for invalid day names. -fn parse_day_of_week(day_str: &str) -> Option { - let day_lower = day_str.trim().to_lowercase(); - match day_lower.as_str() { - "sunday" | "sun" | "su" => Some(Weekday::Sun), - "monday" | "mon" | "mo" => Some(Weekday::Mon), - "tuesday" | "tue" | "tu" => Some(Weekday::Tue), - "wednesday" | "wed" | "we" => Some(Weekday::Wed), - "thursday" | "thu" | "th" => Some(Weekday::Thu), - "friday" | "fri" | "fr" => Some(Weekday::Fri), - "saturday" | "sat" | "sa" => Some(Weekday::Sat), - _ => None, - } -} - -/// Calculate the next date that falls on the target day of week. -/// The result is always after the start date (never the same day). -fn next_day_from_date(date: NaiveDate, target_day: Weekday) -> NaiveDate { - let current_day = date.weekday(); - let current_day_num = current_day.num_days_from_sunday(); // 0 = Sunday - let target_day_num = target_day.num_days_from_sunday(); - - // Calculate days to add (always at least 1) - let days_to_add = if target_day_num > current_day_num { - target_day_num - current_day_num - } else { - // target_day_num <= current_day_num, so we need to go to next week - 7 - current_day_num + target_day_num - }; - - date + chrono::Duration::days(days_to_add as i64) -} - -/// Convert Date32 (days since epoch) to NaiveDate -fn date32_to_naive_date(days: i32) -> Option { - NaiveDate::from_ymd_opt(1970, 1, 1) - .and_then(|epoch| epoch.checked_add_signed(chrono::Duration::days(days as i64))) -} - -/// Convert NaiveDate to Date32 (days since epoch) -fn naive_date_to_date32(date: NaiveDate) -> i32 { - let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); - (date - epoch).num_days() as i32 -} - -impl ScalarUDFImpl for SparkNextDay { - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &str { - "next_day" - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn return_type(&self, _: &[DataType]) -> Result { - Ok(DataType::Date32) - } - - fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - let [date_arg, day_arg] = take_function_args(self.name(), args.args)?; - - match (date_arg, day_arg) { - // Array date, scalar day of week - ( - ColumnarValue::Array(date_arr), - ColumnarValue::Scalar(ScalarValue::Utf8(Some(day_str))), - ) => { - let date_array = - date_arr - .as_any() - .downcast_ref::() - .ok_or_else(|| { - DataFusionError::Execution("next_day expects Date32 array".to_string()) - })?; - - let target_day = parse_day_of_week(&day_str); - - let result: Date32Array = date_array - .iter() - .map(|opt_date| { - match (opt_date, &target_day) { - (Some(days), Some(day)) => date32_to_naive_date(days) - .map(|date| naive_date_to_date32(next_day_from_date(date, *day))), - _ => None, // null date or invalid day name returns null - } - }) - .collect(); - - Ok(ColumnarValue::Array(Arc::new(result))) - } - // Array date, array day of week - (ColumnarValue::Array(date_arr), ColumnarValue::Array(day_arr)) => { - let date_array = - date_arr - .as_any() - .downcast_ref::() - .ok_or_else(|| { - DataFusionError::Execution("next_day expects Date32 array".to_string()) - })?; - let day_array = - day_arr - .as_any() - .downcast_ref::() - .ok_or_else(|| { - DataFusionError::Execution( - "next_day expects String array for day of week".to_string(), - ) - })?; - - let result: Date32Array = date_array - .iter() - .zip(day_array.iter()) - .map(|(opt_date, opt_day)| { - match (opt_date, opt_day) { - (Some(days), Some(day_str)) => { - let target_day = parse_day_of_week(day_str); - match target_day { - Some(day) => date32_to_naive_date(days).map(|date| { - naive_date_to_date32(next_day_from_date(date, day)) - }), - None => None, // Invalid day name returns null - } - } - _ => None, - } - }) - .collect(); - - Ok(ColumnarValue::Array(Arc::new(result))) - } - // Scalar date, scalar day of week - ( - ColumnarValue::Scalar(ScalarValue::Date32(opt_days)), - ColumnarValue::Scalar(ScalarValue::Utf8(opt_day_str)), - ) => { - let result = match (opt_days, opt_day_str) { - (Some(days), Some(day_str)) => { - let target_day = parse_day_of_week(&day_str); - match target_day { - Some(day) => date32_to_naive_date(days) - .map(|date| naive_date_to_date32(next_day_from_date(date, day))), - None => None, - } - } - _ => None, - }; - Ok(ColumnarValue::Scalar(ScalarValue::Date32(result))) - } - // Handle null day of week - (date_arg, ColumnarValue::Scalar(ScalarValue::Utf8(None))) => { - let arr = date_arg.into_array(1)?; - let null_result: Date32Array = (0..arr.len()).map(|_| None::).collect(); - Ok(ColumnarValue::Array(Arc::new(null_result))) - } - _ => Err(DataFusionError::Execution( - "next_day expects (Date32, Utf8) arguments".to_string(), - )), - } - } -} diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs index dcbdd7b77f..52cf2a2ac3 100644 --- a/native/spark-expr/src/lib.rs +++ b/native/spark-expr/src/lib.rs @@ -72,8 +72,8 @@ pub use comet_scalar_funcs::{ }; pub use csv_funcs::*; pub use datetime_funcs::{ - SparkDateDiff, SparkDateTrunc, SparkHour, SparkMinute, SparkNextDay, SparkSecond, - SparkUnixTimestamp, TimestampTruncExpr, + SparkDateDiff, SparkDateTrunc, SparkHour, SparkMinute, SparkSecond, SparkUnixTimestamp, + TimestampTruncExpr, }; pub use error::{SparkError, SparkResult}; pub use hash_funcs::*;