6 changes: 3 additions & 3 deletions docs/guides/code_examples/http_crawlers/selectolax_crawler.py
@@ -38,9 +38,9 @@ async def final_step(
            yield SelectolaxLexborContext.from_parsed_http_crawling_context(context)

        # Build context pipeline: HTTP request -> parsing -> custom context.
        kwargs['_context_pipeline'] = (
            self._create_static_content_crawler_pipeline().compose(final_step)
        )
        kwargs['_context_pipeline'] = self._create_static_content_crawler_pipeline(
            **kwargs
        ).compose(final_step)
        super().__init__(
            parser=SelectolaxLexborParser(),
            **kwargs,
88 changes: 83 additions & 5 deletions src/crawlee/crawlers/_abstract_http/_abstract_http_crawler.py
@@ -2,10 +2,17 @@

import asyncio
import logging
import sys
from abc import ABC
from datetime import timedelta
from hashlib import sha256
from typing import TYPE_CHECKING, Any, Generic

if sys.version_info >= (3, 14):
    from compression import zstd as _compressor
else:
    import zlib as _compressor

from more_itertools import partition
from pydantic import ValidationError
from typing_extensions import NotRequired, TypeVar
@@ -19,6 +26,7 @@
from crawlee.statistics import StatisticsState

from ._http_crawling_context import HttpCrawlingContext, ParsedHttpCrawlingContext, TParseResult, TSelectResult
from ._types import CachedHttpResponse

if TYPE_CHECKING:
    from collections.abc import AsyncGenerator, Awaitable, Callable, Iterator
@@ -27,6 +35,7 @@

    from crawlee import RequestTransformAction
    from crawlee._types import BasicCrawlingContext, EnqueueLinksKwargs, ExtractLinksFunction
    from crawlee.storages import KeyValueStore

    from ._abstract_http_parser import AbstractHttpParser

@@ -46,6 +55,9 @@ class HttpCrawlerOptions(
    navigation_timeout: NotRequired[timedelta | None]
    """Timeout for the HTTP request."""

    response_cache: NotRequired[KeyValueStore | None]
    """Key-value store for caching HTTP responses."""


@docs_group('Crawlers')
class AbstractHttpCrawler(
@@ -72,12 +84,14 @@ def __init__(
        *,
        parser: AbstractHttpParser[TParseResult, TSelectResult],
        navigation_timeout: timedelta | None = None,
        response_cache: KeyValueStore | None = None,
        **kwargs: Unpack[BasicCrawlerOptions[TCrawlingContext, StatisticsState]],
    ) -> None:
        self._parser = parser
        self._navigation_timeout = navigation_timeout or timedelta(minutes=1)
        self._pre_navigation_hooks: list[Callable[[BasicCrawlingContext], Awaitable[None]]] = []
        self._shared_navigation_timeouts: dict[int, SharedTimeout] = {}
        self._response_cache = response_cache

        if '_context_pipeline' not in kwargs:
            raise ValueError(
@@ -106,20 +120,33 @@ def __init__(
                parser: AbstractHttpParser[TParseResult, TSelectResult] = static_parser,
                **kwargs: Unpack[BasicCrawlerOptions[ParsedHttpCrawlingContext[TParseResult]]],
            ) -> None:
                kwargs['_context_pipeline'] = self._create_static_content_crawler_pipeline()
                kwargs['_context_pipeline'] = self._create_static_content_crawler_pipeline(**kwargs)
                super().__init__(
                    parser=parser,
                    **kwargs,
                )

        return _ParsedHttpCrawler

    def _create_static_content_crawler_pipeline(self) -> ContextPipeline[ParsedHttpCrawlingContext[TParseResult]]:
    def _create_static_content_crawler_pipeline(
        self,
        response_cache: KeyValueStore | None = None,
        **_kwargs: BasicCrawlerOptions[TCrawlingContext, StatisticsState],
    ) -> ContextPipeline[ParsedHttpCrawlingContext[TParseResult]]:
        """Create static content crawler context pipeline with expected pipeline steps."""
        pipeline = ContextPipeline().compose(self._execute_pre_navigation_hooks)

        if response_cache:
            return (
                pipeline.compose_with_skip(self._try_load_from_cache, skip_to='parse')
                .compose(self._make_http_request)
                .compose(self._handle_status_code_response)
                .compose(self._save_response_to_cache)
                .compose(self._parse_http_response, name='parse')
                .compose(self._handle_blocked_request_by_content)
            )
        return (
            ContextPipeline()
            .compose(self._execute_pre_navigation_hooks)
            .compose(self._make_http_request)
            pipeline.compose(self._make_http_request)
            .compose(self._handle_status_code_response)
            .compose(self._parse_http_response)
            .compose(self._handle_blocked_request_by_content)
@@ -308,3 +335,54 @@ def pre_navigation_hook(self, hook: Callable[[BasicCrawlingContext], Awaitable[N
            hook: A coroutine function to be called before each navigation.
        """
        self._pre_navigation_hooks.append(hook)

    async def _try_load_from_cache(
        self, context: BasicCrawlingContext
    ) -> AsyncGenerator[HttpCrawlingContext | None, None]:
        """Try to load a cached HTTP response. Yields HttpCrawlingContext if found, None otherwise."""
        if not self._response_cache:
            raise RuntimeError('Response cache is not configured.')

        key = self._get_cache_key(context.request.unique_key)
        raw = await self._response_cache.get_value(key)

        if raw is None:
            yield None
            return

        compressed: bytes = raw
        data = _compressor.decompress(compressed)
        cached = CachedHttpResponse.model_validate_json(data)

        context.request.loaded_url = cached.loaded_url or context.request.url
        context.request.state = RequestState.AFTER_NAV

        yield HttpCrawlingContext.from_basic_crawling_context(context=context, http_response=cached)

    async def _save_response_to_cache(self, context: HttpCrawlingContext) -> AsyncGenerator[HttpCrawlingContext, None]:
        """Save the HTTP response to cache after a successful request."""
        if not self._response_cache:
            raise RuntimeError('Response cache is not configured.')

        body = await context.http_response.read()

        cached = CachedHttpResponse(
            http_version=context.http_response.http_version,
            status_code=context.http_response.status_code,
            headers=context.http_response.headers,
            body=body,
            loaded_url=context.request.loaded_url,
        )

        compressed = _compressor.compress(cached.model_dump_json().encode())

        key = self._get_cache_key(context.request.unique_key)
        await self._response_cache.set_value(key, compressed)

        yield context

    @staticmethod
    def _get_cache_key(unique_key: str) -> str:
        """Generate a deterministic cache key for a unique_key."""
        hashed_key = sha256(unique_key.encode('utf-8')).hexdigest()
        return f'response_{hashed_key[:15]}'
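With the pieces above in place, enabling caching from a concrete crawler should only require passing a `KeyValueStore` through the new `response_cache` option. A minimal usage sketch, assuming the changes in this PR; the store name `'http-response-cache'` and the target URL are illustrative only:

```python
import asyncio

from crawlee.crawlers import HttpCrawler, HttpCrawlingContext
from crawlee.storages import KeyValueStore


async def main() -> None:
    # A dedicated key-value store holds the compressed responses, keyed by the
    # sha256-derived `response_<hash>` keys produced by `_get_cache_key`.
    cache_store = await KeyValueStore.open(name='http-response-cache')
    crawler = HttpCrawler(response_cache=cache_store)

    @crawler.router.default_handler
    async def handler(context: HttpCrawlingContext) -> None:
        # On a repeated run, this body should come from the cache instead of the network.
        body = await context.http_response.read()
        context.log.info(f'{context.request.url}: {len(body)} bytes')

    await crawler.run(['https://crawlee.dev'])


if __name__ == '__main__':
    asyncio.run(main())
```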
26 changes: 26 additions & 0 deletions src/crawlee/crawlers/_abstract_http/_types.py
@@ -0,0 +1,26 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from pydantic import BaseModel

from crawlee._types import HttpHeaders

if TYPE_CHECKING:
    from collections.abc import AsyncIterator


class CachedHttpResponse(BaseModel):
    """An `HttpResponse` implementation that serves pre-stored response data from cache."""

    http_version: str
    status_code: int
    headers: HttpHeaders
    body: bytes
    loaded_url: str | None = None

    async def read(self) -> bytes:
        return self.body

    async def read_stream(self) -> AsyncIterator[bytes]:
        yield self.body
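To make the cache format concrete, here is a small round-trip sketch mirroring what `_save_response_to_cache` and `_try_load_from_cache` do with this model. It uses `zlib` directly for simplicity (the crawler itself selects `compression.zstd` on Python 3.14+), and the header and body values are made up:

```python
import zlib

from crawlee._types import HttpHeaders
from crawlee.crawlers._abstract_http._types import CachedHttpResponse

# Build the model the same way _save_response_to_cache does.
cached = CachedHttpResponse(
    http_version='1.1',
    status_code=200,
    headers=HttpHeaders({'content-type': 'text/html'}),
    body=b'<html><body>Hello</body></html>',
    loaded_url='https://example.com/',
)

# Serialize to JSON and compress before writing it to the key-value store.
blob = zlib.compress(cached.model_dump_json().encode())

# Later: decompress and validate back into the model, as _try_load_from_cache does.
restored = CachedHttpResponse.model_validate_json(zlib.decompress(blob))
assert restored.status_code == 200
assert restored.body == cached.body
```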
65 changes: 62 additions & 3 deletions src/crawlee/crawlers/_basic/_context_pipeline.py
@@ -69,9 +69,13 @@ def __init__(
        ]
        | None = None,
        _parent: ContextPipeline[BasicCrawlingContext] | None = None,
        name: str | None = None,
        skip_to: str | None = None,
    ) -> None:
        self._middleware = _middleware
        self._parent = _parent
        self.name = name
        self.skip_to = skip_to

    def _middleware_chain(self) -> Generator[ContextPipeline[Any], None, None]:
        yield self
@@ -91,14 +95,24 @@ async def __call__(
        chain = list(self._middleware_chain())
        cleanup_stack: list[_Middleware[Any]] = []
        final_consumer_exception: Exception | None = None
        skip_to_middleware: str | None = None

        try:
            for member in reversed(chain):
                if skip_to_middleware is not None:
                    if member.name == skip_to_middleware:
                        skip_to_middleware = None
                    else:
                        continue

                if member._middleware:  # noqa: SLF001
                    middleware_instance = _Middleware(middleware=member._middleware, input_context=crawling_context)  # noqa: SLF001
                    middleware_instance = _Middleware(
                        middleware=member._middleware,  # noqa: SLF001
                        input_context=crawling_context,
                    )
                    try:
                        result = await middleware_instance.action()
                    except SessionError:  # Session errors get special treatment
                    except SessionError:
                        raise
                    except StopAsyncIteration as e:
                        raise RuntimeError('The middleware did not yield') from e
@@ -107,12 +121,26 @@
                    except Exception as e:
                        raise ContextPipelineInitializationError(e, crawling_context) from e

                    if result is None:
                        if member.skip_to is None:
                            raise RuntimeError(
                                'Middleware yielded None but no skip_to target is configured. '
                                'Use compose_with_skip() for conditional middleware.'
                            )
                        # Keep the existing context for next middleware
                        result = crawling_context
                    elif member.skip_to:
                        skip_to_middleware = member.skip_to

                    crawling_context = result
                    cleanup_stack.append(middleware_instance)

            if skip_to_middleware is not None:
                raise RuntimeError(f'Skip target middleware "{skip_to_middleware}" not found in pipeline')

            try:
                await final_context_consumer(cast('TCrawlingContext', crawling_context))
            except SessionError as e:  # Session errors get special treatment
            except SessionError as e:
                final_consumer_exception = e
                raise
            except Exception as e:
@@ -128,6 +156,7 @@ def compose(
            [TCrawlingContext],
            AsyncGenerator[TMiddlewareCrawlingContext, None],
        ],
        name: str | None = None,
    ) -> ContextPipeline[TMiddlewareCrawlingContext]:
        """Add a middleware to the pipeline.
@@ -143,4 +172,34 @@
                middleware,
            ),
            _parent=cast('ContextPipeline[BasicCrawlingContext]', self),
            name=name,
        )

    def compose_with_skip(
        self,
        middleware: Callable[
            [TCrawlingContext],
            AsyncGenerator[TMiddlewareCrawlingContext | None, None],
        ],
        skip_to: str,
    ) -> ContextPipeline[TMiddlewareCrawlingContext]:
        """Add a conditional middleware that can skip to a named target middleware.

        If the middleware yields a context, that context is used and the pipeline skips to the target middleware.
        If the middleware yields None, the pipeline continues normally without changing the context.

        Args:
            middleware: Middleware that yields a context (activates the skip) or None (continue normally).
            skip_to: Name of the target middleware to skip to (must exist in the pipeline).

        Returns:
            The extended pipeline instance, providing a fluent interface.
        """
        return ContextPipeline[TMiddlewareCrawlingContext](
            _middleware=cast(
                'Callable[[BasicCrawlingContext], AsyncGenerator[TMiddlewareCrawlingContext, Exception | None]]',
                middleware,
            ),
            _parent=cast('ContextPipeline[BasicCrawlingContext]', self),
            skip_to=skip_to,
        )
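To illustrate the control flow that `compose_with_skip` introduces, here is a deliberately simplified, self-contained stand-in for the pipeline (not the real `ContextPipeline` class): a conditional step either yields an enriched context and jumps ahead to the named target step, or yields `None` and lets execution fall through to the next step.

```python
import asyncio
from collections.abc import AsyncGenerator, Callable
from typing import Any

# (name, skip_to, middleware) triples, in execution order.
Step = tuple[str | None, str | None, Callable[[Any], AsyncGenerator[Any, None]]]


async def run_pipeline(steps: list[Step], ctx: Any) -> Any:
    skip_target: str | None = None
    for name, skip_to, middleware in steps:
        if skip_target is not None:
            if name != skip_target:
                continue  # jump over intermediate steps
            skip_target = None
        result = await anext(middleware(ctx))
        if result is None:
            if skip_to is None:
                raise RuntimeError('Only conditional (skip_to) steps may yield None')
            continue  # nothing found: keep the current context and fall through
        ctx = result
        if skip_to is not None:
            skip_target = skip_to  # activate the jump to the named target step
    return ctx


async def try_cache(ctx: dict) -> AsyncGenerator[dict | None, None]:
    yield {**ctx, 'response': ctx['cache']} if ctx.get('cache') else None


async def fetch(ctx: dict) -> AsyncGenerator[dict, None]:
    yield {**ctx, 'response': 'fresh response'}


async def parse(ctx: dict) -> AsyncGenerator[dict, None]:
    yield {**ctx, 'parsed': ctx['response'].upper()}


async def main() -> None:
    steps: list[Step] = [('cache', 'parse', try_cache), ('fetch', None, fetch), ('parse', None, parse)]
    print(await run_pipeline(steps, {'cache': 'cached response'}))  # cache hit: fetch is skipped
    print(await run_pipeline(steps, {'cache': None}))  # cache miss: fetch runs, then parse


asyncio.run(main())
```

In the real pipeline the target name is assigned via `compose(..., name='parse')`, and `compose_with_skip(self._try_load_from_cache, skip_to='parse')` is what lets a cache hit bypass `_make_http_request` and `_save_response_to_cache`, while a miss leaves the context untouched.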
src/crawlee/crawlers/_beautifulsoup/_beautifulsoup_crawler.py
@@ -73,7 +73,7 @@ async def final_step(
            """Enhance `ParsedHttpCrawlingContext[BeautifulSoup]` with `soup` property."""
            yield BeautifulSoupCrawlingContext.from_parsed_http_crawling_context(context)

        kwargs['_context_pipeline'] = self._create_static_content_crawler_pipeline().compose(final_step)
        kwargs['_context_pipeline'] = self._create_static_content_crawler_pipeline(**kwargs).compose(final_step)

        super().__init__(
            parser=BeautifulSoupParser(parser=parser),
2 changes: 1 addition & 1 deletion src/crawlee/crawlers/_http/_http_crawler.py
@@ -55,7 +55,7 @@ def __init__(
        Args:
            kwargs: Additional keyword arguments to pass to the underlying `AbstractHttpCrawler`.
        """
        kwargs['_context_pipeline'] = self._create_static_content_crawler_pipeline()
        kwargs['_context_pipeline'] = self._create_static_content_crawler_pipeline(**kwargs)
        super().__init__(
            parser=NoParser(),
            **kwargs,
2 changes: 1 addition & 1 deletion src/crawlee/crawlers/_parsel/_parsel_crawler.py
@@ -70,7 +70,7 @@ async def final_step(
            """Enhance `ParsedHttpCrawlingContext[Selector]` with a `selector` property."""
            yield ParselCrawlingContext.from_parsed_http_crawling_context(context)

        kwargs['_context_pipeline'] = self._create_static_content_crawler_pipeline().compose(final_step)
        kwargs['_context_pipeline'] = self._create_static_content_crawler_pipeline(**kwargs).compose(final_step)
        super().__init__(
            parser=ParselParser(),
            **kwargs,