diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala deleted file mode 100644 index d63b3e7106..0000000000 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala +++ /dev/null @@ -1,298 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.spark.sql.benchmark - -import scala.util.Try - -import org.apache.spark.benchmark.Benchmark -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.DecimalType - -import org.apache.comet.CometConf - -/** - * Benchmark to measure Comet execution performance. To run this benchmark: - * {{{ - * SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometAggregateBenchmark - * }}} - * - * Results will be written to "spark/benchmarks/CometAggregateBenchmark-**results.txt". - */ -object CometAggregateBenchmark extends CometBenchmarkBase { - override def getSparkSession: SparkSession = { - val session = super.getSparkSession - session.conf.set("parquet.enable.dictionary", "false") - session.conf.set("spark.sql.shuffle.partitions", "2") - session - } - - // Wrapper on SQL aggregation function - case class BenchAggregateFunction(name: String, distinct: Boolean = false) { - override def toString: String = if (distinct) s"$name(DISTINCT)" else name - } - - // Aggregation functions to test - private val benchmarkAggFuncs = Seq( - BenchAggregateFunction("SUM"), - BenchAggregateFunction("MIN"), - BenchAggregateFunction("MAX"), - BenchAggregateFunction("COUNT"), - BenchAggregateFunction("COUNT", distinct = true), - BenchAggregateFunction("AVG")) - - def aggFunctionSQL(aggregateFunction: BenchAggregateFunction, input: String): String = { - s"${aggregateFunction.name}(${if (aggregateFunction.distinct) s"DISTINCT $input" else input})" - } - - def singleGroupAndAggregate( - values: Int, - groupingKeyCardinality: Int, - aggregateFunction: BenchAggregateFunction, - isAnsiMode: Boolean): Unit = { - val benchmark = - new Benchmark( - s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCardinality), " + - s"single aggregate ${aggregateFunction.toString}, ansi mode enabled : ${isAnsiMode}", - values, - output = output) - - withTempPath { dir => - withTempTable("parquetV1Table") { - prepareTable( - dir, - spark.sql(s"SELECT value, floor(rand() * $groupingKeyCardinality) as key FROM $tbl")) - - val functionSQL = aggFunctionSQL(aggregateFunction, "value") - val query = s"SELECT key, $functionSQL FROM parquetV1Table GROUP BY key" - - benchmark.addCase( - s"SQL Parquet - Spark (${aggregateFunction.toString}) ansi mode enabled : ${isAnsiMode}") { - _ => - withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) { - Try { spark.sql(query).noop() } - } - } - - benchmark.addCase( - s"SQL Parquet - Comet (${aggregateFunction.toString}) ansi mode enabled : ${isAnsiMode}") { - _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) { - Try { spark.sql(query).noop() } - } - } - - benchmark.run() - } - } - } - - def singleGroupAndAggregateDecimal( - values: Int, - dataType: DecimalType, - groupingKeyCardinality: Int, - aggregateFunction: BenchAggregateFunction, - isAnsiMode: Boolean): Unit = { - val benchmark = - new Benchmark( - s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCardinality), " + - s"single aggregate ${aggregateFunction.toString} on decimal", - values, - output = output) - - val df = makeDecimalDataFrame(values, dataType, false); - - withTempPath { dir => - withTempTable("parquetV1Table") { - df.createOrReplaceTempView(tbl) - prepareTable( - dir, - spark.sql( - s"SELECT dec as value, floor(rand() * $groupingKeyCardinality) as key FROM $tbl")) - - val functionSQL = aggFunctionSQL(aggregateFunction, "value") - val query = s"SELECT key, $functionSQL FROM parquetV1Table GROUP BY key" - - withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) { - benchmark.addCase( - s"SQL Parquet - Spark (${aggregateFunction.toString}) ansi mode enabled : ${isAnsiMode}") { - _ => - Try { spark.sql(query).noop() } - } - } - - benchmark.addCase( - s"SQL Parquet - Comet (${aggregateFunction.toString}) ansi mode enabled : ${isAnsiMode}") { - _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) { - Try { spark.sql(query).noop() } - } - } - - benchmark.run() - } - } - } - - def multiGroupKeys( - values: Int, - groupingKeyCard: Int, - aggregateFunction: BenchAggregateFunction, - isAnsiMode: Boolean): Unit = { - val benchmark = - new Benchmark( - s"Grouped HashAgg Exec: multiple group keys (cardinality $groupingKeyCard), " + - s"single aggregate ${aggregateFunction.toString}", - values, - output = output) - - withTempPath { dir => - withTempTable("parquetV1Table") { - prepareTable( - dir, - spark.sql( - s"SELECT value, floor(rand() * $groupingKeyCard) as key1, " + - s"floor(rand() * $groupingKeyCard) as key2 FROM $tbl")) - - val functionSQL = aggFunctionSQL(aggregateFunction, "value") - val query = - s"SELECT key1, key2, $functionSQL FROM parquetV1Table GROUP BY key1, key2" - - benchmark.addCase( - s"SQL Parquet - Spark (${aggregateFunction.toString}) isANSIMode: ${isAnsiMode.toString}") { - _ => - withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) { - Try { spark.sql(query).noop() } - } - } - - benchmark.addCase( - s"SQL Parquet - Comet (${aggregateFunction.toString}) isANSIMode: ${isAnsiMode.toString}") { - _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key -> "1G", - SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) { - Try { spark.sql(query).noop() } - } - } - - benchmark.run() - } - } - } - - def multiAggregates( - values: Int, - groupingKeyCard: Int, - aggregateFunction: BenchAggregateFunction, - isAnsiMode: Boolean): Unit = { - val benchmark = - new Benchmark( - s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCard), " + - s"multiple aggregates ${aggregateFunction.toString} isANSIMode: ${isAnsiMode.toString}", - values, - output = output) - - withTempPath { dir => - withTempTable("parquetV1Table") { - prepareTable( - dir, - spark.sql( - s"SELECT value as value1, value as value2, floor(rand() * $groupingKeyCard) as key " + - s"FROM $tbl")) - - val functionSQL1 = aggFunctionSQL(aggregateFunction, "value1") - val functionSQL2 = aggFunctionSQL(aggregateFunction, "value2") - - val query = s"SELECT key, $functionSQL1, $functionSQL2 " + - "FROM parquetV1Table GROUP BY key" - - benchmark.addCase( - s"SQL Parquet - Spark (${aggregateFunction.toString}) isANSIMode: ${isAnsiMode.toString}") { - _ => - withSQLConf(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) { - Try { spark.sql(query).noop() } - } - } - - benchmark.addCase( - s"SQL Parquet - Comet (${aggregateFunction.toString}) isANSIMode: ${isAnsiMode.toString}") { - _ => - withSQLConf( - CometConf.COMET_ENABLED.key -> "true", - CometConf.COMET_EXEC_ENABLED.key -> "true", - SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) { - Try { spark.sql(query).noop() } - } - } - - benchmark.run() - } - } - } - - override def runCometBenchmark(mainArgs: Array[String]): Unit = { - val total = 1024 * 1024 * 10 - val combinations = List(100, 1024, 1024 * 1024) // number of distinct groups - benchmarkAggFuncs.foreach { aggFunc => - Seq(true, false).foreach(k => { - runBenchmarkWithTable( - s"Grouped Aggregate (single group key + single aggregate $aggFunc)", - total) { v => - for (card <- combinations) { - singleGroupAndAggregate(v, card, aggFunc, k) - } - } - - runBenchmarkWithTable( - s"Grouped Aggregate (multiple group keys + single aggregate $aggFunc)", - total) { v => - for (card <- combinations) { - multiGroupKeys(v, card, aggFunc, k) - } - } - - runBenchmarkWithTable( - s"Grouped Aggregate (single group key + multiple aggregates $aggFunc)", - total) { v => - for (card <- combinations) { - multiAggregates(v, card, aggFunc, k) - } - } - - runBenchmarkWithTable( - s"Grouped Aggregate (single group key + single aggregate $aggFunc on decimal)", - total) { v => - for (card <- combinations) { - singleGroupAndAggregateDecimal(v, DecimalType(18, 10), card, aggFunc, k) - } - } - }) - } - } -} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateExpressionBenchmark.scala new file mode 100644 index 0000000000..4ea06109c4 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateExpressionBenchmark.scala @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class AggExprConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +/** + * Comprehensive benchmark for Comet aggregate functions. To run this benchmark: + * {{{ + * SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometAggregateExpressionBenchmark + * }}} + * Results will be written to "spark/benchmarks/CometAggregateFunctionBenchmark-**results.txt". + */ +object CometAggregateExpressionBenchmark extends CometBenchmarkBase { + + private val basicAggregates = List( + AggExprConfig("count", "SELECT COUNT(*) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("count_col", "SELECT COUNT(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig( + "count_distinct", + "SELECT COUNT(DISTINCT c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("min_int", "SELECT MIN(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("max_int", "SELECT MAX(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("min_double", "SELECT MIN(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("max_double", "SELECT MAX(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("sum_int", "SELECT SUM(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("sum_long", "SELECT SUM(c_long) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("sum_double", "SELECT SUM(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("avg_int", "SELECT AVG(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("avg_double", "SELECT AVG(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("first", "SELECT FIRST(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig( + "first_ignore_nulls", + "SELECT FIRST(c_int, true) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("last", "SELECT LAST(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig( + "last_ignore_nulls", + "SELECT LAST(c_int, true) FROM parquetV1Table GROUP BY grp")) + + private val statisticalAggregates = List( + AggExprConfig("var_samp", "SELECT VAR_SAMP(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("var_pop", "SELECT VAR_POP(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("stddev_samp", "SELECT STDDEV_SAMP(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("stddev_pop", "SELECT STDDEV_POP(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig( + "covar_samp", + "SELECT COVAR_SAMP(c_double, c_double2) FROM parquetV1Table GROUP BY grp"), + AggExprConfig( + "covar_pop", + "SELECT COVAR_POP(c_double, c_double2) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("corr", "SELECT CORR(c_double, c_double2) FROM parquetV1Table GROUP BY grp")) + + private val bitwiseAggregates = List( + AggExprConfig("bit_and", "SELECT BIT_AND(c_long) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("bit_or", "SELECT BIT_OR(c_long) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("bit_xor", "SELECT BIT_XOR(c_long) FROM parquetV1Table GROUP BY grp")) + + // Additional structural tests (multiple group keys, multiple aggregates) + private val multiKeyAggregates = List( + AggExprConfig("sum_multi_key", "SELECT SUM(c_int) FROM parquetV1Table GROUP BY grp, grp2"), + AggExprConfig("avg_multi_key", "SELECT AVG(c_double) FROM parquetV1Table GROUP BY grp, grp2")) + + private val multiAggregates = List( + AggExprConfig("sum_sum", "SELECT SUM(c_int), SUM(c_long) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("min_max", "SELECT MIN(c_int), MAX(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig( + "count_sum_avg", + "SELECT COUNT(*), SUM(c_int), AVG(c_double) FROM parquetV1Table GROUP BY grp")) + + // Decimal aggregates + private val decimalAggregates = List( + AggExprConfig("sum_decimal", "SELECT SUM(c_decimal) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("avg_decimal", "SELECT AVG(c_decimal) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("min_decimal", "SELECT MIN(c_decimal) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("max_decimal", "SELECT MAX(c_decimal) FROM parquetV1Table GROUP BY grp")) + + // High cardinality tests + private val highCardinalityAggregates = List( + AggExprConfig( + "sum_high_card", + "SELECT SUM(c_int) FROM parquetV1Table GROUP BY high_card_grp"), + AggExprConfig( + "count_distinct_high_card", + "SELECT COUNT(DISTINCT c_int) FROM parquetV1Table GROUP BY high_card_grp")) + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = 1024 * 1024 + + runBenchmarkWithTable("Aggregate function benchmarks", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT + CAST(value % 1000 AS INT) AS grp, + CAST(value % 100 AS INT) AS grp2, + CAST(value % 100000 AS INT) AS high_card_grp, + CASE WHEN value % 100 = 0 THEN NULL ELSE CAST((value % 10000) - 5000 AS INT) END AS c_int, + CASE WHEN value % 100 = 1 THEN NULL ELSE CAST(value * 1000 AS LONG) END AS c_long, + CASE WHEN value % 100 = 2 THEN NULL ELSE CAST((value % 10000) / 100.0 AS DOUBLE) END AS c_double, + CASE WHEN value % 100 = 3 THEN NULL ELSE CAST((value % 5000) / 50.0 AS DOUBLE) END AS c_double2, + CASE WHEN value % 100 = 4 THEN NULL ELSE CAST((value % 10000 - 5000) / 100.0 AS DECIMAL(18, 10)) END AS c_decimal + FROM $tbl + """)) + + val allAggregates = basicAggregates ++ statisticalAggregates ++ bitwiseAggregates ++ + multiKeyAggregates ++ multiAggregates ++ decimalAggregates ++ highCardinalityAggregates + + allAggregates.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +}