-
Notifications
You must be signed in to change notification settings - Fork 742
feat: Add opt-in per-domain request throttling for HTTP 429 backoff #1762
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
64f7247
62ab3b8
1065e9b
138fd67
abdf51c
dd99d9d
497b782
902f885
e02dd68
2e3493c
ac18556
44b93bb
412df15
a249a23
4be7b2d
a3f5c7c
bebf2db
e5fe554
6dbe696
8f10c10
ed0dcc0
f2f47a5
546d7ac
10481c4
fac6367
8acbb43
0f1574a
b60c720
851d044
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| import asyncio | ||
|
|
||
| from crawlee.crawlers import BasicCrawler, BasicCrawlingContext | ||
| from crawlee.request_loaders import ThrottlingRequestManager | ||
| from crawlee.storages import RequestQueue | ||
|
|
||
|
|
||
| async def main() -> None: | ||
| # Open the default request queue. | ||
| queue = await RequestQueue.open() | ||
|
|
||
| # Wrap it with ThrottlingRequestManager for specific domains. The throttler uses the | ||
| # same storage backend as the underlying queue. | ||
| throttler = ThrottlingRequestManager( | ||
| queue, | ||
| domains=['api.example.com', 'slow-site.org'], | ||
| request_manager_opener=RequestQueue.open, | ||
| ) | ||
|
|
||
| # Pass the throttler as the crawler's request manager. | ||
| crawler = BasicCrawler(request_manager=throttler) | ||
|
|
||
| @crawler.router.default_handler | ||
| async def handler(context: BasicCrawlingContext) -> None: | ||
| context.log.info(f'Processing {context.request.url}') | ||
|
|
||
| # Add requests. Listed domains are routed directly to their throttled sub-managers. | ||
| # Others go to the inner manager. | ||
| await throttler.add_requests( | ||
| [ | ||
| 'https://api.example.com/data', | ||
| 'https://api.example.com/users', | ||
| 'https://slow-site.org/page1', | ||
| 'https://fast-site.com/page1', # Not throttled | ||
| ] | ||
| ) | ||
|
|
||
| await crawler.run() | ||
|
|
||
|
|
||
| if __name__ == '__main__': | ||
| asyncio.run(main()) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| --- | ||
| id: request-throttling | ||
| title: Request throttling | ||
| description: How to throttle requests per domain using the ThrottlingRequestManager. | ||
| --- | ||
|
|
||
| import ApiLink from '@site/src/components/ApiLink'; | ||
| import RunnableCodeBlock from '@site/src/components/RunnableCodeBlock'; | ||
|
|
||
| import ThrottlingExample from '!!raw-loader!roa-loader!./code_examples/request_throttling/throttling_example.py'; | ||
|
|
||
| When crawling websites that enforce rate limits (HTTP 429) or specify `crawl-delay` in their `robots.txt`, you need a way to throttle requests per domain without blocking unrelated domains. The <ApiLink to="class/ThrottlingRequestManager">`ThrottlingRequestManager`</ApiLink> provides exactly this. | ||
|
|
||
| ## Overview | ||
|
|
||
| The <ApiLink to="class/ThrottlingRequestManager">`ThrottlingRequestManager`</ApiLink> wraps a <ApiLink to="class/RequestManager">`RequestManager`</ApiLink> (typically a <ApiLink to="class/RequestQueue">`RequestQueue`</ApiLink>) and manages per-domain throttling. You specify which domains to throttle at initialization, and the manager automatically: | ||
|
|
||
| - **Routes requests** for listed domains into dedicated sub-managers at insertion time. | ||
| - **Enforces delays** from HTTP 429 responses (exponential backoff) and `robots.txt` crawl-delay directives. | ||
| - **Schedules fairly** by fetching from the domain that has been waiting the longest. | ||
| - **Sleeps intelligently** when all configured domains are throttled, instead of busy-waiting. | ||
|
|
||
| Requests for domains **not** in the configured list pass through to the main queue without any throttling. | ||
|
|
||
| ## Basic usage | ||
|
|
||
| To use request throttling, create a <ApiLink to="class/ThrottlingRequestManager">`ThrottlingRequestManager`</ApiLink> with the domains you want to throttle and pass it as the `request_manager` to your crawler: | ||
|
|
||
| <RunnableCodeBlock className="language-python" language="python"> | ||
| {ThrottlingExample} | ||
| </RunnableCodeBlock> | ||
|
|
||
| ## How it works | ||
|
|
||
| 1. **Insertion-time routing**: When you add requests via `add_request` or `add_requests`, each request is checked against the configured domain list. Matching requests go directly into a per-domain sub-manager; all others go to the inner manager. This eliminates request duplication entirely. | ||
|
|
||
| 2. **429 backoff**: When the crawler detects an HTTP 429 response, the `ThrottlingRequestManager` records an exponential backoff delay for that domain (starting at 2s, doubling up to 60s). If the response includes a `Retry-After` header, that value takes priority. | ||
|
|
||
| 3. **Crawl-delay**: If `robots.txt` specifies a `crawl-delay`, the manager enforces a minimum interval between requests to that domain. | ||
|
|
||
| 4. **Fair scheduling**: `fetch_next_request` sorts available sub-managers by how long each domain has been waiting, ensuring no domain is starved. | ||
|
|
||
| :::tip | ||
|
|
||
| The `ThrottlingRequestManager` is an opt-in feature. If you don't pass it to your crawler, requests are processed normally without any per-domain throttling. | ||
|
|
||
| ::: |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| """HTTP utility functions for Crawlee.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from datetime import datetime, timedelta, timezone | ||
| from email.utils import parsedate_to_datetime | ||
| from logging import getLogger | ||
|
|
||
| logger = getLogger(__name__) | ||
|
|
||
|
|
||
| def parse_retry_after_header(value: str | None) -> timedelta | None: | ||
| """Parse the Retry-After HTTP header value. | ||
|
|
||
| The header can contain either a number of seconds or an HTTP-date. | ||
| See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After | ||
|
|
||
| Args: | ||
| value: The raw Retry-After header value. | ||
|
|
||
| Returns: | ||
| A timedelta representing the delay, or None if the header is missing or unparsable. | ||
| """ | ||
| if not value: | ||
| return None | ||
|
|
||
| try: | ||
| return timedelta(seconds=int(value)) | ||
| except ValueError: | ||
| pass | ||
|
|
||
| try: | ||
| retry_date = parsedate_to_datetime(value) | ||
| # `parsedate_to_datetime` may return a naive datetime when the input has no timezone info. | ||
| # Treat such values as UTC — HTTP-dates are GMT per RFC 7231. | ||
| if retry_date.tzinfo is None: | ||
| retry_date = retry_date.replace(tzinfo=timezone.utc) | ||
|
|
||
| delay = retry_date - datetime.now(timezone.utc) | ||
| if delay.total_seconds() > 0: | ||
| return delay | ||
| logger.debug(f'Retry-After HTTP-date {value!r} is in the past; ignoring.') | ||
| except (ValueError, TypeError): | ||
| pass | ||
|
|
||
| return None | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import logging | ||
|
|
||
|
|
||
| class LoggerOnce: | ||
| """Emits each log message at most once, keyed by an explicit string. | ||
|
|
||
| Useful for diagnostic warnings that would otherwise spam the log when the same condition recurs (per-request | ||
| misconfiguration warnings, repeated fallback paths, etc.). Deduplication scope follows the lifetime of the | ||
| instance — a module-level instance gives process-wide dedup; an attribute on a class gives per-instance dedup. | ||
| """ | ||
|
|
||
| def __init__(self, logger: logging.Logger) -> None: | ||
| self._logger = logger | ||
| self._seen: set[str] = set() | ||
|
|
||
| def log(self, message: str, *, key: str, level: int = logging.INFO) -> None: | ||
| """Log `message` at `level` the first time `key` is seen on this instance; later calls are no-ops. | ||
|
|
||
| Args: | ||
| message: The message to log. | ||
| key: Deduplication key. Two calls with the same key emit at most once. | ||
| level: Standard `logging` level (e.g. `logging.WARNING`). Defaults to `logging.INFO`. | ||
| """ | ||
| if key in self._seen: | ||
| return | ||
| self._seen.add(key) | ||
| self._logger.log(level, message) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,5 @@ | ||
| # Inspiration: https://github.com/apify/crawlee/blob/v3.7.3/packages/basic-crawler/src/internals/basic-crawler.ts | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import asyncio | ||
|
|
@@ -13,6 +14,7 @@ | |
| from contextlib import AsyncExitStack, suppress | ||
| from datetime import timedelta | ||
| from functools import partial | ||
| from http import HTTPStatus | ||
| from io import StringIO | ||
| from pathlib import Path | ||
| from typing import TYPE_CHECKING, Any, Generic, Literal, ParamSpec, cast | ||
|
|
@@ -43,6 +45,8 @@ | |
| ) | ||
| from crawlee._utils.docs import docs_group | ||
| from crawlee._utils.file import atomic_write, export_csv_to_stream, export_json_to_stream | ||
| from crawlee._utils.http import parse_retry_after_header | ||
| from crawlee._utils.log import LoggerOnce | ||
| from crawlee._utils.recurring_task import RecurringTask | ||
| from crawlee._utils.robots import RobotsTxtFile | ||
| from crawlee._utils.urls import UNSUPPORTED_SCHEME_MESSAGE, convert_to_absolute_url, filter_url, is_url_absolute | ||
|
|
@@ -61,6 +65,7 @@ | |
| ) | ||
| from crawlee.events._types import Event, EventCrawlerStatusData | ||
| from crawlee.http_clients import ImpitHttpClient | ||
| from crawlee.request_loaders import ThrottlingRequestManager | ||
| from crawlee.router import Router | ||
| from crawlee.sessions import SessionPool | ||
| from crawlee.statistics import Statistics, StatisticsState | ||
|
|
@@ -491,18 +496,18 @@ async def persist_state_factory() -> KeyValueStore: | |
| run_task_function=self.__run_task_function, | ||
| ) | ||
| self._crawler_state_rec_task = RecurringTask( | ||
| func=self._crawler_state_task, delay=status_message_logging_interval | ||
| func=self._crawler_state_task, | ||
| delay=status_message_logging_interval, | ||
| ) | ||
| self._previous_crawler_state: TStatisticsState | None = None | ||
|
|
||
| # State flags | ||
| self._keep_alive = keep_alive | ||
| self._running = False | ||
| self._has_finished_before = False | ||
|
|
||
| self._failed = False | ||
|
|
||
| self._unexpected_stop = False | ||
| self._logger_once = LoggerOnce(self._logger) | ||
|
|
||
| @property | ||
| def log(self) -> logging.Logger: | ||
|
|
@@ -697,19 +702,22 @@ async def run( | |
|
|
||
| self._running = True | ||
|
|
||
| if self._respect_robots_txt_file and not isinstance(self._request_manager, ThrottlingRequestManager): | ||
| self._logger.warning( | ||
|
vdusek marked this conversation as resolved.
|
||
| 'The `respect_robots_txt_file` option is enabled, but the crawler is not using ' | ||
| '`ThrottlingRequestManager`. Crawl-delay directives from robots.txt will not be enforced. To enable ' | ||
| 'crawl-delay support, configure the crawler to use `ThrottlingRequestManager` as the request manager.' | ||
| ) | ||
|
|
||
| if self._has_finished_before: | ||
| await self._statistics.reset() | ||
|
|
||
| if self._use_session_pool: | ||
| await self._session_pool.reset_store() | ||
|
|
||
| request_manager = await self.get_request_manager() | ||
| if purge_request_queue and isinstance(request_manager, RequestQueue): | ||
| await request_manager.drop() | ||
| self._request_manager = await RequestQueue.open( | ||
| storage_client=self._service_locator.get_storage_client(), | ||
| configuration=self._service_locator.get_configuration(), | ||
| ) | ||
|
Comment on lines
-707
to
-712
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Even in the state before the change, this was a code smell - shouldn't we add a "purge_on_start_hook"-like abstract method to This is aimed mostly at @vdusek and @Pijukatel. We definitely don't need to resolve it in this PR if you guys don't see an obvious way out.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Understood, happy to leave this for a follow-up. |
||
| if purge_request_queue: | ||
| request_manager = await self.get_request_manager() | ||
| await request_manager.purge() | ||
|
|
||
| if requests is not None: | ||
| await self.add_requests(requests) | ||
|
|
@@ -1535,16 +1543,51 @@ def _raise_for_error_status_code(self, status_code: int) -> None: | |
| if is_status_code_server_error(status_code) and not is_ignored_status: | ||
| raise HttpStatusCodeError('Error status code returned', status_code) | ||
|
|
||
| def _raise_for_session_blocked_status_code(self, session: Session | None, status_code: int) -> None: | ||
| def _raise_for_session_blocked_status_code( | ||
| self, | ||
| session: Session | None, | ||
| status_code: int, | ||
|
janbuchar marked this conversation as resolved.
|
||
| *, | ||
| request_url: str, | ||
| retry_after_header: str | None = None, | ||
| ) -> None: | ||
| """Raise an exception if the given status code indicates the session is blocked. | ||
|
|
||
| If the status code is 429 (Too Many Requests), the domain is recorded as rate-limited in the | ||
| `ThrottlingRequestManager` for per-domain backoff. | ||
|
|
||
| Args: | ||
| session: The session used for the request. If None, no check is performed. | ||
| session: The session used for the request. If `None`, no check is performed. | ||
| status_code: The HTTP status code to check. | ||
| request_url: The request URL, used for per-domain rate limit tracking. | ||
| retry_after_header: The value of the `Retry-After` response header, if present. | ||
|
|
||
| Raises: | ||
| SessionError: If the status code indicates the session is blocked. | ||
| """ | ||
| if status_code == HTTPStatus.TOO_MANY_REQUESTS: | ||
| if isinstance(self._request_manager, ThrottlingRequestManager): | ||
| retry_after = parse_retry_after_header(retry_after_header) | ||
| if not self._request_manager.record_domain_delay(request_url, retry_after=retry_after): | ||
| domain = (URL(request_url).host or '').lower() | ||
| if domain: | ||
| self._logger_once.log( | ||
| f'Received an HTTP 429 (Too Many Requests) response from domain "{domain}", but it is ' | ||
| f'not in the `ThrottlingRequestManager.domains` list. Per-domain backoff will not be ' | ||
| f'applied for this domain. Add it to `domains=` to enable throttling.', | ||
| key=f'unconfigured_throttle_domain:{domain}', | ||
| level=logging.WARNING, | ||
| ) | ||
| else: | ||
| self._logger_once.log( | ||
| 'Received an HTTP 429 (Too Many Requests) response, but the crawler is not using ' | ||
| '`ThrottlingRequestManager`. Per-domain backoff and `Retry-After` headers will not be honored. ' | ||
| 'To enable per-domain rate limiting, configure the crawler to use `ThrottlingRequestManager` ' | ||
| 'as the request manager.', | ||
| key='no_throttling_manager_on_429', | ||
| level=logging.WARNING, | ||
| ) | ||
|
|
||
| if session is not None and session.is_blocked_status_code( | ||
| status_code=status_code, | ||
| ignore_http_error_status_codes=self._ignore_http_error_status_codes, | ||
|
|
@@ -1575,7 +1618,15 @@ async def _is_allowed_based_on_robots_txt_file(self, url: str) -> bool: | |
| if not self._respect_robots_txt_file: | ||
| return True | ||
| robots_txt_file = await self._get_robots_txt_file_for_url(url) | ||
| return not robots_txt_file or robots_txt_file.is_allowed(url) | ||
| if not robots_txt_file: | ||
| return True | ||
|
|
||
| if isinstance(self._request_manager, ThrottlingRequestManager): | ||
| crawl_delay = robots_txt_file.get_crawl_delay() | ||
| if crawl_delay is not None: | ||
| self._request_manager.set_crawl_delay(url, crawl_delay) | ||
|
|
||
| return robots_txt_file.is_allowed(url) | ||
|
|
||
| async def _get_robots_txt_file_for_url(self, url: str) -> RobotsTxtFile | None: | ||
| """Get the RobotsTxtFile for a given URL. | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.