Skip to content

Commit e9802ae

Browse files
FullyTypedAstraea Quinn S
authored andcommitted
Parity: Added create_wait_strategy
Changes: - Half-jitter is now delay * (1.5 + random*0.5) - Added create_wait_strategy to match create_retry_strategy
1 parent af7b970 commit e9802ae

File tree

12 files changed

+589
-98
lines changed

12 files changed

+589
-98
lines changed

src/aws_durable_execution_sdk_python/config.py

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,11 @@
22

33
from __future__ import annotations
44

5+
import random
56
from dataclasses import dataclass, field
6-
from enum import Enum
7+
from enum import Enum, StrEnum
78
from typing import TYPE_CHECKING, Generic, TypeVar
89

9-
from aws_durable_execution_sdk_python.retries import RetryDecision # noqa: TCH001
10-
1110
P = TypeVar("P") # Payload type
1211
R = TypeVar("R") # Result type
1312
T = TypeVar("T")
@@ -18,6 +17,7 @@
1817
from concurrent.futures import Future
1918

2019
from aws_durable_execution_sdk_python.lambda_service import OperationSubType
20+
from aws_durable_execution_sdk_python.retries import RetryDecision
2121
from aws_durable_execution_sdk_python.serdes import SerDes
2222
from aws_durable_execution_sdk_python.types import SummaryGenerator
2323

@@ -364,33 +364,6 @@ class WaitForCallbackConfig(CallbackConfig):
364364
retry_strategy: Callable[[Exception, int], RetryDecision] | None = None
365365

366366

367-
@dataclass(frozen=True)
368-
class WaitForConditionDecision:
369-
"""Decision about whether to continue waiting."""
370-
371-
should_continue: bool
372-
delay_seconds: int
373-
374-
@classmethod
375-
def continue_waiting(cls, delay_seconds: int) -> WaitForConditionDecision:
376-
"""Create a decision to continue waiting for delay_seconds."""
377-
return cls(should_continue=True, delay_seconds=delay_seconds)
378-
379-
@classmethod
380-
def stop_polling(cls) -> WaitForConditionDecision:
381-
"""Create a decision to stop polling."""
382-
return cls(should_continue=False, delay_seconds=-1)
383-
384-
385-
@dataclass(frozen=True)
386-
class WaitForConditionConfig(Generic[T]):
387-
"""Configuration for wait_for_condition."""
388-
389-
wait_strategy: Callable[[T, int], WaitForConditionDecision]
390-
initial_state: T
391-
serdes: SerDes | None = None
392-
393-
394367
class StepFuture(Generic[T]):
395368
"""A future that will block on result() until the step returns."""
396369

@@ -401,3 +374,37 @@ def __init__(self, future: Future[T], name: str | None = None):
401374
def result(self, timeout_seconds: int | None = None) -> T:
402375
"""Return the result of the Future."""
403376
return self.future.result(timeout=timeout_seconds)
377+
378+
379+
# region Jitter
380+
381+
382+
class JitterStrategy(StrEnum):
383+
"""
384+
Jitter strategies are used to introduce noise when attempting to retry
385+
an invoke. We introduce noise to prevent a thundering-herd effect where
386+
a group of accesses (e.g. invokes) happen at once.
387+
388+
Jitter is meant to be used to spread operations across time.
389+
390+
members:
391+
:NONE: No jitter; use the exact calculated delay
392+
:FULL: Full jitter; random delay between 0 and calculated delay
393+
:HALF: Half jitter; random delay between 0.5x and 1.0x of the calculated delay
394+
"""
395+
396+
NONE = "NONE"
397+
FULL = "FULL"
398+
HALF = "HALF"
399+
400+
def compute_jitter(self, delay) -> float:
401+
match self:
402+
case JitterStrategy.NONE:
403+
return 0
404+
case JitterStrategy.HALF:
405+
return delay * (random.random() * 0.5 + 0.5) # noqa: S311
406+
case _: # default is FULL
407+
return random.random() * delay # noqa: S311
408+
409+
410+
# endregion Jitter

