Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 0 additions & 60 deletions .github/actions/run-pytest-with-uv/action.yml

This file was deleted.

51 changes: 0 additions & 51 deletions .github/workflows/elt-common_e2e_tests.yml

This file was deleted.

47 changes: 47 additions & 0 deletions .github/workflows/elt-common_tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
name: Tests for elt-common
on:
push:
branches:
- "main"
paths:
- ".github/workflows/elt-common_tests.yml"
- "elt-common/**"
pull_request:
types: [opened, synchronize, reopened]
paths:
- ".github/workflows/elt-common_tests.yml"
- "elt-common/**"

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.run_id }}
cancel-in-progress: true

env:
PYTHON_VERSION: 3.13
UV_VERSION: latest

jobs:
test:
name: test
runs-on: ubuntu-slim

steps:
- name: Checkout
uses: actions/checkout@v6

- name: Install uv
uses: astral-sh/setup-uv@v7
with:
activate-environment: false
cache-dependency-glob: elt-common/pyproject.toml
python-version: ${{ env.PYTHON_VERSION }}

- name: Install the project
shell: bash -l {0}
run: uv sync --locked --all-extras --dev
working-directory: elt-common

- name: Run tests
shell: bash -l {0}
run: uv run pytest --durations-min=0.5 --exitfirst tests/ --cache-clear
working-directory: elt-common
33 changes: 0 additions & 33 deletions .github/workflows/elt-common_unit_tests.yml

This file was deleted.

4 changes: 2 additions & 2 deletions .github/workflows/warehouses_e2e_tests.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: End-to-end tests for extract and load scripts in the warehouse
name: End-to-end extract and load tests
on:
push:
branches:
Expand All @@ -24,7 +24,7 @@ env:

jobs:
test:
name: warehouses end-to-end tests
name: test
runs-on: ubuntu-latest

steps:
Expand Down
21 changes: 4 additions & 17 deletions elt-common/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ to each pipeline.
Development requires the following tools:

