From 96cba648a9514901d05775779fa793b076e142ae Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?=
Date: Fri, 26 Dec 2025 19:41:43 +0300
Subject: [PATCH] Switch from flake8 to ruff

---
 .github/workflows/codeql-analysis.yml | 4 -
 .pre-commit-config.yaml | 25 +-
 docker/download_ivy2_packages.py | 2 +-
 pyproject.toml | 251 +++-----------
 syncmaster/__init__.py | 2 +-
 syncmaster/db/migrations/env.py | 12 +-
 syncmaster/db/models/base.py | 2 +-
 syncmaster/db/repositories/base.py | 5 +-
 syncmaster/db/repositories/connection.py | 2 +-
 syncmaster/db/repositories/group.py | 2 +-
 .../db/repositories/repository_with_owner.py | 7 +-
 syncmaster/db/repositories/run.py | 6 +-
 syncmaster/db/repositories/search.py | 2 +-
 syncmaster/db/repositories/transfer.py | 17 +-
 syncmaster/db/repositories/user.py | 4 +-
 syncmaster/dto/transfers.py | 4 +-
 syncmaster/exceptions/redirect.py | 2 +-
 syncmaster/scheduler/__init__.py | 5 +-
 syncmaster/scheduler/__main__.py | 2 +-
 syncmaster/scheduler/transfer_job_manager.py | 2 +-
 .../schemas/v1/connections/clickhouse.py | 2 +-
 syncmaster/schemas/v1/connections/ftp.py | 2 +-
 syncmaster/schemas/v1/connections/ftps.py | 2 +-
 syncmaster/schemas/v1/connections/iceberg.py | 8 +-
 syncmaster/schemas/v1/connections/mssql.py | 2 +-
 syncmaster/schemas/v1/connections/mysql.py | 2 +-
 syncmaster/schemas/v1/connections/oracle.py | 2 +-
 syncmaster/schemas/v1/connections/postgres.py | 2 +-
 syncmaster/schemas/v1/connections/s3.py | 4 +-
 syncmaster/schemas/v1/connections/samba.py | 2 +-
 syncmaster/schemas/v1/connections/sftp.py | 2 +-
 syncmaster/schemas/v1/connections/webdav.py | 2 +-
 syncmaster/schemas/v1/groups.py | 2 +-
 syncmaster/schemas/v1/queue.py | 2 +-
 syncmaster/schemas/v1/transfers/resources.py | 10 +-
 syncmaster/schemas/v1/types.py | 2 +-
 syncmaster/server/handler.py | 10 +-
 syncmaster/server/middlewares/openapi.py | 2 +-
 .../server/providers/auth/base_provider.py | 2 +-
 .../server/providers/auth/dummy_provider.py | 4 +-
 .../providers/auth/keycloak_provider.py | 10 +-
 .../providers/auth/oauth2_gateway_provider.py | 29 +-
 syncmaster/server/services/get_user.py | 18 +-
 syncmaster/settings/logging.py | 6 +-
 syncmaster/worker/controller.py | 6 +-
 syncmaster/worker/handlers/file/base.py | 6 +-
 syncmaster/worker/handlers/file/ftp.py | 2 +-
 syncmaster/worker/handlers/file/ftps.py | 2 +-
 syncmaster/worker/handlers/file/hdfs.py | 2 +-
 syncmaster/worker/handlers/file/local_df.py | 31 ++-
 syncmaster/worker/handlers/file/remote_df.py | 8 +-
 syncmaster/worker/handlers/file/s3.py | 6 +-
 syncmaster/worker/handlers/file/samba.py | 2 +-
 syncmaster/worker/handlers/file/sftp.py | 2 +-
 syncmaster/worker/ivy2.py | 4 +-
 syncmaster/worker/settings/__init__.py | 2 +-
 syncmaster/worker/spark.py | 6 +-
 .../file_df_connection/generate_data.py | 3 +-
 .../file_df_connection/generate_files.py | 132 ++++-----
 tests/spark/__init__.py | 28 +-
 tests/test_integration/celery_test.py | 7 +-
 .../connection_fixtures/dataframe_fixtures.py | 10 +-
 .../auth_fixtures/keycloak_fixture.py | 2 +-
 tests/utils.py | 5 +-
 uv.lock | 56 ----
 65 files changed, 284 insertions(+), 525 deletions(-)
 mode change 100644 => 100755 tests/resources/file_df_connection/generate_data.py
 mode change 100644 => 100755 tests/resources/file_df_connection/generate_files.py

diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml
index 14158be1..37b16adc
100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -14,7 +14,6 @@ concurrency: cancel-in-progress: true env: - # flake8-commas is failing on Python 3.12 DEFAULT_PYTHON: '3.13' jobs: @@ -55,9 +54,6 @@ jobs: export UV=$(which uv) make venv-install - - name: Run flake8 - run: uv run flake8 syncmaster/ - - name: Run mypy run: uv run mypy --config-file ./pyproject.toml ./syncmaster/server diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f670f4ce..9b297f89 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -37,13 +37,13 @@ repos: - id: chmod args: ['644'] exclude_types: [shell] - exclude: ^(.*__main__\.py|syncmaster/server/scripts/.*\.py|docker/.*\.py)$ + exclude: ^(.*__main__\.py|syncmaster/server/scripts/.*\.py|docker/.*\.py|tests/resources/file_df_connection/generate.*.py)$ - id: chmod args: ['755'] types: [shell] - id: chmod args: ['755'] - files: ^(.*__main__\.py|syncmaster/server/scripts/.*\.py|docker/.*\.py)$ + files: ^(.*__main__\.py|syncmaster/server/scripts/.*\.py|docker/.*\.py|tests/resources/file_df_connection/generate.*.py)$ - id: insert-license types: [python] exclude: ^(syncmaster/server/dependencies/stub.py|docs/.*\.py|tests/.*\.py)$ @@ -59,31 +59,15 @@ repos: - id: pyupgrade args: [--py311-plus, --keep-runtime-typing] - - repo: https://github.com/pycqa/bandit - rev: 1.9.2 - hooks: - - id: bandit - args: - - --aggregate=file - - -iii - - -ll - require_serial: true - - repo: https://github.com/astral-sh/ruff-pre-commit rev: v0.14.10 hooks: - id: ruff-format + - id: ruff-check + args: [--fix] - repo: local hooks: - - id: flake8 - name: flake8 - entry: flake8 - language: python - types: [python] - files: ^syncmaster/.*$ - pass_filenames: true - - id: mypy name: mypy entry: mypy ./syncmaster/server --config-file ./pyproject.toml @@ -115,7 +99,6 @@ repos: ci: skip: - - flake8 # checked with Github Actions - mypy # checked with Github Actions - chmod # failing in pre-commit.ci - docker-compose-check # cannot run on pre-commit.ci diff --git a/docker/download_ivy2_packages.py b/docker/download_ivy2_packages.py index 27392f94..31b95bdd 100755 --- a/docker/download_ivy2_packages.py +++ b/docker/download_ivy2_packages.py @@ -39,7 +39,7 @@ def get_worker_spark_session_for_docker(connection_types: set[str]) -> SparkSess Construct dummy Spark session with all .jars included. Designed to be used in Dockerfile.worker to populate the image. 
""" - from pyspark.sql import SparkSession + from pyspark.sql import SparkSession # noqa: PLC0415 spark_builder = SparkSession.builder.appName("syncmaster_jar_downloader").master("local[1]") diff --git a/pyproject.toml b/pyproject.toml index d46bdf10..cf720ee0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -108,8 +108,6 @@ test = [ dev = [ "mypy~=1.19.1", "pre-commit~=4.5.0", - "flake8~=7.3.0", - "flake8-pyproject~=1.2.3", "sqlalchemy[mypy]~=2.0.44", "types-jwcrypto~=1.5.0", ] @@ -137,6 +135,44 @@ target-version = "py311" line-length = 120 extend-exclude = ["docs/", "Makefile"] +[tool.ruff.lint] +select = ["ALL"] +ignore = ["ARG", "ANN", "A002", "COM812", "D", "TD", "FIX002"] + +[tool.ruff.lint.per-file-ignores] +"syncmaster/db/migrations/versions/2025-08-10_0012_update_ts.py" = ["E501"] +"syncmaster/db/migrations/*" = ["N999"] +"syncmaster/db/models/*" = ["TC"] +"syncmaster/db/repositories/*" = ["TC", "PLR0913"] +"syncmaster/schemas/*" = ["TC"] + +"tests/*" = [ + "S", + "A", + "E501", + "FBT001", + "FBT002", + "PLR2004", + "PLR0913", + "SLF001", + "FIX002", + "PERF401", + "RET504", + "PLC0415", + "C408", +] +"tests/resources/file_df_connection/test_data.py" = ["RUF001"] + +[tool.ruff.lint.flake8-pytest-style] +parametrize-names-type = "list" +parametrize-values-type = "list" +parametrize-values-row-type = "tuple" + +[tool.ruff.lint.flake8-quotes] +inline-quotes = "double" +multiline-quotes = "double" +docstring-quotes = "double" + [tool.mypy] python_version = "3.11" plugins = ["pydantic.mypy", "sqlalchemy.ext.mypy.plugin"] @@ -240,217 +276,6 @@ exclude_lines = [ "def downgrade\\(\\)", ] -[tool.flake8] -jobs = 4 -# We don't control ones who use our code -i-control-code = false -# Max of noqa in a module -max-noqa-comments = 10 -max-annotation-complexity = 4 -max-returns = 5 -max-awaits = 5 -max-local-variables = 20 -max-name-length = 65 -# Max of expressions in a function -max-expressions = 15 -# Max args in a function -max-arguments = 15 -# Max classes and functions in a single module -max-module-members = 35 -max-import-from-members = 35 -max-methods = 25 -# Max line complexity measured in AST nodes -max-line-complexity = 24 -# Max Jones Score for a module: the median of all lines complexity sum -max-jones-score = 15 -# Max amount of cognitive complexity per function -max-cognitive-score = 20 -# Max amount of cognitive complexity per module -max-cognitive-average = 25 -max-imports = 25 -max-imported-names = 50 -# Max of expression usages in a module -max-module-expressions = 15 -# Max of expression usages in a function -max-function-expressions = 15 -max-base-classes = 5 -max-decorators = 6 -# Max of repeated string constants in your modules -max-string-usages = 15 -max-try-body-length = 15 -max-asserts = 15 -# Max number of access level in an expression -max-access-level = 6 -# maximum number of public instance attributes -max-attributes = 20 - -max-line-length = 120 -max-doc-length = 120 - -# https://pypi.org/project/flake8-quotes/ -inline-quotes = "double" -multiline-quotes = "double" -docstring-quotes = "double" - -# https://wemake-python-stylegui.de/en/latest/pages/usage/formatter.html -show-source = true -# Print total number of errors -count = true -statistics = true - -exclude = [ - ".tox", - "migrations", - "dist", - "build", - "hadoop_archive_plugin", - "virtualenv", - "venv", - "venv36", - "ve", - ".venv", - "tox.ini", - "docker", - "Jenkinsfile", - "dags", - "setup.py", - "docs" -] -rst-directives = [ - "autosummary", - "data", - "currentmodule", - "deprecated", - 
"glossary", - "moduleauthor", - "plot", - "testcode", - "versionadded", - "versionchanged" -] -# https://github.com/peterjc/flake8-rst-docstrings/pull/16 -rst-roles = [ - "attr", - "class", - "func", - "meth", - "mod", - "obj", - "ref", - "term", - "py:func", - "py:mod" -] - -ignore = [ - "ANN", - "FI1", -# Found upper-case constant in a class: DB_URL - "WPS115", -# Found statement that has no effect - "WPS428", -# Found `f` string [opinionated] - "WPS305", -# Found class without a base class (goes against PEP8) [opinionated] - "WPS306", -# Found line break before binary operator [goes against PEP8] [opinionated] - "W503", -# Found multiline conditions [opinionated] - "WPS337", -# Found mutable module constant [opinionated] - "WPS407", -# Found empty module: - "WPS411", -# Found nested import [opinionated] - "WPS433", -# Found negated condition [opinionated] - "WPS504", -# Found implicit `.get()` dict usage - "WPS529", -# Docstrings [opinionated] - "D", -# string does contain unindexed parameters' - "P101", - "P103", -# Found implicit string concatenation [optional] - "WPS326", -# Found wrong function call: locals' - "WPS421", -# module level import not at top of file - "E402", -# Document or section may not begin with a transition. - "RST399", -# Error in "code" directive - "RST307", -# Found `%` string formatting - "WPS323", -# doc line too long - "W505", -# line too long - "E501", -# Found wrong module name: util - "WPS100", -# Found wrong keyword: pass - "WPS420", -# Found incorrect node inside `class` body: pass - "WPS604", -# Found wrong variable name: data - "WPS110", -# Found builtin shadowing: id - "WPS125", -# Found too short name: e < 2 - "WPS111", -# Found a line that starts with a dot - "WPS348", -# first argument of a method should be named 'self' - "N805", -# Found `finally` in `try` block without `except` - "WPS501", -# Wrong multiline string usage: textwrap.dedent + multiline comment - "WPS462", -# Found incorrect multi-line string: 3-quoted docstring with just one line - "WPS322", -# https://github.com/wemake-services/wemake-python-styleguide/issues/2847 -# E704 multiple statements on one line (def) - "E704", -# WPS220 Found too deep nesting: 34 > 20 - "WPS220", -# WPS412 Found `__init__.py` module with logic - "WPS412", -# WPS410 Found wrong metadata variable: __all__ - "WPS410", -] - -per-file-ignores = [ -# WPS102 Found incorrect module name pattern -# WPS432 Found magic number: 256 -# WPS226 Found string literal over-use: value > 15 -# WPS342 Found implicit raw string - "*migrations/*.py:WPS102,WPS432,WPS226,WPS342", - "*db/models/*.py:WPS102,WPS432,WPS342", - "*db/mixins/*.py:WPS102,WPS432", -# WPS432 Found magic number: 180 - "*settings/*.py:WPS432", -# WPS404 Found complex default value - "*server/api/*.py:WPS404", -# WPS237 Found a too complex `f` string - "*exceptions/*.py:WPS237", - "*exceptions/__init__.py:F40,WPS410", -# WPS201 Found module with too many imports: 30 > 25 -# WPS203 Found module with too many imported names: 55 > 50 - "syncmaster/worker/controller.py:WPS201,WPS203", -# TAE001 too few type annotations -# WPS231 Found function with too much cognitive complexity -# S101 Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. 
-# WPS442 Found outer scope names shadowing -# WPS432 Found magic number -# WPS334 Found reversed complex comparison -# WPS218 Found too many `assert` statements: 19 > 15 -# WPS226 Found string literal over-use: value > 15 -# WPS118 Found too long name: - "*tests/*.py:TAE001,WPS231,S101,WPS442,WPS432,WPS334,WPS218,WPS226,WPS118", -] - [tool.towncrier] name = "Syncmaster" package = "syncmaster" diff --git a/syncmaster/__init__.py b/syncmaster/__init__.py index 7c55c3ff..4e078b20 100644 --- a/syncmaster/__init__.py +++ b/syncmaster/__init__.py @@ -6,6 +6,6 @@ VERSION_FILE = Path(__file__).parent / "VERSION" _raw_version = VERSION_FILE.read_text().strip() # version always contain only release number like 0.0.1 -__version__ = ".".join(_raw_version.split(".")[:3]) # noqa: WPS410 +__version__ = ".".join(_raw_version.split(".")[:3]) # version tuple always contains only integer parts, like (0, 0, 1) __version_tuple__ = tuple(map(int, __version__.split("."))) # noqa: RUF048 diff --git a/syncmaster/db/migrations/env.py b/syncmaster/db/migrations/env.py index 23ada238..6ad72321 100644 --- a/syncmaster/db/migrations/env.py +++ b/syncmaster/db/migrations/env.py @@ -1,8 +1,8 @@ # SPDX-FileCopyrightText: 2023-present MTS PJSC # SPDX-License-Identifier: Apache-2.0 import asyncio -import os from logging.config import fileConfig +from pathlib import Path from alembic import context from alembic.script import ScriptDirectory @@ -46,13 +46,11 @@ def get_next_revision_id(): script_directory = ScriptDirectory.from_config(context.config) versions_path = script_directory.versions - existing_filenames = os.listdir(versions_path) existing_ids = [] - - for filename in existing_filenames: + for file in Path(versions_path).iterdir(): # Assuming filename format: YYYY-MM-DD_XXXX_slug.py - parts = filename.split("_") - if len(parts) >= 2: + parts = file.name.split("_") + if len(parts) > 1: id_part = parts[1] try: id_num = int(id_part) @@ -60,7 +58,7 @@ def get_next_revision_id(): except ValueError: pass - if existing_ids: + if existing_ids: # noqa: SIM108 next_id = max(existing_ids) + 1 else: next_id = 1 diff --git a/syncmaster/db/models/base.py b/syncmaster/db/models/base.py index beaad85c..6930e2ff 100644 --- a/syncmaster/db/models/base.py +++ b/syncmaster/db/models/base.py @@ -23,6 +23,6 @@ class Base(DeclarativeBase): metadata = model_metadata @declared_attr - def __tablename__(cls) -> str: + def __tablename__(cls) -> str: # noqa: N805 name_list = re.findall(r"[A-Z][a-z\d]*", cls.__name__) return "_".join(name_list).lower() diff --git a/syncmaster/db/repositories/base.py b/syncmaster/db/repositories/base.py index b4b296d1..24db9fb7 100644 --- a/syncmaster/db/repositories/base.py +++ b/syncmaster/db/repositories/base.py @@ -23,7 +23,7 @@ Model = TypeVar("Model", bound=Base) -class Repository(Generic[Model], ABC): +class Repository(ABC, Generic[Model]): def __init__(self, model: type[Model], session: AsyncSession): self._model = model self._session = session @@ -108,9 +108,8 @@ async def _paginate_scalar_result(self, query: Select, page: int, page_size: int ) def _construct_vector_search(self, query: Select, ts_query: ColumnElement) -> Select: - query = ( + return ( query.where(self._model.search_vector.op("@@")(ts_query)) .add_columns(func.ts_rank(self._model.search_vector, ts_query).label("rank")) .order_by(func.ts_rank(self._model.search_vector, ts_query).desc()) ) - return query diff --git a/syncmaster/db/repositories/connection.py b/syncmaster/db/repositories/connection.py index bf80b9ca..2b80d820 100644 --- 
a/syncmaster/db/repositories/connection.py +++ b/syncmaster/db/repositories/connection.py @@ -130,7 +130,7 @@ async def copy( except IntegrityError as integrity_error: self._raise_error(integrity_error) - def _raise_error(self, err: DBAPIError) -> NoReturn: # noqa: WPS238 + def _raise_error(self, err: DBAPIError) -> NoReturn: constraint = err.__cause__.__cause__.constraint_name if constraint == "fk__connection__group_id__group": raise GroupNotFoundError from err diff --git a/syncmaster/db/repositories/group.py b/syncmaster/db/repositories/group.py index 641b22a7..f0559c1b 100644 --- a/syncmaster/db/repositories/group.py +++ b/syncmaster/db/repositories/group.py @@ -363,7 +363,7 @@ async def delete_user( await self._session.delete(user_group) await self._session.flush() - def _raise_error(self, err: DBAPIError) -> NoReturn: # noqa: WPS238 + def _raise_error(self, err: DBAPIError) -> NoReturn: constraint = err.__cause__.__cause__.constraint_name if constraint == "fk__group__owner_id__user": diff --git a/syncmaster/db/repositories/repository_with_owner.py b/syncmaster/db/repositories/repository_with_owner.py index 3b95fe11..fa671d1b 100644 --- a/syncmaster/db/repositories/repository_with_owner.py +++ b/syncmaster/db/repositories/repository_with_owner.py @@ -13,7 +13,7 @@ class RepositoryWithOwner(Repository, Generic[Model]): - async def get_resource_permission(self, user: User, resource_id: int) -> Permission: # noqa: WPS212 + async def get_resource_permission(self, user: User, resource_id: int) -> Permission: # noqa: PLR0911 """Method for determining CRUD rights in a repository (self.model) for a resource""" is_exists = await self._session.get(self._model, resource_id) @@ -68,7 +68,7 @@ async def get_resource_permission(self, user: User, resource_id: int) -> Permiss return Permission.DELETE # Maintainer - async def get_group_permission(self, user: User, group_id: int) -> Permission: # noqa: WPS212 + async def get_group_permission(self, user: User, group_id: int) -> Permission: """Method for determining CRUD permissions in the specified group""" owner_query = ( ( @@ -101,8 +101,7 @@ async def get_group_permission(self, user: User, group_id: int) -> Permission: # If the user is not in the group, then he is either a superuser or does not have any rights if not user.is_superuser: return Permission.NONE - else: - return Permission.DELETE + return Permission.DELETE group_role = user_group.role diff --git a/syncmaster/db/repositories/run.py b/syncmaster/db/repositories/run.py index 41a52f52..da76e416 100644 --- a/syncmaster/db/repositories/run.py +++ b/syncmaster/db/repositories/run.py @@ -98,7 +98,7 @@ async def read_full_serialized_transfer( ) transfer = transfer.one() - return dict( + return dict( # noqa: C408 id=transfer.id, name=transfer.name, group_id=transfer.group_id, @@ -111,7 +111,7 @@ async def read_full_serialized_transfer( source_params=transfer.source_params, target_params=transfer.target_params, strategy_params=transfer.strategy_params, - source_connection=dict( + source_connection=dict( # noqa: C408 id=transfer.source_connection.id, group_id=transfer.source_connection.group_id, name=transfer.source_connection.name, @@ -119,7 +119,7 @@ async def read_full_serialized_transfer( data=transfer.source_connection.data, auth_data=source_creds["auth_data"], ), - target_connection=dict( + target_connection=dict( # noqa: C408 id=transfer.target_connection.id, group_id=transfer.target_connection.group_id, name=transfer.target_connection.name, diff --git a/syncmaster/db/repositories/search.py 
b/syncmaster/db/repositories/search.py index 212c75a9..cd09d3f9 100644 --- a/syncmaster/db/repositories/search.py +++ b/syncmaster/db/repositories/search.py @@ -44,7 +44,7 @@ def make_tsquery(user_input: str) -> ColumnElement: simple_query = func.to_tsquery("simple", build_tsquery(user_input)) stemmed_query = func.plainto_tsquery("russian", user_input) combined_query = simple_query.op("||")(stemmed_query) - return combined_query + return combined_query # noqa: RET504 def ts_match(search_vector: InstrumentedAttribute, ts_query: ColumnElement) -> ColumnElement: diff --git a/syncmaster/db/repositories/transfer.py b/syncmaster/db/repositories/transfer.py index 430e9582..191bb4da 100644 --- a/syncmaster/db/repositories/transfer.py +++ b/syncmaster/db/repositories/transfer.py @@ -39,7 +39,7 @@ async def paginate( queue_id: int | None = None, source_connection_type: list[str] | None = None, target_connection_type: list[str] | None = None, - is_scheduled: bool | None = None, + is_scheduled: bool | None = None, # noqa: FBT001 ) -> Pagination: stmt = select(Transfer).where( Transfer.group_id == group_id, @@ -61,8 +61,8 @@ async def paginate( if is_scheduled is not None: stmt = stmt.where(Transfer.is_scheduled == is_scheduled) - SourceConnection = aliased(Connection) - TargetConnection = aliased(Connection) + SourceConnection = aliased(Connection) # noqa: N806 + TargetConnection = aliased(Connection) # noqa: N806 if source_connection_type is not None: stmt = stmt.join( @@ -118,7 +118,7 @@ async def create( transformations: list[dict[str, Any]], resources: dict[str, Any], queue_id: int, - is_scheduled: bool, + is_scheduled: bool, # noqa: FBT001 schedule: str | None, ) -> Transfer: query = ( @@ -160,7 +160,7 @@ async def update( strategy_params: dict[str, Any], transformations: list[dict[str, Any]], resources: dict[str, Any], - is_scheduled: bool, + is_scheduled: bool, # noqa: FBT001 schedule: str | None, queue_id: int, ) -> Transfer: @@ -202,16 +202,15 @@ async def copy( new_name: str | None, ) -> Transfer: try: - kwargs = dict( + kwargs = dict( # noqa: C408 group_id=new_group_id, source_connection_id=new_source_connection, target_connection_id=new_target_connection, queue_id=new_queue_id, name=new_name, ) - new_transfer = await self._copy(Transfer.id == transfer_id, **kwargs) - return new_transfer + return await self._copy(Transfer.id == transfer_id, **kwargs) except IntegrityError as integrity_error: self._raise_error(integrity_error) @@ -225,7 +224,7 @@ async def list_by_connection_id(self, conn_id: int) -> Sequence[Transfer]: result = await self._session.scalars(query) return result.fetchall() - def _raise_error(self, err: DBAPIError) -> NoReturn: # noqa: WPS238 + def _raise_error(self, err: DBAPIError) -> NoReturn: constraint = err.__cause__.__cause__.constraint_name if constraint == "fk__transfer__group_id__group": raise GroupNotFoundError from err diff --git a/syncmaster/db/repositories/user.py b/syncmaster/db/repositories/user.py index a46c4e23..93fb9f9d 100644 --- a/syncmaster/db/repositories/user.py +++ b/syncmaster/db/repositories/user.py @@ -21,7 +21,7 @@ async def paginate( self, page: int, page_size: int, - is_superuser: bool, + is_superuser: bool, # noqa: FBT001 search_query: str | None = None, ) -> Pagination: stmt = select(User) @@ -62,7 +62,7 @@ async def create( first_name: str | None = None, middle_name: str | None = None, last_name: str | None = None, - is_superuser: bool = False, + is_superuser: bool = False, # noqa: FBT001, FBT002 ) -> User: query = ( insert(User) diff --git 
a/syncmaster/dto/transfers.py b/syncmaster/dto/transfers.py index 69e51fab..a6cb0830 100644 --- a/syncmaster/dto/transfers.py +++ b/syncmaster/dto/transfers.py @@ -63,7 +63,7 @@ def __post_init__(self): self.options.setdefault("if_exists", "replace_overlapping_partitions") @staticmethod - def _rewrite_option_name(file_format: dict, from_name: str, to_name: str): # noqa: WPS602 + def _rewrite_option_name(file_format: dict, from_name: str, to_name: str): if from_name in file_format: file_format[to_name] = file_format.pop(from_name) @@ -132,7 +132,7 @@ def __post_init__(self): @dataclass class IcebergTransferDTO(DBTransferDTO): type: ClassVar[str] = "iceberg" - catalog_name: str = field(default_factory=lambda: f"iceberg_{uuid4().hex[:8]}") # noqa: WPS237 + catalog_name: str = field(default_factory=lambda: f"iceberg_{uuid4().hex[:8]}") def __post_init__(self): super().__post_init__() diff --git a/syncmaster/exceptions/redirect.py b/syncmaster/exceptions/redirect.py index 4d94d0b1..aa0e6034 100644 --- a/syncmaster/exceptions/redirect.py +++ b/syncmaster/exceptions/redirect.py @@ -4,6 +4,6 @@ from syncmaster.exceptions.base import SyncmasterError -class RedirectException(SyncmasterError): +class RedirectError(SyncmasterError): def __init__(self, redirect_url: str): self.redirect_url = redirect_url diff --git a/syncmaster/scheduler/__init__.py b/syncmaster/scheduler/__init__.py index c973c23e..0e9d24bf 100644 --- a/syncmaster/scheduler/__init__.py +++ b/syncmaster/scheduler/__init__.py @@ -6,9 +6,8 @@ def celery_factory(settings: SchedulerAppSettings) -> Celery: - app = Celery( + return Celery( __name__, broker=settings.broker.url, - backend="db+" + settings.database.sync_url, # noqa: WPS336 + backend="db+" + settings.database.sync_url, ) - return app diff --git a/syncmaster/scheduler/__main__.py b/syncmaster/scheduler/__main__.py index 8dc57670..687119d4 100755 --- a/syncmaster/scheduler/__main__.py +++ b/syncmaster/scheduler/__main__.py @@ -19,7 +19,7 @@ async def main(): transfer_job_manager = TransferJobManager(settings) transfer_job_manager.scheduler.start() - while True: # noqa: WPS457 + while True: logger.info("Looking at the transfer table...") await transfer_job_manager.remove_orphan_jobs() diff --git a/syncmaster/scheduler/transfer_job_manager.py b/syncmaster/scheduler/transfer_job_manager.py index a212da8a..66472ca0 100644 --- a/syncmaster/scheduler/transfer_job_manager.py +++ b/syncmaster/scheduler/transfer_job_manager.py @@ -68,7 +68,7 @@ async def remove_orphan_jobs(self) -> None: self.scheduler.remove_job(str(job_id)) @staticmethod - async def send_job_to_celery(transfer_id: int) -> None: # noqa: WPS602, WPS217 + async def send_job_to_celery(transfer_id: int) -> None: """ 1. Do not pass additional arguments like settings, otherwise they will be serialized in jobs table. 
diff --git a/syncmaster/schemas/v1/connections/clickhouse.py b/syncmaster/schemas/v1/connections/clickhouse.py index 3cdc5dcf..476f1915 100644 --- a/syncmaster/schemas/v1/connections/clickhouse.py +++ b/syncmaster/schemas/v1/connections/clickhouse.py @@ -16,7 +16,7 @@ class ClickhouseConnectionDataSchema(BaseModel): host: str - port: int = Field(default=8123, gt=0, le=65535) # noqa: WPS432 + port: int = Field(default=8123, gt=0, le=65535) database_name: str | None = None additional_params: dict = Field(default_factory=dict) diff --git a/syncmaster/schemas/v1/connections/ftp.py b/syncmaster/schemas/v1/connections/ftp.py index a3ef2f65..ef9c758d 100644 --- a/syncmaster/schemas/v1/connections/ftp.py +++ b/syncmaster/schemas/v1/connections/ftp.py @@ -17,7 +17,7 @@ class FTPConnectionDataSchema(BaseModel): host: str - port: int = Field(default=21, gt=0, le=65535) # noqa: WPS432 + port: int = Field(default=21, gt=0, le=65535) class CreateFTPConnectionSchema(CreateConnectionBaseSchema): diff --git a/syncmaster/schemas/v1/connections/ftps.py b/syncmaster/schemas/v1/connections/ftps.py index 3a89676a..c243490b 100644 --- a/syncmaster/schemas/v1/connections/ftps.py +++ b/syncmaster/schemas/v1/connections/ftps.py @@ -17,7 +17,7 @@ class FTPSConnectionDataSchema(BaseModel): host: str - port: int = Field(default=21, gt=0, le=65535) # noqa: WPS432 + port: int = Field(default=21, gt=0, le=65535) class CreateFTPSConnectionSchema(CreateConnectionBaseSchema): diff --git a/syncmaster/schemas/v1/connections/iceberg.py b/syncmaster/schemas/v1/connections/iceberg.py index 7abae946..8918995a 100644 --- a/syncmaster/schemas/v1/connections/iceberg.py +++ b/syncmaster/schemas/v1/connections/iceberg.py @@ -28,7 +28,7 @@ class IcebergRESTCatalogS3DirectConnectionDataSchema(BaseModel): rest_catalog_url: URL s3_warehouse_path: str s3_host: str - s3_port: int | None = Field(default=None, gt=0, le=65535) # noqa: WPS432 + s3_port: int | None = Field(default=None, gt=0, le=65535) s3_protocol: Literal["http", "https"] = "https" s3_bucket: str s3_region: str @@ -81,9 +81,11 @@ def connection_and_auth_data_match(self): if not self.auth_data: return self if self.data.type == "iceberg_rest_s3_direct" and "s3" not in self.auth_data.type: - raise ValueError("Cannot create direct S3 connection without S3 credentials") + msg = "Cannot create direct S3 connection without S3 credentials" + raise ValueError(msg) if self.data.type == "iceberg_rest_s3_delegated" and "s3" in self.auth_data.type: - raise ValueError("Cannot create delegated S3 connection with S3 credentials") + msg = "Cannot create delegated S3 connection with S3 credentials" + raise ValueError(msg) return self diff --git a/syncmaster/schemas/v1/connections/mssql.py b/syncmaster/schemas/v1/connections/mssql.py index 47946875..0f82da54 100644 --- a/syncmaster/schemas/v1/connections/mssql.py +++ b/syncmaster/schemas/v1/connections/mssql.py @@ -16,7 +16,7 @@ class MSSQLConnectionDataSchema(BaseModel): host: str - port: int = Field(default=1433, gt=0, le=65535) # noqa: WPS432 + port: int = Field(default=1433, gt=0, le=65535) database_name: str additional_params: dict = Field(default_factory=dict) diff --git a/syncmaster/schemas/v1/connections/mysql.py b/syncmaster/schemas/v1/connections/mysql.py index 8278ef53..ceab07a8 100644 --- a/syncmaster/schemas/v1/connections/mysql.py +++ b/syncmaster/schemas/v1/connections/mysql.py @@ -16,7 +16,7 @@ class MySQLConnectionDataSchema(BaseModel): host: str - port: int = Field(default=3306, gt=0, le=65535) # noqa: WPS432 + port: int = 
Field(default=3306, gt=0, le=65535) database_name: str | None = None additional_params: dict = Field(default_factory=dict) diff --git a/syncmaster/schemas/v1/connections/oracle.py b/syncmaster/schemas/v1/connections/oracle.py index 34393785..e275de77 100644 --- a/syncmaster/schemas/v1/connections/oracle.py +++ b/syncmaster/schemas/v1/connections/oracle.py @@ -17,7 +17,7 @@ class OracleConnectionDataSchema(BaseModel): host: str - port: int = Field(default=1521, gt=0, le=65535) # noqa: WPS432 + port: int = Field(default=1521, gt=0, le=65535) service_name: str | None = None sid: str | None = None additional_params: dict = Field(default_factory=dict) diff --git a/syncmaster/schemas/v1/connections/postgres.py b/syncmaster/schemas/v1/connections/postgres.py index 52375ced..cb18b392 100644 --- a/syncmaster/schemas/v1/connections/postgres.py +++ b/syncmaster/schemas/v1/connections/postgres.py @@ -17,7 +17,7 @@ class PostgresConnectionDataSchema(BaseModel): host: str - port: int = Field(default=5432, gt=0, le=65535) # noqa: WPS432 + port: int = Field(default=5432, gt=0, le=65535) database_name: str additional_params: dict = Field(default_factory=dict) diff --git a/syncmaster/schemas/v1/connections/s3.py b/syncmaster/schemas/v1/connections/s3.py index 4b7add44..ab7caec1 100644 --- a/syncmaster/schemas/v1/connections/s3.py +++ b/syncmaster/schemas/v1/connections/s3.py @@ -21,7 +21,7 @@ class ReadS3ConnectionDataSchema(BaseModel): bucket: str protocol: Literal["http", "https"] = "https" region: str - port: int | None = Field(default=None, gt=0, le=65535, validate_default=True) # noqa: WPS432 + port: int | None = Field(default=None, gt=0, le=65535, validate_default=True) bucket_style: Literal["domain", "path"] = "path" additional_params: dict = Field(default_factory=dict) @@ -32,7 +32,7 @@ class CreateS3ConnectionDataSchema(ReadS3ConnectionDataSchema): def validate_port(cls, port: int | None, info: ValidationInfo) -> int: protocol = info.data.get("protocol") if port is None: - return 443 if protocol == "https" else 80 # noqa: WPS432 + return 443 if protocol == "https" else 80 return port diff --git a/syncmaster/schemas/v1/connections/samba.py b/syncmaster/schemas/v1/connections/samba.py index bb20a2ee..96d86d04 100644 --- a/syncmaster/schemas/v1/connections/samba.py +++ b/syncmaster/schemas/v1/connections/samba.py @@ -19,7 +19,7 @@ class SambaConnectionDataSchema(BaseModel): host: str share: str - port: int | None = Field(default=None, gt=0, le=65535) # noqa: WPS432 + port: int | None = Field(default=None, gt=0, le=65535) protocol: Literal["SMB", "NetBIOS"] = "SMB" domain: str = "" diff --git a/syncmaster/schemas/v1/connections/sftp.py b/syncmaster/schemas/v1/connections/sftp.py index e4457568..b8c7d678 100644 --- a/syncmaster/schemas/v1/connections/sftp.py +++ b/syncmaster/schemas/v1/connections/sftp.py @@ -17,7 +17,7 @@ class SFTPConnectionDataSchema(BaseModel): host: str - port: int = Field(default=22, gt=0, le=65535) # noqa: WPS432 + port: int = Field(default=22, gt=0, le=65535) class CreateSFTPConnectionSchema(CreateConnectionBaseSchema): diff --git a/syncmaster/schemas/v1/connections/webdav.py b/syncmaster/schemas/v1/connections/webdav.py index 47a24427..847d0b0f 100644 --- a/syncmaster/schemas/v1/connections/webdav.py +++ b/syncmaster/schemas/v1/connections/webdav.py @@ -19,7 +19,7 @@ class WebDAVConnectionDataSchema(BaseModel): host: str - port: int | None = Field(default=None, gt=0, le=65535) # noqa: WPS432 + port: int | None = Field(default=None, gt=0, le=65535) protocol: Literal["http", 
"https"] = "https" diff --git a/syncmaster/schemas/v1/groups.py b/syncmaster/schemas/v1/groups.py index 8e1cf916..bc40bbb3 100644 --- a/syncmaster/schemas/v1/groups.py +++ b/syncmaster/schemas/v1/groups.py @@ -24,7 +24,7 @@ class AddUserSchema(BaseModel): @model_validator(mode="before") @classmethod def validate_role(cls, values): - if isinstance(values, dict): + if isinstance(values, dict): # noqa: SIM108 role = values.get("role") else: # access 'role' directly if 'values' is an object diff --git a/syncmaster/schemas/v1/queue.py b/syncmaster/schemas/v1/queue.py index 416243e0..e91f52bf 100644 --- a/syncmaster/schemas/v1/queue.py +++ b/syncmaster/schemas/v1/queue.py @@ -12,7 +12,7 @@ QueueName = Annotated[ str, - StringConstraints(min_length=3, max_length=128, pattern=ALLOWED_PATTERN), # noqa: WPS432 + StringConstraints(min_length=3, max_length=128, pattern=ALLOWED_PATTERN), ] diff --git a/syncmaster/schemas/v1/transfers/resources.py b/syncmaster/schemas/v1/transfers/resources.py index b1e85fb2..03ff61b0 100644 --- a/syncmaster/schemas/v1/transfers/resources.py +++ b/syncmaster/schemas/v1/transfers/resources.py @@ -2,16 +2,16 @@ # SPDX-License-Identifier: Apache-2.0 from pydantic import BaseModel, ByteSize, Field -ONE_MB = 2**20 # noqa: WPS432 -ONE_GB = 2**30 # noqa: WPS432 +ONE_MB = 2**20 +ONE_GB = 2**30 class Resources(BaseModel): max_parallel_tasks: int = Field(default=1, ge=1, le=100, description="Parallel executors") - cpu_cores_per_task: int = Field(default=1, ge=1, le=32, description="Cores per executor") # noqa: WPS432 + cpu_cores_per_task: int = Field(default=1, ge=1, le=32, description="Cores per executor") ram_bytes_per_task: ByteSize = Field( # type: ignore[arg-type] default=ONE_GB, - ge=512 * ONE_MB, # noqa: WPS432 - le=64 * ONE_GB, # noqa: WPS432 + ge=512 * ONE_MB, + le=64 * ONE_GB, description="RAM per executor", ) diff --git a/syncmaster/schemas/v1/types.py b/syncmaster/schemas/v1/types.py index 08d7a13f..1ae3a8a5 100644 --- a/syncmaster/schemas/v1/types.py +++ b/syncmaster/schemas/v1/types.py @@ -4,5 +4,5 @@ from pydantic import AnyUrl, StringConstraints, UrlConstraints -NameConstr = Annotated[str, StringConstraints(min_length=3, max_length=128)] # noqa: WPS432 +NameConstr = Annotated[str, StringConstraints(min_length=3, max_length=128)] URL = Annotated[AnyUrl, UrlConstraints(allowed_schemes=["http", "https"], preserve_empty_path=True)] diff --git a/syncmaster/server/handler.py b/syncmaster/server/handler.py index 06ecc97e..276968f6 100644 --- a/syncmaster/server/handler.py +++ b/syncmaster/server/handler.py @@ -33,7 +33,7 @@ QueueDeleteError, QueueNotFoundError, ) -from syncmaster.exceptions.redirect import RedirectException +from syncmaster.exceptions.redirect import RedirectError from syncmaster.exceptions.run import ( CannotConnectToTaskQueueError, CannotStopRunError, @@ -106,7 +106,11 @@ async def validation_exception_handler(request: Request, exc: RequestValidationE ) -async def syncmsater_exception_handler(request: Request, exc: SyncmasterError): # noqa: WPS231, WPS212 +# TODO: refactor exception handling +async def syncmsater_exception_handler( # noqa: C901, PLR0911, PLR0912, PLR0915 + request: Request, + exc: SyncmasterError, +): response = get_response_for_exception(SyncmasterError) if not response: return unknown_exception_handler(request, exc) @@ -122,7 +126,7 @@ async def syncmsater_exception_handler(request: Request, exc: SyncmasterError): content=content, ) - if isinstance(exc, RedirectException): + if isinstance(exc, RedirectError): content.code = 
"unauthorized" content.message = "Please authorize using provided URL" content.details = exc.redirect_url diff --git a/syncmaster/server/middlewares/openapi.py b/syncmaster/server/middlewares/openapi.py index b9157a4e..22345f61 100644 --- a/syncmaster/server/middlewares/openapi.py +++ b/syncmaster/server/middlewares/openapi.py @@ -50,7 +50,7 @@ def custom_openapi_schema(app: FastAPI, settings: OpenAPISettings) -> dict: openapi_schema["info"]["x-logo"] = { "url": str(settings.logo.url), "altText": str(settings.logo.alt_text), - "backgroundColor": f"#{settings.logo.background_color}", # noqa: WPS237 + "backgroundColor": f"#{settings.logo.background_color}", "href": str(settings.logo.href), } app.openapi_schema = openapi_schema diff --git a/syncmaster/server/providers/auth/base_provider.py b/syncmaster/server/providers/auth/base_provider.py index 1b25f50d..6335230e 100644 --- a/syncmaster/server/providers/auth/base_provider.py +++ b/syncmaster/server/providers/auth/base_provider.py @@ -69,7 +69,7 @@ async def get_current_user(self, access_token: str | None, request: Request) -> ... @abstractmethod - async def get_token_password_grant( + async def get_token_password_grant( # noqa: PLR0913 self, grant_type: str | None = None, login: str | None = None, diff --git a/syncmaster/server/providers/auth/dummy_provider.py b/syncmaster/server/providers/auth/dummy_provider.py index 2caaaed1..29e30b0c 100644 --- a/syncmaster/server/providers/auth/dummy_provider.py +++ b/syncmaster/server/providers/auth/dummy_provider.py @@ -20,7 +20,7 @@ log = logging.getLogger(__name__) -class DummyAuthProvider(AuthProvider): # noqa: WPS338 +class DummyAuthProvider(AuthProvider): def __init__( self, settings: Annotated[DummyAuthProviderSettings, Depends(Stub(DummyAuthProviderSettings))], @@ -45,7 +45,7 @@ async def get_current_user(self, access_token: str | None, *args, **kwargs) -> U user_id = self._get_user_id_from_token(access_token) return await self._uow.user.read_by_id(user_id) - async def get_token_password_grant( + async def get_token_password_grant( # noqa: PLR0913 self, grant_type: str | None = None, login: str | None = None, diff --git a/syncmaster/server/providers/auth/keycloak_provider.py b/syncmaster/server/providers/auth/keycloak_provider.py index 93807f2f..5aac103c 100644 --- a/syncmaster/server/providers/auth/keycloak_provider.py +++ b/syncmaster/server/providers/auth/keycloak_provider.py @@ -11,7 +11,7 @@ from syncmaster.db.models.user import User from syncmaster.exceptions import EntityNotFoundError from syncmaster.exceptions.auth import AuthorizationError, LogoutError -from syncmaster.exceptions.redirect import RedirectException +from syncmaster.exceptions.redirect import RedirectError from syncmaster.server.dependencies import Stub from syncmaster.server.providers.auth.base_provider import AuthProvider from syncmaster.server.services.unit_of_work import UnitOfWork @@ -29,7 +29,7 @@ def __init__( self.settings = settings self._uow = unit_of_work self.keycloak_openid = KeycloakOpenID( - server_url=str(self.settings.keycloak.api_url).rstrip("/") + "/", # noqa: WPS336 + server_url=str(self.settings.keycloak.api_url).rstrip("/") + "/", client_id=self.settings.keycloak.client_id, realm_name=self.settings.keycloak.realm_name, client_secret_key=self.settings.keycloak.client_secret.get_secret_value(), @@ -55,7 +55,7 @@ def setup(cls, app: FastAPI) -> FastAPI: ) return app - async def get_token_password_grant( + async def get_token_password_grant( # noqa: PLR0913 self, grant_type: str | None = None, login: str 
| None = None, @@ -84,7 +84,7 @@ async def get_token_authorization_code_grant( msg = "Failed to get token" raise AuthorizationError(msg) from e - async def get_current_user(self, access_token: str | None, request: Request) -> User: # noqa: WPS231, WPS217 + async def get_current_user(self, access_token: str | None, request: Request) -> User: if not access_token: log.debug("No access token found in session") await self.redirect_to_auth() @@ -152,7 +152,7 @@ async def redirect_to_auth(self) -> NoReturn: redirect_uri=self.settings.keycloak.ui_callback_url, scope=self.settings.keycloak.scope, ) - raise RedirectException(redirect_url=auth_url) + raise RedirectError(redirect_url=auth_url) async def logout(self, user: User, refresh_token: str | None) -> None: if not refresh_token: diff --git a/syncmaster/server/providers/auth/oauth2_gateway_provider.py b/syncmaster/server/providers/auth/oauth2_gateway_provider.py index 5a33c4ee..7a396b17 100644 --- a/syncmaster/server/providers/auth/oauth2_gateway_provider.py +++ b/syncmaster/server/providers/auth/oauth2_gateway_provider.py @@ -18,7 +18,7 @@ class OAuth2GatewayProvider(AuthProvider): - def __init__( # noqa: WPS612 + def __init__( self, settings: Annotated[OAuth2GatewayProviderSettings, Depends(Stub(OAuth2GatewayProviderSettings))], unit_of_work: Annotated[UnitOfWork, Depends()], @@ -26,7 +26,7 @@ def __init__( # noqa: WPS612 self.settings = settings self._uow = unit_of_work self.keycloak_openid = KeycloakOpenID( - server_url=str(self.settings.keycloak.api_url).rstrip("/") + "/", # noqa: WPS336 + server_url=str(self.settings.keycloak.api_url).rstrip("/") + "/", client_id=self.settings.keycloak.client_id, realm_name=self.settings.keycloak.realm_name, client_secret_key=self.settings.keycloak.client_secret.get_secret_value(), @@ -43,30 +43,34 @@ def setup(cls, app: FastAPI) -> FastAPI: app.dependency_overrides[OAuth2GatewayProviderSettings] = lambda: settings return app - async def get_current_user( # noqa: WPS231, WPS217, WPS238 + async def get_current_user( self, access_token: str | None, request: Request, ) -> User: if not access_token: log.debug("No access token found in request") - raise AuthorizationError("Missing auth credentials") + msg = "Missing auth credentials" + raise AuthorizationError(msg) try: token_info = await self.keycloak_openid.a_introspect(access_token) except KeycloakOperationError as e: log.info("Failed to introspect token: %s", e) - raise AuthorizationError("Invalid token payload") + msg = "Invalid token payload" + raise AuthorizationError(msg) from e if token_info["active"] is False: - raise AuthorizationError("Token is not active") + msg = "Token is not active" + raise AuthorizationError(msg) # these names are hardcoded in keycloak: # https://github.com/keycloak/keycloak/blob/3ca3a4ad349b4d457f6829eaf2ae05f1e01408be/core/src/main/java/org/keycloak/representations/IDToken.java # TODO: make sure which fields are guaranteed login = token_info.get("preferred_username") if not login: - raise AuthorizationError("Invalid token") + msg = "Invalid token" + raise AuthorizationError(msg) email = token_info.get("email") first_name = token_info.get("given_name") @@ -86,7 +90,7 @@ async def get_current_user( # noqa: WPS231, WPS217, WPS238 ) return user - async def get_token_password_grant( + async def get_token_password_grant( # noqa: PLR0913 self, grant_type: str | None = None, login: str | None = None, @@ -95,8 +99,9 @@ async def get_token_password_grant( client_id: str | None = None, client_secret: str | None = None, ) -> dict[str, 
Any]: + msg = f"Password grant is not supported by {self.__class__.__name__}." raise NotImplementedError( - f"Password grant is not supported by {self.__class__.__name__}.", # noqa: WPS237 + msg, ) async def get_token_authorization_code_grant( @@ -106,11 +111,13 @@ async def get_token_authorization_code_grant( client_id: str | None = None, client_secret: str | None = None, ) -> dict[str, Any]: + msg = f"Authorization code grant is not supported by {self.__class__.__name__}." raise NotImplementedError( - f"Authorization code grant is not supported by {self.__class__.__name__}.", # noqa: WPS237 + msg, ) async def logout(self, user: User, refresh_token: str | None) -> None: + msg = f"Logout is not supported by {self.__class__.__name__}." raise NotImplementedError( - f"Logout is not supported by {self.__class__.__name__}.", # noqa: WPS237 + msg, ) diff --git a/syncmaster/server/services/get_user.py b/syncmaster/server/services/get_user.py index 839386e2..67b23027 100644 --- a/syncmaster/server/services/get_user.py +++ b/syncmaster/server/services/get_user.py @@ -26,9 +26,12 @@ ) -def get_user( # noqa: WPS231 - is_superuser: bool = False, -) -> Callable[[Request, AuthProvider, str | None, HTTPAuthorizationCredentials | None], Coroutine[Any, Any, User]]: +def get_user( + is_superuser: bool = False, # noqa: FBT001, FBT002 +) -> Callable[ + [Request, AuthProvider, str | None, HTTPAuthorizationCredentials | None], + Coroutine[Any, Any, User], +]: async def wrapper( request: Request, auth_provider: Annotated[AuthProvider, Depends(Stub(AuthProvider))], @@ -51,11 +54,14 @@ async def wrapper( request=request, ) if user is None: - raise EntityNotFoundError("User not found") + msg = "User not found" + raise EntityNotFoundError(msg) if not user.is_active: - raise ActionNotAllowedError("Inactive user") + msg = "Inactive user" + raise ActionNotAllowedError(msg) if is_superuser and not user.is_superuser: - raise ActionNotAllowedError("You have no power here") + msg = "You have no power here" + raise ActionNotAllowedError(msg) return user return wrapper diff --git a/syncmaster/settings/logging.py b/syncmaster/settings/logging.py index 810572b4..5ef5edd7 100644 --- a/syncmaster/settings/logging.py +++ b/syncmaster/settings/logging.py @@ -1,7 +1,7 @@ # SPDX-FileCopyrightText: 2023-present MTS PJSC # SPDX-License-Identifier: Apache-2.0 import logging -import logging.config # noqa: WPS301, WPS458 +import logging.config from pydantic import AliasChoices, BaseModel, ConfigDict, Field from pydantic_settings_logging import ( @@ -9,12 +9,10 @@ FormatterConfig, HandlerConfig, LoggerConfig, -) -from pydantic_settings_logging import LoggingSettings as BaseLoggingSettings -from pydantic_settings_logging import ( RootLoggerConfig, StreamHandlerConfig, ) +from pydantic_settings_logging import LoggingSettings as BaseLoggingSettings # https://github.com/vduseev/pydantic-settings-logging/pull/1 diff --git a/syncmaster/worker/controller.py b/syncmaster/worker/controller.py index c79d0d40..69ab3284 100644 --- a/syncmaster/worker/controller.py +++ b/syncmaster/worker/controller.py @@ -176,7 +176,7 @@ class TransferController: source_handler: Handler target_handler: Handler - def __init__( + def __init__( # noqa: PLR0913 self, settings: WorkerAppSettings, run: Run, @@ -241,7 +241,7 @@ def perform_transfer(self) -> None: finally: self.temp_dir.cleanup() - def get_handler( + def get_handler( # noqa: PLR0913 self, connection_data: dict[str, Any], connection_auth_data: dict, @@ -312,7 +312,7 @@ def _get_transfer_hwm_name(self) -> str: 
hwm_name_suffix, ], ) - return hwm_name + return hwm_name # noqa: RET504 def _reset_transfer_hwm(self) -> None: with self._get_hwm_store() as hwm_store: diff --git a/syncmaster/worker/handlers/file/base.py b/syncmaster/worker/handlers/file/base.py index cdf3450e..2b5029b7 100644 --- a/syncmaster/worker/handlers/file/base.py +++ b/syncmaster/worker/handlers/file/base.py @@ -3,7 +3,7 @@ from __future__ import annotations -import os +from pathlib import Path from typing import TYPE_CHECKING, ClassVar from syncmaster.worker.handlers.base import Handler @@ -55,13 +55,13 @@ class FileHandler(Handler): "orc": "orc", } - def _rename_files(self, tmp_path: str) -> None: + def _rename_files(self, tmp_path: Path) -> None: files = self.file_connection.list_dir(tmp_path) for index, old_path in enumerate(files): extension = self._get_file_extension() new_name = self._get_file_name(index, extension) - new_path = os.path.join(tmp_path, new_name) + new_path = Path(tmp_path, new_name) self.file_connection.rename_file(old_path, new_path) def _get_file_name(self, index: int, extension: str) -> str: diff --git a/syncmaster/worker/handlers/file/ftp.py b/syncmaster/worker/handlers/file/ftp.py index 6ebfe938..9e279c6a 100644 --- a/syncmaster/worker/handlers/file/ftp.py +++ b/syncmaster/worker/handlers/file/ftp.py @@ -20,7 +20,7 @@ class FTPHandler(LocalDFFileHandler): connection_dto: FTPConnectionDTO def connect(self, spark: SparkSession) -> None: - from onetl.connection import FTP, SparkLocalFS + from onetl.connection import FTP, SparkLocalFS # noqa: PLC0415 self.file_connection = FTP( host=self.connection_dto.host, diff --git a/syncmaster/worker/handlers/file/ftps.py b/syncmaster/worker/handlers/file/ftps.py index 55e1c896..9e55a3f6 100644 --- a/syncmaster/worker/handlers/file/ftps.py +++ b/syncmaster/worker/handlers/file/ftps.py @@ -20,7 +20,7 @@ class FTPSHandler(LocalDFFileHandler): connection_dto: FTPSConnectionDTO def connect(self, spark: SparkSession) -> None: - from onetl.connection import FTPS, SparkLocalFS + from onetl.connection import FTPS, SparkLocalFS # noqa: PLC0415 self.file_connection = FTPS( host=self.connection_dto.host, diff --git a/syncmaster/worker/handlers/file/hdfs.py b/syncmaster/worker/handlers/file/hdfs.py index 3a4e3c26..95a7d3ca 100644 --- a/syncmaster/worker/handlers/file/hdfs.py +++ b/syncmaster/worker/handlers/file/hdfs.py @@ -20,7 +20,7 @@ class HDFSHandler(RemoteDFFileHandler): connection_dto: HDFSConnectionDTO def connect(self, spark: SparkSession): - from onetl.connection import HDFS, SparkHDFS + from onetl.connection import HDFS, SparkHDFS # noqa: PLC0415 self.df_connection = SparkHDFS( cluster=self.connection_dto.cluster, diff --git a/syncmaster/worker/handlers/file/local_df.py b/syncmaster/worker/handlers/file/local_df.py index 8d213119..83636e53 100644 --- a/syncmaster/worker/handlers/file/local_df.py +++ b/syncmaster/worker/handlers/file/local_df.py @@ -3,7 +3,7 @@ from __future__ import annotations -import os +from pathlib import Path from typing import TYPE_CHECKING from etl_entities.hwm import FileListHWM, FileModifiedTimeHWM @@ -18,11 +18,11 @@ class LocalDFFileHandler(FileHandler): def read(self) -> DataFrame: - from pyspark.sql.types import StructType + from pyspark.sql.types import StructType # noqa: PLC0415 downloader_params = {} if self.transfer_dto.strategy.type == "incremental": - hwm_name = f"{self.transfer_dto.id}_{self.connection_dto.type}_{self.transfer_dto.directory_path}" # noqa: WPS237 + hwm_name = 
f"{self.transfer_dto.id}_{self.connection_dto.type}_{self.transfer_dto.directory_path}" if self.transfer_dto.strategy.increment_by == "file_modified_since": downloader_params["hwm"] = FileModifiedTimeHWM(name=hwm_name) elif self.transfer_dto.strategy.increment_by == "file_name": @@ -42,7 +42,7 @@ def read(self) -> DataFrame: connection=self.local_df_connection, format=self.transfer_dto.file_format, source_path=self.temp_dir.name, - df_schema=StructType.fromJson(self.transfer_dto.df_schema) if self.transfer_dto.df_schema else None, + df_schema=(StructType.fromJson(self.transfer_dto.df_schema) if self.transfer_dto.df_schema else None), ) df = reader.run() @@ -66,9 +66,9 @@ def write(self, df: DataFrame) -> None: writer.run(df=df) # working with spark generated .crc files may lead to exceptions - crc_files = [f for f in os.listdir(self.temp_dir.name) if f.endswith(".crc")] + crc_files = [f for f in Path(self.temp_dir.name).iterdir() if f.name.endswith(".crc")] for file in crc_files: - os.remove(os.path.join(self.temp_dir.name, file)) + file.unlink() self._rename_files() @@ -80,20 +80,19 @@ def write(self, df: DataFrame) -> None: uploader.run() def _rename_files(self): - files = os.listdir(self.temp_dir.name) - - for index, file_name in enumerate(files): + tmp_dir = Path(self.temp_dir.name) + for index, file_name in enumerate(tmp_dir.iterdir()): extension = self._get_file_extension() - new_name = self._get_file_name(str(index), extension) - old_path = os.path.join(self.temp_dir.name, file_name) - new_path = os.path.join(self.temp_dir.name, new_name) - os.rename(old_path, new_path) + new_name = self._get_file_name(index, extension) + old_path = Path(self.temp_dir.name, file_name) + new_path = Path(self.temp_dir.name, new_name) + old_path.rename(new_path) def _make_file_metadata_filters(self, filters: list[dict]) -> list[Glob | Regexp | FileSizeRange]: processed_filters = [] - for filter in filters: - filter_type = filter["type"] - value = filter["value"] + for filter_ in filters: + filter_type = filter_["type"] + value = filter_["value"] if filter_type == "name_glob": processed_filters.append(Glob(value)) diff --git a/syncmaster/worker/handlers/file/remote_df.py b/syncmaster/worker/handlers/file/remote_df.py index decd0aa6..2191c12f 100644 --- a/syncmaster/worker/handlers/file/remote_df.py +++ b/syncmaster/worker/handlers/file/remote_df.py @@ -2,7 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 from __future__ import annotations -import os +from pathlib import Path from typing import TYPE_CHECKING from onetl.file import FileDFReader, FileDFWriter, FileMover @@ -16,13 +16,13 @@ class RemoteDFFileHandler(FileHandler): def read(self) -> DataFrame: - from pyspark.sql.types import StructType + from pyspark.sql.types import StructType # noqa: PLC0415 reader = FileDFReader( connection=self.df_connection, format=self.transfer_dto.file_format, source_path=self.transfer_dto.directory_path, - df_schema=StructType.fromJson(self.transfer_dto.df_schema) if self.transfer_dto.df_schema else None, + df_schema=(StructType.fromJson(self.transfer_dto.df_schema) if self.transfer_dto.df_schema else None), options=self.transfer_dto.options, ) df = reader.run() @@ -38,7 +38,7 @@ def read(self) -> DataFrame: return df def write(self, df: DataFrame) -> None: - tmp_path = os.path.join(self.transfer_dto.directory_path, ".tmp", str(self.run_dto.id)) + tmp_path = Path(self.transfer_dto.directory_path) / ".tmp" / str(self.run_dto.id) try: writer = FileDFWriter( connection=self.df_connection, diff --git 
a/syncmaster/worker/handlers/file/s3.py b/syncmaster/worker/handlers/file/s3.py index 017f2d0e..d4f592dc 100644 --- a/syncmaster/worker/handlers/file/s3.py +++ b/syncmaster/worker/handlers/file/s3.py @@ -21,7 +21,7 @@ class S3Handler(RemoteDFFileHandler): connection_dto: S3ConnectionDTO def connect(self, spark: SparkSession): - from onetl.connection import S3, SparkS3 + from onetl.connection import S3, SparkS3 # noqa: PLC0415 self.df_connection = SparkS3( host=self.connection_dto.host, @@ -48,7 +48,7 @@ def connect(self, spark: SparkSession): @slot def read(self) -> DataFrame: - from pyspark.sql.types import StructType + from pyspark.sql.types import StructType # noqa: PLC0415 options = {} if self.transfer_dto.file_format.__class__.__name__ in ("Excel", "XML"): @@ -58,7 +58,7 @@ def read(self) -> DataFrame: connection=self.df_connection, format=self.transfer_dto.file_format, source_path=self.transfer_dto.directory_path, - df_schema=StructType.fromJson(self.transfer_dto.df_schema) if self.transfer_dto.df_schema else None, + df_schema=(StructType.fromJson(self.transfer_dto.df_schema) if self.transfer_dto.df_schema else None), options={**options, **self.transfer_dto.options}, ) df = reader.run() diff --git a/syncmaster/worker/handlers/file/samba.py b/syncmaster/worker/handlers/file/samba.py index ffeb10ea..63e43a4f 100644 --- a/syncmaster/worker/handlers/file/samba.py +++ b/syncmaster/worker/handlers/file/samba.py @@ -20,7 +20,7 @@ class SambaHandler(LocalDFFileHandler): connection_dto: SambaConnectionDTO def connect(self, spark: SparkSession) -> None: - from onetl.connection import Samba, SparkLocalFS + from onetl.connection import Samba, SparkLocalFS # noqa: PLC0415 self.file_connection = Samba( host=self.connection_dto.host, diff --git a/syncmaster/worker/handlers/file/sftp.py b/syncmaster/worker/handlers/file/sftp.py index d73c3cec..99928629 100644 --- a/syncmaster/worker/handlers/file/sftp.py +++ b/syncmaster/worker/handlers/file/sftp.py @@ -20,7 +20,7 @@ class SFTPHandler(LocalDFFileHandler): connection_dto: SFTPConnectionDTO def connect(self, spark: SparkSession) -> None: - from onetl.connection import SFTP, SparkLocalFS + from onetl.connection import SFTP, SparkLocalFS # noqa: PLC0415 self.file_connection = SFTP( host=self.connection_dto.host, diff --git a/syncmaster/worker/ivy2.py b/syncmaster/worker/ivy2.py index f50ee7b5..bd883928 100644 --- a/syncmaster/worker/ivy2.py +++ b/syncmaster/worker/ivy2.py @@ -15,8 +15,8 @@ from onetl.file.format import XML, Excel -def get_packages(connection_types: set[str]) -> list[str]: # noqa: WPS212 - import pyspark +def get_packages(connection_types: set[str]) -> list[str]: + import pyspark # noqa: PLC0415 spark_version = pyspark.__version__ # excel version is hardcoded due to https://github.com/nightscape/spark-excel/issues/902 diff --git a/syncmaster/worker/settings/__init__.py b/syncmaster/worker/settings/__init__.py index 106c3771..ad8ac0e5 100644 --- a/syncmaster/worker/settings/__init__.py +++ b/syncmaster/worker/settings/__init__.py @@ -35,7 +35,7 @@ class WorkerSettings(BaseModel): spark.driver.bindAddress: 0.0.0.0 spark.sql.pyspark.jvmStacktrace.enabled: true spark.ui.enabled: false - """ + """ # noqa: E501 create_spark_session_function: ImportString = Field( "syncmaster.worker.spark.get_worker_spark_session", diff --git a/syncmaster/worker/spark.py b/syncmaster/worker/spark.py index 85fa9903..62693db6 100644 --- a/syncmaster/worker/spark.py +++ b/syncmaster/worker/spark.py @@ -32,9 +32,9 @@ def get_worker_spark_session( settings: 
     settings: WorkerSettings,
 ) -> SparkSession:
     """Construct Spark Session using run parameters and application settings"""
