2 changes: 1 addition & 1 deletion docker/download_maven_packages.py
@@ -41,7 +41,7 @@ def get_worker_spark_session_for_docker(connection_types: set[str]) -> SparkSess
"""
from pyspark.sql import SparkSession

spark_builder = SparkSession.builder.appName("syncmaster_jar_downloader").master("local")
spark_builder = SparkSession.builder.appName("syncmaster_jar_downloader").master("local[1]")

for k, v in get_spark_session_conf_for_docker_image(connection_types).items():
spark_builder = spark_builder.config(k, v)
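``local[1]`` pins the local Spark master to exactly one worker thread; plain ``local`` also runs with a single thread, so this change mostly makes the intent explicit. A minimal standalone sketch (not part of this PR) showing the resulting parallelism:

.. code-block:: python

    # Minimal sketch, assuming a local pyspark installation.
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("syncmaster_jar_downloader")
        .master("local[1]")  # one worker thread; "local[*]" would use all cores
        .getOrCreate()
    )
    print(spark.sparkContext.defaultParallelism)  # 1 with local[1]
    spark.stop()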
12 changes: 12 additions & 0 deletions docs/changelog/next_release/291.feature.rst
@@ -0,0 +1,12 @@
Allow passing default Spark session config via worker settings:

.. code-block:: yaml
    :caption: config.yml

    worker:
      spark_session_default_config:
        spark.master: local
        spark.driver.host: 127.0.0.1
        spark.driver.bindAddress: 0.0.0.0
        spark.sql.pyspark.jvmStacktrace.enabled: true
        spark.ui.enabled: false
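
For illustration only, a hypothetical snippet (not part of this release note) showing what the YAML above parses to on the Python side; note that ``true``/``false`` become Python booleans:

.. code-block:: python

    import textwrap

    import yaml  # PyYAML, assumed to be available

    raw = textwrap.dedent(
        """
        worker:
          spark_session_default_config:
            spark.master: local
            spark.ui.enabled: false
        """
    )
    config = yaml.safe_load(raw)
    defaults = config["worker"]["spark_session_default_config"]
    print(defaults["spark.master"])      # 'local'
    print(defaults["spark.ui.enabled"])  # False (a Python bool)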
32 changes: 27 additions & 5 deletions docs/reference/worker/create_spark_session.rst
@@ -1,12 +1,32 @@
.. _worker-create-spark-session:

Altering Spark session settings
===============================
Configuring Spark session
=========================

SyncMaster Worker creates `SparkSession <https://spark.apache.org/docs/latest/sql-getting-started.html#starting-point-sparksession>`_ for each Run.
By default, SparkSession is created with ``master=local``, all required .jar packages for specific DB/FileSystem types, and limiter by transfer resources.

It is possible to alter SparkSession config by providing custom function:
By default, the SparkSession is created with ``master=local``, includes all required .jar packages for the used DB/FileSystem types, and is limited by transfer resources.

Custom Spark session configuration
----------------------------------

It is possible to alter the default `Spark Session configuration <https://spark.apache.org/docs/latest/configuration.html>`_ via worker settings:

.. code-block:: yaml
    :caption: config.yml

    worker:
      spark_session_default_config:
        spark.master: local
        spark.driver.host: 127.0.0.1
        spark.driver.bindAddress: 0.0.0.0
        spark.sql.pyspark.jvmStacktrace.enabled: true
        spark.ui.enabled: false
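
These defaults are applied to the session builder before the per-run options computed by the worker, so overlapping keys can still be overridden per transfer. A simplified standalone sketch (names and values assumed) of how such a mapping translates into builder calls:

.. code-block:: python

    from pyspark.sql import SparkSession

    defaults = {
        "spark.master": "local",
        "spark.driver.host": "127.0.0.1",
        "spark.driver.bindAddress": "0.0.0.0",
        "spark.sql.pyspark.jvmStacktrace.enabled": "true",
        "spark.ui.enabled": "false",
    }

    builder = SparkSession.builder.appName("example")
    for key, value in defaults.items():
        # each key/value pair becomes a SparkSession.builder.config() call
        builder = builder.config(key, value)

    spark = builder.getOrCreate()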

Custom Spark session factory
----------------------------

It is also possible to use a custom function which returns a ``SparkSession`` object:

.. code-block:: yaml
    :caption: config.yml
@@ -21,17 +41,19 @@ Here is a function example:

from syncmaster.db.models import Run
from syncmaster.dto.connections import ConnectionDTO
from syncmaster.worker.settings import WorkerSettings
from pyspark.sql import SparkSession

def create_custom_spark_session(
    run: Run,
    source: ConnectionDTO,
    target: ConnectionDTO,
    settings: WorkerSettings,
) -> SparkSession:
    # any custom code returning SparkSession object
    return SparkSession.builder.config(...).getOrCreate()

Module with custom function should be placed in the same Docker image or Python virtual environment used by SyncMaster worker.
A module with the custom function should be placed into the same Docker image or Python virtual environment used by the SyncMaster worker.

.. note::

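A hedged variation of the factory above which also applies ``spark_session_default_config`` from the worker settings before adding custom options (the ``custom_`` application name prefix is just an example):

.. code-block:: python

    from pyspark.sql import SparkSession

    from syncmaster.db.models import Run
    from syncmaster.dto.connections import ConnectionDTO
    from syncmaster.worker.settings import WorkerSettings


    def create_custom_spark_session(
        run: Run,
        source: ConnectionDTO,
        target: ConnectionDTO,
        settings: WorkerSettings,
    ) -> SparkSession:
        builder = SparkSession.builder.appName(f"custom_{run.transfer.name}")
        # start from the defaults configured in worker settings,
        # then add anything else the custom factory needs
        for key, value in settings.spark_session_default_config.items():
            builder = builder.config(key, value)
        return builder.getOrCreate()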
3 changes: 2 additions & 1 deletion syncmaster/worker/controller.py
@@ -203,10 +203,11 @@ def __init__(
    @slot
    def perform_transfer(self) -> None:
        try:
            spark = self.settings.worker.CREATE_SPARK_SESSION_FUNCTION(
            spark = self.settings.worker.create_spark_session_function(
                run=self.run,
                source=self.source_handler.connection_dto,
                target=self.target_handler.connection_dto,
                settings=self.settings.worker,
            )

            with spark:
Expand Down
17 changes: 15 additions & 2 deletions syncmaster/worker/settings/__init__.py
@@ -1,5 +1,7 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from typing import Any

from pydantic import BaseModel, Field
from pydantic.types import ImportString

@@ -24,14 +26,25 @@ class WorkerSettings(BaseModel):
        :caption: config.yml

        worker:
          log_url_template: https://logs.location.example.com/syncmaster-worker?correlation_id={{ correlation_id }}&run_id={{ run.id }}
          log_url_template: "https://logs.location.example.com/syncmaster-worker?correlation_id={{ correlation_id }}&run_id={{ run.id }}"

          create_spark_session_function: custom_syncmaster.spark.get_worker_spark_session
          spark_session_default_config:
            spark.master: local
            spark.driver.host: 127.0.0.1
            spark.driver.bindAddress: 0.0.0.0
            spark.sql.pyspark.jvmStacktrace.enabled: true
            spark.ui.enabled: false
"""

CREATE_SPARK_SESSION_FUNCTION: ImportString = Field(
create_spark_session_function: ImportString = Field(
"syncmaster.worker.spark.get_worker_spark_session",
description="Function to create Spark session for worker",
)
spark_session_default_config: dict[str, Any] = Field(
default_factory=dict,
description="Default Spark session configuration",
)
log_url_template: str = Field(
"",
description=":ref:`URL template to access worker logs <worker-log-url>`",
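Because ``create_spark_session_function`` is declared as pydantic's ``ImportString``, the dotted path from ``config.yml`` is imported during validation and becomes a real callable. A tiny standalone illustration, unrelated to SyncMaster itself:

.. code-block:: python

    from pydantic import BaseModel
    from pydantic.types import ImportString


    class Demo(BaseModel):
        func: ImportString


    demo = Demo(func="math.sqrt")  # the dotted path is imported during validation
    print(demo.func(16))           # 4.0 -- func is now the math.sqrt callable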
9 changes: 7 additions & 2 deletions syncmaster/worker/spark.py
@@ -14,6 +14,7 @@
    HDFSConnectionDTO,
    HiveConnectionDTO,
)
from syncmaster.worker.settings import WorkerSettings

if TYPE_CHECKING:
from pyspark.sql import SparkSession
@@ -25,14 +26,18 @@ def get_worker_spark_session(
    run: Run,
    source: ConnectionDTO,
    target: ConnectionDTO,
    settings: WorkerSettings,
) -> SparkSession:
    """Construct Spark Session using run parameters and application settings"""
    from pyspark.sql import SparkSession

    name = run.transfer.group.name + "_" + run.transfer.name  # noqa: WPS336
    spark_builder = SparkSession.builder.appName(f"syncmaster_{name}")
    spark_builder = SparkSession.builder.appName(f"SyncMaster__{name}")

    for k, v in get_spark_session_conf(source, target, run.transfer.resources).items():
    spark_session_config = settings.spark_session_default_config.copy()
    spark_session_config.update(get_spark_session_conf(source, target, run.transfer.resources))

    for k, v in spark_session_config.items():
        spark_builder = spark_builder.config(k, v)

    for entity in source, target:
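The merge order above means per-run values returned by ``get_spark_session_conf`` override overlapping keys from ``spark_session_default_config``. A small sketch with assumed values, showing the ``copy()`` + ``update()`` precedence:

.. code-block:: python

    # Assumed example values, only to illustrate dict precedence.
    defaults = {"spark.master": "local", "spark.ui.enabled": "false"}
    per_run = {"spark.master": "local[2]", "spark.executor.memory": "1g"}

    spark_session_config = defaults.copy()
    spark_session_config.update(per_run)

    print(spark_session_config)
    # {'spark.master': 'local[2]', 'spark.ui.enabled': 'false', 'spark.executor.memory': '1g'}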