diff --git a/src/apify/storage_clients/_apify/_request_queue_shared_client.py b/src/apify/storage_clients/_apify/_request_queue_shared_client.py
index 8257ec15..929fb458 100644
--- a/src/apify/storage_clients/_apify/_request_queue_shared_client.py
+++ b/src/apify/storage_clients/_apify/_request_queue_shared_client.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import asyncio
+import contextlib
 from collections import deque
 from datetime import datetime, timedelta, timezone
 from logging import getLogger
@@ -73,9 +74,6 @@ def __init__(
         self._queue_has_locked_requests: bool | None = None
         """Whether the queue contains requests currently locked by other clients."""
 
-        self._should_check_for_forefront_requests = False
-        """Flag indicating whether to refresh the queue head to check for newly added forefront requests."""
-
         self._fetch_lock = asyncio.Lock()
         """Lock to prevent race conditions during concurrent fetch operations."""
 
@@ -106,7 +104,7 @@ async def add_batch_of_requests(
                 )
 
             else:
-                # Add new request to the cache.
+                # Add new request to the cache, hydrated so subsequent fetches don't need an API roundtrip.
                 processed_request = ProcessedRequest(
                     id=request_id,
                     unique_key=request.unique_key,
@@ -116,6 +114,7 @@ async def add_batch_of_requests(
                 self._cache_request(
                     request_id,
                     processed_request,
+                    hydrated_request=request,
                 )
                 new_requests.append(request)
 
@@ -137,9 +136,23 @@ async def add_batch_of_requests(
                 api_response.processed_requests.extend(already_present_requests)
 
                 # Remove unprocessed requests from the cache
+                unprocessed_ids = set[str]()
                 for unprocessed_request in api_response.unprocessed_requests:
                     unprocessed_request_id = unique_key_to_request_id(unprocessed_request.unique_key)
                     self._requests_cache.pop(unprocessed_request_id, None)
+                    unprocessed_ids.add(unprocessed_request_id)
+
+                # When adding to the forefront, eagerly insert successfully-added requests at the front of the
+                # local queue head. The Apify API has a propagation delay between `batch_add_requests(forefront)`
+                # and the result being reflected in `list_and_lock_head` (#808), so trusting the local intent
+                # makes the new requests fetchable immediately. Iterate in reverse so the first request in the
+                # batch ends up at the very front.
+                if forefront:
+                    for request in reversed(new_requests):
+                        request_id = unique_key_to_request_id(request.unique_key)
+                        if request_id in unprocessed_ids or request_id in self._queue_head:
+                            continue
+                        self._queue_head.appendleft(request_id)
 
         else:
             api_response = AddRequestsResponse.model_validate(
@@ -274,10 +287,15 @@ async def reclaim_request(
                     hydrated_request=request,
                 )
 
-            # If we're adding to the forefront, we need to check for forefront requests
-            # in the next list_head call
+            # When reclaiming to the forefront, eagerly insert the request at the front of the local
+            # queue head. The Apify API has a propagation delay between `update_request(forefront=true)`
+            # and the result being reflected in `list_and_lock_head` (#808), so trusting the local intent
+            # avoids returning a different request on the next fetch. Dedupe in case the request_id was
+            # re-added by a concurrent refresh between fetch and reclaim.
             if forefront:
-                self._should_check_for_forefront_requests = True
+                with contextlib.suppress(ValueError):
+                    self._queue_head.remove(request_id)
+                self._queue_head.appendleft(request_id)
 
         except Exception:
             logger.exception(f'Error reclaiming request {request.unique_key}')
@@ -317,7 +335,7 @@ async def _get_request_by_id(self, request_id: str) -> Request | None:
     async def _ensure_head_is_non_empty(self) -> None:
         """Ensure that the queue head has requests if they are available in the queue."""
         # If queue head has adequate requests, skip fetching more
-        if len(self._queue_head) > 1 and not self._should_check_for_forefront_requests:
+        if len(self._queue_head) > 1:
            return
 
         # Fetch requests from the API and populate the queue head
@@ -404,8 +422,10 @@ async def _list_head(
         Returns:
             A collection of requests from the beginning of the queue.
         """
-        # Return from cache if available and we're not checking for new forefront requests
-        if self._queue_head and not self._should_check_for_forefront_requests:
+        # Return from cache if available. Local-head updates from `add_batch_of_requests(forefront=True)` and
+        # `reclaim_request(forefront=True)` keep the cache authoritative for our own forefront ops, so we don't
+        # need to force an API refresh after them.
+        if self._queue_head:
             logger.debug(f'Using cached queue head with {len(self._queue_head)} requests')
             # Create a list of requests from the cached queue head
             items = []
@@ -424,11 +444,6 @@ async def _list_head(
                 lock_time=None,
                 queue_has_locked_requests=self._queue_has_locked_requests,
             )
-        leftover_buffer = list[str]()
-        if self._should_check_for_forefront_requests:
-            leftover_buffer = list(self._queue_head)
-            self._queue_head.clear()
-            self._should_check_for_forefront_requests = False
 
         # Otherwise fetch from API
         response = await self._api_client.list_and_lock_head(
@@ -469,10 +484,6 @@ async def _list_head(
             )
             self._queue_head.append(request_id)
 
-        for leftover_id in leftover_buffer:
-            # After adding new requests to the forefront, any existing leftover locked request is kept in the end.
-            self._queue_head.append(leftover_id)
-
         return RequestQueueHead.model_validate(response)
 
     def _cache_request(
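
Below is a small illustrative sketch (not part of the patch) of the ordering semantics the new forefront handling relies on. It uses a plain `collections.deque` of request IDs in place of the client's `_queue_head`; the IDs and variable names are made up for the example.

```python
import contextlib
from collections import deque

# Stand-in for the client's local head cache: a deque of request IDs,
# leftmost element = next request to be fetched.
queue_head: deque[str] = deque(['req-A', 'req-B'])

# Eager forefront add: iterate the batch in reverse so the first request of the
# batch ends up at the very front, skipping IDs already present in the head.
new_batch = ['req-1', 'req-2']
for request_id in reversed(new_batch):
    if request_id in queue_head:
        continue
    queue_head.appendleft(request_id)

assert list(queue_head) == ['req-1', 'req-2', 'req-A', 'req-B']

# Eager forefront reclaim: drop any stale occurrence of the ID first
# (deque.remove raises ValueError when the ID is absent), then push it to the front.
reclaimed_id = 'req-B'
with contextlib.suppress(ValueError):
    queue_head.remove(reclaimed_id)
queue_head.appendleft(reclaimed_id)

assert list(queue_head) == ['req-B', 'req-1', 'req-2', 'req-A']
```

With the local head updated this way, the next fetch serves the reclaimed or forefront-added request immediately, without waiting for `list_and_lock_head` to reflect the forefront update on the API side.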