From 3361d48997a7d4a9115596c3343183cac7251685 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 2 Jan 2026 11:16:14 -0700 Subject: [PATCH 1/5] implement comprehensive microbenchmarks for aggregate expressions --- .../CometAggregateFunctionBenchmark.scala | 137 ++++++++++++++++++ 1 file changed, 137 insertions(+) create mode 100644 spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateFunctionBenchmark.scala diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateFunctionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateFunctionBenchmark.scala new file mode 100644 index 0000000000..0ba2e79614 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateFunctionBenchmark.scala @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class AggExprConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Comprehensive benchmark for Comet aggregate functions. To run this benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometAggregateFunctionBenchmark` + * Results will be written to "spark/benchmarks/CometAggregateFunctionBenchmark-**results.txt". 
+ */ +// spotless:on +object CometAggregateFunctionBenchmark extends CometBenchmarkBase { + + private val basicAggregates = List( + AggExprConfig("count", "SELECT COUNT(*) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("count_col", "SELECT COUNT(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig( + "count_distinct", + "SELECT COUNT(DISTINCT c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("min_int", "SELECT MIN(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("max_int", "SELECT MAX(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("min_double", "SELECT MIN(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("max_double", "SELECT MAX(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("sum_int", "SELECT SUM(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("sum_long", "SELECT SUM(c_long) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("sum_double", "SELECT SUM(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("avg_int", "SELECT AVG(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("avg_double", "SELECT AVG(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("first", "SELECT FIRST(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig( + "first_ignore_nulls", + "SELECT FIRST(c_int, true) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("last", "SELECT LAST(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig( + "last_ignore_nulls", + "SELECT LAST(c_int, true) FROM parquetV1Table GROUP BY grp")) + + private val statisticalAggregates = List( + AggExprConfig("var_samp", "SELECT VAR_SAMP(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("var_pop", "SELECT VAR_POP(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("stddev_samp", "SELECT STDDEV_SAMP(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("stddev_pop", "SELECT STDDEV_POP(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig( + "covar_samp", + "SELECT COVAR_SAMP(c_double, c_double2) FROM parquetV1Table GROUP BY grp"), + AggExprConfig( + "covar_pop", + "SELECT COVAR_POP(c_double, c_double2) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("corr", "SELECT CORR(c_double, c_double2) FROM parquetV1Table GROUP BY grp")) + + private val bitwiseAggregates = List( + AggExprConfig("bit_and", "SELECT BIT_AND(c_long) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("bit_or", "SELECT BIT_OR(c_long) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("bit_xor", "SELECT BIT_XOR(c_long) FROM parquetV1Table GROUP BY grp")) + + // Additional structural tests (multiple group keys, multiple aggregates) + private val multiKeyAggregates = List( + AggExprConfig("sum_multi_key", "SELECT SUM(c_int) FROM parquetV1Table GROUP BY grp, grp2"), + AggExprConfig("avg_multi_key", "SELECT AVG(c_double) FROM parquetV1Table GROUP BY grp, grp2")) + + private val multiAggregates = List( + AggExprConfig("sum_sum", "SELECT SUM(c_int), SUM(c_long) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("min_max", "SELECT MIN(c_int), MAX(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig( + "count_sum_avg", + "SELECT COUNT(*), SUM(c_int), AVG(c_double) FROM parquetV1Table GROUP BY grp")) + + // Decimal aggregates + private val decimalAggregates = List( + AggExprConfig("sum_decimal", "SELECT SUM(c_decimal) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("avg_decimal", "SELECT AVG(c_decimal) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("min_decimal", "SELECT MIN(c_decimal) FROM parquetV1Table GROUP BY 
grp"), + AggExprConfig("max_decimal", "SELECT MAX(c_decimal) FROM parquetV1Table GROUP BY grp")) + + // High cardinality tests + private val highCardinalityAggregates = List( + AggExprConfig( + "sum_high_card", + "SELECT SUM(c_int) FROM parquetV1Table GROUP BY high_card_grp"), + AggExprConfig( + "count_distinct_high_card", + "SELECT COUNT(DISTINCT c_int) FROM parquetV1Table GROUP BY high_card_grp")) + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = getBenchmarkRows(1024 * 1024 * 10) // 10M rows default for aggregates + + runBenchmarkWithTable("Aggregate function benchmarks", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT + CAST(value % 1000 AS INT) AS grp, + CAST(value % 100 AS INT) AS grp2, + CAST(value % 100000 AS INT) AS high_card_grp, + CASE WHEN value % 100 = 0 THEN NULL ELSE CAST((value % 10000) - 5000 AS INT) END AS c_int, + CASE WHEN value % 100 = 1 THEN NULL ELSE CAST(value * 1000 AS LONG) END AS c_long, + CASE WHEN value % 100 = 2 THEN NULL ELSE CAST((value % 10000) / 100.0 AS DOUBLE) END AS c_double, + CASE WHEN value % 100 = 3 THEN NULL ELSE CAST((value % 5000) / 50.0 AS DOUBLE) END AS c_double2, + CASE WHEN value % 100 = 4 THEN NULL ELSE CAST((value % 10000 - 5000) / 100.0 AS DECIMAL(18, 10)) END AS c_decimal + FROM $tbl + """)) + + val allAggregates = basicAggregates ++ statisticalAggregates ++ bitwiseAggregates ++ + multiKeyAggregates ++ multiAggregates ++ decimalAggregates ++ highCardinalityAggregates + + allAggregates.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} From b1cdc48184196709f17553a9db93e8007d2c382f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 2 Jan 2026 11:17:51 -0700 Subject: [PATCH 2/5] rename --- ...ionBenchmark.scala => CometAggregateExpressionBenchmark.scala} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename spark/src/test/scala/org/apache/spark/sql/benchmark/{CometAggregateFunctionBenchmark.scala => CometAggregateExpressionBenchmark.scala} (100%) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateFunctionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateExpressionBenchmark.scala similarity index 100% rename from spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateFunctionBenchmark.scala rename to spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateExpressionBenchmark.scala From 3a00d1a79cc159099cc64790739adc0d67c4e9a2 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 2 Jan 2026 11:19:16 -0700 Subject: [PATCH 3/5] delete old benchmark --- .../benchmark/CometAggregateBenchmark.scala | 298 ------------------ .../CometAggregateExpressionBenchmark.scala | 10 +- 2 files changed, 5 insertions(+), 303 deletions(-) delete mode 100644 spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala deleted file mode 100644 index d63b3e7106..0000000000 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala +++ /dev/null @@ -1,298 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.spark.sql.benchmark - -import scala.util.Try - -import org.apache.spark.benchmark.Benchmark -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.DecimalType - -import org.apache.comet.CometConf - -/** - * Benchmark to measure Comet execution performance. To run this benchmark: - * {{{ - * SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometAggregateBenchmark - * }}} - * - * Results will be written to "spark/benchmarks/CometAggregateBenchmark-**results.txt". - */ -object CometAggregateBenchmark extends CometBenchmarkBase { - override def getSparkSession: SparkSession = { - val session = super.getSparkSession - session.conf.set("parquet.enable.dictionary", "false") - session.conf.set("spark.sql.shuffle.partitions", "2") - session - } - - // Wrapper on SQL aggregation function - case class BenchAggregateFunction(name: String, distinct: Boolean = false) { - override def toString: String = if (distinct) s"$name(DISTINCT)" else name - } - - // Aggregation functions to test - private val benchmarkAggFuncs = Seq( - BenchAggregateFunction("SUM"), - BenchAggregateFunction("MIN"), - BenchAggregateFunction("MAX"), - BenchAggregateFunction("COUNT"), - BenchAggregateFunction("COUNT", distinct = true), - BenchAggregateFunction("AVG")) - - def aggFunctionSQL(aggregateFunction: BenchAggregateFunction, input: String): String = { - s"${aggregateFunction.name}(${if (aggregateFunction.distinct) s"DISTINCT $input" else input})" - } - - def singleGroupAndAggregate( - values: Int, - groupingKeyCardinality: Int, - aggregateFunction: BenchAggregateFunction, - isAnsiMode: Boolean): Unit = { - val benchmark = - new Benchmark( - s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCardinality), " + - s"single aggregate ${aggregateFunction.toString}, ansi mode enabled : ${isAnsiMode}", - values, - output = output) - - withTempPath { dir => - withTempTable("parquetV1Table") { - prepareTable( - dir, - spark.sql(s"SELECT value, floor(rand() * $groupingKeyCardinality) as key FROM $tbl")) - - val functionSQL = aggFunctionSQL(aggregateFunction, "value") - val query = s"SELECT key, $functionSQL FROM parquetV1Table GROUP BY key" - - benchmark.addCase( - s"SQL Parquet - Spark (${aggregateFunction.toString}) ansi mode enabled : ${isAnsiMode}") { - _ => - withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) { - Try { spark.sql(query).noop() } - } - } - - benchmark.addCase( - s"SQL Parquet - Comet (${aggregateFunction.toString}) ansi mode enabled : ${isAnsiMode}") { - _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) { - Try { spark.sql(query).noop() } - } - } - - benchmark.run() - } - } 
- } - - def singleGroupAndAggregateDecimal( - values: Int, - dataType: DecimalType, - groupingKeyCardinality: Int, - aggregateFunction: BenchAggregateFunction, - isAnsiMode: Boolean): Unit = { - val benchmark = - new Benchmark( - s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCardinality), " + - s"single aggregate ${aggregateFunction.toString} on decimal", - values, - output = output) - - val df = makeDecimalDataFrame(values, dataType, false); - - withTempPath { dir => - withTempTable("parquetV1Table") { - df.createOrReplaceTempView(tbl) - prepareTable( - dir, - spark.sql( - s"SELECT dec as value, floor(rand() * $groupingKeyCardinality) as key FROM $tbl")) - - val functionSQL = aggFunctionSQL(aggregateFunction, "value") - val query = s"SELECT key, $functionSQL FROM parquetV1Table GROUP BY key" - - withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) { - benchmark.addCase( - s"SQL Parquet - Spark (${aggregateFunction.toString}) ansi mode enabled : ${isAnsiMode}") { - _ => - Try { spark.sql(query).noop() } - } - } - - benchmark.addCase( - s"SQL Parquet - Comet (${aggregateFunction.toString}) ansi mode enabled : ${isAnsiMode}") { - _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) { - Try { spark.sql(query).noop() } - } - } - - benchmark.run() - } - } - } - - def multiGroupKeys( - values: Int, - groupingKeyCard: Int, - aggregateFunction: BenchAggregateFunction, - isAnsiMode: Boolean): Unit = { - val benchmark = - new Benchmark( - s"Grouped HashAgg Exec: multiple group keys (cardinality $groupingKeyCard), " + - s"single aggregate ${aggregateFunction.toString}", - values, - output = output) - - withTempPath { dir => - withTempTable("parquetV1Table") { - prepareTable( - dir, - spark.sql( - s"SELECT value, floor(rand() * $groupingKeyCard) as key1, " + - s"floor(rand() * $groupingKeyCard) as key2 FROM $tbl")) - - val functionSQL = aggFunctionSQL(aggregateFunction, "value") - val query = - s"SELECT key1, key2, $functionSQL FROM parquetV1Table GROUP BY key1, key2" - - benchmark.addCase( - s"SQL Parquet - Spark (${aggregateFunction.toString}) isANSIMode: ${isAnsiMode.toString}") { - _ => - withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) { - Try { spark.sql(query).noop() } - } - } - - benchmark.addCase( - s"SQL Parquet - Comet (${aggregateFunction.toString}) isANSIMode: ${isAnsiMode.toString}") { - _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key -> "1G", - SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) { - Try { spark.sql(query).noop() } - } - } - - benchmark.run() - } - } - } - - def multiAggregates( - values: Int, - groupingKeyCard: Int, - aggregateFunction: BenchAggregateFunction, - isAnsiMode: Boolean): Unit = { - val benchmark = - new Benchmark( - s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCard), " + - s"multiple aggregates ${aggregateFunction.toString} isANSIMode: ${isAnsiMode.toString}", - values, - output = output) - - withTempPath { dir => - withTempTable("parquetV1Table") { - prepareTable( - dir, - spark.sql( - s"SELECT value as value1, value as value2, floor(rand() * $groupingKeyCard) as key " + - s"FROM $tbl")) - - val functionSQL1 = aggFunctionSQL(aggregateFunction, "value1") - val functionSQL2 = aggFunctionSQL(aggregateFunction, "value2") - - val query = s"SELECT key, $functionSQL1, $functionSQL2 " + - "FROM 
parquetV1Table GROUP BY key" - - benchmark.addCase( - s"SQL Parquet - Spark (${aggregateFunction.toString}) isANSIMode: ${isAnsiMode.toString}") { - _ => - withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) { - Try { spark.sql(query).noop() } - } - } - - benchmark.addCase( - s"SQL Parquet - Comet (${aggregateFunction.toString}) isANSIMode: ${isAnsiMode.toString}") { - _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) { - Try { spark.sql(query).noop() } - } - } - - benchmark.run() - } - } - } - - override def runCometBenchmark(mainArgs: Array[String]): Unit = { - val total = 1024 * 1024 * 10 - val combinations = List(100, 1024, 1024 * 1024) // number of distinct groups - benchmarkAggFuncs.foreach { aggFunc => - Seq(true, false).foreach(k => { - runBenchmarkWithTable( - s"Grouped Aggregate (single group key + single aggregate $aggFunc)", - total) { v => - for (card <- combinations) { - singleGroupAndAggregate(v, card, aggFunc, k) - } - } - - runBenchmarkWithTable( - s"Grouped Aggregate (multiple group keys + single aggregate $aggFunc)", - total) { v => - for (card <- combinations) { - multiGroupKeys(v, card, aggFunc, k) - } - } - - runBenchmarkWithTable( - s"Grouped Aggregate (single group key + multiple aggregates $aggFunc)", - total) { v => - for (card <- combinations) { - multiAggregates(v, card, aggFunc, k) - } - } - - runBenchmarkWithTable( - s"Grouped Aggregate (single group key + single aggregate $aggFunc on decimal)", - total) { v => - for (card <- combinations) { - singleGroupAndAggregateDecimal(v, DecimalType(18, 10), card, aggFunc, k) - } - } - }) - } - } -} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateExpressionBenchmark.scala index 0ba2e79614..4ea06109c4 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateExpressionBenchmark.scala @@ -24,14 +24,14 @@ case class AggExprConfig( query: String, extraCometConfigs: Map[String, String] = Map.empty) -// spotless:off /** * Comprehensive benchmark for Comet aggregate functions. To run this benchmark: - * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometAggregateFunctionBenchmark` + * {{{ + * SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometAggregateExpressionBenchmark + * }}} * Results will be written to "spark/benchmarks/CometAggregateFunctionBenchmark-**results.txt". 
*/ -// spotless:on -object CometAggregateFunctionBenchmark extends CometBenchmarkBase { +object CometAggregateExpressionBenchmark extends CometBenchmarkBase { private val basicAggregates = List( AggExprConfig("count", "SELECT COUNT(*) FROM parquetV1Table GROUP BY grp"), @@ -104,7 +104,7 @@ object CometAggregateFunctionBenchmark extends CometBenchmarkBase { "SELECT COUNT(DISTINCT c_int) FROM parquetV1Table GROUP BY high_card_grp")) override def runCometBenchmark(mainArgs: Array[String]): Unit = { - val values = getBenchmarkRows(1024 * 1024 * 10) // 10M rows default for aggregates + val values = 1024 * 1024 runBenchmarkWithTable("Aggregate function benchmarks", values) { v => withTempPath { dir => From 9af4184357a436b0a9ab00900d6267dd1d6de1d8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 2 Jan 2026 12:20:51 -0700 Subject: [PATCH 4/5] skip some CI workflows for benchmark changes --- .github/workflows/pr_benchmark_check.yml | 85 ++++++++++++++++++++++++ .github/workflows/pr_build_linux.yml | 6 ++ .github/workflows/pr_build_macos.yml | 6 ++ .github/workflows/spark_sql_test.yml | 6 ++ 4 files changed, 103 insertions(+) create mode 100644 .github/workflows/pr_benchmark_check.yml diff --git a/.github/workflows/pr_benchmark_check.yml b/.github/workflows/pr_benchmark_check.yml new file mode 100644 index 0000000000..b7475b9076 --- /dev/null +++ b/.github/workflows/pr_benchmark_check.yml @@ -0,0 +1,85 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Lightweight CI for benchmark-only changes - verifies compilation and linting +# without running full test suites + +name: PR Benchmark Check + +concurrency: + group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }} + cancel-in-progress: true + +on: + push: + paths: + - "native/core/benches/**" + - "native/spark-expr/benches/**" + - "spark/src/test/scala/org/apache/spark/sql/benchmark/**" + pull_request: + paths: + - "native/core/benches/**" + - "native/spark-expr/benches/**" + - "spark/src/test/scala/org/apache/spark/sql/benchmark/**" + workflow_dispatch: + +env: + RUST_VERSION: stable + +jobs: + benchmark-check: + name: Benchmark Compile & Lint Check + runs-on: ubuntu-latest + container: + image: amd64/rust + steps: + - uses: actions/checkout@v6 + + - name: Setup Rust & Java toolchain + uses: ./.github/actions/setup-builder + with: + rust-version: ${{ env.RUST_VERSION }} + jdk-version: 17 + + - name: Check Cargo fmt + run: | + cd native + cargo fmt --all -- --check --color=never + + - name: Check Cargo clippy + run: | + cd native + cargo clippy --color=never --all-targets --workspace -- -D warnings + + - name: Check benchmark compilation + run: | + cd native + cargo check --benches + + - name: Cache Maven dependencies + uses: actions/cache@v4 + with: + path: | + ~/.m2/repository + /root/.m2/repository + key: ${{ runner.os }}-benchmark-maven-${{ hashFiles('**/pom.xml') }} + restore-keys: | + ${{ runner.os }}-benchmark-maven- + + - name: Check Scala compilation and linting + run: | + ./mvnw -B compile test-compile scalafix:scalafix -Dscalafix.mode=CHECK -Psemanticdb -DskipTests diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index e3b0e40566..beb5f9dcf7 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -27,11 +27,17 @@ on: - "doc/**" - "docs/**" - "**.md" + - "native/core/benches/**" + - "native/spark-expr/benches/**" + - "spark/src/test/scala/org/apache/spark/sql/benchmark/**" pull_request: paths-ignore: - "doc/**" - "docs/**" - "**.md" + - "native/core/benches/**" + - "native/spark-expr/benches/**" + - "spark/src/test/scala/org/apache/spark/sql/benchmark/**" # manual trigger # https://docs.github.com/en/actions/managing-workflow-runs/manually-running-a-workflow workflow_dispatch: diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index 0ad40c1932..9a45fe022d 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -27,11 +27,17 @@ on: - "doc/**" - "docs/**" - "**.md" + - "native/core/benches/**" + - "native/spark-expr/benches/**" + - "spark/src/test/scala/org/apache/spark/sql/benchmark/**" pull_request: paths-ignore: - "doc/**" - "docs/**" - "**.md" + - "native/core/benches/**" + - "native/spark-expr/benches/**" + - "spark/src/test/scala/org/apache/spark/sql/benchmark/**" # manual trigger # https://docs.github.com/en/actions/managing-workflow-runs/manually-running-a-workflow workflow_dispatch: diff --git a/.github/workflows/spark_sql_test.yml b/.github/workflows/spark_sql_test.yml index d143ef83a0..1ff6fa952c 100644 --- a/.github/workflows/spark_sql_test.yml +++ b/.github/workflows/spark_sql_test.yml @@ -27,11 +27,17 @@ on: - "doc/**" - "docs/**" - "**.md" + - "native/core/benches/**" + - "native/spark-expr/benches/**" + - "spark/src/test/scala/org/apache/spark/sql/benchmark/**" pull_request: paths-ignore: - "doc/**" - "docs/**" - "**.md" + - "native/core/benches/**" + - 
"native/spark-expr/benches/**" + - "spark/src/test/scala/org/apache/spark/sql/benchmark/**" # manual trigger # https://docs.github.com/en/actions/managing-workflow-runs/manually-running-a-workflow workflow_dispatch: From 6869f79709dff276c22154a4ba0201700db5ba19 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 2 Jan 2026 15:14:21 -0700 Subject: [PATCH 5/5] skip failing suite --- .github/workflows/spark_sql_test.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/spark_sql_test.yml b/.github/workflows/spark_sql_test.yml index 1ff6fa952c..2fe5fefe1a 100644 --- a/.github/workflows/spark_sql_test.yml +++ b/.github/workflows/spark_sql_test.yml @@ -65,6 +65,10 @@ jobs: - {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"} - {name: "sql_hive-2", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.ExtendedHiveTest"} - {name: "sql_hive-3", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.SlowHiveTest"} + # Skip sql_hive-1 for Spark 4.0 due to https://github.com/apache/datafusion-comet/issues/2946 + exclude: + - spark-version: {short: '4.0', full: '4.0.1', java: 17} + module: {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"} fail-fast: false name: spark-sql-${{ matrix.module.name }}/${{ matrix.os }}/spark-${{ matrix.spark-version.full }}/java-${{ matrix.spark-version.java }} runs-on: ${{ matrix.os }}