Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/changelog/next_release/294.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added OAuth2ClientCredentials to Iceberg REST Catalog
150 changes: 107 additions & 43 deletions poetry.lock

Large diffs are not rendered by default.

31 changes: 28 additions & 3 deletions syncmaster/dto/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,8 @@ class HiveConnectionDTO(ConnectionDTO):


@dataclass
class IcebergRESTCatalogS3ConnectionDTO(ConnectionDTO):
class IcebergRESTCatalogS3ConnectionBaseDTO(ConnectionDTO):
metastore_url: str
metastore_username: str
metastore_password: str
s3_warehouse_path: str
s3_host: str
s3_bucket: str
Expand All @@ -91,6 +89,33 @@ class IcebergRESTCatalogS3ConnectionDTO(ConnectionDTO):
type: ClassVar[str] = "iceberg_rest_s3"


@dataclass(kw_only=True)
class IcebergRESTCatalogBasicAuthS3DTO(IcebergRESTCatalogS3ConnectionBaseDTO):
metastore_username: str
metastore_password: str
metastore_auth_type: Literal["basic"] = "basic"


@dataclass(kw_only=True)
class IcebergRESTCatalogOAuth2ClientCredentialsS3DTO(IcebergRESTCatalogS3ConnectionBaseDTO):
metastore_oauth2_client_id: str
metastore_oauth2_client_secret: str
metastore_oauth2_scopes: list[str]
metastore_oauth2_resource: str | None = None
metastore_oauth2_audience: str | None = None
metastore_oauth2_server_uri: str | None = None
metastore_auth_type: Literal["oauth2"] = "oauth2"


# TODO: should be refactored
class IcebergRESTCatalogS3ConnectionDTO:
def __new__(cls, **data):
if "metastore_oauth2_client_id" in data:
return IcebergRESTCatalogOAuth2ClientCredentialsS3DTO(**data)

return IcebergRESTCatalogBasicAuthS3DTO(**data)


@dataclass
class HDFSConnectionDTO(ConnectionDTO):
user: str
Expand Down
10 changes: 0 additions & 10 deletions syncmaster/schemas/v1/auth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,6 @@
ReadBasicAuthSchema,
UpdateBasicAuthSchema,
)
from syncmaster.schemas.v1.auth.iceberg_rest_basic import (
CreateIcebergRESTCatalogBasicAuthSchema,
IcebergRESTCatalogBasicAuthSchema,
ReadIcebergRESTCatalogBasicAuthSchema,
UpdateIcebergRESTCatalogBasicAuthSchema,
)
from syncmaster.schemas.v1.auth.s3 import (
CreateS3AuthSchema,
ReadS3AuthSchema,
Expand Down Expand Up @@ -41,8 +35,4 @@
"UpdateSambaAuthSchema",
"AuthTokenSchema",
"TokenPayloadSchema",
"IcebergRESTCatalogBasicAuthSchema",
"CreateIcebergRESTCatalogBasicAuthSchema",
"ReadIcebergRESTCatalogBasicAuthSchema",
"UpdateIcebergRESTCatalogBasicAuthSchema",
]
2 changes: 2 additions & 0 deletions syncmaster/schemas/v1/auth/iceberg/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
31 changes: 31 additions & 0 deletions syncmaster/schemas/v1/auth/iceberg/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from typing import Annotated

from pydantic import Field

from syncmaster.schemas.v1.auth.iceberg.basic import (
CreateIcebergRESTCatalogBasicAuthSchema,
ReadIcebergRESTCatalogBasicAuthSchema,
UpdateIcebergRESTCatalogBasicAuthSchema,
)
from syncmaster.schemas.v1.auth.iceberg.oauth2_client_credentials import (
CreateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
UpdateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
)

CreateIcebergRESTCatalogS3ConnectionAuthDataSchema = Annotated[
CreateIcebergRESTCatalogBasicAuthSchema | CreateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
Field(discriminator="type"),
]

ReadIcebergRESTCatalogS3ConnectionAuthDataSchema = Annotated[
ReadIcebergRESTCatalogBasicAuthSchema | ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
Field(discriminator="type"),
]

UpdateIcebergRESTCatalogS3ConnectionAuthDataSchema = Annotated[
UpdateIcebergRESTCatalogBasicAuthSchema | UpdateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
Field(discriminator="type"),
]
39 changes: 39 additions & 0 deletions syncmaster/schemas/v1/auth/iceberg/oauth2_client_credentials.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from typing import Literal

from pydantic import BaseModel, Field, SecretStr


class IcebergRESTCatalogOAuth2ClientCredentialsAuthSchema(BaseModel):
type: Literal["iceberg_rest_oauth2_client_credentials_s3_basic"]


class CreateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema(IcebergRESTCatalogOAuth2ClientCredentialsAuthSchema):
metastore_oauth2_client_id: str
metastore_oauth2_client_secret: SecretStr
metastore_oauth2_scopes: list[str] = Field(default_factory=list)
metastore_oauth2_resource: str | None = None
metastore_oauth2_audience: str | None = None
metastore_oauth2_server_uri: str | None = None
s3_access_key: str
s3_secret_key: SecretStr


class ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema(IcebergRESTCatalogOAuth2ClientCredentialsAuthSchema):
metastore_oauth2_client_id: str
metastore_oauth2_scopes: list[str]
metastore_oauth2_resource: str | None
metastore_oauth2_audience: str | None
metastore_oauth2_server_uri: str | None
s3_access_key: str


class UpdateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema(
CreateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
):
metastore_oauth2_client_secret: SecretStr | None = None
s3_secret_key: SecretStr | None = None

def get_secret_fields(self) -> tuple[str, ...]:
return ("metastore_oauth2_client_secret", "s3_secret_key")
13 changes: 11 additions & 2 deletions syncmaster/schemas/v1/connections/connection_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,23 @@

from syncmaster.schemas.v1.auth import (
ReadBasicAuthSchema,
ReadIcebergRESTCatalogBasicAuthSchema,
ReadS3AuthSchema,
ReadSambaAuthSchema,
)
from syncmaster.schemas.v1.auth.iceberg.basic import (
ReadIcebergRESTCatalogBasicAuthSchema,
)
from syncmaster.schemas.v1.auth.iceberg.oauth2_client_credentials import (
ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
)
from syncmaster.schemas.v1.types import NameConstr

ReadConnectionAuthDataSchema = (
ReadBasicAuthSchema | ReadS3AuthSchema | ReadSambaAuthSchema | ReadIcebergRESTCatalogBasicAuthSchema
ReadBasicAuthSchema
| ReadS3AuthSchema
| ReadSambaAuthSchema
| ReadIcebergRESTCatalogBasicAuthSchema
| ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema
)


Expand Down
14 changes: 7 additions & 7 deletions syncmaster/schemas/v1/connections/iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@

from pydantic import BaseModel, Field

from syncmaster.schemas.v1.auth.iceberg_rest_basic import (
CreateIcebergRESTCatalogBasicAuthSchema,
ReadIcebergRESTCatalogBasicAuthSchema,
UpdateIcebergRESTCatalogBasicAuthSchema,
from syncmaster.schemas.v1.auth.iceberg.auth import (
CreateIcebergRESTCatalogS3ConnectionAuthDataSchema,
ReadIcebergRESTCatalogS3ConnectionAuthDataSchema,
UpdateIcebergRESTCatalogS3ConnectionAuthDataSchema,
)
from syncmaster.schemas.v1.connection_types import ICEBERG_REST_S3_TYPE
from syncmaster.schemas.v1.connections.connection_base import (
Expand Down Expand Up @@ -50,18 +50,18 @@ class CreateIcebergConnectionSchema(CreateConnectionBaseSchema):
"Data required to connect to the database. These are the parameters that are specified in the URL request."
),
)
auth_data: CreateIcebergRESTCatalogBasicAuthSchema = Field(
auth_data: CreateIcebergRESTCatalogS3ConnectionAuthDataSchema = Field(
description="Credentials for authorization",
)


class ReadIcebergConnectionSchema(ReadConnectionBaseSchema):
type: ICEBERG_REST_S3_TYPE
data: ReadIcebergRESTCatalogS3ConnectionDataSchema = Field(alias="connection_data")
auth_data: ReadIcebergRESTCatalogBasicAuthSchema | None = None
auth_data: ReadIcebergRESTCatalogS3ConnectionAuthDataSchema | None = None


class UpdateIcebergConnectionSchema(CreateIcebergConnectionSchema):
auth_data: UpdateIcebergRESTCatalogBasicAuthSchema = Field(
auth_data: UpdateIcebergRESTCatalogS3ConnectionAuthDataSchema = Field(
description="Credentials for authorization",
)
24 changes: 18 additions & 6 deletions syncmaster/worker/handlers/db/iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from onetl.connection import Iceberg
from onetl.hooks import slot, support_hooks

from syncmaster.dto.connections import IcebergRESTCatalogS3ConnectionDTO
from syncmaster.dto.connections import IcebergRESTCatalogS3ConnectionBaseDTO
from syncmaster.dto.transfers import IcebergRESTCatalogS3TransferDTO
from syncmaster.worker.handlers.db.base import DBHandler

Expand All @@ -20,7 +20,7 @@
@support_hooks
class IcebergRESTCatalogS3Handler(DBHandler):
connection: Iceberg
connection_dto: IcebergRESTCatalogS3ConnectionDTO
connection_dto: IcebergRESTCatalogS3ConnectionBaseDTO
transfer_dto: IcebergRESTCatalogS3TransferDTO
_operators = {
"regexp": "RLIKE",
Expand All @@ -33,10 +33,7 @@ def connect(self, spark: SparkSession):
catalog_name=self.transfer_dto.catalog_name,
catalog=Iceberg.RESTCatalog(
uri=self.connection_dto.metastore_url,
auth=Iceberg.RESTCatalog.BasicAuth(
user=self.connection_dto.metastore_username,
password=self.connection_dto.metastore_password,
),
auth=self._make_auth(),
),
warehouse=Iceberg.S3Warehouse(
path=self.connection_dto.s3_warehouse_path,
Expand Down Expand Up @@ -91,3 +88,18 @@ def _get_reading_options(self) -> dict:

def _quote_field(self, field: str) -> str:
return f"`{field}`"

def _make_auth(self):
if self.connection_dto.metastore_auth_type == "oauth2":
return Iceberg.RESTCatalog.OAuth2ClientCredentials(
client_id=self.connection_dto.metastore_oauth2_client_id,
client_secret=self.connection_dto.metastore_oauth2_client_secret,
scopes=self.connection_dto.metastore_oauth2_scopes,
resource=self.connection_dto.metastore_oauth2_resource,
audience=self.connection_dto.metastore_oauth2_audience,
server_uri=self.connection_dto.metastore_oauth2_server_uri,
)
return Iceberg.RESTCatalog.BasicAuth(
user=self.connection_dto.metastore_username,
password=self.connection_dto.metastore_password,
)
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,89 @@ async def test_developer_plus_can_create_iceberg_rest_s3_connection(
"s3_access_key": decrypted["s3_access_key"],
},
}


async def test_developer_plus_can_create_iceberg_rest_s3_connection_with_oauth2_client_credentials(
client: AsyncClient,
group: MockGroup,
session: AsyncSession,
settings: Settings,
role_developer_plus: UserTestRoles,
):
user = group.get_member_of_role(role_developer_plus)

result = await client.post(
"v1/connections",
headers={"Authorization": f"Bearer {user.token}"},
json={
"group_id": group.id,
"name": "New connection",
"description": "",
"type": "iceberg_rest_s3",
"connection_data": {
"metastore_url": "https://rest.domain.com",
"s3_warehouse_path": "/some/warehouse",
"s3_protocol": "http",
"s3_host": "localhost",
"s3_port": 9010,
"s3_bucket": "some_bucket",
"s3_region": "us-east-1",
"s3_bucket_style": "path",
},
"auth_data": {
"type": "iceberg_rest_oauth2_client_credentials_s3_basic",
"metastore_oauth2_client_id": "my_client_id",
"metastore_oauth2_client_secret": "my_client_secret",
"metastore_oauth2_scopes": ["catalog:read"],
"metastore_oauth2_audience": "iceberg-catalog",
"metastore_oauth2_server_uri": "https://oauth.example.com/token",
"s3_access_key": "access_key",
"s3_secret_key": "secret_key",
},
},
)
connection = (
await session.scalars(
select(Connection).filter_by(
name="New connection",
),
)
).first()

creds = (
await session.scalars(
select(AuthData).filter_by(
connection_id=connection.id,
),
)
).one()

decrypted = decrypt_auth_data(creds.value, settings=settings)
assert result.status_code == 200, result.json()
assert result.json() == {
"id": connection.id,
"group_id": connection.group_id,
"name": connection.name,
"description": connection.description,
"type": connection.type,
"connection_data": {
"metastore_url": connection.data["metastore_url"],
"s3_warehouse_path": connection.data["s3_warehouse_path"],
"s3_protocol": connection.data["s3_protocol"],
"s3_host": connection.data["s3_host"],
"s3_port": connection.data["s3_port"],
"s3_bucket": connection.data["s3_bucket"],
"s3_region": connection.data["s3_region"],
"s3_bucket_style": connection.data["s3_bucket_style"],
"s3_additional_params": connection.data["s3_additional_params"],
},
"auth_data": {
"type": decrypted["type"],
"metastore_oauth2_client_id": decrypted["metastore_oauth2_client_id"],
"metastore_oauth2_scopes": decrypted["metastore_oauth2_scopes"],
"metastore_oauth2_audience": decrypted["metastore_oauth2_audience"],
"metastore_oauth2_resource": decrypted["metastore_oauth2_resource"],
"metastore_oauth2_server_uri": decrypted["metastore_oauth2_server_uri"],
"s3_access_key": decrypted["s3_access_key"],
},
}
Loading
Loading