Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/changelog/next_release/297.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added support for fetching S3 params & credentials from Iceberg REST Catalog (delegation).
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ max-annotation-complexity = 4
max-returns = 5
max-awaits = 5
max-local-variables = 20
max-name-length = 60
max-name-length = 65
# Max of expressions in a function
max-expressions = 15
# Max args in a function
Expand Down
4 changes: 2 additions & 2 deletions syncmaster/db/models/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
class ConnectionType(StrEnum):
POSTGRES = "postgres"
HIVE = "hive"
ICEBERG_REST_S3 = "iceberg_rest_s3"
ICEBERG = "iceberg"
ORACLE = "oracle"
CLICKHOUSE = "clickhouse"
MSSQL = "mssql"
Expand Down Expand Up @@ -62,7 +62,7 @@ class Connection(Base, ResourceMixin, TimestampMixin):
'simple',
translate(coalesce(data->>'host', ''), './-_:\\', ' ')
)
""",
""", # noqa: WPS342
persisted=True,
),
nullable=False,
Expand Down
65 changes: 55 additions & 10 deletions syncmaster/dto/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,14 @@ class HiveConnectionDTO(ConnectionDTO):


@dataclass
class IcebergRESTCatalogS3ConnectionBaseDTO(ConnectionDTO):
class IcebergConnectionBaseDTO(ConnectionDTO):
rest_catalog_url: str
flavor: ClassVar[str]
type: ClassVar[str] = "iceberg"


@dataclass
class IcebergRESTCatalogS3DirectConnectionBaseDTO(IcebergConnectionBaseDTO):
s3_warehouse_path: str
s3_host: str
s3_bucket: str
Expand All @@ -86,18 +92,18 @@ class IcebergRESTCatalogS3ConnectionBaseDTO(ConnectionDTO):
s3_port: int | None
s3_protocol: str
s3_additional_params: dict
type: ClassVar[str] = "iceberg_rest_s3"
implementation: ClassVar[str] = "iceberg_rest_s3_direct"


@dataclass(kw_only=True)
class IcebergRESTCatalogBasicAuthS3DTO(IcebergRESTCatalogS3ConnectionBaseDTO):
class IcebergRESTCatalogBasicAuthS3BasicDTO(IcebergRESTCatalogS3DirectConnectionBaseDTO):
rest_catalog_username: str
rest_catalog_password: str
rest_catalog_auth_type: Literal["basic"] = "basic"


