From cba00a1108bb7485ee9a67f8119046c024ed1c44 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Fri, 13 Mar 2026 09:54:48 +0100 Subject: [PATCH 1/3] fix: Add exponential backoff for shared RQ propagation delay in integration tests In shared request queue mode, the Apify API has an eventual consistency propagation delay before newly added or reclaimed requests become visible in fetch_next_request(). Many integration tests were missing the call_with_exp_backoff() retry pattern, causing sporadic failures. Also added rq_access_mode parameter to call_with_exp_backoff() so callers can pass it directly instead of wrapping every call in an if/else block. Co-Authored-By: Claude Opus 4.6 --- tests/integration/_utils.py | 12 +- tests/integration/test_request_queue.py | 278 ++++++++++++++---------- 2 files changed, 170 insertions(+), 120 deletions(-) diff --git a/tests/integration/_utils.py b/tests/integration/_utils.py index d46fae06..20205b36 100644 --- a/tests/integration/_utils.py +++ b/tests/integration/_utils.py @@ -13,13 +13,23 @@ T = TypeVar('T') -async def call_with_exp_backoff(fn: Callable[[], Awaitable[T]], *, max_retries: int = 3) -> T | None: +async def call_with_exp_backoff( + fn: Callable[[], Awaitable[T]], + *, + rq_access_mode: str | None = None, + max_retries: int = 3, +) -> T | None: """Call an async callable with exponential backoff retries until it returns a truthy value. In shared request queue mode, there is a propagation delay before newly added, reclaimed, or handled requests become visible in the API (see https://github.com/apify/apify-sdk-python/issues/808). This helper retries with exponential backoff to handle that delay in integration tests. + + When `rq_access_mode` is provided and is not `'shared'`, the function is called once without retries. 
""" + if rq_access_mode is not None and rq_access_mode != 'shared': + return await fn() + result = None for attempt in range(max_retries): diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py index eb395d42..3bb6b34a 100644 --- a/tests/integration/test_request_queue.py +++ b/tests/integration/test_request_queue.py @@ -42,8 +42,10 @@ async def test_add_and_fetch_requests( Actor.log.info(f'Adding request {i}...') await rq.add_request(f'https://example.com/{i}') + # In shared mode, there is a propagation delay before the newly added requests become available + # (see https://github.com/apify/apify-sdk-python/issues/808). handled_request_count = 0 - while next_request := await rq.fetch_next_request(): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): Actor.log.info('Fetching next request...') queue_operation_info = await rq.mark_request_as_handled(next_request) assert queue_operation_info is not None, f'queue_operation_info={queue_operation_info}' @@ -57,10 +59,7 @@ async def test_add_and_fetch_requests( f'desired_request_count={desired_request_count}', ) Actor.log.info('Waiting for queue to be finished...') - if rq_access_mode == 'shared': - is_finished = await call_with_exp_backoff(rq.is_finished) - else: - is_finished = await rq.is_finished() + is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) assert is_finished is True, f'is_finished={is_finished}' @@ -80,8 +79,10 @@ async def test_add_requests_in_batches( total_count = await rq.get_total_count() Actor.log.info(f'Added {desired_request_count} requests in batch, total in queue: {total_count}') + # In shared mode, there is a propagation delay before the newly added requests become available + # (see https://github.com/apify/apify-sdk-python/issues/808). 
handled_request_count = 0 - while next_request := await rq.fetch_next_request(): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): if handled_request_count % 20 == 0: Actor.log.info(f'Processing request {handled_request_count + 1}...') queue_operation_info = await rq.mark_request_as_handled(next_request) @@ -95,10 +96,7 @@ async def test_add_requests_in_batches( f'handled_request_count={handled_request_count}', f'desired_request_count={desired_request_count}', ) - if rq_access_mode == 'shared': - is_finished = await call_with_exp_backoff(rq.is_finished) - else: - is_finished = await rq.is_finished() + is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) assert is_finished is True, f'is_finished={is_finished}' @@ -122,8 +120,10 @@ async def test_add_non_unique_requests_in_batch( total_count = await rq.get_total_count() Actor.log.info(f'Added {desired_request_count} requests with duplicate unique keys, total in queue: {total_count}') + # In shared mode, there is a propagation delay before the newly added requests become available + # (see https://github.com/apify/apify-sdk-python/issues/808). 
handled_request_count = 0 - while next_request := await rq.fetch_next_request(): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): if handled_request_count % 20 == 0: Actor.log.info(f'Processing request {handled_request_count + 1}: {next_request.url}') queue_operation_info = await rq.mark_request_as_handled(next_request) @@ -138,16 +138,17 @@ async def test_add_non_unique_requests_in_batch( f'handled_request_count={handled_request_count}', f'expected_count={expected_count}', ) - if rq_access_mode == 'shared': - is_finished = await call_with_exp_backoff(rq.is_finished) - else: - is_finished = await rq.is_finished() + is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) Actor.log.info(f'Processed {handled_request_count}/{expected_count} requests, finished: {is_finished}') assert is_finished is True, f'is_finished={is_finished}' -async def test_forefront_requests_ordering(request_queue_apify: RequestQueue) -> None: +async def test_forefront_requests_ordering( + request_queue_apify: RequestQueue, + request: pytest.FixtureRequest, +) -> None: """Test that forefront requests are processed before regular requests.""" + rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -164,9 +165,11 @@ async def test_forefront_requests_ordering(request_queue_apify: RequestQueue) -> total_count = await rq.get_total_count() Actor.log.info(f'Added 2 forefront requests, total in queue: {total_count}') - # Fetch requests and verify order + # Fetch requests and verify order. + # In shared mode, there is a propagation delay before the newly added requests become available + # (see https://github.com/apify/apify-sdk-python/issues/808). 
fetched_urls = [] - while next_request := await rq.fetch_next_request(): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): Actor.log.info(f'Fetched request: {next_request.url}') fetched_urls.append(next_request.url) await rq.mark_request_as_handled(next_request) @@ -185,8 +188,12 @@ async def test_forefront_requests_ordering(request_queue_apify: RequestQueue) -> ) -async def test_request_unique_key_behavior(request_queue_apify: RequestQueue) -> None: +async def test_request_unique_key_behavior( + request_queue_apify: RequestQueue, + request: pytest.FixtureRequest, +) -> None: """Test behavior of custom unique keys.""" + rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -213,10 +220,12 @@ async def test_request_unique_key_behavior(request_queue_apify: RequestQueue) -> assert result2.was_already_present is True, f'result2.was_already_present={result2.was_already_present}' assert result3.was_already_present is False, f'result3.was_already_present={result3.was_already_present}' - # Only 2 requests should be fetchable + # Only 2 requests should be fetchable. + # In shared mode, there is a propagation delay before the newly added requests become available + # (see https://github.com/apify/apify-sdk-python/issues/808). fetched_count = 0 fetched_requests = [] - while next_request := await rq.fetch_next_request(): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): fetched_count += 1 fetched_requests.append(next_request) await rq.mark_request_as_handled(next_request) @@ -246,8 +255,10 @@ async def test_request_reclaim_functionality( await rq.add_request('https://example.com/test') Actor.log.info('Added test request') - # Fetch and reclaim the request - fetched_request = await rq.fetch_next_request() + # Fetch and reclaim the request. 
+ # In shared mode, there is a propagation delay before the newly added request becomes visible + # (see https://github.com/apify/apify-sdk-python/issues/808). + fetched_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) assert fetched_request is not None Actor.log.info(f'Fetched request: {fetched_request.url}') @@ -261,10 +272,7 @@ async def test_request_reclaim_functionality( # Should be able to fetch the same request again. # In shared mode, there is a propagation delay before the reclaimed request becomes visible # (see https://github.com/apify/apify-sdk-python/issues/808). - if rq_access_mode == 'shared': - request2 = await call_with_exp_backoff(rq.fetch_next_request) - else: - request2 = await rq.fetch_next_request() + request2 = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) assert request2 is not None assert request2.url == fetched_request.url @@ -273,10 +281,7 @@ async def test_request_reclaim_functionality( # Mark as handled this time await rq.mark_request_as_handled(request2) - if rq_access_mode == 'shared': - is_finished = await call_with_exp_backoff(rq.is_finished) - else: - is_finished = await rq.is_finished() + is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) assert is_finished is True @@ -297,8 +302,10 @@ async def test_request_reclaim_with_forefront( await rq.add_request('https://example.com/3') Actor.log.info('Added 3 requests') - # Fetch first request - first_request = await rq.fetch_next_request() + # Fetch first request. + # In shared mode, there is a propagation delay before the newly added requests become available + # (see https://github.com/apify/apify-sdk-python/issues/808). 
+ first_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) assert first_request is not None Actor.log.info(f'Fetched first request: {first_request.url}') @@ -309,10 +316,7 @@ async def test_request_reclaim_with_forefront( # The reclaimed request should be fetched first again. # In shared mode, there is a propagation delay before the reclaimed request becomes visible # (see https://github.com/apify/apify-sdk-python/issues/808). - if rq_access_mode == 'shared': - next_request = await call_with_exp_backoff(rq.fetch_next_request) - else: - next_request = await rq.fetch_next_request() + next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) assert next_request is not None assert next_request.url == first_request.url @@ -330,25 +334,31 @@ async def test_request_reclaim_with_forefront( Actor.log.info(f'Test completed - processed {remaining_count} additional requests') -async def test_complex_request_objects(request_queue_apify: RequestQueue) -> None: +async def test_complex_request_objects( + request_queue_apify: RequestQueue, + request: pytest.FixtureRequest, +) -> None: """Test handling complex Request objects with various properties.""" + rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') # Create request with various properties - request = Request.from_url( + complex_request = Request.from_url( 'https://example.com/api/data', method='POST', headers={'Authorization': 'Bearer token123', 'Content-Type': 'application/json'}, user_data={'category': 'api', 'priority': 'high'}, unique_key='api-request-1', ) - await rq.add_request(request) - Actor.log.info(f'Added complex request: {request.url} with method {request.method}') + await rq.add_request(complex_request) + Actor.log.info(f'Added complex request: {complex_request.url} with method {complex_request.method}') - # Fetch and verify all properties are 
preserved - fetched_request = await rq.fetch_next_request() + # Fetch and verify all properties are preserved. + # In shared mode, there is a propagation delay before the newly added request becomes visible + # (see https://github.com/apify/apify-sdk-python/issues/808). + fetched_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) assert fetched_request is not None, f'fetched_request={fetched_request}' Actor.log.info(f'Fetched request: {fetched_request.url}') @@ -398,8 +408,12 @@ async def test_get_request_by_unique_key(request_queue_apify: RequestQueue) -> N Actor.log.info('Non-existent unique_key correctly returned None') -async def test_metadata_tracking(request_queue_apify: RequestQueue) -> None: +async def test_metadata_tracking( + request_queue_apify: RequestQueue, + request: pytest.FixtureRequest, +) -> None: """Test request queue metadata and counts.""" + rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -422,13 +436,21 @@ async def test_metadata_tracking(request_queue_apify: RequestQueue) -> None: assert total_after_add == 5, f'total_after_add={total_after_add}' assert handled_after_add == 0, f'handled_after_add={handled_after_add}' - # Process some requests - for _ in range(3): - request = await rq.fetch_next_request() - if request: - await rq.mark_request_as_handled(request) + # Process some requests. + # In shared mode, there is a propagation delay before the newly added requests become available + # (see https://github.com/apify/apify-sdk-python/issues/808). 
+ next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + handled = 0 + if next_request: + await rq.mark_request_as_handled(next_request) + handled += 1 + for _ in range(2): + next_request = await rq.fetch_next_request() + if next_request: + await rq.mark_request_as_handled(next_request) + handled += 1 - Actor.log.info('Processed 3 requests') + Actor.log.info(f'Processed {handled} requests') # Check counts after processing final_total = await rq.get_total_count() @@ -463,9 +485,11 @@ async def test_batch_operations_performance( assert total_count == 50, f'total_count={total_count}' assert handled_count == 0, f'handled_count={handled_count}' - # Process all requests + # Process all requests. + # In shared mode, there is a propagation delay before the newly added requests become available + # (see https://github.com/apify/apify-sdk-python/issues/808). processed_count = 0 - while next_request := await rq.fetch_next_request(): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): processed_count += 1 await rq.mark_request_as_handled(next_request) if processed_count >= 50: # Safety break @@ -474,10 +498,7 @@ async def test_batch_operations_performance( Actor.log.info(f'Processing completed. Total processed: {processed_count}') assert processed_count == 50, f'processed_count={processed_count}' - if rq_access_mode == 'shared': - is_finished = await call_with_exp_backoff(rq.is_finished) - else: - is_finished = await rq.is_finished() + is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) assert is_finished is True, f'is_finished={is_finished}' @@ -500,11 +521,17 @@ async def test_state_consistency( initial_total = await rq.get_total_count() Actor.log.info(f'Initial total count: {initial_total}') - # Simulate some requests being processed and others being reclaimed + # Simulate some requests being processed and others being reclaimed. 
+ # In shared mode, there is a propagation delay before the newly added requests become available + # (see https://github.com/apify/apify-sdk-python/issues/808). processed_requests = [] reclaimed_requests = [] - for i in range(5): + next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + if next_request: + await rq.mark_request_as_handled(next_request) + processed_requests.append(next_request) + for i in range(1, 5): next_request = await rq.fetch_next_request() if next_request: if i % 2 == 0: # Process even indices @@ -530,17 +557,16 @@ async def test_state_consistency( ) assert current_total == 10, f'current_total={current_total}' - # Process remaining requests + # Process remaining requests. + # In shared mode, there is a propagation delay before the reclaimed requests become visible + # (see https://github.com/apify/apify-sdk-python/issues/808). remaining_count = 0 - while next_request := await rq.fetch_next_request(): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): remaining_count += 1 await rq.mark_request_as_handled(next_request) Actor.log.info(f'Processed {remaining_count} remaining requests') - if rq_access_mode == 'shared': - is_finished = await call_with_exp_backoff(rq.is_finished) - else: - is_finished = await rq.is_finished() + is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) assert is_finished is True, f'is_finished={is_finished}' @@ -597,30 +623,28 @@ async def test_large_batch_operations( total_count = await rq.get_total_count() assert total_count == 500, f'total_count={total_count}' - # Process all in chunks to test performance + # Process all in chunks to test performance. + # In shared mode, there is a propagation delay before the newly added requests become available + # (see https://github.com/apify/apify-sdk-python/issues/808). 
processed_count = 0 - while not await rq.is_empty(): - next_request = await rq.fetch_next_request() - - # The RQ is_empty should ensure we don't get None - assert next_request is not None, f'next_request={next_request}' - + while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): await rq.mark_request_as_handled(next_request) processed_count += 1 Actor.log.info(f'Processing completed. Total processed: {processed_count}') assert processed_count == 500, f'processed_count={processed_count}' - if rq_access_mode == 'shared': - is_finished = await call_with_exp_backoff(rq.is_finished) - else: - is_finished = await rq.is_finished() + is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) assert is_finished is True, f'is_finished={is_finished}' -async def test_mixed_string_and_request_objects(request_queue_apify: RequestQueue) -> None: +async def test_mixed_string_and_request_objects( + request_queue_apify: RequestQueue, + request: pytest.FixtureRequest, +) -> None: """Test adding both string URLs and Request objects.""" + rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -647,9 +671,11 @@ async def test_mixed_string_and_request_objects(request_queue_apify: RequestQueu total_count = await rq.get_total_count() Actor.log.info(f'Total requests in queue: {total_count}') - # Fetch and verify all types work + # Fetch and verify all types work. + # In shared mode, there is a propagation delay before the newly added requests become available + # (see https://github.com/apify/apify-sdk-python/issues/808). 
fetched_requests = [] - while next_request := await rq.fetch_next_request(): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): fetched_requests.append(next_request) await rq.mark_request_as_handled(next_request) @@ -667,8 +693,12 @@ async def test_mixed_string_and_request_objects(request_queue_apify: RequestQueu Actor.log.info('Mixed types verified - found request object with user_data') -async def test_persistence_across_operations(request_queue_apify: RequestQueue) -> None: +async def test_persistence_across_operations( + request_queue_apify: RequestQueue, + request: pytest.FixtureRequest, +) -> None: """Test that queue state persists across different operations.""" + rq_access_mode = request.node.callspec.params.get('request_queue_apify') # Open queue and add some requests rq = request_queue_apify @@ -682,12 +712,18 @@ async def test_persistence_across_operations(request_queue_apify: RequestQueue) initial_total = await rq.get_total_count() Actor.log.info(f'Total count after initial batch: {initial_total}') - # Process some requests + # Process some requests. + # In shared mode, there is a propagation delay before the newly added requests become available + # (see https://github.com/apify/apify-sdk-python/issues/808). 
processed_count = 0 - for _ in range(5): - request = await rq.fetch_next_request() - if request: - await rq.mark_request_as_handled(request) + next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + if next_request: + await rq.mark_request_as_handled(next_request) + processed_count += 1 + for _ in range(4): + next_request = await rq.fetch_next_request() + if next_request: + await rq.mark_request_as_handled(next_request) processed_count += 1 Actor.log.info(f'Processed {processed_count} requests from initial batch') @@ -708,15 +744,13 @@ async def test_persistence_across_operations(request_queue_apify: RequestQueue) assert total_after_additional == 15, f'total_after_additional={total_after_additional}' assert handled_after_additional == 5, f'handled_after_additional={handled_after_additional}' - # Process remaining + # Process remaining. + # In shared mode, there is a propagation delay before the newly added requests become available + # (see https://github.com/apify/apify-sdk-python/issues/808). 
remaining_processed = 0 - while not await rq.is_finished(): - request = await rq.fetch_next_request() - if request: - remaining_processed += 1 - await rq.mark_request_as_handled(request) - else: - break + while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + remaining_processed += 1 + await rq.mark_request_as_handled(next_request) Actor.log.info(f'Processed {remaining_processed} remaining requests') is_finished = await rq.is_finished() @@ -786,8 +820,12 @@ async def test_request_deduplication_edge_cases( ) -async def test_request_ordering_with_mixed_operations(request_queue_apify: RequestQueue) -> None: +async def test_request_ordering_with_mixed_operations( + request_queue_apify: RequestQueue, + request: pytest.FixtureRequest, +) -> None: """Test request ordering with mixed add/reclaim operations.""" + rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -797,8 +835,10 @@ async def test_request_ordering_with_mixed_operations(request_queue_apify: Reque await rq.add_request('https://example.com/2') Actor.log.info('Added initial requests') - # Fetch one and reclaim to forefront - request1 = await rq.fetch_next_request() + # Fetch one and reclaim to forefront. + # In shared mode, there is a propagation delay before the newly added requests become available + # (see https://github.com/apify/apify-sdk-python/issues/808). 
+ request1 = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) assert request1 is not None, f'request1={request1}' assert request1.url == 'https://example.com/1', f'request1.url={request1.url}' Actor.log.info(f'Fetched request: {request1.url}') @@ -810,9 +850,10 @@ async def test_request_ordering_with_mixed_operations(request_queue_apify: Reque await rq.add_request('https://example.com/priority', forefront=True) Actor.log.info('Added new forefront request') - # Fetch all requests and verify forefront behavior + # Fetch all requests and verify forefront behavior. + # In shared mode, there is a propagation delay before the reclaimed/added requests become visible. urls_ordered = list[str]() - while next_request := await rq.fetch_next_request(): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): urls_ordered.append(next_request.url) await rq.mark_request_as_handled(next_request) @@ -1046,38 +1087,40 @@ async def test_rq_long_url( assert processed_request is not None assert processed_request.id == request_id - request_obtained = await rq.fetch_next_request() + request_obtained = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) assert request_obtained is not None await rq.mark_request_as_handled(request_obtained) - if rq_access_mode == 'shared': - is_finished = await call_with_exp_backoff(rq.is_finished) - else: - is_finished = await rq.is_finished() + is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) assert is_finished async def test_pre_existing_request_with_user_data( - request_queue_apify: RequestQueue, apify_client_async: ApifyClientAsync + request_queue_apify: RequestQueue, + apify_client_async: ApifyClientAsync, + request: pytest.FixtureRequest, ) -> None: """Test that pre-existing requests with user data are fully fetched. 
list_head does not return user data, so we need to test that fetching unknown requests is not relying on it.""" + rq_access_mode = request.node.callspec.params.get('request_queue_apify') custom_data = {'key': 'value'} rq = request_queue_apify - request = Request.from_url( + req = Request.from_url( 'https://example.com', user_data=custom_data.copy(), ) # Add request by a different producer rq_client = apify_client_async.request_queue(request_queue_id=rq.id) - await rq_client.add_request(request.model_dump(by_alias=True)) + await rq_client.add_request(req.model_dump(by_alias=True)) - # Fetch the request by the client under test - request_obtained = await rq.fetch_next_request() + # Fetch the request by the client under test. + # In shared mode, there is a propagation delay before the newly added request becomes visible + # (see https://github.com/apify/apify-sdk-python/issues/808). + request_obtained = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) assert request_obtained is not None # Test that custom_data is preserved in user_data (custom_data should be subset of obtained user_data) assert custom_data.items() <= request_obtained.user_data.items() @@ -1111,17 +1154,16 @@ async def test_request_queue_is_finished( await request_queue_apify.add_request(Request.from_url('http://example.com')) assert not await request_queue_apify.is_finished() - fetched = await request_queue_apify.fetch_next_request() + # In shared mode, there is a propagation delay before the newly added request becomes visible + # (see https://github.com/apify/apify-sdk-python/issues/808). + fetched = await call_with_exp_backoff(request_queue_apify.fetch_next_request, rq_access_mode=rq_access_mode) assert fetched is not None assert not await request_queue_apify.is_finished(), ( 'RequestQueue should not be finished unless the request is marked as handled.' 
) await request_queue_apify.mark_request_as_handled(fetched) - if rq_access_mode == 'shared': - assert await call_with_exp_backoff(request_queue_apify.is_finished) - else: - assert await request_queue_apify.is_finished() + assert await call_with_exp_backoff(request_queue_apify.is_finished, rq_access_mode=rq_access_mode) async def test_request_queue_deduplication_unprocessed_requests( @@ -1386,14 +1428,12 @@ async def worker() -> int: total_after_workers = await rq.get_total_count() assert total_after_workers == 20 + # In shared mode, there is a propagation delay before the reclaimed requests become visible + # (see https://github.com/apify/apify-sdk-python/issues/808). remaining_count = 0 - while not await rq.is_finished(): - request = await rq.fetch_next_request() - if request: - remaining_count += 1 - await rq.mark_request_as_handled(request) - else: - break + while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode='shared'): + remaining_count += 1 + await rq.mark_request_as_handled(next_request) final_handled = await rq.get_handled_count() final_total = await rq.get_total_count() From f20c29f75610a1a4db6cc79c7113333b11e561ee Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Fri, 13 Mar 2026 10:30:25 +0100 Subject: [PATCH 2/3] Update --- tests/integration/_utils.py | 29 +++--- tests/integration/test_request_queue.py | 118 ++++++++---------------- 2 files changed, 54 insertions(+), 93 deletions(-) diff --git a/tests/integration/_utils.py b/tests/integration/_utils.py index 20205b36..2d941af7 100644 --- a/tests/integration/_utils.py +++ b/tests/integration/_utils.py @@ -1,7 +1,7 @@ from __future__ import annotations import asyncio -from typing import TYPE_CHECKING, TypeVar +from typing import TYPE_CHECKING, Literal, TypeVar from crawlee._utils.crypto import crypto_random_object_id @@ -16,7 +16,7 @@ async def call_with_exp_backoff( fn: Callable[[], Awaitable[T]], *, - rq_access_mode: str | None = None, + rq_access_mode: 
Literal['single', 'shared'], max_retries: int = 3, ) -> T | None: """Call an async callable with exponential backoff retries until it returns a truthy value. @@ -25,24 +25,27 @@ async def call_with_exp_backoff( become visible in the API (see https://github.com/apify/apify-sdk-python/issues/808). This helper retries with exponential backoff to handle that delay in integration tests. - When `rq_access_mode` is provided and is not `'shared'`, the function is called once without retries. + When `rq_access_mode` is `'single'`, the function is called once without retries. """ - if rq_access_mode is not None and rq_access_mode != 'shared': + if rq_access_mode == 'single': return await fn() - result = None + if rq_access_mode == 'shared': + result = None - for attempt in range(max_retries): - result = await fn() + for attempt in range(max_retries): + result = await fn() - if result: - return result + if result: + return result - delay = 2**attempt - Actor.log.info(f'{fn} returned {result!r}, retrying in {delay}s (attempt {attempt + 1}/{max_retries})') - await asyncio.sleep(delay) + delay = 2**attempt + Actor.log.info(f'{fn} returned {result!r}, retrying in {delay}s (attempt {attempt + 1}/{max_retries})') + await asyncio.sleep(delay) - return result + return result + + raise ValueError(f'Invalid rq_access_mode: {rq_access_mode}') def generate_unique_resource_name(label: str) -> str: diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py index 3bb6b34a..2d65d4b5 100644 --- a/tests/integration/test_request_queue.py +++ b/tests/integration/test_request_queue.py @@ -25,6 +25,9 @@ from apify.storage_clients._apify._models import ApifyRequestQueueMetadata +# In shared mode, there is a propagation delay between operations so we use test helper +# `call_with_exp_backoff` for exponential backoff. See https://github.com/apify/apify-sdk-python/issues/808. 
+ async def test_add_and_fetch_requests( request_queue_apify: RequestQueue, @@ -42,8 +45,6 @@ async def test_add_and_fetch_requests( Actor.log.info(f'Adding request {i}...') await rq.add_request(f'https://example.com/{i}') - # In shared mode, there is a propagation delay before the newly added requests become available - # (see https://github.com/apify/apify-sdk-python/issues/808). handled_request_count = 0 while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): Actor.log.info('Fetching next request...') @@ -79,8 +80,6 @@ async def test_add_requests_in_batches( total_count = await rq.get_total_count() Actor.log.info(f'Added {desired_request_count} requests in batch, total in queue: {total_count}') - # In shared mode, there is a propagation delay before the newly added requests become available - # (see https://github.com/apify/apify-sdk-python/issues/808). handled_request_count = 0 while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): if handled_request_count % 20 == 0: @@ -120,8 +119,6 @@ async def test_add_non_unique_requests_in_batch( total_count = await rq.get_total_count() Actor.log.info(f'Added {desired_request_count} requests with duplicate unique keys, total in queue: {total_count}') - # In shared mode, there is a propagation delay before the newly added requests become available - # (see https://github.com/apify/apify-sdk-python/issues/808). handled_request_count = 0 while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): if handled_request_count % 20 == 0: @@ -166,8 +163,6 @@ async def test_forefront_requests_ordering( Actor.log.info(f'Added 2 forefront requests, total in queue: {total_count}') # Fetch requests and verify order. - # In shared mode, there is a propagation delay before the newly added requests become available - # (see https://github.com/apify/apify-sdk-python/issues/808). 
fetched_urls = [] while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): Actor.log.info(f'Fetched request: {next_request.url}') @@ -221,8 +216,6 @@ async def test_request_unique_key_behavior( assert result3.was_already_present is False, f'result3.was_already_present={result3.was_already_present}' # Only 2 requests should be fetchable. - # In shared mode, there is a propagation delay before the newly added requests become available - # (see https://github.com/apify/apify-sdk-python/issues/808). fetched_count = 0 fetched_requests = [] while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): @@ -256,8 +249,6 @@ async def test_request_reclaim_functionality( Actor.log.info('Added test request') # Fetch and reclaim the request. - # In shared mode, there is a propagation delay before the newly added request becomes visible - # (see https://github.com/apify/apify-sdk-python/issues/808). fetched_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) assert fetched_request is not None Actor.log.info(f'Fetched request: {fetched_request.url}') @@ -270,8 +261,6 @@ async def test_request_reclaim_functionality( Actor.log.info('Request reclaimed successfully') # Should be able to fetch the same request again. - # In shared mode, there is a propagation delay before the reclaimed request becomes visible - # (see https://github.com/apify/apify-sdk-python/issues/808). request2 = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) assert request2 is not None @@ -303,8 +292,6 @@ async def test_request_reclaim_with_forefront( Actor.log.info('Added 3 requests') # Fetch first request. - # In shared mode, there is a propagation delay before the newly added requests become available - # (see https://github.com/apify/apify-sdk-python/issues/808). 
first_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) assert first_request is not None Actor.log.info(f'Fetched first request: {first_request.url}') @@ -314,8 +301,6 @@ async def test_request_reclaim_with_forefront( Actor.log.info('Request reclaimed to forefront') # The reclaimed request should be fetched first again. - # In shared mode, there is a propagation delay before the reclaimed request becomes visible - # (see https://github.com/apify/apify-sdk-python/issues/808). next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) assert next_request is not None @@ -356,8 +341,6 @@ async def test_complex_request_objects( Actor.log.info(f'Added complex request: {complex_request.url} with method {complex_request.method}') # Fetch and verify all properties are preserved. - # In shared mode, there is a propagation delay before the newly added request becomes visible - # (see https://github.com/apify/apify-sdk-python/issues/808). fetched_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) assert fetched_request is not None, f'fetched_request={fetched_request}' Actor.log.info(f'Fetched request: {fetched_request.url}') @@ -437,20 +420,12 @@ async def test_metadata_tracking( assert handled_after_add == 0, f'handled_after_add={handled_after_add}' # Process some requests. - # In shared mode, there is a propagation delay before the newly added requests become available - # (see https://github.com/apify/apify-sdk-python/issues/808). 
- next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) - handled = 0 - if next_request: - await rq.mark_request_as_handled(next_request) - handled += 1 - for _ in range(2): - next_request = await rq.fetch_next_request() + for _ in range(3): + next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) if next_request: await rq.mark_request_as_handled(next_request) - handled += 1 - Actor.log.info(f'Processed {handled} requests') + Actor.log.info('Processed 3 requests') # Check counts after processing final_total = await rq.get_total_count() @@ -486,8 +461,6 @@ async def test_batch_operations_performance( assert handled_count == 0, f'handled_count={handled_count}' # Process all requests. - # In shared mode, there is a propagation delay before the newly added requests become available - # (see https://github.com/apify/apify-sdk-python/issues/808). processed_count = 0 while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): processed_count += 1 @@ -522,17 +495,11 @@ async def test_state_consistency( Actor.log.info(f'Initial total count: {initial_total}') # Simulate some requests being processed and others being reclaimed. - # In shared mode, there is a propagation delay before the newly added requests become available - # (see https://github.com/apify/apify-sdk-python/issues/808). 
processed_requests = [] reclaimed_requests = [] - next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) - if next_request: - await rq.mark_request_as_handled(next_request) - processed_requests.append(next_request) - for i in range(1, 5): - next_request = await rq.fetch_next_request() + for i in range(5): + next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) if next_request: if i % 2 == 0: # Process even indices await rq.mark_request_as_handled(next_request) @@ -558,8 +525,6 @@ async def test_state_consistency( assert current_total == 10, f'current_total={current_total}' # Process remaining requests. - # In shared mode, there is a propagation delay before the reclaimed requests become visible - # (see https://github.com/apify/apify-sdk-python/issues/808). remaining_count = 0 while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): remaining_count += 1 @@ -570,23 +535,24 @@ async def test_state_consistency( assert is_finished is True, f'is_finished={is_finished}' -async def test_empty_rq_behavior(request_queue_apify: RequestQueue) -> None: +async def test_empty_rq_behavior(request_queue_apify: RequestQueue, request: pytest.FixtureRequest) -> None: """Test behavior with empty queues.""" + rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') # Test empty queue operations - is_empty = await rq.is_empty() - is_finished = await rq.is_finished() + is_empty = await call_with_exp_backoff(rq.is_empty, rq_access_mode=rq_access_mode) + is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) Actor.log.info(f'Empty queue - is_empty: {is_empty}, is_finished: {is_finished}') assert is_empty is True, f'is_empty={is_empty}' assert is_finished is True, f'is_finished={is_finished}' # Fetch from empty queue - request = await 
rq.fetch_next_request() - Actor.log.info(f'Fetch result from empty queue: {request}') - assert request is None, f'request={request}' + next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + Actor.log.info(f'Fetch result from empty queue: {next_request}') + assert next_request is None, f'request={next_request}' # Check metadata for empty queue metadata = await rq.get_metadata() @@ -624,11 +590,14 @@ async def test_large_batch_operations( assert total_count == 500, f'total_count={total_count}' # Process all in chunks to test performance. - # In shared mode, there is a propagation delay before the newly added requests become available - # (see https://github.com/apify/apify-sdk-python/issues/808). processed_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while not await call_with_exp_backoff(rq.is_empty, rq_access_mode=rq_access_mode): + next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + + # The RQ is_empty should ensure we don't get None + assert next_request is not None, f'next_request={next_request}' + await rq.mark_request_as_handled(next_request) processed_count += 1 @@ -672,8 +641,6 @@ async def test_mixed_string_and_request_objects( Actor.log.info(f'Total requests in queue: {total_count}') # Fetch and verify all types work. - # In shared mode, there is a propagation delay before the newly added requests become available - # (see https://github.com/apify/apify-sdk-python/issues/808). fetched_requests = [] while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): fetched_requests.append(next_request) @@ -713,15 +680,9 @@ async def test_persistence_across_operations( Actor.log.info(f'Total count after initial batch: {initial_total}') # Process some requests. 
- # In shared mode, there is a propagation delay before the newly added requests become available - # (see https://github.com/apify/apify-sdk-python/issues/808). processed_count = 0 - next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) - if next_request: - await rq.mark_request_as_handled(next_request) - processed_count += 1 - for _ in range(4): - next_request = await rq.fetch_next_request() + for _ in range(5): + next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) if next_request: await rq.mark_request_as_handled(next_request) processed_count += 1 @@ -745,15 +706,17 @@ async def test_persistence_across_operations( assert handled_after_additional == 5, f'handled_after_additional={handled_after_additional}' # Process remaining. - # In shared mode, there is a propagation delay before the newly added requests become available - # (see https://github.com/apify/apify-sdk-python/issues/808). remaining_processed = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): - remaining_processed += 1 - await rq.mark_request_as_handled(next_request) + while not await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode): + next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + if next_request: + remaining_processed += 1 + await rq.mark_request_as_handled(next_request) + else: + break Actor.log.info(f'Processed {remaining_processed} remaining requests') - is_finished = await rq.is_finished() + is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) final_total = await rq.get_total_count() final_handled = await rq.get_handled_count() @@ -836,8 +799,6 @@ async def test_request_ordering_with_mixed_operations( Actor.log.info('Added initial requests') # Fetch one and reclaim to forefront. 
- # In shared mode, there is a propagation delay before the newly added requests become available - # (see https://github.com/apify/apify-sdk-python/issues/808). request1 = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) assert request1 is not None, f'request1={request1}' assert request1.url == 'https://example.com/1', f'request1.url={request1.url}' @@ -851,7 +812,6 @@ async def test_request_ordering_with_mixed_operations( Actor.log.info('Added new forefront request') # Fetch all requests and verify forefront behavior. - # In shared mode, there is a propagation delay before the reclaimed/added requests become visible. urls_ordered = list[str]() while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): urls_ordered.append(next_request.url) @@ -1118,8 +1078,6 @@ async def test_pre_existing_request_with_user_data( await rq_client.add_request(req.model_dump(by_alias=True)) # Fetch the request by the client under test. - # In shared mode, there is a propagation delay before the newly added request becomes visible - # (see https://github.com/apify/apify-sdk-python/issues/808). request_obtained = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) assert request_obtained is not None # Test that custom_data is preserved in user_data (custom_data should be subset of obtained user_data) @@ -1154,8 +1112,6 @@ async def test_request_queue_is_finished( await request_queue_apify.add_request(Request.from_url('http://example.com')) assert not await request_queue_apify.is_finished() - # In shared mode, there is a propagation delay before the newly added request becomes visible - # (see https://github.com/apify/apify-sdk-python/issues/808). 
fetched = await call_with_exp_backoff(request_queue_apify.fetch_next_request, rq_access_mode=rq_access_mode)
     assert fetched is not None
     assert not await request_queue_apify.is_finished(), (
@@ -1428,12 +1384,14 @@ async def worker() -> int:
     total_after_workers = await rq.get_total_count()
     assert total_after_workers == 20
 
-    # In shared mode, there is a propagation delay before the reclaimed requests become visible
-    # (see https://github.com/apify/apify-sdk-python/issues/808).
     remaining_count = 0
-    while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode='shared'):
-        remaining_count += 1
-        await rq.mark_request_as_handled(next_request)
+    while not await call_with_exp_backoff(rq.is_finished, rq_access_mode='shared'):
+        request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode='shared')
+        if request:
+            remaining_count += 1
+            await rq.mark_request_as_handled(request)
+        else:
+            break
 
     final_handled = await rq.get_handled_count()
     final_total = await rq.get_total_count()

From 3e28576887cbce5992ace917153313bd88e56f0e Mon Sep 17 00:00:00 2001
From: Vlada Dusek
Date: Fri, 13 Mar 2026 12:04:25 +0100
Subject: [PATCH 3/3] fix: Remove call_with_exp_backoff from while-not loop
 conditions
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Using call_with_exp_backoff in `while not is_empty/is_finished` loop conditions
caused pathologically slow loops in shared mode — the backoff retried 3 times
(~7s) per iteration even when the queue legitimately had items, so large
batches hit the 1800s+ test timeout.

Replaced with walrus operator `while request := await
call_with_exp_backoff(fetch_next_request)` pattern in
test_large_batch_operations, test_persistence_across_operations,
and test_concurrent_processing_simulation.
Co-Authored-By: Claude Opus 4.6 --- tests/integration/test_request_queue.py | 27 +++++++------------------ 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py index 2d65d4b5..8011d8f1 100644 --- a/tests/integration/test_request_queue.py +++ b/tests/integration/test_request_queue.py @@ -592,12 +592,7 @@ async def test_large_batch_operations( # Process all in chunks to test performance. processed_count = 0 - while not await call_with_exp_backoff(rq.is_empty, rq_access_mode=rq_access_mode): - next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) - - # The RQ is_empty should ensure we don't get None - assert next_request is not None, f'next_request={next_request}' - + while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): await rq.mark_request_as_handled(next_request) processed_count += 1 @@ -707,13 +702,9 @@ async def test_persistence_across_operations( # Process remaining. 
remaining_processed = 0 - while not await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode): - next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) - if next_request: - remaining_processed += 1 - await rq.mark_request_as_handled(next_request) - else: - break + while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + remaining_processed += 1 + await rq.mark_request_as_handled(next_request) Actor.log.info(f'Processed {remaining_processed} remaining requests') is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) @@ -1385,13 +1376,9 @@ async def worker() -> int: assert total_after_workers == 20 remaining_count = 0 - while not await call_with_exp_backoff(rq.is_finished, rq_access_mode='shared'): - request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode='shared') - if request: - remaining_count += 1 - await rq.mark_request_as_handled(request) - else: - break + while request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode='shared'): + remaining_count += 1 + await rq.mark_request_as_handled(request) final_handled = await rq.get_handled_count() final_total = await rq.get_total_count()