Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions sdk/core/corehttp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,22 @@
- `DistributedHttpTracingPolicy` and `distributed_trace`/`distributed_trace_async` decorators were added to support OpenTelemetry tracing for SDK operations.
- SDK clients can define an `_instrumentation_config` class variable to configure the OpenTelemetry tracer used in method span creation. Possible configuration options are `library_name`, `library_version`, `schema_url`, and `attributes`.
- Added a global settings object, `corehttp.settings`, to the `corehttp` package. This object can be used to set global settings for the `corehttp` package. Currently the only setting is `tracing_enabled` for enabling/disabling tracing. [#39172](https://github.com/Azure/azure-sdk-for-python/pull/39172)
- Added `start_time` and `context` keyword arguments to `OpenTelemetryTracer.start_span` and `start_as_current_span` methods.
- Added `set_span_error_status` static method to `OpenTelemetryTracer` for setting a span's status to ERROR.
- Added `is_generated_model`, `attribute_list`, and `TypeHandlerRegistry` to `corehttp.serialization` module for SDK model handling.

### Breaking Changes

### Bugs Fixed

- Fixed `retry_backoff_max` being ignored in retry policies when configuring retries.
- Raise correct exception if transport is used while already closed.
- A timeout error when using the `aiohttp` transport will now be raised as a `corehttp.exceptions.ServiceResponseTimeoutError`, a subtype of the previously raised `ServiceResponseError`.
- When using with `aiohttp` 3.10 or later, a connection timeout error will now be raised as a `corehttp.exceptions.ServiceRequestTimeoutError`, which can be retried.
- Fixed leaked requests and aiohttp exceptions for streamed responses.
- Improved granularity of `ServiceRequestError` and `ServiceResponseError` exceptions raised in timeout scenarios from the requests and aiohttp transports.
- `BearerTokenCredentialPolicy` and `AsyncBearerTokenCredentialPolicy` will now properly chain exceptions raised during claims challenge handling. If a credential raises an exception when attempting to acquire a token in response to a claims challenge, that exception will be raised with the original 401 response as the cause.

### Other Changes

- Added `opentelemetry-api` as an optional dependency for tracing. [#39172](https://github.com/Azure/azure-sdk-for-python/pull/39172)
Expand Down
19 changes: 14 additions & 5 deletions sdk/core/corehttp/corehttp/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ def get_token_info(self, *scopes: str, options: Optional[TokenRequestOptions] =
...

def close(self) -> None:
pass
"""Close the credential, releasing any resources it holds.

:return: None
:rtype: None
"""


class ServiceNamedKey(NamedTuple):
Expand All @@ -93,7 +97,7 @@ class ServiceKeyCredential:
It provides the ability to update the key without creating a new client.

:param str key: The key used to authenticate to a service
:raises: TypeError
:raises TypeError: If the key is not a string.
"""

def __init__(self, key: str) -> None:
Expand All @@ -117,7 +121,8 @@ def update(self, key: str) -> None:
to update long-lived clients.

:param str key: The key used to authenticate to a service
:raises: ValueError or TypeError
:raises ValueError: If the key is None or empty.
:raises TypeError: If the key is not a string.
"""
if not key:
raise ValueError("The key used for updating can not be None or empty")
Expand All @@ -132,7 +137,7 @@ class ServiceNamedKeyCredential:

:param str name: The name of the credential used to authenticate to a service.
:param str key: The key used to authenticate to a service.
:raises: TypeError
:raises TypeError: If the name or key is not a string.
"""

def __init__(self, name: str, key: str) -> None:
Expand Down Expand Up @@ -180,7 +185,11 @@ async def get_token_info(self, *scopes: str, options: Optional[TokenRequestOptio
...

async def close(self) -> None:
pass
"""Close the credential, releasing any resources.

:return: None
:rtype: None
"""

async def __aexit__(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
from __future__ import annotations
from contextlib import contextmanager
from contextvars import Token
from typing import Optional, Dict, Sequence, cast, Callable, Iterator, TYPE_CHECKING
from typing import Any, Optional, Dict, Sequence, cast, Callable, Iterator, TYPE_CHECKING

from opentelemetry import context as otel_context_module, trace
from opentelemetry.trace import (
Span,
SpanKind as OpenTelemetrySpanKind,
Link as OpenTelemetryLink,
StatusCode,
)
from opentelemetry.trace.propagation import get_current_span as get_current_span_otel
from opentelemetry.propagate import extract, inject
Expand Down Expand Up @@ -80,6 +81,8 @@ def start_span(
kind: SpanKind = _SpanKind.INTERNAL,
attributes: Optional[Attributes] = None,
links: Optional[Sequence[Link]] = None,
start_time: Optional[int] = None,
context: Optional[Dict[str, Any]] = None,
) -> Span:
"""Starts a span without setting it as the current span in the context.

Expand All @@ -91,17 +94,29 @@ def start_span(
:paramtype attributes: Mapping[str, AttributeValue]
:keyword links: Links to add to the span.
:paramtype links: list[~corehttp.instrumentation.tracing.Link]
:keyword start_time: The start time of the span in nanoseconds since the epoch.
:paramtype start_time: Optional[int]
:keyword context: A dictionary of context values corresponding to the parent span. If not provided,
the current global context will be used.
:paramtype context: Optional[Dict[str, any]]
:return: The span that was started
:rtype: ~opentelemetry.trace.Span
"""
otel_kind = _KIND_MAPPINGS.get(kind, OpenTelemetrySpanKind.INTERNAL)
otel_links = self._parse_links(links)

otel_context = None
if context:
otel_context = extract(context)

otel_span = self._tracer.start_span(
name,
context=otel_context,
kind=otel_kind,
attributes=attributes,
links=otel_links,
start_time=start_time,
record_exception=False,
)

return otel_span
Expand All @@ -114,12 +129,12 @@ def start_as_current_span(
kind: SpanKind = _SpanKind.INTERNAL,
attributes: Optional[Attributes] = None,
links: Optional[Sequence[Link]] = None,
start_time: Optional[int] = None,
context: Optional[Dict[str, Any]] = None,
end_on_exit: bool = True,
) -> Iterator[Span]:
"""Context manager that starts a span and sets it as the current span in the context.

Exiting the context manager will call the span's end method.

.. code:: python

with tracer.start_as_current_span("span_name") as span:
Expand All @@ -134,12 +149,19 @@ def start_as_current_span(
:paramtype attributes: Optional[Attributes]
:keyword links: Links to add to the span.
:paramtype links: Optional[Sequence[Link]]
:keyword start_time: The start time of the span in nanoseconds since the epoch.
:paramtype start_time: Optional[int]
:keyword context: A dictionary of context values corresponding to the parent span. If not provided,
the current global context will be used.
:paramtype context: Optional[Dict[str, any]]
:keyword end_on_exit: Whether to end the span when exiting the context manager. Defaults to True.
:paramtype end_on_exit: bool
:return: The span that was started
:rtype: ~opentelemetry.trace.Span
:rtype: Iterator[~opentelemetry.trace.Span]
"""
span = self.start_span(name, kind=kind, attributes=attributes, links=links)
span = self.start_span(
name, kind=kind, attributes=attributes, links=links, start_time=start_time, context=context
)
with trace.use_span( # pylint: disable=not-context-manager
span, record_exception=False, end_on_exit=end_on_exit
) as span:
Expand All @@ -162,6 +184,17 @@ def use_span(cls, span: Span, *, end_on_exit: bool = True) -> Iterator[Span]:
) as active_span:
yield active_span

@staticmethod
def set_span_error_status(span: Span, description: Optional[str] = None) -> None:
"""Set the status of a span to ERROR with the provided description, if any.

:param span: The span to set the ERROR status on.
:type span: ~opentelemetry.trace.Span
:param description: An optional description of the error.
:type description: str
"""
span.set_status(StatusCode.ERROR, description=description)

def _parse_links(self, links: Optional[Sequence[Link]]) -> Optional[Sequence[OpenTelemetryLink]]:
if not links:
return None
Expand Down
13 changes: 13 additions & 0 deletions sdk/core/corehttp/corehttp/paging.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ def __iter__(self) -> Iterator[Iterator[ReturnType]]:
return self

def __next__(self) -> Iterator[ReturnType]:
"""Get the next page in the iterator.

:returns: An iterator of objects in the next page.
:rtype: iterator[ReturnType]
:raises StopIteration: If there are no more pages to return.
:raises ~corehttp.exceptions.BaseError: If the request to get the next page fails.
"""
if self.continuation_token is None and self._did_a_call_already:
raise StopIteration("End of paging")
try:
Expand Down Expand Up @@ -129,6 +136,12 @@ def __iter__(self) -> Iterator[ReturnType]:
return self

def __next__(self) -> ReturnType:
"""Get the next item in the iterator.

:returns: The next item in the iterator.
:rtype: ReturnType
:raises StopIteration: If there are no more items to return.
"""
if self._page_iterator is None:
self._page_iterator = itertools.chain.from_iterable(self.by_page())
return next(self._page_iterator)
Expand Down
16 changes: 14 additions & 2 deletions sdk/core/corehttp/corehttp/rest/_aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
ServiceRequestError,
ServiceResponseError,
IncompleteReadError,
ServiceResponseTimeoutError,
)
from ..runtime.pipeline import AsyncPipeline
from ..transport._base_async import _ResponseStopIteration
Expand Down Expand Up @@ -224,7 +225,18 @@ async def read(self) -> bytes:
"""
if not self._content:
self._stream_download_check()
self._content = await self._internal_response.read()
try:
self._content = await self._internal_response.read()
except aiohttp.client_exceptions.ClientPayloadError as err:
# This is the case that server closes connection before we finish the reading. aiohttp library
# raises ClientPayloadError.
raise IncompleteReadError(err, error=err) from err
except aiohttp.client_exceptions.ClientResponseError as err:
raise ServiceResponseError(err, error=err) from err
except asyncio.TimeoutError as err:
raise ServiceResponseTimeoutError(err, error=err) from err
except aiohttp.client_exceptions.ClientError as err:
raise ServiceRequestError(err, error=err) from err
await self._set_read_checks()
return _aiohttp_content_helper(self)

Expand Down Expand Up @@ -306,7 +318,7 @@ async def __anext__(self):
except aiohttp.client_exceptions.ClientResponseError as err:
raise ServiceResponseError(err, error=err) from err
except asyncio.TimeoutError as err:
raise ServiceResponseError(err, error=err) from err
raise ServiceResponseTimeoutError(err, error=err) from err
except aiohttp.client_exceptions.ClientError as err:
raise ServiceRequestError(err, error=err) from err
except Exception:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ async def read(self) -> bytes:

async def iter_raw(self, **kwargs: Any) -> AsyncIterator[bytes]:
"""Asynchronously iterates over the response's bytes. Will not decompress in the process

:return: An async iterator of bytes from the response
:rtype: AsyncIterator[bytes]
"""
Expand All @@ -79,6 +80,7 @@ async def iter_raw(self, **kwargs: Any) -> AsyncIterator[bytes]:

async def iter_bytes(self, **kwargs: Any) -> AsyncIterator[bytes]:
"""Asynchronously iterates over the response's bytes. Will decompress in the process

:return: An async iterator of bytes from the response
:rtype: AsyncIterator[bytes]
"""
Expand Down
12 changes: 10 additions & 2 deletions sdk/core/corehttp/corehttp/rest/_requests_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
from ..runtime.pipeline import Pipeline
from ._http_response_impl import _HttpResponseBaseImpl, HttpResponseImpl
from ..exceptions import (
ServiceRequestError,
ServiceResponseError,
ServiceResponseTimeoutError,
IncompleteReadError,
HttpResponseError,
DecodeError,
Expand Down Expand Up @@ -162,6 +162,14 @@ def __next__(self):
_LOGGER.warning("Unable to stream download.")
internal_response.close()
raise HttpResponseError(err, error=err) from err
except requests.ConnectionError as err:
internal_response.close()
if err.args and isinstance(err.args[0], ReadTimeoutError):
raise ServiceResponseTimeoutError(err, error=err) from err
raise ServiceResponseError(err, error=err) from err
except requests.RequestException as err:
internal_response.close()
raise ServiceResponseError(err, error=err) from err
except Exception as err:
_LOGGER.warning("Unable to stream download.")
internal_response.close()
Expand All @@ -178,7 +186,7 @@ def _read_raw_stream(response, chunk_size=1):
except CoreDecodeError as e:
raise DecodeError(e, error=e) from e
except ReadTimeoutError as e:
raise ServiceRequestError(e, error=e) from e
raise ServiceResponseTimeoutError(e, error=e) from e
else:
# Standard file-like object.
while True:
Expand Down
4 changes: 2 additions & 2 deletions sdk/core/corehttp/corehttp/rest/_rest_py3.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ def json(self) -> Any:

:return: The JSON deserialized response body
:rtype: any
:raises json.decoder.JSONDecodeError or ValueError (in python 2.7) if object is not JSON decodable:
:raises json.decoder.JSONDecodeError: if the body is not valid JSON.
"""

@abc.abstractmethod
Expand All @@ -309,7 +309,7 @@ def raise_for_status(self) -> None:

If response is good, does nothing.

:raises ~corehttp.HttpResponseError if the object has an error status code.:
:raises ~corehttp.HttpResponseError: if the object has an error status code.
"""


Expand Down
4 changes: 2 additions & 2 deletions sdk/core/corehttp/corehttp/runtime/pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def __delitem__(self, key: str) -> None:
def clear(self) -> None: # pylint: disable=docstring-missing-return, docstring-missing-rtype
"""Context objects cannot be cleared.

:raises: TypeError
:raises TypeError: If context objects cannot be cleared.
"""
raise TypeError("Context objects cannot be cleared.")

Expand All @@ -106,7 +106,7 @@ def update( # pylint: disable=docstring-missing-return, docstring-missing-rtype
) -> None:
"""Context objects cannot be updated.

:raises: TypeError
:raises TypeError: If context objects cannot be updated.
"""
raise TypeError("Context objects cannot be updated.")

Expand Down
2 changes: 1 addition & 1 deletion sdk/core/corehttp/corehttp/runtime/pipeline/_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def await_result(func: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> T:
:type args: list
:rtype: any
:return: The result of the function
:raises: TypeError
:raises TypeError: If the function returns an awaitable object.
"""
result = func(*args, **kwargs)
if hasattr(result, "__await__"):
Expand Down
Loading
Loading