From 2fc01f5ffb39c4db54c4b8561225025da7958370 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?=
 =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?=
 =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?=
Date: Thu, 6 Nov 2025 16:01:13 +0300
Subject: [PATCH] [DOP-29498] Allow overriding Spark session config

---
 docker/download_maven_packages.py             |  2 +-
 docs/changelog/next_release/291.feature.rst   | 12 +++++++
 .../reference/worker/create_spark_session.rst | 32 ++++++++++++++++---
 syncmaster/worker/controller.py               |  3 +-
 syncmaster/worker/settings/__init__.py        | 17 ++++++++--
 syncmaster/worker/spark.py                    |  9 ++++--
 6 files changed, 64 insertions(+), 11 deletions(-)
 create mode 100644 docs/changelog/next_release/291.feature.rst

diff --git a/docker/download_maven_packages.py b/docker/download_maven_packages.py
index 00da2a38..0401fe21 100755
--- a/docker/download_maven_packages.py
+++ b/docker/download_maven_packages.py
@@ -41,7 +41,7 @@ def get_worker_spark_session_for_docker(connection_types: set[str]) -> SparkSess
     """
     from pyspark.sql import SparkSession
 
-    spark_builder = SparkSession.builder.appName("syncmaster_jar_downloader").master("local")
+    spark_builder = SparkSession.builder.appName("syncmaster_jar_downloader").master("local[1]")
 
     for k, v in get_spark_session_conf_for_docker_image(connection_types).items():
         spark_builder = spark_builder.config(k, v)
diff --git a/docs/changelog/next_release/291.feature.rst b/docs/changelog/next_release/291.feature.rst
new file mode 100644
index 00000000..9ce43659
--- /dev/null
+++ b/docs/changelog/next_release/291.feature.rst
@@ -0,0 +1,12 @@
+Allow passing default Spark session config via worker settings:
+
+.. code-block:: yaml
+    :caption: config.yml
+
+    worker:
+        spark_session_default_config:
+            spark.master: local
+            spark.driver.host: 127.0.0.1
+            spark.driver.bindAddress: 0.0.0.0
+            spark.sql.pyspark.jvmStacktrace.enabled: true
+            spark.ui.enabled: false
diff --git a/docs/reference/worker/create_spark_session.rst b/docs/reference/worker/create_spark_session.rst
index c4d47dde..b6383a19 100644
--- a/docs/reference/worker/create_spark_session.rst
+++ b/docs/reference/worker/create_spark_session.rst
@@ -1,12 +1,32 @@
 .. _worker-create-spark-session:
 
-Altering Spark session settings
-===============================
+Configuring Spark session
+=========================
 
 SyncMaster Worker creates `SparkSession <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.html>`_ for each Run.
 
-By default, SparkSession is created with ``master=local``, all required .jar packages for specific DB/FileSystem types, and limiter by transfer resources.
-It is possible to alter SparkSession config by providing custom function:
+By default, SparkSession is created with ``master=local``, including all required .jar packages for DB/FileSystem types, and limited by transfer resources.
+
+Custom Spark session configuration
+----------------------------------
+
+It is possible to alter the default `Spark Session configuration <https://spark.apache.org/docs/latest/configuration.html>`_ via worker settings:
+
+.. code-block:: yaml
+    :caption: config.yml
+
+    worker:
+        spark_session_default_config:
+            spark.master: local
+            spark.driver.host: 127.0.0.1
+            spark.driver.bindAddress: 0.0.0.0
+            spark.sql.pyspark.jvmStacktrace.enabled: true
+            spark.ui.enabled: false
+
+Custom Spark session factory
+----------------------------
+
+It is also possible to use a custom function which returns a ``SparkSession`` object:
 
 .. code-block:: yaml
     :caption: config.yml
@@ -21,17 +41,19 @@ Here is a function example:
 
     from syncmaster.db.models import Run
     from syncmaster.dto.connections import ConnectionDTO
+    from syncmaster.worker.settings import WorkerSettings
     from pyspark.sql import SparkSession
 
 
     def create_custom_spark_session(
         run: Run,
         source: ConnectionDTO,
         target: ConnectionDTO,
+        settings: WorkerSettings,
     ) -> SparkSession:
         # any custom code returning SparkSession object
         return SparkSession.builder.config(...).getOrCreate()
 
-Module with custom function should be placed in the same Docker image or Python virtual environment used by SyncMaster worker.
+The module with the custom function should be placed into the same Docker image or Python virtual environment used by SyncMaster worker.
 
 .. note::
diff --git a/syncmaster/worker/controller.py b/syncmaster/worker/controller.py
index 10d3c71e..d068109c 100644
--- a/syncmaster/worker/controller.py
+++ b/syncmaster/worker/controller.py
@@ -203,10 +203,11 @@ def __init__(
     @slot
     def perform_transfer(self) -> None:
         try:
-            spark = self.settings.worker.CREATE_SPARK_SESSION_FUNCTION(
+            spark = self.settings.worker.create_spark_session_function(
                 run=self.run,
                 source=self.source_handler.connection_dto,
                 target=self.target_handler.connection_dto,
+                settings=self.settings.worker,
             )
 
             with spark:
diff --git a/syncmaster/worker/settings/__init__.py b/syncmaster/worker/settings/__init__.py
index ccd6bb87..edbb0512 100644
--- a/syncmaster/worker/settings/__init__.py
+++ b/syncmaster/worker/settings/__init__.py
@@ -1,5 +1,7 @@
 # SPDX-FileCopyrightText: 2023-2024 MTS PJSC
 # SPDX-License-Identifier: Apache-2.0
+from typing import Any
+
 from pydantic import BaseModel, Field
 from pydantic.types import ImportString
 
@@ -24,14 +26,25 @@ class WorkerSettings(BaseModel):
         :caption: config.yml
 
         worker:
-            log_url_template: https://logs.location.example.com/syncmaster-worker?correlation_id={{ correlation_id }}&run_id={{ run.id }}
+            log_url_template: "https://logs.location.example.com/syncmaster-worker?correlation_id={{ correlation_id }}&run_id={{ run.id }}"
+            create_spark_session_function: custom_syncmaster.spark.get_worker_spark_session
+            spark_session_default_config:
+                spark.master: local
+                spark.driver.host: 127.0.0.1
+                spark.driver.bindAddress: 0.0.0.0
+                spark.sql.pyspark.jvmStacktrace.enabled: true
+                spark.ui.enabled: false
     """
 
