From 4c30347d956d2722c565491f153d1a8ccde64176 Mon Sep 17 00:00:00 2001 From: Martyn Gigg Date: Mon, 12 Jan 2026 17:12:41 +0000 Subject: [PATCH 1/8] WIP: Switch to sqlite catalog for elt-common e2e_tests It avoids any external services and vastly improves test times. Testing with the RestCatalog will be used for the pipelines. --- .../pyiceberg/configuration.py | 66 +++++++++++++------ .../dlt_destinations/pyiceberg/helpers.py | 12 ---- .../dlt_destinations/pyiceberg/pyiceberg.py | 4 +- .../src/elt_common/testing/lakekeeper.py | 4 +- 4 files changed, 51 insertions(+), 35 deletions(-) diff --git a/elt-common/src/elt_common/dlt_destinations/pyiceberg/configuration.py b/elt-common/src/elt_common/dlt_destinations/pyiceberg/configuration.py index 0c7103ad..11d2c213 100644 --- a/elt-common/src/elt_common/dlt_destinations/pyiceberg/configuration.py +++ b/elt-common/src/elt_common/dlt_destinations/pyiceberg/configuration.py @@ -1,7 +1,7 @@ import dataclasses -from typing import Dict, Literal, Final, Optional, TypeAlias +from typing import Dict, Literal, Final, Optional, TypeAlias, Type -from dlt.common.configuration import configspec +from dlt.common.configuration import configspec, resolve_type from dlt.common.configuration.specs import CredentialsConfiguration from dlt.common.destination.client import DestinationClientDwhConfiguration from dlt.common.destination.exceptions import DestinationTerminalException @@ -27,7 +27,9 @@ def as_dict(self) -> Dict[str, str]: """Return the credentials as a dictionary suitable for the Catalog constructor""" # Map variable names to property names # client_id & secret need to be combined - properties = {"credential": self.client_credential()} if self.client_id else {} + properties = {"type": "rest"} + if self.client_id: + properties.update({"credential": self.client_credential()}) field_aliases: Dict[str, str] = { "access_delegation": f"{CATALOG_HEADER_PREFIX}x-iceberg-access-delegation", @@ -48,37 +50,63 @@ def as_dict(self) -> Dict[str, 
str]: def client_credential(self) -> str: return f"{self.client_id}:{self.client_secret}" + def on_resolved(self) -> None: + # Check we have the minimum number of required authentication properties + # if any are supplied + auth_props = { + prop: getattr(self, prop) + for prop in ("oauth2_server_uri", "client_id", "client_secret") + } -PyIcebergCatalogCredentials: TypeAlias = PyIcebergRestCatalogCredentials + non_null_count = sum(map(lambda x: 1 if x is not None else 0, auth_props.values())) + if non_null_count != 0 and non_null_count != 3: + raise DestinationTerminalException( + f"Missing required configuration value(s) for authentication: {list(name for name, value in auth_props.items() if value is None)}" + ) + + +@configspec(init=False) +class PyIcebergSqlCatalogCredentials(CredentialsConfiguration): + uri: str = None # type: ignore + warehouse: str = None # type: ignore + + def as_dict(self) -> Dict[str, str]: + """Return the credentials as a dictionary suitable for the Catalog constructor""" + return {"type": "sql", "uri": self.uri, "warehouse": self.warehouse} + + def on_resolved(self) -> None: + pass + + +PyIcebergCatalogCredentials: TypeAlias = ( + PyIcebergRestCatalogCredentials | PyIcebergSqlCatalogCredentials +) @configspec(init=False) class IcebergClientConfiguration(DestinationClientDwhConfiguration): - destination_type: Final[str] = dataclasses.field( + destination_type: Final[str] = dataclasses.field( # type: ignore[misc] default="pyiceberg", init=False, repr=False, compare=False - ) # type: ignore[misc] + ) - catalog_type: Literal["rest"] = "rest" + catalog_type: Literal["rest", "sql"] = "rest" credentials: PyIcebergCatalogCredentials = None # type: ignore + @resolve_type("credentials") + def resolve_credentials_type(self) -> Type[CredentialsConfiguration]: + return ( + PyIcebergRestCatalogCredentials + if self.catalog_type == "rest" + else PyIcebergSqlCatalogCredentials + ) + @property def connection_properties(self) -> Dict[str, str]: """Returns 
a mapping of connection properties to pass to the catalog constructor""" return self.credentials.as_dict() if self.credentials is not None else {} def on_resolved(self) -> None: - # Check we have the minimum number of required authentication properties - # if any are supplied - auth_props = { - prop: getattr(self.credentials, prop) - for prop in ("oauth2_server_uri", "client_id", "client_secret") - } - - non_null_count = sum(map(lambda x: 1 if x is not None else 0, auth_props.values())) - if non_null_count != 0 and non_null_count != 3: - raise DestinationTerminalException( - f"Missing required configuration value(s) for authentication: {list(name for name, value in auth_props.items() if value is None)}" - ) + return self.credentials.on_resolved() def fingerprint(self) -> str: """Returns a fingerprint of a connection string.""" diff --git a/elt-common/src/elt_common/dlt_destinations/pyiceberg/helpers.py b/elt-common/src/elt_common/dlt_destinations/pyiceberg/helpers.py index c6a21363..3b286e14 100644 --- a/elt-common/src/elt_common/dlt_destinations/pyiceberg/helpers.py +++ b/elt-common/src/elt_common/dlt_destinations/pyiceberg/helpers.py @@ -3,7 +3,6 @@ from dlt.common.destination.typing import PreparedTableSchema from dlt.common.schema.typing import TColumnType, TTableSchemaColumns from pyiceberg.catalog import Catalog -from pyiceberg.catalog.rest import RestCatalog from pyiceberg.partitioning import ( UNPARTITIONED_PARTITION_SPEC, PartitionField, @@ -48,17 +47,6 @@ ############################################################################### -def create_catalog(name: str, **properties: str) -> Catalog: - """Create an Iceberg catalog - - Args: - name: Name to identify the catalog. - properties: Properties that are passed along to the configuration. 
- """ - - return RestCatalog(name, **properties) - - def namespace_exists(catalog: Catalog, namespace: Union[str, Identifier]) -> bool: try: catalog.load_namespace_properties(namespace) diff --git a/elt-common/src/elt_common/dlt_destinations/pyiceberg/pyiceberg.py b/elt-common/src/elt_common/dlt_destinations/pyiceberg/pyiceberg.py index 0dc96150..c0518ace 100644 --- a/elt-common/src/elt_common/dlt_destinations/pyiceberg/pyiceberg.py +++ b/elt-common/src/elt_common/dlt_destinations/pyiceberg/pyiceberg.py @@ -24,6 +24,7 @@ from dlt.common.libs.pyarrow import pyarrow as pa import pyarrow.parquet as pq +from pyiceberg.catalog import load_catalog from pyiceberg.expressions import AlwaysTrue, And, EqualTo from pyiceberg.table import Table as PyIcebergTable @@ -31,7 +32,6 @@ IcebergClientConfiguration, ) from elt_common.dlt_destinations.pyiceberg.helpers import ( - create_catalog, create_iceberg_schema, create_partition_spec, create_sort_order, @@ -78,7 +78,7 @@ def __init__( super().__init__(schema, config, capabilities) self.dataset_name = self.config.normalize_dataset_name(self.schema) self.config = config - self.iceberg_catalog = create_catalog(name="default", **config.connection_properties) + self.iceberg_catalog = load_catalog(name="default", **config.connection_properties) self.type_mapper = cast(PyIcebergTypeMapper, self.capabilities.get_type_mapper()) # ----- JobClientBase ----- diff --git a/elt-common/src/elt_common/testing/lakekeeper.py b/elt-common/src/elt_common/testing/lakekeeper.py index 1c1a5217..7db66d2a 100644 --- a/elt-common/src/elt_common/testing/lakekeeper.py +++ b/elt-common/src/elt_common/testing/lakekeeper.py @@ -10,7 +10,7 @@ import tenacity from elt_common.dlt_destinations.pyiceberg.helpers import create_catalog -from elt_common.dlt_destinations.pyiceberg.configuration import PyIcebergCatalogCredentials +from elt_common.dlt_destinations.pyiceberg.configuration import PyIcebergRestCatalogCredentials from . 
import DEFAULT_RETRY_ARGS, Endpoint, Settings @@ -102,7 +102,7 @@ class Warehouse: def connect(self) -> PyIcebergCatalog: """Connect to the warehouse in the catalog""" - creds = PyIcebergCatalogCredentials() + creds = PyIcebergRestCatalogCredentials() creds.uri = str(self.server.catalog_endpoint()) creds.project_id = self.server.settings.project_id creds.warehouse = self.name From 5a8afa6fbd587474ced2c83fdde17fee5a69245c Mon Sep 17 00:00:00 2001 From: Martyn Gigg Date: Tue, 13 Jan 2026 13:27:02 +0000 Subject: [PATCH 2/8] Support e2e tests with SqlCatalog, except those requiring Trino --- elt-common/pytest.ini | 2 + elt-common/src/elt_common/testing/__init__.py | 11 +- elt-common/src/elt_common/testing/dlt.py | 62 +++++---- elt-common/src/elt_common/testing/fixtures.py | 119 ++++++++---------- .../src/elt_common/testing/lakekeeper.py | 41 ++++-- .../src/elt_common/testing/sqlcatalog.py | 20 +++ .../e2e_tests/elt_common/iceberg/conftest.py | 4 +- .../elt_common/iceberg/test_maintenance.py | 10 +- 8 files changed, 159 insertions(+), 110 deletions(-) create mode 100644 elt-common/src/elt_common/testing/sqlcatalog.py diff --git a/elt-common/pytest.ini b/elt-common/pytest.ini index 26bff6c5..486d1c89 100644 --- a/elt-common/pytest.ini +++ b/elt-common/pytest.ini @@ -1,2 +1,4 @@ [pytest] filterwarnings= ignore::DeprecationWarning +markers = + requires_trino: marks tests that require a trino server (deselect with '-m "not requires_trino"') diff --git a/elt-common/src/elt_common/testing/__init__.py b/elt-common/src/elt_common/testing/__init__.py index bb7940e1..ee0ae4b0 100644 --- a/elt-common/src/elt_common/testing/__init__.py +++ b/elt-common/src/elt_common/testing/__init__.py @@ -34,10 +34,16 @@ def value(self, *, use_internal_netloc: bool) -> str: class Settings(BaseSettings): model_config = SettingsConfigDict( - env_prefix="tests_", + env_prefix="elt_common_testing_", ) - # iceberg catalog + # iceberg catalog type: 'sql' or 'rest' + catalog_type: str = "sql" + + # 
common settings + warehouse_name: str = "e2e_tests" + + # rest catalog settings # The default values assume the docker-compose.yml in the infra/local has been used. # These are provided for the convenience of easily running a debugger without having # to set up remote debugging @@ -53,7 +59,6 @@ class Settings(BaseSettings): openid_client_secret: str = "s3cr3t" openid_scope: str = "lakekeeper" project_id: str = "c4fcd44f-7ce7-4446-9f7c-dcc7ba76dd22" - warehouse_name: str = "e2e_tests" # trino trino_http_scheme: str = "https" diff --git a/elt-common/src/elt_common/testing/dlt.py b/elt-common/src/elt_common/testing/dlt.py index 277d8036..ad38e565 100644 --- a/elt-common/src/elt_common/testing/dlt.py +++ b/elt-common/src/elt_common/testing/dlt.py @@ -14,7 +14,8 @@ ) from dlt.common.runtime.run_context import RunContext -from .lakekeeper import Warehouse +from .sqlcatalog import SqlCatalogWarehouse +from .lakekeeper import RestCatalogWarehouse def configure_dlt_for_testing(): @@ -34,7 +35,7 @@ def initial_providers(self) -> List[ConfigProvider]: class PyIcebergDestinationTestConfiguration: """Class for defining test setup for pyiceberg destination.""" - warehouse: Warehouse + warehouse: SqlCatalogWarehouse | RestCatalogWarehouse destination: TDestinationReferenceArg = "elt_common.dlt_destinations.pyiceberg" def setup(self, environ: MutableMapping = os.environ) -> None: @@ -42,30 +43,39 @@ def setup(self, environ: MutableMapping = os.environ) -> None: Defaults to to os.environ """ - server, server_settings = self.warehouse.server, self.warehouse.server.settings - environ["DESTINATION__PYICEBERG__CREDENTIALS__URI"] = str( - self.warehouse.server.catalog_endpoint() - ) - environ["DESTINATION__PYICEBERG__CREDENTIALS__PROJECT_ID"] = str( - self.warehouse.server.settings.project_id - ) - environ.setdefault("DESTINATION__PYICEBERG__CREDENTIALS__WAREHOUSE", self.warehouse.name) - environ.setdefault( - "DESTINATION__PYICEBERG__CREDENTIALS__OAUTH2_SERVER_URI", - 
str(server.token_endpoint), - ) - environ.setdefault( - "DESTINATION__PYICEBERG__CREDENTIALS__CLIENT_ID", - server_settings.openid_client_id, - ) - environ.setdefault( - "DESTINATION__PYICEBERG__CREDENTIALS__CLIENT_SECRET", - server_settings.openid_client_secret, - ) - environ.setdefault( - "DESTINATION__PYICEBERG__CREDENTIALS__SCOPE", - server_settings.openid_scope, - ) + if isinstance(self.warehouse, SqlCatalogWarehouse): + environ["DESTINATION__PYICEBERG__CATALOG_TYPE"] = "sql" + environ["DESTINATION__PYICEBERG__CREDENTIALS__URI"] = self.warehouse.uri + environ["DESTINATION__PYICEBERG__CREDENTIALS__WAREHOUSE"] = ( + self.warehouse.warehouse_path + ) + else: + server, server_settings = self.warehouse.server, self.warehouse.server.settings + environ["DESTINATION__PYICEBERG__CREDENTIALS__URI"] = str( + self.warehouse.server.catalog_endpoint() + ) + environ["DESTINATION__PYICEBERG__CREDENTIALS__PROJECT_ID"] = str( + self.warehouse.server.settings.project_id + ) + environ.setdefault( + "DESTINATION__PYICEBERG__CREDENTIALS__WAREHOUSE", self.warehouse.name + ) + environ.setdefault( + "DESTINATION__PYICEBERG__CREDENTIALS__OAUTH2_SERVER_URI", + str(server.token_endpoint), + ) + environ.setdefault( + "DESTINATION__PYICEBERG__CREDENTIALS__CLIENT_ID", + server_settings.openid_client_id, + ) + environ.setdefault( + "DESTINATION__PYICEBERG__CREDENTIALS__CLIENT_SECRET", + server_settings.openid_client_secret, + ) + environ.setdefault( + "DESTINATION__PYICEBERG__CREDENTIALS__SCOPE", + server_settings.openid_scope, + ) def setup_pipeline( self, diff --git a/elt-common/src/elt_common/testing/fixtures.py b/elt-common/src/elt_common/testing/fixtures.py index f33bbd6e..d30f0f43 100644 --- a/elt-common/src/elt_common/testing/fixtures.py +++ b/elt-common/src/elt_common/testing/fixtures.py @@ -4,16 +4,17 @@ import time from typing import Generator import urllib.parse +import shutil import warnings from minio import Minio import pytest -import requests import tenacity from . 
import DEFAULT_RETRY_ARGS from .dlt import PyIcebergDestinationTestConfiguration -from .lakekeeper import Settings, Server, Warehouse +from .lakekeeper import Settings, Server +from .sqlcatalog import SqlCatalogWarehouse @pytest.fixture(scope="session") @@ -22,79 +23,65 @@ def settings() -> Settings: @pytest.fixture(scope="session") -def token_endpoint(settings: Settings) -> str: - response = requests.get(str(settings.openid_provider_uri + "/.well-known/openid-configuration")) - response.raise_for_status() - return response.json()["token_endpoint"] - - -@pytest.fixture(scope="session") -def access_token(settings: Settings, token_endpoint: str) -> str: - response = requests.post( - token_endpoint, - data={ - "grant_type": "client_credentials", - "client_id": settings.openid_client_id, - "client_secret": settings.openid_client_secret, - "scope": settings.openid_scope, - }, - ) - response.raise_for_status() - return response.json()["access_token"] - - -@pytest.fixture(scope="session") -def server(settings: Settings, access_token: str) -> Server: - return Server(access_token, settings) - - -@pytest.fixture(scope="session") -def warehouse(settings: Settings, server: Server) -> Generator: +def warehouse(settings: Settings) -> Generator: if not settings.warehouse_name: raise ValueError("Empty 'warehouse_name' is not allowed.") - storage_config = settings.storage_config() - # Ensure bucket exists - s3_hostname = urllib.parse.urlparse(storage_config["storage-profile"]["endpoint"]).netloc - minio_client = Minio( - endpoint=s3_hostname, - access_key=storage_config["storage-credential"]["aws-access-key-id"], - secret_key=storage_config["storage-credential"]["aws-secret-access-key"], - secure=False, - ) - bucket_name = storage_config["storage-profile"]["bucket"] - if not minio_client.bucket_exists(bucket_name=bucket_name): - minio_client.make_bucket(bucket_name=bucket_name) - print(f"Bucket {bucket_name} created.") - - warehouse = server.create_warehouse( - 
settings.warehouse_name, server.settings.project_id, storage_config - ) - print(f"Warehouse {warehouse.project_id} created.") + if settings.catalog_type not in ("sql", "rest"): + raise ValueError( + f"Invalid catalog_type '{settings.catalog_type}'. Allowed values: sql, rest." + ) + + if settings.catalog_type == "sql": + warehouse = SqlCatalogWarehouse(settings.warehouse_name) + + def cleanup_func(): + shutil.rmtree(warehouse.workdir.name) + else: + server = Server(settings) + storage_config = settings.storage_config() + s3_hostname = urllib.parse.urlparse(storage_config["storage-profile"]["endpoint"]).netloc + minio_client = Minio( + endpoint=s3_hostname, + access_key=storage_config["storage-credential"]["aws-access-key-id"], + secret_key=storage_config["storage-credential"]["aws-secret-access-key"], + secure=False, + ) + bucket_name = storage_config["storage-profile"]["bucket"] + if not minio_client.bucket_exists(bucket_name=bucket_name): + minio_client.make_bucket(bucket_name=bucket_name) + print(f"Bucket {bucket_name} created.") + + warehouse = server.create_warehouse( + settings.warehouse_name, server.settings.project_id, storage_config + ) + + def cleanup_func(): + @tenacity.retry(**DEFAULT_RETRY_ARGS) + def _remove_bucket(bucket_name): + minio_client.remove_bucket(bucket_name=bucket_name) + + try: + # Allow a brief pause for the test operations to complete + time.sleep(1) + server.purge_warehouse(warehouse) + server.delete_warehouse(warehouse) + _remove_bucket(bucket_name) + + except RuntimeError as exc: + warnings.warn( + f"Error deleting test warehouse '{str(warehouse.project_id)}'. It may need to be removed manually." 
+ ) + warnings.warn(f"Error:\n{str(exc)}") + try: yield warehouse finally: - - @tenacity.retry(**DEFAULT_RETRY_ARGS) - def _remove_bucket(bucket_name): - minio_client.remove_bucket(bucket_name=bucket_name) - - try: - # Allow a brief pause for the test operations to complete - time.sleep(1) - server.purge_warehouse(warehouse) - server.delete_warehouse(warehouse) - _remove_bucket(bucket_name) - - except RuntimeError as exc: - warnings.warn( - f"Error deleting test warehouse '{str(warehouse.project_id)}'. It may need to be removed manually." - ) - warnings.warn(f"Error:\n{str(exc)}") + cleanup_func() @pytest.fixture -def destination_config(warehouse: Warehouse): +def destination_config(warehouse): destination_config = PyIcebergDestinationTestConfiguration(warehouse) try: yield destination_config diff --git a/elt-common/src/elt_common/testing/lakekeeper.py b/elt-common/src/elt_common/testing/lakekeeper.py index 7db66d2a..bebc6481 100644 --- a/elt-common/src/elt_common/testing/lakekeeper.py +++ b/elt-common/src/elt_common/testing/lakekeeper.py @@ -5,11 +5,10 @@ import uuid import pyarrow as pa -from pyiceberg.catalog import Catalog as PyIcebergCatalog +from pyiceberg.catalog import Catalog as PyIcebergCatalog, load_catalog import requests import tenacity -from elt_common.dlt_destinations.pyiceberg.helpers import create_catalog from elt_common.dlt_destinations.pyiceberg.configuration import PyIcebergRestCatalogCredentials from . import DEFAULT_RETRY_ARGS, Endpoint, Settings @@ -18,9 +17,9 @@ class Server: """Wraps a Lakekeeper instance. 
It is assumed that the instance is bootstrapped.""" - def __init__(self, access_token: str, settings: Settings): - self.access_token = access_token + def __init__(self, settings: Settings): self.settings = settings + self.access_token = access_token(settings) @property def token_endpoint(self) -> Endpoint: @@ -42,7 +41,9 @@ def management_endpoint(self, *, version: int | None = None) -> Endpoint: def warehouse_endpoint(self, *, version: int = 1) -> Endpoint: return self.management_endpoint(version=version) + "/warehouse" - def create_warehouse(self, name: str, project_id: str, storage_config: dict) -> "Warehouse": + def create_warehouse( + self, name: str, project_id: str, storage_config: dict + ) -> "RestCatalogWarehouse": """Create a warehouse in this server""" payload = { @@ -65,7 +66,7 @@ def create_warehouse(self, name: str, project_id: str, storage_config: dict) -> warehouse_id = response.json()["warehouse-id"] print(f"Created warehouse {name} with ID {warehouse_id}") - return Warehouse( + return RestCatalogWarehouse( self, name, uuid.UUID(warehouse_id), @@ -73,12 +74,12 @@ def create_warehouse(self, name: str, project_id: str, storage_config: dict) -> ) @tenacity.retry(**DEFAULT_RETRY_ARGS) - def purge_warehouse(self, warehouse: "Warehouse") -> None: + def purge_warehouse(self, warehouse: "RestCatalogWarehouse") -> None: """Purge all of the data in the given warehouse""" warehouse.purge() @tenacity.retry(**DEFAULT_RETRY_ARGS) - def delete_warehouse(self, warehouse: "Warehouse") -> None: + def delete_warehouse(self, warehouse: "RestCatalogWarehouse") -> None: """Purge all of the data in the given warehouse and delete it""" response = self._request_with_auth( requests.delete, self.warehouse_endpoint() + f"/{str(warehouse.project_id)}" @@ -94,7 +95,7 @@ def _request_with_auth(self, requests_method: Callable, url: Endpoint, **kwargs) @dataclasses.dataclass -class Warehouse: +class RestCatalogWarehouse: server: Server name: str project_id: uuid.UUID @@ -110,7 
+111,7 @@ def connect(self) -> PyIcebergCatalog: creds.client_id = self.server.settings.openid_client_id creds.client_secret = self.server.settings.openid_client_secret creds.scope = self.server.settings.openid_scope - return create_catalog(name="default", **creds.as_dict()) + return load_catalog(name="default", **creds.as_dict()) @contextmanager def create_test_tables( @@ -165,3 +166,23 @@ def purge(self): for table_id in catalog.list_tables(ns): catalog.purge_table(table_id) catalog.drop_namespace(ns) + + +def token_endpoint(settings: Settings) -> str: + response = requests.get(str(settings.openid_provider_uri + "/.well-known/openid-configuration")) + response.raise_for_status() + return response.json()["token_endpoint"] + + +def access_token(settings: Settings) -> str: + response = requests.post( + token_endpoint(settings), + data={ + "grant_type": "client_credentials", + "client_id": settings.openid_client_id, + "client_secret": settings.openid_client_secret, + "scope": settings.openid_scope, + }, + ) + response.raise_for_status() + return response.json()["access_token"] diff --git a/elt-common/src/elt_common/testing/sqlcatalog.py b/elt-common/src/elt_common/testing/sqlcatalog.py new file mode 100644 index 00000000..13dbbc00 --- /dev/null +++ b/elt-common/src/elt_common/testing/sqlcatalog.py @@ -0,0 +1,20 @@ +from tempfile import TemporaryDirectory + +from elt_common.dlt_destinations.pyiceberg.configuration import PyIcebergSqlCatalogCredentials +from pyiceberg.catalog import Catalog as PyIcebergCatalog +from pyiceberg.catalog import load_catalog + + +class SqlCatalogWarehouse: + def __init__(self, warehouse_name: str): + self.name = warehouse_name + self.workdir = TemporaryDirectory() + self.uri = f"sqlite:///{self.workdir.name}/{warehouse_name}.db" + self.warehouse_path = f"file://{self.workdir.name}/{warehouse_name}" + + def connect(self) -> PyIcebergCatalog: + """Connect to the warehouse in the catalog""" + creds = PyIcebergSqlCatalogCredentials() + 
creds.uri = self.uri + creds.warehouse = self.warehouse_path + return load_catalog(name="default", **creds.as_dict()) diff --git a/elt-common/tests/e2e_tests/elt_common/iceberg/conftest.py b/elt-common/tests/e2e_tests/elt_common/iceberg/conftest.py index 85e47fae..d3c0618d 100644 --- a/elt-common/tests/e2e_tests/elt_common/iceberg/conftest.py +++ b/elt-common/tests/e2e_tests/elt_common/iceberg/conftest.py @@ -1,11 +1,11 @@ from elt_common.iceberg.maintenance import TrinoCredentials, TrinoQueryEngine import pytest -from elt_common.testing.lakekeeper import Warehouse +from elt_common.testing.lakekeeper import RestCatalogWarehouse @pytest.fixture(scope="session") -def trino_engine(warehouse: Warehouse): +def trino_engine(warehouse: RestCatalogWarehouse): server = warehouse.server server_settings = server.settings creds = TrinoCredentials( diff --git a/elt-common/tests/e2e_tests/elt_common/iceberg/test_maintenance.py b/elt-common/tests/e2e_tests/elt_common/iceberg/test_maintenance.py index 658c0be0..c6c9b1b5 100644 --- a/elt-common/tests/e2e_tests/elt_common/iceberg/test_maintenance.py +++ b/elt-common/tests/e2e_tests/elt_common/iceberg/test_maintenance.py @@ -5,7 +5,7 @@ from click.testing import CliRunner from elt_common.iceberg.trino import TrinoCredentials, TrinoQueryEngine from elt_common.iceberg.maintenance import cli, IcebergTableMaintenaceSql -from elt_common.testing.lakekeeper import Warehouse +from elt_common.testing.lakekeeper import RestCatalogWarehouse import pytest from pytest_mock import MockerFixture @@ -18,8 +18,9 @@ def create_views(trino_query_engine, namespace: str, query_table: str): ) +@pytest.mark.requires_trino def test_trino_query_engine_list_tables_returns_only_iceberg_tables( - warehouse: Warehouse, trino_engine: TrinoQueryEngine + warehouse: RestCatalogWarehouse, trino_engine: TrinoQueryEngine ): with warehouse.create_test_tables( namespace_count=2, @@ -31,6 +32,7 @@ def test_trino_query_engine_list_tables_returns_only_iceberg_tables( assert 
len(iceberg_tables) == 4 +@pytest.mark.requires_trino @pytest.mark.parametrize( "command,command_args", [ @@ -41,7 +43,7 @@ def test_trino_query_engine_list_tables_returns_only_iceberg_tables( ], ) def test_iceberg_maintenance_commands_run_expected_trino_alter_table_command( - warehouse: Warehouse, + warehouse: RestCatalogWarehouse, trino_engine: TrinoQueryEngine, mocker: MockerFixture, command: str, @@ -66,6 +68,7 @@ def test_iceberg_maintenance_commands_run_expected_trino_alter_table_command( assert key in command_match.group(2) +@pytest.mark.requires_trino def test_iceberg_maintenance_cli_runs_successfully(mocker: MockerFixture): mock_from_env = mocker.patch.object(TrinoCredentials, "from_env", spec=TrinoCredentials) mock_from_env.return_value = TrinoCredentials("host", "1234", "catalog", "user", "password") @@ -86,6 +89,7 @@ def test_iceberg_maintenance_cli_runs_successfully(mocker: MockerFixture): assert mock_trino_execute.call_count == 8 +@pytest.mark.requires_trino def test_iceberg_maintenance_cli_raises_error_on_invalid_retention_format(): runner = CliRunner() From c14fc120e5e9ea1b41e05bc9b3fc27f25a71cf5e Mon Sep 17 00:00:00 2001 From: Martyn Gigg Date: Tue, 13 Jan 2026 14:46:08 +0000 Subject: [PATCH 3/8] Remove requires_trino marker from tests with mocks --- elt-common/src/elt_common/iceberg/trino.py | 11 ++++++++--- .../e2e_tests/elt_common/iceberg/test_maintenance.py | 2 -- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/elt-common/src/elt_common/iceberg/trino.py b/elt-common/src/elt_common/iceberg/trino.py index 2b4b5625..a948de46 100644 --- a/elt-common/src/elt_common/iceberg/trino.py +++ b/elt-common/src/elt_common/iceberg/trino.py @@ -20,8 +20,8 @@ class TrinoCredentials: host: str port: str catalog: str - user: str - password: str + user: str | None + password: str | None http_scheme: str = "https" @classmethod @@ -104,10 +104,15 @@ def validate_retention_threshold(cls, retention_threshold: str): # private def _create_engine(self, 
credentials: TrinoCredentials) -> Engine: + if credentials.user is None or credentials.password is None: + auth = BasicAuthentication("trino", "") + else: + auth = BasicAuthentication(credentials.user, credentials.password) + return create_engine( self.url, connect_args={ - "auth": BasicAuthentication(credentials.user, credentials.password), + "auth": auth, "http_scheme": credentials.http_scheme, "verify": False, }, diff --git a/elt-common/tests/e2e_tests/elt_common/iceberg/test_maintenance.py b/elt-common/tests/e2e_tests/elt_common/iceberg/test_maintenance.py index c6c9b1b5..f9ff2b01 100644 --- a/elt-common/tests/e2e_tests/elt_common/iceberg/test_maintenance.py +++ b/elt-common/tests/e2e_tests/elt_common/iceberg/test_maintenance.py @@ -68,7 +68,6 @@ def test_iceberg_maintenance_commands_run_expected_trino_alter_table_command( assert key in command_match.group(2) -@pytest.mark.requires_trino def test_iceberg_maintenance_cli_runs_successfully(mocker: MockerFixture): mock_from_env = mocker.patch.object(TrinoCredentials, "from_env", spec=TrinoCredentials) mock_from_env.return_value = TrinoCredentials("host", "1234", "catalog", "user", "password") @@ -89,7 +88,6 @@ def test_iceberg_maintenance_cli_runs_successfully(mocker: MockerFixture): assert mock_trino_execute.call_count == 8 -@pytest.mark.requires_trino def test_iceberg_maintenance_cli_raises_error_on_invalid_retention_format(): runner = CliRunner() From 98518e74fad6874b877255c0191aa02d1d423c57 Mon Sep 17 00:00:00 2001 From: Martyn Gigg Date: Tue, 13 Jan 2026 15:22:02 +0000 Subject: [PATCH 4/8] Mock Trino and move iceberg/maintenance tests to unit_tests Removes the need for any external infrastructure for the tests. 
--- .../e2e_tests/elt_common/iceberg/conftest.py | 51 ----------------- .../pyiceberg/test_helpers.py | 11 +--- .../tests/unit_tests/iceberg/__init__.py | 0 .../iceberg/maintenance/__init__.py | 0 .../maintenance/test_table_maintenance.py} | 57 +++++-------------- .../tests/unit_tests/iceberg/test_trino.py | 21 +++++++ 6 files changed, 36 insertions(+), 104 deletions(-) delete mode 100644 elt-common/tests/e2e_tests/elt_common/iceberg/conftest.py create mode 100644 elt-common/tests/unit_tests/iceberg/__init__.py create mode 100644 elt-common/tests/unit_tests/iceberg/maintenance/__init__.py rename elt-common/tests/{e2e_tests/elt_common/iceberg/test_maintenance.py => unit_tests/iceberg/maintenance/test_table_maintenance.py} (51%) create mode 100644 elt-common/tests/unit_tests/iceberg/test_trino.py diff --git a/elt-common/tests/e2e_tests/elt_common/iceberg/conftest.py b/elt-common/tests/e2e_tests/elt_common/iceberg/conftest.py deleted file mode 100644 index d3c0618d..00000000 --- a/elt-common/tests/e2e_tests/elt_common/iceberg/conftest.py +++ /dev/null @@ -1,51 +0,0 @@ -from elt_common.iceberg.maintenance import TrinoCredentials, TrinoQueryEngine -import pytest - -from elt_common.testing.lakekeeper import RestCatalogWarehouse - - -@pytest.fixture(scope="session") -def trino_engine(warehouse: RestCatalogWarehouse): - server = warehouse.server - server_settings = server.settings - creds = TrinoCredentials( - host=server_settings.trino_host, - port=server_settings.trino_port, - user=server_settings.trino_user, - password=server_settings.trino_password, - catalog=warehouse.name, - http_scheme="https", - ) - # Use one connection to create the catalog - trino_catalog_creator = TrinoQueryEngine(creds) - trino_catalog_creator.execute( - f"""create catalog {warehouse.name} using iceberg -with ( - "iceberg.catalog.type" = 'rest', - "iceberg.rest-catalog.warehouse" = '{warehouse.server.settings.project_id}/{warehouse.name}', - "iceberg.rest-catalog.uri" = 
'{server.catalog_endpoint().value(use_internal_netloc=True)}', - "iceberg.rest-catalog.vended-credentials-enabled" = 'false', - "iceberg.rest-catalog.security" = 'OAUTH2', - "iceberg.rest-catalog.oauth2.server-uri" = '{server.token_endpoint.value(use_internal_netloc=True)}', - "iceberg.rest-catalog.oauth2.credential" = '{server_settings.openid_client_id}:{server.settings.openid_client_secret}', - "iceberg.rest-catalog.oauth2.scope" = 'lakekeeper offline_access', - "iceberg.expire-snapshots.min-retention" = '0d', - "iceberg.remove-orphan-files.min-retention" = '0d', - "fs.native-s3.enabled" = 'true', - "s3.endpoint" = '{server_settings.s3_endpoint}', - "s3.region" = '{server_settings.s3_region}', - "s3.path-style-access" = '{str(server_settings.s3_path_style_access).lower()}', - "s3.aws-access-key" = '{server_settings.s3_access_key}', - "s3.aws-secret-key" = '{server_settings.s3_secret_key}' -) -""" - ) - del trino_catalog_creator - - # Create another connector connected to the new catalog - creds.catalog = warehouse.name - trino_engine = TrinoQueryEngine(creds) - try: - yield trino_engine - finally: - trino_engine.execute(f"drop catalog {warehouse.name}") diff --git a/elt-common/tests/unit_tests/dlt_destinations/pyiceberg/test_helpers.py b/elt-common/tests/unit_tests/dlt_destinations/pyiceberg/test_helpers.py index 67f03584..db14c081 100644 --- a/elt-common/tests/unit_tests/dlt_destinations/pyiceberg/test_helpers.py +++ b/elt-common/tests/unit_tests/dlt_destinations/pyiceberg/test_helpers.py @@ -4,8 +4,6 @@ from elt_common.dlt_destinations.pyiceberg.helpers import ( PARTITION_HINT, PartitionTrBuilder, - RestCatalog, - create_catalog, create_iceberg_schema, create_partition_spec, dlt_type_to_iceberg, @@ -13,6 +11,7 @@ namespace_exists, transforms, ) +from pyiceberg.catalog.rest import RestCatalog from pyiceberg.exceptions import NoSuchNamespaceError as PyIcebergNoSuchNamespaceError import pyiceberg.types import pytest @@ -31,14 +30,6 @@ def dlt_schema() -> 
PreparedTableSchema: return PreparedTableSchema(name="table_name", columns=columns) -def test_create_catalog_passes_properties_to_catalog_impl(mocker: MockerFixture): - patched_catalog = mocker.patch("elt_common.dlt_destinations.pyiceberg.helpers.RestCatalog") - name, properties = "unit_test_catalog", {"a": 1, "c": 3} - create_catalog(name, **properties) - - patched_catalog.assert_called_once_with(name, **properties) - - def test_namespace_exists_returns_true_if_load_namespace_properties_succeeds(mocker: MockerFixture): mock_catalog = mocker.MagicMock(spec=RestCatalog) mock_catalog.load_namespace_properties.return_value = {"namespace": "analytics"} diff --git a/elt-common/tests/unit_tests/iceberg/__init__.py b/elt-common/tests/unit_tests/iceberg/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/elt-common/tests/unit_tests/iceberg/maintenance/__init__.py b/elt-common/tests/unit_tests/iceberg/maintenance/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/elt-common/tests/e2e_tests/elt_common/iceberg/test_maintenance.py b/elt-common/tests/unit_tests/iceberg/maintenance/test_table_maintenance.py similarity index 51% rename from elt-common/tests/e2e_tests/elt_common/iceberg/test_maintenance.py rename to elt-common/tests/unit_tests/iceberg/maintenance/test_table_maintenance.py index f9ff2b01..187e5bf5 100644 --- a/elt-common/tests/e2e_tests/elt_common/iceberg/test_maintenance.py +++ b/elt-common/tests/unit_tests/iceberg/maintenance/test_table_maintenance.py @@ -5,34 +5,10 @@ from click.testing import CliRunner from elt_common.iceberg.trino import TrinoCredentials, TrinoQueryEngine from elt_common.iceberg.maintenance import cli, IcebergTableMaintenaceSql -from elt_common.testing.lakekeeper import RestCatalogWarehouse import pytest from pytest_mock import MockerFixture -TEST_VIEW_PREFIX = "test_view_" - -def create_views(trino_query_engine, namespace: str, query_table: str): - trino_query_engine.execute( - f"create view 
{namespace}.{TEST_VIEW_PREFIX}0 as (select * from {namespace}.{query_table})" - ) - - -@pytest.mark.requires_trino -def test_trino_query_engine_list_tables_returns_only_iceberg_tables( - warehouse: RestCatalogWarehouse, trino_engine: TrinoQueryEngine -): - with warehouse.create_test_tables( - namespace_count=2, - table_count_per_ns=2, - ): - create_views(trino_engine, "test_ns_0", query_table="test_table_0") - - iceberg_tables = trino_engine.list_iceberg_tables() - assert len(iceberg_tables) == 4 - - -@pytest.mark.requires_trino @pytest.mark.parametrize( "command,command_args", [ @@ -43,29 +19,24 @@ def test_trino_query_engine_list_tables_returns_only_iceberg_tables( ], ) def test_iceberg_maintenance_commands_run_expected_trino_alter_table_command( - warehouse: RestCatalogWarehouse, - trino_engine: TrinoQueryEngine, mocker: MockerFixture, command: str, command_args: Dict[str, str], ): - with warehouse.create_test_tables(namespace_count=1, table_count_per_ns=1, snapshot_count=10): - iceberg_maint = IcebergTableMaintenaceSql(trino_engine) - table_id = trino_engine.list_iceberg_tables()[0] - - trino_execute_spy = mocker.spy(trino_engine, "execute") - getattr(iceberg_maint, command)(table_id, **command_args) - - assert trino_execute_spy.call_count == 1 - expected_cmd_re = re.compile( - rf"^alter table test_ns_0.test_table_0 execute ({command})(.+)?$" - ) - command_match = expected_cmd_re.match(trino_execute_spy.call_args[0][0]) - assert command_match is not None - assert command_match.group(1) == command - if command_args: - for key in command_args.keys(): - assert key in command_match.group(2) + trino_engine = mocker.MagicMock() + iceberg_maint = IcebergTableMaintenaceSql(trino_engine) + table_id = "ns1.table1" + trino_execute_spy = mocker.spy(trino_engine, "execute") + getattr(iceberg_maint, command)(table_id, **command_args) + + assert trino_execute_spy.call_count == 1 + expected_cmd_re = re.compile(rf"^alter table {table_id} execute ({command})(.+)?$") + 
command_match = expected_cmd_re.match(trino_execute_spy.call_args[0][0]) + assert command_match is not None + assert command_match.group(1) == command + if command_args: + for key in command_args.keys(): + assert key in command_match.group(2) def test_iceberg_maintenance_cli_runs_successfully(mocker: MockerFixture): diff --git a/elt-common/tests/unit_tests/iceberg/test_trino.py b/elt-common/tests/unit_tests/iceberg/test_trino.py new file mode 100644 index 00000000..5af81d49 --- /dev/null +++ b/elt-common/tests/unit_tests/iceberg/test_trino.py @@ -0,0 +1,21 @@ +import re + + +from elt_common.iceberg.trino import TrinoQueryEngine +from pytest_mock import MockerFixture + + +def test_trino_query_engine_list_tables_returns_only_iceberg_tables( + mocker: MockerFixture, +): + mock_creds = mocker.MagicMock() + mocker.patch.object(TrinoQueryEngine, "_create_engine", mocker.MagicMock()) + trino = TrinoQueryEngine(mock_creds) + trino_execute_spy = mocker.spy(trino, "execute") + + trino.list_iceberg_tables() + + assert trino_execute_spy.call_count == 1 + expected_cmd_re = re.compile(r"^select \* from system.iceberg_tables$") + command_match = expected_cmd_re.match(trino_execute_spy.call_args[0][0]) + assert command_match is not None From 49c8ff3ef6ae8cc1ca0c5201834763e6f69ce510 Mon Sep 17 00:00:00 2001 From: Martyn Gigg Date: Tue, 13 Jan 2026 15:26:38 +0000 Subject: [PATCH 5/8] Consolidates GitHub actions tests so none depend on external services --- .github/actions/run-pytest-with-uv/action.yml | 60 ------------------- .github/workflows/elt-common_e2e_tests.yml | 51 ---------------- .github/workflows/elt-common_tests.yml | 46 ++++++++++++++ .github/workflows/elt-common_unit_tests.yml | 33 ---------- 4 files changed, 46 insertions(+), 144 deletions(-) delete mode 100644 .github/actions/run-pytest-with-uv/action.yml delete mode 100644 .github/workflows/elt-common_e2e_tests.yml create mode 100644 .github/workflows/elt-common_tests.yml delete mode 100644
.github/workflows/elt-common_unit_tests.yml diff --git a/.github/actions/run-pytest-with-uv/action.yml b/.github/actions/run-pytest-with-uv/action.yml deleted file mode 100644 index 005586d2..00000000 --- a/.github/actions/run-pytest-with-uv/action.yml +++ /dev/null @@ -1,60 +0,0 @@ -name: Run pytest -description: Run pytest using uv for dependencies management - -inputs: - compose-file-path: - description: If supplied, brings up the services defined in the given docker compose file. - default: "" - pyproject-directory: - description: Path to the directory containing pyproject.toml - required: true - pytest-file-or-dir: - description: Path to a file or directory for pytest execution - required: true - python-version: - description: The version of Python to install. Defaults to 3.13 as 3.14 was released but not all dependencies work. - default: "3.13" - uv-cache-dependency-glob: - description: Cache dependencies based on the supplied glob - uv-version: - description: The version of uv to install e.g., `0.5.0` Defaults to the version in pyproject.toml or 'latest'. 
- default: "latest" - -runs: - using: composite - steps: - - name: Install uv - uses: astral-sh/setup-uv@v7 - with: - activate-environment: false - cache-dependency-glob: ${{ inputs.uv-cache-dependency-glob }} - version: ${{ inputs.uv-version }} - python-version: ${{ inputs.python-version }} - - - name: Bring up Docker Compose services - if: inputs.compose-file-path != '' - uses: hoverkraft-tech/compose-action@v2.3.0 - with: - compose-file: ${{ inputs.compose-file-path }} - up-flags: --quiet-pull --wait --wait-timeout 300 - down-flags: --volumes --remove-orphans - - - name: Install the project - shell: bash -l {0} - run: uv sync --locked --all-extras --dev - working-directory: ${{ inputs.pyproject-directory }} - - - name: Run tests - shell: bash -l {0} - run: uv run pytest --durations-min=0.5 --exitfirst "${{ inputs.pytest-file-or-dir }}" --cache-clear - working-directory: ${{ inputs.pyproject-directory }} - - - name: Dump Docker Compose logs on failure - if: failure() && inputs.compose-file-path != '' - shell: bash -l {0} - run: | - echo =====Compose services status ===== - docker compose -f "${{ inputs.compose-file-path }}" ps -a - echo - echo ===== Compose logs ===== - docker compose -f "${{ inputs.compose-file-path }}" logs --no-color --tail=200 diff --git a/.github/workflows/elt-common_e2e_tests.yml b/.github/workflows/elt-common_e2e_tests.yml deleted file mode 100644 index 9e489b93..00000000 --- a/.github/workflows/elt-common_e2e_tests.yml +++ /dev/null @@ -1,51 +0,0 @@ -name: End-to-end tests for elt-common package -on: - push: - branches: - - "main" - paths: - - ".github/workflows/elt-common_e2e_tests.yml" - - "ansible-docker/roles/lakekeeper/files/bootstrap-warehouse.py" - - "infra/local/**" - - "elt-common/**" - pull_request: - types: [opened, synchronize, reopened] - paths: - - ".github/workflows/elt-common_e2e_tests.yml" - - "ansible-docker/roles/lakekeeper/files/bootstrap-warehouse.py" - - "infra/local/**" - - "elt-common/**" - -concurrency: - group: 
${{ github.workflow }}-${{ github.event.pull_request.number || github.run_id }} - cancel-in-progress: true - -env: - TRINO_CATALOG_STORE: memory - -jobs: - test: - name: elt-common end-to-end tests - runs-on: ubuntu-latest - - steps: - - name: Checkout - uses: actions/checkout@v6 - - - name: Add adp-router to /etc/hosts - shell: bash -l {0} - run: | - echo "127.0.0.1 adp-router" | sudo tee -a /etc/hosts - - - name: Run end-to-end tests - uses: ./.github/actions/run-pytest-with-uv - with: - compose-file-path: infra/local/docker-compose.yml - pyproject-directory: elt-common - pytest-file-or-dir: tests/e2e_tests - uv-cache-dependency-glob: elt-common/pyproject.toml - - - name: Remove adp-router from /etc/hosts - shell: bash -l {0} - run: | - sudo sed -i -e '/adp-router/d' /etc/hosts diff --git a/.github/workflows/elt-common_tests.yml b/.github/workflows/elt-common_tests.yml new file mode 100644 index 00000000..87529def --- /dev/null +++ b/.github/workflows/elt-common_tests.yml @@ -0,0 +1,46 @@ +name: Unit tests for elt-common package +on: + push: + branches: + - "main" + paths: + - ".github/workflows/elt-common_tests.yml" + - "elt-common/**" + pull_request: + types: [opened, synchronize, reopened] + paths: + - ".github/workflows/elt-common_tests.yml" + - "elt-common/**" + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.run_id }} + cancel-in-progress: true + +env: + PYTHON_VERSION: 3.13 + +jobs: + test: + name: elt-common unit tests + runs-on: ubuntu-slim + + steps: + - name: Checkout + uses: actions/checkout@v6 + + - name: Install uv + uses: astral-sh/setup-uv@v7 + with: + activate-environment: false + cache-dependency-glob: elt-common/pyproject.toml + python-version: ${{ env.PYTHON_VERSION }} + + - name: Install the project + shell: bash -l {0} + run: uv sync --locked --all-extras --dev + working-directory: elt-common + + - name: Run tests + shell: bash -l {0} + run: uv run pytest --durations-min=0.5 --exitfirst tests/ 
--cache-clear + working-directory: elt-common diff --git a/.github/workflows/elt-common_unit_tests.yml b/.github/workflows/elt-common_unit_tests.yml deleted file mode 100644 index d63f5e4b..00000000 --- a/.github/workflows/elt-common_unit_tests.yml +++ /dev/null @@ -1,33 +0,0 @@ -name: Unit tests for elt-common package -on: - push: - branches: - - "main" - paths: - - ".github/workflows/elt-common_unit_tests.yml" - - "elt-common/**" - pull_request: - types: [opened, synchronize, reopened] - paths: - - ".github/workflows/elt-common_unit_tests.yml" - - "elt-common/**" - -concurrency: - group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.run_id }} - cancel-in-progress: true - -jobs: - test: - name: elt-common unit tests - runs-on: ubuntu-latest - - steps: - - name: Checkout - uses: actions/checkout@v6 - - - name: Run unit tests - uses: ./.github/actions/run-pytest-with-uv - with: - pyproject-directory: elt-common - pytest-file-or-dir: tests/unit_tests - uv-cache-dependency-glob: elt-common/pyproject.toml From 97921a22ca3fc7a568d5a71b172b1d8d417c5753 Mon Sep 17 00:00:00 2001 From: Martyn Gigg Date: Tue, 13 Jan 2026 15:33:47 +0000 Subject: [PATCH 6/8] Tweak naming of Github actions --- .github/workflows/elt-common_tests.yml | 5 +++-- .github/workflows/warehouses_e2e_tests.yml | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/.github/workflows/elt-common_tests.yml b/.github/workflows/elt-common_tests.yml index 87529def..43d096b2 100644 --- a/.github/workflows/elt-common_tests.yml +++ b/.github/workflows/elt-common_tests.yml @@ -1,4 +1,4 @@ -name: Unit tests for elt-common package +name: Tests for elt-common on: push: branches: @@ -18,10 +18,11 @@ concurrency: env: PYTHON_VERSION: 3.13 + UV_VERSION: latest jobs: test: - name: elt-common unit tests + name: test runs-on: ubuntu-slim steps: diff --git a/.github/workflows/warehouses_e2e_tests.yml b/.github/workflows/warehouses_e2e_tests.yml index d5c50727..4a6b309c 100644 --- 
a/.github/workflows/warehouses_e2e_tests.yml +++ b/.github/workflows/warehouses_e2e_tests.yml @@ -1,4 +1,4 @@ -name: End-to-end tests for extract and load scripts in the warehouse +name: End-to-end extract and load tests on: push: branches: @@ -24,7 +24,7 @@ env: jobs: test: - name: warehouses end-to-end tests + name: test runs-on: ubuntu-latest steps: From 7434d1cf7067af7d4ed1ba5b6e3e20e41cfad6a6 Mon Sep 17 00:00:00 2001 From: Martyn Gigg Date: Tue, 13 Jan 2026 15:44:35 +0000 Subject: [PATCH 7/8] Update elt-common readme testing section --- elt-common/README.md | 21 ++++----------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/elt-common/README.md b/elt-common/README.md index 09c74cc6..3ad688cd 100644 --- a/elt-common/README.md +++ b/elt-common/README.md @@ -8,7 +8,6 @@ to each pipeline. Development requires the following tools: - [uv](https://docs.astral.uv/uv/): Used to manage both Python installations and dependencies -- [docker & docker compose]: Used for standing up services for end-to-end testing. ### Setting up a Python virtual environment @@ -21,25 +20,13 @@ package in editable mode, along with the development dependencies: > uv pip install --editable . --group dev ``` -## Running unit tests +## Running the tests Run the unit tests using `pytest`: ```bash -> pytest tests/unit_tests +> pytest tests ``` -## Running end-to-end tests - -The end-to-end (e2e) tests for the `pyiceberg` destination require a running Iceberg -rest catalog to test complete functionality. -The local, docker-compose-based configuration provided by -[infra/local/docker-compose.yml](../infra/local/docker-compose.yml) is the easiest way to -spin up a set of services compatible with running the tests. 
-_Note the requirement to edit `/etc/hosts` described in [infra/local/README](../infra/local/README.md)._ - -Once the compose services are running, execute the e2e tests using `pytest`: - -```bash -> pytest tests/e2e_tests -``` +See ["How to invoke pytest"](https://docs.pytest.org/en/stable/how-to/usage.html#usage) for +instructions on how to limit the tests that are executed. From e92390ae513fd531d8755a7a8929a277ceb53610 Mon Sep 17 00:00:00 2001 From: Martyn Gigg Date: Tue, 13 Jan 2026 15:49:04 +0000 Subject: [PATCH 8/8] Address issues with possible test instability --- .../src/elt_common/testing/lakekeeper.py | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/elt-common/src/elt_common/testing/lakekeeper.py b/elt-common/src/elt_common/testing/lakekeeper.py index bebc6481..183454e1 100644 --- a/elt-common/src/elt_common/testing/lakekeeper.py +++ b/elt-common/src/elt_common/testing/lakekeeper.py @@ -13,6 +13,8 @@ from . import DEFAULT_RETRY_ARGS, Endpoint, Settings +DEFAULT_REQUESTS_TIMEOUT = 10.0 + class Server: """Wraps a Lakekeeper instance.
It is assumed that the instance is bootstrapped.""" @@ -90,7 +92,7 @@ def _request_with_auth(self, requests_method: Callable, url: Endpoint, **kwargs) """Make a request, adding in the auth token""" headers = kwargs.setdefault("headers", {}) headers.update({"Authorization": f"Bearer {self.access_token}"}) - kwargs.setdefault("timeout", 10.0) + kwargs.setdefault("timeout", DEFAULT_REQUESTS_TIMEOUT) return requests_method(url=str(url), **kwargs) @@ -168,12 +170,7 @@ def purge(self): catalog.drop_namespace(ns) -def token_endpoint(settings: Settings) -> str: - response = requests.get(str(settings.openid_provider_uri + "/.well-known/openid-configuration")) - response.raise_for_status() - return response.json()["token_endpoint"] - - +@tenacity.retry(**DEFAULT_RETRY_ARGS) def access_token(settings: Settings) -> str: response = requests.post( token_endpoint(settings), @@ -183,6 +180,16 @@ def access_token(settings: Settings) -> str: "client_secret": settings.openid_client_secret, "scope": settings.openid_scope, }, + timeout=DEFAULT_REQUESTS_TIMEOUT, ) response.raise_for_status() return response.json()["access_token"] + + +def token_endpoint(settings: Settings) -> str: + response = requests.get( + str(settings.openid_provider_uri + "/.well-known/openid-configuration"), + timeout=DEFAULT_REQUESTS_TIMEOUT, + ) + response.raise_for_status() + return response.json()["token_endpoint"]