@dataclass(kw_only=True)
class IcebergRESTCatalogOAuth2ClientCredentialsS3DTO(IcebergRESTCatalogS3ConnectionBaseDTO):
class IcebergRESTCatalogOAuth2ClientCredentialsS3BasicDTO(IcebergRESTCatalogS3DirectConnectionBaseDTO):
rest_catalog_oauth2_client_id: str
rest_catalog_oauth2_client_secret: str
rest_catalog_oauth2_scopes: list[str]
Expand All @@ -107,13 +113,52 @@ class IcebergRESTCatalogOAuth2ClientCredentialsS3DTO(IcebergRESTCatalogS3Connect
rest_catalog_auth_type: Literal["oauth2"] = "oauth2"


# TODO: should be refactored
class IcebergRESTCatalogS3ConnectionDTO:
def __new__(cls, **data):
if "rest_catalog_oauth2_client_id" in data:
return IcebergRESTCatalogOAuth2ClientCredentialsS3DTO(**data)
@dataclass
class IcebergRESTCatalogS3DelegatedConnectionBaseDTO(IcebergConnectionBaseDTO):
s3_warehouse_name: str | None = None
s3_access_delegation: Literal["vended-credentials", "remote-signing"] = "vended-credentials"
implementation: ClassVar[str] = "iceberg_rest_s3_delegated"


@dataclass(kw_only=True)
class IcebergRESTCatalogBasicAuthS3DelegatedDTO(IcebergRESTCatalogS3DelegatedConnectionBaseDTO):
rest_catalog_username: str
rest_catalog_password: str
rest_catalog_auth_type: Literal["basic"] = "basic"


return IcebergRESTCatalogBasicAuthS3DTO(**data)
@dataclass(kw_only=True)
class IcebergRESTCatalogOAuth2ClientCredentialsS3DelegatedDTO(IcebergRESTCatalogS3DelegatedConnectionBaseDTO):
rest_catalog_oauth2_client_id: str
rest_catalog_oauth2_client_secret: str
rest_catalog_oauth2_scopes: list[str]
rest_catalog_oauth2_resource: str | None = None
rest_catalog_oauth2_audience: str | None = None
rest_catalog_oauth2_token_endpoint: str | None = None
rest_catalog_auth_type: Literal["oauth2"] = "oauth2"


# TODO: should be refactored
def get_iceberg_rest_catalog_s3_direct_connection_dto(
**data,
) -> IcebergRESTCatalogBasicAuthS3BasicDTO | IcebergRESTCatalogOAuth2ClientCredentialsS3BasicDTO:
if "rest_catalog_oauth2_client_id" in data:
return IcebergRESTCatalogOAuth2ClientCredentialsS3BasicDTO(**data)
return IcebergRESTCatalogBasicAuthS3BasicDTO(**data)


def get_iceberg_rest_catalog_s3_delegated_connection_dto(
**data,
) -> IcebergRESTCatalogBasicAuthS3DelegatedDTO | IcebergRESTCatalogOAuth2ClientCredentialsS3DelegatedDTO:
if "rest_catalog_oauth2_client_id" in data:
return IcebergRESTCatalogOAuth2ClientCredentialsS3DelegatedDTO(**data)
return IcebergRESTCatalogBasicAuthS3DelegatedDTO(**data)


def get_iceberg_connection_dto(**data) -> IcebergConnectionBaseDTO:
if "s3_warehouse_path" in data:
return get_iceberg_rest_catalog_s3_direct_connection_dto(**data)
return get_iceberg_rest_catalog_s3_delegated_connection_dto(**data)


@dataclass
Expand Down
6 changes: 3 additions & 3 deletions syncmaster/dto/transfers.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ def __post_init__(self):


@dataclass
class IcebergRESTCatalogS3TransferDTO(DBTransferDTO):
type: ClassVar[str] = "iceberg_rest_s3"
catalog_name: str = field(default_factory=lambda: f"iceberg_rest_s3_{uuid4().hex[:8]}") # noqa: WPS237
class IcebergTransferDTO(DBTransferDTO):
type: ClassVar[str] = "iceberg"
catalog_name: str = field(default_factory=lambda: f"iceberg_{uuid4().hex[:8]}") # noqa: WPS237

def __post_init__(self):
super().__post_init__()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,28 @@

from pydantic import Field

from syncmaster.schemas.v1.auth.iceberg.basic import (
from syncmaster.schemas.v1.auth.iceberg_rest_s3_delegated.basic import (
CreateIcebergRESTCatalogBasicAuthSchema,
ReadIcebergRESTCatalogBasicAuthSchema,
UpdateIcebergRESTCatalogBasicAuthSchema,
)
from syncmaster.schemas.v1.auth.iceberg.oauth2_client_credentials import (
from syncmaster.schemas.v1.auth.iceberg_rest_s3_delegated.oauth2_client_credentials import (
CreateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
UpdateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
)

CreateIcebergRESTCatalogS3ConnectionAuthDataSchema = Annotated[
CreateIcebergRESTCatalogS3DelegatedConnectionAuthDataSchema = Annotated[
CreateIcebergRESTCatalogBasicAuthSchema | CreateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
Field(discriminator="type"),
]

ReadIcebergRESTCatalogS3ConnectionAuthDataSchema = Annotated[
ReadIcebergRESTCatalogS3DelegatedConnectionAuthDataSchema = Annotated[
ReadIcebergRESTCatalogBasicAuthSchema | ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
Field(discriminator="type"),
]

UpdateIcebergRESTCatalogS3ConnectionAuthDataSchema = Annotated[
UpdateIcebergRESTCatalogS3DelegatedConnectionAuthDataSchema = Annotated[
UpdateIcebergRESTCatalogBasicAuthSchema | UpdateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
Field(discriminator="type"),
]
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,16 @@


class ReadIcebergRESTCatalogBasicAuthSchema(BaseModel):
type: Literal["iceberg_rest_basic_s3_basic"] = Field(description="Auth type")
type: Literal["iceberg_rest_basic"] = Field(description="Auth type")
rest_catalog_username: str
s3_access_key: str


class CreateIcebergRESTCatalogBasicAuthSchema(ReadIcebergRESTCatalogBasicAuthSchema):
rest_catalog_password: SecretStr
s3_secret_key: SecretStr


class UpdateIcebergRESTCatalogBasicAuthSchema(ReadIcebergRESTCatalogBasicAuthSchema):
rest_catalog_password: SecretStr | None = None
s3_secret_key: SecretStr | None = None

def get_secret_fields(self) -> tuple[str, ...]:
return ("rest_catalog_password", "s3_secret_key")
return ("rest_catalog_password",)
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,24 @@


class ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema(BaseModel):
type: Literal["iceberg_rest_oauth2_client_credentials_s3_basic"] = Field(description="Auth type")
type: Literal["iceberg_rest_oauth2_client_credentials"] = Field(description="Auth type")
rest_catalog_oauth2_client_id: str
rest_catalog_oauth2_scopes: list[str] = Field(default_factory=list)
rest_catalog_oauth2_resource: str | None = None
rest_catalog_oauth2_audience: str | None = None
rest_catalog_oauth2_token_endpoint: URL | None = None
s3_access_key: str


class CreateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema(
ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
):
rest_catalog_oauth2_client_secret: SecretStr
s3_secret_key: SecretStr


class UpdateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema(
ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
):
rest_catalog_oauth2_client_secret: SecretStr | None = None
s3_secret_key: SecretStr | None = None

def get_secret_fields(self) -> tuple[str, ...]:
return ("rest_catalog_oauth2_client_secret", "s3_secret_key")
return ("rest_catalog_oauth2_client_secret",)
31 changes: 31 additions & 0 deletions syncmaster/schemas/v1/auth/iceberg_rest_s3_direct/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from typing import Annotated

from pydantic import Field

from syncmaster.schemas.v1.auth.iceberg_rest_s3_direct.basic import (
CreateIcebergRESTCatalogBasicS3BasicAuthSchema,
ReadIcebergRESTCatalogBasicS3BasicAuthSchema,
UpdateIcebergRESTCatalogBasicS3BasicAuthSchema,
)
from syncmaster.schemas.v1.auth.iceberg_rest_s3_direct.oauth2_client_credentials import (
CreateIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema,
ReadIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema,
UpdateIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema,
)

CreateIcebergRESTCatalogS3DirectConnectionAuthDataSchema = Annotated[
CreateIcebergRESTCatalogBasicS3BasicAuthSchema | CreateIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema,
Field(discriminator="type"),
]

ReadIcebergRESTCatalogS3DirectConnectionAuthDataSchema = Annotated[
ReadIcebergRESTCatalogBasicS3BasicAuthSchema | ReadIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema,
Field(discriminator="type"),
]

UpdateIcebergRESTCatalogS3DirectConnectionAuthDataSchema = Annotated[
UpdateIcebergRESTCatalogBasicS3BasicAuthSchema | UpdateIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema,
Field(discriminator="type"),
]
24 changes: 24 additions & 0 deletions syncmaster/schemas/v1/auth/iceberg_rest_s3_direct/basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from typing import Literal

from pydantic import BaseModel, Field, SecretStr


class ReadIcebergRESTCatalogBasicS3BasicAuthSchema(BaseModel):
type: Literal["iceberg_rest_basic_s3_basic"] = Field(description="Auth type")
rest_catalog_username: str
s3_access_key: str


class CreateIcebergRESTCatalogBasicS3BasicAuthSchema(ReadIcebergRESTCatalogBasicS3BasicAuthSchema):
rest_catalog_password: SecretStr
s3_secret_key: SecretStr


class UpdateIcebergRESTCatalogBasicS3BasicAuthSchema(ReadIcebergRESTCatalogBasicS3BasicAuthSchema):
rest_catalog_password: SecretStr | None = None
s3_secret_key: SecretStr | None = None

def get_secret_fields(self) -> tuple[str, ...]:
return ("rest_catalog_password", "s3_secret_key")
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from typing import Literal

from pydantic import BaseModel, Field, SecretStr


class ReadIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema(BaseModel):
type: Literal["iceberg_rest_oauth2_client_credentials_s3_basic"] = Field(description="Auth type")
rest_catalog_oauth2_client_id: str
rest_catalog_oauth2_scopes: list[str] = Field(default_factory=list)
rest_catalog_oauth2_resource: str | None = None
rest_catalog_oauth2_audience: str | None = None
rest_catalog_oauth2_token_endpoint: str | None = None
s3_access_key: str


class CreateIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema(
ReadIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema,
):
rest_catalog_oauth2_client_secret: SecretStr
s3_secret_key: SecretStr


class UpdateIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema(
ReadIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema,
):
rest_catalog_oauth2_client_secret: SecretStr | None = None
s3_secret_key: SecretStr | None = None

def get_secret_fields(self) -> tuple[str, ...]:
return ("rest_catalog_oauth2_client_secret", "s3_secret_key")
6 changes: 3 additions & 3 deletions syncmaster/schemas/v1/connection_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Literal

HIVE_TYPE = Literal["hive"]
ICEBERG_REST_S3_TYPE = Literal["iceberg_rest_s3"]
ICEBERG_TYPE = Literal["iceberg"]
ORACLE_TYPE = Literal["oracle"]
POSTGRES_TYPE = Literal["postgres"]
CLICKHOUSE_TYPE = Literal["clickhouse"]
Expand All @@ -20,7 +20,7 @@
CONNECTION_TYPES = [
"clickhouse",
"hive",
"iceberg_rest_s3",
"iceberg",
"mssql",
"mysql",
"oracle",
Expand All @@ -45,7 +45,7 @@
DB_CONNECTION_TYPES = [
"clickhouse",
"hive",
"iceberg_rest_s3",
"iceberg",
"mssql",
"mysql",
"oracle",
Expand Down
10 changes: 7 additions & 3 deletions syncmaster/schemas/v1/connections/connection_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,22 @@
ReadS3AuthSchema,
ReadSambaAuthSchema,
)
from syncmaster.schemas.v1.auth.iceberg.basic import (
from syncmaster.schemas.v1.auth.iceberg_rest_s3_delegated import (
ReadIcebergRESTCatalogBasicAuthSchema,
)
from syncmaster.schemas.v1.auth.iceberg.oauth2_client_credentials import (
ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
)
from syncmaster.schemas.v1.auth.iceberg_rest_s3_direct import (
ReadIcebergRESTCatalogBasicS3BasicAuthSchema,
ReadIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema,
)
from syncmaster.schemas.v1.types import NameConstr

ReadConnectionAuthDataSchema = (
ReadBasicAuthSchema
| ReadS3AuthSchema
| ReadSambaAuthSchema
| ReadIcebergRESTCatalogBasicS3BasicAuthSchema
| ReadIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema
| ReadIcebergRESTCatalogBasicAuthSchema
| ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema
)
Expand Down
Loading
Loading