-    CREATE_SPARK_SESSION_FUNCTION: ImportString = Field(
+    create_spark_session_function: ImportString = Field(
         "syncmaster.worker.spark.get_worker_spark_session",
         description="Function to create Spark session for worker",
     )
+    spark_session_default_config: dict[str, Any] = Field(
+        default_factory=dict,
+        description="Default Spark session configuration",
+    )
     log_url_template: str = Field(
         "",
         description=":ref:`URL template to access worker logs `",
     )
diff --git a/syncmaster/worker/spark.py b/syncmaster/worker/spark.py
index 8f7c1e98..406b563a 100644
--- a/syncmaster/worker/spark.py
+++ b/syncmaster/worker/spark.py
@@ -14,6 +14,7 @@
     HDFSConnectionDTO,
     HiveConnectionDTO,
 )
+from syncmaster.worker.settings import WorkerSettings
 
 if TYPE_CHECKING:
     from pyspark.sql import SparkSession
@@ -25,14 +26,18 @@ def get_worker_spark_session(
     run: Run,
     source: ConnectionDTO,
     target: ConnectionDTO,
+    settings: WorkerSettings,
 ) -> SparkSession:
     """Construct Spark Session using run parameters and application settings"""
     from pyspark.sql import SparkSession
 
     name = run.transfer.group.name + "_" + run.transfer.name  # noqa: WPS336
-    spark_builder = SparkSession.builder.appName(f"syncmaster_{name}")
spark_builder = SparkSession.builder.appName(f"SyncMaster__{name}") - for k, v in get_spark_session_conf(source, target, run.transfer.resources).items(): + spark_session_config = settings.spark_session_default_config.copy() + spark_session_config.update(get_spark_session_conf(source, target, run.transfer.resources)) + + for k, v in spark_session_config.items(): spark_builder = spark_builder.config(k, v) for entity in source, target: