diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index 802be28565..1bc5052666 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -40,6 +40,7 @@ PlanSubmitted, PlanTableScanRequest, ScanTasks, + StorageCredential, ) from pyiceberg.exceptions import ( AuthorizationExpiredError, @@ -256,6 +257,7 @@ class TableResponse(IcebergBaseModel): metadata_location: str | None = Field(alias="metadata-location", default=None) metadata: TableMetadata config: Properties = Field(default_factory=dict) + storage_credentials: list[StorageCredential] | None = Field(alias="storage-credentials", default=None) class CreateTableRequest(IcebergBaseModel): @@ -728,13 +730,40 @@ def add_headers(self, request: PreparedRequest, **kwargs: Any) -> None: # pylin session.mount(self.uri, SigV4Adapter(**self.properties)) + @staticmethod + def _get_credentials( + storage_credentials: list[StorageCredential] | None, + config: Properties, + metadata_location: str | None, + table_location: str | None, + ) -> Properties: + if not storage_credentials: + return config + target = metadata_location or table_location + if not target: + return config + matching = [sc for sc in storage_credentials if target.startswith(sc.prefix)] + if not matching: + return config + selected = max(matching, key=lambda sc: len(sc.prefix)) + return selected.config + def _response_to_table(self, identifier_tuple: tuple[str, ...], table_response: TableResponse) -> Table: return Table( identifier=identifier_tuple, metadata_location=table_response.metadata_location, # type: ignore metadata=table_response.metadata, io=self._load_file_io( - {**table_response.metadata.properties, **table_response.config}, table_response.metadata_location + { + **table_response.metadata.properties, + **self._get_credentials( + table_response.storage_credentials, + table_response.config, + table_response.metadata_location, + getattr(table_response.metadata, "location", None), + ), + }, + table_response.metadata_location, ), catalog=self, config=table_response.config, @@ -746,7 +775,16 @@ def _response_to_staged_table(self, identifier_tuple: tuple[str, ...], table_res metadata_location=table_response.metadata_location, # type: ignore metadata=table_response.metadata, io=self._load_file_io( - {**table_response.metadata.properties, **table_response.config}, table_response.metadata_location + { + **table_response.metadata.properties, + **self._get_credentials( + table_response.storage_credentials, + table_response.config, + table_response.metadata_location, + getattr(table_response.metadata, "location", None), + ), + }, + table_response.metadata_location, ), catalog=self, ) diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index 9fb1fa9af5..ecb9782dc9 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -2351,3 +2351,74 @@ def test_table_uuid_check_on_refresh(rest_mock: Mocker, example_table_metadata_v assert "Table UUID does not match" in str(exc_info.value) assert f"current={original_uuid}" in str(exc_info.value) assert f"refreshed={different_uuid}" in str(exc_info.value) + + +def test_storage_credentials_over_config( + rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any] +) -> None: + response_with_storage_creds = { + **example_table_metadata_with_snapshot_v1_rest_json, + "storage-credentials": [ + { + "prefix": "s3://warehouse/", + "config": { + "s3.access-key-id": "storage-cred-key", + "s3.secret-access-key": "storage-cred-secret", + }, + } + ], + } + rest_mock.get( + f"{TEST_URI}v1/namespaces/fokko/tables/table", + json=response_with_storage_creds, + status_code=200, + request_headers=TEST_HEADERS, + ) + catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) + table = catalog.load_table(("fokko", "table")) + assert table.io.properties["s3.access-key-id"] == "storage-cred-key" + assert table.io.properties["s3.secret-access-key"] == "storage-cred-secret" + + +def test_config_when_no_storage_credentials( + rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any] +) -> None: + rest_mock.get( + f"{TEST_URI}v1/namespaces/fokko/tables/table", + json=example_table_metadata_with_snapshot_v1_rest_json, + status_code=200, + request_headers=TEST_HEADERS, + ) + catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) + table = catalog.load_table(("fokko", "table")) + # config from the fixture should be used since there are no storage-credentials + assert table.io.properties["region"] == "us-west-2" + + +def test_storage_credentials_no_prefix_match() -> None: + from pyiceberg.catalog.rest.scan_planning import StorageCredential + + creds = [StorageCredential(prefix="s3://other-bucket/", config={"key": "val"})] + result = RestCatalog._get_credentials( + storage_credentials=creds, + config={"fallback-key": "fallback-val"}, + metadata_location="s3://warehouse/database/table/metadata/file.json", + table_location="s3://warehouse/database/table", + ) + assert result == {"fallback-key": "fallback-val"} + + +def test_storage_credentials_longest_prefix_wins() -> None: + from pyiceberg.catalog.rest.scan_planning import StorageCredential + + creds = [ + StorageCredential(prefix="s3://warehouse/", config={"key": "short-prefix"}), + StorageCredential(prefix="s3://warehouse/database/table/", config={"key": "long-prefix"}), + ] + result = RestCatalog._get_credentials( + storage_credentials=creds, + config={"key": "fallback"}, + metadata_location="s3://warehouse/database/table/metadata/file.json", + table_location="s3://warehouse/database/table", + ) + assert result == {"key": "long-prefix"}