Skip to content

Commit ac81511

Browse files
vdusekclaude
andcommitted
fix: Add exponential backoff for shared RQ propagation delay in integration tests
In shared request queue mode, the Apify API has an eventual consistency propagation delay before newly added or reclaimed requests become visible in fetch_next_request(). Many integration tests were missing the call_with_exp_backoff() retry pattern, causing sporadic failures. Also added rq_access_mode parameter to call_with_exp_backoff() so callers can pass it directly instead of wrapping every call in an if/else block. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent cba00a1 commit ac81511

2 files changed

Lines changed: 19 additions & 58 deletions

File tree

tests/integration/_utils.py

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from __future__ import annotations
22

33
import asyncio
4-
from typing import TYPE_CHECKING, TypeVar
4+
from typing import TYPE_CHECKING, Literal, TypeVar
55

66
from crawlee._utils.crypto import crypto_random_object_id
77

@@ -16,7 +16,7 @@
1616
async def call_with_exp_backoff(
1717
fn: Callable[[], Awaitable[T]],
1818
*,
19-
rq_access_mode: str | None = None,
19+
rq_access_mode: Literal['single', 'shared'],
2020
max_retries: int = 3,
2121
) -> T | None:
2222
"""Call an async callable with exponential backoff retries until it returns a truthy value.
@@ -25,24 +25,27 @@ async def call_with_exp_backoff(
2525
become visible in the API (see https://github.com/apify/apify-sdk-python/issues/808). This helper retries with
2626
exponential backoff to handle that delay in integration tests.
2727
28-
When `rq_access_mode` is provided and is not `'shared'`, the function is called once without retries.
28+
When `rq_access_mode` is `'single'`, the function is called once without retries.
2929
"""
30-
if rq_access_mode is not None and rq_access_mode != 'shared':
30+
if rq_access_mode == 'single':
3131
return await fn()
3232

33-
result = None
33+
if rq_access_mode == 'shared':
34+
result = None
3435

35-
for attempt in range(max_retries):
36-
result = await fn()
36+
for attempt in range(max_retries):
37+
result = await fn()
3738

38-
if result:
39-
return result
39+
if result:
40+
return result
4041

41-
delay = 2**attempt
42-
Actor.log.info(f'{fn} returned {result!r}, retrying in {delay}s (attempt {attempt + 1}/{max_retries})')
43-
await asyncio.sleep(delay)
42+
delay = 2**attempt
43+
Actor.log.info(f'{fn} returned {result!r}, retrying in {delay}s (attempt {attempt + 1}/{max_retries})')
44+
await asyncio.sleep(delay)
4445

45-
return result
46+
return result
47+
48+
raise ValueError(f'Invalid rq_access_mode: {rq_access_mode}')
4649

4750

4851
def generate_unique_resource_name(label: str) -> str:

tests/integration/test_request_queue.py

Lines changed: 3 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525

2626
from apify.storage_clients._apify._models import ApifyRequestQueueMetadata
2727

28+
# In shared mode, there is a propagation delay between operations so we use test helper
29+
# `call_with_exp_backoff` for exponential backoff. See https://github.com/apify/apify-sdk-python/issues/808.
30+
2831

