1 change: 1 addition & 0 deletions docker/Dockerfile.worker
@@ -2,6 +2,7 @@ ARG PYTHON_VERSION=3.13
FROM python:$PYTHON_VERSION-slim-bookworm AS base

RUN apt-get update && apt-get install -y --no-install-recommends \
+    tini \
    openjdk-17-jdk-headless \
    # required for HDFS/Hive with Kerberos enabled
    krb5-user \
103 changes: 98 additions & 5 deletions docker/entrypoint_worker.sh
@@ -1,8 +1,101 @@
#!/usr/bin/env bash
-set -e

-# https://docs.celeryq.dev/en/stable/userguide/workers.html#max-tasks-per-child-setting
-# Required to start each Celery task in separated process, avoiding issues with global Spark session object
+# Based on https://github.com/apache/spark/blob/v3.5.7/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh

-# exec is required to forward all signals to the main process
-exec python -m celery -A syncmaster.worker.celery worker --max-tasks-per-child=1 "$@"
+set -ex
+
if [ -z "$JAVA_HOME" ]; then
JAVA_HOME=$(java -XshowSettings:properties -version 2>&1 > /dev/null | grep 'java.home' | awk '{print $3}')
fi
if [ -z "$SPARK_HOME" ]; then
SPARK_HOME=$(python -c 'import pyspark; import pathlib; print(pathlib.Path(pyspark.__file__).parent.resolve())')
fi

SPARK_CLASSPATH="$SPARK_CLASSPATH:${SPARK_HOME}/jars/*"

env | grep SPARK_JAVA_OPT_ | sort -t_ -k4 -n | sed 's/[^=]*=\(.*\)/\1/g' > java_opts.txt
if [ "$(command -v readarray)" ]; then
readarray -t SPARK_EXECUTOR_JAVA_OPTS < java_opts.txt
else
SPARK_EXECUTOR_JAVA_OPTS=("${(@f)$(< java_opts.txt)}")
fi

if [ -n "$SPARK_EXTRA_CLASSPATH" ]; then
SPARK_CLASSPATH="$SPARK_CLASSPATH:$SPARK_EXTRA_CLASSPATH"
fi

if ! [ -z ${PYSPARK_PYTHON+x} ]; then
export PYSPARK_PYTHON
fi
if ! [ -z ${PYSPARK_DRIVER_PYTHON+x} ]; then
export PYSPARK_DRIVER_PYTHON
fi

# If HADOOP_HOME is set and SPARK_DIST_CLASSPATH is not set, set it here so Hadoop jars are available to the executor.
# It does not set SPARK_DIST_CLASSPATH if already set, to avoid overriding customizations of this value from elsewhere e.g. Docker/K8s.
if [ -n "${HADOOP_HOME}" ] && [ -z "${SPARK_DIST_CLASSPATH}" ]; then
export SPARK_DIST_CLASSPATH="$($HADOOP_HOME/bin/hadoop classpath)"
fi

if ! [ -z ${HADOOP_CONF_DIR+x} ]; then
SPARK_CLASSPATH="$HADOOP_CONF_DIR:$SPARK_CLASSPATH";
fi

if ! [ -z ${SPARK_CONF_DIR+x} ]; then
SPARK_CLASSPATH="$SPARK_CONF_DIR:$SPARK_CLASSPATH";
elif ! [ -z ${SPARK_HOME+x} ]; then
SPARK_CLASSPATH="$SPARK_HOME/conf:$SPARK_CLASSPATH";
fi

# Place IVY2 jars at the end of classpath to avoid conflicts with Spark and Hadoop jars
IVY2_HOME=$(realpath ~/.ivy2)
SPARK_CLASSPATH="$SPARK_CLASSPATH:${IVY2_HOME}/jars/*"

# SPARK-43540: add current working directory into executor classpath
SPARK_CLASSPATH="$SPARK_CLASSPATH:$PWD"

case "$1" in
driver)
shift 1
CMD=(
"$SPARK_HOME/bin/spark-submit"
--conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
--conf "spark.executorEnv.SPARK_DRIVER_POD_IP=$SPARK_DRIVER_BIND_ADDRESS"
--deploy-mode client
"$@"
)
;;
executor)
shift 1
CMD=(
${JAVA_HOME}/bin/java
"${SPARK_EXECUTOR_JAVA_OPTS[@]}"
-Xms$SPARK_EXECUTOR_MEMORY
-Xmx$SPARK_EXECUTOR_MEMORY
-cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH"
org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBackend
--driver-url $SPARK_DRIVER_URL
--executor-id $SPARK_EXECUTOR_ID
--cores $SPARK_EXECUTOR_CORES
--app-id $SPARK_APPLICATION_ID
--hostname $SPARK_EXECUTOR_POD_IP
--resourceProfileId $SPARK_RESOURCE_PROFILE_ID
--podName $SPARK_EXECUTOR_POD_NAME
)
;;

*)
# https://docs.celeryq.dev/en/stable/userguide/workers.html#max-tasks-per-child-setting
# Required to start each Celery task in separated process, avoiding issues with global Spark session object
CMD=(
python -m celery
-A syncmaster.worker.celery
worker
--max-tasks-per-child=1
"$@"
)
;;
esac

# Execute the container CMD under tini for better hygiene
exec /usr/bin/tini -s -- "${CMD[@]}"
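
For orientation, a rough sketch of how the dispatch above behaves; the invocations below are illustrative assumptions and are not part of this change. With no recognised subcommand, the default branch starts the Celery worker under tini and forwards any extra arguments to celery; Spark on Kubernetes starts executor containers from the same image with "executor" (or, in cluster deploy mode, "driver") as the first argument.

    # Illustrative only: assumes the worker image's ENTRYPOINT is docker/entrypoint_worker.sh.

    # Default branch: no "driver"/"executor" subcommand, so the Celery worker starts under tini;
    # any extra arguments are appended to the celery command line.
    ./entrypoint_worker.sh --loglevel=INFO

    # Spark on Kubernetes passes "executor" (or "driver") as the first argument when starting
    # pods from this image, selecting the matching branch above.
    ./entrypoint_worker.sh executor
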
1 change: 1 addition & 0 deletions docs/changelog/next_release/295.feature.rst
@@ -0,0 +1 @@
+Allow using the SyncMaster worker image as ``spark.kubernetes.container.image``.
14 changes: 14 additions & 0 deletions docs/reference/worker/create_spark_session.rst
@@ -23,6 +23,20 @@ It is possible to alter default `Spark Session configuration <https://spark.apac
        spark.sql.pyspark.jvmStacktrace.enabled: true
        spark.ui.enabled: false

+For example, to use SyncMaster with Spark on Kubernetes, you can use the worker image for Spark executor containers:
+
+.. code-block:: yaml
+    :caption: config.yml
+
+    worker:
+      spark_session_default_config:
+        spark.master: k8s://https://kubernetes.default.svc
+        spark.driver.bindAddress: 0.0.0.0
+        spark.kubernetes.authenticate.driver.serviceAccountName: spark
+        spark.sql.pyspark.jvmStacktrace.enabled: true
+        spark.kubernetes.container.image: mtsrus/syncmaster-worker:{TAG}
+
+
Custom Spark session factory
----------------------------

8 changes: 5 additions & 3 deletions syncmaster/worker/spark.py
@@ -131,12 +131,14 @@ def get_spark_session_conf(
    if spark_master and spark_master.startswith("local"):
        config["spark.master"] = f"local[{tasks}]"
        config["spark.driver.memory"] = f"{memory_mb}M"
-        config["spark.default.parallelism"] = tasks * cores_per_task
    else:
        config["spark.executor.memory"] = f"{memory_mb}M"
-        config["spark.executor.cores"] = cores_per_task
        config["spark.executor.instances"] = tasks
-        config["spark.dynamicAllocation.maxExecutors"] = tasks

+    config["spark.executor.cores"] = cores_per_task
+    config["spark.default.parallelism"] = tasks * cores_per_task
+    config["spark.dynamicAllocation.maxExecutors"] = tasks  # yarn
+    config["spark.kubernetes.executor.limit.cores"] = cores_per_task  # k8s
+
    if maven_packages:
        log.debug("Include Maven packages: %s", maven_packages)
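
For reference, a minimal sketch of the resource resolution shown in the syncmaster/worker/spark.py hunk above; the helper name and the example values are illustrative assumptions, not SyncMaster's actual API.

    # Minimal sketch mirroring the spark.py hunk above; the function name and
    # arguments are illustrative assumptions only.
    def resolve_spark_resources(spark_master, tasks, cores_per_task, memory_mb):
        config = {}
        if spark_master and spark_master.startswith("local"):
            # local mode: everything runs inside the driver JVM
            config["spark.master"] = f"local[{tasks}]"
            config["spark.driver.memory"] = f"{memory_mb}M"
        else:
            # cluster mode (YARN or Kubernetes): one executor per task slot
            config["spark.executor.memory"] = f"{memory_mb}M"
            config["spark.executor.instances"] = tasks

        # after this change, applied in both modes
        config["spark.executor.cores"] = cores_per_task
        config["spark.default.parallelism"] = tasks * cores_per_task
        config["spark.dynamicAllocation.maxExecutors"] = tasks  # yarn
        config["spark.kubernetes.executor.limit.cores"] = cores_per_task  # k8s
        return config

    # e.g. resolve_spark_resources("k8s://https://kubernetes.default.svc", 2, 2, 2048)
    # -> executor.memory=2048M, executor.instances=2, executor.cores=2,
    #    default.parallelism=4, dynamicAllocation.maxExecutors=2, kubernetes.executor.limit.cores=2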