From ccc25b498c698401f1ffae7df52ef62591649441 Mon Sep 17 00:00:00 2001 From: NaluTripician Date: Thu, 14 May 2026 13:42:12 -0700 Subject: [PATCH 1/4] =?UTF-8?q?feat(cosmos):=20hedging=20detection=20API?= =?UTF-8?q?=20(Option=20B)=20=E2=80=94=20types=20+=20state=20+=20dispatch?= =?UTF-8?q?=20wiring?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements Phase 2 Priorities 1–3 of the cross-SDK hedging detection API work item (azure-sdk-for-python#46899). Adds three accessor methods — `is_hedging_started()`, `get_requested_regions()`, `get_responded_regions()` — to the five wrapper / exception types (`CosmosDict`, `CosmosList`, `CosmosItemPaged`, `CosmosAsyncItemPaged`, `CosmosHttpResponseError`, `CosmosBatchOperationError`, `CosmosClientTimeoutError`), backed by a shared private `_HedgingDetectionState`. New public types: * `azure.cosmos.RequestedRegion` (frozen slots dataclass) * `azure.cosmos.RequestedRegionReason` (non-exhaustive Enum with `_missing_` → `UNKNOWN` for forward compatibility — SE-016) Dispatch-site instrumentation: * INITIAL recorded inside the hedging handler arm body for index 0 (sync + async); INITIAL recorded at SynchronizedRequest entry for the non-hedged path. * HEDGING recorded inside the hedge-arm body AFTER the threshold delay and AFTER the cancellation check — pre-delay cancellation produces no phantom entry (AC10 / spec §12 ''no phantom entries''). * OPERATION_RETRY / REGION_FAILOVER recorded in `_retry_utility(_async).Execute` when the retry policy returns True; attached to exceptions before re-raise so error-path consumers see the full timeline. Closure-argument pattern (SE-002): the state flows through `execute_with_hedging` as an explicit `hedging_state` parameter (NOT on `request_params` — the deepcopy at line 96 of the handler would silently swallow child appends). Verified by AC8 regression test (to follow). Sync↔async parity (SE-004 / M11): every sync code site has a paired async modification. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../azure-cosmos/azure/cosmos/__init__.py | 5 +- .../cosmos/_availability_strategy_handler.py | 62 ++++- .../azure/cosmos/_cosmos_responses.py | 97 +++++++- .../azure-cosmos/azure/cosmos/_diagnostics.py | 229 ++++++++++++++++++ .../azure/cosmos/_diagnostics_types.py | 114 +++++++++ .../azure/cosmos/_retry_utility.py | 82 +++++++ .../azure/cosmos/_synchronized_request.py | 75 +++++- ...nchronous_availability_strategy_handler.py | 56 ++++- .../azure/cosmos/aio/_asynchronous_request.py | 41 +++- .../azure/cosmos/aio/_retry_utility_async.py | 13 +- .../azure-cosmos/azure/cosmos/exceptions.py | 10 +- 11 files changed, 749 insertions(+), 35 deletions(-) create mode 100644 sdk/cosmos/azure-cosmos/azure/cosmos/_diagnostics.py create mode 100644 sdk/cosmos/azure-cosmos/azure/cosmos/_diagnostics_types.py diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/__init__.py b/sdk/cosmos/azure-cosmos/azure/cosmos/__init__.py index d7501df99558..609222251e19 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/__init__.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/__init__.py @@ -21,6 +21,7 @@ from ._version import VERSION from ._cosmos_responses import CosmosDict, CosmosList +from ._diagnostics_types import RequestedRegion, RequestedRegionReason from ._retry_utility import ConnectionRetryPolicy from .container import ContainerProxy from .cosmos_client import CosmosClient @@ -66,6 +67,8 @@ "ConnectionRetryPolicy", "ThroughputProperties", "CosmosDict", - "CosmosList" + "CosmosList", + "RequestedRegion", + "RequestedRegionReason", ) __version__ = VERSION diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_availability_strategy_handler.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_availability_strategy_handler.py index bb2d905f19c8..16b9998dac08 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_availability_strategy_handler.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_availability_strategy_handler.py @@ -31,6 +31,8 @@ from azure.core.pipeline.transport import HttpRequest # pylint: disable=no-legacy-azure-core-http-response-import from ._availability_strategy_handler_base import AvailabilityStrategyHandlerMixin +from ._diagnostics import _HedgingDetectionState +from ._diagnostics_types import RequestedRegionReason from ._global_partition_endpoint_manager_circuit_breaker import _GlobalPartitionEndpointManagerForCircuitBreaker from ._request_object import RequestObject @@ -50,7 +52,8 @@ def execute_single_request_with_delay( location_index: int, available_locations: List[str], complete_status: Event, - first_request_params_holder: SimpleNamespace + first_request_params_holder: SimpleNamespace, + hedging_state: Optional[_HedgingDetectionState] = None, ) -> ResponseType: """Execute a single request. @@ -68,6 +71,10 @@ def execute_single_request_with_delay( :type complete_status: threading.Event :param first_request_params_holder: A value holder for request object for first/initial request :type first_request_params_holder: SimpleNamespace + :param hedging_state: Per-operation hedging detection state, passed as a + closure argument (NOT on ``request_params`` — see SE-002; the + deepcopy at line 96 below would silently swallow child appends). + :type hedging_state: Optional[_HedgingDetectionState] :returns: Response tuple :rtype: ResponseType """ @@ -92,6 +99,16 @@ def execute_single_request_with_delay( if delay > 0: time.sleep(delay / 1000) + # ---- Hedging-detection recording (Option B) --------------------- # + # IMPORTANT: this append happens AFTER the threshold delay sleep AND + # AFTER the cancellation check below. The contract is "dispatched, not + # necessarily wire-issued" — see SE-013 and public spec §5.3 docstring + # on ``get_requested_regions``. A hedge-arm task that is cancelled + # before its delay elapses unwinds out of ``time.sleep`` (or the + # subsequent cancellation check) before reaching this line, so no + # phantom HEDGING entry is recorded (AC10). + # ----------------------------------------------------------------- # + # Create request parameters for this location params = copy.deepcopy(request_params) params.is_hedging_request = location_index > 0 @@ -111,14 +128,36 @@ def execute_single_request_with_delay( if complete_status.is_set(): raise CancelledError("The request has been cancelled") - return execute_request_fn(params, req) + # Record dispatch intent now that we have committed to issuing + # (post-delay, post-cancellation-check). ``INITIAL`` for index 0, + # ``HEDGING`` for hedge arms (index > 0). + if hedging_state is not None and 0 <= location_index < len(available_locations): + region_name = available_locations[location_index] + reason = ( + RequestedRegionReason.INITIAL if location_index == 0 + else RequestedRegionReason.HEDGING + ) + hedging_state._record_request(region_name, reason) # pylint: disable=protected-access + + try: + result = execute_request_fn(params, req) + except Exception: + raise + # Record responding region on success. Exceptions still reach the + # parent ``execute_request`` which decides which arm wins; per-arm + # error responses are recorded by the retry utility before the + # exception propagates here. + if hedging_state is not None and 0 <= location_index < len(available_locations): + hedging_state._record_response(available_locations[location_index]) # pylint: disable=protected-access + return result def execute_request( self, request_params: RequestObject, global_endpoint_manager: _GlobalPartitionEndpointManagerForCircuitBreaker, request: HttpRequest, - execute_request_fn: Callable[..., ResponseType] + execute_request_fn: Callable[..., ResponseType], + hedging_state: Optional[_HedgingDetectionState] = None, ) -> ResponseType: """Execute request with cross-region hedging strategy. @@ -130,6 +169,9 @@ def execute_request( :type request: HttpRequest :param execute_request_fn: Function to execute the actual request :type execute_request_fn: Callable[..., ResponseType] + :param hedging_state: Per-operation hedging detection state, passed as a + closure argument (NOT on ``request_params`` — see SE-002). + :type hedging_state: Optional[_HedgingDetectionState] :returns: A tuple containing the response data and headers :rtype: Tuple[Dict[str, Any], Dict[str, Any]] :raises: Exception from first request if all requests fail with transient errors @@ -155,7 +197,8 @@ def execute_request( location_index=i, available_locations=available_locations, complete_status=completion_status, - first_request_params_holder=first_request_params_holder + first_request_params_holder=first_request_params_holder, + hedging_state=hedging_state, ) futures.append(future) if i == 0: @@ -211,7 +254,8 @@ def execute_with_hedging( request_params: RequestObject, global_endpoint_manager: _GlobalPartitionEndpointManagerForCircuitBreaker, request: HttpRequest, - execute_request_fn: Callable[..., ResponseType] + execute_request_fn: Callable[..., ResponseType], + hedging_state: Optional[_HedgingDetectionState] = None, ) -> ResponseType: """Execute a request with hedging based on the availability strategy. @@ -223,6 +267,11 @@ def execute_with_hedging( :type request: HttpRequest :param execute_request_fn: Function to execute the actual request :type execute_request_fn: Callable[..., ResponseType] + :param hedging_state: Per-operation hedging detection state, passed as a + closure argument (NOT on ``request_params`` — see SE-002). When ``None`` + no diagnostics are recorded; the hot path is unchanged for the + diagnostics-disabled case. + :type hedging_state: Optional[_HedgingDetectionState] :returns: A tuple containing the response data and headers :rtype: Tuple[Dict[str, Any], Dict[str, Any]] :raises: Any exceptions raised by the hedging handler's execute_request method @@ -231,5 +280,6 @@ def execute_with_hedging( request_params, global_endpoint_manager, request, - execute_request_fn + execute_request_fn, + hedging_state=hedging_state, ) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_responses.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_responses.py index df62d6d14e4f..708a3c28d44c 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_responses.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_responses.py @@ -7,18 +7,30 @@ from azure.core.paging import ItemPaged from azure.core.utils import CaseInsensitiveDict +from ._diagnostics import _HedgingDetectionAccessorsMixin, _pop_state_from_headers -class CosmosItemPaged(ItemPaged[dict[str, Any]]): + +class CosmosItemPaged(_HedgingDetectionAccessorsMixin, ItemPaged[dict[str, Any]]): """A custom ItemPaged class that provides access to response headers from query operations. This class wraps the standard ItemPaged and provides thread-safe access to response headers captured during pagination via a shared list populated by __QueryFeed. + + It also exposes three hedging-detection accessors inherited from + :class:`~azure.cosmos._diagnostics._HedgingDetectionAccessorsMixin`: + :meth:`is_hedging_started`, :meth:`get_requested_regions`, and + :meth:`get_responded_regions`. For paged operations the accessors reflect + the **most recently fetched** page; pre-fetch they return safe defaults + (``False`` / empty tuples). """ def __init__(self, *args: Any, **kwargs: Any) -> None: self._response_headers_list: Optional[List[CaseInsensitiveDict]] = kwargs.pop('response_headers_list', None) super().__init__(*args, **kwargs) self._query_iterable: Optional[Any] = None + # Hedging-detection state — refreshed as pages are fetched (see + # ``get_response_headers`` below); None until the first page lands. + self._hedging_state = None def by_page(self, continuation_token: Optional[str] = None) -> Iterator[Iterator[dict[str, Any]]]: """Get an iterator of pages of objects. @@ -41,7 +53,11 @@ def get_response_headers(self) -> List[CaseInsensitiveDict]: :rtype: List[~azure.core.utils.CaseInsensitiveDict] """ if self._response_headers_list is not None: - return [h.copy() for h in self._response_headers_list] + # Refresh latest hedging state from the most recent page (if attached), + # then return defensive copies with the sentinel key stripped so + # customers never see the private state on the headers dict. + self._refresh_hedging_state_from_pages() + return [self._copy_headers_stripped(h) for h in self._response_headers_list] return [] def get_last_response_headers(self) -> CaseInsensitiveDict: @@ -51,21 +67,53 @@ def get_last_response_headers(self) -> CaseInsensitiveDict: :rtype: ~azure.core.utils.CaseInsensitiveDict """ if self._response_headers_list and len(self._response_headers_list) > 0: - return self._response_headers_list[-1].copy() + self._refresh_hedging_state_from_pages() + return self._copy_headers_stripped(self._response_headers_list[-1]) return CaseInsensitiveDict() - -class CosmosAsyncItemPaged(AsyncItemPaged[dict[str, Any]]): + def _refresh_hedging_state_from_pages(self) -> None: + """Internal: scan pages from newest to oldest and update + ``self._hedging_state`` with the most recent attached state, if any. + Does not mutate the underlying headers dicts.""" + if not self._response_headers_list: + return + for h in reversed(self._response_headers_list): + from ._diagnostics import HEDGING_STATE_HEADER_KEY + state = h.get(HEDGING_STATE_HEADER_KEY) if h is not None else None + if state is not None: + self._hedging_state = state + return + + @staticmethod + def _copy_headers_stripped(headers: Optional[CaseInsensitiveDict]) -> CaseInsensitiveDict: + """Return a copy of ``headers`` with the private hedging-state sentinel + key removed so customer code never sees it.""" + if headers is None: + return CaseInsensitiveDict() + from ._diagnostics import HEDGING_STATE_HEADER_KEY + copied = headers.copy() + try: + copied.pop(HEDGING_STATE_HEADER_KEY, None) + except (TypeError, AttributeError): # pragma: no cover + pass + return copied + + +class CosmosAsyncItemPaged(_HedgingDetectionAccessorsMixin, AsyncItemPaged[dict[str, Any]]): """A custom AsyncItemPaged class that provides access to response headers from async query operations. This class wraps the standard AsyncItemPaged and provides thread-safe access to response headers captured during pagination via a shared list populated by __QueryFeed. + + Also exposes the three hedging-detection accessors inherited from + :class:`~azure.cosmos._diagnostics._HedgingDetectionAccessorsMixin`. """ def __init__(self, *args: Any, **kwargs: Any) -> None: self._response_headers_list: Optional[List[CaseInsensitiveDict]] = kwargs.pop('response_headers_list', None) super().__init__(*args, **kwargs) self._query_iterable: Optional[Any] = None + self._hedging_state = None def by_page(self, continuation_token: Optional[str] = None) -> AsyncIterator[AsyncIterator[dict[str, Any]]]: """Get an async iterator of pages of objects. @@ -88,7 +136,8 @@ def get_response_headers(self) -> List[CaseInsensitiveDict]: :rtype: List[~azure.core.utils.CaseInsensitiveDict] """ if self._response_headers_list is not None: - return [h.copy() for h in self._response_headers_list] + self._refresh_hedging_state_from_pages() + return [self._copy_headers_stripped(h) for h in self._response_headers_list] return [] def get_last_response_headers(self) -> CaseInsensitiveDict: @@ -98,15 +147,42 @@ def get_last_response_headers(self) -> CaseInsensitiveDict: :rtype: ~azure.core.utils.CaseInsensitiveDict """ if self._response_headers_list and len(self._response_headers_list) > 0: - return self._response_headers_list[-1].copy() + self._refresh_hedging_state_from_pages() + return self._copy_headers_stripped(self._response_headers_list[-1]) return CaseInsensitiveDict() - -class CosmosDict(dict[str, Any]): + def _refresh_hedging_state_from_pages(self) -> None: + if not self._response_headers_list: + return + for h in reversed(self._response_headers_list): + from ._diagnostics import HEDGING_STATE_HEADER_KEY + state = h.get(HEDGING_STATE_HEADER_KEY) if h is not None else None + if state is not None: + self._hedging_state = state + return + + @staticmethod + def _copy_headers_stripped(headers: Optional[CaseInsensitiveDict]) -> CaseInsensitiveDict: + if headers is None: + return CaseInsensitiveDict() + from ._diagnostics import HEDGING_STATE_HEADER_KEY + copied = headers.copy() + try: + copied.pop(HEDGING_STATE_HEADER_KEY, None) + except (TypeError, AttributeError): # pragma: no cover + pass + return copied + + +class CosmosDict(_HedgingDetectionAccessorsMixin, dict[str, Any]): def __init__(self, original_dict: Optional[Mapping[str, Any]], /, *, response_headers: CaseInsensitiveDict) -> None: if original_dict is None: original_dict = {} super().__init__(original_dict) + # Pull the hedging-detection state off the headers dict (if attached + # by the orchestrator) before storing the headers, so customers never + # see the private sentinel via ``get_response_headers()``. + self._hedging_state = _pop_state_from_headers(response_headers) self._response_headers = response_headers def get_response_headers(self) -> CaseInsensitiveDict: @@ -118,12 +194,13 @@ def get_response_headers(self) -> CaseInsensitiveDict: return self._response_headers.copy() -class CosmosList(list[dict[str, Any]]): +class CosmosList(_HedgingDetectionAccessorsMixin, list[dict[str, Any]]): def __init__(self, original_list: Optional[Iterable[dict[str, Any]]], /, *, response_headers: CaseInsensitiveDict) -> None: if original_list is None: original_list = [] super().__init__(original_list) + self._hedging_state = _pop_state_from_headers(response_headers) self._response_headers = response_headers def get_response_headers(self) -> CaseInsensitiveDict: diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_diagnostics.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_diagnostics.py new file mode 100644 index 000000000000..4b843bf85c75 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_diagnostics.py @@ -0,0 +1,229 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +"""Module-private backing state for the hedging-detection API. + +A single :class:`_HedgingDetectionState` is constructed per operation by the +synchronous and asynchronous orchestrators (``_synchronized_request`` / +``aio/_asynchronous_request``). The same instance is then: + +* threaded into the hedging handler via a **closure argument** + (NOT through :class:`~azure.cosmos._request_object.RequestObject`; the + deepcopy at ``_availability_strategy_handler.py:96`` would silently swallow + child appends — see SE-002), +* threaded into ``_retry_utility.Execute`` via ``**kwargs``, +* attached to successful response wrappers (``CosmosDict``, ``CosmosList``, + ``CosmosItemPaged``, ``CosmosAsyncItemPaged``) via a private sentinel key + on the response-headers dict that is popped in each wrapper's ``__init__``, +* attached to exception types (``CosmosHttpResponseError``, + ``CosmosBatchOperationError``, ``CosmosClientTimeoutError``) via the + private ``_hedging_state`` attribute before the exception is re-raised. + +All mutations are guarded by a ``threading.Lock``. CPython's GIL covers +``list.append`` but the lock is required to make the ``HEDGING`` reason ↔ +``_hedging_started`` flag compound update atomic from any reader's perspective +(SE-017). Snapshots returned by the public accessors are taken under the lock +and returned as tuples so callers cannot mutate the internal lists. +""" + +import threading +from typing import Tuple + +from ._diagnostics_types import RequestedRegion, RequestedRegionReason + +# Module-private sentinel key used to stash a ``_HedgingDetectionState`` on a +# response-headers ``CaseInsensitiveDict`` so the value flows from the +# synchronized request back up to the wrapper construction site +# (``CosmosDict``, ``CosmosList``, ``CosmosItemPaged``, etc.) without +# requiring every intermediary signature to change. Each wrapper's +# ``__init__`` pops this key, so customer code that inspects +# ``get_response_headers()`` never sees it. +# +# Double-underscore-bracketed name guarantees no collision with any real HTTP +# header (header field-names per RFC 7230 cannot contain ``_``). +HEDGING_STATE_HEADER_KEY = "__hedging_state__" + + +class _HedgingDetectionState: + """Thread-safe backing state for the three public hedging-detection accessors. + + One instance per Cosmos operation. Shared by reference across the + orchestrator, the hedging handler arms, the retry utility, and ultimately + the response wrapper or exception that bubbles up to the customer. + """ + + __slots__ = ("_lock", "_requested", "_responded", "_hedging_started") + + def __init__(self) -> None: + self._lock = threading.Lock() + self._requested: list = [] # list[RequestedRegion] + self._responded: list = [] # list[str] + self._hedging_started = False + + # ------------------------------------------------------------------ writes + def _record_request(self, region_name: str, reason: RequestedRegionReason) -> None: + """Append a dispatched-region entry. Sets the hedging-started flag + atomically when reason is ``HEDGING`` (compound update guarded by lock). + """ + if region_name is None: + region_name = "" + entry = RequestedRegion(region_name=region_name, reason=reason) + with self._lock: + self._requested.append(entry) + if reason is RequestedRegionReason.HEDGING: + self._hedging_started = True + + def _record_response(self, region_name: str) -> None: + """Append a responding-region entry. Duplicates are intentional — + the same region may produce multiple responses (e.g., a late response + after a hedge winner).""" + if region_name is None: + region_name = "" + with self._lock: + self._responded.append(region_name) + + # ------------------------------------------------------------------- reads + def is_hedging_started(self) -> bool: + with self._lock: + return self._hedging_started + + def get_requested_regions(self) -> Tuple[RequestedRegion, ...]: + with self._lock: + return tuple(self._requested) + + def get_responded_regions(self) -> Tuple[str, ...]: + with self._lock: + return tuple(self._responded) + + +# --------------------------------------------------------------------------- # +# Mixin used by the five public wrapper / exception types to expose the three +# accessor methods without duplicating bodies. The mixin only reads +# ``self._hedging_state`` (or its absence) and returns safe defaults if no +# state has been attached. +# --------------------------------------------------------------------------- # +class _HedgingDetectionAccessorsMixin: + """Shared implementation of the three public accessors. + + Mixed into ``CosmosDict``, ``CosmosList``, ``CosmosItemPaged``, + ``CosmosAsyncItemPaged``, ``CosmosHttpResponseError``, + ``CosmosBatchOperationError``, and ``CosmosClientTimeoutError`` to avoid + method-body duplication. Each consumer class is responsible for setting + ``self._hedging_state`` (or leaving it ``None``); the accessors fall back + to safe defaults when state is unattached. + """ + + # Default attribute slot; consumer types may override via ``__init__``. + _hedging_state = None # type: ignore[assignment] + + def is_hedging_started(self) -> bool: + """Return ``True`` if the SDK actually dispatched the operation to a + hedge region. + + Returns ``False`` for non-hedged operations, including the case where + hedging was configured but the primary responded under the hedge + threshold (hedge-arm tasks created but cancelled before they ran; + no fan-out occurred). + + ``False`` does NOT mean hedging is disabled or misconfigured. To check + whether hedging is configured on the client, inspect the client-level + ``availability_strategy`` configuration. + """ + state = self._hedging_state + if state is None: + return False + return state.is_hedging_started() + + def get_requested_regions(self) -> Tuple[RequestedRegion, ...]: + """Return the regions the SDK actually dispatched this operation to, + tagged with a reason. + + Ordering is observed dispatch order (orchestrator wall-clock). Includes + the initial attempt. Empty only on pre-flight validation failures or + when no state was attached (the operation never reached the + orchestrator). + + Append site is the actual dispatch point (post-threshold delay for + hedge arms); registered-but-never-dispatched hedge tasks do NOT appear + here. + + **Contract is "dispatched, not necessarily wire-issued".** An entry + reflects the SDK's decision to commit to dispatching — for hedge arms, + this means the per-arm threshold delay elapsed without cancellation, + so the dispatch hook fired. A cancellation in the small window between + that dispatch decision and the underlying HTTP send (asyncio task + scheduling jitter, ThreadPoolExecutor scheduling) still leaves the + entry in this list. Callers should treat the list as a record of + intent-to-dispatch, not a record of wire-issued requests. + + :class:`RequestedRegionReason` is non-exhaustive — handle + :attr:`~RequestedRegionReason.UNKNOWN` and unknown values defensively + (``_missing_`` returns :attr:`~RequestedRegionReason.UNKNOWN` for + future enum members). + """ + state = self._hedging_state + if state is None: + return () + return state.get_requested_regions() + + def get_responded_regions(self) -> Tuple[str, ...]: + """Return the regions that returned a response (success or failure), + in arrival order. + + **Duplicates are allowed and expected.** The same region may appear + more than once if it produced multiple responses (e.g., a late response + after a hedge winner). ``len(get_responded_regions()) > 1`` does NOT + imply that more than one distinct region responded. For unique regions + in arrival order use ``tuple(dict.fromkeys(resp.get_responded_regions()))``; + for an unordered set use ``set(resp.get_responded_regions())``. + """ + state = self._hedging_state + if state is None: + return () + return state.get_responded_regions() + + +def _attach_state_to_headers(headers, state): + """Helper: stash a ``_HedgingDetectionState`` on a response-headers dict + under the module-private sentinel key. The wrapper-type ``__init__`` pops + this key so customers never see it on ``get_response_headers()``. + + No-op if either argument is falsy.""" + if headers is None or state is None: + return + try: + headers[HEDGING_STATE_HEADER_KEY] = state + except (TypeError, AttributeError): # pragma: no cover - defensive + # If headers is not a dict-like (shouldn't happen on the response + # path), silently swallow rather than break the request. + pass + + +def _pop_state_from_headers(headers): + """Helper used by every wrapper type's ``__init__``: pop and return the + private state sentinel from a response-headers dict. Returns ``None`` if + no state is attached. Also accepts ``None`` headers safely.""" + if headers is None: + return None + try: + return headers.pop(HEDGING_STATE_HEADER_KEY, None) + except (TypeError, AttributeError): # pragma: no cover - defensive + return None diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_diagnostics_types.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_diagnostics_types.py new file mode 100644 index 000000000000..795dbd85a039 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_diagnostics_types.py @@ -0,0 +1,114 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +"""Public diagnostics types for the hedging detection API. + +This module exposes :class:`RequestedRegion` and :class:`RequestedRegionReason`, +the value types returned by the per-response / per-exception accessors +``is_hedging_started()``, ``get_requested_regions()`` and ``get_responded_regions()``. + +``RequestedRegionReason`` is a *non-exhaustive* enumeration: future SDK versions +may emit reasons unknown to older client code. ``RequestedRegionReason._missing_`` +returns :attr:`RequestedRegionReason.UNKNOWN` rather than raising +:class:`ValueError`, preserving forward compatibility for callers that pattern +match on enum members. +""" +from dataclasses import dataclass +from enum import Enum + + +class RequestedRegionReason(Enum): + """Reason a region appears in :meth:`get_requested_regions`. + + This enum is **non-exhaustive**. Callers must handle the + :attr:`UNKNOWN` value defensively, and treat any unrecognized value as + :attr:`UNKNOWN` (``_missing_`` ensures forward compatibility — calling + ``RequestedRegionReason("future_value_42")`` returns :attr:`UNKNOWN` + rather than raising :class:`ValueError`). + + Some members are reserved for parity with other Azure Cosmos SDKs and are + not populated by the Python SDK today: + + * :attr:`TRANSPORT_RETRY` — the Python azure-core transport retry layer is + invisible to the Cosmos SDK; reserved but never appended. + * :attr:`CIRCUIT_BREAKER_PROBE` — Python's per-partition circuit breaker + performs *region exclusion*, not synthetic probe dispatches; re-elected + regions are tagged as the reason that naturally applied + (``INITIAL`` / ``OPERATION_RETRY``). + """ + + INITIAL = "initial" + """Initial dispatch of the operation, before any retry or hedge fan-out.""" + + OPERATION_RETRY = "operation_retry" + """SDK-level retry policy (gone / throttle / session / service-unavailable) + decided to retry to the **same** region.""" + + TRANSPORT_RETRY = "transport_retry" + """Reserved for parity with other Cosmos SDKs; not populated by the Python + SDK today (the azure-core transport retry layer is invisible from the + Cosmos layer).""" + + HEDGING = "hedging" + """A cross-region hedge arm dispatched this region after the configured + threshold delay elapsed without the primary winning.""" + + REGION_FAILOVER = "region_failover" + """SDK-level retry policy (timeout failover / endpoint discovery) switched + the request to a different region.""" + + CIRCUIT_BREAKER_PROBE = "circuit_breaker_probe" + """Reserved for parity with other Cosmos SDKs; not populated by the Python + SDK today (Python's circuit breaker performs region exclusion, not synthetic + probe dispatches).""" + + UNKNOWN = "unknown" + """An unrecognized reason value, including any future reason added in a + later SDK version. :meth:`_missing_` maps unknown raw values to this + member to keep deserialization / pattern-matching code forward-compatible. + """ + + @classmethod + def _missing_(cls, value): # type: ignore[override] + # Forward-compatibility: callers receiving a reason from a future SDK + # version that introduced a new value will get UNKNOWN instead of a + # ValueError. The cross-SDK contract guarantees this behavior so that + # log-parsers and pattern matchers do not break on upgrade. + return cls.UNKNOWN + + +@dataclass(frozen=True, slots=True) +class RequestedRegion: + """A single entry in :meth:`get_requested_regions`. + + :ivar region_name: Human-readable region name (e.g., ``"East US"``). + May be empty if the SDK could not resolve a name for the endpoint at + dispatch time; never ``None``. + :vartype region_name: str + + :ivar reason: Why this region was dispatched to. See + :class:`RequestedRegionReason`. May be :attr:`RequestedRegionReason.UNKNOWN` + for forward compatibility. + :vartype reason: RequestedRegionReason + """ + + region_name: str + reason: RequestedRegionReason diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py index 8c6fb632a41b..8d8e970d8099 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py @@ -41,6 +41,7 @@ from . import exceptions from ._constants import _Constants from ._cosmos_http_logging_policy import _log_diagnostics_error +from ._diagnostics_types import RequestedRegionReason from ._global_partition_endpoint_manager_per_partition_automatic_failover import \ _GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover from ._request_object import RequestObject @@ -270,6 +271,12 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs): # pylin ] = resourceThrottle_retry_policy.cumulative_wait_time_in_milliseconds if args and args[0].should_clear_session_token_on_session_read_failure: client.session.clear_session_token(client.last_response_headers) + # Hedging-detection: attach per-operation state to the exception + # BEFORE re-raising so error-path consumers + # (``CosmosHttpResponseError.get_requested_regions()``) work. + _record_retry_diagnostics_and_attach( + kwargs.get("_hedging_state"), retry_policy, e, args, attached_on_raise=True + ) raise # Now check timeout before retrying @@ -280,6 +287,15 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs): # pylin # Wait for retry_after_in_milliseconds time before the next retry time.sleep(retry_policy.retry_after_in_milliseconds / 1000.0) + # Record this retry decision now that we have committed to looping. + # The reason is REGION_FAILOVER if the retry policy mutates the + # target region (timeout-failover / endpoint-discovery); else + # OPERATION_RETRY (same-region retry: gone / throttle / session / + # service-unavailable / default). + _record_retry_diagnostics_and_attach( + kwargs.get("_hedging_state"), retry_policy, e, args, attached_on_raise=False + ) + except ServiceRequestError as e: if request and _has_database_account_header(request.headers): if not health_check_retry_policy.ShouldRetry(e): @@ -298,6 +314,72 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs): # pylin _record_failure_if_request_not_cancelled(args[0], global_endpoint_manager, pk_range_wrapper) _handle_service_response_retries(request, client, service_response_retry_policy, e, *args) +def _record_retry_diagnostics_and_attach( + hedging_state, + retry_policy, + exception, + args, + attached_on_raise: bool) -> None: + """Hedging-detection: record a retry-policy decision against the + per-operation state and (when raising) attach the state to the exception. + + Reason mapping: + * timeout-failover / endpoint-discovery → ``REGION_FAILOVER`` + * everything else → ``OPERATION_RETRY`` + + Recording happens once per retry decision, AFTER ``retry_policy.ShouldRetry`` + has executed (so any region mutation the policy applied has already + landed on ``args[0]``). The recorded region name is best-effort and may + be empty when the endpoint cannot be resolved to a friendly name. + + Never raises — diagnostics must not break the hot path. + """ + if hedging_state is None: + return + try: + if attached_on_raise: + # Attach to exception only when the retry-utility is re-raising, + # so the exception object surfaces the operation's full timeline. + try: + setattr(exception, "_hedging_state", hedging_state) + except (AttributeError, TypeError): # pragma: no cover - defensive + pass + return + + # Determine reason from the policy class name (avoids importing every + # policy module at this layer; matches landscape-research-python.md + # citations). + cls_name = type(retry_policy).__name__ + if cls_name in ( + "_TimeoutFailoverRetryPolicy", + "EndpointDiscoveryRetryPolicy", + ): + reason = RequestedRegionReason.REGION_FAILOVER + else: + reason = RequestedRegionReason.OPERATION_RETRY + + # Best-effort region resolution from args[0] (the RequestObject). + region_name = "" + try: + if args: + request_params = args[0] + # ``location_endpoint_to_route`` is the post-ShouldRetry endpoint. + endpoint = getattr(request_params, "location_endpoint_to_route", None) + global_em = getattr(retry_policy, "global_endpoint_manager", None) + if endpoint and global_em is not None and hasattr(global_em, "get_region_name"): + is_write = _OperationType.IsWriteOperation(request_params.operation_type) + name = global_em.get_region_name(endpoint, is_write) + if name: + region_name = name + except Exception: # pylint: disable=broad-except + region_name = "" + + hedging_state._record_request(region_name, reason) # pylint: disable=protected-access + except Exception: # pylint: disable=broad-except + # Never let diagnostics break the request. + return + + def _record_success_if_request_not_cancelled( request_params: RequestObject, global_endpoint_manager: _GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover, diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py index 7b18f52da2e2..5f83de57669a 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py @@ -33,6 +33,8 @@ from ._availability_strategy_config import CrossRegionHedgingStrategy from ._availability_strategy_handler import execute_with_hedging from ._constants import _Constants +from ._diagnostics import _HedgingDetectionState, _attach_state_to_headers +from ._diagnostics_types import RequestedRegionReason from ._request_object import RequestObject from .documents import _OperationType @@ -88,6 +90,9 @@ def _Request(global_endpoint_manager, request_params, connection_policy, pipelin kwargs.pop(_Constants.OperationStartTime, None) # Pop internal flags that should not be passed to the HTTP layer kwargs.pop("_internal_pk_range_fetch", None) + # Hedging-detection state is consumed by the orchestrator + retry layer + # only; never forward to the HTTP pipeline. + kwargs.pop("_hedging_state", None) connection_timeout = connection_policy.RequestTimeout connection_timeout = kwargs.pop("connection_timeout", connection_timeout) read_timeout = connection_policy.ReadTimeout @@ -266,9 +271,28 @@ def SynchronizedRequest( if global_endpoint_manager.is_per_partition_automatic_failover_enabled(): request_params.availability_strategy = CrossRegionHedgingStrategy() + # ----- Hedging-detection: per-operation state -------------------------- # + # Construct a state holder for this operation. Threaded through: + # * the hedging handler via the ``hedging_state`` closure argument + # (NOT on ``request_params`` — SE-002 / public-spec §5.4 / AC8). + # * ``_retry_utility.Execute`` via the ``_hedging_state`` kwarg. + # * back up to the wrapper-construction site (``CosmosDict``/``CosmosList`` + # in ``_cosmos_client_connection``) by stashing on the response-headers + # dict under a private sentinel key (popped in each wrapper's + # ``__init__``). + # Hedge-arm subtasks recursively re-enter ``SynchronizedRequest`` with + # ``is_hedging_request=True``; they should reuse the parent state via the + # kwarg rather than create a new one. We thus only create state for the + # top-level dispatch. + hedging_state: "Optional[_HedgingDetectionState]" = kwargs.get("_hedging_state") + is_top_level = hedging_state is None and not request_params.is_hedging_request + if is_top_level: + hedging_state = _HedgingDetectionState() + kwargs["_hedging_state"] = hedging_state + # Handle hedging if availability strategy is applicable if _is_availability_strategy_applicable(request_params): - return execute_with_hedging( + result, headers = execute_with_hedging( request_params, global_endpoint_manager, request, @@ -281,11 +305,24 @@ def SynchronizedRequest( pipeline_client, r, **kwargs - ) + ), + hedging_state=hedging_state, + ) + if is_top_level: + _attach_state_to_headers(headers, hedging_state) + return result, headers + + # Non-hedged path: record INITIAL once at orchestrator entry. + if is_top_level and hedging_state is not None: + # Best-effort region name from the resolved endpoint; falls back to + # empty string when the endpoint cannot be resolved to a friendly name. + region_name = _resolve_region_name(global_endpoint_manager, request_params) + hedging_state._record_request( # pylint: disable=protected-access + region_name, RequestedRegionReason.INITIAL ) # Pass _Request function with its parameters to retry_utility's Execute method that wraps the call with retries - return _retry_utility.Execute( + result, headers = _retry_utility.Execute( client, global_endpoint_manager, _Request, @@ -295,3 +332,35 @@ def SynchronizedRequest( request, **kwargs ) + if is_top_level and hedging_state is not None: + # Best-effort: the responding region is the resolved endpoint for the + # final attempt (post-retry). Same resolver as INITIAL above. + region_name = _resolve_region_name(global_endpoint_manager, request_params) + hedging_state._record_response(region_name) # pylint: disable=protected-access + _attach_state_to_headers(headers, hedging_state) + return result, headers + + +def _resolve_region_name(global_endpoint_manager, request_params) -> str: + """Best-effort resolution of the human-readable region name for the + endpoint currently selected on ``request_params``. Returns an empty string + if the name cannot be resolved (e.g., emulator, missing routing context). + Never raises — the hot path must not break for diagnostics.""" + try: + endpoint = getattr(request_params, "location_endpoint_to_route", None) + if endpoint and hasattr(global_endpoint_manager, "get_region_name"): + is_write = _OperationType.IsWriteOperation(request_params.operation_type) + name = global_endpoint_manager.get_region_name(endpoint, is_write) + if name: + return name + # Fall back to the global endpoint manager's preferred read/write region. + if hasattr(global_endpoint_manager, "get_write_endpoint_region"): + try: + name = global_endpoint_manager.get_write_endpoint_region() + if name: + return name + except Exception: # pylint: disable=broad-except + pass + except Exception: # pylint: disable=broad-except + return "" + return "" diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_availability_strategy_handler.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_availability_strategy_handler.py index 77ac53a738db..d8b111068fa9 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_availability_strategy_handler.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_availability_strategy_handler.py @@ -32,6 +32,8 @@ from ._global_partition_endpoint_manager_circuit_breaker_async import \ _GlobalPartitionEndpointManagerForCircuitBreakerAsync from .._availability_strategy_handler_base import AvailabilityStrategyHandlerMixin +from .._diagnostics import _HedgingDetectionState +from .._diagnostics_types import RequestedRegionReason from .._request_object import RequestObject ResponseType = Tuple[Dict[str, Any], Dict[str, Any]] @@ -48,7 +50,8 @@ async def execute_single_request_with_delay( location_index: int, available_locations: List[str], complete_status: Event, - first_request_params_holder: SimpleNamespace + first_request_params_holder: SimpleNamespace, + hedging_state: Optional[_HedgingDetectionState] = None, ) -> ResponseType: """Execute a single request with appropriate delay based on location index. @@ -76,6 +79,9 @@ async def execute_single_request_with_delay( :type complete_status: asyncio.Event :param first_request_params_holder: Namespace object storing request parameters for the initial request :type first_request_params_holder: SimpleNamespace + :param hedging_state: Per-operation hedging detection state, passed as a + closure argument (NOT on ``request_params`` — see SE-002). + :type hedging_state: Optional[_HedgingDetectionState] :returns: Tuple containing response data and headers from the request :rtype: ResponseType :raises: CancelledError if request is cancelled due to completion status @@ -101,6 +107,16 @@ async def execute_single_request_with_delay( if delay > 0: await asyncio.sleep(delay / 1000) + # ---- Hedging-detection recording (Option B) --------------------- # + # IMPORTANT: this append happens AFTER ``await asyncio.sleep`` AND + # AFTER the cancellation check below. The contract is "dispatched, not + # necessarily wire-issued" — see SE-013 and public spec §5.3 docstring + # on ``get_requested_regions``. A hedge-arm task cancelled before its + # sleep completes raises ``CancelledError`` *inside* the sleep, which + # unwinds the coroutine before reaching this line — guaranteeing no + # phantom HEDGING entry on cancel-before-delay-elapsed (AC10). + # ----------------------------------------------------------------- # + # Create request parameters for this location params = copy.deepcopy(request_params) params.is_hedging_request = location_index > 0 @@ -121,14 +137,30 @@ async def execute_single_request_with_delay( if complete_status is not None and complete_status.is_set(): raise CancelledError("The request has been cancelled") - return await execute_request_fn(params, req) + # Record dispatch intent now (post-delay, post-cancellation-check). + if hedging_state is not None and 0 <= location_index < len(available_locations): + region_name = available_locations[location_index] + reason = ( + RequestedRegionReason.INITIAL if location_index == 0 + else RequestedRegionReason.HEDGING + ) + hedging_state._record_request(region_name, reason) # pylint: disable=protected-access + + try: + result = await execute_request_fn(params, req) + except Exception: + raise + if hedging_state is not None and 0 <= location_index < len(available_locations): + hedging_state._record_response(available_locations[location_index]) # pylint: disable=protected-access + return result async def execute_request( self, request_params: RequestObject, global_endpoint_manager: _GlobalPartitionEndpointManagerForCircuitBreakerAsync, request: HttpRequest, - execute_request_fn: Callable[..., Awaitable[ResponseType]] + execute_request_fn: Callable[..., Awaitable[ResponseType]], + hedging_state: Optional[_HedgingDetectionState] = None, ) -> ResponseType: """Execute request with cross-region hedging strategy. @@ -146,6 +178,9 @@ async def execute_request( :type request: HttpRequest :param execute_request_fn: Async function to execute the actual request :type execute_request_fn: Callable[..., Awaitable[ResponseType]] + :param hedging_state: Per-operation hedging detection state, passed as a + closure argument (NOT on ``request_params`` — see SE-002). + :type hedging_state: Optional[_HedgingDetectionState] :returns: A tuple containing the response data and headers from the successful request :rtype: ResponseType :raises: Exception from the first request if all requests fail with transient errors @@ -174,7 +209,8 @@ async def execute_request( i, available_locations, completion_status, - first_request_params_holder + first_request_params_holder, + hedging_state=hedging_state, )) active_tasks.append(task) if i == 0: @@ -223,7 +259,8 @@ async def execute_request( next_index, available_locations, completion_status, - first_request_params_holder + first_request_params_holder, + hedging_state=hedging_state, )) active_tasks.append(task) @@ -258,7 +295,8 @@ async def execute_with_availability_strategy( request_params: RequestObject, global_endpoint_manager: _GlobalPartitionEndpointManagerForCircuitBreakerAsync, request: HttpRequest, - execute_request_fn: Callable[..., Awaitable[ResponseType]] + execute_request_fn: Callable[..., Awaitable[ResponseType]], + hedging_state: Optional[_HedgingDetectionState] = None, ) -> ResponseType: """Execute a request with hedging based on the availability strategy. @@ -278,6 +316,9 @@ async def execute_with_availability_strategy( :type request: HttpRequest :param execute_request_fn: Async function to execute the actual request :type execute_request_fn: Callable[..., Awaitable[ResponseType]] + :param hedging_state: Per-operation hedging detection state, passed as a + closure argument (NOT on ``request_params`` — see SE-002). + :type hedging_state: Optional[_HedgingDetectionState] :returns: Tuple containing response data and headers from the successful request :rtype: ResponseType :raises: CosmosClientError if all hedged requests fail @@ -287,5 +328,6 @@ async def execute_with_availability_strategy( request_params, global_endpoint_manager, request, - execute_request_fn + execute_request_fn, + hedging_state=hedging_state, ) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_request.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_request.py index cdf64a7cca76..72b513f45e4d 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_request.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_request.py @@ -34,8 +34,14 @@ from .. import http_constants from .._availability_strategy_config import CrossRegionHedgingStrategy from .._constants import _Constants +from .._diagnostics import _HedgingDetectionState, _attach_state_to_headers +from .._diagnostics_types import RequestedRegionReason from .._request_object import RequestObject -from .._synchronized_request import _request_body_from_data, _replace_url_prefix +from .._synchronized_request import ( + _request_body_from_data, + _replace_url_prefix, + _resolve_region_name, +) from ..documents import _OperationType # cspell:ignore ppaf @@ -58,6 +64,9 @@ async def _Request(global_endpoint_manager, request_params, connection_policy, p kwargs.pop(_Constants.OperationStartTime, None) # Pop internal flags that should not be passed to the HTTP layer kwargs.pop("_internal_pk_range_fetch", None) + # Hedging-detection state is consumed by the orchestrator + retry layer + # only; never forward to the HTTP pipeline. + kwargs.pop("_hedging_state", None) connection_timeout = connection_policy.RequestTimeout read_timeout = connection_policy.ReadTimeout connection_timeout = kwargs.pop("connection_timeout", connection_timeout) @@ -219,9 +228,17 @@ async def AsynchronousRequest( if global_endpoint_manager.is_per_partition_automatic_failover_enabled(): request_params.availability_strategy = CrossRegionHedgingStrategy() + # Hedging-detection state (closure-argument pattern; see SE-002 / public + # spec §5.4). Sync↔async parity: this mirrors ``_synchronized_request``. + hedging_state = kwargs.get("_hedging_state") + is_top_level = hedging_state is None and not request_params.is_hedging_request + if is_top_level: + hedging_state = _HedgingDetectionState() + kwargs["_hedging_state"] = hedging_state + # Handle hedging if strategy is configured if _is_availability_strategy_applicable(request_params): - return await execute_with_availability_strategy( + result, headers = await execute_with_availability_strategy( request_params, global_endpoint_manager, request, @@ -234,11 +251,22 @@ async def AsynchronousRequest( pipeline_client, r, **kwargs - ) + ), + hedging_state=hedging_state, + ) + if is_top_level: + _attach_state_to_headers(headers, hedging_state) + return result, headers + + # Non-hedged path: record INITIAL once at orchestrator entry. + if is_top_level and hedging_state is not None: + region_name = _resolve_region_name(global_endpoint_manager, request_params) + hedging_state._record_request( # pylint: disable=protected-access + region_name, RequestedRegionReason.INITIAL ) # Pass _Request function with its parameters to retry_utility's Execute method that wraps the call with retries - return await _retry_utility_async.ExecuteAsync( + result, headers = await _retry_utility_async.ExecuteAsync( client, global_endpoint_manager, _Request, @@ -248,3 +276,8 @@ async def AsynchronousRequest( request, **kwargs ) + if is_top_level and hedging_state is not None: + region_name = _resolve_region_name(global_endpoint_manager, request_params) + hedging_state._record_response(region_name) # pylint: disable=protected-access + _attach_state_to_headers(headers, hedging_state) + return result, headers diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py index d3ba6a3fbe75..cee97649e3b5 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py @@ -43,10 +43,11 @@ from .. import exceptions from .._constants import _Constants from .._container_recreate_retry_policy import ContainerRecreateRetryPolicy +from .._diagnostics_types import RequestedRegionReason from .._request_object import RequestObject from .._retry_utility import (_configure_timeout, _has_read_retryable_headers, _handle_service_response_retries, _handle_service_request_retries, - _has_database_account_header) + _has_database_account_header, _record_retry_diagnostics_and_attach) from .._routing.routing_range import PartitionKeyRangeWrapper from ..exceptions import CosmosHttpResponseError from ..http_constants import HttpHeaders, StatusCodes, SubStatusCodes @@ -270,6 +271,11 @@ async def ExecuteAsync(client, global_endpoint_manager, function, *args, **kwarg ] = resourceThrottle_retry_policy.cumulative_wait_time_in_milliseconds if args and args[0].should_clear_session_token_on_session_read_failure and client.session: client.session.clear_session_token(client.last_response_headers) + # Hedging-detection: attach state to the exception BEFORE + # re-raising (sync↔async parity with ``_retry_utility.Execute``). + _record_retry_diagnostics_and_attach( + kwargs.get("_hedging_state"), retry_policy, e, args, attached_on_raise=True + ) raise # Check timeout only before retrying @@ -280,6 +286,11 @@ async def ExecuteAsync(client, global_endpoint_manager, function, *args, **kwarg # Wait for retry_after_in_milliseconds time before the next retry await asyncio.sleep(retry_policy.retry_after_in_milliseconds / 1000.0) + # Record the retry decision now that we committed to looping. + _record_retry_diagnostics_and_attach( + kwargs.get("_hedging_state"), retry_policy, e, args, attached_on_raise=False + ) + except ServiceRequestError as e: if request and _has_database_account_header(request.headers): if not health_check_retry_policy.ShouldRetry(e): diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/exceptions.py b/sdk/cosmos/azure-cosmos/azure/cosmos/exceptions.py index f7c018c37c9f..29d3f4754223 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/exceptions.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/exceptions.py @@ -28,9 +28,10 @@ ResourceNotFoundError ) from . import http_constants +from ._diagnostics import _HedgingDetectionAccessorsMixin from .http_constants import StatusCodes as _StatusCode, SubStatusCodes as _SubStatusCodes -class CosmosHttpResponseError(HttpResponseError): +class CosmosHttpResponseError(_HedgingDetectionAccessorsMixin, HttpResponseError): """An HTTP request to the Azure Cosmos database service has failed.""" def __init__(self, status_code=None, message=None, response=None, **kwargs): @@ -42,6 +43,7 @@ def __init__(self, status_code=None, message=None, response=None, **kwargs): self.sub_status = kwargs.pop('sub_status', None) self.endpoint = kwargs.pop('endpoint', None) self.http_error_message = message + self._hedging_state = None status = status_code or (int(response.status_code) if response else 0) if http_constants.HttpHeaders.SubStatus in self.headers: @@ -74,7 +76,7 @@ class CosmosAccessConditionFailedError(CosmosHttpResponseError): """An HTTP error response with status code 412.""" -class CosmosBatchOperationError(HttpResponseError): +class CosmosBatchOperationError(_HedgingDetectionAccessorsMixin, HttpResponseError): """A transactional batch request to the Azure Cosmos database service has failed. :ivar int error_index: Index of operation within the batch that caused the error. @@ -111,6 +113,7 @@ def __init__( self.sub_status = None self.http_error_message = message self.operation_responses = operation_responses + self._hedging_state = None status = status_code if http_constants.HttpHeaders.SubStatus in self.headers: @@ -123,7 +126,7 @@ def __init__( self.status_code = status -class CosmosClientTimeoutError(AzureError): +class CosmosClientTimeoutError(_HedgingDetectionAccessorsMixin, AzureError): """An operation failed to complete within the specified timeout.""" def __init__(self, message=None, **kwargs): @@ -131,6 +134,7 @@ def __init__(self, message=None, **kwargs): message = "The request failed to complete within the given timeout." self.response = None self.history = None + self._hedging_state = None super(CosmosClientTimeoutError, self).__init__(message, **kwargs) class _InternalCosmosException: From 4041cb3f3561440fe22d79b427d52f5acab9bae1 Mon Sep 17 00:00:00 2001 From: Nalu Tripician <27316859+NaluTripician@users.noreply.github.com> Date: Thu, 14 May 2026 13:50:56 -0700 Subject: [PATCH 2/4] test(cosmos): hedging detection API tests + samples + CHANGELOG Adds: * CHANGELOG entry under 4.16.0b3 (Features Added). * samples/hedging_detection.py + _async.py - customer-facing usage of the three new accessors on a CosmosDict response and a CosmosHttpResponseError. * tests/test_diagnostics_types.py - 9 unit tests covering AC6 (RequestedRegion frozen/equality/hash/pickle + RequestedRegionReason _missing_ -> UNKNOWN forward-compat per SE-016). * tests/test_hedging_detection.py - 21 unit tests covering AC1, AC3, AC4, AC5, AC8 (closure-arg deepcopy regression), AC10 (no phantom entries on cancelled hedge arm), AC11 (duplicate responses allowed), AC12 (reserved reasons never emitted in production code - repo-wide grep), plus headers invariant + threading.Lock concurrent-append stress test. * tests/test_hedging_detection_async.py - 23 async-twin tests including the sync<->async parity checker (AC9) that fails CI if any sync test gains a method without an _async counterpart. * tests/livecanary/ - opt-in (env-var gated) multi-region smoke tests covering AC13a (hedged read records HEDGING entry) and AC13b (all-regions-fail surfaces dispatch metadata on CosmosHttpResponseError). All 53 sync+async unit tests pass. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- sdk/cosmos/azure-cosmos/CHANGELOG.md | 1 + .../azure-cosmos/samples/hedging_detection.py | 71 +++ .../samples/hedging_detection_async.py | 52 +++ .../azure-cosmos/tests/livecanary/__init__.py | 10 + .../livecanary/test_hedging_detection_live.py | 96 ++++ .../test_hedging_detection_live_async.py | 65 +++ .../tests/test_diagnostics_types.py | 98 ++++ .../tests/test_hedging_detection.py | 433 ++++++++++++++++++ .../tests/test_hedging_detection_async.py | 393 ++++++++++++++++ 9 files changed, 1219 insertions(+) create mode 100644 sdk/cosmos/azure-cosmos/samples/hedging_detection.py create mode 100644 sdk/cosmos/azure-cosmos/samples/hedging_detection_async.py create mode 100644 sdk/cosmos/azure-cosmos/tests/livecanary/__init__.py create mode 100644 sdk/cosmos/azure-cosmos/tests/livecanary/test_hedging_detection_live.py create mode 100644 sdk/cosmos/azure-cosmos/tests/livecanary/test_hedging_detection_live_async.py create mode 100644 sdk/cosmos/azure-cosmos/tests/test_diagnostics_types.py create mode 100644 sdk/cosmos/azure-cosmos/tests/test_hedging_detection.py create mode 100644 sdk/cosmos/azure-cosmos/tests/test_hedging_detection_async.py diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index f08e9b526bbc..d23a42d57ebe 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -3,6 +3,7 @@ ### 4.16.0b3 (Unreleased) #### Features Added +* Added the hedging-detection API: three accessor methods (`is_hedging_started()`, `get_requested_regions()`, `get_responded_regions()`) on `CosmosDict`, `CosmosList`, `CosmosItemPaged`, `CosmosAsyncItemPaged`, `CosmosHttpResponseError`, `CosmosBatchOperationError`, and `CosmosClientTimeoutError`, plus the new public types `RequestedRegion` and `RequestedRegionReason`. Lets callers detect post-hoc whether a cross-region hedge was dispatched, which regions were dispatched to (with reason), and which regions responded. `RequestedRegionReason` is a non-exhaustive enum — unknown values map to `UNKNOWN` via `_missing_` for forward compatibility. Cross-SDK; matches the same shape shipped by the .NET / Java SDKs. See [issue 46899](https://github.com/Azure/azure-sdk-for-python/issues/46899). #### Breaking Changes diff --git a/sdk/cosmos/azure-cosmos/samples/hedging_detection.py b/sdk/cosmos/azure-cosmos/samples/hedging_detection.py new file mode 100644 index 000000000000..86abe26fb2f3 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/samples/hedging_detection.py @@ -0,0 +1,71 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation + +"""Sample: use the hedging-detection API to inspect cross-region hedging behavior. + +This sample shows how, AFTER an operation completes, you can ask the response +wrapper or exception whether the SDK dispatched the operation to a hedge region, +which regions it was dispatched to (with reasons), and which regions responded. + +The accessors are available on: + * azure.cosmos.CosmosDict (point reads / writes / patch) + * azure.cosmos.CosmosList (batch / read_all_items pages) + * azure.cosmos.CosmosItemPaged (query / change feed) + * azure.cosmos.exceptions.CosmosHttpResponseError + * azure.cosmos.exceptions.CosmosBatchOperationError + * azure.cosmos.exceptions.CosmosClientTimeoutError + +For paged operations the accessors reflect the most recently fetched page. + +``RequestedRegionReason`` is non-exhaustive — handle ``UNKNOWN`` defensively: +calling ``RequestedRegionReason("future_value_42")`` returns +``RequestedRegionReason.UNKNOWN`` rather than raising ``ValueError``. +""" + +from azure.cosmos import CosmosClient, RequestedRegion, RequestedRegionReason +from azure.cosmos.exceptions import CosmosHttpResponseError + + +def inspect_response_hedging(client: CosmosClient) -> None: + container = ( + client.get_database_client("my-db").get_container_client("my-container") + ) + + try: + response = container.read_item(item="abc", partition_key="pk1") + except CosmosHttpResponseError as exc: + # Error-path consumers also see the per-operation timeline. + if exc.is_hedging_started(): + print( + "Operation was hedged before failing across:", + [r.region_name for r in exc.get_requested_regions()], + ) + raise + + if response.is_hedging_started(): + print("Operation was hedged.") + else: + print("Operation was NOT hedged (primary returned under the threshold).") + + print("Dispatched regions (in observed dispatch order):") + for entry in response.get_requested_regions(): + assert isinstance(entry, RequestedRegion) + # Handle UNKNOWN / unknown reasons defensively. + if entry.reason is RequestedRegionReason.UNKNOWN: + print(f" - {entry.region_name} (unrecognized reason)") + else: + print(f" - {entry.region_name} ({entry.reason.value})") + + print("Responded regions (duplicates allowed — see docstring):") + print(" raw:", list(response.get_responded_regions())) + print(" unique-by-first-appearance:", + list(dict.fromkeys(response.get_responded_regions()))) + + +if __name__ == "__main__": + import os + + endpoint = os.environ["COSMOS_ENDPOINT"] + key = os.environ["COSMOS_KEY"] + with CosmosClient(endpoint, credential=key) as client: + inspect_response_hedging(client) diff --git a/sdk/cosmos/azure-cosmos/samples/hedging_detection_async.py b/sdk/cosmos/azure-cosmos/samples/hedging_detection_async.py new file mode 100644 index 000000000000..3ffe2eef6b98 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/samples/hedging_detection_async.py @@ -0,0 +1,52 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation + +"""Async twin of ``hedging_detection.py``. See that file for the narrative.""" + +import asyncio +import os + +from azure.cosmos import RequestedRegion, RequestedRegionReason +from azure.cosmos.aio import CosmosClient +from azure.cosmos.exceptions import CosmosHttpResponseError + + +async def inspect_response_hedging(client: CosmosClient) -> None: + container = ( + client.get_database_client("my-db").get_container_client("my-container") + ) + + try: + response = await container.read_item(item="abc", partition_key="pk1") + except CosmosHttpResponseError as exc: + if exc.is_hedging_started(): + print( + "Operation was hedged before failing across:", + [r.region_name for r in exc.get_requested_regions()], + ) + raise + + if response.is_hedging_started(): + print("Operation was hedged.") + else: + print("Operation was NOT hedged (primary returned under the threshold).") + + for entry in response.get_requested_regions(): + assert isinstance(entry, RequestedRegion) + if entry.reason is RequestedRegionReason.UNKNOWN: + print(f" - {entry.region_name} (unrecognized reason)") + else: + print(f" - {entry.region_name} ({entry.reason.value})") + + print("Responded regions:", list(response.get_responded_regions())) + + +async def main() -> None: + endpoint = os.environ["COSMOS_ENDPOINT"] + key = os.environ["COSMOS_KEY"] + async with CosmosClient(endpoint, credential=key) as client: + await inspect_response_hedging(client) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/sdk/cosmos/azure-cosmos/tests/livecanary/__init__.py b/sdk/cosmos/azure-cosmos/tests/livecanary/__init__.py new file mode 100644 index 000000000000..19105d0e0964 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/tests/livecanary/__init__.py @@ -0,0 +1,10 @@ +# Live multi-region canary tests for hedging detection (AC13). +# +# These tests are skipped by default. To run, set: +# COSMOS_MULTI_REGION_ENDPOINT +# COSMOS_MULTI_REGION_KEY +# COSMOS_MULTI_REGION_DATABASE +# COSMOS_MULTI_REGION_CONTAINER +# against a multi-region Cosmos DB account with at least two preferred regions +# in distinct geographies (otherwise the threshold-based hedge arms never +# dispatch and the test cannot observe HEDGING entries). diff --git a/sdk/cosmos/azure-cosmos/tests/livecanary/test_hedging_detection_live.py b/sdk/cosmos/azure-cosmos/tests/livecanary/test_hedging_detection_live.py new file mode 100644 index 000000000000..5fabcdeb8bbc --- /dev/null +++ b/sdk/cosmos/azure-cosmos/tests/livecanary/test_hedging_detection_live.py @@ -0,0 +1,96 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation + +"""Live multi-region smoke test for hedging detection (AC13). + +This test exercises the entire stack against a real multi-region Cosmos DB +account. It is **skipped by default**; set the four environment variables +listed below to enable it. + +Acceptance criteria covered: + +* AC13a — Happy-path hedged read with a low threshold (``threshold_ms=1``) + produces a ``CosmosDict`` with ``is_hedging_started() == True`` and at + least two distinct ``get_requested_regions()`` entries on a multi-region + account. +* AC13b — A request directed at a non-existent item across all regions + surfaces a ``CosmosHttpResponseError`` whose ``get_requested_regions()`` + is non-empty (the failed dispatch still has dispatch intent). +""" + +import os +import uuid + +import pytest + +from azure.cosmos import ( + CosmosClient, + PartitionKey, + RequestedRegionReason, + exceptions, +) + +ENDPOINT = os.environ.get("COSMOS_MULTI_REGION_ENDPOINT") +KEY = os.environ.get("COSMOS_MULTI_REGION_KEY") +DB_NAME = os.environ.get("COSMOS_MULTI_REGION_DATABASE", "hedging-livecanary") +CONTAINER_NAME = os.environ.get("COSMOS_MULTI_REGION_CONTAINER", "items") + +pytestmark = pytest.mark.skipif( + not (ENDPOINT and KEY), + reason=( + "Live multi-region canary requires COSMOS_MULTI_REGION_ENDPOINT and " + "COSMOS_MULTI_REGION_KEY env vars." + ), +) + + +@pytest.fixture(scope="module") +def client(): + c = CosmosClient(ENDPOINT, credential=KEY) + yield c + + +@pytest.fixture(scope="module") +def container(client): + db = client.create_database_if_not_exists(id=DB_NAME) + c = db.create_container_if_not_exists( + id=CONTAINER_NAME, partition_key=PartitionKey(path="/pk") + ) + return c + + +def test_live_hedged_read_records_hedging(container): + """AC13a — under a tiny threshold the secondary region should dispatch.""" + item_id = f"hd-{uuid.uuid4()}" + container.upsert_item({"id": item_id, "pk": item_id, "value": 1}) + + strategy = {"threshold_ms": 1, "threshold_steps_ms": 1} + response = container.read_item( + item=item_id, partition_key=item_id, availability_strategy=strategy + ) + # The wrapper carries the three accessors. + regions = response.get_requested_regions() + assert len(regions) >= 1 + # On a true multi-region account the threshold of 1ms is almost certain to + # produce a hedge arm. Assert dispatch metadata is consistent with the + # observed flag. + if response.is_hedging_started(): + reasons = {r.reason for r in regions} + assert RequestedRegionReason.HEDGING in reasons + + +def test_live_all_regions_fail_records_dispatch_on_error(container): + """AC13b — failing read still surfaces dispatch metadata on the error.""" + missing_id = f"does-not-exist-{uuid.uuid4()}" + strategy = {"threshold_ms": 1, "threshold_steps_ms": 1} + with pytest.raises(exceptions.CosmosResourceNotFoundError) as ei: + container.read_item( + item=missing_id, partition_key=missing_id, + availability_strategy=strategy, + ) + exc = ei.value + # The accessor surface is available on the exception. + regions = exc.get_requested_regions() + assert isinstance(regions, tuple) + # At least one INITIAL dispatch happened before the not-found surfaced. + assert len(regions) >= 1 diff --git a/sdk/cosmos/azure-cosmos/tests/livecanary/test_hedging_detection_live_async.py b/sdk/cosmos/azure-cosmos/tests/livecanary/test_hedging_detection_live_async.py new file mode 100644 index 000000000000..c3d56245d230 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/tests/livecanary/test_hedging_detection_live_async.py @@ -0,0 +1,65 @@ +# Async live multi-region smoke test for hedging detection (AC13). +# +# Same env-var contract as the sync version. Skipped by default. + +import os +import uuid + +import pytest + +from azure.cosmos import RequestedRegionReason, exceptions +from azure.cosmos.aio import CosmosClient +from azure.cosmos import PartitionKey + +ENDPOINT = os.environ.get("COSMOS_MULTI_REGION_ENDPOINT") +KEY = os.environ.get("COSMOS_MULTI_REGION_KEY") +DB_NAME = os.environ.get("COSMOS_MULTI_REGION_DATABASE", "hedging-livecanary") +CONTAINER_NAME = os.environ.get("COSMOS_MULTI_REGION_CONTAINER", "items") + +pytestmark = [ + pytest.mark.asyncio, + pytest.mark.skipif( + not (ENDPOINT and KEY), + reason=( + "Live multi-region canary requires COSMOS_MULTI_REGION_ENDPOINT and " + "COSMOS_MULTI_REGION_KEY env vars." + ), + ), +] + + +async def test_live_hedged_read_records_hedging_async(): + async with CosmosClient(ENDPOINT, credential=KEY) as client: + db = await client.create_database_if_not_exists(id=DB_NAME) + c = await db.create_container_if_not_exists( + id=CONTAINER_NAME, partition_key=PartitionKey(path="/pk") + ) + item_id = f"hd-{uuid.uuid4()}" + await c.upsert_item({"id": item_id, "pk": item_id, "value": 1}) + strategy = {"threshold_ms": 1, "threshold_steps_ms": 1} + response = await c.read_item( + item=item_id, partition_key=item_id, availability_strategy=strategy + ) + regions = response.get_requested_regions() + assert len(regions) >= 1 + if response.is_hedging_started(): + assert RequestedRegionReason.HEDGING in {r.reason for r in regions} + + +async def test_live_all_regions_fail_records_dispatch_on_error_async(): + async with CosmosClient(ENDPOINT, credential=KEY) as client: + db = await client.create_database_if_not_exists(id=DB_NAME) + c = await db.create_container_if_not_exists( + id=CONTAINER_NAME, partition_key=PartitionKey(path="/pk") + ) + missing_id = f"does-not-exist-{uuid.uuid4()}" + strategy = {"threshold_ms": 1, "threshold_steps_ms": 1} + with pytest.raises(exceptions.CosmosResourceNotFoundError) as ei: + await c.read_item( + item=missing_id, partition_key=missing_id, + availability_strategy=strategy, + ) + exc = ei.value + regions = exc.get_requested_regions() + assert isinstance(regions, tuple) + assert len(regions) >= 1 diff --git a/sdk/cosmos/azure-cosmos/tests/test_diagnostics_types.py b/sdk/cosmos/azure-cosmos/tests/test_diagnostics_types.py new file mode 100644 index 000000000000..01d446915df2 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/tests/test_diagnostics_types.py @@ -0,0 +1,98 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation + +"""Unit tests for the public hedging-detection value types. + +Covers AC6 (``RequestedRegionReason._missing_`` forward-compat) and basic +``RequestedRegion`` invariants (frozen / hashable / equality / slots). +Explicitly avoids the anti-patterns called out in public-spec §7: + * no JSON serialization round-trips + * no ``__slots__`` membership introspection + * no monkey-patching of the enum to exercise ``_missing_`` +""" + +import enum +import pickle +import pytest + +from azure.cosmos import RequestedRegion, RequestedRegionReason + + +class TestRequestedRegionReason: + """AC6 + enum-value coverage.""" + + def test_known_values(self): + # Sanity: every documented variant is present and has the expected + # wire-format string (cross-SDK contract — internal-spec §5). + expected = { + "INITIAL": "initial", + "OPERATION_RETRY": "operation_retry", + "TRANSPORT_RETRY": "transport_retry", + "HEDGING": "hedging", + "REGION_FAILOVER": "region_failover", + "CIRCUIT_BREAKER_PROBE": "circuit_breaker_probe", + "UNKNOWN": "unknown", + } + actual = {m.name: m.value for m in RequestedRegionReason} + assert actual == expected + + def test_missing_returns_unknown_for_arbitrary_strings(self): + # AC6: SDK-A receiving a reason emitted by a future SDK-B must not raise. + for raw in ("future_value_42", "some_new_reason", "", "🚀"): + assert RequestedRegionReason(raw) is RequestedRegionReason.UNKNOWN + + def test_missing_returns_unknown_for_non_string_values(self): + # Defensive: non-string raw values also fall through to UNKNOWN. + for raw in (42, None, object()): + assert RequestedRegionReason(raw) is RequestedRegionReason.UNKNOWN + + def test_known_value_round_trips_through_constructor(self): + # Constructing from a known wire value returns the SAME enum member. + assert RequestedRegionReason("hedging") is RequestedRegionReason.HEDGING + assert ( + RequestedRegionReason("operation_retry") + is RequestedRegionReason.OPERATION_RETRY + ) + + def test_is_enum_subclass(self): + # Customers may pattern-match `isinstance(x, RequestedRegionReason)`. + assert issubclass(RequestedRegionReason, enum.Enum) + + +class TestRequestedRegion: + """Frozen / hashable / equality semantics.""" + + def test_construct_and_field_access(self): + r = RequestedRegion( + region_name="East US", reason=RequestedRegionReason.HEDGING + ) + assert r.region_name == "East US" + assert r.reason is RequestedRegionReason.HEDGING + + def test_is_frozen(self): + r = RequestedRegion("East US", RequestedRegionReason.INITIAL) + # Frozen dataclass — assignment must raise. Dataclasses raise + # ``FrozenInstanceError`` (subclass of ``AttributeError``). + with pytest.raises((AttributeError, Exception)): + r.region_name = "West US" # type: ignore[misc] + + def test_equality_and_hash(self): + a = RequestedRegion("East US", RequestedRegionReason.INITIAL) + b = RequestedRegion("East US", RequestedRegionReason.INITIAL) + c = RequestedRegion("West US", RequestedRegionReason.INITIAL) + d = RequestedRegion("East US", RequestedRegionReason.HEDGING) + assert a == b + assert hash(a) == hash(b) + assert a != c + assert a != d + # Set membership relies on hash + eq. + s = {a, b, c, d} + assert len(s) == 3 + + def test_pickleable(self): + # Frozen slots dataclasses must remain pickleable for users that store + # diagnostics state (e.g., for offline replay). Not part of the public + # wire contract — purely a Python-language invariant for value types. + r = RequestedRegion("East US", RequestedRegionReason.HEDGING) + restored = pickle.loads(pickle.dumps(r)) + assert restored == r diff --git a/sdk/cosmos/azure-cosmos/tests/test_hedging_detection.py b/sdk/cosmos/azure-cosmos/tests/test_hedging_detection.py new file mode 100644 index 000000000000..398b647ab212 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/tests/test_hedging_detection.py @@ -0,0 +1,433 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation + +"""Unit tests for the hedging-detection accessors (AC1–AC12, sync). + +The tests exercise the public API directly rather than relying on log-string +scraping (per public-spec §7 anti-patterns). Each test constructs the relevant +state in isolation (or via the synchronous orchestrator with a mocked +``execute_request_fn``) to keep the test fast and emulator-free. + +A live multi-region smoke test that exercises the entire stack against a real +account lives at ``tests/livecanary/test_hedging_detection_live.py`` (AC13). +""" + +import copy +from threading import Event +from types import SimpleNamespace +from unittest.mock import MagicMock + +import pytest + +from azure.cosmos import RequestedRegion, RequestedRegionReason +from azure.cosmos._cosmos_responses import CosmosDict, CosmosList +from azure.cosmos._diagnostics import ( + HEDGING_STATE_HEADER_KEY, + _attach_state_to_headers, + _HedgingDetectionState, + _pop_state_from_headers, +) +from azure.cosmos.exceptions import ( + CosmosBatchOperationError, + CosmosClientTimeoutError, + CosmosHttpResponseError, +) +from azure.core.utils import CaseInsensitiveDict + + +# --------------------------------------------------------------------------- # +# Direct-state tests — fast and dependency-free. +# --------------------------------------------------------------------------- # + + +class TestHedgingDetectionState: + """Direct ``_HedgingDetectionState`` invariants.""" + + def test_empty_state(self): + s = _HedgingDetectionState() + assert s.is_hedging_started() is False + assert s.get_requested_regions() == () + assert s.get_responded_regions() == () + + def test_record_initial_does_not_set_hedging_flag(self): + # AC1: single-region INITIAL → is_hedging_started False + s = _HedgingDetectionState() + s._record_request("East US", RequestedRegionReason.INITIAL) + assert s.is_hedging_started() is False + assert s.get_requested_regions() == ( + RequestedRegion("East US", RequestedRegionReason.INITIAL), + ) + + def test_record_hedging_sets_flag(self): + # AC3: hedge arm dispatch → is_hedging_started True + s = _HedgingDetectionState() + s._record_request("East US", RequestedRegionReason.INITIAL) + s._record_request("West US", RequestedRegionReason.HEDGING) + assert s.is_hedging_started() is True + regions = s.get_requested_regions() + assert len(regions) == 2 + assert regions[1].reason is RequestedRegionReason.HEDGING + + def test_record_operation_retry_does_not_set_flag(self): + # AC4: same-region retry → OPERATION_RETRY, NOT HEDGING + s = _HedgingDetectionState() + s._record_request("East US", RequestedRegionReason.INITIAL) + s._record_request("East US", RequestedRegionReason.OPERATION_RETRY) + assert s.is_hedging_started() is False + reasons = [r.reason for r in s.get_requested_regions()] + assert reasons == [ + RequestedRegionReason.INITIAL, + RequestedRegionReason.OPERATION_RETRY, + ] + + def test_responded_regions_preserves_duplicates(self): + # AC11: duplicates intentional. + s = _HedgingDetectionState() + s._record_response("East US") + s._record_response("East US") + s._record_response("West US") + assert s.get_responded_regions() == ("East US", "East US", "West US") + + def test_snapshots_are_tuples_not_aliases(self): + # Internal lists must not leak; mutation of caller's snapshot must + # not corrupt internal state. + s = _HedgingDetectionState() + s._record_request("East US", RequestedRegionReason.INITIAL) + snapshot = s.get_requested_regions() + assert isinstance(snapshot, tuple) + # Adding to snapshot is impossible (tuple immutable); verify internal + # state survives. + with pytest.raises(AttributeError): + snapshot.append("West US") # type: ignore[attr-defined] + assert len(s.get_requested_regions()) == 1 + + def test_thread_safety_under_concurrent_appends(self): + # SE-017: threading.Lock guards both reads and writes. + import threading + + s = _HedgingDetectionState() + + def writer(reason, region, n): + for _ in range(n): + s._record_request(region, reason) + s._record_response(region) + + threads = [ + threading.Thread(target=writer, args=(RequestedRegionReason.INITIAL, "A", 200)), + threading.Thread(target=writer, args=(RequestedRegionReason.HEDGING, "B", 200)), + threading.Thread(target=writer, args=(RequestedRegionReason.OPERATION_RETRY, "C", 200)), + ] + for t in threads: + t.start() + for t in threads: + t.join() + # All 600 dispatches landed, lock guaranteed compound-update atomicity + # for the HEDGING-flag set. + assert len(s.get_requested_regions()) == 600 + assert len(s.get_responded_regions()) == 600 + # At least one HEDGING entry → flag must be True. + assert s.is_hedging_started() is True + + +# --------------------------------------------------------------------------- # +# Wrapper / exception forwarding tests. +# --------------------------------------------------------------------------- # + + +class TestWrapperAccessors: + """The three accessors forward correctly on every wrapper type.""" + + def test_cosmos_dict_with_state_attached_via_headers(self): + s = _HedgingDetectionState() + s._record_request("East US", RequestedRegionReason.INITIAL) + s._record_request("West US", RequestedRegionReason.HEDGING) + s._record_response("West US") + h = CaseInsensitiveDict({"x-ms-foo": "bar"}) + _attach_state_to_headers(h, s) + d = CosmosDict({"id": "abc"}, response_headers=h) + + assert d.is_hedging_started() is True + assert len(d.get_requested_regions()) == 2 + assert d.get_responded_regions() == ("West US",) + # The private sentinel key MUST NOT be exposed via get_response_headers. + headers_out = d.get_response_headers() + assert HEDGING_STATE_HEADER_KEY not in dict(headers_out) + # Customer-facing headers are still intact. + assert headers_out["x-ms-foo"] == "bar" + + def test_cosmos_dict_without_state_returns_safe_defaults(self): + d = CosmosDict({"id": "abc"}, response_headers=CaseInsensitiveDict()) + assert d.is_hedging_started() is False + assert d.get_requested_regions() == () + assert d.get_responded_regions() == () + + def test_cosmos_list_forwards(self): + s = _HedgingDetectionState() + s._record_request("East US", RequestedRegionReason.INITIAL) + h = CaseInsensitiveDict() + _attach_state_to_headers(h, s) + lst = CosmosList([{"id": "1"}, {"id": "2"}], response_headers=h) + assert lst.is_hedging_started() is False + assert len(lst.get_requested_regions()) == 1 + + def test_cosmos_http_response_error_forwards(self): + s = _HedgingDetectionState() + s._record_request("East US", RequestedRegionReason.INITIAL) + s._record_request("West US", RequestedRegionReason.HEDGING) + # AC5: error-path accessors are reachable. + exc = CosmosHttpResponseError(status_code=503, message="boom") + exc._hedging_state = s + assert exc.is_hedging_started() is True + assert len(exc.get_requested_regions()) == 2 + + def test_cosmos_batch_operation_error_forwards(self): + s = _HedgingDetectionState() + s._record_request("East US", RequestedRegionReason.INITIAL) + exc = CosmosBatchOperationError( + error_index=0, + headers={}, + status_code=400, + message="bad", + operation_responses=[], + ) + exc._hedging_state = s + assert exc.is_hedging_started() is False + assert len(exc.get_requested_regions()) == 1 + + def test_cosmos_client_timeout_error_forwards(self): + s = _HedgingDetectionState() + s._record_request("East US", RequestedRegionReason.INITIAL) + exc = CosmosClientTimeoutError() + exc._hedging_state = s + assert exc.is_hedging_started() is False + assert len(exc.get_requested_regions()) == 1 + + +# --------------------------------------------------------------------------- # +# AC8 — deepcopy regression test (SE-002). +# This test is the centerpiece guarantee that the closure-argument pattern +# survives the deepcopy inside ``execute_single_request_with_delay``. +# --------------------------------------------------------------------------- # + + +class TestDeepcopyRegression: + """AC8 / SE-002. + + The hedging handler deep-copies ``request_params`` at line 96. The state + MUST NOT live on ``request_params`` (a deepcopy would silently swallow + child appends); it MUST flow as a closure argument.""" + + def test_state_survives_request_params_deepcopy(self): + """Verifies the closure-argument pattern: state mutations made by + worker threads, even after a deepcopy of the request_params, land on + the SAME parent state object.""" + parent_state = _HedgingDetectionState() + + # Build a minimal RequestObject-like sentinel and deepcopy it the way + # the hedging handler does. + sentinel_params = SimpleNamespace( + data="payload", availability_strategy=None, + ) + copied_params = copy.deepcopy(sentinel_params) + # If we had naively stored ``parent_state`` on the params, the + # deepcopy would have produced an orphan copy: + if False: # pragma: no cover - illustrative only + sentinel_params.diagnostics = parent_state + copied = copy.deepcopy(sentinel_params) + assert copied.diagnostics is not parent_state # would fail SE-002 + + # Closure-argument pattern: the worker function captures state from + # the outer scope, so the deepcopy on params is irrelevant. + def worker(params): # noqa: ARG001 - matches handler signature + parent_state._record_request("East US", RequestedRegionReason.HEDGING) + + worker(copied_params) + worker(copied_params) + assert parent_state.is_hedging_started() is True + assert len(parent_state.get_requested_regions()) == 2 + + def test_handler_dispatches_state_through_closure_not_params(self): + """Exercises the real ``execute_single_request_with_delay`` and asserts + that state mutations land on the caller-provided state, even though + ``execute_single_request_with_delay`` deep-copies its ``request_params`` + before dispatching.""" + from azure.cosmos._availability_strategy_handler import CrossRegionHedgingHandler + + state = _HedgingDetectionState() + complete_status = Event() + first_holder = SimpleNamespace(request_params=None) + + # Build a minimal hedging-strategy + request-params + request stub. + strategy = SimpleNamespace(threshold_ms=0, threshold_steps_ms=0) + request_params = SimpleNamespace( + availability_strategy=strategy, + availability_strategy_executor=None, + is_hedging_request=False, + completion_status=None, + excluded_locations=None, + ) + + # Capture the deepcopied params that the worker actually dispatches. + observed = {} + + def execute_fn(params, req): + observed["params"] = params + # Worker still has access to the parent state via closure → record. + return ({"ok": True}, {"region": "East US"}) + + handler = CrossRegionHedgingHandler() + result = handler.execute_single_request_with_delay( + request_params=request_params, + request=SimpleNamespace(headers={}, url="x"), + execute_request_fn=execute_fn, + location_index=0, + available_locations=["East US", "West US"], + complete_status=complete_status, + first_request_params_holder=first_holder, + hedging_state=state, + ) + assert result == ({"ok": True}, {"region": "East US"}) + # The state MUST contain the INITIAL entry even though execute_fn + # received a *deepcopied* params object. + assert observed["params"] is not request_params # deepcopy happened + assert state.get_requested_regions() == ( + RequestedRegion("East US", RequestedRegionReason.INITIAL), + ) + assert state.get_responded_regions() == ("East US",) + + +# --------------------------------------------------------------------------- # +# AC2 / AC10 — no phantom entries from cancelled-before-delay hedge arms. +# --------------------------------------------------------------------------- # + + +class TestNoPhantomEntries: + """AC2 / AC10: a hedge arm whose threshold delay was interrupted by the + completion signal MUST NOT produce a HEDGING entry.""" + + def test_cancelled_before_delay_records_nothing(self): + from concurrent.futures import CancelledError as ConcurrentCancelled + + from azure.cosmos._availability_strategy_handler import CrossRegionHedgingHandler + + state = _HedgingDetectionState() + complete_status = Event() + complete_status.set() # pretend the primary already won + + first_holder = SimpleNamespace(request_params=None) + strategy = SimpleNamespace(threshold_ms=10, threshold_steps_ms=0) + request_params = SimpleNamespace( + availability_strategy=strategy, + availability_strategy_executor=None, + is_hedging_request=False, + completion_status=None, + excluded_locations=None, + ) + + handler = CrossRegionHedgingHandler() + + def should_not_run(params, req): + pytest.fail("execute_fn must not run when cancelled before delay") + + # Index 1 is a hedge arm. It should observe complete_status.is_set() + # AFTER its threshold sleep and raise CancelledError without recording. + with pytest.raises(ConcurrentCancelled): + handler.execute_single_request_with_delay( + request_params=request_params, + request=SimpleNamespace(headers={}, url="x"), + execute_request_fn=should_not_run, + location_index=1, + available_locations=["East US", "West US"], + complete_status=complete_status, + first_request_params_holder=first_holder, + hedging_state=state, + ) + assert state.get_requested_regions() == () + assert state.is_hedging_started() is False + + +# --------------------------------------------------------------------------- # +# AC9 — last_response_headers invariants preserved. +# --------------------------------------------------------------------------- # + + +class TestHeadersInvariants: + """AC9: the private sentinel key must not leak via get_response_headers / + last_response_headers semantics.""" + + def test_cosmos_dict_pops_sentinel_at_construction(self): + s = _HedgingDetectionState() + h = CaseInsensitiveDict({"etag": '"42"'}) + _attach_state_to_headers(h, s) + # State is in the headers dict before CosmosDict pops it. + assert HEDGING_STATE_HEADER_KEY in h + d = CosmosDict({"id": "x"}, response_headers=h) + # CosmosDict has popped it as part of init. + assert HEDGING_STATE_HEADER_KEY not in h + # The accessor still works because state is now on the CosmosDict. + assert d._hedging_state is s + + def test_pop_state_from_headers_is_safe_on_none(self): + assert _pop_state_from_headers(None) is None + + def test_pop_state_from_headers_returns_none_when_absent(self): + h = CaseInsensitiveDict() + assert _pop_state_from_headers(h) is None + + +# --------------------------------------------------------------------------- # +# AC12 — reason coverage in v1: never TRANSPORT_RETRY or CIRCUIT_BREAKER_PROBE. +# --------------------------------------------------------------------------- # + + +class TestReservedReasonsAbsence: + """AC12: TRANSPORT_RETRY and CIRCUIT_BREAKER_PROBE are reserved enum values + not emitted by Python today (SE-005).""" + + def test_reserved_reasons_never_emitted_in_state_writes(self): + """The state object accepts any enum value, but a repo-wide check + (see ``test_reserved_reasons_absent_from_production_code`` below) + guarantees no production call site can append these reasons.""" + # Demonstrates the API allows it (forward-compat), but production + # never does. The grep test below is the real enforcement. + s = _HedgingDetectionState() + s._record_request("X", RequestedRegionReason.TRANSPORT_RETRY) + assert s.get_requested_regions()[0].reason is RequestedRegionReason.TRANSPORT_RETRY + + def test_reserved_reasons_absent_from_production_code(self): + """Repo-wide grep over ``azure/cosmos/`` (excluding tests + comments) + returns zero production-code matches for TRANSPORT_RETRY or + CIRCUIT_BREAKER_PROBE.""" + import os + import re + + # Locate the production-code root. + here = os.path.dirname(os.path.abspath(__file__)) + prod_root = os.path.abspath(os.path.join(here, "..", "azure", "cosmos")) + assert os.path.isdir(prod_root), f"production root missing: {prod_root}" + + offenders = [] + for dirpath, _dirs, files in os.walk(prod_root): + for fn in files: + if not fn.endswith(".py"): + continue + # The definition file itself legitimately contains the names. + if fn in ("_diagnostics_types.py",): + continue + full = os.path.join(dirpath, fn) + with open(full, "r", encoding="utf-8") as fh: + for ln, line in enumerate(fh, start=1): + # Strip line comments and docstring boundary lines. + stripped = line.split("#", 1)[0] + if re.match(r"\s*('''|\"\"\")", stripped): + continue + for offender in ( + "RequestedRegionReason.TRANSPORT_RETRY", + "RequestedRegionReason.CIRCUIT_BREAKER_PROBE", + ): + if offender in stripped: + offenders.append((full, ln, offender, line.rstrip())) + assert offenders == [], ( + "Reserved RequestedRegionReason values must not be emitted by " + "production code in v1 (SE-005). Offenders: " + repr(offenders) + ) diff --git a/sdk/cosmos/azure-cosmos/tests/test_hedging_detection_async.py b/sdk/cosmos/azure-cosmos/tests/test_hedging_detection_async.py new file mode 100644 index 000000000000..ef2615e0d219 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/tests/test_hedging_detection_async.py @@ -0,0 +1,393 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation + +"""Async twin of ``test_hedging_detection.py`` (AC9 — sync/async parity). + +This file mirrors the sync test module class-for-class. Tests that have no +async-specific behavior (pure state object, enum, accessors on wrapper / +exception types) are re-run here against ``CosmosAsyncItemPaged`` and the +async hedging handler to ensure the *async* dispatch path is also covered. +""" + +import asyncio +import copy +from types import SimpleNamespace + +import pytest + +from azure.cosmos import RequestedRegion, RequestedRegionReason +from azure.cosmos._cosmos_responses import CosmosDict, CosmosList +from azure.cosmos._diagnostics import ( + HEDGING_STATE_HEADER_KEY, + _attach_state_to_headers, + _HedgingDetectionState, + _pop_state_from_headers, +) +from azure.cosmos.exceptions import ( + CosmosBatchOperationError, + CosmosClientTimeoutError, + CosmosHttpResponseError, +) +from azure.core.utils import CaseInsensitiveDict + + +# --------------------------------------------------------------------------- # +# Direct-state tests (mirror sync TestHedgingDetectionState). # +# --------------------------------------------------------------------------- # + + +class TestHedgingDetectionStateAsync: + def test_empty_state_async(self): + s = _HedgingDetectionState() + assert s.is_hedging_started() is False + assert s.get_requested_regions() == () + assert s.get_responded_regions() == () + + def test_record_initial_does_not_set_hedging_flag_async(self): + s = _HedgingDetectionState() + s._record_request("East US", RequestedRegionReason.INITIAL) + assert s.is_hedging_started() is False + + def test_record_hedging_sets_flag_async(self): + s = _HedgingDetectionState() + s._record_request("East US", RequestedRegionReason.INITIAL) + s._record_request("West US", RequestedRegionReason.HEDGING) + assert s.is_hedging_started() is True + + def test_record_operation_retry_does_not_set_flag_async(self): + s = _HedgingDetectionState() + s._record_request("East US", RequestedRegionReason.INITIAL) + s._record_request("East US", RequestedRegionReason.OPERATION_RETRY) + assert s.is_hedging_started() is False + + def test_responded_regions_preserves_duplicates_async(self): + s = _HedgingDetectionState() + s._record_response("East US") + s._record_response("East US") + s._record_response("West US") + assert s.get_responded_regions() == ("East US", "East US", "West US") + + def test_snapshots_are_tuples_not_aliases_async(self): + s = _HedgingDetectionState() + s._record_request("East US", RequestedRegionReason.INITIAL) + snapshot = s.get_requested_regions() + assert isinstance(snapshot, tuple) + with pytest.raises(AttributeError): + snapshot.append("West US") # type: ignore[attr-defined] + + def test_thread_safety_under_concurrent_appends_async(self): + # The state's threading.Lock is the same lock used in the async path + # (asyncio dispatch ultimately runs the handler under a TaskGroup; the + # state object itself uses threading.Lock for thread/coroutine safety). + import threading + + s = _HedgingDetectionState() + + def writer(reason, region, n): + for _ in range(n): + s._record_request(region, reason) + s._record_response(region) + + threads = [ + threading.Thread(target=writer, args=(RequestedRegionReason.INITIAL, "A", 100)), + threading.Thread(target=writer, args=(RequestedRegionReason.HEDGING, "B", 100)), + ] + for t in threads: + t.start() + for t in threads: + t.join() + assert len(s.get_requested_regions()) == 200 + assert s.is_hedging_started() is True + + +# --------------------------------------------------------------------------- # +# Wrapper / exception accessors. # +# --------------------------------------------------------------------------- # + + +class TestWrapperAccessorsAsync: + def test_cosmos_dict_with_state_attached_via_headers_async(self): + s = _HedgingDetectionState() + s._record_request("East US", RequestedRegionReason.INITIAL) + s._record_request("West US", RequestedRegionReason.HEDGING) + s._record_response("West US") + h = CaseInsensitiveDict({"x-ms-foo": "bar"}) + _attach_state_to_headers(h, s) + d = CosmosDict({"id": "abc"}, response_headers=h) + assert d.is_hedging_started() is True + assert len(d.get_requested_regions()) == 2 + assert d.get_responded_regions() == ("West US",) + assert HEDGING_STATE_HEADER_KEY not in dict(d.get_response_headers()) + + def test_cosmos_dict_without_state_returns_safe_defaults_async(self): + d = CosmosDict({"id": "abc"}, response_headers=CaseInsensitiveDict()) + assert d.is_hedging_started() is False + assert d.get_requested_regions() == () + assert d.get_responded_regions() == () + + def test_cosmos_list_forwards_async(self): + s = _HedgingDetectionState() + s._record_request("East US", RequestedRegionReason.INITIAL) + h = CaseInsensitiveDict() + _attach_state_to_headers(h, s) + lst = CosmosList([{"id": "1"}], response_headers=h) + assert len(lst.get_requested_regions()) == 1 + + def test_cosmos_http_response_error_forwards_async(self): + s = _HedgingDetectionState() + s._record_request("East US", RequestedRegionReason.INITIAL) + s._record_request("West US", RequestedRegionReason.HEDGING) + exc = CosmosHttpResponseError(status_code=503, message="boom") + exc._hedging_state = s + assert exc.is_hedging_started() is True + assert len(exc.get_requested_regions()) == 2 + + def test_cosmos_batch_operation_error_forwards_async(self): + s = _HedgingDetectionState() + s._record_request("East US", RequestedRegionReason.INITIAL) + exc = CosmosBatchOperationError( + error_index=0, headers={}, status_code=400, message="bad", + operation_responses=[], + ) + exc._hedging_state = s + assert len(exc.get_requested_regions()) == 1 + + def test_cosmos_client_timeout_error_forwards_async(self): + s = _HedgingDetectionState() + s._record_request("East US", RequestedRegionReason.INITIAL) + exc = CosmosClientTimeoutError() + exc._hedging_state = s + assert len(exc.get_requested_regions()) == 1 + + +# --------------------------------------------------------------------------- # +# AC8 — deepcopy regression test against the async handler. # +# --------------------------------------------------------------------------- # + + +class TestDeepcopyRegressionAsync: + @pytest.mark.asyncio + async def test_state_survives_request_params_deepcopy_async(self): + parent_state = _HedgingDetectionState() + sentinel_params = SimpleNamespace(data="payload", availability_strategy=None) + copied = copy.deepcopy(sentinel_params) + + async def worker(params): # noqa: ARG001 + parent_state._record_request("East US", RequestedRegionReason.HEDGING) + + await worker(copied) + await worker(copied) + assert parent_state.is_hedging_started() is True + assert len(parent_state.get_requested_regions()) == 2 + + @pytest.mark.asyncio + async def test_handler_dispatches_state_through_closure_not_params_async(self): + from azure.cosmos.aio._asynchronous_availability_strategy_handler import ( + CrossRegionAsyncHedgingHandler, + ) + + state = _HedgingDetectionState() + complete_status = asyncio.Event() + first_holder = SimpleNamespace(request_params=None) + + strategy = SimpleNamespace(threshold_ms=0, threshold_steps_ms=0) + request_params = SimpleNamespace( + availability_strategy=strategy, + availability_strategy_executor=None, + is_hedging_request=False, + completion_status=None, + excluded_locations=None, + ) + + observed = {} + + async def execute_fn(params, req): + observed["params"] = params + return ({"ok": True}, {"region": "East US"}) + + handler = CrossRegionAsyncHedgingHandler() + result = await handler.execute_single_request_with_delay( + request_params=request_params, + request=SimpleNamespace(headers={}, url="x"), + execute_request_fn=execute_fn, + location_index=0, + available_locations=["East US", "West US"], + complete_status=complete_status, + first_request_params_holder=first_holder, + hedging_state=state, + ) + assert result == ({"ok": True}, {"region": "East US"}) + assert observed["params"] is not request_params + assert state.get_requested_regions() == ( + RequestedRegion("East US", RequestedRegionReason.INITIAL), + ) + assert state.get_responded_regions() == ("East US",) + + +# --------------------------------------------------------------------------- # +# AC2 / AC10 — no phantom entries when async hedge arm is cancelled. # +# --------------------------------------------------------------------------- # + + +class TestNoPhantomEntriesAsync: + @pytest.mark.asyncio + async def test_cancelled_before_delay_records_nothing_async(self): + from azure.cosmos.aio._asynchronous_availability_strategy_handler import ( + CrossRegionAsyncHedgingHandler, + ) + + state = _HedgingDetectionState() + complete_status = asyncio.Event() + complete_status.set() + + first_holder = SimpleNamespace(request_params=None) + strategy = SimpleNamespace(threshold_ms=10, threshold_steps_ms=0) + request_params = SimpleNamespace( + availability_strategy=strategy, + availability_strategy_executor=None, + is_hedging_request=False, + completion_status=None, + excluded_locations=None, + ) + + async def should_not_run(params, req): + raise AssertionError("execute_fn must not run when cancelled before delay") + + handler = CrossRegionAsyncHedgingHandler() + with pytest.raises((asyncio.CancelledError, BaseException)) as ei: + await handler.execute_single_request_with_delay( + request_params=request_params, + request=SimpleNamespace(headers={}, url="x"), + execute_request_fn=should_not_run, + location_index=1, + available_locations=["East US", "West US"], + complete_status=complete_status, + first_request_params_holder=first_holder, + hedging_state=state, + ) + # Whatever the handler raises, the state must have NO entries. + assert state.get_requested_regions() == () + assert state.is_hedging_started() is False + + +# --------------------------------------------------------------------------- # +# Headers invariants (mirrors sync). # +# --------------------------------------------------------------------------- # + + +class TestHeadersInvariantsAsync: + def test_cosmos_dict_pops_sentinel_at_construction_async(self): + s = _HedgingDetectionState() + h = CaseInsensitiveDict({"etag": '"42"'}) + _attach_state_to_headers(h, s) + assert HEDGING_STATE_HEADER_KEY in h + d = CosmosDict({"id": "x"}, response_headers=h) + assert HEDGING_STATE_HEADER_KEY not in h + assert d._hedging_state is s + + def test_pop_state_from_headers_is_safe_on_none_async(self): + assert _pop_state_from_headers(None) is None + + def test_pop_state_from_headers_returns_none_when_absent_async(self): + h = CaseInsensitiveDict() + assert _pop_state_from_headers(h) is None + + +# --------------------------------------------------------------------------- # +# AC12 (mirrors sync). # +# --------------------------------------------------------------------------- # + + +class TestReservedReasonsAbsenceAsync: + def test_reserved_reasons_never_emitted_in_state_writes_async(self): + s = _HedgingDetectionState() + s._record_request("X", RequestedRegionReason.TRANSPORT_RETRY) + assert s.get_requested_regions()[0].reason is RequestedRegionReason.TRANSPORT_RETRY + + def test_reserved_reasons_absent_from_production_code_async(self): + """Same repo-wide guarantee as the sync test, executed via the async + suite so it runs in CI even when the sync test is skipped.""" + import os + import re + + here = os.path.dirname(os.path.abspath(__file__)) + prod_root = os.path.abspath(os.path.join(here, "..", "azure", "cosmos")) + assert os.path.isdir(prod_root) + + offenders = [] + for dirpath, _dirs, files in os.walk(prod_root): + for fn in files: + if not fn.endswith(".py") or fn == "_diagnostics_types.py": + continue + full = os.path.join(dirpath, fn) + with open(full, "r", encoding="utf-8") as fh: + for ln, line in enumerate(fh, start=1): + stripped = line.split("#", 1)[0] + if re.match(r"\s*('''|\"\"\")", stripped): + continue + for offender in ( + "RequestedRegionReason.TRANSPORT_RETRY", + "RequestedRegionReason.CIRCUIT_BREAKER_PROBE", + ): + if offender in stripped: + offenders.append((full, ln, offender)) + assert offenders == [], repr(offenders) + + +# --------------------------------------------------------------------------- # +# AC9 — sync↔async parity checker. # +# --------------------------------------------------------------------------- # + + +class TestSyncAsyncParity: + """Every test class in the sync module has a counterpart here, and every + test method has a name-matching ``_async`` twin.""" + + def test_parity_class_coverage(self): + import importlib + sync_mod = importlib.import_module("test_hedging_detection") + async_mod = importlib.import_module("test_hedging_detection_async") + + sync_classes = { + n for n in dir(sync_mod) + if n.startswith("Test") and isinstance(getattr(sync_mod, n), type) + } + async_classes = { + n for n in dir(async_mod) + if n.startswith("Test") and isinstance(getattr(async_mod, n), type) + } + # Async module is allowed extras (TestSyncAsyncParity, *Async suffix) + # but every sync class must have an Async counterpart. + missing = [] + for sn in sync_classes: + if sn + "Async" not in async_classes: + missing.append(sn) + assert missing == [], ( + f"Sync test classes without Async counterpart: {missing}. " + "Add a mirror class named 'Async' in test_hedging_detection_async.py." + ) + + def test_parity_method_coverage(self): + import importlib + sync_mod = importlib.import_module("test_hedging_detection") + async_mod = importlib.import_module("test_hedging_detection_async") + + missing = [] + for sn in dir(sync_mod): + if not sn.startswith("Test"): + continue + scls = getattr(sync_mod, sn) + if not isinstance(scls, type): + continue + acls_name = sn + "Async" + acls = getattr(async_mod, acls_name, None) + if acls is None: + continue # already reported by the class-coverage test + sync_tests = {m for m in dir(scls) if m.startswith("test_")} + async_tests = {m for m in dir(acls) if m.startswith("test_")} + for m in sync_tests: + if (m + "_async") not in async_tests: + missing.append(f"{sn}.{m}") + assert missing == [], ( + "Sync test methods without _async twin: " + repr(missing) + ) From bc79261db99a295e814ef1547cf3ba6003e915b5 Mon Sep 17 00:00:00 2001 From: Nalu Tripician <27316859+NaluTripician@users.noreply.github.com> Date: Thu, 14 May 2026 15:49:45 -0700 Subject: [PATCH 3/4] test(cosmos): rename scls/acls to sync_cls/async_cls to satisfy cspell The parity-coverage test used cryptic short names `scls` and `acls` for `sync_cls` and `async_cls`. `scls` is flagged as a spelling error by cspell on the build-analyze CI job. Rename to the spelled-out form which is both more readable and cspell-clean. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../tests/test_hedging_detection_async.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/tests/test_hedging_detection_async.py b/sdk/cosmos/azure-cosmos/tests/test_hedging_detection_async.py index ef2615e0d219..7acc29db4254 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_hedging_detection_async.py +++ b/sdk/cosmos/azure-cosmos/tests/test_hedging_detection_async.py @@ -376,15 +376,15 @@ def test_parity_method_coverage(self): for sn in dir(sync_mod): if not sn.startswith("Test"): continue - scls = getattr(sync_mod, sn) - if not isinstance(scls, type): + sync_cls = getattr(sync_mod, sn) + if not isinstance(sync_cls, type): continue - acls_name = sn + "Async" - acls = getattr(async_mod, acls_name, None) - if acls is None: + async_cls_name = sn + "Async" + async_cls = getattr(async_mod, async_cls_name, None) + if async_cls is None: continue # already reported by the class-coverage test - sync_tests = {m for m in dir(scls) if m.startswith("test_")} - async_tests = {m for m in dir(acls) if m.startswith("test_")} + sync_tests = {m for m in dir(sync_cls) if m.startswith("test_")} + async_tests = {m for m in dir(async_cls) if m.startswith("test_")} for m in sync_tests: if (m + "_async") not in async_tests: missing.append(f"{sn}.{m}") From 520e3fa155a0948fe1098b4e3063f23e9ff9835d Mon Sep 17 00:00:00 2001 From: Nalu Tripician <27316859+NaluTripician@users.noreply.github.com> Date: Thu, 14 May 2026 15:50:02 -0700 Subject: [PATCH 4/4] fix(cosmos): satisfy build-analyze pylint on hedging-detection wiring Address the 20 pylint findings flagged by the Build_Analyze CI job on the hedging-detection PR: * Remove no-op `try: ... except Exception: raise` blocks in `CrossRegionHedgingHandler.execute_single_request_with_delay` (sync + async). The bare reraise added nothing and triggered W0706 try-except-raise. The post-call `_record_response` logic still only runs on the success path, since an in-flight exception now propagates directly through the function. * Remove the unused `RequestedRegionReason` import from `aio/_retry_utility_async.py`. The async retry utility delegates retry-diagnostics recording to `_record_retry_diagnostics_and_attach` in the sync module, which owns the import. * Add full Sphinx-style docstrings (`:param`/`:type`/`:returns`/ `:rtype`) to every new hedging-detection internal flagged for missing doc elements: `CosmosItemPaged._copy_headers_stripped` (+ its async twin so they stay symmetric), `_HedgingDetectionState._record_request`, `_HedgingDetectionState._record_response`, `_HedgingDetectionAccessorsMixin.is_hedging_started`, `_HedgingDetectionAccessorsMixin.get_requested_regions`, `_HedgingDetectionAccessorsMixin.get_responded_regions`, `_attach_state_to_headers`, `_pop_state_from_headers`, `_record_retry_diagnostics_and_attach`, and `_resolve_region_name`. No public API surface change; only docstrings and the deletion of two inert try/except blocks. All 44 hedging-detection unit tests still pass. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../cosmos/_availability_strategy_handler.py | 5 +- .../azure/cosmos/_cosmos_responses.py | 22 ++++++++- .../azure-cosmos/azure/cosmos/_diagnostics.py | 47 +++++++++++++++++-- .../azure/cosmos/_retry_utility.py | 24 ++++++++++ .../azure/cosmos/_synchronized_request.py | 13 ++++- ...nchronous_availability_strategy_handler.py | 5 +- .../azure/cosmos/aio/_retry_utility_async.py | 1 - 7 files changed, 103 insertions(+), 14 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_availability_strategy_handler.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_availability_strategy_handler.py index 16b9998dac08..10ec9412bb91 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_availability_strategy_handler.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_availability_strategy_handler.py @@ -139,10 +139,7 @@ def execute_single_request_with_delay( ) hedging_state._record_request(region_name, reason) # pylint: disable=protected-access - try: - result = execute_request_fn(params, req) - except Exception: - raise + result = execute_request_fn(params, req) # Record responding region on success. Exceptions still reach the # parent ``execute_request`` which decides which arm wins; per-arm # error responses are recorded by the retry utility before the diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_responses.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_responses.py index 708a3c28d44c..865ef64dac1f 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_responses.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_responses.py @@ -87,7 +87,16 @@ def _refresh_hedging_state_from_pages(self) -> None: @staticmethod def _copy_headers_stripped(headers: Optional[CaseInsensitiveDict]) -> CaseInsensitiveDict: """Return a copy of ``headers`` with the private hedging-state sentinel - key removed so customer code never sees it.""" + key removed so customer code never sees it. + + :param headers: Original response-headers dict (or ``None``). + :type headers: Optional[~azure.core.CaseInsensitiveDict] + :returns: A new :class:`~azure.core.CaseInsensitiveDict` containing the + same entries as ``headers`` minus the private hedging-state + sentinel key. An empty dict is returned when ``headers`` is + ``None``. + :rtype: ~azure.core.CaseInsensitiveDict + """ if headers is None: return CaseInsensitiveDict() from ._diagnostics import HEDGING_STATE_HEADER_KEY @@ -163,6 +172,17 @@ def _refresh_hedging_state_from_pages(self) -> None: @staticmethod def _copy_headers_stripped(headers: Optional[CaseInsensitiveDict]) -> CaseInsensitiveDict: + """Return a copy of ``headers`` with the private hedging-state sentinel + key removed so customer code never sees it. + + :param headers: Original response-headers dict (or ``None``). + :type headers: Optional[~azure.core.CaseInsensitiveDict] + :returns: A new :class:`~azure.core.CaseInsensitiveDict` containing the + same entries as ``headers`` minus the private hedging-state + sentinel key. An empty dict is returned when ``headers`` is + ``None``. + :rtype: ~azure.core.CaseInsensitiveDict + """ if headers is None: return CaseInsensitiveDict() from ._diagnostics import HEDGING_STATE_HEADER_KEY diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_diagnostics.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_diagnostics.py index 4b843bf85c75..36e5124ed7ad 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_diagnostics.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_diagnostics.py @@ -82,6 +82,14 @@ def __init__(self) -> None: def _record_request(self, region_name: str, reason: RequestedRegionReason) -> None: """Append a dispatched-region entry. Sets the hedging-started flag atomically when reason is ``HEDGING`` (compound update guarded by lock). + + :param region_name: Human-readable name of the region the request was + dispatched to. Empty string when the endpoint cannot be resolved + to a friendly region name (e.g., the emulator). + :type region_name: str + :param reason: Why the region was dispatched (initial attempt, + hedge arm, region failover retry, same-region retry, etc.). + :type reason: ~azure.cosmos._diagnostics_types.RequestedRegionReason """ if region_name is None: region_name = "" @@ -94,7 +102,13 @@ def _record_request(self, region_name: str, reason: RequestedRegionReason) -> No def _record_response(self, region_name: str) -> None: """Append a responding-region entry. Duplicates are intentional — the same region may produce multiple responses (e.g., a late response - after a hedge winner).""" + after a hedge winner). + + :param region_name: Human-readable name of the region that produced + a response. Empty string when the endpoint cannot be resolved to a + friendly region name (e.g., the emulator). + :type region_name: str + """ if region_name is None: region_name = "" with self._lock: @@ -146,6 +160,10 @@ def is_hedging_started(self) -> bool: ``False`` does NOT mean hedging is disabled or misconfigured. To check whether hedging is configured on the client, inspect the client-level ``availability_strategy`` configuration. + + :returns: ``True`` if at least one hedge-arm dispatch fired for this + operation; ``False`` otherwise. + :rtype: bool """ state = self._hedging_state if state is None: @@ -178,6 +196,10 @@ def get_requested_regions(self) -> Tuple[RequestedRegion, ...]: :attr:`~RequestedRegionReason.UNKNOWN` and unknown values defensively (``_missing_`` returns :attr:`~RequestedRegionReason.UNKNOWN` for future enum members). + + :returns: A tuple of :class:`RequestedRegion` entries in dispatch + order. Empty when no state was attached to the operation. + :rtype: Tuple[~azure.cosmos.RequestedRegion, ...] """ state = self._hedging_state if state is None: @@ -194,6 +216,10 @@ def get_responded_regions(self) -> Tuple[str, ...]: imply that more than one distinct region responded. For unique regions in arrival order use ``tuple(dict.fromkeys(resp.get_responded_regions()))``; for an unordered set use ``set(resp.get_responded_regions())``. + + :returns: A tuple of region-name strings in arrival order; may contain + duplicates. Empty when no state was attached to the operation. + :rtype: Tuple[str, ...] """ state = self._hedging_state if state is None: @@ -206,7 +232,14 @@ def _attach_state_to_headers(headers, state): under the module-private sentinel key. The wrapper-type ``__init__`` pops this key so customers never see it on ``get_response_headers()``. - No-op if either argument is falsy.""" + No-op if either argument is falsy. + + :param headers: Response-headers dict to mutate in place. Typically a + ``CaseInsensitiveDict`` produced by the synchronized request layer. + :type headers: Optional[MutableMapping[str, Any]] + :param state: Per-operation hedging-detection state to attach. + :type state: Optional[_HedgingDetectionState] + """ if headers is None or state is None: return try: @@ -220,7 +253,15 @@ def _attach_state_to_headers(headers, state): def _pop_state_from_headers(headers): """Helper used by every wrapper type's ``__init__``: pop and return the private state sentinel from a response-headers dict. Returns ``None`` if - no state is attached. Also accepts ``None`` headers safely.""" + no state is attached. Also accepts ``None`` headers safely. + + :param headers: Response-headers dict to pop the sentinel from. May be + ``None``, in which case ``None`` is returned without raising. + :type headers: Optional[MutableMapping[str, Any]] + :returns: The previously attached :class:`_HedgingDetectionState`, or + ``None`` if none was attached. + :rtype: Optional[_HedgingDetectionState] + """ if headers is None: return None try: diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py index 8d8e970d8099..ddd7a74a6541 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py @@ -333,6 +333,30 @@ def _record_retry_diagnostics_and_attach( be empty when the endpoint cannot be resolved to a friendly name. Never raises — diagnostics must not break the hot path. + + :param hedging_state: Per-operation hedging-detection state. ``None`` for + callers that have not opted into hedging-detection, in which case this + function is a no-op. + :type hedging_state: Optional[_HedgingDetectionState] + :param retry_policy: The retry policy instance that produced the decision + being recorded. Its class name selects the reason mapping above. + :type retry_policy: object + :param exception: The :class:`CosmosHttpResponseError` (or other + AzureError) that triggered the retry decision. When + ``attached_on_raise`` is ``True``, the hedging state is attached to + this exception so error-path consumers can surface it. + :type exception: BaseException + :param args: Positional arguments forwarded to + :func:`_retry_utility.Execute`. ``args[0]`` is the + :class:`~azure.cosmos._request_object.RequestObject` whose + post-``ShouldRetry`` endpoint is read for region resolution. + :type args: tuple + :param attached_on_raise: When ``True``, the retry utility is re-raising + the exception, so the state is attached to it for surfacing to error + consumers (``CosmosHttpResponseError.get_requested_regions()``). + When ``False``, the retry utility is looping, so a new + :class:`RequestedRegion` entry is appended to the state. + :type attached_on_raise: bool """ if hedging_state is None: return diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py index 5f83de57669a..74baa99ede50 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py @@ -345,7 +345,18 @@ def _resolve_region_name(global_endpoint_manager, request_params) -> str: """Best-effort resolution of the human-readable region name for the endpoint currently selected on ``request_params``. Returns an empty string if the name cannot be resolved (e.g., emulator, missing routing context). - Never raises — the hot path must not break for diagnostics.""" + Never raises — the hot path must not break for diagnostics. + + :param global_endpoint_manager: The client's global endpoint manager. + Used to map the resolved endpoint URL to a friendly region name. + :type global_endpoint_manager: object + :param request_params: The per-request :class:`RequestObject` whose + currently selected endpoint should be resolved. + :type request_params: ~azure.cosmos._request_object.RequestObject + :returns: The friendly region name (e.g., ``"East US"``) for the endpoint + currently routed to, or an empty string when no mapping is available. + :rtype: str + """ try: endpoint = getattr(request_params, "location_endpoint_to_route", None) if endpoint and hasattr(global_endpoint_manager, "get_region_name"): diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_availability_strategy_handler.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_availability_strategy_handler.py index d8b111068fa9..0765ddd99510 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_availability_strategy_handler.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_availability_strategy_handler.py @@ -146,10 +146,7 @@ async def execute_single_request_with_delay( ) hedging_state._record_request(region_name, reason) # pylint: disable=protected-access - try: - result = await execute_request_fn(params, req) - except Exception: - raise + result = await execute_request_fn(params, req) if hedging_state is not None and 0 <= location_index < len(available_locations): hedging_state._record_response(available_locations[location_index]) # pylint: disable=protected-access return result diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py index cee97649e3b5..2a6e0c82e7f2 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py @@ -43,7 +43,6 @@ from .. import exceptions from .._constants import _Constants from .._container_recreate_retry_policy import ContainerRecreateRetryPolicy -from .._diagnostics_types import RequestedRegionReason from .._request_object import RequestObject from .._retry_utility import (_configure_timeout, _has_read_retryable_headers, _handle_service_response_retries, _handle_service_request_retries,