Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .env.docker.test
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ TEST_HIVE_PASSWORD=123UsedForTestOnly@!

TEST_ICEBERG_REST_CATALOG_URL_FOR_CONFTEST=http://test-iceberg-rest:8181
TEST_ICEBERG_REST_CATALOG_URL_FOR_WORKER=http://test-iceberg-rest:8181
TEST_ICEBERG_REST_CATALOG_USERNAME=syncmaster
TEST_ICEBERG_REST_CATALOG_PASSWORD=123UsedForTestOnly@!
TEST_ICEBERG_S3_WAREHOUSE_PATH=/data
TEST_ICEBERG_S3_REGION=us-east-1
TEST_ICEBERG_S3_BUCKET_STYLE=path
Expand Down
2 changes: 0 additions & 2 deletions .env.local.test
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ export TEST_HIVE_PASSWORD=123UsedForTestOnly@!

export TEST_ICEBERG_REST_CATALOG_URL_FOR_CONFTEST=http://localhost:8181
export TEST_ICEBERG_REST_CATALOG_URL_FOR_WORKER=http://test-iceberg-rest:8181
export TEST_ICEBERG_REST_CATALOG_USERNAME=syncmaster
export TEST_ICEBERG_REST_CATALOG_PASSWORD=123UsedForTestOnly@!
export TEST_ICEBERG_S3_WAREHOUSE_PATH=/data
export TEST_ICEBERG_S3_REGION=us-east-1
export TEST_ICEBERG_S3_BUCKET_STYLE=path
Expand Down
22 changes: 10 additions & 12 deletions syncmaster/dto/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,9 @@ class IcebergRESTCatalogS3DirectConnectionBaseDTO(IcebergConnectionBaseDTO):


@dataclass(kw_only=True)
class IcebergRESTCatalogBasicAuthS3BasicDTO(IcebergRESTCatalogS3DirectConnectionBaseDTO):
rest_catalog_username: str
rest_catalog_password: str
rest_catalog_auth_type: Literal["basic"] = "basic"
class IcebergRESTCatalogBearerAuthS3BasicDTO(IcebergRESTCatalogS3DirectConnectionBaseDTO):
rest_catalog_token: str
rest_catalog_auth_type: Literal["bearer"] = "bearer"


@dataclass(kw_only=True)
Expand All @@ -121,10 +120,9 @@ class IcebergRESTCatalogS3DelegatedConnectionBaseDTO(IcebergConnectionBaseDTO):


@dataclass(kw_only=True)
class IcebergRESTCatalogBasicAuthS3DelegatedDTO(IcebergRESTCatalogS3DelegatedConnectionBaseDTO):
rest_catalog_username: str
rest_catalog_password: str
rest_catalog_auth_type: Literal["basic"] = "basic"
class IcebergRESTCatalogBearerAuthS3DelegatedDTO(IcebergRESTCatalogS3DelegatedConnectionBaseDTO):
rest_catalog_token: str
rest_catalog_auth_type: Literal["bearer"] = "bearer"


@dataclass(kw_only=True)
Expand All @@ -141,18 +139,18 @@ class IcebergRESTCatalogOAuth2ClientCredentialsS3DelegatedDTO(IcebergRESTCatalog
# TODO: should be refactored
def get_iceberg_rest_catalog_s3_direct_connection_dto(
**data,
) -> IcebergRESTCatalogBasicAuthS3BasicDTO | IcebergRESTCatalogOAuth2ClientCredentialsS3BasicDTO:
) -> IcebergRESTCatalogBearerAuthS3BasicDTO | IcebergRESTCatalogOAuth2ClientCredentialsS3BasicDTO:
if "rest_catalog_oauth2_client_id" in data:
return IcebergRESTCatalogOAuth2ClientCredentialsS3BasicDTO(**data)
return IcebergRESTCatalogBasicAuthS3BasicDTO(**data)
return IcebergRESTCatalogBearerAuthS3BasicDTO(**data)


