Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 4.16.0b3 (Unreleased)

#### Features Added
* Added the hedging-detection API: three accessor methods (`is_hedging_started()`, `get_requested_regions()`, `get_responded_regions()`) on `CosmosDict`, `CosmosList`, `CosmosItemPaged`, `CosmosAsyncItemPaged`, `CosmosHttpResponseError`, `CosmosBatchOperationError`, and `CosmosClientTimeoutError`, plus the new public types `RequestedRegion` and `RequestedRegionReason`. This lets callers determine after the fact whether a cross-region hedge was dispatched, which regions were dispatched to (and for what reason), and which regions responded. `RequestedRegionReason` is a non-exhaustive enum: unknown values map to `UNKNOWN` via `_missing_` for forward compatibility. The API is cross-SDK and matches the shape shipped by the .NET and Java SDKs. See [issue 46899](https://github.com/Azure/azure-sdk-for-python/issues/46899).

#### Breaking Changes

Expand Down
5 changes: 4 additions & 1 deletion sdk/cosmos/azure-cosmos/azure/cosmos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from ._version import VERSION
from ._cosmos_responses import CosmosDict, CosmosList
from ._diagnostics_types import RequestedRegion, RequestedRegionReason
from ._retry_utility import ConnectionRetryPolicy
from .container import ContainerProxy
from .cosmos_client import CosmosClient
Expand Down Expand Up @@ -66,6 +67,8 @@
"ConnectionRetryPolicy",
"ThroughputProperties",
"CosmosDict",
"CosmosList"
"CosmosList",
"RequestedRegion",
"RequestedRegionReason",
)
__version__ = VERSION
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
from azure.core.pipeline.transport import HttpRequest # pylint: disable=no-legacy-azure-core-http-response-import

from ._availability_strategy_handler_base import AvailabilityStrategyHandlerMixin
from ._diagnostics import _HedgingDetectionState
from ._diagnostics_types import RequestedRegionReason
from ._global_partition_endpoint_manager_circuit_breaker import _GlobalPartitionEndpointManagerForCircuitBreaker
from ._request_object import RequestObject

