1010from typing_extensions import override
1111
1212from apify_client import ApifyClientAsync
13- from crawlee ._utils .requests import unique_key_to_request_id
1413from crawlee .storage_clients ._base import RequestQueueClient
1514from crawlee .storage_clients .models import AddRequestsResponse , ProcessedRequest , RequestQueueMetadata
1615
@@ -59,10 +58,10 @@ def __init__(
5958 """The name of the request queue."""
6059
6160 self ._queue_head = deque [str ]()
62- """A deque to store request IDs in the queue head."""
61+ """A deque to store request unique keys in the queue head."""
6362
6463 self ._requests_cache : LRUCache [str , CachedRequest ] = LRUCache (maxsize = self ._MAX_CACHED_REQUESTS )
65- """A cache to store request objects. Request ID is used as the cache key."""
64+ """A cache to store request objects. Request unique key is used as the cache key."""
6665
6766 self ._queue_has_locked_requests : bool | None = None
6867 """Whether the queue has requests locked by another client."""
@@ -248,14 +247,13 @@ async def add_batch_of_requests(
248247 already_present_requests : list [ProcessedRequest ] = []
249248
250249 for request in requests :
251- if self ._requests_cache .get (request .id ):
250+ if self ._requests_cache .get (request .unique_key ):
252251 # We are not sure if it was already handled at this point, and it is not worth calling API for it.
253252 # It could have been handled by another client in the meantime, so cached information about
254253 # `request.was_already_handled` is not reliable.
255254 already_present_requests .append (
256255 ProcessedRequest .model_validate (
257256 {
258- 'id' : request .id ,
259257 'uniqueKey' : request .unique_key ,
260258 'wasAlreadyPresent' : True ,
261259 'wasAlreadyHandled' : request .was_already_handled ,
@@ -267,14 +265,13 @@ async def add_batch_of_requests(
267265 # Add new request to the cache.
268266 processed_request = ProcessedRequest .model_validate (
269267 {
270- 'id' : request .id ,
271268 'uniqueKey' : request .unique_key ,
272269 'wasAlreadyPresent' : True ,
273270 'wasAlreadyHandled' : request .was_already_handled ,
274271 }
275272 )
276273 self ._cache_request (
277- unique_key_to_request_id ( request .unique_key ) ,
274+ request .unique_key ,
278275 processed_request ,
279276 )
280277 new_requests .append (request )
@@ -299,7 +296,7 @@ async def add_batch_of_requests(
299296
300297 # Remove unprocessed requests from the cache
301298 for unprocessed_request in api_response .unprocessed_requests :
302- self ._requests_cache .pop (unique_key_to_request_id ( unprocessed_request .unique_key ) , None )
299+ self ._requests_cache .pop (unprocessed_request .unique_key , None )
303300
304301 else :
305302 api_response = AddRequestsResponse .model_validate (
@@ -323,16 +320,16 @@ async def add_batch_of_requests(
323320 return api_response
324321
325322 @override
326- async def get_request (self , request_id : str ) -> Request | None :
323+ async def get_request (self , request_unique_key : str ) -> Request | None :
327324 """Get a request by ID.
328325
329326 Args:
330- request_id: The ID of the request to get.
327+ request_unique_key: Unique key of the request to get.
331328
332329 Returns:
333330 The request or None if not found.
334331 """
335- response = await self ._api_client .get_request ( request_id )
332+ response = await self ._api_client .get_request_by_unique_key ( request_unique_key )
336333
337334 if response is None :
338335 return None
@@ -381,7 +378,7 @@ async def fetch_next_request(self) -> Request | None:
381378 return None
382379
383380 # Use get request to ensure we have the full request object.
384- request = await self .get_request (request .id )
381+ request = await self .get_request (request .unique_key )
385382 if request is None :
386383 logger .debug (
387384 'Request fetched from the beginning of queue was not found in the RQ' ,
@@ -407,7 +404,7 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
407404 if request .handled_at is None :
408405 request .handled_at = datetime .now (tz = timezone .utc )
409406
410- if cached_request := self ._requests_cache [request .id ]:
407+ if cached_request := self ._requests_cache [request .unique_key ]:
411408 cached_request .was_already_handled = request .was_already_handled
412409 try :
413410 # Update the request in the API
@@ -419,14 +416,14 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
419416 self ._assumed_handled_count += 1
420417
421418 # Update the cache with the handled request
422- cache_key = unique_key_to_request_id ( request .unique_key )
419+ cache_key = request .unique_key
423420 self ._cache_request (
424421 cache_key ,
425422 processed_request ,
426423 hydrated_request = request ,
427424 )
428425 except Exception as exc :
429- logger .debug (f'Error marking request { request .id } as handled: { exc !s} ' )
426+ logger .debug (f'Error marking request { request .unique_key } as handled: { exc !s} ' )
430427 return None
431428 else :
432429 return processed_request
@@ -467,7 +464,7 @@ async def reclaim_request(
467464 self ._assumed_handled_count -= 1
468465
469466 # Update the cache
470- cache_key = unique_key_to_request_id ( request .unique_key )
467+ cache_key = request .unique_key
471468 self ._cache_request (
472469 cache_key ,
473470 processed_request ,
@@ -481,11 +478,11 @@ async def reclaim_request(
481478
482479 # Try to release the lock on the request
483480 try :
484- await self ._delete_request_lock (request .id , forefront = forefront )
481+ await self ._delete_request_lock (request .unique_key , forefront = forefront )
485482 except Exception as err :
486- logger .debug (f'Failed to delete request lock for request { request .id } ' , exc_info = err )
483+ logger .debug (f'Failed to delete request lock for request { request .unique_key } ' , exc_info = err )
487484 except Exception as exc :
488- logger .debug (f'Error reclaiming request { request .id } : { exc !s} ' )
485+ logger .debug (f'Error reclaiming request { request .unique_key } : { exc !s} ' )
489486 return None
490487 else :
491488 return processed_request
@@ -554,7 +551,7 @@ async def _get_or_hydrate_request(self, request_id: str) -> Request | None:
554551 return None
555552
556553 # Update cache with hydrated request
557- cache_key = unique_key_to_request_id ( request .unique_key )
554+ cache_key = request .unique_key
558555 self ._cache_request (
559556 cache_key ,
560557 ProcessedRequest (
@@ -592,7 +589,7 @@ async def _update_request(
592589 )
593590
594591 return ProcessedRequest .model_validate (
595- {'id' : request . id , ' uniqueKey' : request .unique_key } | response ,
592+ {'uniqueKey' : request .unique_key } | response ,
596593 )
597594
598595 async def _list_head (
@@ -653,28 +650,26 @@ async def _list_head(
653650 request = Request .model_validate (request_data )
654651
655652 # Skip requests without ID or unique key
656- if not request .id or not request . unique_key :
653+ if not request .unique_key :
657654 logger .debug (
658655 'Skipping request from queue head, missing ID or unique key' ,
659656 extra = {
660- 'id' : request .id ,
661657 'unique_key' : request .unique_key ,
662658 },
663659 )
664660 continue
665661
666662 # Cache the request
667663 self ._cache_request (
668- unique_key_to_request_id ( request .unique_key ) ,
664+ request .unique_key ,
669665 ProcessedRequest (
670- id = request .id ,
671666 unique_key = request .unique_key ,
672667 was_already_present = True ,
673668 was_already_handled = False ,
674669 ),
675670 hydrated_request = request ,
676671 )
677- self ._queue_head .append (request .id )
672+ self ._queue_head .append (request .unique_key )
678673
679674 for leftover_request_id in leftover_buffer :
680675 # After adding new requests to the forefront, any existing leftover locked request is kept in the end.
@@ -683,21 +678,21 @@ async def _list_head(
683678
684679 async def _prolong_request_lock (
685680 self ,
686- request_id : str ,
681+ request_unique_key : str ,
687682 * ,
688683 lock_secs : int ,
689684 ) -> ProlongRequestLockResponse :
690685 """Prolong the lock on a specific request in the queue.
691686
692687 Args:
693- request_id: The identifier of the request whose lock is to be prolonged.
688+ request_unique_key: Unique key of the request whose lock is to be prolonged.
694689 lock_secs: The additional amount of time, in seconds, that the request will remain locked.
695690
696691 Returns:
697692 A response containing the time at which the lock will expire.
698693 """
699- response = await self ._api_client .prolong_request_lock (
700- request_id = request_id ,
694+ response = await self ._api_client .prolong_request_lock_by_unique_key (
695+ request_unique_key = request_unique_key ,
701696 # All requests reaching this code were the tip of the queue at the moment when they were fetched,
702697 # so if their lock expires, they should be put back to the forefront as their handling is long overdue.
703698 forefront = True ,
@@ -710,37 +705,37 @@ async def _prolong_request_lock(
710705
711706 # Update the cache with the new lock expiration
712707 for cached_request in self ._requests_cache .values ():
713- if cached_request .id == request_id :
708+ if cached_request .unique_key == request_unique_key :
714709 cached_request .lock_expires_at = result .lock_expires_at
715710 break
716711
717712 return result
718713
719714 async def _delete_request_lock (
720715 self ,
721- request_id : str ,
716+ request_unique_key : str ,
722717 * ,
723718 forefront : bool = False ,
724719 ) -> None :
725720 """Delete the lock on a specific request in the queue.
726721
727722 Args:
728- request_id: ID of the request to delete the lock.
723+ request_unique_key: Unique key of the request to delete the lock.
729724 forefront: Whether to put the request in the beginning or the end of the queue after the lock is deleted.
730725 """
731726 try :
732- await self ._api_client .delete_request_lock (
733- request_id = request_id ,
727+ await self ._api_client .delete_request_lock_by_unique_key (
728+ request_unique_key = request_unique_key ,
734729 forefront = forefront ,
735730 )
736731
737732 # Update the cache to remove the lock
738733 for cached_request in self ._requests_cache .values ():
739- if cached_request .id == request_id :
734+ if cached_request .unique_key == request_unique_key :
740735 cached_request .lock_expires_at = None
741736 break
742737 except Exception as err :
743- logger .debug (f'Failed to delete request lock for request { request_id } ' , exc_info = err )
738+ logger .debug (f'Failed to delete request lock for request { request_unique_key } ' , exc_info = err )
744739
745740 def _cache_request (
746741 self ,
@@ -758,7 +753,7 @@ def _cache_request(
758753 hydrated_request: The hydrated request object, if available.
759754 """
760755 self ._requests_cache [cache_key ] = CachedRequest (
761- id = processed_request .id ,
756+ unique_key = processed_request .unique_key ,
762757 was_already_handled = processed_request .was_already_handled ,
763758 hydrated = hydrated_request ,
764759 lock_expires_at = None ,
0 commit comments