def get_iceberg_rest_catalog_s3_delegated_connection_dto(
**data,
) -> IcebergRESTCatalogBasicAuthS3DelegatedDTO | IcebergRESTCatalogOAuth2ClientCredentialsS3DelegatedDTO:
) -> IcebergRESTCatalogBearerAuthS3DelegatedDTO | IcebergRESTCatalogOAuth2ClientCredentialsS3DelegatedDTO:
if "rest_catalog_oauth2_client_id" in data:
return IcebergRESTCatalogOAuth2ClientCredentialsS3DelegatedDTO(**data)
return IcebergRESTCatalogBasicAuthS3DelegatedDTO(**data)
return IcebergRESTCatalogBearerAuthS3DelegatedDTO(**data)


def get_iceberg_connection_dto(**data) -> IcebergConnectionBaseDTO:
Expand Down
14 changes: 7 additions & 7 deletions syncmaster/schemas/v1/auth/iceberg_rest_s3_delegated/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

from pydantic import Field

from syncmaster.schemas.v1.auth.iceberg_rest_s3_delegated.basic import (
CreateIcebergRESTCatalogBasicAuthSchema,
ReadIcebergRESTCatalogBasicAuthSchema,
UpdateIcebergRESTCatalogBasicAuthSchema,
from syncmaster.schemas.v1.auth.iceberg_rest_s3_delegated.bearer import (
CreateIcebergRESTCatalogBearerAuthSchema,
ReadIcebergRESTCatalogBearerAuthSchema,
UpdateIcebergRESTCatalogBearerAuthSchema,
)
from syncmaster.schemas.v1.auth.iceberg_rest_s3_delegated.oauth2_client_credentials import (
CreateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
Expand All @@ -16,16 +16,16 @@
)

CreateIcebergRESTCatalogS3DelegatedConnectionAuthDataSchema = Annotated[
CreateIcebergRESTCatalogBasicAuthSchema | CreateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
CreateIcebergRESTCatalogBearerAuthSchema | CreateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
Field(discriminator="type"),
]

ReadIcebergRESTCatalogS3DelegatedConnectionAuthDataSchema = Annotated[
ReadIcebergRESTCatalogBasicAuthSchema | ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
ReadIcebergRESTCatalogBearerAuthSchema | ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
Field(discriminator="type"),
]

UpdateIcebergRESTCatalogS3DelegatedConnectionAuthDataSchema = Annotated[
UpdateIcebergRESTCatalogBasicAuthSchema | UpdateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
UpdateIcebergRESTCatalogBearerAuthSchema | UpdateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
Field(discriminator="type"),
]
21 changes: 0 additions & 21 deletions syncmaster/schemas/v1/auth/iceberg_rest_s3_delegated/basic.py

This file was deleted.

20 changes: 20 additions & 0 deletions syncmaster/schemas/v1/auth/iceberg_rest_s3_delegated/bearer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from typing import Literal

from pydantic import BaseModel, Field, SecretStr


class ReadIcebergRESTCatalogBearerAuthSchema(BaseModel):
type: Literal["iceberg_rest_bearer"] = Field(description="Auth type")


class CreateIcebergRESTCatalogBearerAuthSchema(ReadIcebergRESTCatalogBearerAuthSchema):
rest_catalog_token: SecretStr


class UpdateIcebergRESTCatalogBearerAuthSchema(ReadIcebergRESTCatalogBearerAuthSchema):
rest_catalog_token: SecretStr | None = None

def get_secret_fields(self) -> tuple[str, ...]:
return ("rest_catalog_token",)
14 changes: 7 additions & 7 deletions syncmaster/schemas/v1/auth/iceberg_rest_s3_direct/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

from pydantic import Field

from syncmaster.schemas.v1.auth.iceberg_rest_s3_direct.basic import (
CreateIcebergRESTCatalogBasicS3BasicAuthSchema,
ReadIcebergRESTCatalogBasicS3BasicAuthSchema,
UpdateIcebergRESTCatalogBasicS3BasicAuthSchema,
from syncmaster.schemas.v1.auth.iceberg_rest_s3_direct.bearer import (
CreateIcebergRESTCatalogBearerS3BasicAuthSchema,
ReadIcebergRESTCatalogBearerS3BasicAuthSchema,
UpdateIcebergRESTCatalogBearerS3BasicAuthSchema,
)
from syncmaster.schemas.v1.auth.iceberg_rest_s3_direct.oauth2_client_credentials import (
CreateIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema,
Expand All @@ -16,16 +16,16 @@
)

CreateIcebergRESTCatalogS3DirectConnectionAuthDataSchema = Annotated[
CreateIcebergRESTCatalogBasicS3BasicAuthSchema | CreateIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema,
CreateIcebergRESTCatalogBearerS3BasicAuthSchema | CreateIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema,
Field(discriminator="type"),
]

ReadIcebergRESTCatalogS3DirectConnectionAuthDataSchema = Annotated[
ReadIcebergRESTCatalogBasicS3BasicAuthSchema | ReadIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema,
ReadIcebergRESTCatalogBearerS3BasicAuthSchema | ReadIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema,
Field(discriminator="type"),
]

UpdateIcebergRESTCatalogS3DirectConnectionAuthDataSchema = Annotated[
UpdateIcebergRESTCatalogBasicS3BasicAuthSchema | UpdateIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema,
UpdateIcebergRESTCatalogBearerS3BasicAuthSchema | UpdateIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema,
Field(discriminator="type"),
]
24 changes: 0 additions & 24 deletions syncmaster/schemas/v1/auth/iceberg_rest_s3_direct/basic.py

This file was deleted.

23 changes: 23 additions & 0 deletions syncmaster/schemas/v1/auth/iceberg_rest_s3_direct/bearer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from typing import Literal

from pydantic import BaseModel, Field, SecretStr


class ReadIcebergRESTCatalogBearerS3BasicAuthSchema(BaseModel):
type: Literal["iceberg_rest_bearer_s3_basic"] = Field(description="Auth type")
s3_access_key: str


class CreateIcebergRESTCatalogBearerS3BasicAuthSchema(ReadIcebergRESTCatalogBearerS3BasicAuthSchema):
rest_catalog_token: SecretStr
s3_secret_key: SecretStr


class UpdateIcebergRESTCatalogBearerS3BasicAuthSchema(ReadIcebergRESTCatalogBearerS3BasicAuthSchema):
rest_catalog_token: SecretStr | None = None
s3_secret_key: SecretStr | None = None

def get_secret_fields(self) -> tuple[str, ...]:
return ("rest_catalog_token", "s3_secret_key")
8 changes: 4 additions & 4 deletions syncmaster/schemas/v1/connections/connection_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
ReadSambaAuthSchema,
)
from syncmaster.schemas.v1.auth.iceberg_rest_s3_delegated import (
ReadIcebergRESTCatalogBasicAuthSchema,
ReadIcebergRESTCatalogBearerAuthSchema,
ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
)
from syncmaster.schemas.v1.auth.iceberg_rest_s3_direct import (
ReadIcebergRESTCatalogBasicS3BasicAuthSchema,
ReadIcebergRESTCatalogBearerS3BasicAuthSchema,
ReadIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema,
)
from syncmaster.schemas.v1.types import NameConstr
Expand All @@ -21,9 +21,9 @@
ReadBasicAuthSchema
| ReadS3AuthSchema
| ReadSambaAuthSchema
| ReadIcebergRESTCatalogBasicS3BasicAuthSchema
| ReadIcebergRESTCatalogBearerS3BasicAuthSchema
| ReadIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema
| ReadIcebergRESTCatalogBasicAuthSchema
| ReadIcebergRESTCatalogBearerAuthSchema
| ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema
)