-    from pyspark.sql import SparkSession
+    from pyspark.sql import SparkSession  # noqa: PLC0415
 
-    name = run.transfer.group.name + "_" + run.transfer.name  # noqa: WPS336
+    name = run.transfer.group.name + "_" + run.transfer.name
     spark_builder = SparkSession.builder.appName(f"SyncMaster__{name}")
 
     master = settings.spark_session_default_config.get("spark.master")
@@ -98,7 +98,7 @@ def get_spark_session_conf(
         log.debug("Exclude Maven packages: %s", excluded_packages)
         config["spark.jars.excludes"] = ",".join(excluded_packages)
 
-    if target.type == "s3":  # type: ignore
+    if target.type == "s3":
         config.update(
             {
                 "spark.hadoop.fs.s3a.committer.magic.enabled": "true",
diff --git a/tests/resources/file_df_connection/generate_data.py b/tests/resources/file_df_connection/generate_data.py
old mode 100644
new mode 100755
index 763e096a..bdbb11b9
--- a/tests/resources/file_df_connection/generate_data.py
+++ b/tests/resources/file_df_connection/generate_data.py
@@ -5,6 +5,7 @@
 import sys
 from argparse import ArgumentParser
 from datetime import UTC
+from pathlib import Path
 
 from faker import Faker
 
@@ -40,7 +41,7 @@ def calculate_intravals(elements: int, parts: int):
 
 def generate_data_file(elements: int, parts: int):
     data = create_data(elements)
-    with open("test_data.py", "w") as f:
+    with Path("test_data.py").open("w") as f:
         data_to_write = f"import datetime\nintervals={calculate_intravals(elements, parts)}\ndata={data}"
         f.write(data_to_write)
 
diff --git a/tests/resources/file_df_connection/generate_files.py b/tests/resources/file_df_connection/generate_files.py
old mode 100644
new mode 100755
index 57640287..a9f2aae8
--- a/tests/resources/file_df_connection/generate_files.py
+++ b/tests/resources/file_df_connection/generate_files.py
@@ -6,6 +6,7 @@
 import gzip
 import io
 import json
+import logging
 import random
 import shutil
 import sys
@@ -27,26 +28,27 @@
 from pyarrow import Table as ArrowTable
 
 SEED = 42
+logger = logging.getLogger(__name__)
 
 
 def get_data() -> list:
     try:
         from test_data import data
-
+    except Exception:
+        logger.exception("File test_data.py does not exists, run 'generate_data.py'.")
+        sys.exit(1)
+    else:
         return data
-    except Exception as e:
-        print("File test_data.py does not exists, run 'generate_data.py'.", e)
-        exit(1)
 
 
 def get_intervals() -> list:
     try:
         from test_data import intervals
-
+    except Exception:
+        logger.exception("File test_data.py does not exists, run 'generate_data.py'.")
+        sys.exit(1)
+    else:
         return intervals
-    except Exception as e:
-        print("File test_data.py does not exists, run 'generate_data.py'.", e)
-        exit(1)
 
 
 def get_pandas_dataframe(data: list[dict]) -> PandasDataFrame:
@@ -91,7 +93,10 @@ def get_avro_schema() -> AvroSchema:
             {"name": "REGION", "type": "string"},
             {"name": "NUMBER", "type": "int"},
             {"name": "BIRTH_DATE", "type": {"type": "int", "logicalType": "date"}},
-            {"name": "REGISTERED_AT", "type": {"type": "long", "logicalType": "timestamp-millis"}},
+            {
+                "name": "REGISTERED_AT",
+                "type": {"type": "long", "logicalType": "timestamp-millis"},
+            },
             {"name": "ACCOUNT_BALANCE", "type": "double"},
         ],
     }
@@ -117,19 +122,19 @@ def _write_csv(data: list[dict], file: TextIO, include_header: bool = False, **k
 
 
 def save_as_csv_without_header(data: list[dict], path: Path) -> None:
     path.mkdir(parents=True, exist_ok=True)
-    with open(path / "file.csv", "w", newline="") as file:
+    with path.joinpath("file.csv").open("w", newline="") as file:
         _write_csv(data, file)
 
 
 def save_as_csv_with_header(data: list[dict], path: Path) -> None:
     path.mkdir(parents=True, exist_ok=True)
-    with open(path / "file.csv", "w", newline="") as file:
+    with path.joinpath("file.csv").open("w", newline="") as file:
         _write_csv(data, file, include_header=True)
 
 
 def save_as_csv_with_delimiter(data: list[dict], path: Path) -> None:
     path.mkdir(parents=True, exist_ok=True)
-    with open(path / "file.csv", "w", newline="") as file:
+    with path.joinpath("file.csv").open("w", newline="") as file:
         _write_csv(data, file, delimiter=";")
 
@@ -140,9 +145,11 @@ def save_as_csv_gz(data: list[dict], path: Path) -> None:
     # Instead of just writing data to file we write it to a buffer, and then compress with fixed mtime
     buffer = io.StringIO()
     _write_csv(data, buffer)
-    with open(path / "file.csv.gz", "wb") as file:
-        with gzip.GzipFile(fileobj=file, mode="w", mtime=0) as gzfile:
-            gzfile.write(buffer.getvalue().encode("utf-8"))
+    with (
+        path.joinpath("file.csv.gz").open("wb") as file,
+        gzip.GzipFile(fileobj=file, mode="w", mtime=0) as gzfile,
+    ):
+        gzfile.write(buffer.getvalue().encode("utf-8"))
 
 
 def save_as_csv_nested(data: list[dict], path: Path) -> None:
@@ -150,23 +157,19 @@ def save_as_csv_nested(data: list[dict], path: Path) -> None:
     path.joinpath("some/path/more").mkdir(parents=True, exist_ok=True)
     path.joinpath("some/path/more/even_more").mkdir(parents=True, exist_ok=True)
 
-    counter = 0
-
     for num, interval in enumerate(get_intervals()):
-        if counter <= 2:
-            with open(path / f"some/path/for_val{num}.csv", "w", newline="") as file:
+        if num <= 2:
+            with path.joinpath(f"some/path/for_val{num}.csv").open("w", newline="") as file:
                 _write_csv([row for row in data if row["NUMBER"] in interval], file)
-        if counter in (3, 4, 5):
-            with open(path / f"some/path/more/for_val{num}.csv", "w", newline="") as file:
+        if num in (3, 4, 5):
+            with path.joinpath(f"some/path/more/for_val{num}.csv").open("w", newline="") as file:
                 _write_csv([row for row in data if row["NUMBER"] in interval], file)
-        if counter >= 6:
-            with open(path / f"some/path/more/even_more/for_val{num}.csv", "w", newline="") as file:
+        if num >= 6:
+            with path.joinpath(f"some/path/more/even_more/for_val{num}.csv").open("w", newline="") as file:
                 _write_csv([row for row in data if row["NUMBER"] in interval], file)
-        counter += 1
-
 
 def save_as_csv_partitioned(data: list[dict], path: Path) -> None:
     def filter_and_drop(rows: list[dict], column: str, value: Any) -> list[dict]:
@@ -185,7 +188,7 @@ def filter_and_drop(rows: list[dict], column: str, value: Any) -> list[dict]:
     columns.remove("NUMBER")
 
     for num, interval in enumerate(get_intervals()):
-        with open(path / f"NUMBER={num}/file.csv", "w", newline="") as file:
+        with path.joinpath(f"NUMBER={num}/file.csv").open("w", newline="") as file:
             data_for_val = filter_and_drop(data, "NUMBER", interval)
             _write_csv(data_for_val, file)
 
@@ -211,9 +214,11 @@ def save_as_json_gz(data: list[dict], path: Path) -> None:
     path.mkdir(parents=True, exist_ok=True)
     buffer = io.StringIO()
     json.dump(data, buffer, default=_to_string)
-    with open(path / "file.json.gz", "wb") as file:
-        with gzip.GzipFile(fileobj=file, mode="w", mtime=0) as gzfile:
-            gzfile.write(buffer.getvalue().encode("utf-8"))
+    with (
+        path.joinpath("file.json.gz").open("wb") as file,
+        gzip.GzipFile(fileobj=file, mode="w", mtime=0) as gzfile,
+    ):
+        gzfile.write(buffer.getvalue().encode("utf-8"))
 
 
 def save_as_json(data: list[dict], path: Path) -> None:
@@ -226,7 +231,7 @@ def save_as_json(data: list[dict], path: Path) -> None:
 
 def save_as_jsonline_plain(data: list[dict], path: Path) -> None:
     path.mkdir(parents=True, exist_ok=True)
-    with open(path / "file.jsonl", "w") as file:
+    with path.joinpath("file.jsonl").open("w") as file:
         for row in data:
             row_str = json.dumps(row, default=_to_string)
             file.write(row_str + "\n")
 
@@ -240,9 +245,11 @@ def save_as_jsonline_gz(data: list[dict], path: Path) -> None:
         row_str = json.dumps(row, default=_to_string)
         buffer.write(row_str + "\n")
 
-    with open(path / "file.jsonl.gz", "wb") as file:
-        with gzip.GzipFile(fileobj=file, mode="w", mtime=0) as gzfile:
-            gzfile.write(buffer.getvalue().encode("utf-8"))
+    with (
+        path.joinpath("file.jsonl.gz").open("wb") as file,
+        gzip.GzipFile(fileobj=file, mode="w", mtime=0) as gzfile,
+    ):
+        gzfile.write(buffer.getvalue().encode("utf-8"))
 
 
 def save_as_jsonline(data: list[dict], path: Path) -> None:
@@ -307,7 +314,7 @@ def save_as_parquet(data: list[dict], path: Path) -> None:
 def temporary_set_seed(seed: int) -> Iterator[int]:
     """Set random.seed to expected value, and return previous value after exit"""
     state = random.getstate()
-    try:  # noqa: WPS501
+    try:
         random.seed(seed)
         yield seed
     finally:
@@ -320,13 +327,16 @@ def save_as_avro_plain(data: list[dict], path: Path) -> None:
 
     path.mkdir(parents=True, exist_ok=True)
     schema = get_avro_schema()
-    with open(path / "file.avro", "wb") as file:
-        # DataFileDFWriter.sync_marker is initialized with randbytes
-        # temporary set seed to avoid generating files with different hashes
-        with temporary_set_seed(SEED):
-            with DataFileWriter(file, DatumWriter(), schema) as writer:
-                for row in data:
-                    writer.append(row)
+
+    # DataFileDFWriter.sync_marker is initialized with randbytes
+    # temporary set seed to avoid generating files with different hashes
+    with (
+        temporary_set_seed(SEED),
+        path.joinpath("file.avro").open("wb") as file,
+        DataFileWriter(file, DatumWriter(), schema) as writer,
+    ):
+        for row in data:
+            writer.append(row)
 
 
 def save_as_avro_snappy(data: list[dict], path: Path) -> None:
@@ -335,13 +345,16 @@ def save_as_avro_snappy(data: list[dict], path: Path) -> None:
 
     path.mkdir(parents=True, exist_ok=True)
     schema = get_avro_schema()
-    with open(path / "file.snappy.avro", "wb") as file:
-        # DataFileDFWriter.sync_marker is initialized with randbytes
-        # temporary set seed to avoid generating files with different hashes
-        with temporary_set_seed(SEED):
-            with DataFileWriter(file, DatumWriter(), schema, codec="snappy") as writer:
-                for row in data:
-                    writer.append(row)
+
+    # DataFileDFWriter.sync_marker is initialized with randbytes
+    # temporary set seed to avoid generating files with different hashes
+    with (
+        temporary_set_seed(SEED),
+        path.joinpath("file.snappy.avro").open("wb") as file,
+        DataFileWriter(file, DatumWriter(), schema, codec="snappy") as writer,
+    ):
+        for row in data:
+            writer.append(row)
 
 
 def save_as_avro(data: list[dict], path: Path) -> None:
@@ -359,7 +372,6 @@ def save_as_xls_with_options(
     **kwargs,
 ) -> None:
     # required to register xlwt writer which supports generating .xls files
-    pass
 
     path.mkdir(parents=True, exist_ok=True)
     file = path / "file.xls"
@@ -373,15 +385,14 @@ def make_zip_deterministic(path: Path) -> None:
     temp_dir = gettempdir()
     file_copy = Path(shutil.copy(path, temp_dir))
 
-    with ZipFile(file_copy, "r") as original_file:
-        with ZipFile(path, "w") as new_file:
-            for item in original_file.infolist():
-                if item.filename == "docProps/core.xml":
-                    # this file contains modification time, which produces files with different hashes
-                    continue
-                # reset modification time of all files
-                item.date_time = (1980, 1, 1, 0, 0, 0)
-                new_file.writestr(item, original_file.read(item.filename))
+    with ZipFile(file_copy, "r") as original_file, ZipFile(path, "w") as new_file:
+        for item in original_file.infolist():
+            if item.filename == "docProps/core.xml":
+                # this file contains modification time, which produces files with different hashes
+                continue
+            # reset modification time of all files
+            item.date_time = (1980, 1, 1, 0, 0, 0)
+            new_file.writestr(item, original_file.read(item.filename))
 
 
 def save_as_xlsx_with_options(
@@ -525,9 +536,10 @@ def main(argv: list[str] | None = None) -> None:
     args = parser.parse_args(argv or sys.argv[1:])
 
     if args.format not in format_mapping and args.format != "all":
-        raise ValueError(f"Format {args.format} is not supported")
+        msg = f"Format {args.format} is not supported"
+        raise ValueError(msg)
 
-    if args.format == "all":
+    if args.format == "all":  # noqa: SIM108
         save_functions = list(format_mapping.values())
     else:
         save_functions = [format_mapping[args.format]]
diff --git a/tests/spark/__init__.py b/tests/spark/__init__.py
index c995481c..7845c286 100644
--- a/tests/spark/__init__.py
+++ b/tests/spark/__init__.py
@@ -5,13 +5,12 @@
 from onetl.connection import HDFS, SparkHDFS
 from onetl.hooks import hook
 
-from syncmaster.worker.spark import get_worker_spark_session
-
 # this is just to automatically import hooks
-get_worker_spark_session = get_worker_spark_session
+from syncmaster.worker.spark import get_worker_spark_session  # noqa: F401
 
 
 @SparkHDFS.Slots.get_cluster_namenodes.bind
+@HDFS.Slots.get_cluster_namenodes.bind
 @hook
 def get_cluster_namenodes(cluster: str) -> set[str] | None:
     if cluster == "test-hive":
@@ -19,12 +18,11 @@ def get_cluster_namenodes(cluster: str) -> set[str] | None:
     return None
 
 
+@HDFS.Slots.is_namenode_active.bind
 @SparkHDFS.Slots.is_namenode_active.bind
 @hook
 def is_namenode_active(host: str, cluster: str) -> bool:
-    if cluster == "test-hive":
-        return True
-    return False
+    return cluster == "test-hive"
 
 
 @SparkHDFS.Slots.get_ipc_port.bind
@@ -35,22 +33,6 @@ def get_ipc_port(cluster: str) -> int | None:
     return None
 
 
-@HDFS.Slots.get_cluster_namenodes.bind
-@hook
-def get_cluster_namenodes(cluster: str) -> set[str] | None:
-    if cluster == "test-hive":
-        return {"test-hive"}
-    return None
-
-
-@HDFS.Slots.is_namenode_active.bind
-@hook
-def is_namenode_active(host: str, cluster: str) -> bool:
-    if cluster == "test-hive":
-        return True
-    return False
-
-
 @HDFS.Slots.get_webhdfs_port.bind
 @hook
 def get_webhdfs_port(cluster: str) -> int | None:
@@ -66,7 +48,7 @@ def get_webhdfs_port(cluster: str) -> int | None:
 
 @worker_process_init.connect
 def start_coverage(**kwargs):
-    global COV
+    global COV  # noqa: PLW0603
     COV = Coverage(data_suffix=True)
     COV.start()
 
diff --git a/tests/test_integration/celery_test.py b/tests/test_integration/celery_test.py
index 5fee7307..de3c6168 100644
--- a/tests/test_integration/celery_test.py
+++ b/tests/test_integration/celery_test.py
@@ -1,3 +1,8 @@
 from syncmaster.worker.celery import app as celery
 
-celery.conf.update(imports=list(celery.conf.imports) + ["tests.test_integration.test_scheduler.test_task"])
+celery.conf.update(
+    imports=[
+        *list(celery.conf.imports),
+        "tests.test_integration.test_scheduler.test_task",
+    ],
+)
diff --git a/tests/test_integration/test_run_transfer/connection_fixtures/dataframe_fixtures.py b/tests/test_integration/test_run_transfer/connection_fixtures/dataframe_fixtures.py
index ea8f1e0f..4b90840f 100644
--- a/tests/test_integration/test_run_transfer/connection_fixtures/dataframe_fixtures.py
+++ b/tests/test_integration/test_run_transfer/connection_fixtures/dataframe_fixtures.py
@@ -59,7 +59,7 @@ def init_df_with_mixed_column_naming(spark: SparkSession) -> DataFrame:
                 "Mordor",
                 1,
                 datetime.date(year=2023, month=3, day=11),
-                datetime.datetime.now(),
+                datetime.datetime.now(),  # noqa: DTZ005
                 1234.2343,
             ),
             (
@@ -68,7 +68,7 @@ def init_df_with_mixed_column_naming(spark: SparkSession) -> DataFrame:
                 "Gondor",
                 2,
                 datetime.date(2022, 6, 19),
-                datetime.datetime.now(),
+                datetime.datetime.now(),  # noqa: DTZ005
                 2345.5678,
             ),
             (
@@ -77,7 +77,7 @@ def init_df_with_mixed_column_naming(spark: SparkSession) -> DataFrame:
                 "Rohan",
                 3,
                 datetime.date(2021, 11, 5),
-                datetime.datetime.now(),
+                datetime.datetime.now(),  # noqa: DTZ005
                 3456.7890,
             ),
             (
@@ -86,7 +86,7 @@ def init_df_with_mixed_column_naming(spark: SparkSession) -> DataFrame:
                 "Shire",
                 4,
                 datetime.date(2020, 1, 30),
-                datetime.datetime.now(),
+                datetime.datetime.now(),  # noqa: DTZ005
                 4567.8901,
             ),
             (
@@ -95,7 +95,7 @@ def init_df_with_mixed_column_naming(spark: SparkSession) -> DataFrame:
                 "Isengard",
                 5,
                 datetime.date(2023, 8, 15),
-                datetime.datetime.now(),
+                datetime.datetime.now(),  # noqa: DTZ005
                 5678.9012,
             ),
         ],
diff --git a/tests/test_unit/test_auth/auth_fixtures/keycloak_fixture.py b/tests/test_unit/test_auth/auth_fixtures/keycloak_fixture.py
index faf31aca..963c00a6 100644
--- a/tests/test_unit/test_auth/auth_fixtures/keycloak_fixture.py
+++ b/tests/test_unit/test_auth/auth_fixtures/keycloak_fixture.py
@@ -86,7 +86,7 @@ def _create_session_cookie(user, expire_in_msec=60000) -> str:
 
 
 @pytest_asyncio.fixture
-async def mock_keycloak_api(settings):  # noqa: F811
+async def mock_keycloak_api(settings):
     keycloak_settings = settings.auth.model_dump()["keycloak"]
 
     api_url = keycloak_settings["api_url"]
diff --git a/tests/utils.py b/tests/utils.py
index 92b47ccd..3283d7ee 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -29,6 +29,7 @@
 )
 
 from syncmaster.db.models import Status