src/aws_durable_execution_sdk_python/context.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
ParallelConfig,
1414
StepConfig,
1515
WaitForCallbackConfig,
16-
WaitForConditionConfig,
1716
)
1817
from aws_durable_execution_sdk_python.exceptions import (
1918
CallbackError,
@@ -55,6 +54,7 @@
5554

5655
from aws_durable_execution_sdk_python.state import CheckpointedResult
5756
from aws_durable_execution_sdk_python.types import LambdaContext
57+
from aws_durable_execution_sdk_python.waits import WaitForConditionConfig
5858

5959
P = TypeVar("P") # Payload type
6060
R = TypeVar("R") # Result type

src/aws_durable_execution_sdk_python/operation/step.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
from typing import TYPE_CHECKING, TypeVar
77

88
from aws_durable_execution_sdk_python.config import (
9-
RetryDecision,
109
StepConfig,
1110
StepSemantics,
1211
)
@@ -19,7 +18,7 @@
1918
OperationUpdate,
2019
)
2120
from aws_durable_execution_sdk_python.logger import Logger, LogInfo
22-
from aws_durable_execution_sdk_python.retries import RetryPresets
21+
from aws_durable_execution_sdk_python.retries import RetryDecision, RetryPresets
2322
from aws_durable_execution_sdk_python.serdes import deserialize, serialize
2423
from aws_durable_execution_sdk_python.suspend import (
2524
suspend_with_optional_timeout,

src/aws_durable_execution_sdk_python/operation/wait_for_condition.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,16 @@
2323
if TYPE_CHECKING:
2424
from collections.abc import Callable
2525

26-
from aws_durable_execution_sdk_python.config import (
27-
WaitForConditionConfig,
28-
WaitForConditionDecision,
29-
)
3026
from aws_durable_execution_sdk_python.identifier import OperationIdentifier
3127
from aws_durable_execution_sdk_python.logger import Logger
3228
from aws_durable_execution_sdk_python.state import (
3329
CheckpointedResult,
3430
ExecutionState,
3531
)
32+
from aws_durable_execution_sdk_python.waits import (
33+
WaitForConditionConfig,
34+
WaitForConditionDecision,
35+
)
3636

3737

3838
T = TypeVar("T")

src/aws_durable_execution_sdk_python/retries.py

Lines changed: 3 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -3,50 +3,17 @@
33
from __future__ import annotations
44

55
import math
6-
import random
76
import re
87
from dataclasses import dataclass, field
9-
from enum import StrEnum
108
from typing import TYPE_CHECKING
119

10+
from aws_durable_execution_sdk_python.config import JitterStrategy
11+
1212
if TYPE_CHECKING:
1313
from collections.abc import Callable
1414

1515
Numeric = int | float
1616

17-
# region Jitter
18-
19-
20-
class JitterStrategy(StrEnum):
21-
"""
22-
Jitter strategies are used to introduce noise when attempting to retry
23-
an invoke. We introduce noise to prevent a thundering-herd effect where
24-
a group of accesses (e.g. invokes) happen at once.
25-
26-
Jitter is meant to be used to spread operations across time.
27-
28-
members:
29-
:NONE: No jitter; use the exact calculated delay
30-
:FULL: Full jitter; random delay between 0 and calculated delay
31-
:HALF: Half jitter; random delay between 0.5x and 1.0x of the calculated delay
32-
"""
33-
34-
NONE = "NONE"
35-
FULL = "FULL"
36-
HALF = "HALF"
37-
38-
def compute_jitter(self, delay) -> float:
39-
match self:
40-
case JitterStrategy.NONE:
41-
return 0
42-
case JitterStrategy.HALF:
43-
return random.random() * 0.5 + 0.5 # noqa: S311
44-
case _: # default is FULL
45-
return random.random() * delay # noqa: S311
46-
47-
48-
# endregion Jitter
49-
5017

5118
@dataclass
5219
class RetryDecision:
@@ -68,7 +35,7 @@ def no_retry(cls) -> RetryDecision:
6835

6936
@dataclass
7037
class RetryStrategyConfig:
71-
max_attempts: int = 3 # "infinite", practically
38+
max_attempts: int = 3
7239
initial_delay_seconds: int = 5
7340
max_delay_seconds: int = 300 # 5 minutes
7441
backoff_rate: Numeric = 2.0
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
"""Ready-made wait strategies and wait creators."""
2+
3+
from __future__ import annotations
4+
5+
from dataclasses import dataclass, field
6+
from typing import TYPE_CHECKING, Generic
7+
8+
from aws_durable_execution_sdk_python.config import JitterStrategy, T
9+
10+
if TYPE_CHECKING:
11+
from collections.abc import Callable
12+
13+
from aws_durable_execution_sdk_python.serdes import SerDes
14+
15+
Numeric = int | float
16+
17+
18+
@dataclass
19+
class WaitDecision:
20+
"""Decision about whether to wait a step and with what delay."""
21+
22+
should_wait: bool
23+
delay_seconds: int
24+
25+
@classmethod
26+
def wait(cls, delay_seconds: int) -> WaitDecision:
27+
"""Create a wait decision."""
28+
return cls(should_wait=True, delay_seconds=delay_seconds)
29+
30+
@classmethod
31+
def no_wait(cls) -> WaitDecision:
32+
"""Create a no-wait decision."""
33+
return cls(should_wait=False, delay_seconds=0)
34+
35+
36+
@dataclass
37+
class WaitStrategyConfig(Generic[T]):
38+
should_continue_polling: Callable[[T], bool]
39+
max_attempts: int = 60
40+
initial_delay_seconds: int = 5
41+
max_delay_seconds: int = 300 # 5 minutes
42+
backoff_rate: Numeric = 1.5
43+
jitter_strategy: JitterStrategy = field(default=JitterStrategy.FULL)
44+
timeout_seconds: int | None = None # Not implemented yet
45+
46+
47+
def create_wait_strategy(
48+
config: WaitStrategyConfig[T],
49+
) -> Callable[[T, int], WaitDecision]:
50+
def wait_strategy(result: T, attempts_made: int) -> WaitDecision:
51+
# Check if condition is met
52+
if not config.should_continue_polling(result):
53+
return WaitDecision.no_wait()
54+
55+
# Check if we've exceeded max attempts
56+
if attempts_made >= config.max_attempts:
57+
return WaitDecision.no_wait()
58+
59+
# Calculate delay with exponential backoff
60+
base_delay = min(
61+
config.initial_delay_seconds * (config.backoff_rate ** (attempts_made - 1)),
62+
config.max_delay_seconds,
63+
)
64+
65+
# Apply jitter (add jitter to base delay)
66+
jitter = config.jitter_strategy.compute_jitter(base_delay)
67+
delay_with_jitter = base_delay + jitter
68+
69+
# Ensure delay is an integer >= 1
70+
final_delay = max(1, round(delay_with_jitter))
71+
72+
return WaitDecision.wait(final_delay)
73+
74+
return wait_strategy
75+
76+
77+
@dataclass(frozen=True)
78+
class WaitForConditionDecision:
79+
"""Decision about whether to continue waiting."""
80+
81+
should_continue: bool
82+
delay_seconds: int
83+
84+
@classmethod
85+
def continue_waiting(cls, delay_seconds: int) -> WaitForConditionDecision:
86+
"""Create a decision to continue waiting for delay_seconds."""
87+
return cls(should_continue=True, delay_seconds=delay_seconds)
88+
89+
@classmethod
90+
def stop_polling(cls) -> WaitForConditionDecision:
91+
"""Create a decision to stop polling."""
92+
return cls(should_continue=False, delay_seconds=-1)
93+
94+
95+
@dataclass(frozen=True)
96+
class WaitForConditionConfig(Generic[T]):
97+
"""Configuration for wait_for_condition."""
98+
99+
wait_strategy: Callable[[T, int], WaitForConditionDecision]
100+
initial_state: T
101+
serdes: SerDes | None = None

tests/config_test.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
StepFuture,
1818
StepSemantics,
1919
TerminationMode,
20+
)
21+
from aws_durable_execution_sdk_python.waits import (
2022
WaitForConditionConfig,
2123
WaitForConditionDecision,
2224
)

