From c868ea42c389879537d935a907dfdf15f7e26e04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Thu, 20 Nov 2025 16:54:52 +0300 Subject: [PATCH] [DOP-30631] Add support for Iceberg REST Catalog + S3 delegated access --- docs/changelog/next_release/297.feature.rst | 1 + pyproject.toml | 2 +- syncmaster/db/models/connection.py | 4 +- syncmaster/dto/connections.py | 65 +++- syncmaster/dto/transfers.py | 6 +- .../__init__.py | 10 +- .../basic.py | 7 +- .../oauth2_client_credentials.py | 7 +- .../auth/iceberg_rest_s3_direct/__init__.py | 31 ++ .../v1/auth/iceberg_rest_s3_direct/basic.py | 24 ++ .../oauth2_client_credentials.py | 32 ++ syncmaster/schemas/v1/connection_types.py | 6 +- .../schemas/v1/connections/connection_base.py | 10 +- syncmaster/schemas/v1/connections/iceberg.py | 79 ++++- syncmaster/schemas/v1/transfers/db.py | 8 +- syncmaster/worker/controller.py | 24 +- syncmaster/worker/handlers/db/iceberg.py | 86 +++-- syncmaster/worker/spark.py | 2 +- .../connection_fixtures/iceberg_fixtures.py | 14 +- .../test_run_transfer/test_iceberg.py | 4 +- .../group_connections_fixture.py | 3 +- .../test_create_connection.py | 2 +- .../test_create_iceberg_connection.py | 310 +++++++++++++++++- .../test_update_iceberg_connection.py | 164 ++++++++- .../test_connections/test_read_connections.py | 2 +- .../test_transfers/test_create_transfer.py | 4 +- tests/test_unit/utils.py | 28 +- tests/utils.py | 6 + 28 files changed, 812 insertions(+), 129 deletions(-) create mode 100644 docs/changelog/next_release/297.feature.rst rename syncmaster/schemas/v1/auth/{iceberg => iceberg_rest_s3_delegated}/__init__.py (69%) rename syncmaster/schemas/v1/auth/{iceberg => iceberg_rest_s3_delegated}/basic.py (70%) rename syncmaster/schemas/v1/auth/{iceberg => iceberg_rest_s3_delegated}/oauth2_client_credentials.py (78%) create mode 100644 syncmaster/schemas/v1/auth/iceberg_rest_s3_direct/__init__.py create mode 100644 syncmaster/schemas/v1/auth/iceberg_rest_s3_direct/basic.py create mode 100644 syncmaster/schemas/v1/auth/iceberg_rest_s3_direct/oauth2_client_credentials.py diff --git a/docs/changelog/next_release/297.feature.rst b/docs/changelog/next_release/297.feature.rst new file mode 100644 index 00000000..d82f84f1 --- /dev/null +++ b/docs/changelog/next_release/297.feature.rst @@ -0,0 +1 @@ +Added support for fetching S3 params & credentials from Iceberg REST Catalog (delegation). diff --git a/pyproject.toml b/pyproject.toml index 1cce0baa..4c554c47 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -263,7 +263,7 @@ max-annotation-complexity = 4 max-returns = 5 max-awaits = 5 max-local-variables = 20 -max-name-length = 60 +max-name-length = 65 # Max of expressions in a function max-expressions = 15 # Max args in a function diff --git a/syncmaster/db/models/connection.py b/syncmaster/db/models/connection.py index 16df8710..7915e7e8 100644 --- a/syncmaster/db/models/connection.py +++ b/syncmaster/db/models/connection.py @@ -18,7 +18,7 @@ class ConnectionType(StrEnum): POSTGRES = "postgres" HIVE = "hive" - ICEBERG_REST_S3 = "iceberg_rest_s3" + ICEBERG = "iceberg" ORACLE = "oracle" CLICKHOUSE = "clickhouse" MSSQL = "mssql" @@ -62,7 +62,7 @@ class Connection(Base, ResourceMixin, TimestampMixin): 'simple', translate(coalesce(data->>'host', ''), './-_:\\', ' ') ) - """, + """, # noqa: WPS342 persisted=True, ), nullable=False, diff --git a/syncmaster/dto/connections.py b/syncmaster/dto/connections.py index 52bbf58f..14d346c6 100644 --- a/syncmaster/dto/connections.py +++ b/syncmaster/dto/connections.py @@ -74,8 +74,14 @@ class HiveConnectionDTO(ConnectionDTO): @dataclass -class IcebergRESTCatalogS3ConnectionBaseDTO(ConnectionDTO): +class IcebergConnectionBaseDTO(ConnectionDTO): rest_catalog_url: str + flavor: ClassVar[str] + type: ClassVar[str] = "iceberg" + + +@dataclass +class IcebergRESTCatalogS3DirectConnectionBaseDTO(IcebergConnectionBaseDTO): s3_warehouse_path: str s3_host: str s3_bucket: str @@ -86,18 +92,18 @@ class IcebergRESTCatalogS3ConnectionBaseDTO(ConnectionDTO): s3_port: int | None s3_protocol: str s3_additional_params: dict - type: ClassVar[str] = "iceberg_rest_s3" + implementation: ClassVar[str] = "iceberg_rest_s3_direct" @dataclass(kw_only=True) -class IcebergRESTCatalogBasicAuthS3DTO(IcebergRESTCatalogS3ConnectionBaseDTO): +class IcebergRESTCatalogBasicAuthS3BasicDTO(IcebergRESTCatalogS3DirectConnectionBaseDTO): rest_catalog_username: str rest_catalog_password: str rest_catalog_auth_type: Literal["basic"] = "basic" @dataclass(kw_only=True) -class IcebergRESTCatalogOAuth2ClientCredentialsS3DTO(IcebergRESTCatalogS3ConnectionBaseDTO): +class IcebergRESTCatalogOAuth2ClientCredentialsS3BasicDTO(IcebergRESTCatalogS3DirectConnectionBaseDTO): rest_catalog_oauth2_client_id: str rest_catalog_oauth2_client_secret: str rest_catalog_oauth2_scopes: list[str] @@ -107,13 +113,52 @@ class IcebergRESTCatalogOAuth2ClientCredentialsS3DTO(IcebergRESTCatalogS3Connect rest_catalog_auth_type: Literal["oauth2"] = "oauth2" -# TODO: should be refactored -class IcebergRESTCatalogS3ConnectionDTO: - def __new__(cls, **data): - if "rest_catalog_oauth2_client_id" in data: - return IcebergRESTCatalogOAuth2ClientCredentialsS3DTO(**data) +@dataclass +class IcebergRESTCatalogS3DelegatedConnectionBaseDTO(IcebergConnectionBaseDTO): + s3_warehouse_name: str | None = None + s3_access_delegation: Literal["vended-credentials", "remote-signing"] = "vended-credentials" + implementation: ClassVar[str] = "iceberg_rest_s3_delegated" + + +@dataclass(kw_only=True) +class IcebergRESTCatalogBasicAuthS3DelegatedDTO(IcebergRESTCatalogS3DelegatedConnectionBaseDTO): + rest_catalog_username: str + rest_catalog_password: str + rest_catalog_auth_type: Literal["basic"] = "basic" + - return IcebergRESTCatalogBasicAuthS3DTO(**data) +@dataclass(kw_only=True) +class IcebergRESTCatalogOAuth2ClientCredentialsS3DelegatedDTO(IcebergRESTCatalogS3DelegatedConnectionBaseDTO): + rest_catalog_oauth2_client_id: str + rest_catalog_oauth2_client_secret: str + rest_catalog_oauth2_scopes: list[str] + rest_catalog_oauth2_resource: str | None = None + rest_catalog_oauth2_audience: str | None = None + rest_catalog_oauth2_token_endpoint: str | None = None + rest_catalog_auth_type: Literal["oauth2"] = "oauth2" + + +# TODO: should be refactored +def get_iceberg_rest_catalog_s3_direct_connection_dto( + **data, +) -> IcebergRESTCatalogBasicAuthS3BasicDTO | IcebergRESTCatalogOAuth2ClientCredentialsS3BasicDTO: + if "rest_catalog_oauth2_client_id" in data: + return IcebergRESTCatalogOAuth2ClientCredentialsS3BasicDTO(**data) + return IcebergRESTCatalogBasicAuthS3BasicDTO(**data) + + +def get_iceberg_rest_catalog_s3_delegated_connection_dto( + **data, +) -> IcebergRESTCatalogBasicAuthS3DelegatedDTO | IcebergRESTCatalogOAuth2ClientCredentialsS3DelegatedDTO: + if "rest_catalog_oauth2_client_id" in data: + return IcebergRESTCatalogOAuth2ClientCredentialsS3DelegatedDTO(**data) + return IcebergRESTCatalogBasicAuthS3DelegatedDTO(**data) + + +def get_iceberg_connection_dto(**data) -> IcebergConnectionBaseDTO: + if "s3_warehouse_path" in data: + return get_iceberg_rest_catalog_s3_direct_connection_dto(**data) + return get_iceberg_rest_catalog_s3_delegated_connection_dto(**data) @dataclass diff --git a/syncmaster/dto/transfers.py b/syncmaster/dto/transfers.py index 056d5974..88db9ba4 100644 --- a/syncmaster/dto/transfers.py +++ b/syncmaster/dto/transfers.py @@ -128,9 +128,9 @@ def __post_init__(self): @dataclass -class IcebergRESTCatalogS3TransferDTO(DBTransferDTO): - type: ClassVar[str] = "iceberg_rest_s3" - catalog_name: str = field(default_factory=lambda: f"iceberg_rest_s3_{uuid4().hex[:8]}") # noqa: WPS237 +class IcebergTransferDTO(DBTransferDTO): + type: ClassVar[str] = "iceberg" + catalog_name: str = field(default_factory=lambda: f"iceberg_{uuid4().hex[:8]}") # noqa: WPS237 def __post_init__(self): super().__post_init__() diff --git a/syncmaster/schemas/v1/auth/iceberg/__init__.py b/syncmaster/schemas/v1/auth/iceberg_rest_s3_delegated/__init__.py similarity index 69% rename from syncmaster/schemas/v1/auth/iceberg/__init__.py rename to syncmaster/schemas/v1/auth/iceberg_rest_s3_delegated/__init__.py index 23b08c27..afdad3fe 100644 --- a/syncmaster/schemas/v1/auth/iceberg/__init__.py +++ b/syncmaster/schemas/v1/auth/iceberg_rest_s3_delegated/__init__.py @@ -4,28 +4,28 @@ from pydantic import Field -from syncmaster.schemas.v1.auth.iceberg.basic import ( +from syncmaster.schemas.v1.auth.iceberg_rest_s3_delegated.basic import ( CreateIcebergRESTCatalogBasicAuthSchema, ReadIcebergRESTCatalogBasicAuthSchema, UpdateIcebergRESTCatalogBasicAuthSchema, ) -from syncmaster.schemas.v1.auth.iceberg.oauth2_client_credentials import ( +from syncmaster.schemas.v1.auth.iceberg_rest_s3_delegated.oauth2_client_credentials import ( CreateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema, ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema, UpdateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema, ) -CreateIcebergRESTCatalogS3ConnectionAuthDataSchema = Annotated[ +CreateIcebergRESTCatalogS3DelegatedConnectionAuthDataSchema = Annotated[ CreateIcebergRESTCatalogBasicAuthSchema | CreateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema, Field(discriminator="type"), ] -ReadIcebergRESTCatalogS3ConnectionAuthDataSchema = Annotated[ +ReadIcebergRESTCatalogS3DelegatedConnectionAuthDataSchema = Annotated[ ReadIcebergRESTCatalogBasicAuthSchema | ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema, Field(discriminator="type"), ] -UpdateIcebergRESTCatalogS3ConnectionAuthDataSchema = Annotated[ +UpdateIcebergRESTCatalogS3DelegatedConnectionAuthDataSchema = Annotated[ UpdateIcebergRESTCatalogBasicAuthSchema | UpdateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema, Field(discriminator="type"), ] diff --git a/syncmaster/schemas/v1/auth/iceberg/basic.py b/syncmaster/schemas/v1/auth/iceberg_rest_s3_delegated/basic.py similarity index 70% rename from syncmaster/schemas/v1/auth/iceberg/basic.py rename to syncmaster/schemas/v1/auth/iceberg_rest_s3_delegated/basic.py index 5c2f8699..3f597156 100644 --- a/syncmaster/schemas/v1/auth/iceberg/basic.py +++ b/syncmaster/schemas/v1/auth/iceberg_rest_s3_delegated/basic.py @@ -6,19 +6,16 @@ class ReadIcebergRESTCatalogBasicAuthSchema(BaseModel): - type: Literal["iceberg_rest_basic_s3_basic"] = Field(description="Auth type") + type: Literal["iceberg_rest_basic"] = Field(description="Auth type") rest_catalog_username: str - s3_access_key: str class CreateIcebergRESTCatalogBasicAuthSchema(ReadIcebergRESTCatalogBasicAuthSchema): rest_catalog_password: SecretStr - s3_secret_key: SecretStr class UpdateIcebergRESTCatalogBasicAuthSchema(ReadIcebergRESTCatalogBasicAuthSchema): rest_catalog_password: SecretStr | None = None - s3_secret_key: SecretStr | None = None def get_secret_fields(self) -> tuple[str, ...]: - return ("rest_catalog_password", "s3_secret_key") + return ("rest_catalog_password",) diff --git a/syncmaster/schemas/v1/auth/iceberg/oauth2_client_credentials.py b/syncmaster/schemas/v1/auth/iceberg_rest_s3_delegated/oauth2_client_credentials.py similarity index 78% rename from syncmaster/schemas/v1/auth/iceberg/oauth2_client_credentials.py rename to syncmaster/schemas/v1/auth/iceberg_rest_s3_delegated/oauth2_client_credentials.py index e9f221e9..bbbd72bd 100644 --- a/syncmaster/schemas/v1/auth/iceberg/oauth2_client_credentials.py +++ b/syncmaster/schemas/v1/auth/iceberg_rest_s3_delegated/oauth2_client_credentials.py @@ -8,27 +8,24 @@ class ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema(BaseModel): - type: Literal["iceberg_rest_oauth2_client_credentials_s3_basic"] = Field(description="Auth type") + type: Literal["iceberg_rest_oauth2_client_credentials"] = Field(description="Auth type") rest_catalog_oauth2_client_id: str rest_catalog_oauth2_scopes: list[str] = Field(default_factory=list) rest_catalog_oauth2_resource: str | None = None rest_catalog_oauth2_audience: str | None = None rest_catalog_oauth2_token_endpoint: URL | None = None - s3_access_key: str class CreateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema( ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema, ): rest_catalog_oauth2_client_secret: SecretStr - s3_secret_key: SecretStr class UpdateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema( ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema, ): rest_catalog_oauth2_client_secret: SecretStr | None = None - s3_secret_key: SecretStr | None = None def get_secret_fields(self) -> tuple[str, ...]: - return ("rest_catalog_oauth2_client_secret", "s3_secret_key") + return ("rest_catalog_oauth2_client_secret",) diff --git a/syncmaster/schemas/v1/auth/iceberg_rest_s3_direct/__init__.py b/syncmaster/schemas/v1/auth/iceberg_rest_s3_direct/__init__.py new file mode 100644 index 00000000..c3c0115c --- /dev/null +++ b/syncmaster/schemas/v1/auth/iceberg_rest_s3_direct/__init__.py @@ -0,0 +1,31 @@ +# SPDX-FileCopyrightText: 2023-2024 MTS PJSC +# SPDX-License-Identifier: Apache-2.0 +from typing import Annotated + +from pydantic import Field + +from syncmaster.schemas.v1.auth.iceberg_rest_s3_direct.basic import ( + CreateIcebergRESTCatalogBasicS3BasicAuthSchema, + ReadIcebergRESTCatalogBasicS3BasicAuthSchema, + UpdateIcebergRESTCatalogBasicS3BasicAuthSchema, +) +from syncmaster.schemas.v1.auth.iceberg_rest_s3_direct.oauth2_client_credentials import ( + CreateIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema, + ReadIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema, + UpdateIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema, +) + +CreateIcebergRESTCatalogS3DirectConnectionAuthDataSchema = Annotated[ + CreateIcebergRESTCatalogBasicS3BasicAuthSchema | CreateIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema, + Field(discriminator="type"), +] + +ReadIcebergRESTCatalogS3DirectConnectionAuthDataSchema = Annotated[ + ReadIcebergRESTCatalogBasicS3BasicAuthSchema | ReadIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema, + Field(discriminator="type"), +] + +UpdateIcebergRESTCatalogS3DirectConnectionAuthDataSchema = Annotated[ + UpdateIcebergRESTCatalogBasicS3BasicAuthSchema | UpdateIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema, + Field(discriminator="type"), +] diff --git a/syncmaster/schemas/v1/auth/iceberg_rest_s3_direct/basic.py b/syncmaster/schemas/v1/auth/iceberg_rest_s3_direct/basic.py new file mode 100644 index 00000000..9e40b068 --- /dev/null +++ b/syncmaster/schemas/v1/auth/iceberg_rest_s3_direct/basic.py @@ -0,0 +1,24 @@ +# SPDX-FileCopyrightText: 2023-2024 MTS PJSC +# SPDX-License-Identifier: Apache-2.0 +from typing import Literal + +from pydantic import BaseModel, Field, SecretStr + + +class ReadIcebergRESTCatalogBasicS3BasicAuthSchema(BaseModel): + type: Literal["iceberg_rest_basic_s3_basic"] = Field(description="Auth type") + rest_catalog_username: str + s3_access_key: str + + +class CreateIcebergRESTCatalogBasicS3BasicAuthSchema(ReadIcebergRESTCatalogBasicS3BasicAuthSchema): + rest_catalog_password: SecretStr + s3_secret_key: SecretStr + + +class UpdateIcebergRESTCatalogBasicS3BasicAuthSchema(ReadIcebergRESTCatalogBasicS3BasicAuthSchema): + rest_catalog_password: SecretStr | None = None + s3_secret_key: SecretStr | None = None + + def get_secret_fields(self) -> tuple[str, ...]: + return ("rest_catalog_password", "s3_secret_key") diff --git a/syncmaster/schemas/v1/auth/iceberg_rest_s3_direct/oauth2_client_credentials.py b/syncmaster/schemas/v1/auth/iceberg_rest_s3_direct/oauth2_client_credentials.py new file mode 100644 index 00000000..bdcb7579 --- /dev/null +++ b/syncmaster/schemas/v1/auth/iceberg_rest_s3_direct/oauth2_client_credentials.py @@ -0,0 +1,32 @@ +# SPDX-FileCopyrightText: 2023-2024 MTS PJSC +# SPDX-License-Identifier: Apache-2.0 +from typing import Literal + +from pydantic import BaseModel, Field, SecretStr + + +class ReadIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema(BaseModel): + type: Literal["iceberg_rest_oauth2_client_credentials_s3_basic"] = Field(description="Auth type") + rest_catalog_oauth2_client_id: str + rest_catalog_oauth2_scopes: list[str] = Field(default_factory=list) + rest_catalog_oauth2_resource: str | None = None + rest_catalog_oauth2_audience: str | None = None + rest_catalog_oauth2_token_endpoint: str | None = None + s3_access_key: str + + +class CreateIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema( + ReadIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema, +): + rest_catalog_oauth2_client_secret: SecretStr + s3_secret_key: SecretStr + + +class UpdateIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema( + ReadIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema, +): + rest_catalog_oauth2_client_secret: SecretStr | None = None + s3_secret_key: SecretStr | None = None + + def get_secret_fields(self) -> tuple[str, ...]: + return ("rest_catalog_oauth2_client_secret", "s3_secret_key") diff --git a/syncmaster/schemas/v1/connection_types.py b/syncmaster/schemas/v1/connection_types.py index 6576c078..cfd004b7 100644 --- a/syncmaster/schemas/v1/connection_types.py +++ b/syncmaster/schemas/v1/connection_types.py @@ -3,7 +3,7 @@ from typing import Literal HIVE_TYPE = Literal["hive"] -ICEBERG_REST_S3_TYPE = Literal["iceberg_rest_s3"] +ICEBERG_TYPE = Literal["iceberg"] ORACLE_TYPE = Literal["oracle"] POSTGRES_TYPE = Literal["postgres"] CLICKHOUSE_TYPE = Literal["clickhouse"] @@ -20,7 +20,7 @@ CONNECTION_TYPES = [ "clickhouse", "hive", - "iceberg_rest_s3", + "iceberg", "mssql", "mysql", "oracle", @@ -45,7 +45,7 @@ DB_CONNECTION_TYPES = [ "clickhouse", "hive", - "iceberg_rest_s3", + "iceberg", "mssql", "mysql", "oracle", diff --git a/syncmaster/schemas/v1/connections/connection_base.py b/syncmaster/schemas/v1/connections/connection_base.py index 6ff312bb..02cb7c12 100644 --- a/syncmaster/schemas/v1/connections/connection_base.py +++ b/syncmaster/schemas/v1/connections/connection_base.py @@ -7,18 +7,22 @@ ReadS3AuthSchema, ReadSambaAuthSchema, ) -from syncmaster.schemas.v1.auth.iceberg.basic import ( +from syncmaster.schemas.v1.auth.iceberg_rest_s3_delegated import ( ReadIcebergRESTCatalogBasicAuthSchema, -) -from syncmaster.schemas.v1.auth.iceberg.oauth2_client_credentials import ( ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema, ) +from syncmaster.schemas.v1.auth.iceberg_rest_s3_direct import ( + ReadIcebergRESTCatalogBasicS3BasicAuthSchema, + ReadIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema, +) from syncmaster.schemas.v1.types import NameConstr ReadConnectionAuthDataSchema = ( ReadBasicAuthSchema | ReadS3AuthSchema | ReadSambaAuthSchema + | ReadIcebergRESTCatalogBasicS3BasicAuthSchema + | ReadIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema | ReadIcebergRESTCatalogBasicAuthSchema | ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema ) diff --git a/syncmaster/schemas/v1/connections/iceberg.py b/syncmaster/schemas/v1/connections/iceberg.py index fe35f3a8..3dbdf2fa 100644 --- a/syncmaster/schemas/v1/connections/iceberg.py +++ b/syncmaster/schemas/v1/connections/iceberg.py @@ -1,16 +1,21 @@ # SPDX-FileCopyrightText: 2023-2024 MTS PJSC # SPDX-License-Identifier: Apache-2.0 -from typing import Literal +from typing import Annotated, Literal -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, model_validator -from syncmaster.schemas.v1.auth.iceberg import ( - CreateIcebergRESTCatalogS3ConnectionAuthDataSchema, - ReadIcebergRESTCatalogS3ConnectionAuthDataSchema, - UpdateIcebergRESTCatalogS3ConnectionAuthDataSchema, +from syncmaster.schemas.v1.auth.iceberg_rest_s3_delegated import ( + CreateIcebergRESTCatalogS3DelegatedConnectionAuthDataSchema, + ReadIcebergRESTCatalogS3DelegatedConnectionAuthDataSchema, + UpdateIcebergRESTCatalogS3DelegatedConnectionAuthDataSchema, ) -from syncmaster.schemas.v1.connection_types import ICEBERG_REST_S3_TYPE +from syncmaster.schemas.v1.auth.iceberg_rest_s3_direct import ( + CreateIcebergRESTCatalogS3DirectConnectionAuthDataSchema, + ReadIcebergRESTCatalogS3DirectConnectionAuthDataSchema, + UpdateIcebergRESTCatalogS3DirectConnectionAuthDataSchema, +) +from syncmaster.schemas.v1.connection_types import ICEBERG_TYPE from syncmaster.schemas.v1.connections.connection_base import ( CreateConnectionBaseSchema, ReadConnectionBaseSchema, @@ -18,7 +23,8 @@ from syncmaster.schemas.v1.types import URL -class IcebergRESTCatalogS3ConnectionDataSchema(BaseModel): +class IcebergRESTCatalogS3DirectConnectionDataSchema(BaseModel): + type: Literal["iceberg_rest_s3_direct"] rest_catalog_url: URL s3_warehouse_path: str s3_host: str @@ -30,27 +36,70 @@ class IcebergRESTCatalogS3ConnectionDataSchema(BaseModel): s3_additional_params: dict = Field(default_factory=dict) +class IcebergRESTCatalogS3DelegatedConnectionDataSchema(BaseModel): + type: Literal["iceberg_rest_s3_delegated"] + rest_catalog_url: URL + s3_warehouse_name: str | None = None + s3_access_delegation: Literal["vended-credentials", "remote-signing"] = "vended-credentials" + + +IcebergConnectionDataSchema = Annotated[ + IcebergRESTCatalogS3DirectConnectionDataSchema | IcebergRESTCatalogS3DelegatedConnectionDataSchema, + Field(discriminator="type"), +] + +CreateIcebergAuthDataSchema = Annotated[ + CreateIcebergRESTCatalogS3DirectConnectionAuthDataSchema + | CreateIcebergRESTCatalogS3DelegatedConnectionAuthDataSchema, + Field(discriminator="type"), +] + +ReadIcebergAuthDataSchema = Annotated[ + ReadIcebergRESTCatalogS3DirectConnectionAuthDataSchema | ReadIcebergRESTCatalogS3DelegatedConnectionAuthDataSchema, + Field(discriminator="type"), +] + +UpdateIcebergAuthDataSchema = Annotated[ + UpdateIcebergRESTCatalogS3DirectConnectionAuthDataSchema + | UpdateIcebergRESTCatalogS3DelegatedConnectionAuthDataSchema, + Field(discriminator="type"), +] + + class CreateIcebergConnectionSchema(CreateConnectionBaseSchema): - type: ICEBERG_REST_S3_TYPE = Field(description="Connection type") - data: IcebergRESTCatalogS3ConnectionDataSchema = Field( + type: ICEBERG_TYPE = Field(description="Connection type") + data: IcebergConnectionDataSchema = Field( alias="connection_data", description="Data required to connect to REST catalog and to object storage", ) - auth_data: CreateIcebergRESTCatalogS3ConnectionAuthDataSchema = Field( + auth_data: CreateIcebergAuthDataSchema = Field( description="Credentials for REST Catalog and object storage", ) + @model_validator(mode="after") + def connection_and_auth_data_match(self): + if not self.auth_data: + return self + if self.data.type == "iceberg_rest_s3_direct" and "s3" not in self.auth_data.type: + raise ValueError("Cannot create direct S3 connection without S3 credentials") + if self.data.type == "iceberg_rest_s3_delegated" and "s3" in self.auth_data.type: + raise ValueError("Cannot create delegated S3 connection with S3 credentials") + return self + class ReadIcebergConnectionSchema(ReadConnectionBaseSchema): - type: ICEBERG_REST_S3_TYPE = Field(description="Connection type") - data: IcebergRESTCatalogS3ConnectionDataSchema = Field( + type: ICEBERG_TYPE = Field(description="Connection type") + data: IcebergConnectionDataSchema = Field( alias="connection_data", description="Data required to connect to REST catalog and to object storage", ) - auth_data: ReadIcebergRESTCatalogS3ConnectionAuthDataSchema | None = None + auth_data: ReadIcebergAuthDataSchema | None = Field( + default=None, + description="Credentials for REST Catalog and object storage", + ) class UpdateIcebergConnectionSchema(CreateIcebergConnectionSchema): - auth_data: UpdateIcebergRESTCatalogS3ConnectionAuthDataSchema = Field( + auth_data: UpdateIcebergAuthDataSchema = Field( description="Credentials for REST Catalog and object storage", ) diff --git a/syncmaster/schemas/v1/transfers/db.py b/syncmaster/schemas/v1/transfers/db.py index ea7aea27..0f023f09 100644 --- a/syncmaster/schemas/v1/transfers/db.py +++ b/syncmaster/schemas/v1/transfers/db.py @@ -7,7 +7,7 @@ from syncmaster.schemas.v1.connection_types import ( CLICKHOUSE_TYPE, HIVE_TYPE, - ICEBERG_REST_S3_TYPE, + ICEBERG_TYPE, MSSQL_TYPE, MYSQL_TYPE, ORACLE_TYPE, @@ -43,14 +43,14 @@ class MySQLTransferSourceOrTarget(DBTransfer): type: MYSQL_TYPE -class IcebergRESTCatalogS3TransferSourceOrTarget(DBTransfer): - type: ICEBERG_REST_S3_TYPE +class IcebergTransferSourceOrTarget(DBTransfer): + type: ICEBERG_TYPE DBTransferSourceOrTarget = ( ClickhouseTransferSourceOrTarget | HiveTransferSourceOrTarget - | IcebergRESTCatalogS3TransferSourceOrTarget + | IcebergTransferSourceOrTarget | MSSQLTransferSourceOrTarget | MySQLTransferSourceOrTarget | OracleTransferSourceOrTarget diff --git a/syncmaster/worker/controller.py b/syncmaster/worker/controller.py index d068109c..6890185d 100644 --- a/syncmaster/worker/controller.py +++ b/syncmaster/worker/controller.py @@ -1,6 +1,7 @@ # SPDX-FileCopyrightText: 2023-2024 MTS PJSC # SPDX-License-Identifier: Apache-2.0 import logging +from collections.abc import Callable from tempfile import TemporaryDirectory from typing import Any @@ -13,11 +14,11 @@ from syncmaster.db.models import Connection, Run from syncmaster.dto.connections import ( ClickhouseConnectionDTO, + ConnectionDTO, FTPConnectionDTO, FTPSConnectionDTO, HDFSConnectionDTO, HiveConnectionDTO, - IcebergRESTCatalogS3ConnectionDTO, MSSQLConnectionDTO, MySQLConnectionDTO, OracleConnectionDTO, @@ -26,6 +27,7 @@ SambaConnectionDTO, SFTPConnectionDTO, WebDAVConnectionDTO, + get_iceberg_connection_dto, ) from syncmaster.dto.runs import RunDTO from syncmaster.dto.transfers import ( @@ -34,7 +36,7 @@ FTPTransferDTO, HDFSTransferDTO, HiveTransferDTO, - IcebergRESTCatalogS3TransferDTO, + IcebergTransferDTO, MSSQLTransferDTO, MySQLTransferDTO, OracleTransferDTO, @@ -42,6 +44,7 @@ S3TransferDTO, SambaTransferDTO, SFTPTransferDTO, + TransferDTO, WebDAVTransferDTO, ) from syncmaster.dto.transfers_resources import Resources @@ -70,17 +73,26 @@ logger = logging.getLogger(__name__) -connection_handler_proxy = { +HANDLERS_MAPPING = dict[ + str, + tuple[ + type[Handler], + Callable[..., ConnectionDTO], + type[TransferDTO], + type[RunDTO], + ], +] +connection_handler_proxy: HANDLERS_MAPPING = { "hive": ( HiveHandler, HiveConnectionDTO, HiveTransferDTO, RunDTO, ), - "iceberg_rest_s3": ( + "iceberg": ( IcebergRESTCatalogS3Handler, - IcebergRESTCatalogS3ConnectionDTO, - IcebergRESTCatalogS3TransferDTO, + get_iceberg_connection_dto, + IcebergTransferDTO, RunDTO, ), "oracle": ( diff --git a/syncmaster/worker/handlers/db/iceberg.py b/syncmaster/worker/handlers/db/iceberg.py index 2cfc04cf..9cb35ed2 100644 --- a/syncmaster/worker/handlers/db/iceberg.py +++ b/syncmaster/worker/handlers/db/iceberg.py @@ -8,8 +8,15 @@ from onetl.connection import Iceberg from onetl.hooks import slot, support_hooks -from syncmaster.dto.connections import IcebergRESTCatalogS3ConnectionBaseDTO -from syncmaster.dto.transfers import IcebergRESTCatalogS3TransferDTO +from syncmaster.dto.connections import ( + IcebergRESTCatalogBasicAuthS3BasicDTO, + IcebergRESTCatalogBasicAuthS3DelegatedDTO, + IcebergRESTCatalogOAuth2ClientCredentialsS3BasicDTO, + IcebergRESTCatalogOAuth2ClientCredentialsS3DelegatedDTO, + IcebergRESTCatalogS3DelegatedConnectionBaseDTO, + IcebergRESTCatalogS3DirectConnectionBaseDTO, +) +from syncmaster.dto.transfers import IcebergTransferDTO from syncmaster.worker.handlers.db.base import DBHandler if TYPE_CHECKING: @@ -20,33 +27,47 @@ @support_hooks class IcebergRESTCatalogS3Handler(DBHandler): connection: Iceberg - connection_dto: IcebergRESTCatalogS3ConnectionBaseDTO - transfer_dto: IcebergRESTCatalogS3TransferDTO + connection_dto: IcebergRESTCatalogS3DirectConnectionBaseDTO | IcebergRESTCatalogS3DelegatedConnectionBaseDTO + transfer_dto: IcebergTransferDTO _operators = { "regexp": "RLIKE", **DBHandler._operators, } def connect(self, spark: SparkSession): - self.connection = Iceberg( - spark=spark, - catalog_name=self.transfer_dto.catalog_name, - catalog=Iceberg.RESTCatalog( - uri=self.connection_dto.rest_catalog_url, - auth=self._make_auth(), - ), - warehouse=Iceberg.S3Warehouse( - path=self.connection_dto.s3_warehouse_path, - host=self.connection_dto.s3_host, - port=self.connection_dto.s3_port, - protocol=self.connection_dto.s3_protocol, - bucket=self.connection_dto.s3_bucket, - path_style_access=self.connection_dto.s3_bucket_style == "path", - region=self.connection_dto.s3_region, - access_key=self.connection_dto.s3_access_key, - secret_key=self.connection_dto.s3_secret_key, - ), - ).check() + if isinstance(self.connection_dto, IcebergRESTCatalogS3DirectConnectionBaseDTO): + self.connection = Iceberg( + spark=spark, + catalog_name=self.transfer_dto.catalog_name, + catalog=Iceberg.RESTCatalog( + uri=self.connection_dto.rest_catalog_url, + auth=self._make_auth(), + ), + warehouse=Iceberg.S3Warehouse( + path=self.connection_dto.s3_warehouse_path, + host=self.connection_dto.s3_host, + port=self.connection_dto.s3_port, + protocol=self.connection_dto.s3_protocol, + bucket=self.connection_dto.s3_bucket, + path_style_access=self.connection_dto.s3_bucket_style == "path", + region=self.connection_dto.s3_region, + access_key=self.connection_dto.s3_access_key, + secret_key=self.connection_dto.s3_secret_key, + ), + ).check() + else: + self.connection = Iceberg( + spark=spark, + catalog_name=self.transfer_dto.catalog_name, + catalog=Iceberg.RESTCatalog( + uri=self.connection_dto.rest_catalog_url, + auth=self._make_auth(), + ), + warehouse=Iceberg.DelegatedWarehouse( + name=self.connection_dto.s3_warehouse_name, + access_delegation=self.connection_dto.s3_access_delegation, + ), + ).check() @slot def read(self) -> DataFrame: @@ -90,7 +111,11 @@ def _quote_field(self, field: str) -> str: return f"`{field}`" def _make_auth(self): - if self.connection_dto.rest_catalog_auth_type == "oauth2": + if isinstance( + self.connection_dto, + IcebergRESTCatalogOAuth2ClientCredentialsS3BasicDTO + | IcebergRESTCatalogOAuth2ClientCredentialsS3DelegatedDTO, + ): return Iceberg.RESTCatalog.OAuth2ClientCredentials( client_id=self.connection_dto.rest_catalog_oauth2_client_id, client_secret=self.connection_dto.rest_catalog_oauth2_client_secret, @@ -99,7 +124,12 @@ def _make_auth(self): audience=self.connection_dto.rest_catalog_oauth2_audience, oauth2_token_endpoint=self.connection_dto.rest_catalog_oauth2_token_endpoint, ) - return Iceberg.RESTCatalog.BasicAuth( - user=self.connection_dto.rest_catalog_username, - password=self.connection_dto.rest_catalog_password, - ) + if isinstance( + self.connection_dto, + IcebergRESTCatalogBasicAuthS3DelegatedDTO | IcebergRESTCatalogBasicAuthS3BasicDTO, + ): + return Iceberg.RESTCatalog.BasicAuth( + user=self.connection_dto.rest_catalog_username, + password=self.connection_dto.rest_catalog_password, + ) + return None diff --git a/syncmaster/worker/spark.py b/syncmaster/worker/spark.py index 8ecd2281..db600e19 100644 --- a/syncmaster/worker/spark.py +++ b/syncmaster/worker/spark.py @@ -89,7 +89,7 @@ def get_packages(connection_types: set[str]) -> list[str]: # noqa: WPS212 if connection_types & {"s3", "all"}: result.extend(SparkS3.get_packages(spark_version=spark_version)) - if connection_types & {"iceberg_rest_s3", "all"}: + if connection_types & {"iceberg", "all"}: result.extend( [ *Iceberg.get_packages(package_version="1.10.0", spark_version=spark_version), diff --git a/tests/test_integration/test_run_transfer/connection_fixtures/iceberg_fixtures.py b/tests/test_integration/test_run_transfer/connection_fixtures/iceberg_fixtures.py index 7ea749ff..1d411a3f 100644 --- a/tests/test_integration/test_run_transfer/connection_fixtures/iceberg_fixtures.py +++ b/tests/test_integration/test_run_transfer/connection_fixtures/iceberg_fixtures.py @@ -9,7 +9,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from syncmaster.db.models import Group -from syncmaster.dto.connections import IcebergRESTCatalogS3ConnectionDTO +from syncmaster.dto.connections import IcebergRESTCatalogBasicAuthS3BasicDTO from syncmaster.server.settings import ServerAppSettings as Settings from tests.settings import TestSettings from tests.test_unit.utils import create_connection, create_credentials @@ -21,8 +21,8 @@ scope="session", params=[pytest.param("iceberg", marks=[pytest.mark.iceberg])], ) -def iceberg_rest_s3_for_conftest(test_settings: TestSettings) -> IcebergRESTCatalogS3ConnectionDTO: - return IcebergRESTCatalogS3ConnectionDTO( +def iceberg_rest_s3_for_conftest(test_settings: TestSettings) -> IcebergRESTCatalogBasicAuthS3BasicDTO: + return IcebergRESTCatalogBasicAuthS3BasicDTO( rest_catalog_url=test_settings.TEST_ICEBERG_REST_CATALOG_URL_FOR_CONFTEST, s3_warehouse_path=test_settings.TEST_ICEBERG_S3_WAREHOUSE_PATH, s3_host=test_settings.TEST_S3_HOST_FOR_CONFTEST, @@ -43,8 +43,8 @@ def iceberg_rest_s3_for_conftest(test_settings: TestSettings) -> IcebergRESTCata scope="session", params=[pytest.param("iceberg", marks=[pytest.mark.iceberg])], ) -def iceberg_rest_s3_for_worker(test_settings: TestSettings) -> IcebergRESTCatalogS3ConnectionDTO: - return IcebergRESTCatalogS3ConnectionDTO( +def iceberg_rest_s3_for_worker(test_settings: TestSettings) -> IcebergRESTCatalogBasicAuthS3BasicDTO: + return IcebergRESTCatalogBasicAuthS3BasicDTO( rest_catalog_url=test_settings.TEST_ICEBERG_REST_CATALOG_URL_FOR_WORKER, s3_warehouse_path=test_settings.TEST_ICEBERG_S3_WAREHOUSE_PATH, s3_host=test_settings.TEST_S3_HOST_FOR_WORKER, @@ -64,7 +64,7 @@ def iceberg_rest_s3_for_worker(test_settings: TestSettings) -> IcebergRESTCatalo @pytest.fixture def prepare_iceberg_rest_s3( spark: SparkSession, - iceberg_rest_s3_for_conftest: IcebergRESTCatalogS3ConnectionDTO, + iceberg_rest_s3_for_conftest: IcebergRESTCatalogBasicAuthS3BasicDTO, s3_file_connection: S3, ): iceberg = iceberg_rest_s3_for_conftest @@ -118,7 +118,7 @@ def fill_with_data(df: DataFrame): @pytest_asyncio.fixture async def iceberg_rest_s3_connection( - iceberg_rest_s3_for_worker: IcebergRESTCatalogS3ConnectionDTO, + iceberg_rest_s3_for_worker: IcebergRESTCatalogBasicAuthS3BasicDTO, settings: Settings, session: AsyncSession, group: Group, diff --git a/tests/test_integration/test_run_transfer/test_iceberg.py b/tests/test_integration/test_run_transfer/test_iceberg.py index ea4167e2..9a52c145 100644 --- a/tests/test_integration/test_run_transfer/test_iceberg.py +++ b/tests/test_integration/test_run_transfer/test_iceberg.py @@ -41,7 +41,7 @@ async def postgres_to_iceberg_rest_s3( "table_name": "public.source_table", }, target_params={ - "type": "iceberg_rest_s3", + "type": "iceberg", "table_name": "default.target_table", "catalog_name": "iceberg_rest_s3", }, @@ -71,7 +71,7 @@ async def iceberg_rest_s3_to_postgres( source_connection_id=iceberg_rest_s3_connection.id, target_connection_id=postgres_connection.id, source_params={ - "type": "iceberg_rest_s3", + "type": "iceberg", "table_name": "default.source_table", "catalog_name": "iceberg_rest_s3", }, diff --git a/tests/test_unit/test_connections/connection_fixtures/group_connections_fixture.py b/tests/test_unit/test_connections/connection_fixtures/group_connections_fixture.py index 8019a508..5fc573ca 100644 --- a/tests/test_unit/test_connections/connection_fixtures/group_connections_fixture.py +++ b/tests/test_unit/test_connections/connection_fixtures/group_connections_fixture.py @@ -31,9 +31,10 @@ async def group_connections( "cluster": "cluster", }, ) - elif conn_type == ConnectionType.ICEBERG_REST_S3: + elif conn_type == ConnectionType.ICEBERG: new_data.update( { + "type": "iceberg_rest_s3_direct", "rest_catalog_url": "https://rest.domain.com", "s3_warehouse_path": "/some/warehouse", "s3_host": "s3.domain.com", diff --git a/tests/test_unit/test_connections/test_create_connection.py b/tests/test_unit/test_connections/test_create_connection.py index 9b536715..8fbc03fd 100644 --- a/tests/test_unit/test_connections/test_create_connection.py +++ b/tests/test_unit/test_connections/test_create_connection.py @@ -215,7 +215,7 @@ async def test_check_name_field_validation_on_create_connection( message = response.json()["error"]["details"][0]["message"] assert message == ( "Input tag 'POSTGRESQL' found using 'type' does not match any of the expected tags: " - "'clickhouse', 'hive', 'iceberg_rest_s3', 'mssql', 'mysql', 'oracle', 'postgres', " + "'clickhouse', 'hive', 'iceberg', 'mssql', 'mysql', 'oracle', 'postgres', " "'ftp', 'ftps', 'hdfs', 's3', 'samba', 'sftp', 'webdav'" ) diff --git a/tests/test_unit/test_connections/test_db_connection/test_create_iceberg_connection.py b/tests/test_unit/test_connections/test_db_connection/test_create_iceberg_connection.py index 3b68e0c9..86797f27 100644 --- a/tests/test_unit/test_connections/test_db_connection/test_create_iceberg_connection.py +++ b/tests/test_unit/test_connections/test_db_connection/test_create_iceberg_connection.py @@ -11,7 +11,7 @@ pytestmark = [pytest.mark.asyncio, pytest.mark.server, pytest.mark.iceberg] -async def test_developer_plus_can_create_iceberg_rest_s3_connection( +async def test_developer_plus_can_create_iceberg_rest_s3_direct_connection( client: AsyncClient, group: MockGroup, session: AsyncSession, @@ -27,8 +27,9 @@ async def test_developer_plus_can_create_iceberg_rest_s3_connection( "group_id": group.id, "name": "New connection", "description": "", - "type": "iceberg_rest_s3", + "type": "iceberg", "connection_data": { + "type": "iceberg_rest_s3_direct", "rest_catalog_url": "https://rest.domain.com", "s3_warehouse_path": "/some/warehouse", "s3_protocol": "http", @@ -73,6 +74,7 @@ async def test_developer_plus_can_create_iceberg_rest_s3_connection( "description": connection.description, "type": connection.type, "connection_data": { + "type": connection.data["type"], "rest_catalog_url": connection.data["rest_catalog_url"], "s3_warehouse_path": connection.data["s3_warehouse_path"], "s3_protocol": connection.data["s3_protocol"], @@ -91,14 +93,80 @@ async def test_developer_plus_can_create_iceberg_rest_s3_connection( } -async def test_developer_plus_can_create_iceberg_rest_s3_connection_with_oauth2_client_credentials( +async def test_developer_plus_can_create_iceberg_rest_s3_delegated_connection( client: AsyncClient, group: MockGroup, session: AsyncSession, settings: Settings, - role_developer_plus: UserTestRoles, ): - user = group.get_member_of_role(role_developer_plus) + user = group.get_member_of_role(UserTestRoles.Developer) + + response = await client.post( + "v1/connections", + headers={"Authorization": f"Bearer {user.token}"}, + json={ + "group_id": group.id, + "name": "New connection", + "description": "", + "type": "iceberg", + "connection_data": { + "type": "iceberg_rest_s3_delegated", + "rest_catalog_url": "https://rest.domain.com", + "s3_warehouse_name": "some-warehouse", + "s3_access_delegation": "vended-credentials", + }, + "auth_data": { + "type": "iceberg_rest_basic", + "rest_catalog_username": "user", + "rest_catalog_password": "secret", + }, + }, + ) + assert response.status_code == 200, response.text + + connection = ( + await session.scalars( + select(Connection).filter_by( + name="New connection", + ), + ) + ).first() + + creds = ( + await session.scalars( + select(AuthData).filter_by( + connection_id=connection.id, + ), + ) + ).one() + + decrypted = decrypt_auth_data(creds.value, settings=settings) + assert response.json() == { + "id": connection.id, + "group_id": connection.group_id, + "name": connection.name, + "description": connection.description, + "type": connection.type, + "connection_data": { + "type": connection.data["type"], + "rest_catalog_url": connection.data["rest_catalog_url"], + "s3_warehouse_name": connection.data["s3_warehouse_name"], + "s3_access_delegation": connection.data["s3_access_delegation"], + }, + "auth_data": { + "type": decrypted["type"], + "rest_catalog_username": decrypted["rest_catalog_username"], + }, + } + + +async def test_developer_plus_can_create_iceberg_rest_s3_direct_connection_with_oauth2_client_credentials( + client: AsyncClient, + group: MockGroup, + session: AsyncSession, + settings: Settings, +): + user = group.get_member_of_role(UserTestRoles.Developer) response = await client.post( "v1/connections", @@ -107,8 +175,9 @@ async def test_developer_plus_can_create_iceberg_rest_s3_connection_with_oauth2_ "group_id": group.id, "name": "New connection", "description": "", - "type": "iceberg_rest_s3", + "type": "iceberg", "connection_data": { + "type": "iceberg_rest_s3_direct", "rest_catalog_url": "https://rest.domain.com", "s3_warehouse_path": "/some/warehouse", "s3_protocol": "http", @@ -156,6 +225,7 @@ async def test_developer_plus_can_create_iceberg_rest_s3_connection_with_oauth2_ "description": connection.description, "type": connection.type, "connection_data": { + "type": connection.data["type"], "rest_catalog_url": connection.data["rest_catalog_url"], "s3_warehouse_path": connection.data["s3_warehouse_path"], "s3_protocol": connection.data["s3_protocol"], @@ -176,3 +246,231 @@ async def test_developer_plus_can_create_iceberg_rest_s3_connection_with_oauth2_ "s3_access_key": decrypted["s3_access_key"], }, } + + +async def test_developer_plus_can_create_iceberg_rest_s3_delegated_connection_with_oauth2_client_credentials( + client: AsyncClient, + group: MockGroup, + session: AsyncSession, + settings: Settings, +): + user = group.get_member_of_role(UserTestRoles.Developer) + + response = await client.post( + "v1/connections", + headers={"Authorization": f"Bearer {user.token}"}, + json={ + "group_id": group.id, + "name": "New connection", + "description": "", + "type": "iceberg", + "connection_data": { + "type": "iceberg_rest_s3_delegated", + "rest_catalog_url": "https://rest.domain.com", + "s3_warehouse_name": "some-warehouse", + "s3_access_delegation": "vended-credentials", + }, + "auth_data": { + "type": "iceberg_rest_oauth2_client_credentials", + "rest_catalog_oauth2_client_id": "my_client_id", + "rest_catalog_oauth2_client_secret": "my_client_secret", + "rest_catalog_oauth2_scopes": ["catalog:read"], + "rest_catalog_oauth2_audience": "iceberg-catalog", + "rest_catalog_oauth2_resource": "some_resource", + "rest_catalog_oauth2_token_endpoint": "https://oauth.example.com/token", + }, + }, + ) + assert response.status_code == 200, response.text + + connection = ( + await session.scalars( + select(Connection).filter_by( + name="New connection", + ), + ) + ).first() + + creds = ( + await session.scalars( + select(AuthData).filter_by( + connection_id=connection.id, + ), + ) + ).one() + + decrypted = decrypt_auth_data(creds.value, settings=settings) + assert response.json() == { + "id": connection.id, + "group_id": connection.group_id, + "name": connection.name, + "description": connection.description, + "type": connection.type, + "connection_data": { + "type": connection.data["type"], + "rest_catalog_url": connection.data["rest_catalog_url"], + "s3_warehouse_name": connection.data["s3_warehouse_name"], + "s3_access_delegation": connection.data["s3_access_delegation"], + }, + "auth_data": { + "type": decrypted["type"], + "rest_catalog_oauth2_client_id": decrypted["rest_catalog_oauth2_client_id"], + "rest_catalog_oauth2_scopes": decrypted["rest_catalog_oauth2_scopes"], + "rest_catalog_oauth2_audience": decrypted["rest_catalog_oauth2_audience"], + "rest_catalog_oauth2_resource": decrypted["rest_catalog_oauth2_resource"], + "rest_catalog_oauth2_token_endpoint": decrypted["rest_catalog_oauth2_token_endpoint"], + }, + } + + +@pytest.mark.parametrize( + ("auth_data"), + [ + pytest.param( + { + "type": "iceberg_rest_basic", + "rest_catalog_username": "user", + "rest_catalog_password": "secret", + }, + id="with_basic_auth", + ), + pytest.param( + { + "type": "iceberg_rest_oauth2_client_credentials", + "rest_catalog_oauth2_client_id": "my_client_id", + "rest_catalog_oauth2_client_secret": "my_client_secret", + "rest_catalog_oauth2_scopes": ["catalog:read"], + "rest_catalog_oauth2_audience": "iceberg-catalog", + "rest_catalog_oauth2_resource": "some_resource", + "rest_catalog_oauth2_token_endpoint": "https://oauth.example.com/token", + }, + id="with_oauth2_client_credentials", + ), + ], +) +async def test_developer_plus_can_create_iceberg_rest_s3_direct_connection_without_credentials( + client: AsyncClient, + group: MockGroup, + auth_data: dict, +): + user = group.get_member_of_role(UserTestRoles.Developer) + + body = { + "group_id": group.id, + "name": "New connection", + "description": "", + "type": "iceberg", + "connection_data": { + "type": "iceberg_rest_s3_direct", + "rest_catalog_url": "https://rest.domain.com", + "s3_warehouse_path": "/some/warehouse", + "s3_protocol": "http", + "s3_host": "localhost", + "s3_port": 9010, + "s3_bucket": "some_bucket", + "s3_region": "us-east-1", + "s3_bucket_style": "path", + }, + "auth_data": auth_data, + } + + response = await client.post( + "v1/connections", + headers={"Authorization": f"Bearer {user.token}"}, + json=body, + ) + assert response.status_code == 422, response.text + + assert response.json() == { + "error": { + "code": "invalid_request", + "message": "Invalid request", + "details": [ + { + "location": ["body", "iceberg"], + "message": "Value error, Cannot create direct S3 connection without S3 credentials", + "code": "value_error", + "context": {}, + "input": body, + }, + ], + }, + } + + +@pytest.mark.parametrize( + ("auth_data"), + [ + pytest.param( + { + "type": "iceberg_rest_basic_s3_basic", + "rest_catalog_username": "user", + "rest_catalog_password": "secret", + "s3_access_key": "access_key", + "s3_secret_key": "secret_key", + }, + id="with_basic_auth", + ), + pytest.param( + { + "type": "iceberg_rest_oauth2_client_credentials_s3_basic", + "rest_catalog_oauth2_client_id": "my_client_id", + "rest_catalog_oauth2_client_secret": "my_client_secret", + "rest_catalog_oauth2_scopes": ["catalog:read"], + "rest_catalog_oauth2_audience": "iceberg-catalog", + "rest_catalog_oauth2_token_endpoint": "https://oauth.example.com/token", + "s3_access_key": "access_key", + "s3_secret_key": "secret_key", + }, + id="with_oauth2_client_credentials", + ), + ], +) +async def test_developer_plus_can_create_iceberg_rest_s3_delegated_connection_with_wrong_credentials( + client: AsyncClient, + group: MockGroup, + auth_data: dict, +): + user = group.get_member_of_role(UserTestRoles.Developer) + + body = { + "group_id": group.id, + "name": "New connection", + "description": "", + "type": "iceberg", + "connection_data": { + "type": "iceberg_rest_s3_delegated", + "rest_catalog_url": "https://rest.domain.com", + "s3_warehouse_path": "/some/warehouse", + "s3_protocol": "http", + "s3_host": "localhost", + "s3_port": 9010, + "s3_bucket": "some_bucket", + "s3_region": "us-east-1", + "s3_bucket_style": "path", + }, + "auth_data": auth_data, + } + + response = await client.post( + "v1/connections", + headers={"Authorization": f"Bearer {user.token}"}, + json=body, + ) + assert response.status_code == 422, response.text + + assert response.json() == { + "error": { + "code": "invalid_request", + "message": "Invalid request", + "details": [ + { + "location": ["body", "iceberg"], + "message": "Value error, Cannot create delegated S3 connection with S3 credentials", + "code": "value_error", + "context": {}, + "input": body, + }, + ], + }, + } diff --git a/tests/test_unit/test_connections/test_db_connection/test_update_iceberg_connection.py b/tests/test_unit/test_connections/test_db_connection/test_update_iceberg_connection.py index 480fab35..9d64b5cf 100644 --- a/tests/test_unit/test_connections/test_db_connection/test_update_iceberg_connection.py +++ b/tests/test_unit/test_connections/test_db_connection/test_update_iceberg_connection.py @@ -11,8 +11,9 @@ "connection_type,create_connection_data,create_connection_auth_data", [ ( - "iceberg_rest_s3", + "iceberg", { + "type": "iceberg_rest_s3_direct", "rest_catalog_url": "http://domain.com:8000", "s3_warehouse_path": "/some/warehouse", "s3_protocol": "http", @@ -33,7 +34,7 @@ ], indirect=["create_connection_data", "create_connection_auth_data"], ) -async def test_developer_plus_can_update_iceberg_rest_s3_connection( +async def test_developer_plus_can_update_iceberg_rest_s3_direct_connection( client: AsyncClient, group_connection: MockConnection, role_developer_plus: UserTestRoles, @@ -48,6 +49,7 @@ async def test_developer_plus_can_update_iceberg_rest_s3_connection( **connection_json, "type": group_connection.type, "connection_data": { + "type": "iceberg_rest_s3_direct", "rest_catalog_url": "http://rest.domain.com:8000", "s3_warehouse_path": "/some/new/warehouse", "s3_protocol": "https", @@ -73,6 +75,7 @@ async def test_developer_plus_can_update_iceberg_rest_s3_connection( "group_id": group_connection.group_id, "type": group_connection.type, "connection_data": { + "type": "iceberg_rest_s3_direct", "rest_catalog_url": "http://rest.domain.com:8000", "s3_warehouse_path": "/some/new/warehouse", "s3_protocol": "https", @@ -95,8 +98,77 @@ async def test_developer_plus_can_update_iceberg_rest_s3_connection( "connection_type,create_connection_data,create_connection_auth_data", [ ( - "iceberg_rest_s3", + "iceberg", { + "type": "iceberg_rest_s3_delegated", + "rest_catalog_url": "http://domain.com:8000", + "s3_warehouse_path": "some-warehouse", + "s3_access_delegation": "vended-credentials", + }, + { + "type": "iceberg_rest_basic", + "rest_catalog_username": "user", + "rest_catalog_password": "secret", + }, + ), + ], + indirect=["create_connection_data", "create_connection_auth_data"], +) +async def test_developer_plus_can_update_iceberg_rest_s3_delegated_connection( + client: AsyncClient, + group_connection: MockConnection, + role_developer_plus: UserTestRoles, +): + user = group_connection.owner_group.get_member_of_role(role_developer_plus) + connection_json = await fetch_connection_json(client, user.token, group_connection) + + response = await client.put( + f"v1/connections/{group_connection.id}", + headers={"Authorization": f"Bearer {user.token}"}, + json={ + **connection_json, + "type": group_connection.type, + "connection_data": { + "type": "iceberg_rest_s3_delegated", + "rest_catalog_url": "http://rest.domain.com:8000", + "s3_warehouse_name": "some-new-warehouse", + "s3_access_delegation": "remote-signing", + }, + "auth_data": { + "type": "iceberg_rest_basic", + "rest_catalog_username": "new_user", + "rest_catalog_password": "new_password", + }, + }, + ) + + assert response.status_code == 200, response.json() + assert response.json() == { + "id": group_connection.id, + "name": group_connection.name, + "description": group_connection.description, + "group_id": group_connection.group_id, + "type": group_connection.type, + "connection_data": { + "type": "iceberg_rest_s3_delegated", + "rest_catalog_url": "http://rest.domain.com:8000", + "s3_warehouse_name": "some-new-warehouse", + "s3_access_delegation": "remote-signing", + }, + "auth_data": { + "type": "iceberg_rest_basic", + "rest_catalog_username": "new_user", + }, + } + + +@pytest.mark.parametrize( + "connection_type,create_connection_data,create_connection_auth_data", + [ + ( + "iceberg", + { + "type": "iceberg_rest_s3_direct", "rest_catalog_url": "http://domain.com:8000", "s3_warehouse_path": "/some/warehouse", "s3_protocol": "http", @@ -121,7 +193,7 @@ async def test_developer_plus_can_update_iceberg_rest_s3_connection( ], indirect=["create_connection_data", "create_connection_auth_data"], ) -async def test_developer_plus_can_update_iceberg_rest_s3_connection_with_oauth2_client_credentials( +async def test_developer_plus_can_update_iceberg_rest_s3_direct_connection_with_oauth2_client_credentials( client: AsyncClient, group_connection: MockConnection, role_developer_plus: UserTestRoles, @@ -136,6 +208,7 @@ async def test_developer_plus_can_update_iceberg_rest_s3_connection_with_oauth2_ **connection_json, "type": group_connection.type, "connection_data": { + "type": "iceberg_rest_s3_direct", "rest_catalog_url": "http://rest.domain.com:8000", "s3_warehouse_path": "/some/new/warehouse", "s3_protocol": "https", @@ -166,6 +239,7 @@ async def test_developer_plus_can_update_iceberg_rest_s3_connection_with_oauth2_ "group_id": group_connection.group_id, "type": group_connection.type, "connection_data": { + "type": "iceberg_rest_s3_direct", "rest_catalog_url": "http://rest.domain.com:8000", "s3_warehouse_path": "/some/new/warehouse", "s3_protocol": "https", @@ -177,7 +251,7 @@ async def test_developer_plus_can_update_iceberg_rest_s3_connection_with_oauth2_ "s3_additional_params": {}, }, "auth_data": { - "type": group_connection.credentials.value["type"], + "type": "iceberg_rest_oauth2_client_credentials_s3_basic", "rest_catalog_oauth2_client_id": "my_new_client_id", "rest_catalog_oauth2_scopes": ["catalog:write"], "rest_catalog_oauth2_audience": "iceberg-new-catalog", @@ -186,3 +260,83 @@ async def test_developer_plus_can_update_iceberg_rest_s3_connection_with_oauth2_ "s3_access_key": "new_access_key", }, } + + +@pytest.mark.parametrize( + "connection_type,create_connection_data,create_connection_auth_data", + [ + ( + "iceberg", + { + "type": "iceberg_rest_s3_delegated", + "rest_catalog_url": "http://domain.com:8000", + "s3_warehouse_path": "some-warehouse", + "s3_access_delegation": "vended-credentials", + }, + { + "type": "iceberg_rest_oauth2_client_credentials", + "rest_catalog_oauth2_client_id": "my_client_id", + "rest_catalog_oauth2_client_secret": "my_new_client_secret", + "rest_catalog_oauth2_scopes": ["catalog:read"], + "rest_catalog_oauth2_audience": "iceberg-catalog", + "rest_catalog_oauth2_resource": "some-old-resource", + "rest_catalog_oauth2_token_endpoint": "https://oauth.example.com/token", + }, + ), + ], + indirect=["create_connection_data", "create_connection_auth_data"], +) +async def test_developer_plus_can_update_iceberg_rest_s3_delegated_connection_with_oauth2_client_credentials( + client: AsyncClient, + group_connection: MockConnection, + role_developer_plus: UserTestRoles, +): + user = group_connection.owner_group.get_member_of_role(role_developer_plus) + connection_json = await fetch_connection_json(client, user.token, group_connection) + + response = await client.put( + f"v1/connections/{group_connection.id}", + headers={"Authorization": f"Bearer {user.token}"}, + json={ + **connection_json, + "type": group_connection.type, + "connection_data": { + "type": "iceberg_rest_s3_delegated", + "rest_catalog_url": "http://rest.domain.com:8000", + "s3_warehouse_name": "some-new-warehouse", + "s3_access_delegation": "remote-signing", + }, + "auth_data": { + "type": "iceberg_rest_oauth2_client_credentials", + "rest_catalog_oauth2_client_id": "my_new_client_id", + "rest_catalog_oauth2_client_secret": "my_new_client_secret", + "rest_catalog_oauth2_scopes": ["catalog:write"], + "rest_catalog_oauth2_audience": "iceberg-new-catalog", + "rest_catalog_oauth2_resource": "iceberg-new-resource", + "rest_catalog_oauth2_token_endpoint": "https://oauth.new.example.com/token", + }, + }, + ) + + assert response.status_code == 200, response.json() + assert response.json() == { + "id": group_connection.id, + "name": group_connection.name, + "description": group_connection.description, + "group_id": group_connection.group_id, + "type": group_connection.type, + "connection_data": { + "type": "iceberg_rest_s3_delegated", + "rest_catalog_url": "http://rest.domain.com:8000", + "s3_warehouse_name": "some-new-warehouse", + "s3_access_delegation": "remote-signing", + }, + "auth_data": { + "type": "iceberg_rest_oauth2_client_credentials", + "rest_catalog_oauth2_client_id": "my_new_client_id", + "rest_catalog_oauth2_scopes": ["catalog:write"], + "rest_catalog_oauth2_audience": "iceberg-new-catalog", + "rest_catalog_oauth2_resource": "iceberg-new-resource", + "rest_catalog_oauth2_token_endpoint": "https://oauth.new.example.com/token", + }, + } diff --git a/tests/test_unit/test_connections/test_read_connections.py b/tests/test_unit/test_connections/test_read_connections.py index d9c50dc8..60dfcd6e 100644 --- a/tests/test_unit/test_connections/test_read_connections.py +++ b/tests/test_unit/test_connections/test_read_connections.py @@ -298,7 +298,7 @@ async def test_search_connections_with_nonexistent_query( "type": [ "postgres", "hive", - "iceberg_rest_s3", + "iceberg", "oracle", "clickhouse", "mssql", diff --git a/tests/test_unit/test_transfers/test_create_transfer.py b/tests/test_unit/test_transfers/test_create_transfer.py index b29b9147..380eadae 100644 --- a/tests/test_unit/test_transfers/test_create_transfer.py +++ b/tests/test_unit/test_transfers/test_create_transfer.py @@ -500,14 +500,14 @@ async def test_superuser_can_create_transfer( "message": ( "Input tag 'new some connection type' found using 'type' " "does not match any of the expected tags: 'clickhouse', 'hive', " - "'iceberg_rest_s3', 'mssql', 'mysql', 'oracle', 'postgres', " + "'iceberg', 'mssql', 'mysql', 'oracle', 'postgres', " "'hdfs', 's3', 'sftp', 'ftp', 'ftps', 'webdav', 'samba'" ), "code": "union_tag_invalid", "context": { "discriminator": "'type'", "expected_tags": ( - "'clickhouse', 'hive', 'iceberg_rest_s3', 'mssql', 'mysql', 'oracle', 'postgres', " + "'clickhouse', 'hive', 'iceberg', 'mssql', 'mysql', 'oracle', 'postgres', " "'hdfs', 's3', 'sftp', 'ftp', 'ftps', 'webdav', 'samba'" ), "tag": "new some connection type", diff --git a/tests/test_unit/utils.py b/tests/test_unit/utils.py index afd59856..041fd30f 100644 --- a/tests/test_unit/utils.py +++ b/tests/test_unit/utils.py @@ -268,22 +268,24 @@ async def fetch_connection_json(client: AsyncClient, user_token: str, mock_conne f"v1/connections/{mock_connection.id}", headers={"Authorization": f"Bearer {user_token}"}, ) + assert connection.status_code == 200, connection.text connection_json = connection.json() auth_data = connection_json["auth_data"] - if auth_data["type"] in ("basic", "samba"): - auth_data["password"] = mock_connection.credentials.value["password"] - elif auth_data["type"] == "s3": - auth_data["secret_key"] = mock_connection.credentials.value["secret_key"] - elif auth_data["type"] == "iceberg_rest_basic_s3_basic": - auth_data["rest_catalog_password"] = mock_connection.credentials.value["rest_catalog_password"] - auth_data["s3_secret_key"] = mock_connection.credentials.value["s3_secret_key"] - elif auth_data["type"] == "iceberg_rest_oauth2_client_credentials_s3_basic": - auth_data["rest_catalog_oauth2_client_secret"] = mock_connection.credentials.value[ - "rest_catalog_oauth2_client_secret" - ] - auth_data["s3_secret_key"] = mock_connection.credentials.value["s3_secret_key"] - + auth_data_secret_fields = { + "basic": ["password"], + "samba": ["password"], + "s3": ["secret_key"], + "iceberg_rest_basic": ["rest_catalog_password"], + "iceberg_rest_basic_s3_basic": ["rest_catalog_password", "s3_secret_key"], + "iceberg_rest_oauth2_client_credentials": ["rest_catalog_oauth2_client_secret"], + "iceberg_rest_oauth2_client_credentials_s3_basic": [ + "rest_catalog_oauth2_client_secret", + "s3_secret_key", + ], + } + for field in auth_data_secret_fields[auth_data["type"]]: + auth_data[field] = mock_connection.credentials.value[field] return connection_json diff --git a/tests/utils.py b/tests/utils.py index 00cd5987..fbdee997 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -140,11 +140,17 @@ def verify_transfer_auth_data(run_data: dict[str, Any], source_auth: str, target if auth_type == "s3": assert auth_data["access_key"] assert "secret_key" not in auth_data + elif auth_type == "iceberg_rest_basic": + assert auth_data["rest_catalog_username"] + assert "rest_catalog_password" not in auth_data elif auth_type == "iceberg_rest_basic_s3_basic": assert auth_data["s3_access_key"] assert auth_data["rest_catalog_username"] assert "s3_secret_key" not in auth_data assert "rest_catalog_password" not in auth_data + elif auth_type == "iceberg_rest_oauth2_client_credentials": + assert auth_data["rest_catalog_oauth2_client_id"] + assert "rest_catalog_oauth2_client_secret" not in auth_data elif auth_type == "iceberg_rest_oauth2_client_credentials_s3_basic": assert auth_data["s3_access_key"] assert auth_data["rest_catalog_oauth2_client_id"]