-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy pathtest_request_queue.py
More file actions
154 lines (123 loc) · 7.15 KB
/
test_request_queue.py
File metadata and controls
154 lines (123 loc) · 7.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
from __future__ import annotations
from typing import TYPE_CHECKING
import pytest
from integration.integration_test_utils import random_resource_name, random_string
from apify_client.clients.resource_clients.request_queue import unique_key_to_request_id
if TYPE_CHECKING:
from apify_client import ApifyClient, ApifyClientAsync
class TestRequestQueueSync:
def test_request_queue_lock(self, apify_client: ApifyClient) -> None:
created_queue = apify_client.request_queues().get_or_create(name=random_resource_name('queue'))
queue = apify_client.request_queue(created_queue['id'], client_key=random_string(10))
# Add requests and check if correct number of requests was locked
for i in range(15):
queue.add_request({'url': f'http://test-lock.com/{i}', 'uniqueKey': f'http://test-lock.com/{i}'})
locked_requests_list = queue.list_and_lock_head(limit=10, lock_secs=10)
locked_requests = locked_requests_list['items']
for locked_request in locked_requests:
assert locked_request['lockExpiresAt'] is not None
# Check if the delete request works
queue.delete_request_lock(locked_requests[1]['id'])
delete_lock_request = queue.get_request(locked_requests[1]['id'])
assert delete_lock_request is not None
assert delete_lock_request.get('lockExpiresAt') is None
queue.delete_request_lock(locked_requests[2]['id'], forefront=True)
delete_lock_request2 = queue.get_request(locked_requests[2]['id'])
assert delete_lock_request2 is not None
assert delete_lock_request2.get('lockExpiresAt') is None
# Check if the prolong request works
assert queue.prolong_request_lock(locked_requests[3]['id'], lock_secs=15)['lockExpiresAt'] is not None
queue.delete()
assert apify_client.request_queue(created_queue['id']).get() is None
def test_request_batch_operations(self, apify_client: ApifyClient) -> None:
created_queue = apify_client.request_queues().get_or_create(name=random_resource_name('queue'))
queue = apify_client.request_queue(created_queue['id'])
# Add requests to queue and check if they were added
requests_to_add = [
{'url': f'http://test-batch.com/{i}', 'uniqueKey': f'http://test-batch.com/{i}'} for i in range(25)
]
added_requests = queue.batch_add_requests(requests_to_add)
assert len(added_requests.get('processedRequests', [])) > 0
requests_in_queue = queue.list_requests()
assert len(requests_in_queue['items']) == len(added_requests['processedRequests'])
# Delete requests from queue and check if they were deleted
requests_to_delete = requests_in_queue['items'][:20]
delete_response = queue.batch_delete_requests(
[{'uniqueKey': req.get('uniqueKey')} for req in requests_to_delete]
)
requests_in_queue2 = queue.list_requests()
assert len(requests_in_queue2['items']) == 25 - len(delete_response['processedRequests'])
queue.delete()
class TestRequestQueueAsync:
async def test_request_queue_lock(self, apify_client_async: ApifyClientAsync) -> None:
created_queue = await apify_client_async.request_queues().get_or_create(name=random_resource_name('queue'))
queue = apify_client_async.request_queue(created_queue['id'], client_key=random_string(10))
# Add requests and check if correct number of requests was locked
for i in range(15):
await queue.add_request({'url': f'http://test-lock.com/{i}', 'uniqueKey': f'http://test-lock.com/{i}'})
locked_requests_list = await queue.list_and_lock_head(limit=10, lock_secs=10)
locked_requests = locked_requests_list['items']
for locked_request in locked_requests:
assert locked_request['lockExpiresAt'] is not None
# Check if the delete request works
await queue.delete_request_lock(locked_requests[1]['id'])
delete_lock_request = await queue.get_request(locked_requests[1]['id'])
assert delete_lock_request is not None
assert delete_lock_request.get('lockExpiresAt') is None
await queue.delete_request_lock(locked_requests[2]['id'], forefront=True)
delete_lock_request2 = await queue.get_request(locked_requests[2]['id'])
assert delete_lock_request2 is not None
assert delete_lock_request2.get('lockExpiresAt') is None
# Check if the prolong request works
prolonged_request = await queue.prolong_request_lock(locked_requests[3]['id'], lock_secs=15)
assert prolonged_request['lockExpiresAt'] is not None
await queue.delete()
assert await apify_client_async.request_queue(created_queue['id']).get() is None
async def test_request_batch_operations(self, apify_client_async: ApifyClientAsync) -> None:
created_queue = await apify_client_async.request_queues().get_or_create(name=random_resource_name('queue'))
queue = apify_client_async.request_queue(created_queue['id'])
# Add requests to queue and check if they were added
requests_to_add = [
{'url': f'http://test-batch.com/{i}', 'uniqueKey': f'http://test-batch.com/{i}'} for i in range(25)
]
added_requests = await queue.batch_add_requests(requests_to_add)
assert len(added_requests.get('processedRequests', [])) > 0
requests_in_queue = await queue.list_requests()
assert len(requests_in_queue['items']) == len(added_requests['processedRequests'])
# Delete requests from queue and check if they were deleted
requests_to_delete = requests_in_queue['items'][:20]
delete_response = await queue.batch_delete_requests(
[{'uniqueKey': req.get('uniqueKey')} for req in requests_to_delete]
)
requests_in_queue2 = await queue.list_requests()
assert len(requests_in_queue2['items']) == 25 - len(delete_response['processedRequests'])
await queue.delete()
def test_unique_key_to_request_id_length() -> None:
unique_key = 'exampleKey123'
request_id = unique_key_to_request_id(unique_key, request_id_length=15)
assert len(request_id) == 15, 'Request ID should have the correct length.'
def test_unique_key_to_request_id_consistency() -> None:
unique_key = 'consistentKey'
request_id_1 = unique_key_to_request_id(unique_key)
request_id_2 = unique_key_to_request_id(unique_key)
assert request_id_1 == request_id_2, 'The same unique key should generate consistent request IDs.'
@pytest.mark.parametrize(
('unique_key', 'expected_request_id'),
[
('abc', 'ungWv48BzpBQUDe'),
('uniqueKey', 'xiWPs083cree7mH'),
('', '47DEQpj8HBSaTIm'),
('测试中文', 'lKPdJkdvw8MXEUp'),
('test+/=', 'XZRQjhoG0yjfnYD'),
],
ids=[
'basic_abc',
'keyword_uniqueKey',
'empty_string',
'non_ascii_characters',
'url_unsafe_characters',
],
)
def test_unique_key_to_request_id_matches_known_values(unique_key: str, expected_request_id: str) -> None:
request_id = unique_key_to_request_id(unique_key)
assert request_id == expected_request_id, f'Unique key "{unique_key}" should produce the expected request ID.'