1 change: 1 addition & 0 deletions docs/source/contributor-guide/benchmarking.md
@@ -42,5 +42,6 @@ Available benchmarking guides:

- [Benchmarking on macOS](benchmarking_macos.md)
- [Benchmarking on AWS EC2](benchmarking_aws_ec2.md)
- [TPC-DS Benchmarking with spark-sql-perf](benchmarking_spark_sql_perf.md)

We also have many micro benchmarks that can be run from an IDE located [here](https://github.com/apache/datafusion-comet/tree/main/spark/src/test/scala/org/apache/spark/sql/benchmark).
311 changes: 311 additions & 0 deletions docs/source/contributor-guide/benchmarking_spark_sql_perf.md
@@ -0,0 +1,311 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# TPC-DS Benchmarking with spark-sql-perf

This guide explains how to generate TPC-DS data and run TPC-DS benchmarks using the
[KubedAI/spark-sql-perf](https://github.com/KubedAI/spark-sql-perf) framework (a fork of
[databricks/spark-sql-perf](https://github.com/databricks/spark-sql-perf)).

We use the KubedAI fork because it adds two features not present in the upstream Databricks repository:

- **Apache Iceberg support** — allows generating and registering TPC-DS tables in Iceberg format, which is
needed for Comet Iceberg benchmarking.
- **TPC-DS v4.0 queries** — adds the full set of TPC-DS v4.0 queries alongside the existing v1.4 and v2.4 sets.

The spark-sql-perf approach uses the TPC-DS `dsdgen` tool to generate data directly through Spark, which handles
partitioning and writing to Parquet format automatically.

## Prerequisites

- Java 17 (for Spark 3.5+)
- Apache Spark 3.5.x
- SBT (Scala Build Tool)
- C compiler toolchain (`gcc`, `make`, `flex`, `bison`, `byacc`)

## Step 1: Build tpcds-kit

The `dsdgen` tool from [databricks/tpcds-kit](https://github.com/databricks/tpcds-kit) is required for data
generation. This is a modified fork of the official TPC-DS toolkit that outputs to stdout, allowing Spark to
ingest the data directly.

**Linux (Ubuntu/Debian):**

```shell
sudo apt-get install -y gcc make flex bison byacc git
git clone https://github.com/databricks/tpcds-kit.git
cd tpcds-kit/tools
make OS=LINUX
```

**Linux (CentOS/RHEL/Amazon Linux):**

```shell
sudo yum install -y gcc make flex bison byacc git
git clone https://github.com/databricks/tpcds-kit.git
cd tpcds-kit/tools
make OS=LINUX
```

**macOS:**

```shell
xcode-select --install
git clone https://github.com/databricks/tpcds-kit.git
cd tpcds-kit/tools
make OS=MACOS
```

Verify the build succeeded:

```shell
ls -l dsdgen
```
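If the `make` output scrolled by too fast to read, a small check like the following confirms the binary is present and executable before Spark is pointed at it (the `dsdgen_ready` helper is a hypothetical convenience, not part of tpcds-kit):

```shell
# Hypothetical helper: true if the given directory contains an executable dsdgen.
dsdgen_ready() {
  [ -x "$1/dsdgen" ]
}

# Run from tpcds-kit/tools after building.
if dsdgen_ready .; then
  echo "dsdgen ready"
else
  echo "dsdgen missing or not executable"
fi
```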

## Step 2: Build spark-sql-perf

```shell
git clone https://github.com/KubedAI/spark-sql-perf.git
cd spark-sql-perf
git checkout support-iceberg-tpcds-v4.0
sbt package
```

This produces a JAR file at `target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar` (the exact version may
vary). Note the path to this JAR for later use.
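Because the version string in the file name can change, it can help to resolve the JAR path with a glob instead of hard-coding it. A minimal sketch (the `find_perf_jar` helper is a hypothetical name, not part of spark-sql-perf):

```shell
# Hypothetical helper: print the newest spark-sql-perf JAR under a target directory.
find_perf_jar() {
  ls -t "$1"/spark-sql-perf_*.jar 2>/dev/null | head -n 1
}

# Usage from the spark-sql-perf checkout:
SQL_PERF_JAR=$(find_perf_jar target/scala-2.12)
echo "$SQL_PERF_JAR"
```

The resulting `$SQL_PERF_JAR` can then be passed to `--jars` in the later steps.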

## Step 3: Install and Start Spark

If you do not already have Spark installed:

```shell
export SPARK_VERSION=3.5.6
wget https://archive.apache.org/dist/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop3.tgz
tar xzf spark-$SPARK_VERSION-bin-hadoop3.tgz
sudo mv spark-$SPARK_VERSION-bin-hadoop3 /opt
export SPARK_HOME=/opt/spark-$SPARK_VERSION-bin-hadoop3/
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
mkdir -p /tmp/spark-events # default directory for Spark event logs
```

Start Spark in standalone mode:

```shell
$SPARK_HOME/sbin/start-master.sh
export SPARK_MASTER=spark://$(hostname):7077
$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER
```

## Step 4: Generate TPC-DS Data

Launch `spark-shell` with the spark-sql-perf JAR loaded:

```shell
$SPARK_HOME/bin/spark-shell \
  --master $SPARK_MASTER \
  --jars /path/to/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar \
  --conf spark.driver.memory=8G \
  --conf spark.executor.instances=1 \
  --conf spark.executor.cores=8 \
  --conf spark.executor.memory=16g
```

In the Spark shell, run the following to generate data. Adjust `scaleFactor` and paths as needed:

```scala
import com.databricks.spark.sql.perf.tpcds.TPCDSTables

val tpcdsKit = "/path/to/tpcds-kit/tools"
val scaleFactor = "100" // 100 GB
val dataDir = "/path/to/tpcds-data"
val format = "parquet"
val numPartitions = 32 // adjust based on cluster size

val tables = new TPCDSTables(spark.sqlContext, dsdgenDir = tpcdsKit, scaleFactor = scaleFactor)
tables.genData(
  location = dataDir,
  format = format,
  overwrite = true,
  partitionTables = true,
  clusterByPartitionColumns = true,
  filterOutNullPartitionValues = false,
  numPartitions = numPartitions
)
```

Data generation for SF100 typically takes 20-60 minutes depending on hardware. When complete, exit the shell:

```scala
:quit
```

Verify the data was generated:

```shell
ls /path/to/tpcds-data/
```

You should see directories for each TPC-DS table (`store_sales`, `catalog_sales`, `web_sales`, `customer`,
`date_dim`, etc.).

Set the `TPCDS_DATA` environment variable:

```shell
export TPCDS_DATA=/path/to/tpcds-data
```
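Before moving on to table registration, a quick structural check can save a failed benchmark run later. A minimal sketch (the `check_tpcds_tables` helper is hypothetical; the table names are from the TPC-DS schema):

```shell
# Hypothetical sanity check: verify the large TPC-DS fact tables exist under a data dir.
check_tpcds_tables() {
  missing=0
  for t in store_sales catalog_sales web_sales inventory; do
    if [ ! -d "$1/$t" ]; then
      echo "missing table directory: $t"
      missing=1
    fi
  done
  [ "$missing" -eq 0 ]
}

if check_tpcds_tables "$TPCDS_DATA"; then
  echo "all fact tables present"
fi
```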

## Step 5: Run TPC-DS Benchmarks

### Register Tables

Launch `spark-shell` with the spark-sql-perf JAR (same as Step 4) and register the generated data as tables:

```scala
import com.databricks.spark.sql.perf.tpcds.{TPCDS, TPCDSTables}

val scaleFactor = "100"
val dataDir = "/path/to/tpcds-data"
val format = "parquet"
val databaseName = "tpcds"

// Create database and register tables
sql(s"CREATE DATABASE IF NOT EXISTS $databaseName")
// dsdgenDir is not used when only registering existing data, so it can be empty
val tables = new TPCDSTables(spark.sqlContext, dsdgenDir = "", scaleFactor = scaleFactor)
tables.createExternalTables(
  location = dataDir,
  format = format,
  databaseName = databaseName,
  overwrite = true,
  discoverPartitions = true
)

sql(s"USE $databaseName")
```

### Run Spark Baseline

```scala
val tpcds = new TPCDS(spark.sqlContext)

// Choose a query set: tpcds1_4Queries, tpcds2_4Queries, or tpcds4_0Queries
val queries = tpcds.tpcds2_4Queries

val experiment = tpcds.runExperiment(
  executionsToRun = queries,
  iterations = 3,
  resultLocation = "/path/to/results/spark",
  tags = Map("engine" -> "spark", "scale_factor" -> "100"),
  forkThread = true
)

// Wait for all iterations to finish (timeout in seconds; 86400 = 24 hours)
experiment.waitForFinish(86400)
```

Results are saved as JSON to the `resultLocation` path.

### Run with Comet

Build Comet from source and launch `spark-shell` with both the Comet and spark-sql-perf JARs:

```shell
make release
export COMET_JAR=$(pwd)/spark/target/comet-spark-spark3.5_2.12-*.jar

$SPARK_HOME/bin/spark-shell \
  --master $SPARK_MASTER \
  --jars /path/to/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar,$COMET_JAR \
  --conf spark.driver.memory=8G \
  --conf spark.executor.instances=1 \
  --conf spark.executor.cores=8 \
  --conf spark.executor.memory=8g \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=8g \
  --conf spark.executor.extraClassPath=$COMET_JAR \
  --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
  --conf spark.comet.enabled=true \
  --conf spark.comet.exec.enabled=true \
  --conf spark.comet.exec.all.enabled=true \
  --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
  --conf spark.comet.exec.shuffle.enabled=true \
  --conf spark.comet.columnar.shuffle.enabled=true
```

Then register tables and run the benchmark the same way as the Spark baseline, changing the tags and result
location:

```scala
val experiment = tpcds.runExperiment(
  executionsToRun = queries,
  iterations = 3,
  resultLocation = "/path/to/results/comet",
  tags = Map("engine" -> "comet", "scale_factor" -> "100"),
  forkThread = true
)

experiment.waitForFinish(86400)
```

### View Results

Results are saved as JSON under the `resultLocation`. You can query them directly in Spark:

```scala
import org.apache.spark.sql.functions.col

val results = spark.read.json("/path/to/results/spark")
results.select("name", "parsingTime", "analysisTime", "optimizationTime", "planningTime", "executionTime")
  .withColumn("totalTime", (col("parsingTime") + col("analysisTime") +
    col("optimizationTime") + col("planningTime") + col("executionTime")) / 1000.0)
  .orderBy("name")
  .show(200, false)
```
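Once both the Spark baseline and Comet runs have produced totals, a speedup figure is a simple ratio. A toy sketch with placeholder numbers, not measured results (the `speedup` helper is hypothetical):

```shell
# Toy speedup calculation: replace the totals with your measured values (seconds).
speedup() {
  awk -v s="$1" -v c="$2" 'BEGIN { printf "%.2f", s / c }'
}

spark_total=3600
comet_total=1800
echo "Comet speedup: $(speedup "$spark_total" "$comet_total")x"
```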

## Alternative: Command-Line Data Generation

You can also generate TPC-DS data without the Spark shell using `spark-submit`:

```shell
$SPARK_HOME/bin/spark-submit \
  --master $SPARK_MASTER \
  --class com.databricks.spark.sql.perf.tpcds.GenTPCDSData \
  --conf spark.driver.memory=8G \
  --conf spark.executor.instances=1 \
  --conf spark.executor.cores=8 \
  --conf spark.executor.memory=16g \
  /path/to/spark-sql-perf/target/scala-2.12/spark-sql-perf_2.12-0.5.1-SNAPSHOT.jar \
  -d /path/to/tpcds-kit/tools \
  -s 100 \
  -l /path/to/tpcds-data \
  -f parquet
```

## Troubleshooting

### dsdgen not found

Ensure `tpcds-kit/tools/dsdgen` exists and is executable. The `dsdgenDir` parameter in the Spark shell (or `-d`
flag in the CLI) must point to the directory containing the `dsdgen` binary, not the binary itself.

### Out of memory during data generation

For large scale factors (SF1000+), increase executor memory and the number of partitions:

```shell
--conf spark.executor.memory=32g
```

And in the Spark shell, use a higher `numPartitions` value (e.g., 200+).
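To pick a starting value, one possible rule of thumb (an assumption for illustration, not a spark-sql-perf recommendation) is roughly one partition per GB of scale factor, with a floor of two tasks per core so small scale factors still parallelize; tune from there based on observed task sizes. The `suggest_partitions` helper below is hypothetical:

```shell
# Heuristic (assumption): max(scale factor in GB, 2 tasks per core).
suggest_partitions() {
  sf_gb=$1
  cores=$2
  floor=$(( cores * 2 ))
  if [ "$sf_gb" -gt "$floor" ]; then
    echo "$sf_gb"
  else
    echo "$floor"
  fi
}

suggest_partitions 1000 8   # starting point for SF1000 on 8 cores
```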