Expand Down
11 changes: 5 additions & 6 deletions syncmaster/worker/handlers/db/iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from onetl.hooks import slot, support_hooks

from syncmaster.dto.connections import (
IcebergRESTCatalogBasicAuthS3BasicDTO,
IcebergRESTCatalogBasicAuthS3DelegatedDTO,
IcebergRESTCatalogBearerAuthS3BasicDTO,
IcebergRESTCatalogBearerAuthS3DelegatedDTO,
IcebergRESTCatalogOAuth2ClientCredentialsS3BasicDTO,
IcebergRESTCatalogOAuth2ClientCredentialsS3DelegatedDTO,
IcebergRESTCatalogS3DelegatedConnectionBaseDTO,
Expand Down Expand Up @@ -126,10 +126,9 @@ def _make_auth(self):
)
if isinstance(
self.connection_dto,
IcebergRESTCatalogBasicAuthS3DelegatedDTO | IcebergRESTCatalogBasicAuthS3BasicDTO,
IcebergRESTCatalogBearerAuthS3DelegatedDTO | IcebergRESTCatalogBearerAuthS3BasicDTO,
):
return Iceberg.RESTCatalog.BasicAuth(
user=self.connection_dto.rest_catalog_username,
password=self.connection_dto.rest_catalog_password,
return Iceberg.RESTCatalog.BearerAuth(
access_token=self.connection_dto.rest_catalog_token,
)
return None
2 changes: 0 additions & 2 deletions tests/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ class TestSettings(BaseSettings):

TEST_ICEBERG_REST_CATALOG_URL_FOR_CONFTEST: str
TEST_ICEBERG_REST_CATALOG_URL_FOR_WORKER: str
TEST_ICEBERG_REST_CATALOG_USERNAME: str
TEST_ICEBERG_REST_CATALOG_PASSWORD: str
TEST_ICEBERG_S3_WAREHOUSE_PATH: str
TEST_ICEBERG_S3_REGION: str
TEST_ICEBERG_S3_BUCKET_STYLE: Literal["domain", "path"] = "path"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from sqlalchemy.ext.asyncio import AsyncSession

from syncmaster.db.models import Group
from syncmaster.dto.connections import IcebergRESTCatalogBasicAuthS3BasicDTO
from syncmaster.dto.connections import IcebergRESTCatalogBearerAuthS3BasicDTO
from syncmaster.server.settings import ServerAppSettings as Settings
from tests.settings import TestSettings
from tests.test_unit.utils import create_connection, create_credentials
Expand All @@ -21,8 +21,8 @@
scope="session",
params=[pytest.param("iceberg", marks=[pytest.mark.iceberg])],
)
def iceberg_rest_s3_for_conftest(test_settings: TestSettings) -> IcebergRESTCatalogBasicAuthS3BasicDTO:
return IcebergRESTCatalogBasicAuthS3BasicDTO(
def iceberg_rest_s3_for_conftest(test_settings: TestSettings) -> IcebergRESTCatalogBearerAuthS3BasicDTO:
return IcebergRESTCatalogBearerAuthS3BasicDTO(
rest_catalog_url=test_settings.TEST_ICEBERG_REST_CATALOG_URL_FOR_CONFTEST,
s3_warehouse_path=test_settings.TEST_ICEBERG_S3_WAREHOUSE_PATH,
s3_host=test_settings.TEST_S3_HOST_FOR_CONFTEST,
Expand All @@ -34,17 +34,16 @@ def iceberg_rest_s3_for_conftest(test_settings: TestSettings) -> IcebergRESTCata
s3_access_key=test_settings.TEST_S3_ACCESS_KEY,
s3_secret_key=test_settings.TEST_S3_SECRET_KEY,
s3_additional_params=test_settings.TEST_ICEBERG_S3_ADDITIONAL_PARAMS,
rest_catalog_username=test_settings.TEST_ICEBERG_REST_CATALOG_USERNAME,
rest_catalog_password=test_settings.TEST_ICEBERG_REST_CATALOG_PASSWORD,
rest_catalog_token="",
)


@pytest.fixture(
scope="session",
params=[pytest.param("iceberg", marks=[pytest.mark.iceberg])],
)
def iceberg_rest_s3_for_worker(test_settings: TestSettings) -> IcebergRESTCatalogBasicAuthS3BasicDTO:
return IcebergRESTCatalogBasicAuthS3BasicDTO(
def iceberg_rest_s3_for_worker(test_settings: TestSettings) -> IcebergRESTCatalogBearerAuthS3BasicDTO:
return IcebergRESTCatalogBearerAuthS3BasicDTO(
rest_catalog_url=test_settings.TEST_ICEBERG_REST_CATALOG_URL_FOR_WORKER,
s3_warehouse_path=test_settings.TEST_ICEBERG_S3_WAREHOUSE_PATH,
s3_host=test_settings.TEST_S3_HOST_FOR_WORKER,
Expand All @@ -56,15 +55,14 @@ def iceberg_rest_s3_for_worker(test_settings: TestSettings) -> IcebergRESTCatalo
s3_access_key=test_settings.TEST_S3_ACCESS_KEY,
s3_secret_key=test_settings.TEST_S3_SECRET_KEY,
s3_additional_params=test_settings.TEST_ICEBERG_S3_ADDITIONAL_PARAMS,
rest_catalog_username=test_settings.TEST_ICEBERG_REST_CATALOG_USERNAME,
rest_catalog_password=test_settings.TEST_ICEBERG_REST_CATALOG_PASSWORD,
rest_catalog_token="",
)


@pytest.fixture
def prepare_iceberg_rest_s3(
spark: SparkSession,
iceberg_rest_s3_for_conftest: IcebergRESTCatalogBasicAuthS3BasicDTO,
iceberg_rest_s3_for_conftest: IcebergRESTCatalogBearerAuthS3BasicDTO,
s3_file_connection: S3,
):
iceberg = iceberg_rest_s3_for_conftest
Expand All @@ -76,13 +74,7 @@ def prepare_iceberg_rest_s3(
connection = Iceberg(
spark=spark,
catalog_name=catalog_name,
catalog=Iceberg.RESTCatalog(
url=iceberg.rest_catalog_url,
auth=Iceberg.RESTCatalog.BasicAuth(
user=iceberg.rest_catalog_username,
password=iceberg.rest_catalog_password,
),
),
catalog=Iceberg.RESTCatalog(url=iceberg.rest_catalog_url),
warehouse=Iceberg.S3Warehouse(
path=iceberg.s3_warehouse_path,
host=iceberg.s3_host,
Expand Down Expand Up @@ -118,7 +110,7 @@ def fill_with_data(df: DataFrame):

@pytest_asyncio.fixture
async def iceberg_rest_s3_connection(
iceberg_rest_s3_for_worker: IcebergRESTCatalogBasicAuthS3BasicDTO,
iceberg_rest_s3_for_worker: IcebergRESTCatalogBearerAuthS3BasicDTO,
settings: Settings,
session: AsyncSession,
group: Group,
Expand Down Expand Up @@ -147,11 +139,10 @@ async def iceberg_rest_s3_connection(
settings=settings,
connection_id=result.id,
auth_data=dict(
type="iceberg_rest_basic_s3_basic",
type="iceberg_rest_bearer_s3_basic",
s3_access_key=iceberg.s3_access_key,
s3_secret_key=iceberg.s3_secret_key,
rest_catalog_username=iceberg.rest_catalog_username,
rest_catalog_password=iceberg.rest_catalog_password,
rest_catalog_token=iceberg.rest_catalog_token,
),
)

Expand Down
Loading
Loading