- [uv](https://docs.astral.uv/uv/): Used to manage both Python installations and dependencies
- [docker & docker compose]: Used for standing up services for end-to-end testing.

### Setting up a Python virtual environment

Expand All @@ -21,25 +20,13 @@ package in editable mode, along with the development dependencies:
> uv pip install --editable . --group dev
```

## Running unit tests
## Running the tests

Run the unit tests using `pytest`:

```bash
> pytest tests/unit_tests
> pytest tests
```

## Running end-to-end tests

The end-to-end (e2e) tests for the `pyiceberg` destination require a running Iceberg
rest catalog to test complete functionality.
The local, docker-compose-based configuration provided by
[infra/local/docker-compose.yml](../infra/local/docker-compose.yml) is the easiest way to
spin up a set of services compatible with running the tests.
_Note the requirement to edit `/etc/hosts` described in [infra/local/README](../infra/local/README.md)._

Once the compose services are running, execute the e2e tests using `pytest`:

```bash
> pytest tests/e2e_tests
```
See ["How to invoke pytest"](https://docs.pytest.org/en/stable/how-to/usage.html#usage) fo
instructions on how to limit the tests that are executed.
2 changes: 2 additions & 0 deletions elt-common/pytest.ini
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
[pytest]
filterwarnings= ignore::DeprecationWarning
markers =
requires_trino: marks tests that require a trino server (deselect with '-m "not requires_trino"')
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import dataclasses
from typing import Dict, Literal, Final, Optional, TypeAlias
from typing import Dict, Literal, Final, Optional, TypeAlias, Type

from dlt.common.configuration import configspec
from dlt.common.configuration import configspec, resolve_type
from dlt.common.configuration.specs import CredentialsConfiguration
from dlt.common.destination.client import DestinationClientDwhConfiguration
from dlt.common.destination.exceptions import DestinationTerminalException
Expand All @@ -27,7 +27,9 @@ def as_dict(self) -> Dict[str, str]:
"""Return the credentials as a dictionary suitable for the Catalog constructor"""
# Map variable names to property names
# client_id & secret need to be combined
properties = {"credential": self.client_credential()} if self.client_id else {}
properties = {"type": "rest"}
if self.client_id:
properties.update({"credential": self.client_credential()})

field_aliases: Dict[str, str] = {
"access_delegation": f"{CATALOG_HEADER_PREFIX}x-iceberg-access-delegation",
Expand All @@ -48,37 +50,63 @@ def as_dict(self) -> Dict[str, str]:
def client_credential(self) -> str:
return f"{self.client_id}:{self.client_secret}"

def on_resolved(self) -> None:
# Check we have the minimum number of required authentication properties
# if any are supplied
auth_props = {
prop: getattr(self, prop)
for prop in ("oauth2_server_uri", "client_id", "client_secret")
}

PyIcebergCatalogCredentials: TypeAlias = PyIcebergRestCatalogCredentials
non_null_count = sum(map(lambda x: 1 if x is not None else 0, auth_props.values()))
if non_null_count != 0 and non_null_count != 3:
raise DestinationTerminalException(
f"Missing required configuration value(s) for authentication: {list(name for name, value in auth_props.items() if value is None)}"
)


@configspec(init=False)
class PyIcebergSqlCatalogCredentials(CredentialsConfiguration):
uri: str = None # type: ignore
warehouse: str = None # type: ignore

def as_dict(self) -> Dict[str, str]:
"""Return the credentials as a dictionary suitable for the Catalog constructor"""
return {"type": "sql", "uri": self.uri, "warehouse": self.warehouse}

def on_resolved(self) -> None:
pass


PyIcebergCatalogCredentials: TypeAlias = (
PyIcebergRestCatalogCredentials | PyIcebergSqlCatalogCredentials
)


@configspec(init=False)
class IcebergClientConfiguration(DestinationClientDwhConfiguration):
destination_type: Final[str] = dataclasses.field(
destination_type: Final[str] = dataclasses.field( # type: ignore[misc]
default="pyiceberg", init=False, repr=False, compare=False
) # type: ignore[misc]
)

catalog_type: Literal["rest"] = "rest"
catalog_type: Literal["rest", "sql"] = "rest"
credentials: PyIcebergCatalogCredentials = None # type: ignore

@resolve_type("credentials")
def resolve_credentials_type(self) -> Type[CredentialsConfiguration]:
return (
PyIcebergRestCatalogCredentials
if self.catalog_type == "rest"
else PyIcebergSqlCatalogCredentials
)

@property
def connection_properties(self) -> Dict[str, str]:
"""Returns a mapping of connection properties to pass to the catalog constructor"""
return self.credentials.as_dict() if self.credentials is not None else {}

def on_resolved(self) -> None:
# Check we have the minimum number of required authentication properties
# if any are supplied
auth_props = {
prop: getattr(self.credentials, prop)
for prop in ("oauth2_server_uri", "client_id", "client_secret")
}

non_null_count = sum(map(lambda x: 1 if x is not None else 0, auth_props.values()))
if non_null_count != 0 and non_null_count != 3:
raise DestinationTerminalException(
f"Missing required configuration value(s) for authentication: {list(name for name, value in auth_props.items() if value is None)}"
)
return self.credentials.on_resolved()

def fingerprint(self) -> str:
"""Returns a fingerprint of a connection string."""
Expand Down
12 changes: 0 additions & 12 deletions elt-common/src/elt_common/dlt_destinations/pyiceberg/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from dlt.common.destination.typing import PreparedTableSchema
from dlt.common.schema.typing import TColumnType, TTableSchemaColumns
from pyiceberg.catalog import Catalog
from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.partitioning import (
UNPARTITIONED_PARTITION_SPEC,
PartitionField,
Expand Down Expand Up @@ -48,17 +47,6 @@
###############################################################################


def create_catalog(name: str, **properties: str) -> Catalog:
"""Create an Iceberg catalog

Args:
name: Name to identify the catalog.
properties: Properties that are passed along to the configuration.
"""

return RestCatalog(name, **properties)


def namespace_exists(catalog: Catalog, namespace: Union[str, Identifier]) -> bool:
try:
catalog.load_namespace_properties(namespace)
Expand Down
Loading