+from syncmaster.exceptions.base import EntityNotFoundError
 from syncmaster.server.settings import ServerAppSettings as Settings
 from tests.mocks import MockUser
 
@@ -109,7 +110,7 @@ async def get_run_on_end(
     client: AsyncClient,
     run_id: int,
     token: str,
-    timeout: int = 120,
+    timeout: int = 120,  # noqa: ASYNC109
 ) -> dict[str, Any]:
     end_time = datetime.now(tz=UTC).timestamp() + timeout
     while True:
@@ -119,7 +120,7 @@ async def get_run_on_end(
             headers={"Authorization": f"Bearer {token}"},
         )
         if response.status_code != 200:
-            raise Exception("Run not found")
+            raise EntityNotFoundError
 
         data = response.json()
         if data["status"] in [Status.FINISHED, Status.FAILED]:
diff --git a/uv.lock b/uv.lock
index 5a29afc0..2ae77a8b 100644
--- a/uv.lock
+++ b/uv.lock
@@ -903,31 +903,6 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/e3/7f/a1a97644e39e7316d850784c642093c99df1290a460df4ede27659056834/filelock-3.20.1-py3-none-any.whl", hash = "sha256:15d9e9a67306188a44baa72f569d2bfd803076269365fdea0934385da4dc361a", size = 16666, upload-time = "2025-12-15T23:54:26.874Z" },
 ]
 
