Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ private void generateService(PythonWriter writer) {
}

writer.addDependency(SmithyPythonDependency.SMITHY_CORE);
writer.addImport("smithy_core.retries", "RetryStrategyResolver");
writer.addImport("smithy_core.aio.retries", "RetryStrategyResolver");
writer.write("""
def __init__(self, config: $1T | None = None, plugins: list[$2T] | None = None):
$3C
Expand Down Expand Up @@ -215,7 +215,7 @@ private void writeSharedOperationInit(
writer.addImport("smithy_core.aio.client", "RequestPipeline");
writer.addImport("smithy_core.exceptions", "ExpectationNotMetError");
writer.addImport("smithy_core.retries", "RetryStrategyOptions");
writer.addImport("smithy_core.interfaces.retries", "RetryStrategy");
writer.addImport("smithy_core.aio.interfaces.retries", "RetryStrategy");
writer.addStdlibImport("copy", "deepcopy");

writer.write("""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ private void generateRequestTest(OperationShape operation, HttpRequestTestCase t
} else {
path = "";
}
writer.addImport("smithy_core.retries", "SimpleRetryStrategy");
writer.addImport("smithy_core.aio.retries", "SimpleRetryStrategy");
writeClientBlock(context.symbolProvider().toSymbol(service), testCase, Optional.of(() -> {
writer.write("""
config = $T(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public final class ConfigGenerator implements Runnable {
.name("RetryStrategy | RetryStrategyOptions")
.addReference(Symbol.builder()
.name("RetryStrategy")
.namespace("smithy_core.interfaces.retries", ".")
.namespace("smithy_core.aio.interfaces.retries", ".")
.addDependency(SmithyPythonDependency.SMITHY_CORE)
.build())
.addReference(Symbol.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@

from smithy_core import URI
from smithy_core.aio.interfaces.identity import IdentityResolver
from smithy_core.aio.interfaces.retries import RetryStrategy
from smithy_core.aio.retries import SimpleRetryStrategy
from smithy_core.exceptions import SmithyIdentityError
from smithy_core.interfaces.retries import RetryStrategy
from smithy_core.retries import SimpleRetryStrategy
from smithy_http import Field, Fields
from smithy_http.aio import HTTPRequest
from smithy_http.aio.interfaces import HTTPClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
TokenCache,
)
from smithy_core import URI
from smithy_core.retries import SimpleRetryStrategy
from smithy_core.aio.retries import SimpleRetryStrategy
from smithy_http.aio import HTTPRequest


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"type": "breaking",
"description": "Refactored retry strategies to be async, allowing them to wait internally or use async synchronization primitives if necessary."
}
8 changes: 4 additions & 4 deletions packages/smithy-core/src/smithy_core/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
)
from ..interfaces import Endpoint, TypedProperties
from ..interfaces.auth import AuthOption, AuthSchemeResolver
from ..interfaces.retries import RetryStrategy
from ..schemas import APIOperation
from ..serializers import SerializeableShape
from ..shapes import ShapeID
Expand All @@ -37,6 +36,7 @@
)
from .interfaces.auth import AuthScheme
from .interfaces.eventstream import EventReceiver
from .interfaces.retries import RetryStrategy
from .utils import seek

if TYPE_CHECKING:
Expand Down Expand Up @@ -330,7 +330,7 @@ async def _retry[I: SerializeableShape, O: DeserializeableShape](
return await self._handle_attempt(call, request_context, request_future)

retry_strategy = call.retry_strategy
retry_token = retry_strategy.acquire_initial_retry_token(
retry_token = await retry_strategy.acquire_initial_retry_token(
token_scope=call.retry_scope
)

Expand All @@ -349,7 +349,7 @@ async def _retry[I: SerializeableShape, O: DeserializeableShape](

if isinstance(output_context.response, Exception):
try:
retry_token = retry_strategy.refresh_retry_token_for_retry(
retry_token = await retry_strategy.refresh_retry_token_for_retry(
token_to_renew=retry_token,
error=output_context.response,
)
Expand All @@ -364,7 +364,7 @@ async def _retry[I: SerializeableShape, O: DeserializeableShape](

await seek(request_context.transport_request.body, 0)
else:
retry_strategy.record_success(token=retry_token)
await retry_strategy.record_success(token=retry_token)
return output_context

async def _handle_attempt[I: SerializeableShape, O: DeserializeableShape](
Expand Down
56 changes: 56 additions & 0 deletions packages/smithy-core/src/smithy_core/aio/interfaces/retries.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from typing import Protocol, runtime_checkable

from ...interfaces.retries import RetryBackoffStrategy, RetryToken


@runtime_checkable
class RetryStrategy(Protocol):
"""Issuer of :py:class:`RetryToken`s."""

backoff_strategy: RetryBackoffStrategy
"""The strategy used by returned tokens to compute delay duration values."""

max_attempts: int
"""Upper limit on total attempt count (initial attempt plus retries)."""

async def acquire_initial_retry_token(
self, *, token_scope: str | None = None
) -> RetryToken:
"""Create a base retry token for the start of a request.

:param token_scope: An arbitrary string accepted by the retry strategy to
separate tokens into scopes.
:returns: A retry token, to be used for determining the retry delay, refreshing
the token after a failure, and recording success after success.
:raises RetryError: If the retry strategy has no available tokens.
"""
...

async def refresh_retry_token_for_retry(
self, *, token_to_renew: RetryToken, error: Exception
) -> RetryToken:
"""Replace an existing retry token from a failed attempt with a new token.

After a failed operation call, this method is called to exchange a retry token
that was previously obtained by calling :py:func:`acquire_initial_retry_token`
or this method with a new retry token for the next attempt. This method can
either choose to allow another retry and send a new or updated token, or reject
the retry attempt and raise the error.

:param token_to_renew: The token used for the previous failed attempt.
:param error: The error that triggered the need for a retry.
:raises RetryError: If no further retry attempts are allowed.
"""
...

async def record_success(self, *, token: RetryToken) -> None:
"""Return token after successful completion of an operation.

Upon successful completion of the operation, a user calls this function to
record that the operation was successful.

:param token: The token used for the previous successful attempt.
"""
...
227 changes: 227 additions & 0 deletions packages/smithy-core/src/smithy_core/aio/retries.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from functools import lru_cache
from typing import Any, Literal

from ..exceptions import RetryError
from ..interfaces import retries as retries_interface
from ..retries import (
ExponentialBackoffJitterType,
ExponentialRetryBackoffStrategy,
RetryStrategyOptions,
SimpleRetryToken,
StandardRetryQuota,
StandardRetryToken,
)
from .interfaces.retries import RetryStrategy

RetryStrategyType = Literal["simple", "standard"]


class RetryStrategyResolver:
"""Retry strategy resolver that caches retry strategies based on configuration options.

This resolver caches retry strategy instances based on their configuration to reuse existing
instances of RetryStrategy with the same settings. Uses LRU cache for thread-safe caching.
"""

async def resolve_retry_strategy(
self, *, retry_strategy: RetryStrategy | RetryStrategyOptions | None
) -> RetryStrategy:
"""Resolve a retry strategy from the provided options, using cache when possible.

:param retry_strategy: An explicitly configured retry strategy or options for creating one.
"""
if isinstance(retry_strategy, RetryStrategy):
return retry_strategy
elif retry_strategy is None:
retry_strategy = RetryStrategyOptions()
elif not isinstance(retry_strategy, RetryStrategyOptions): # type: ignore[reportUnnecessaryIsInstance]
raise TypeError(
f"retry_strategy must be RetryStrategy, RetryStrategyOptions, or None, "
f"got {type(retry_strategy).__name__}"
)
return self._create_retry_strategy(
retry_strategy.retry_mode, retry_strategy.max_attempts
)

@lru_cache
def _create_retry_strategy(
self, retry_mode: RetryStrategyType, max_attempts: int | None
) -> RetryStrategy:
kwargs = {"max_attempts": max_attempts}
filtered_kwargs: dict[str, Any] = {
k: v for k, v in kwargs.items() if v is not None
}
match retry_mode:
case "simple":
return SimpleRetryStrategy(**filtered_kwargs)
case "standard":
return StandardRetryStrategy(**filtered_kwargs)
case _:
raise ValueError(f"Unknown retry mode: {retry_mode}")


class SimpleRetryStrategy:
def __init__(
self,
*,
backoff_strategy: retries_interface.RetryBackoffStrategy | None = None,
max_attempts: int = 5,
):
"""Retry strategy that simply invokes the given backoff strategy.

:param backoff_strategy: The backoff strategy used by returned tokens to compute
the retry delay. Defaults to :py:class:`ExponentialRetryBackoffStrategy`.

:param max_attempts: Upper limit on total number of attempts made, including
initial attempt and retries.
"""
self.backoff_strategy = backoff_strategy or ExponentialRetryBackoffStrategy()
self.max_attempts = max_attempts

async def acquire_initial_retry_token(
self, *, token_scope: str | None = None
) -> SimpleRetryToken:
"""Create a base retry token for the start of a request.

:param token_scope: This argument is ignored by this retry strategy.
"""
retry_delay = self.backoff_strategy.compute_next_backoff_delay(0)
return SimpleRetryToken(retry_count=0, retry_delay=retry_delay)

async def refresh_retry_token_for_retry(
self,
*,
token_to_renew: retries_interface.RetryToken,
error: Exception,
) -> SimpleRetryToken:
"""Replace an existing retry token from a failed attempt with a new token.

This retry strategy always returns a token until the attempt count stored in
the new token exceeds the ``max_attempts`` value.

:param token_to_renew: The token used for the previous failed attempt.
:param error: The error that triggered the need for a retry.
:raises RetryError: If no further retry attempts are allowed.
"""
if isinstance(error, retries_interface.ErrorRetryInfo) and error.is_retry_safe:
retry_count = token_to_renew.retry_count + 1
if retry_count >= self.max_attempts:
raise RetryError(
f"Reached maximum number of allowed attempts: {self.max_attempts}"
) from error
retry_delay = self.backoff_strategy.compute_next_backoff_delay(retry_count)
return SimpleRetryToken(retry_count=retry_count, retry_delay=retry_delay)
else:
raise RetryError(f"Error is not retryable: {error}") from error

async def record_success(self, *, token: retries_interface.RetryToken) -> None:
"""Not used by this retry strategy."""

def __deepcopy__(self, memo: Any) -> "SimpleRetryStrategy":
return self


class StandardRetryStrategy:
def __init__(
self,
*,
backoff_strategy: retries_interface.RetryBackoffStrategy | None = None,
max_attempts: int = 3,
retry_quota: StandardRetryQuota | None = None,
):
"""Standard retry strategy using truncated binary exponential backoff
with full jitter.

:param backoff_strategy: The backoff strategy used by returned tokens to compute
the retry delay. Defaults to :py:class:`ExponentialRetryBackoffStrategy`.

:param max_attempts: Upper limit on total number of attempts made, including
initial attempt and retries.

:param retry_quota: The retry quota to use for managing retry capacity. Defaults
to a new :py:class:`StandardRetryQuota` instance.
"""
if max_attempts < 0:
raise ValueError(
f"max_attempts must be a non-negative integer, got {max_attempts}"
)

self.backoff_strategy = backoff_strategy or ExponentialRetryBackoffStrategy(
backoff_scale_value=1,
max_backoff=20,
jitter_type=ExponentialBackoffJitterType.FULL,
)
self.max_attempts = max_attempts
self._retry_quota = retry_quota or StandardRetryQuota()

async def acquire_initial_retry_token(
self, *, token_scope: str | None = None
) -> StandardRetryToken:
"""Create a base retry token for the start of a request.

:param token_scope: This argument is ignored by this retry strategy.
"""
retry_delay = self.backoff_strategy.compute_next_backoff_delay(0)
return StandardRetryToken(retry_count=0, retry_delay=retry_delay)

async def refresh_retry_token_for_retry(
self,
*,
token_to_renew: retries_interface.RetryToken,
error: Exception,
) -> StandardRetryToken:
"""Replace an existing retry token from a failed attempt with a new token.

This retry strategy always returns a token until the attempt count stored in
the new token exceeds the ``max_attempts`` value.

:param token_to_renew: The token used for the previous failed attempt.
:param error: The error that triggered the need for a retry.
:raises RetryError: If no further retry attempts are allowed.
"""
if not isinstance(token_to_renew, StandardRetryToken):
raise TypeError(
f"StandardRetryStrategy requires StandardRetryToken, got {type(token_to_renew).__name__}"
)

if isinstance(error, retries_interface.ErrorRetryInfo) and error.is_retry_safe:
retry_count = token_to_renew.retry_count + 1
if retry_count >= self.max_attempts:
raise RetryError(
f"Reached maximum number of allowed attempts: {self.max_attempts}"
) from error

# Acquire additional quota for this retry attempt
# (may raise a RetryError if none is available)
quota_acquired = self._retry_quota.acquire(error=error)

if error.retry_after is not None:
retry_delay = error.retry_after
else:
retry_delay = self.backoff_strategy.compute_next_backoff_delay(
retry_count
)

return StandardRetryToken(
retry_count=retry_count,
retry_delay=retry_delay,
quota_acquired=quota_acquired,
)
else:
raise RetryError(f"Error is not retryable: {error}") from error

async def record_success(self, *, token: retries_interface.RetryToken) -> None:
"""Release retry quota back based on the amount consumed by the last retry.

:param token: The token used for the previous successful attempt.
"""
if not isinstance(token, StandardRetryToken):
raise TypeError(
f"StandardRetryStrategy requires StandardRetryToken, got {type(token).__name__}"
)
self._retry_quota.release(release_amount=token.quota_acquired)

def __deepcopy__(self, memo: Any) -> "StandardRetryStrategy":
return self
Loading
Loading