2932
async def test_add_and_fetch_requests(
3033
request_queue_apify: RequestQueue,
@@ -42,8 +45,6 @@ async def test_add_and_fetch_requests(
4245
Actor.log.info(f'Adding request {i}...')
4346
await rq.add_request(f'https://example.com/{i}')
4447

45-
# In shared mode, there is a propagation delay before the newly added requests become available
46-
# (see https://github.com/apify/apify-sdk-python/issues/808).
4748
handled_request_count = 0
4849
while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode):
4950
Actor.log.info('Fetching next request...')
@@ -79,8 +80,6 @@ async def test_add_requests_in_batches(
7980
total_count = await rq.get_total_count()
8081
Actor.log.info(f'Added {desired_request_count} requests in batch, total in queue: {total_count}')
8182

82-
# In shared mode, there is a propagation delay before the newly added requests become available
83-
# (see https://github.com/apify/apify-sdk-python/issues/808).
8483
handled_request_count = 0
8584
while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode):
8685
if handled_request_count % 20 == 0:
@@ -120,8 +119,6 @@ async def test_add_non_unique_requests_in_batch(
120119
total_count = await rq.get_total_count()
121120
Actor.log.info(f'Added {desired_request_count} requests with duplicate unique keys, total in queue: {total_count}')
122121

123-
# In shared mode, there is a propagation delay before the newly added requests become available
124-
# (see https://github.com/apify/apify-sdk-python/issues/808).
125122
handled_request_count = 0
126123
while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode):
127124
if handled_request_count % 20 == 0:
@@ -166,8 +163,6 @@ async def test_forefront_requests_ordering(
166163
Actor.log.info(f'Added 2 forefront requests, total in queue: {total_count}')
167164

168165
# Fetch requests and verify order.
169-
# In shared mode, there is a propagation delay before the newly added requests become available
170-
# (see https://github.com/apify/apify-sdk-python/issues/808).
171166
fetched_urls = []
172167
while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode):
173168
Actor.log.info(f'Fetched request: {next_request.url}')
@@ -221,8 +216,6 @@ async def test_request_unique_key_behavior(
221216
assert result3.was_already_present is False, f'result3.was_already_present={result3.was_already_present}'
222217

223218
# Only 2 requests should be fetchable.
224-
# In shared mode, there is a propagation delay before the newly added requests become available
225-
# (see https://github.com/apify/apify-sdk-python/issues/808).
226219
fetched_count = 0
227220
fetched_requests = []
228221
while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode):
@@ -256,8 +249,6 @@ async def test_request_reclaim_functionality(
256249
Actor.log.info('Added test request')
257250

258251
# Fetch and reclaim the request.
259-
# In shared mode, there is a propagation delay before the newly added request becomes visible
260-
# (see https://github.com/apify/apify-sdk-python/issues/808).
261252
fetched_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode)
262253
assert fetched_request is not None
263254
Actor.log.info(f'Fetched request: {fetched_request.url}')
@@ -270,8 +261,6 @@ async def test_request_reclaim_functionality(
270261
Actor.log.info('Request reclaimed successfully')
271262

272263
# Should be able to fetch the same request again.
273-
# In shared mode, there is a propagation delay before the reclaimed request becomes visible
274-
# (see https://github.com/apify/apify-sdk-python/issues/808).
275264
request2 = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode)
276265

277266
assert request2 is not None
@@ -303,8 +292,6 @@ async def test_request_reclaim_with_forefront(
303292
Actor.log.info('Added 3 requests')
304293

305294
# Fetch first request.
306-
# In shared mode, there is a propagation delay before the newly added requests become available
307-
# (see https://github.com/apify/apify-sdk-python/issues/808).
308295
first_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode)
309296
assert first_request is not None
310297
Actor.log.info(f'Fetched first request: {first_request.url}')
@@ -314,8 +301,6 @@ async def test_request_reclaim_with_forefront(
314301
Actor.log.info('Request reclaimed to forefront')
315302

316303
# The reclaimed request should be fetched first again.
317-
# In shared mode, there is a propagation delay before the reclaimed request becomes visible
318-
# (see https://github.com/apify/apify-sdk-python/issues/808).
319304
next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode)
320305

321306
assert next_request is not None
@@ -356,8 +341,6 @@ async def test_complex_request_objects(
356341
Actor.log.info(f'Added complex request: {complex_request.url} with method {complex_request.method}')
357342

358343
# Fetch and verify all properties are preserved.
359-
# In shared mode, there is a propagation delay before the newly added request becomes visible
360-
# (see https://github.com/apify/apify-sdk-python/issues/808).
361344
fetched_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode)
362345
assert fetched_request is not None, f'fetched_request={fetched_request}'
363346
Actor.log.info(f'Fetched request: {fetched_request.url}')
@@ -437,8 +420,6 @@ async def test_metadata_tracking(
437420
assert handled_after_add == 0, f'handled_after_add={handled_after_add}'
438421

439422
# Process some requests.
440-
# In shared mode, there is a propagation delay before the newly added requests become available
441-
# (see https://github.com/apify/apify-sdk-python/issues/808).
442423
next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode)
443424
handled = 0
444425
if next_request:
@@ -486,8 +467,6 @@ async def test_batch_operations_performance(
486467
assert handled_count == 0, f'handled_count={handled_count}'
487468

488469
# Process all requests.
489-
# In shared mode, there is a propagation delay before the newly added requests become available
490-
# (see https://github.com/apify/apify-sdk-python/issues/808).
491470
processed_count = 0
492471
while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode):
493472
processed_count += 1
@@ -522,8 +501,6 @@ async def test_state_consistency(
522501
Actor.log.info(f'Initial total count: {initial_total}')
523502

524503
# Simulate some requests being processed and others being reclaimed.
525-
# In shared mode, there is a propagation delay before the newly added requests become available
526-
# (see https://github.com/apify/apify-sdk-python/issues/808).
527504
processed_requests = []
528505
reclaimed_requests = []
529506

@@ -558,8 +535,6 @@ async def test_state_consistency(
558535
assert current_total == 10, f'current_total={current_total}'
559536

560537
# Process remaining requests.
561-
# In shared mode, there is a propagation delay before the reclaimed requests become visible
562-
# (see https://github.com/apify/apify-sdk-python/issues/808).
563538
remaining_count = 0
564539
while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode):
565540
remaining_count += 1
@@ -624,8 +599,6 @@ async def test_large_batch_operations(
624599
assert total_count == 500, f'total_count={total_count}'
625600

626601
# Process all in chunks to test performance.
627-
# In shared mode, there is a propagation delay before the newly added requests become available
628-
# (see https://github.com/apify/apify-sdk-python/issues/808).
629602
processed_count = 0
630603

631604
while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode):
@@ -672,8 +645,6 @@ async def test_mixed_string_and_request_objects(
672645
Actor.log.info(f'Total requests in queue: {total_count}')
673646

674647
# Fetch and verify all types work.
675-
# In shared mode, there is a propagation delay before the newly added requests become available
676-
# (see https://github.com/apify/apify-sdk-python/issues/808).
677648
fetched_requests = []
678649
while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode):
679650
fetched_requests.append(next_request)
@@ -713,8 +684,6 @@ async def test_persistence_across_operations(
713684
Actor.log.info(f'Total count after initial batch: {initial_total}')
714685

715686
# Process some requests.
716-
# In shared mode, there is a propagation delay before the newly added requests become available
717-
# (see https://github.com/apify/apify-sdk-python/issues/808).
718687
processed_count = 0
719688
next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode)
720689
if next_request:
@@ -745,8 +714,6 @@ async def test_persistence_across_operations(
745714
assert handled_after_additional == 5, f'handled_after_additional={handled_after_additional}'
746715

747716
# Process remaining.
748-
# In shared mode, there is a propagation delay before the newly added requests become available
749-
# (see https://github.com/apify/apify-sdk-python/issues/808).
750717
remaining_processed = 0
751718
while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode):
752719
remaining_processed += 1
@@ -836,8 +803,6 @@ async def test_request_ordering_with_mixed_operations(
836803
Actor.log.info('Added initial requests')
837804

838805
# Fetch one and reclaim to forefront.
839-
# In shared mode, there is a propagation delay before the newly added requests become available
840-
# (see https://github.com/apify/apify-sdk-python/issues/808).
841806
request1 = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode)
842807
assert request1 is not None, f'request1={request1}'
843808
assert request1.url == 'https://example.com/1', f'request1.url={request1.url}'
@@ -851,7 +816,6 @@ async def test_request_ordering_with_mixed_operations(
851816
Actor.log.info('Added new forefront request')
852817

853818
# Fetch all requests and verify forefront behavior.
854-
# In shared mode, there is a propagation delay before the reclaimed/added requests become visible.
855819
urls_ordered = list[str]()
856820
while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode):
857821
urls_ordered.append(next_request.url)
@@ -1118,8 +1082,6 @@ async def test_pre_existing_request_with_user_data(
11181082
await rq_client.add_request(req.model_dump(by_alias=True))
11191083

11201084
# Fetch the request by the client under test.
1121-
# In shared mode, there is a propagation delay before the newly added request becomes visible
1122-
# (see https://github.com/apify/apify-sdk-python/issues/808).
11231085
request_obtained = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode)
11241086
assert request_obtained is not None
11251087
# Test that custom_data is preserved in user_data (custom_data should be subset of obtained user_data)
@@ -1154,8 +1116,6 @@ async def test_request_queue_is_finished(
11541116
await request_queue_apify.add_request(Request.from_url('http://example.com'))
11551117
assert not await request_queue_apify.is_finished()
11561118

1157-
# In shared mode, there is a propagation delay before the newly added request becomes visible
1158-
# (see https://github.com/apify/apify-sdk-python/issues/808).
11591119
fetched = await call_with_exp_backoff(request_queue_apify.fetch_next_request, rq_access_mode=rq_access_mode)
11601120
assert fetched is not None
11611121
assert not await request_queue_apify.is_finished(), (
@@ -1428,8 +1388,6 @@ async def worker() -> int:
14281388
total_after_workers = await rq.get_total_count()
14291389
assert total_after_workers == 20
14301390

1431-
# In shared mode, there is a propagation delay before the reclaimed requests become visible
1432-
# (see https://github.com/apify/apify-sdk-python/issues/808).
14331391
remaining_count = 0
14341392
while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode='shared'):
14351393
remaining_count += 1

0 commit comments

Comments
 (0)