-[[package]]
-name = "flake8"
-version = "7.3.0"
-source = { registry = "https://pypi.org/simple" }
-dependencies = [
-    { name = "mccabe" },
-    { name = "pycodestyle" },
-    { name = "pyflakes" },
-]
-sdist = { url = "https://files.pythonhosted.org/packages/9b/af/fbfe3c4b5a657d79e5c47a2827a362f9e1b763336a52f926126aa6dc7123/flake8-7.3.0.tar.gz", hash = "sha256:fe044858146b9fc69b551a4b490d69cf960fcb78ad1edcb84e7fbb1b4a8e3872", size = 48326, upload-time = "2025-06-20T19:31:35.838Z" }
-wheels = [
-    { url = "https://files.pythonhosted.org/packages/9f/56/13ab06b4f93ca7cac71078fbe37fcea175d3216f31f85c3168a6bbd0bb9a/flake8-7.3.0-py2.py3-none-any.whl", hash = "sha256:b9696257b9ce8beb888cdbe31cf885c90d31928fe202be0889a7cdafad32f01e", size = 57922, upload-time = "2025-06-20T19:31:34.425Z" },
-]
-
-[[package]]
-name = "flake8-pyproject"
-version = "1.2.4"
-source = { registry = "https://pypi.org/simple" }
-dependencies = [
-    { name = "flake8" },
-]
-wheels = [
-    { url = "https://files.pythonhosted.org/packages/85/6a/cdee9ff7f2b7c6ddc219fd95b7c70c0a3d9f0367a506e9793eedfc72e337/flake8_pyproject-1.2.4-py3-none-any.whl", hash = "sha256:ea34c057f9a9329c76d98723bb2bb498cc6ba8ff9872c4d19932d48c91249a77", size = 5694, upload-time = "2025-11-28T21:40:01.309Z" },
-]
-
 [[package]]
 name = "frozendict"
 version = "2.4.7"
