diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 3f82f78895..4ad6f8b1c7 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -348,6 +348,32 @@ catalog: | snapshot-loading-mode | refs | The snapshots to return in the body of the metadata. Setting the value to `all` would return the full set of snapshots currently valid for the table. Setting the value to `refs` would load all snapshots referenced by branches or tags. | | `header.X-Iceberg-Access-Delegation` | `vended-credentials` | Signal to the server that the client supports delegated access via a comma-separated list of access mechanisms. The server may choose to supply access via any or none of the requested mechanisms. When using `vended-credentials`, the server provides temporary credentials to the client. When using `remote-signing`, the server signs requests on behalf of the client. (default: `vended-credentials`) | +#### Retry and timeout + +The REST Catalog uses `requests` with no retries and no timeout by default, so transient +5xx / network failures bubble up immediately and slow servers can hang the client indefinitely. +Set a `connection:` block on the catalog to opt in to a per-request timeout and a retry policy. +Every key is optional; when none are set, the default `requests` behavior is preserved. + +```yaml +catalog: + default: + uri: http://rest-catalog/ws/ + connection: + timeout: 60 # seconds, applied to every HTTP call + retries: 5 # number of retry attempts on transient failures + backoff-factor: 1.0 # exponential backoff between retries +``` + +| Key | Example | Description | +| ---------------------------- | -------- | ------------------------------------------------------------------------------------------------------------------ | +| connection.timeout | 60 | Per-request timeout in seconds. Must be a positive number. | +| connection.retries | 5 | Number of retry attempts for transient failures. Must be non-negative. | +| connection.backoff-factor | 1.0 | Backoff factor between retry attempts. Must be non-negative. See [`urllib3` Retry docs](https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.Retry) for the formula. | + +Retries are applied to idempotent methods only (`GET`, `HEAD`, `OPTIONS`) and to the +transient HTTP status codes `429`, `500`, `502`, `503`, `504`. Other failures are not retried. + #### Headers in REST Catalog To configure custom headers in REST Catalog, include them in the catalog properties with `header.`. This diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index 39954ef561..97143e7a89 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -17,6 +17,7 @@ from __future__ import annotations from collections import deque +from collections.abc import Mapping from enum import Enum from typing import ( TYPE_CHECKING, @@ -25,9 +26,11 @@ from urllib.parse import quote, unquote from pydantic import ConfigDict, Field, TypeAdapter, field_validator -from requests import HTTPError, Session +from requests import HTTPError, PreparedRequest, Response, Session +from requests.adapters import HTTPAdapter from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt from typing_extensions import override +from urllib3.util.retry import Retry from pyiceberg import __version__ from pyiceberg.catalog import BOTOCORE_SESSION, TOKEN, URI, WAREHOUSE_LOCATION, Catalog, PropertiesUpdateSummary @@ -255,6 +258,14 @@ class ScanPlanningMode(Enum): SIGV4_SERVICE = "rest.signing-name" SIGV4_MAX_RETRIES = "rest.sigv4.max-retries" SIGV4_MAX_RETRIES_DEFAULT = 10 +CONNECTION = "connection" +CONNECTION_TIMEOUT = "timeout" +CONNECTION_RETRIES = "retries" +CONNECTION_BACKOFF_FACTOR = "backoff-factor" +# Hard-coded internally so users cannot misconfigure the retry policy +# (e.g. setting raise_on_status=False would swallow 4xx errors silently). +_CONNECTION_RETRY_STATUS_FORCELIST = (429, 500, 502, 503, 504) +_CONNECTION_RETRY_ALLOWED_METHODS = frozenset({"GET", "HEAD", "OPTIONS"}) EMPTY_BODY_SHA256: str = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" OAUTH2_SERVER_URI = "oauth2-server-uri" SNAPSHOT_LOADING_MODE = "snapshot-loading-mode" @@ -392,6 +403,89 @@ class ListViewsResponse(IcebergBaseModel): _PLANNING_RESPONSE_ADAPTER = TypeAdapter(PlanningResponse) +class _RetryTimeoutHTTPAdapter(HTTPAdapter): + """HTTPAdapter that applies a default per-request timeout. + + requests does not provide a way to set a default timeout on a Session; + without this adapter, every call would have to thread `timeout=` through. + The adapter applies `self._timeout` whenever a per-call timeout is not set. + """ + + def __init__(self, timeout: float | None = None, max_retries: Retry | int | None = None) -> None: + self._timeout = timeout + if max_retries is not None: + super().__init__(max_retries=max_retries) + else: + super().__init__() + + def send( + self, + request: PreparedRequest, + stream: bool = False, + timeout: None | float | tuple[float, float] | tuple[float, None] = None, + verify: bool | str = True, + cert: None | bytes | str | tuple[bytes | str, bytes | str] = None, + proxies: Mapping[str, str] | None = None, + ) -> Response: + if timeout is None: + timeout = self._timeout + return super().send(request, stream=stream, timeout=timeout, verify=verify, cert=cert, proxies=proxies) + + +def _create_connection_adapter(properties: Properties) -> _RetryTimeoutHTTPAdapter | None: + """Build a connection adapter from the optional `connection.*` properties. + + Returns None when no `connection` block is supplied, leaving the default + Session behavior unchanged. Raises ValueError on invalid input. + """ + connection_config = properties.get(CONNECTION) + if not connection_config: + return None + if not isinstance(connection_config, dict): + raise ValueError(f"`{CONNECTION}` must be a mapping, got: {type(connection_config).__name__}") + + timeout: float | None = None + if (raw_timeout := connection_config.get(CONNECTION_TIMEOUT)) is not None: + try: + timeout = float(raw_timeout) + except (TypeError, ValueError) as e: + raise ValueError(f"`{CONNECTION}.{CONNECTION_TIMEOUT}` must be a number, got: {raw_timeout!r}") from e + if timeout <= 0: + raise ValueError(f"`{CONNECTION}.{CONNECTION_TIMEOUT}` must be a positive number, got: {timeout}") + + retries: int | None = None + if (raw_retries := connection_config.get(CONNECTION_RETRIES)) is not None: + try: + retries = int(raw_retries) + except (TypeError, ValueError) as e: + raise ValueError(f"`{CONNECTION}.{CONNECTION_RETRIES}` must be an integer, got: {raw_retries!r}") from e + if retries < 0: + raise ValueError(f"`{CONNECTION}.{CONNECTION_RETRIES}` must be non-negative, got: {retries}") + + backoff_factor: float | None = None + if (raw_backoff := connection_config.get(CONNECTION_BACKOFF_FACTOR)) is not None: + try: + backoff_factor = float(raw_backoff) + except (TypeError, ValueError) as e: + raise ValueError(f"`{CONNECTION}.{CONNECTION_BACKOFF_FACTOR}` must be a number, got: {raw_backoff!r}") from e + if backoff_factor < 0: + raise ValueError(f"`{CONNECTION}.{CONNECTION_BACKOFF_FACTOR}` must be non-negative, got: {backoff_factor}") + + max_retries: Retry | None = None + if retries is not None or backoff_factor is not None: + max_retries = Retry( + total=retries if retries is not None else 0, + backoff_factor=backoff_factor if backoff_factor is not None else 0, + status_forcelist=list(_CONNECTION_RETRY_STATUS_FORCELIST), + allowed_methods=_CONNECTION_RETRY_ALLOWED_METHODS, + ) + + if timeout is None and max_retries is None: + return None + + return _RetryTimeoutHTTPAdapter(timeout=timeout, max_retries=max_retries) + + class RestCatalog(Catalog): uri: str _session: Session @@ -418,6 +512,12 @@ def _create_session(self) -> Session: """Create a request session with provided catalog configuration.""" session = Session() + # Mount the retry/timeout adapter when `connection.*` properties are set. + # SigV4's adapter mounted below at `self.uri` is a longer prefix and still wins for that host. + if (connection_adapter := _create_connection_adapter(self.properties)) is not None: + session.mount("http://", connection_adapter) + session.mount("https://", connection_adapter) + # Set HTTP headers self._config_headers(session) @@ -763,8 +863,6 @@ def _init_sigv4(self, session: Session) -> None: import boto3 from botocore.auth import SigV4Auth from botocore.awsrequest import AWSRequest - from requests import PreparedRequest - from requests.adapters import HTTPAdapter class SigV4Adapter(HTTPAdapter): def __init__(self, **properties: str): diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index df2f96a392..6edeb1cadf 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -32,6 +32,10 @@ import pyiceberg from pyiceberg.catalog import PropertiesUpdateSummary, load_catalog from pyiceberg.catalog.rest import ( + CONNECTION, + CONNECTION_BACKOFF_FACTOR, + CONNECTION_RETRIES, + CONNECTION_TIMEOUT, DEFAULT_ENDPOINTS, EMPTY_BODY_SHA256, OAUTH2_SERVER_URI, @@ -43,6 +47,7 @@ HttpMethod, RestCatalog, ScanPlanningMode, + _RetryTimeoutHTTPAdapter, ) from pyiceberg.exceptions import ( AuthorizationExpiredError, @@ -2019,6 +2024,131 @@ def test_request_session_with_ssl_client_cert() -> None: assert "Could not find the TLS certificate file, invalid path: path_to_client_cert" in str(e.value) +def test_session_without_connection_config_uses_default_adapter(rest_mock: Mocker) -> None: + catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) + for adapter in catalog._session.adapters.values(): + assert not isinstance(adapter, _RetryTimeoutHTTPAdapter) + + +def test_session_with_connection_timeout_and_retries(rest_mock: Mocker) -> None: + catalog_properties = { + "uri": TEST_URI, + "token": TEST_TOKEN, + CONNECTION: { + CONNECTION_TIMEOUT: 60, + CONNECTION_RETRIES: 5, + CONNECTION_BACKOFF_FACTOR: 1.0, + }, + } + catalog = RestCatalog("rest", **catalog_properties) # type: ignore + + https_adapter = catalog._session.adapters["https://"] + http_adapter = catalog._session.adapters["http://"] + assert isinstance(https_adapter, _RetryTimeoutHTTPAdapter) + assert https_adapter is http_adapter + assert https_adapter._timeout == 60.0 + assert https_adapter.max_retries.total == 5 + assert https_adapter.max_retries.backoff_factor == 1.0 + # Internal retry policy: transient codes and idempotent methods only. + assert https_adapter.max_retries.status_forcelist == [429, 500, 502, 503, 504] + allowed_methods = https_adapter.max_retries.allowed_methods or frozenset() + assert set(allowed_methods) == {"GET", "HEAD", "OPTIONS"} + + +def test_session_with_connection_timeout_only(rest_mock: Mocker) -> None: + catalog_properties = { + "uri": TEST_URI, + "token": TEST_TOKEN, + CONNECTION: {CONNECTION_TIMEOUT: "30"}, + } + catalog = RestCatalog("rest", **catalog_properties) # type: ignore + adapter = catalog._session.adapters["https://"] + assert isinstance(adapter, _RetryTimeoutHTTPAdapter) + assert adapter._timeout == 30.0 + # No retry options set, so no Retry object is configured. + assert adapter.max_retries.total == 0 + + +def test_session_retries_on_transient_5xx_then_succeeds() -> None: + """Three real 503 responses followed by a 200; the catalog should make all four attempts. + + `requests_mock` would replace our HTTPAdapter, bypassing the retry logic we want to exercise, + so this test stands up an actual `http.server` on a loopback port instead. + """ + import json + import threading + from http.server import BaseHTTPRequestHandler, HTTPServer + + state = {"namespace_calls": 0} + config_body = json.dumps( + {"defaults": {}, "overrides": {}, "endpoints": [str(endpoint) for endpoint in TEST_SUPPORTED_ENDPOINTS]} + ).encode() + + class _Handler(BaseHTTPRequestHandler): + def do_GET(self) -> None: + if self.path.endswith("/v1/config"): + self._respond(200, config_body) + elif self.path.endswith("/v1/namespaces"): + state["namespace_calls"] += 1 + if state["namespace_calls"] <= 3: + self._respond(503, b"") + else: + self._respond(200, json.dumps({"namespaces": [["foo"]]}).encode()) + else: + self._respond(404, b"") + + def _respond(self, status: int, body: bytes) -> None: + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + if body: + self.wfile.write(body) + + def log_message(self, format: str, *args: Any) -> None: # silence default access logs + pass + + server = HTTPServer(("127.0.0.1", 0), _Handler) + port = server.server_address[1] + server_thread = threading.Thread(target=server.serve_forever, daemon=True) + server_thread.start() + try: + catalog = RestCatalog( + "rest", + **{ # type: ignore + "uri": f"http://127.0.0.1:{port}/", + "token": TEST_TOKEN, + # backoff-factor=0 keeps the test fast; retries=3 covers three 503s + the eventual 200. + CONNECTION: {CONNECTION_RETRIES: 3, CONNECTION_BACKOFF_FACTOR: 0}, + }, + ) + assert catalog.list_namespaces() == [("foo",)] + assert state["namespace_calls"] == 4 + finally: + server.shutdown() + server.server_close() + + +def test_session_with_invalid_connection_timeout_raises(rest_mock: Mocker) -> None: + catalog_properties = { + "uri": TEST_URI, + "token": TEST_TOKEN, + CONNECTION: {CONNECTION_TIMEOUT: -1}, + } + with pytest.raises(ValueError, match="`connection.timeout` must be a positive number"): + RestCatalog("rest", **catalog_properties) # type: ignore + + +def test_session_with_invalid_connection_retries_raises(rest_mock: Mocker) -> None: + catalog_properties = { + "uri": TEST_URI, + "token": TEST_TOKEN, + CONNECTION: {CONNECTION_RETRIES: -1}, + } + with pytest.raises(ValueError, match="`connection.retries` must be non-negative"): + RestCatalog("rest", **catalog_properties) # type: ignore + + def test_rest_catalog_with_basic_auth_type(rest_mock: Mocker) -> None: # Given rest_mock.get(