From cba7ee384c5ee7a1a6fdc06ac47a007af710211f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 2 Jan 2026 11:45:18 -0700 Subject: [PATCH 1/3] format --- .../CometConditionalExpressionBenchmark.scala | 173 +++++++++++++++--- 1 file changed, 151 insertions(+), 22 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala index c5eb9ea390..5b3532d5f2 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala @@ -20,47 +20,176 @@ package org.apache.spark.sql.benchmark /** - * Benchmark to measure Comet execution performance. To run this benchmark: - * `SPARK_GENERATE_BENCHMARK_FILES=1 make - * benchmark-org.apache.spark.sql.benchmark.CometConditionalExpressionBenchmark` Results will be - * written to "spark/benchmarks/CometConditionalExpressionBenchmark-**results.txt". + * Benchmark to measure Comet execution performance for conditional expressions. To run this + * benchmark: + * {{{ + * SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometConditionalExpressionBenchmark + * }}} + * Results will be written to + * "spark/benchmarks/CometConditionalExpressionBenchmark-**results.txt". */ object CometConditionalExpressionBenchmark extends CometBenchmarkBase { - def caseWhenExprBenchmark(values: Int): Unit = { + private def prepareTestTable(values: Int)(f: => Unit): Unit = { withTempPath { dir => withTempTable("parquetV1Table") { - prepareTable(dir, spark.sql(s"SELECT value AS c1 FROM $tbl")) + // Create table with multiple columns for richer test scenarios: + // - c1: random long values (full range) + // - c2: values 0-99 for multi-branch testing + // - c3: secondary column for non-literal result expressions + // - c4: string column for string result expressions + prepareTable( + dir, + spark.sql(s""" + SELECT + value AS c1, + CAST(ABS(value % 100) AS INT) AS c2, + CAST(value * 2 AS LONG) AS c3, + CAST(value AS STRING) AS c4 + FROM $tbl + """)) + f + } + } + } - val query = - "select CASE WHEN c1 < 0 THEN '<0' WHEN c1 = 0 THEN '=0' ELSE '>0' END from parquetV1Table" + def caseWhenLiteralBenchmark(values: Int): Unit = { + prepareTestTable(values) { + val query = + "SELECT CASE WHEN c1 < 0 THEN '<0' WHEN c1 = 0 THEN '=0' ELSE '>0' END FROM parquetV1Table" + runExpressionBenchmark("Case When Literal (3 branches)", values, query) + } + } - runExpressionBenchmark("Case When Expr", values, query) - } + def caseWhenManyBranchesLiteralBenchmark(values: Int): Unit = { + prepareTestTable(values) { + // 10 branches using c2 (values 0-99) + val query = """ + SELECT CASE + WHEN c2 < 10 THEN 'a' + WHEN c2 < 20 THEN 'b' + WHEN c2 < 30 THEN 'c' + WHEN c2 < 40 THEN 'd' + WHEN c2 < 50 THEN 'e' + WHEN c2 < 60 THEN 'f' + WHEN c2 < 70 THEN 'g' + WHEN c2 < 80 THEN 'h' + WHEN c2 < 90 THEN 'i' + ELSE 'j' + END FROM parquetV1Table + """ + runExpressionBenchmark("Case When Literal (10 branches)", values, query) } } - def ifExprBenchmark(values: Int): Unit = { - withTempPath { dir => - withTempTable("parquetV1Table") { - prepareTable(dir, spark.sql(s"SELECT value AS c1 FROM $tbl")) + def caseWhenColumnResultBenchmark(values: Int): Unit = { + prepareTestTable(values) { + // Result expressions are column references, not literals + val query = + "SELECT CASE WHEN c1 < 0 THEN c3 WHEN c1 = 0 THEN c1 ELSE c3 + c1 END FROM parquetV1Table" + runExpressionBenchmark("Case When Column Result (3 branches)", values, query) + } + } - val query = "select IF (c1 < 0, '<0', '>=0') from parquetV1Table" + def caseWhenManyBranchesColumnResultBenchmark(values: Int): Unit = { + prepareTestTable(values) { + // 10 branches with column expressions as results + val query = """ + SELECT CASE + WHEN c2 < 10 THEN c1 + WHEN c2 < 20 THEN c3 + WHEN c2 < 30 THEN c1 + c3 + WHEN c2 < 40 THEN c1 - c2 + WHEN c2 < 50 THEN c3 * 2 + WHEN c2 < 60 THEN c1 / 2 + WHEN c2 < 70 THEN c2 + c3 + WHEN c2 < 80 THEN c1 * c2 + WHEN c2 < 90 THEN c3 - c1 + ELSE c1 + c2 + c3 + END FROM parquetV1Table + """ + runExpressionBenchmark("Case When Column Result (10 branches)", values, query) + } + } - runExpressionBenchmark("If Expr", values, query) - } + def ifLiteralBenchmark(values: Int): Unit = { + prepareTestTable(values) { + val query = "SELECT IF(c1 < 0, '<0', '>=0') FROM parquetV1Table" + runExpressionBenchmark("If Literal", values, query) + } + } + + def ifColumnResultBenchmark(values: Int): Unit = { + prepareTestTable(values) { + // Result expressions are column references + val query = "SELECT IF(c1 < 0, c3, c1 + c3) FROM parquetV1Table" + runExpressionBenchmark("If Column Result", values, query) + } + } + + def nestedIfBenchmark(values: Int): Unit = { + prepareTestTable(values) { + // Nested IF expressions (equivalent to CASE WHEN with multiple branches) + val query = """ + SELECT IF(c2 < 25, 'a', + IF(c2 < 50, 'b', + IF(c2 < 75, 'c', 'd'))) + FROM parquetV1Table + """ + runExpressionBenchmark("Nested If Literal (4 outcomes)", values, query) + } + } + + def nestedIfColumnResultBenchmark(values: Int): Unit = { + prepareTestTable(values) { + val query = """ + SELECT IF(c2 < 25, c1, + IF(c2 < 50, c3, + IF(c2 < 75, c1 + c3, c3 * 2))) + FROM parquetV1Table + """ + runExpressionBenchmark("Nested If Column Result (4 outcomes)", values, query) } } override def runCometBenchmark(mainArgs: Array[String]): Unit = { - val values = 1024 * 1024; + val values = 1024 * 1024 + + // CASE WHEN with literal results + runBenchmarkWithTable("caseWhenLiteral", values) { v => + caseWhenLiteralBenchmark(v) + } + + runBenchmarkWithTable("caseWhenManyBranchesLiteral", values) { v => + caseWhenManyBranchesLiteralBenchmark(v) + } + + // CASE WHEN with column/expression results + runBenchmarkWithTable("caseWhenColumnResult", values) { v => + caseWhenColumnResultBenchmark(v) + } + + runBenchmarkWithTable("caseWhenManyBranchesColumnResult", values) { v => + caseWhenManyBranchesColumnResultBenchmark(v) + } + + // IF with literal results + runBenchmarkWithTable("ifLiteral", values) { v => + ifLiteralBenchmark(v) + } + + // IF with column/expression results + runBenchmarkWithTable("ifColumnResult", values) { v => + ifColumnResultBenchmark(v) + } - runBenchmarkWithTable("caseWhenExpr", values) { v => - caseWhenExprBenchmark(v) + // Nested IF expressions + runBenchmarkWithTable("nestedIfLiteral", values) { v => + nestedIfBenchmark(v) } - runBenchmarkWithTable("ifExpr", values) { v => - ifExprBenchmark(v) + runBenchmarkWithTable("nestedIfColumnResult", values) { v => + nestedIfColumnResultBenchmark(v) } } } From 9af4184357a436b0a9ab00900d6267dd1d6de1d8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 2 Jan 2026 12:20:51 -0700 Subject: [PATCH 2/3] skip some CI workflows for benchmark changes --- .github/workflows/pr_benchmark_check.yml | 85 ++++++++++++++++++++++++ .github/workflows/pr_build_linux.yml | 6 ++ .github/workflows/pr_build_macos.yml | 6 ++ .github/workflows/spark_sql_test.yml | 6 ++ 4 files changed, 103 insertions(+) create mode 100644 .github/workflows/pr_benchmark_check.yml diff --git a/.github/workflows/pr_benchmark_check.yml b/.github/workflows/pr_benchmark_check.yml new file mode 100644 index 0000000000..b7475b9076 --- /dev/null +++ b/.github/workflows/pr_benchmark_check.yml @@ -0,0 +1,85 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Lightweight CI for benchmark-only changes - verifies compilation and linting +# without running full test suites + +name: PR Benchmark Check + +concurrency: + group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }} + cancel-in-progress: true + +on: + push: + paths: + - "native/core/benches/**" + - "native/spark-expr/benches/**" + - "spark/src/test/scala/org/apache/spark/sql/benchmark/**" + pull_request: + paths: + - "native/core/benches/**" + - "native/spark-expr/benches/**" + - "spark/src/test/scala/org/apache/spark/sql/benchmark/**" + workflow_dispatch: + +env: + RUST_VERSION: stable + +jobs: + benchmark-check: + name: Benchmark Compile & Lint Check + runs-on: ubuntu-latest + container: + image: amd64/rust + steps: + - uses: actions/checkout@v6 + + - name: Setup Rust & Java toolchain + uses: ./.github/actions/setup-builder + with: + rust-version: ${{ env.RUST_VERSION }} + jdk-version: 17 + + - name: Check Cargo fmt + run: | + cd native + cargo fmt --all -- --check --color=never + + - name: Check Cargo clippy + run: | + cd native + cargo clippy --color=never --all-targets --workspace -- -D warnings + + - name: Check benchmark compilation + run: | + cd native + cargo check --benches + + - name: Cache Maven dependencies + uses: actions/cache@v4 + with: + path: | + ~/.m2/repository + /root/.m2/repository + key: ${{ runner.os }}-benchmark-maven-${{ hashFiles('**/pom.xml') }} + restore-keys: | + ${{ runner.os }}-benchmark-maven- + + - name: Check Scala compilation and linting + run: | + ./mvnw -B compile test-compile scalafix:scalafix -Dscalafix.mode=CHECK -Psemanticdb -DskipTests diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index e3b0e40566..beb5f9dcf7 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -27,11 +27,17 @@ on: - "doc/**" - "docs/**" - "**.md" + - "native/core/benches/**" + - "native/spark-expr/benches/**" + - "spark/src/test/scala/org/apache/spark/sql/benchmark/**" pull_request: paths-ignore: - "doc/**" - "docs/**" - "**.md" + - "native/core/benches/**" + - "native/spark-expr/benches/**" + - "spark/src/test/scala/org/apache/spark/sql/benchmark/**" # manual trigger # https://docs.github.com/en/actions/managing-workflow-runs/manually-running-a-workflow workflow_dispatch: diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index 0ad40c1932..9a45fe022d 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -27,11 +27,17 @@ on: - "doc/**" - "docs/**" - "**.md" + - "native/core/benches/**" + - "native/spark-expr/benches/**" + - "spark/src/test/scala/org/apache/spark/sql/benchmark/**" pull_request: paths-ignore: - "doc/**" - "docs/**" - "**.md" + - "native/core/benches/**" + - "native/spark-expr/benches/**" + - "spark/src/test/scala/org/apache/spark/sql/benchmark/**" # manual trigger # https://docs.github.com/en/actions/managing-workflow-runs/manually-running-a-workflow workflow_dispatch: diff --git a/.github/workflows/spark_sql_test.yml b/.github/workflows/spark_sql_test.yml index d143ef83a0..1ff6fa952c 100644 --- a/.github/workflows/spark_sql_test.yml +++ b/.github/workflows/spark_sql_test.yml @@ -27,11 +27,17 @@ on: - "doc/**" - "docs/**" - "**.md" + - "native/core/benches/**" + - "native/spark-expr/benches/**" + - "spark/src/test/scala/org/apache/spark/sql/benchmark/**" pull_request: paths-ignore: - "doc/**" - "docs/**" - "**.md" + - "native/core/benches/**" + - "native/spark-expr/benches/**" + - "spark/src/test/scala/org/apache/spark/sql/benchmark/**" # manual trigger # https://docs.github.com/en/actions/managing-workflow-runs/manually-running-a-workflow workflow_dispatch: From 6869f79709dff276c22154a4ba0201700db5ba19 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 2 Jan 2026 15:14:21 -0700 Subject: [PATCH 3/3] skip failing suite --- .github/workflows/spark_sql_test.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/spark_sql_test.yml b/.github/workflows/spark_sql_test.yml index 1ff6fa952c..2fe5fefe1a 100644 --- a/.github/workflows/spark_sql_test.yml +++ b/.github/workflows/spark_sql_test.yml @@ -65,6 +65,10 @@ jobs: - {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"} - {name: "sql_hive-2", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.ExtendedHiveTest"} - {name: "sql_hive-3", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.SlowHiveTest"} + # Skip sql_hive-1 for Spark 4.0 due to https://github.com/apache/datafusion-comet/issues/2946 + exclude: + - spark-version: {short: '4.0', full: '4.0.1', java: 17} + module: {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"} fail-fast: false name: spark-sql-${{ matrix.module.name }}/${{ matrix.os }}/spark-${{ matrix.spark-version.full }}/java-${{ matrix.spark-version.java }} runs-on: ${{ matrix.os }}