@@ -1521,15 +1496,6 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/70/bc/6f1c2f612465f5fa89b95bead1f44dcb607670fd42891d8fdcd5d039f4f4/markupsafe-3.0.3-cp314-cp314t-win_arm64.whl", hash = "sha256:32001d6a8fc98c8cb5c947787c5d08b0a50663d139f1305bac5885d98d9b40fa", size = 14146, upload-time = "2025-09-27T18:37:28.327Z" },
 ]
 
-[[package]]
-name = "mccabe"
-version = "0.7.0"
-source = { registry = "https://pypi.org/simple" }
-sdist = { url = "https://files.pythonhosted.org/packages/e7/ff/0ffefdcac38932a54d2b5eed4e0ba8a408f215002cd178ad1df0f2806ff8/mccabe-0.7.0.tar.gz", hash = "sha256:348e0240c33b60bbdf4e523192ef919f28cb2c3d7d5c7794f74009290f236325", size = 9658, upload-time = "2022-01-24T01:14:51.113Z" }
-wheels = [
-    { url = "https://files.pythonhosted.org/packages/27/1a/1f68f9ba0c207934b35b86a8ca3aad8395a3d6dd7921c0686e23853ff5a9/mccabe-0.7.0-py2.py3-none-any.whl", hash = "sha256:6c2d30ab6be0e4a46919781807b4f0d834ebdd6c6e3dca0bda5a15f863427b6e", size = 7350, upload-time = "2022-01-24T01:14:49.62Z" },
-]
-
 [[package]]
 name = "minio"
 version = "7.2.20"
