From 305213137541ed49a349c8ea7964e18221c93c96 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Fri, 29 May 2026 18:00:46 +0200 Subject: [PATCH] test: generalize integration polling helpers Make call_with_exp_backoff and poll_until_condition generic and consistent so the same two helpers can be shared across apify-sdk-python, crawlee-python and apify-client-python: - call_with_exp_backoff no longer takes rq_access_mode. It now takes an optional condition (default: truthy) and retries fn until the condition holds or max_retries is exhausted, returning the last result either way. The Apify RQ single/shared distinction moves to the call sites via max_retries (0 = call once / single mode, 5 = retry / shared mode). - poll_until_condition gains the same condition default and now accepts both sync and async callables (so it can poll plain values too). - Both helpers accept sync or async fn via a small _maybe_await helper, typed with overloads so the return type is preserved. All ~36 call sites in test_request_queue.py are refactored accordingly. 
--- tests/integration/_utils.py | 113 ++++++++++++++++-------- tests/integration/test_request_queue.py | 97 ++++++++++++-------- 2 files changed, 137 insertions(+), 73 deletions(-) diff --git a/tests/integration/_utils.py b/tests/integration/_utils.py index 3fb26bbd..dada6684 100644 --- a/tests/integration/_utils.py +++ b/tests/integration/_utils.py @@ -1,57 +1,98 @@ from __future__ import annotations import asyncio +import inspect +import logging import time -from typing import TYPE_CHECKING, Literal, TypeVar +from typing import TYPE_CHECKING, TypeVar, cast, overload from crawlee._utils.crypto import crypto_random_object_id -from apify import Actor - if TYPE_CHECKING: from collections.abc import Awaitable, Callable -T = TypeVar('T') +logger = logging.getLogger(__name__) +T = TypeVar('T') -async def call_with_exp_backoff( - fn: Callable[[], Awaitable[T]], - *, - rq_access_mode: Literal['single', 'shared'], - max_retries: int = 5, -) -> T | None: - """Call an async callable with exponential backoff retries until it returns a truthy value. - In shared request queue mode, there is a propagation delay before newly added, reclaimed, or handled requests - become visible in the API (see https://github.com/apify/apify-sdk-python/issues/808). This helper retries with - exponential backoff to handle that delay in integration tests. +async def _maybe_await(value: Awaitable[T] | T) -> T: + """Await `value` if it is awaitable, otherwise return it unchanged. - When `rq_access_mode` is `'single'`, the function is called once without retries. + Lets `call_with_exp_backoff` and `poll_until_condition` accept both sync and async callables. 
""" - if rq_access_mode == 'single': - return await fn() - - if rq_access_mode == 'shared': - result = None + if inspect.isawaitable(value): + return await cast('Awaitable[T]', value) + return cast('T', value) - for attempt in range(max_retries): - result = await fn() - if result: - return result +@overload +async def call_with_exp_backoff( + fn: Callable[[], Awaitable[T]], + condition: Callable[[T], bool] = ..., + *, + max_retries: int = ..., + base_delay: float = ..., +) -> T: ... +@overload +async def call_with_exp_backoff( + fn: Callable[[], T], + condition: Callable[[T], bool] = ..., + *, + max_retries: int = ..., + base_delay: float = ..., +) -> T: ... +async def call_with_exp_backoff( + fn: Callable[[], Awaitable[T] | T], + condition: Callable[[T], bool] = bool, + *, + max_retries: int = 5, + base_delay: float = 1.0, +) -> T: + """Call `fn`, retrying with exponential backoff until `condition(result)` is True. - delay = 2**attempt - Actor.log.info(f'{fn} returned {result!r}, retrying in {delay}s (attempt {attempt + 1}/{max_retries})') - await asyncio.sleep(delay) + Calls `fn` and checks whether `condition` holds for its result. If it does not, `fn` is retried up to + `max_retries` times, sleeping `base_delay * 2 ** attempt` seconds before each retry. The last result is + returned regardless of whether the condition was ever satisfied, so the caller can run its own assertion. - return result + This is useful for eventually-consistent APIs where a freshly added, reclaimed, or handled item may take a + moment to become visible (see https://github.com/apify/apify-sdk-python/issues/808). The default condition + checks for a truthy result. Pass `max_retries=0` to call `fn` exactly once without any retries. - raise ValueError(f'Invalid rq_access_mode: {rq_access_mode}') + Unlike `poll_until_condition`, the delay between attempts grows exponentially rather than staying constant. 
+ """ + result = await _maybe_await(fn()) + for attempt in range(max_retries): + if condition(result): + return result + delay = base_delay * 2**attempt + logger.info( + 'Condition not met for %r, retrying in %ss (attempt %d/%d).', result, delay, attempt + 1, max_retries + ) + await asyncio.sleep(delay) + result = await _maybe_await(fn()) + return result +@overload async def poll_until_condition( fn: Callable[[], Awaitable[T]], - condition: Callable[[T], bool], + condition: Callable[[T], bool] = ..., + *, + timeout: float = ..., + poll_interval: float = ..., +) -> T: ... +@overload +async def poll_until_condition( + fn: Callable[[], T], + condition: Callable[[T], bool] = ..., + *, + timeout: float = ..., + poll_interval: float = ..., +) -> T: ... +async def poll_until_condition( + fn: Callable[[], Awaitable[T] | T], + condition: Callable[[T], bool] = bool, *, timeout: float = 60, poll_interval: float = 5, @@ -59,19 +100,21 @@ async def poll_until_condition( """Poll `fn` until `condition(result)` is True or the timeout expires. Polls `fn` at `poll_interval`-second intervals until `condition` is satisfied or `timeout` seconds have elapsed. - Returns the last polled result regardless of whether the condition was met. + Returns the last polled result regardless of whether the condition was met, so the caller can run its own + assertion. The default condition checks for a truthy result. - Use this instead of a fixed `asyncio.sleep` when waiting for eventually-consistent API state (e.g. request queue - stats) that may take a variable amount of time to propagate. + Use this instead of a fixed `asyncio.sleep` when waiting for eventually-consistent state (e.g. request queue + stats) that may take a variable amount of time to propagate. Unlike `call_with_exp_backoff`, the interval + between polls stays constant. 
""" deadline = time.monotonic() + timeout - result = await fn() + result = await _maybe_await(fn()) while not condition(result): remaining = deadline - time.monotonic() if remaining <= 0: break await asyncio.sleep(min(poll_interval, remaining)) - result = await fn() + result = await _maybe_await(fn()) return result diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py index 87ae7e0b..d1e77cdd 100644 --- a/tests/integration/test_request_queue.py +++ b/tests/integration/test_request_queue.py @@ -25,8 +25,9 @@ from apify.storage_clients._apify._models import ApifyRequestQueueMetadata -# In shared mode, there is a propagation delay between operations so we use test helper -# `call_with_exp_backoff` for exponential backoff. See https://github.com/apify/apify-sdk-python/issues/808. +# In shared mode, there is a propagation delay between operations, so we retry reads with the test helper +# `call_with_exp_backoff` (`max_retries=5`). In single mode reads are immediately consistent, so we call once +# (`max_retries=0`). See https://github.com/apify/apify-sdk-python/issues/808. 
async def test_add_and_fetch_requests( @@ -35,6 +36,7 @@ async def test_add_and_fetch_requests( ) -> None: """Test basic functionality of adding and fetching requests.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 desired_request_count = 100 Actor.log.info('Opening request queue...') @@ -46,7 +48,7 @@ async def test_add_and_fetch_requests( await rq.add_request(f'https://example.com/{i}') handled_request_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): Actor.log.info('Fetching next request...') queue_operation_info = await rq.mark_request_as_handled(next_request) assert queue_operation_info is not None, f'queue_operation_info={queue_operation_info}' @@ -60,7 +62,7 @@ async def test_add_and_fetch_requests( f'desired_request_count={desired_request_count}', ) Actor.log.info('Waiting for queue to be finished...') - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) assert is_finished is True, f'is_finished={is_finished}' @@ -70,6 +72,7 @@ async def test_add_requests_in_batches( ) -> None: """Test adding multiple requests in a single batch operation.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 desired_request_count = 100 rq = request_queue_apify @@ -81,7 +84,7 @@ async def test_add_requests_in_batches( Actor.log.info(f'Added {desired_request_count} requests in batch, total in queue: {total_count}') handled_request_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, 
max_retries=max_retries): if handled_request_count % 20 == 0: Actor.log.info(f'Processing request {handled_request_count + 1}...') queue_operation_info = await rq.mark_request_as_handled(next_request) @@ -95,7 +98,7 @@ async def test_add_requests_in_batches( f'handled_request_count={handled_request_count}', f'desired_request_count={desired_request_count}', ) - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) assert is_finished is True, f'is_finished={is_finished}' @@ -105,6 +108,7 @@ async def test_add_non_unique_requests_in_batch( ) -> None: """Test adding requests with duplicate unique keys in batch.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 desired_request_count = 100 rq = request_queue_apify @@ -120,7 +124,7 @@ async def test_add_non_unique_requests_in_batch( Actor.log.info(f'Added {desired_request_count} requests with duplicate unique keys, total in queue: {total_count}') handled_request_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): if handled_request_count % 20 == 0: Actor.log.info(f'Processing request {handled_request_count + 1}: {next_request.url}') queue_operation_info = await rq.mark_request_as_handled(next_request) @@ -135,7 +139,7 @@ async def test_add_non_unique_requests_in_batch( f'handled_request_count={handled_request_count}', f'expected_count={expected_count}', ) - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) Actor.log.info(f'Processed {handled_request_count}/{expected_count} requests, finished: {is_finished}') assert is_finished is True, 
f'is_finished={is_finished}' @@ -146,6 +150,7 @@ async def test_forefront_requests_ordering( ) -> None: """Test that forefront requests are processed before regular requests.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -164,7 +169,7 @@ async def test_forefront_requests_ordering( # Fetch requests and verify order. fetched_urls = [] - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): Actor.log.info(f'Fetched request: {next_request.url}') fetched_urls.append(next_request.url) await rq.mark_request_as_handled(next_request) @@ -189,6 +194,7 @@ async def test_request_unique_key_behavior( ) -> None: """Test behavior of custom unique keys.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -218,7 +224,7 @@ async def test_request_unique_key_behavior( # Only 2 requests should be fetchable. 
fetched_count = 0 fetched_requests = [] - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): fetched_count += 1 fetched_requests.append(next_request) await rq.mark_request_as_handled(next_request) @@ -240,6 +246,7 @@ async def test_request_reclaim_functionality( ) -> None: """Test request reclaiming for failed processing.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -249,7 +256,7 @@ async def test_request_reclaim_functionality( Actor.log.info('Added test request') # Fetch and reclaim the request. - fetched_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + fetched_request = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) assert fetched_request is not None Actor.log.info(f'Fetched request: {fetched_request.url}') @@ -271,7 +278,7 @@ async def test_request_reclaim_functionality( # Mark as handled this time await rq.mark_request_as_handled(request2) - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) assert is_finished is True @@ -282,6 +289,7 @@ async def test_request_reclaim_with_forefront( """Test reclaiming requests to the front of the queue.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -293,7 +301,7 @@ async def test_request_reclaim_with_forefront( Actor.log.info('Added 3 requests') # Fetch first request. 
- first_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + first_request = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) assert first_request is not None Actor.log.info(f'Fetched first request: {first_request.url}') @@ -327,6 +335,7 @@ async def test_complex_request_objects( ) -> None: """Test handling complex Request objects with various properties.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -343,7 +352,7 @@ async def test_complex_request_objects( Actor.log.info(f'Added complex request: {complex_request.url} with method {complex_request.method}') # Fetch and verify all properties are preserved. - fetched_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + fetched_request = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) assert fetched_request is not None, f'fetched_request={fetched_request}' Actor.log.info(f'Fetched request: {fetched_request.url}') @@ -374,6 +383,7 @@ async def test_get_request_by_unique_key( ) -> None: """Test retrieving specific requests by their unique_key.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -386,7 +396,7 @@ async def test_get_request_by_unique_key( retrieved_request = await call_with_exp_backoff( lambda: rq.get_request(request_unique_key), - rq_access_mode=rq_access_mode, + max_retries=max_retries, ) assert retrieved_request is not None, f'retrieved_request={retrieved_request}' assert retrieved_request.url == 'https://example.com/test', f'retrieved_request.url={retrieved_request.url}' @@ -405,6 +415,7 @@ async def test_metadata_tracking( ) -> None: """Test request queue metadata and 
counts.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -429,7 +440,7 @@ async def test_metadata_tracking( # Process some requests. for _ in range(3): - next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + next_request = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) if next_request: await rq.mark_request_as_handled(next_request) @@ -449,6 +460,7 @@ async def test_batch_operations_performance( ) -> None: """Test batch operations vs individual operations.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -470,7 +482,7 @@ async def test_batch_operations_performance( # Process all requests. processed_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): processed_count += 1 await rq.mark_request_as_handled(next_request) if processed_count >= 50: # Safety break @@ -479,7 +491,7 @@ async def test_batch_operations_performance( Actor.log.info(f'Processing completed. 
Total processed: {processed_count}') assert processed_count == 50, f'processed_count={processed_count}' - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) assert is_finished is True, f'is_finished={is_finished}' @@ -490,6 +502,7 @@ async def test_state_consistency( ) -> None: """Test queue state consistency during concurrent operations.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -507,7 +520,7 @@ async def test_state_consistency( reclaimed_requests = [] for i in range(5): - next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + next_request = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) if next_request: if i % 2 == 0: # Process even indices await rq.mark_request_as_handled(next_request) @@ -534,31 +547,32 @@ async def test_state_consistency( # Process remaining requests. 
remaining_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): remaining_count += 1 await rq.mark_request_as_handled(next_request) Actor.log.info(f'Processed {remaining_count} remaining requests') - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) assert is_finished is True, f'is_finished={is_finished}' async def test_empty_rq_behavior(request_queue_apify: RequestQueue, request: pytest.FixtureRequest) -> None: """Test behavior with empty queues.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify Actor.log.info('Request queue opened') # Test empty queue operations - is_empty = await call_with_exp_backoff(rq.is_empty, rq_access_mode=rq_access_mode) - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_empty = await call_with_exp_backoff(rq.is_empty, max_retries=max_retries) + is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) Actor.log.info(f'Empty queue - is_empty: {is_empty}, is_finished: {is_finished}') assert is_empty is True, f'is_empty={is_empty}' assert is_finished is True, f'is_finished={is_finished}' # Fetch from empty queue - next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + next_request = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) Actor.log.info(f'Fetch result from empty queue: {next_request}') assert next_request is None, f'request={next_request}' @@ -581,6 +595,7 @@ async def test_large_batch_operations( ) -> None: """Test handling large batches of requests.""" rq_access_mode = 
request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -600,14 +615,14 @@ async def test_large_batch_operations( # Process all in chunks to test performance. processed_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): await rq.mark_request_as_handled(next_request) processed_count += 1 Actor.log.info(f'Processing completed. Total processed: {processed_count}') assert processed_count == 500, f'processed_count={processed_count}' - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) assert is_finished is True, f'is_finished={is_finished}' @@ -617,6 +632,7 @@ async def test_mixed_string_and_request_objects( ) -> None: """Test adding both string URLs and Request objects.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -645,7 +661,7 @@ async def test_mixed_string_and_request_objects( # Fetch and verify all types work. 
fetched_requests = [] - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): fetched_requests.append(next_request) await rq.mark_request_as_handled(next_request) @@ -669,6 +685,7 @@ async def test_persistence_across_operations( ) -> None: """Test that queue state persists across different operations.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 # Open queue and add some requests rq = request_queue_apify @@ -685,7 +702,7 @@ async def test_persistence_across_operations( # Process some requests. processed_count = 0 for _ in range(5): - next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + next_request = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) if next_request: await rq.mark_request_as_handled(next_request) processed_count += 1 @@ -710,12 +727,12 @@ async def test_persistence_across_operations( # Process remaining. 
remaining_processed = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): remaining_processed += 1 await rq.mark_request_as_handled(next_request) Actor.log.info(f'Processed {remaining_processed} remaining requests') - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) final_total = await rq.get_total_count() final_handled = await rq.get_handled_count() @@ -785,6 +802,7 @@ async def test_request_ordering_with_mixed_operations( ) -> None: """Test request ordering with mixed add/reclaim operations.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify Actor.log.info('Request queue opened') @@ -795,7 +813,7 @@ async def test_request_ordering_with_mixed_operations( Actor.log.info('Added initial requests') # Fetch one and reclaim to forefront. - request1 = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + request1 = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) assert request1 is not None, f'request1={request1}' assert request1.url == 'https://example.com/1', f'request1.url={request1.url}' Actor.log.info(f'Fetched request: {request1.url}') @@ -809,7 +827,7 @@ async def test_request_ordering_with_mixed_operations( # Fetch all requests and verify forefront behavior. 
urls_ordered = list[str]() - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries): urls_ordered.append(next_request.url) await rq.mark_request_as_handled(next_request) @@ -1044,6 +1062,7 @@ async def test_rq_long_url( ) -> None: """Test handling of requests with long URLs and extended unique keys.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 rq = request_queue_apify long_url_request = Request.from_url( 'https://portal.isoss.gov.cz/irj/portal/anonymous/mvrest?path=/eosm-public-offer&officeLabels=%7B%7D&page=1&pageSize=100000&sortColumn=zdatzvsm&sortOrder=-1', @@ -1057,12 +1076,12 @@ async def test_rq_long_url( assert processed_request is not None assert processed_request.id == request_id - request_obtained = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + request_obtained = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) assert request_obtained is not None await rq.mark_request_as_handled(request_obtained) - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await call_with_exp_backoff(rq.is_finished, max_retries=max_retries) assert is_finished @@ -1075,6 +1094,7 @@ async def test_pre_existing_request_with_user_data( list_head does not return user data, so we need to test that fetching unknown requests is not relying on it.""" rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 custom_data = {'key': 'value'} rq = request_queue_apify @@ -1088,7 +1108,7 @@ async def test_pre_existing_request_with_user_data( await rq_client.add_request(req.model_dump(by_alias=True)) # Fetch the request by the client under test. 
- request_obtained = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + request_obtained = await call_with_exp_backoff(rq.fetch_next_request, max_retries=max_retries) assert request_obtained is not None # Test that custom_data is preserved in user_data (custom_data should be subset of obtained user_data) assert custom_data.items() <= request_obtained.user_data.items() @@ -1118,18 +1138,19 @@ async def test_request_queue_is_finished( request: pytest.FixtureRequest, ) -> None: rq_access_mode = request.node.callspec.params.get('request_queue_apify') + max_retries = 0 if rq_access_mode == 'single' else 5 await request_queue_apify.add_request(Request.from_url('http://example.com')) assert not await request_queue_apify.is_finished() - fetched = await call_with_exp_backoff(request_queue_apify.fetch_next_request, rq_access_mode=rq_access_mode) + fetched = await call_with_exp_backoff(request_queue_apify.fetch_next_request, max_retries=max_retries) assert fetched is not None assert not await request_queue_apify.is_finished(), ( 'RequestQueue should not be finished unless the request is marked as handled.' ) await request_queue_apify.mark_request_as_handled(fetched) - assert await call_with_exp_backoff(request_queue_apify.is_finished, rq_access_mode=rq_access_mode) + assert await call_with_exp_backoff(request_queue_apify.is_finished, max_retries=max_retries) async def test_request_queue_deduplication_unprocessed_requests( @@ -1415,7 +1436,7 @@ async def worker() -> int: assert total_after_workers == 20 remaining_count = 0 - while request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode='shared'): + while request := await call_with_exp_backoff(rq.fetch_next_request, max_retries=5): remaining_count += 1 await rq.mark_request_as_handled(request)