Expand All @@ -50,7 +52,8 @@ def execute_single_request_with_delay(
location_index: int,
available_locations: List[str],
complete_status: Event,
first_request_params_holder: SimpleNamespace
first_request_params_holder: SimpleNamespace,
hedging_state: Optional[_HedgingDetectionState] = None,
) -> ResponseType:
"""Execute a single request.

Expand All @@ -68,6 +71,10 @@ def execute_single_request_with_delay(
:type complete_status: threading.Event
:param first_request_params_holder: A value holder for request object for first/initial request
:type first_request_params_holder: SimpleNamespace
:param hedging_state: Per-operation hedging detection state, passed as a
closure argument (NOT on ``request_params`` — see SE-002; the
``copy.deepcopy(request_params)`` in the body below would silently
swallow child appends).
:type hedging_state: Optional[_HedgingDetectionState]
:returns: Response tuple
:rtype: ResponseType
"""
Expand All @@ -92,6 +99,16 @@ def execute_single_request_with_delay(
if delay > 0:
time.sleep(delay / 1000)

# ---- Hedging-detection recording (Option B) --------------------- #
# IMPORTANT: the ``_record_request`` call further down (immediately
# before ``execute_request_fn`` is invoked) happens AFTER the threshold
# delay sleep above AND AFTER the cancellation check below. The contract
# is "dispatched, not necessarily wire-issued" — see SE-013 and public
# spec §5.3 docstring on ``get_requested_regions``. A hedge-arm task that
# is cancelled before its delay elapses unwinds out of ``time.sleep`` (or
# the subsequent cancellation check) before reaching that call, so no
# phantom HEDGING entry is recorded (AC10).
# ----------------------------------------------------------------- #

# Create request parameters for this location
params = copy.deepcopy(request_params)
params.is_hedging_request = location_index > 0
Expand All @@ -111,14 +128,33 @@ def execute_single_request_with_delay(
if complete_status.is_set():
raise CancelledError("The request has been cancelled")

return execute_request_fn(params, req)
# Record dispatch intent now that we have committed to issuing
# (post-delay, post-cancellation-check). ``INITIAL`` for index 0,
# ``HEDGING`` for hedge arms (index > 0).
if hedging_state is not None and 0 <= location_index < len(available_locations):
region_name = available_locations[location_index]
reason = (
RequestedRegionReason.INITIAL if location_index == 0
else RequestedRegionReason.HEDGING
)
hedging_state._record_request(region_name, reason) # pylint: disable=protected-access

result = execute_request_fn(params, req)
# Record responding region on success. Exceptions still reach the
# parent ``execute_request`` which decides which arm wins; per-arm
# error responses are recorded by the retry utility before the
# exception propagates here.
if hedging_state is not None and 0 <= location_index < len(available_locations):
hedging_state._record_response(available_locations[location_index]) # pylint: disable=protected-access
return result

def execute_request(
self,
request_params: RequestObject,
global_endpoint_manager: _GlobalPartitionEndpointManagerForCircuitBreaker,
request: HttpRequest,
execute_request_fn: Callable[..., ResponseType]
execute_request_fn: Callable[..., ResponseType],
hedging_state: Optional[_HedgingDetectionState] = None,
) -> ResponseType:
"""Execute request with cross-region hedging strategy.

Expand All @@ -130,6 +166,9 @@ def execute_request(
:type request: HttpRequest
:param execute_request_fn: Function to execute the actual request
:type execute_request_fn: Callable[..., ResponseType]
:param hedging_state: Per-operation hedging detection state, passed as a
closure argument (NOT on ``request_params`` — see SE-002).
:type hedging_state: Optional[_HedgingDetectionState]
:returns: A tuple containing the response data and headers
:rtype: Tuple[Dict[str, Any], Dict[str, Any]]
:raises: Exception from first request if all requests fail with transient errors
Expand All @@ -155,7 +194,8 @@ def execute_request(
location_index=i,
available_locations=available_locations,
complete_status=completion_status,
first_request_params_holder=first_request_params_holder
first_request_params_holder=first_request_params_holder,
hedging_state=hedging_state,
)
futures.append(future)
if i == 0:
Expand Down Expand Up @@ -211,7 +251,8 @@ def execute_with_hedging(
request_params: RequestObject,
global_endpoint_manager: _GlobalPartitionEndpointManagerForCircuitBreaker,
request: HttpRequest,
execute_request_fn: Callable[..., ResponseType]
execute_request_fn: Callable[..., ResponseType],
hedging_state: Optional[_HedgingDetectionState] = None,
) -> ResponseType:
"""Execute a request with hedging based on the availability strategy.

Expand All @@ -223,6 +264,11 @@ def execute_with_hedging(
:type request: HttpRequest
:param execute_request_fn: Function to execute the actual request
:type execute_request_fn: Callable[..., ResponseType]
:param hedging_state: Per-operation hedging detection state, passed as a
closure argument (NOT on ``request_params`` — see SE-002). When ``None``
no diagnostics are recorded; the hot path is unchanged for the
diagnostics-disabled case.
:type hedging_state: Optional[_HedgingDetectionState]
:returns: A tuple containing the response data and headers
:rtype: Tuple[Dict[str, Any], Dict[str, Any]]
:raises: Any exceptions raised by the hedging handler's execute_request method
Expand All @@ -231,5 +277,6 @@ def execute_with_hedging(
request_params,
global_endpoint_manager,
request,
execute_request_fn
execute_request_fn,
hedging_state=hedging_state,
)
117 changes: 107 additions & 10 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,30 @@
from azure.core.paging import ItemPaged
from azure.core.utils import CaseInsensitiveDict

from ._diagnostics import _HedgingDetectionAccessorsMixin, _pop_state_from_headers

class CosmosItemPaged(ItemPaged[dict[str, Any]]):

class CosmosItemPaged(_HedgingDetectionAccessorsMixin, ItemPaged[dict[str, Any]]):
"""A custom ItemPaged class that provides access to response headers from query operations.

This class wraps the standard ItemPaged and provides thread-safe access to response
headers captured during pagination via a shared list populated by __QueryFeed.

It also exposes three hedging-detection accessors inherited from
:class:`~azure.cosmos._diagnostics._HedgingDetectionAccessorsMixin`:
:meth:`is_hedging_started`, :meth:`get_requested_regions`, and
:meth:`get_responded_regions`. For paged operations the accessors reflect
the **most recently fetched** page; pre-fetch they return safe defaults
(``False`` / empty tuples).
"""

def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Create the pager and capture the optional list of per-page response headers.

    :param Any args: Positional arguments forwarded unchanged to the base pager.
    :keyword response_headers_list: Optional list of response headers; it is
        removed from ``kwargs`` before the remaining keywords are forwarded
        to the base pager. Defaults to ``None`` when not supplied.
    """
    # Pop before calling the base initializer so it never receives this keyword.
    self._response_headers_list: Optional[List[CaseInsensitiveDict]] = kwargs.pop('response_headers_list', None)
    super().__init__(*args, **kwargs)
    self._query_iterable: Optional[Any] = None
    # Hedging-detection state — refreshed as pages are fetched (see
    # ``get_response_headers`` below); None until the first page lands.
    self._hedging_state = None

def by_page(self, continuation_token: Optional[str] = None) -> Iterator[Iterator[dict[str, Any]]]:
"""Get an iterator of pages of objects.
Expand All @@ -41,7 +53,11 @@ def get_response_headers(self) -> List[CaseInsensitiveDict]:
:rtype: List[~azure.core.utils.CaseInsensitiveDict]
"""
if self._response_headers_list is not None:
return [h.copy() for h in self._response_headers_list]
# Refresh latest hedging state from the most recent page (if attached),
# then return defensive copies with the sentinel key stripped so
# customers never see the private state on the headers dict.
self._refresh_hedging_state_from_pages()
return [self._copy_headers_stripped(h) for h in self._response_headers_list]
return []

def get_last_response_headers(self) -> CaseInsensitiveDict:
Expand All @@ -51,21 +67,62 @@ def get_last_response_headers(self) -> CaseInsensitiveDict:
:rtype: ~azure.core.utils.CaseInsensitiveDict
"""
if self._response_headers_list and len(self._response_headers_list) > 0:
return self._response_headers_list[-1].copy()
self._refresh_hedging_state_from_pages()
return self._copy_headers_stripped(self._response_headers_list[-1])
return CaseInsensitiveDict()


class CosmosAsyncItemPaged(AsyncItemPaged[dict[str, Any]]):
def _refresh_hedging_state_from_pages(self) -> None:
    """Update ``self._hedging_state`` from the newest page that carries a state.

    Walks ``self._response_headers_list`` from newest to oldest and stores
    the first non-``None`` value found under the private hedging-state key.
    ``self._hedging_state`` is left unchanged when the list is empty or no
    page carries a state. The header dicts themselves are never mutated.
    """
    if not self._response_headers_list:
        return
    # Resolve the sentinel key once, outside the loop.
    from ._diagnostics import HEDGING_STATE_HEADER_KEY
    for headers in reversed(self._response_headers_list):
        if headers is None:
            # Tolerate placeholder entries in the list.
            continue
        state = headers.get(HEDGING_STATE_HEADER_KEY)
        if state is not None:
            self._hedging_state = state
            return

@staticmethod
def _copy_headers_stripped(headers: Optional[CaseInsensitiveDict]) -> CaseInsensitiveDict:
    """Return a copy of ``headers`` with the private hedging-state sentinel
    key removed so customer code never sees it.

    :param headers: Original response-headers dict (or ``None``).
    :type headers: Optional[~azure.core.utils.CaseInsensitiveDict]
    :returns: A new :class:`~azure.core.utils.CaseInsensitiveDict` containing
        the same entries as ``headers`` minus the private hedging-state
        sentinel key. An empty dict is returned when ``headers`` is
        ``None``.
    :rtype: ~azure.core.utils.CaseInsensitiveDict
    """
    if headers is None:
        return CaseInsensitiveDict()
    from ._diagnostics import HEDGING_STATE_HEADER_KEY
    # Work on a copy so the dict passed in is left untouched.
    copied = headers.copy()
    try:
        copied.pop(HEDGING_STATE_HEADER_KEY, None)
    except (TypeError, AttributeError):  # pragma: no cover
        # Deliberately best-effort: if the key cannot be popped, the copy is
        # returned as-is. NOTE(review): presumably guards against header
        # objects that are not fully dict-like — confirm.
        pass
    return copied


class CosmosAsyncItemPaged(_HedgingDetectionAccessorsMixin, AsyncItemPaged[dict[str, Any]]):
"""A custom AsyncItemPaged class that provides access to response headers from async query operations.

This class wraps the standard AsyncItemPaged and provides thread-safe access to response
headers captured during pagination via a shared list populated by __QueryFeed.

Also exposes the three hedging-detection accessors inherited from
:class:`~azure.cosmos._diagnostics._HedgingDetectionAccessorsMixin`.
"""

def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Create the async pager and capture the optional list of per-page response headers.

    :param Any args: Positional arguments forwarded unchanged to the base pager.
    :keyword response_headers_list: Optional list of response headers; it is
        removed from ``kwargs`` before the remaining keywords are forwarded
        to the base pager. Defaults to ``None`` when not supplied.
    """
    # Pop before calling the base initializer so it never receives this keyword.
    self._response_headers_list: Optional[List[CaseInsensitiveDict]] = kwargs.pop('response_headers_list', None)
    super().__init__(*args, **kwargs)
    self._query_iterable: Optional[Any] = None
    # Hedging-detection state; stays None until
    # ``_refresh_hedging_state_from_pages`` finds a state on a fetched page.
    self._hedging_state = None

def by_page(self, continuation_token: Optional[str] = None) -> AsyncIterator[AsyncIterator[dict[str, Any]]]:
"""Get an async iterator of pages of objects.
Expand All @@ -88,7 +145,8 @@ def get_response_headers(self) -> List[CaseInsensitiveDict]:
:rtype: List[~azure.core.utils.CaseInsensitiveDict]
"""
if self._response_headers_list is not None:
return [h.copy() for h in self._response_headers_list]
self._refresh_hedging_state_from_pages()
return [self._copy_headers_stripped(h) for h in self._response_headers_list]
return []

def get_last_response_headers(self) -> CaseInsensitiveDict:
Expand All @@ -98,15 +156,53 @@ def get_last_response_headers(self) -> CaseInsensitiveDict:
:rtype: ~azure.core.utils.CaseInsensitiveDict
"""
if self._response_headers_list and len(self._response_headers_list) > 0:
return self._response_headers_list[-1].copy()
self._refresh_hedging_state_from_pages()
return self._copy_headers_stripped(self._response_headers_list[-1])
return CaseInsensitiveDict()


class CosmosDict(dict[str, Any]):
def _refresh_hedging_state_from_pages(self) -> None:
    """Update ``self._hedging_state`` from the newest page that carries a state.

    Walks ``self._response_headers_list`` from newest to oldest and stores
    the first non-``None`` value found under the private hedging-state key.
    ``self._hedging_state`` is left unchanged when the list is empty or no
    page carries a state. The header dicts themselves are never mutated.
    """
    if not self._response_headers_list:
        return
    # Resolve the sentinel key once, outside the loop.
    from ._diagnostics import HEDGING_STATE_HEADER_KEY
    for headers in reversed(self._response_headers_list):
        if headers is None:
            # Tolerate placeholder entries in the list.
            continue
        state = headers.get(HEDGING_STATE_HEADER_KEY)
        if state is not None:
            self._hedging_state = state
            return

@staticmethod
def _copy_headers_stripped(headers: Optional[CaseInsensitiveDict]) -> CaseInsensitiveDict:
    """Return a copy of ``headers`` with the private hedging-state sentinel
    key removed so customer code never sees it.

    :param headers: Original response-headers dict (or ``None``).
    :type headers: Optional[~azure.core.utils.CaseInsensitiveDict]
    :returns: A new :class:`~azure.core.utils.CaseInsensitiveDict` containing
        the same entries as ``headers`` minus the private hedging-state
        sentinel key. An empty dict is returned when ``headers`` is
        ``None``.
    :rtype: ~azure.core.utils.CaseInsensitiveDict
    """
    if headers is None:
        return CaseInsensitiveDict()
    from ._diagnostics import HEDGING_STATE_HEADER_KEY
    # Work on a copy so the dict passed in is left untouched.
    copied = headers.copy()
    try:
        copied.pop(HEDGING_STATE_HEADER_KEY, None)
    except (TypeError, AttributeError):  # pragma: no cover
        # Deliberately best-effort: if the key cannot be popped, the copy is
        # returned as-is. NOTE(review): presumably guards against header
        # objects that are not fully dict-like — confirm.
        pass
    return copied


class CosmosDict(_HedgingDetectionAccessorsMixin, dict[str, Any]):
def __init__(self, original_dict: Optional[Mapping[str, Any]], /, *, response_headers: CaseInsensitiveDict) -> None:
if original_dict is None:
original_dict = {}
super().__init__(original_dict)
# Pull the hedging-detection state off the headers dict (if attached
# by the orchestrator) before storing the headers, so customers never
# see the private sentinel via ``get_response_headers()``.
self._hedging_state = _pop_state_from_headers(response_headers)
self._response_headers = response_headers

def get_response_headers(self) -> CaseInsensitiveDict:
Expand All @@ -118,12 +214,13 @@ def get_response_headers(self) -> CaseInsensitiveDict:
return self._response_headers.copy()


class CosmosList(list[dict[str, Any]]):
class CosmosList(_HedgingDetectionAccessorsMixin, list[dict[str, Any]]):
def __init__(self, original_list: Optional[Iterable[dict[str, Any]]], /, *,
response_headers: CaseInsensitiveDict) -> None:
if original_list is None:
original_list = []
super().__init__(original_list)
self._hedging_state = _pop_state_from_headers(response_headers)
self._response_headers = response_headers

def get_response_headers(self) -> CaseInsensitiveDict:
Expand Down
Loading
Loading