@@ -1844,15 +1810,6 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/c8/f1/d6a797abb14f6283c0ddff96bbdd46937f64122b8c925cab503dd37f8214/pyasn1-0.6.1-py3-none-any.whl", hash = "sha256:0d632f46f2ba09143da3a8afe9e33fb6f92fa2320ab7e886e2d0f7672af84629", size = 83135, upload-time = "2024-09-11T16:00:36.122Z" },
 ]
 
-[[package]]
-name = "pycodestyle"
-version = "2.14.0"
-source = { registry = "https://pypi.org/simple" }
-sdist = { url = "https://files.pythonhosted.org/packages/11/e0/abfd2a0d2efe47670df87f3e3a0e2edda42f055053c85361f19c0e2c1ca8/pycodestyle-2.14.0.tar.gz", hash = "sha256:c4b5b517d278089ff9d0abdec919cd97262a3367449ea1c8b49b91529167b783", size = 39472, upload-time = "2025-06-20T18:49:48.75Z" }
-wheels = [
-    { url = "https://files.pythonhosted.org/packages/d7/27/a58ddaf8c588a3ef080db9d0b7e0b97215cee3a45df74f3a94dbbf5c893a/pycodestyle-2.14.0-py2.py3-none-any.whl", hash = "sha256:dd6bf7cb4ee77f8e016f9c8e74a35ddd9f67e1d5fd4184d86c3b98e07099f42d", size = 31594, upload-time = "2025-06-20T18:49:47.491Z" },
-]
-
 [[package]]
 name = "pycparser"
 version = "2.23"