tests/context_test.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414
MapConfig,
1515
ParallelConfig,
1616
StepConfig,
17-
WaitForConditionConfig,
18-
WaitForConditionDecision,
1917
)
2018
from aws_durable_execution_sdk_python.context import Callback, DurableContext
2119
from aws_durable_execution_sdk_python.exceptions import (
@@ -33,6 +31,10 @@
3331
OperationType,
3432
)
3533
from aws_durable_execution_sdk_python.state import CheckpointedResult, ExecutionState
34+
from aws_durable_execution_sdk_python.waits import (
35+
WaitForConditionConfig,
36+
WaitForConditionDecision,
37+
)
3638
from tests.serdes_test import CustomDictSerDes
3739
from tests.test_helpers import operation_id_sequence
3840

tests/operation/step_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import pytest
88

99
from aws_durable_execution_sdk_python.config import (
10-
RetryDecision,
1110
StepConfig,
1211
StepSemantics,
1312
)
@@ -29,6 +28,7 @@
2928
)
3029
from aws_durable_execution_sdk_python.logger import Logger
3130
from aws_durable_execution_sdk_python.operation.step import step_handler
31+
from aws_durable_execution_sdk_python.retries import RetryDecision
3232
from aws_durable_execution_sdk_python.state import CheckpointedResult, ExecutionState
3333
from tests.serdes_test import CustomDictSerDes
3434

tests/operation/wait_for_condition_test.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,6 @@
66

77
import pytest
88

9-
from aws_durable_execution_sdk_python.config import (
10-
WaitForConditionConfig,
11-
WaitForConditionDecision,
12-
)
139
from aws_durable_execution_sdk_python.exceptions import (
1410
CallableRuntimeError,
1511
InvocationError,
@@ -29,6 +25,10 @@
2925
)
3026
from aws_durable_execution_sdk_python.state import CheckpointedResult, ExecutionState
3127
from aws_durable_execution_sdk_python.types import WaitForConditionCheckContext
28+
from aws_durable_execution_sdk_python.waits import (
29+
WaitForConditionConfig,
30+
WaitForConditionDecision,
31+
)
3232
from tests.serdes_test import CustomDictSerDes
3333

3434

0 commit comments

Comments
 (0)