diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index e096470451..7d1be26cc6 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -32,6 +32,7 @@ CheckLockRequest, EnvironmentContext, FieldSchema, + InvalidObjectException, InvalidOperationException, LockComponent, LockLevel, @@ -83,6 +84,7 @@ ) from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.table.update import ( + AssertCreate, TableRequirement, TableUpdate, ) @@ -381,6 +383,9 @@ def _create_hive_table(self, open_client: Client, hive_table: HiveTable) -> None open_client.create_table(hive_table) except AlreadyExistsException as e: raise TableAlreadyExistsError(f"Table {hive_table.dbName}.{hive_table.tableName} already exists") from e + except InvalidObjectException as e: + if "database" in e.message: + raise NoSuchNamespaceError(f"Namespace does not exists: {hive_table.dbName}]") from e def _get_hive_table(self, open_client: Client, database_name: str, table_name: str) -> HiveTable: try: @@ -535,6 +540,10 @@ def commit_table( try: hive_table = self._get_hive_table(open_client, database_name, table_name) current_table = self._convert_hive_into_iceberg(hive_table) + + if AssertCreate() in requirements: + raise TableAlreadyExistsError(f"Table already exists: {table_name}") + except NoSuchTableError: hive_table = None current_table = None diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index 3b77fd47f0..2f9e573537 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -35,18 +35,11 @@ PropertiesUpdateSummary, ) from pyiceberg.catalog.rest.auth import AuthManager, AuthManagerAdapter, AuthManagerFactory, LegacyOAuth2AuthManager -from pyiceberg.catalog.rest.response import _handle_non_200_response +from pyiceberg.catalog.rest.response import ErrorHandlers from pyiceberg.exceptions import ( AuthorizationExpiredError, - CommitFailedException, - CommitStateUnknownException, - NamespaceAlreadyExistsError, - NamespaceNotEmptyError, NoSuchIdentifierError, NoSuchNamespaceError, - NoSuchTableError, - NoSuchViewError, - TableAlreadyExistsError, UnauthorizedError, ) from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN @@ -64,6 +57,7 @@ from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids from pyiceberg.table.update import ( + AssertCreate, TableRequirement, TableUpdate, ) @@ -361,7 +355,7 @@ def _fetch_config(self) -> None: try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {}) + ErrorHandlers.default_error_handler(exc) config_response = ConfigResponse.model_validate_json(response.text) config = config_response.defaults @@ -519,7 +513,7 @@ def _create_table( try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {409: TableAlreadyExistsError, 404: NoSuchNamespaceError}) + ErrorHandlers.table_error_handler(exc) return TableResponse.model_validate_json(response.text) @retry(**_RETRY_ARGS) @@ -592,7 +586,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str) - try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {409: TableAlreadyExistsError}) + ErrorHandlers.table_error_handler(exc) table_response = TableResponse.model_validate_json(response.text) return self._response_to_table(self.identifier_to_tuple(identifier), table_response) @@ -605,7 +599,7 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]: try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchNamespaceError}) + ErrorHandlers.namespace_error_handler(exc) return [(*table.namespace, table.name) for table in ListTablesResponse.model_validate_json(response.text).identifiers] @retry(**_RETRY_ARGS) @@ -623,7 +617,7 @@ def load_table(self, identifier: str | Identifier) -> Table: try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchTableError}) + ErrorHandlers.table_error_handler(exc) table_response = TableResponse.model_validate_json(response.text) return self._response_to_table(self.identifier_to_tuple(identifier), table_response) @@ -637,7 +631,7 @@ def drop_table(self, identifier: str | Identifier, purge_requested: bool = False try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchTableError}) + ErrorHandlers.table_error_handler(exc) @retry(**_RETRY_ARGS) def purge_table(self, identifier: str | Identifier) -> None: @@ -663,7 +657,7 @@ def rename_table(self, from_identifier: str | Identifier, to_identifier: str | I try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchTableError, 409: TableAlreadyExistsError}) + ErrorHandlers.table_error_handler(exc) return self.load_table(to_identifier) @@ -686,7 +680,7 @@ def list_views(self, namespace: str | Identifier) -> list[Identifier]: try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchNamespaceError}) + ErrorHandlers.view_error_handler(exc) return [(*view.namespace, view.name) for view in ListViewsResponse.model_validate_json(response.text).identifiers] @retry(**_RETRY_ARGS) @@ -724,15 +718,10 @@ def commit_table( try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response( - exc, - { - 409: CommitFailedException, - 500: CommitStateUnknownException, - 502: CommitStateUnknownException, - 504: CommitStateUnknownException, - }, - ) + if AssertCreate() in requirements: + ErrorHandlers.table_error_handler(exc) + else: + ErrorHandlers.commit_error_handler(exc) return CommitTableResponse.model_validate_json(response.text) @retry(**_RETRY_ARGS) @@ -743,7 +732,7 @@ def create_namespace(self, namespace: str | Identifier, properties: Properties = try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {409: NamespaceAlreadyExistsError}) + ErrorHandlers.namespace_error_handler(exc) @retry(**_RETRY_ARGS) def drop_namespace(self, namespace: str | Identifier) -> None: @@ -753,7 +742,7 @@ def drop_namespace(self, namespace: str | Identifier) -> None: try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchNamespaceError, 409: NamespaceNotEmptyError}) + ErrorHandlers.drop_namespace_error_handler(exc) @retry(**_RETRY_ARGS) def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: @@ -768,7 +757,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchNamespaceError}) + ErrorHandlers.namespace_error_handler(exc) return ListNamespaceResponse.model_validate_json(response.text).namespaces @@ -780,7 +769,7 @@ def load_namespace_properties(self, namespace: str | Identifier) -> Properties: try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchNamespaceError}) + ErrorHandlers.namespace_error_handler(exc) return NamespaceResponse.model_validate_json(response.text).properties @@ -795,7 +784,7 @@ def update_namespace_properties( try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchNamespaceError}) + ErrorHandlers.namespace_error_handler(exc) parsed_response = UpdateNamespacePropertiesResponse.model_validate_json(response.text) return PropertiesUpdateSummary( removed=parsed_response.removed, @@ -817,7 +806,7 @@ def namespace_exists(self, namespace: str | Identifier) -> bool: try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {}) + ErrorHandlers.namespace_error_handler(exc) return False @@ -843,7 +832,7 @@ def table_exists(self, identifier: str | Identifier) -> bool: try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {}) + ErrorHandlers.table_error_handler(exc) return False @@ -868,7 +857,7 @@ def view_exists(self, identifier: str | Identifier) -> bool: try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {}) + ErrorHandlers.view_error_handler(exc) return False @@ -880,7 +869,7 @@ def drop_view(self, identifier: str) -> None: try: response.raise_for_status() except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchViewError}) + ErrorHandlers.view_error_handler(exc) def close(self) -> None: """Close the catalog and release Session connection adapters. diff --git a/pyiceberg/catalog/rest/response.py b/pyiceberg/catalog/rest/response.py index 157e4bfa16..4d0160bb7b 100644 --- a/pyiceberg/catalog/rest/response.py +++ b/pyiceberg/catalog/rest/response.py @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. from json import JSONDecodeError -from typing import Literal +from typing import Literal, TypeAlias from pydantic import Field, ValidationError from requests import HTTPError @@ -23,12 +23,21 @@ from pyiceberg.exceptions import ( AuthorizationExpiredError, BadRequestError, + CommitFailedException, + CommitStateUnknownException, ForbiddenError, + NamespaceAlreadyExistsError, + NamespaceNotEmptyError, + NoSuchNamespaceError, + NoSuchTableError, + NoSuchViewError, OAuthError, RESTError, ServerError, ServiceUnavailableError, + TableAlreadyExistsError, UnauthorizedError, + ViewAlreadyExistsError, ) from pyiceberg.typedef import IcebergBaseModel @@ -60,33 +69,92 @@ class OAuthErrorResponse(IcebergBaseModel): error_uri: str | None = None -def _handle_non_200_response(exc: HTTPError, error_handler: dict[int, type[Exception]]) -> None: +_ErrorHandler: TypeAlias = dict[int, type[Exception]] + + +class ErrorHandlers: + """ + Utility class providing static methods to handle HTTP errors for table, namespace, and view operations. + + Maps HTTP error responses to appropriate custom exceptions, ensuring consistent error handling. + """ + + @staticmethod + def default_error_handler(exc: HTTPError) -> None: + _handle_non_200_response(exc, {}) + + @staticmethod + def namespace_error_handler(exc: HTTPError) -> None: + handler: _ErrorHandler = { + 400: BadRequestError, + 404: NoSuchNamespaceError, + 409: NamespaceAlreadyExistsError, + 422: RESTError, + } + + if "NamespaceNotEmpty" in exc.response.text: + handler[400] = NamespaceNotEmptyError + + _handle_non_200_response(exc, handler) + + @staticmethod + def drop_namespace_error_handler(exc: HTTPError) -> None: + handler: _ErrorHandler = {404: NoSuchNamespaceError, 409: NamespaceNotEmptyError} + + _handle_non_200_response(exc, handler) + + @staticmethod + def table_error_handler(exc: HTTPError) -> None: + handler: _ErrorHandler = {404: NoSuchTableError, 409: TableAlreadyExistsError} + + if "NoSuchNamespace" in exc.response.text: + handler[404] = NoSuchNamespaceError + + _handle_non_200_response(exc, handler) + + @staticmethod + def commit_error_handler(exc: HTTPError) -> None: + handler: _ErrorHandler = { + 404: NoSuchTableError, + 409: CommitFailedException, + 500: CommitStateUnknownException, + 502: CommitStateUnknownException, + 503: CommitStateUnknownException, + 504: CommitStateUnknownException, + } + + _handle_non_200_response(exc, handler) + + @staticmethod + def view_error_handler(exc: HTTPError) -> None: + handler: _ErrorHandler = {404: NoSuchViewError, 409: ViewAlreadyExistsError} + + if "NoSuchNamespace" in exc.response.text: + handler[404] = NoSuchNamespaceError + + _handle_non_200_response(exc, handler) + + +def _handle_non_200_response(exc: HTTPError, handler: _ErrorHandler) -> None: exception: type[Exception] if exc.response is None: raise ValueError("Did not receive a response") code = exc.response.status_code - if code in error_handler: - exception = error_handler[code] - elif code == 400: - exception = BadRequestError - elif code == 401: - exception = UnauthorizedError - elif code == 403: - exception = ForbiddenError - elif code == 422: - exception = RESTError - elif code == 419: - exception = AuthorizationExpiredError - elif code == 501: - exception = NotImplementedError - elif code == 503: - exception = ServiceUnavailableError - elif 500 <= code < 600: - exception = ServerError - else: - exception = RESTError + + default_handler: _ErrorHandler = { + 400: BadRequestError, + 401: UnauthorizedError, + 403: ForbiddenError, + 419: AuthorizationExpiredError, + 422: RESTError, + 501: NotImplementedError, + 503: ServiceUnavailableError, + } + + # Merge handler passed with default handler map, if no match exception will be ServerError or RESTError + exception = handler.get(code, default_handler.get(code, ServerError if 500 <= code < 600 else RESTError)) try: if exception == OAuthError: diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index 2b6fa74517..e14674f50c 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -63,6 +63,7 @@ from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.table.update import ( + AssertCreate, TableRequirement, TableUpdate, ) @@ -423,6 +424,10 @@ def commit_table( current_table: Table | None try: current_table = self.load_table(table_identifier) + + if AssertCreate() in requirements: + raise TableAlreadyExistsError(f"Table already exists: {table_identifier}") + except NoSuchTableError: current_table = None diff --git a/pyiceberg/exceptions.py b/pyiceberg/exceptions.py index c80f104e46..d525ac7219 100644 --- a/pyiceberg/exceptions.py +++ b/pyiceberg/exceptions.py @@ -44,6 +44,10 @@ class NoSuchViewError(Exception): """Raises when the view can't be found in the REST catalog.""" +class ViewAlreadyExistsError(Exception): + """Raised when creating a view with a name that already exists.""" + + class NoSuchIdentifierError(Exception): """Raises when the identifier can't be found in the REST catalog.""" diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index b8bee00225..44c6fccb20 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -943,7 +943,7 @@ def test_load_table_404(rest_mock: Mocker) -> None: json={ "error": { "message": "Table does not exist: examples.does_not_exists in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e", - "type": "NoSuchNamespaceErrorException", + "type": "NoSuchTableError", "code": 404, } }, @@ -956,6 +956,25 @@ def test_load_table_404(rest_mock: Mocker) -> None: assert "Table does not exist" in str(e.value) +def test_load_table_404_non_existent_namespace(rest_mock: Mocker) -> None: + rest_mock.get( + f"{TEST_URI}v1/namespaces/fokko/tables/does_not_exists", + json={ + "error": { + "message": "Table does not exist: examples.does_not_exists in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e", + "type": "NoSuchNamespaceError", + "code": 404, + } + }, + status_code=404, + request_headers=TEST_HEADERS, + ) + + with pytest.raises(NoSuchNamespaceError) as e: + RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).load_table(("fokko", "does_not_exists")) + assert "Table does not exist" in str(e.value) + + def test_table_exists_200(rest_mock: Mocker) -> None: rest_mock.head( f"{TEST_URI}v1/namespaces/fokko/tables/table", @@ -1004,7 +1023,7 @@ def test_drop_table_404(rest_mock: Mocker) -> None: json={ "error": { "message": "Table does not exist: fokko.does_not_exists in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e", - "type": "NoSuchNamespaceErrorException", + "type": "NoSuchTableError", "code": 404, } }, @@ -1017,6 +1036,25 @@ def test_drop_table_404(rest_mock: Mocker) -> None: assert "Table does not exist" in str(e.value) +def test_drop_table_404_non_existent_namespace(rest_mock: Mocker) -> None: + rest_mock.delete( + f"{TEST_URI}v1/namespaces/fokko/tables/does_not_exists", + json={ + "error": { + "message": "Table does not exist: fokko.does_not_exists in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e", + "type": "NoSuchNamespaceErrorException", + "code": 404, + } + }, + status_code=404, + request_headers=TEST_HEADERS, + ) + + with pytest.raises(NoSuchNamespaceError) as e: + RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).drop_table(("fokko", "does_not_exists")) + assert "Table does not exist" in str(e.value) + + def test_create_table_200( rest_mock: Mocker, table_schema_simple: Schema, example_table_metadata_no_snapshot_v1_rest_json: dict[str, Any] ) -> None: diff --git a/tests/integration/test_catalog.py b/tests/integration/test_catalog.py index 0c77666568..e4a5a78f93 100644 --- a/tests/integration/test_catalog.py +++ b/tests/integration/test_catalog.py @@ -401,7 +401,7 @@ def test_concurrent_create_transaction(test_catalog: Catalog, test_schema: Schem assert not test_catalog.table_exists(identifier) test_catalog.create_table(identifier, test_schema) - with pytest.raises(CommitFailedException): + with pytest.raises(TableAlreadyExistsError): table.commit_transaction() @@ -601,3 +601,31 @@ def test_register_table_existing(test_catalog: Catalog, table_schema_nested: Sch # Assert that registering the table again raises TableAlreadyExistsError with pytest.raises(TableAlreadyExistsError): test_catalog.register_table(identifier, metadata_location=table.metadata_location) + + +@pytest.mark.integration +@pytest.mark.parametrize("test_catalog", CATALOGS) +def test_register_table_non_existing_namespace(test_catalog: Catalog, table_name: str, database_name: str) -> None: + identifier = (database_name, table_name) + + test_catalog.create_namespace_if_not_exists(database_name) + + table = test_catalog.create_table(identifier=identifier, schema=Schema()) + + assert test_catalog.table_exists(identifier) + + metadata_location = table.metadata_location + test_catalog.drop_table(identifier) + assert not test_catalog.table_exists(identifier) + + with pytest.raises(NoSuchNamespaceError): + test_catalog.register_table(("nonexistent", table_name), metadata_location=metadata_location) + + +@pytest.mark.integration +@pytest.mark.parametrize("test_catalog", CATALOGS) +def test_create_table_non_existing_namespace(test_catalog: Catalog, table_name: str) -> None: + identifier = ("nonexisting", table_name) + + with pytest.raises(NoSuchNamespaceError): + test_catalog.create_table(identifier, schema=Schema())