Merged

Commits (29)
64f7247
fix: add per-domain RequestThrottler for 429 backoff (#1437)
MrAliHasan Feb 20, 2026
62ab3b8
refactor: replace RequestThrottler with ThrottlingRequestManager
MrAliHasan Feb 23, 2026
1065e9b
refactor: reimplement `ThrottlingRequestManager` with per-domain sub-…
MrAliHasan Feb 25, 2026
138fd67
test: fix typing and linting checks in ThrottlingRequestManager tests
MrAliHasan Feb 26, 2026
abdf51c
feat: Add explicit domain routing and management to the request throt…
MrAliHasan Mar 2, 2026
dd99d9d
feat: Implement recreate_purged for ThrottlingRequestManager and refa…
MrAliHasan Mar 4, 2026
497b782
deps: Pin ty to version 0.0.18 and update uv.lock to include Python 3…
MrAliHasan Mar 5, 2026
902f885
fix: Ensure ThrottlingRequestManager.add_request explicitly handles N…
MrAliHasan Mar 6, 2026
e02dd68
refactor: Address reviewer feedback on ThrottlingRequestManager and r…
MrAliHasan Mar 11, 2026
2e3493c
fix: Restore uv.lock from upstream master
MrAliHasan Mar 13, 2026
ac18556
fix: Add type narrowing for add_request to satisfy ty type checker
MrAliHasan Mar 13, 2026
44b93bb
fix: Change add_request return type to ProcessedRequest | None
MrAliHasan Mar 13, 2026
412df15
refactor: Address review feedback — proper typing, recreate_purged gu…
MrAliHasan Mar 26, 2026
a249a23
refactor: Add request_manager_opener callback, move record_success in…
MrAliHasan Apr 3, 2026
4be7b2d
Merge branch 'master' into fix/request-throttler-429-backoff
vdusek May 5, 2026
a3f5c7c
Make `ThrottlingRequestManager` generic over inner manager type
vdusek May 5, 2026
bebf2db
Cache crawl-delay configuration per origin in `BasicCrawler`
vdusek May 5, 2026
e5fe554
Reflow docstrings to full 120-char width and fix backticks
vdusek May 5, 2026
6dbe696
Simplify
vdusek May 6, 2026
8f10c10
Fix flaky test
vdusek May 6, 2026
ed0dcc0
warn when capping Retry-After or backoff at max_delay
vdusek May 6, 2026
f2f47a5
make ThrottlingRequestManager sub-managers honor the generic inner type
vdusek May 6, 2026
546d7ac
wake ThrottlingRequestManager fetch loop when new work arrives during…
vdusek May 6, 2026
10481c4
Match throttled domains case-insensitively
vdusek May 6, 2026
fac6367
address review nits on ThrottlingRequestManager
vdusek May 6, 2026
8acbb43
Polishment & ordering
vdusek May 6, 2026
0f1574a
Final
vdusek May 6, 2026
b60c720
Address review comments
vdusek May 6, 2026
851d044
Add warnings
vdusek May 6, 2026
42 changes: 42 additions & 0 deletions docs/guides/code_examples/request_throttling/throttling_example.py
@@ -0,0 +1,42 @@
import asyncio

from crawlee.crawlers import BasicCrawler, BasicCrawlingContext
from crawlee.request_loaders import ThrottlingRequestManager
from crawlee.storages import RequestQueue


async def main() -> None:
    # Open the default request queue.
    queue = await RequestQueue.open()

    # Wrap it with ThrottlingRequestManager for specific domains. The throttler uses the
    # same storage backend as the underlying queue.
    throttler = ThrottlingRequestManager(
        queue,
        domains=['api.example.com', 'slow-site.org'],
        request_manager_opener=RequestQueue.open,
    )

    # Pass the throttler as the crawler's request manager.
    crawler = BasicCrawler(request_manager=throttler)

    @crawler.router.default_handler
    async def handler(context: BasicCrawlingContext) -> None:
        context.log.info(f'Processing {context.request.url}')

    # Add requests. Listed domains are routed directly to their throttled sub-managers.
    # Others go to the inner manager.
    await throttler.add_requests(
        [
            'https://api.example.com/data',
            'https://api.example.com/users',
            'https://slow-site.org/page1',
            'https://fast-site.com/page1',  # Not throttled
        ]
    )

    await crawler.run()


if __name__ == '__main__':
    asyncio.run(main())
47 changes: 47 additions & 0 deletions docs/guides/request_throttling.mdx
@@ -0,0 +1,47 @@
---
id: request-throttling
title: Request throttling
description: How to throttle requests per domain using the ThrottlingRequestManager.
---

import ApiLink from '@site/src/components/ApiLink';
import RunnableCodeBlock from '@site/src/components/RunnableCodeBlock';

import ThrottlingExample from '!!raw-loader!roa-loader!./code_examples/request_throttling/throttling_example.py';

When crawling websites that enforce rate limits (HTTP 429) or specify `crawl-delay` in their `robots.txt`, you need a way to throttle requests per domain without blocking unrelated domains. The <ApiLink to="class/ThrottlingRequestManager">`ThrottlingRequestManager`</ApiLink> provides exactly this.

## Overview

The <ApiLink to="class/ThrottlingRequestManager">`ThrottlingRequestManager`</ApiLink> wraps a <ApiLink to="class/RequestManager">`RequestManager`</ApiLink> (typically a <ApiLink to="class/RequestQueue">`RequestQueue`</ApiLink>) and manages per-domain throttling. You specify which domains to throttle at initialization, and the manager automatically:

- **Routes requests** for listed domains into dedicated sub-managers at insertion time.
- **Enforces delays** from HTTP 429 responses (exponential backoff) and `robots.txt` crawl-delay directives.
- **Schedules fairly** by fetching from the domain that has been waiting the longest (see the sketch below).
- **Sleeps intelligently** when all configured domains are throttled, instead of busy-waiting.

Requests for domains **not** in the configured list pass through to the main queue without any throttling.
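The fair-scheduling rule can be pictured with a short sketch. The names below (`pick_next_domain`, `last_fetched_at`) are illustrative only and are not the actual `ThrottlingRequestManager` internals:

```python
from datetime import datetime


def pick_next_domain(last_fetched_at: dict[str, datetime], ready_domains: set[str]) -> str | None:
    """Pick the ready domain whose last fetch is oldest, i.e. the one waiting longest."""
    candidates = [domain for domain in ready_domains if domain in last_fetched_at]
    if not candidates:
        return None
    return min(candidates, key=lambda domain: last_fetched_at[domain])
```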

## Basic usage

To use request throttling, create a <ApiLink to="class/ThrottlingRequestManager">`ThrottlingRequestManager`</ApiLink> with the domains you want to throttle and pass it as the `request_manager` to your crawler:

<RunnableCodeBlock className="language-python" language="python">
{ThrottlingExample}
</RunnableCodeBlock>

## How it works

1. **Insertion-time routing**: When you add requests via `add_request` or `add_requests`, each request is checked against the configured domain list. Matching requests go directly into a per-domain sub-manager; all others go to the inner manager. This eliminates request duplication entirely.

2. **429 backoff**: When the crawler detects an HTTP 429 response, the `ThrottlingRequestManager` records an exponential backoff delay for that domain (starting at 2s, doubling up to 60s). If the response includes a `Retry-After` header, that value takes priority. A sketch of this rule follows the list.

3. **Crawl-delay**: If `robots.txt` specifies a `crawl-delay`, the manager enforces a minimum interval between requests to that domain.

4. **Fair scheduling**: `fetch_next_request` sorts available sub-managers by how long each domain has been waiting, ensuring no domain is starved.
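A minimal sketch of the delay rule from steps 2 and 3, assuming illustrative names (`next_delay`, `consecutive_429s`) rather than the manager's real internals:

```python
from datetime import timedelta

INITIAL_BACKOFF = timedelta(seconds=2)
MAX_BACKOFF = timedelta(seconds=60)


def next_delay(consecutive_429s: int, retry_after: timedelta | None, crawl_delay: timedelta | None) -> timedelta:
    """Compute how long to wait before the next request to a throttled domain."""
    if retry_after is not None:
        # An explicit Retry-After header takes priority over the computed backoff.
        return retry_after
    # Exponential backoff on repeated 429s: 2s, 4s, 8s, ... capped at 60s.
    backoff = min(INITIAL_BACKOFF * 2 ** max(consecutive_429s - 1, 0), MAX_BACKOFF)
    # A robots.txt crawl-delay acts as a minimum interval between requests.
    return max(backoff, crawl_delay or timedelta(0))
```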

:::tip

The `ThrottlingRequestManager` is an opt-in feature. If you don't pass it to your crawler, requests are processed normally without any per-domain throttling.

:::
17 changes: 7 additions & 10 deletions src/crawlee/_autoscaling/snapshotter.py
@@ -2,17 +2,17 @@

 from __future__ import annotations
 
-import functools
 from bisect import insort
 from datetime import datetime, timedelta, timezone
-from logging import getLogger
+from logging import WARNING, getLogger
 from typing import TYPE_CHECKING, TypeVar, cast
 
 from crawlee import service_locator
 from crawlee._autoscaling._types import ClientSnapshot, CpuSnapshot, EventLoopSnapshot, MemorySnapshot, Ratio, Snapshot
 from crawlee._utils.byte_size import ByteSize
 from crawlee._utils.context import ensure_context
 from crawlee._utils.docs import docs_group
+from crawlee._utils.log import LoggerOnce
 from crawlee._utils.recurring_task import RecurringTask
 from crawlee._utils.system import MemoryInfo, MemoryUsageInfo, get_memory_info
 from crawlee.events._types import Event, EventSystemInfoData
@@ -23,16 +23,11 @@
     from crawlee.configuration import Configuration
 
 logger = getLogger(__name__)
+logger_once = LoggerOnce(logger)
 
 T = TypeVar('T', bound=Snapshot)
 
 
-@functools.lru_cache
-def _warn_once(warning_message: str) -> None:
-    """Log a warning message only once."""
-    logger.warning(warning_message)
-
-
 class SortedSnapshotList(list[T]):
     """A list that maintains sorted order by `created_at` attribute for snapshot objects."""
@@ -303,9 +298,11 @@ async def _snapshot_memory(self, event_data: EventSystemInfoData) -> None:
             # This is just hypothetical case, that will most likely not happen in practice.
             # `LocalEventManager` should always provide `MemoryInfo` in the event data.
             # When running on Apify, `self._max_memory_size` is always `ByteSize`, not `Ratio`.
-            _warn_once(
+            logger_once.log(
                 'It is recommended that a custom implementation of `LocalEventManager` emits `SYSTEM_INFO` events '
-                'with `MemoryInfo` and not just `MemoryUsageInfo`.'
+                'with `MemoryInfo` and not just `MemoryUsageInfo`.',
+                key='memory_usage_info_event',
+                level=WARNING,
             )
             max_memory_size = get_memory_info().total_size * ratio.value
             system_wide_used_size = None
46 changes: 46 additions & 0 deletions src/crawlee/_utils/http.py
@@ -0,0 +1,46 @@
"""HTTP utility functions for Crawlee."""

from __future__ import annotations

from datetime import datetime, timedelta, timezone
from email.utils import parsedate_to_datetime
from logging import getLogger

logger = getLogger(__name__)


def parse_retry_after_header(value: str | None) -> timedelta | None:
"""Parse the Retry-After HTTP header value.

The header can contain either a number of seconds or an HTTP-date.
See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After

Args:
value: The raw Retry-After header value.

Returns:
A timedelta representing the delay, or None if the header is missing or unparsable.
"""
if not value:
return None

try:
return timedelta(seconds=int(value))
except ValueError:
pass

try:
retry_date = parsedate_to_datetime(value)
Comment thread
vdusek marked this conversation as resolved.
# `parsedate_to_datetime` may return a naive datetime when the input has no timezone info.
# Treat such values as UTC — HTTP-dates are GMT per RFC 7231.
if retry_date.tzinfo is None:
retry_date = retry_date.replace(tzinfo=timezone.utc)

delay = retry_date - datetime.now(timezone.utc)
if delay.total_seconds() > 0:
return delay
logger.debug(f'Retry-After HTTP-date {value!r} is in the past; ignoring.')
except (ValueError, TypeError):
pass

return None
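A quick usage sketch of the helper; the date literal is arbitrary:

```python
from datetime import timedelta

from crawlee._utils.http import parse_retry_after_header

# Integer-seconds form.
assert parse_retry_after_header('120') == timedelta(seconds=120)

# HTTP-date form: yields the remaining delay if the date is in the future, otherwise None.
delay = parse_retry_after_header('Wed, 21 Oct 2026 07:28:00 GMT')

# Missing or unparsable values yield None.
assert parse_retry_after_header(None) is None
assert parse_retry_after_header('soon') is None
```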
29 changes: 29 additions & 0 deletions src/crawlee/_utils/log.py
@@ -0,0 +1,29 @@
from __future__ import annotations

import logging


class LoggerOnce:
    """Emits each log message at most once, keyed by an explicit string.

    Useful for diagnostic warnings that would otherwise spam the log when the same condition recurs (per-request
    misconfiguration warnings, repeated fallback paths, etc.). Deduplication scope follows the lifetime of the
    instance — a module-level instance gives process-wide dedup; an attribute on a class gives per-instance dedup.
    """

    def __init__(self, logger: logging.Logger) -> None:
        self._logger = logger
        self._seen: set[str] = set()

    def log(self, message: str, *, key: str, level: int = logging.INFO) -> None:
        """Log `message` at `level` the first time `key` is seen on this instance; later calls are no-ops.

        Args:
            message: The message to log.
            key: Deduplication key. Two calls with the same key emit at most once.
            level: Standard `logging` level (e.g. `logging.WARNING`). Defaults to `logging.INFO`.
        """
        if key in self._seen:
            return
        self._seen.add(key)
        self._logger.log(level, message)
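A usage sketch. Unlike the `functools.lru_cache` approach it replaces in `snapshotter.py`, deduplication is keyed explicitly, so dynamically formatted messages about the same condition still collapse into a single log line:

```python
import logging

from crawlee._utils.log import LoggerOnce

logger = logging.getLogger(__name__)
logger_once = LoggerOnce(logger)

for domain in ['a.example.com', 'a.example.com', 'b.example.com']:
    # Emitted once per distinct key, even if the message text varies between calls.
    logger_once.log(
        f'Domain {domain!r} is not configured for throttling.',
        key=f'unconfigured:{domain}',
        level=logging.WARNING,
    )
# Warns once for a.example.com and once for b.example.com.
```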
(file header not captured in this view)
@@ -304,7 +304,12 @@ async def _handle_status_code_response(
"""
status_code = context.http_response.status_code
if self._retry_on_blocked:
self._raise_for_session_blocked_status_code(context.session, status_code)
self._raise_for_session_blocked_status_code(
context.session,
status_code,
request_url=context.request.url,
retry_after_header=context.http_response.headers.get('retry-after'),
)
self._raise_for_error_status_code(status_code)
yield context

77 changes: 64 additions & 13 deletions src/crawlee/crawlers/_basic/_basic_crawler.py
@@ -1,4 +1,5 @@
 # Inspiration: https://github.com/apify/crawlee/blob/v3.7.3/packages/basic-crawler/src/internals/basic-crawler.ts
+
 from __future__ import annotations
 
 import asyncio
@@ -13,6 +14,7 @@
 from contextlib import AsyncExitStack, suppress
 from datetime import timedelta
 from functools import partial
+from http import HTTPStatus
 from io import StringIO
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, Generic, Literal, ParamSpec, cast
@@ -43,6 +45,8 @@
 )
 from crawlee._utils.docs import docs_group
 from crawlee._utils.file import atomic_write, export_csv_to_stream, export_json_to_stream
+from crawlee._utils.http import parse_retry_after_header
+from crawlee._utils.log import LoggerOnce
 from crawlee._utils.recurring_task import RecurringTask
 from crawlee._utils.robots import RobotsTxtFile
 from crawlee._utils.urls import UNSUPPORTED_SCHEME_MESSAGE, convert_to_absolute_url, filter_url, is_url_absolute
@@ -61,6 +65,7 @@
 )
 from crawlee.events._types import Event, EventCrawlerStatusData
 from crawlee.http_clients import ImpitHttpClient
+from crawlee.request_loaders import ThrottlingRequestManager
 from crawlee.router import Router
 from crawlee.sessions import SessionPool
 from crawlee.statistics import Statistics, StatisticsState
@@ -491,18 +496,18 @@ async def persist_state_factory() -> KeyValueStore:
             run_task_function=self.__run_task_function,
         )
         self._crawler_state_rec_task = RecurringTask(
-            func=self._crawler_state_task, delay=status_message_logging_interval
+            func=self._crawler_state_task,
+            delay=status_message_logging_interval,
         )
         self._previous_crawler_state: TStatisticsState | None = None
 
         # State flags
         self._keep_alive = keep_alive
         self._running = False
         self._has_finished_before = False
-
         self._failed = False
-
         self._unexpected_stop = False
+        self._logger_once = LoggerOnce(self._logger)
 
     @property
     def log(self) -> logging.Logger:
@@ -697,19 +702,22 @@ async def run(

         self._running = True
 
+        if self._respect_robots_txt_file and not isinstance(self._request_manager, ThrottlingRequestManager):
+            self._logger.warning(
+                'The `respect_robots_txt_file` option is enabled, but the crawler is not using '
+                '`ThrottlingRequestManager`. Crawl-delay directives from robots.txt will not be enforced. To enable '
+                'crawl-delay support, configure the crawler to use `ThrottlingRequestManager` as the request manager.'
+            )
+
         if self._has_finished_before:
             await self._statistics.reset()
 
         if self._use_session_pool:
             await self._session_pool.reset_store()
 
-        request_manager = await self.get_request_manager()
-        if purge_request_queue and isinstance(request_manager, RequestQueue):
-            await request_manager.drop()
-            self._request_manager = await RequestQueue.open(
-                storage_client=self._service_locator.get_storage_client(),
-                configuration=self._service_locator.get_configuration(),
-            )
+        if purge_request_queue:
+            request_manager = await self.get_request_manager()
+            await request_manager.purge()
 
         if requests is not None:
             await self.add_requests(requests)

Comment on lines -707 to -712

Collaborator: Even in the state before the change, this was a code smell - shouldn't we add a "purge_on_start_hook"-like abstract method to RequestManager and implement it in RequestQueue? Or should we just call .drop on request manager? This is aimed mostly at @vdusek and @Pijukatel. We definitely don't need to resolve it in this PR if you guys don't see an obvious way out.

Contributor Author: Understood, happy to leave this for a follow-up.
@@ -1535,16 +1543,51 @@ def _raise_for_error_status_code(self, status_code: int) -> None:
         if is_status_code_server_error(status_code) and not is_ignored_status:
             raise HttpStatusCodeError('Error status code returned', status_code)
 
-    def _raise_for_session_blocked_status_code(self, session: Session | None, status_code: int) -> None:
+    def _raise_for_session_blocked_status_code(
+        self,
+        session: Session | None,
+        status_code: int,
+        *,
+        request_url: str,
+        retry_after_header: str | None = None,
+    ) -> None:
         """Raise an exception if the given status code indicates the session is blocked.
 
+        If the status code is 429 (Too Many Requests), the domain is recorded as rate-limited in the
+        `ThrottlingRequestManager` for per-domain backoff.
+
         Args:
-            session: The session used for the request. If None, no check is performed.
+            session: The session used for the request. If `None`, no check is performed.
             status_code: The HTTP status code to check.
+            request_url: The request URL, used for per-domain rate limit tracking.
+            retry_after_header: The value of the `Retry-After` response header, if present.
 
         Raises:
             SessionError: If the status code indicates the session is blocked.
         """
+        if status_code == HTTPStatus.TOO_MANY_REQUESTS:
+            if isinstance(self._request_manager, ThrottlingRequestManager):
+                retry_after = parse_retry_after_header(retry_after_header)
+                if not self._request_manager.record_domain_delay(request_url, retry_after=retry_after):
+                    domain = (URL(request_url).host or '').lower()
+                    if domain:
+                        self._logger_once.log(
+                            f'Received an HTTP 429 (Too Many Requests) response from domain "{domain}", but it is '
+                            f'not in the `ThrottlingRequestManager.domains` list. Per-domain backoff will not be '
+                            f'applied for this domain. Add it to `domains=` to enable throttling.',
+                            key=f'unconfigured_throttle_domain:{domain}',
+                            level=logging.WARNING,
+                        )
+            else:
+                self._logger_once.log(
+                    'Received an HTTP 429 (Too Many Requests) response, but the crawler is not using '
+                    '`ThrottlingRequestManager`. Per-domain backoff and `Retry-After` headers will not be honored. '
+                    'To enable per-domain rate limiting, configure the crawler to use `ThrottlingRequestManager` '
+                    'as the request manager.',
+                    key='no_throttling_manager_on_429',
+                    level=logging.WARNING,
+                )
 
         if session is not None and session.is_blocked_status_code(
             status_code=status_code,
             ignore_http_error_status_codes=self._ignore_http_error_status_codes,
@@ -1575,7 +1618,15 @@ async def _is_allowed_based_on_robots_txt_file(self, url: str) -> bool:
         if not self._respect_robots_txt_file:
             return True
         robots_txt_file = await self._get_robots_txt_file_for_url(url)
-        return not robots_txt_file or robots_txt_file.is_allowed(url)
+        if not robots_txt_file:
+            return True
+
+        if isinstance(self._request_manager, ThrottlingRequestManager):
+            crawl_delay = robots_txt_file.get_crawl_delay()
+            if crawl_delay is not None:
+                self._request_manager.set_crawl_delay(url, crawl_delay)
+
+        return robots_txt_file.is_allowed(url)
 
     async def _get_robots_txt_file_for_url(self, url: str) -> RobotsTxtFile | None:
         """Get the RobotsTxtFile for a given URL.