Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 78 additions & 35 deletions tests/integration/_utils.py
Original file line number Diff line number Diff line change
@@ -1,77 +1,120 @@
from __future__ import annotations

import asyncio
import inspect
import logging
import time
from typing import TYPE_CHECKING, Literal, TypeVar
from typing import TYPE_CHECKING, TypeVar, cast, overload

from crawlee._utils.crypto import crypto_random_object_id

from apify import Actor

# Imported for type checking only; annotations are lazy thanks to `from __future__ import annotations`.
if TYPE_CHECKING:
    from collections.abc import Awaitable, Callable

# Module-level logger used by the retry helpers below.
logger = logging.getLogger(__name__)

# Generic result type of the callables passed to the retry/poll helpers.
T = TypeVar('T')

async def _maybe_await(value: Awaitable[T] | T) -> T:
    """Await `value` if it is awaitable, otherwise return it unchanged.

    Lets `call_with_exp_backoff` and `poll_until_condition` accept both sync and async callables.
    """
    if inspect.isawaitable(value):
        return await cast('Awaitable[T]', value)
    return cast('T', value)


@overload
async def call_with_exp_backoff(
    fn: Callable[[], Awaitable[T]],
    condition: Callable[[T], bool] = ...,
    *,
    max_retries: int = ...,
    base_delay: float = ...,
) -> T: ...
@overload
async def call_with_exp_backoff(
    fn: Callable[[], T],
    condition: Callable[[T], bool] = ...,
    *,
    max_retries: int = ...,
    base_delay: float = ...,
) -> T: ...
async def call_with_exp_backoff(
    fn: Callable[[], Awaitable[T] | T],
    condition: Callable[[T], bool] = bool,
    *,
    max_retries: int = 5,
    base_delay: float = 1.0,
) -> T:
    """Call `fn`, retrying with exponential backoff until `condition(result)` is True.

    Calls `fn` and checks whether `condition` holds for its result. If it does not, `fn` is retried up to
    `max_retries` times, sleeping `base_delay * 2 ** attempt` seconds before each retry. The last result is
    returned regardless of whether the condition was ever satisfied, so the caller can run its own assertion.

    This is useful for eventually-consistent APIs where a freshly added, reclaimed, or handled item may take a
    moment to become visible (see https://github.com/apify/apify-sdk-python/issues/808). The default condition
    checks for a truthy result. Pass `max_retries=0` to call `fn` exactly once without any retries.

    Unlike `poll_until_condition`, the delay between attempts grows exponentially rather than staying constant.
    """
    # The initial call is not counted as a retry, so `fn` runs at most `max_retries + 1` times.
    result = await _maybe_await(fn())
    for attempt in range(max_retries):
        if condition(result):
            return result
        delay = base_delay * 2**attempt
        logger.info(
            'Condition not met for %r, retrying in %ss (attempt %d/%d).', result, delay, attempt + 1, max_retries
        )
        await asyncio.sleep(delay)
        result = await _maybe_await(fn())
    # The result of the final retry is returned unchecked; the caller is expected to assert on it.
    return result


@overload
async def poll_until_condition(
    fn: Callable[[], Awaitable[T]],
    condition: Callable[[T], bool] = ...,
    *,
    timeout: float = ...,
    poll_interval: float = ...,
) -> T: ...
@overload
async def poll_until_condition(
    fn: Callable[[], T],
    condition: Callable[[T], bool] = ...,
    *,
    timeout: float = ...,
    poll_interval: float = ...,
) -> T: ...
async def poll_until_condition(
    fn: Callable[[], Awaitable[T] | T],
    condition: Callable[[T], bool] = bool,
    *,
    timeout: float = 60,
    poll_interval: float = 5,
) -> T:
    """Poll `fn` until `condition(result)` is True or the timeout expires.

    Polls `fn` at `poll_interval`-second intervals until `condition` is satisfied or `timeout` seconds have elapsed.
    Returns the last polled result regardless of whether the condition was met, so the caller can run its own
    assertion. The default condition checks for a truthy result.

    Use this instead of a fixed `asyncio.sleep` when waiting for eventually-consistent state (e.g. request queue
    stats) that may take a variable amount of time to propagate. Unlike `call_with_exp_backoff`, the interval
    between polls stays constant.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    result = await _maybe_await(fn())
    while not condition(result):
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline: the final poll happens right as the timeout expires.
        await asyncio.sleep(min(poll_interval, remaining))
        result = await _maybe_await(fn())
    return result


Expand Down
Loading
Loading