@@ -2031,15 +1988,6 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/a0/cb/17907f09ac34e13c761ba3563db68d1d43333d001e6f0d3076bf62c16dab/pydantic_settings_logging-0.1.1-py3-none-any.whl", hash = "sha256:08a961882a6ef6bde4fc4f7ec121aa29ee46cbd2f013974702c9a57f84e6204b", size = 15309, upload-time = "2025-07-03T20:23:45.12Z" },
 ]
 
-[[package]]
-name = "pyflakes"
-version = "3.4.0"
-source = { registry = "https://pypi.org/simple" }
-sdist = { url = "https://files.pythonhosted.org/packages/45/dc/fd034dc20b4b264b3d015808458391acbf9df40b1e54750ef175d39180b1/pyflakes-3.4.0.tar.gz", hash = "sha256:b24f96fafb7d2ab0ec5075b7350b3d2d2218eab42003821c06344973d3ea2f58", size = 64669, upload-time = "2025-06-20T18:45:27.834Z" }
-wheels = [
-    { url = "https://files.pythonhosted.org/packages/c2/2f/81d580a0fb83baeb066698975cb14a618bdbed7720678566f1b046a95fe8/pyflakes-3.4.0-py2.py3-none-any.whl", hash = "sha256:f742a7dbd0d9cb9ea41e9a24a918996e8170c799fa528688d40dd582c8265f4f", size = 63551, upload-time = "2025-06-20T18:45:26.937Z" },
-]
-
 [[package]]
 name = "pygments"
 version = "2.19.2"
@@ -2794,8 +2742,6 @@ worker = [
 
 [package.dev-dependencies]
 dev = [
-    { name = "flake8" },
-    { name = "flake8-pyproject" },
     { name = "mypy" },
     { name = "pre-commit" },
     { name = "sqlalchemy", extra = ["mypy"] },
@@ -2867,8 +2813,6 @@ provides-extras = ["server", "scheduler", "worker", "kerberos"]
 
 [package.metadata.requires-dev]
 dev = [
-    { name = "flake8", specifier = "~=7.3.0" },
-    { name = "flake8-pyproject", specifier = "~=1.2.3" },
    { name = "mypy", specifier = "~=1.19.1" },
     { name = "pre-commit", specifier = "~=4.5.0" },
     { name = "sqlalchemy", extras = ["mypy"], specifier = "~=2.0.44" },