diff --git a/abx_plugins/plugins/parse_sitemap_urls/README.md b/abx_plugins/plugins/parse_sitemap_urls/README.md new file mode 100644 index 0000000..e74444a --- /dev/null +++ b/abx_plugins/plugins/parse_sitemap_urls/README.md @@ -0,0 +1,232 @@ +# parse_sitemap_urls + +Discover URLs from `sitemap.xml` (urlset and sitemapindex documents, gzipped +sitemaps, `robots.txt` `Sitemap:` directives, and the Google image / video / +news extensions) and emit one `Snapshot` JSONL record per discovered URL. + +This plugin closes the gap that motivated [ArchiveBox#191][issue-191]: a +single seed URL can expand into a full-site crawl without an external +crawler in the loop. The host (ArchiveBox / `abx-dl`) keeps ownership of +the crawl frontier, depth cap, and dedup; this hook only feeds it URLs. + +[sitemap index]: https://www.sitemaps.org/protocol.html#index +[issue-191]: https://github.com/ArchiveBox/ArchiveBox/issues/191 + +## What it does + +Given a seed URL the hook tries, in order: + +1. **`*.xml` / `*.xml.gz`** — parse directly as a sitemap or sitemap-index. +2. **`*/robots.txt`** — read every `Sitemap:` line and walk each one. +3. **Anything else** (treated as a site root): + 1. Probe `/robots.txt` for `Sitemap:` directives. + 2. If none found, fall back to the paths in + `PARSE_SITEMAP_URLS_FALLBACK_PATHS` + (default: `/sitemap.xml`, `/sitemap_index.xml`, + `/sitemap-index.xml`, `/wp-sitemap.xml`, `/sitemap.xml.gz`). + +For each `<urlset>` document the hook emits a `Snapshot` record per +`<url>` entry, preserving the optional `<lastmod>` value as `bookmarked_at` +and recording `<priority>` / `<changefreq>` for filtering. For each +`<sitemapindex>` document it recurses into the child sitemaps up to +`PARSE_SITEMAP_URLS_MAX_SITEMAP_DEPTH`. + +Gzipped sitemaps (detected by the `1f 8b` magic bytes, a `.gz` suffix, +or a `Content-Encoding: gzip` response header) are transparently +decompressed under hard size / ratio caps. UTF-8, UTF-16 LE, and +UTF-16 BE byte-order marks are stripped before parsing. 
Fragments are +stripped from emitted URLs so `#anchor` variants do not produce +duplicate snapshots. + +### Optional sitemap extensions + +| Extension | Config | Behavior | +| --- | --- | --- | +| [Image][img] | `PARSE_SITEMAP_URLS_EMIT_IMAGE_URLS=true` | Emits each `<image:loc>` as an extra `Snapshot` with `tags=sitemap-media`. | +| [Video][vid] | `PARSE_SITEMAP_URLS_EMIT_VIDEO_URLS=true` | Emits each video URL similarly. | +| [News][news] | `PARSE_SITEMAP_URLS_EMIT_NEWS_TAG=true` | Emits a `Tag` record per unique value from the news extension. | + +[img]: https://developers.google.com/search/docs/crawling-indexing/sitemaps/image-sitemaps +[vid]: https://developers.google.com/search/docs/crawling-indexing/sitemaps/video-sitemaps +[news]: https://developers.google.com/search/docs/crawling-indexing/sitemaps/news-sitemap + +## Security posture + +Sitemaps come from untrusted servers. The hook applies the following +defenses by default: + +- **XML hardening.** Parsing goes through `defusedxml`, which rejects + DTDs, internal/external entities, and external-resource resolution. + Billion-laughs and XXE payloads fail closed. +- **Response size cap.** Each HTTP response is bounded to + `PARSE_SITEMAP_URLS_MAX_RESPONSE_BYTES` (default 50 MiB) before any + parsing happens. +- **Decompression cap.** Gzipped responses are bounded to + `PARSE_SITEMAP_URLS_MAX_DECOMPRESSED_BYTES` (default 200 MiB) and the + decompressed/compressed ratio is bounded to + `PARSE_SITEMAP_URLS_GZIP_MAX_RATIO` (default 100×). Gzip bombs fail + with `status=failed`. +- **Scheme allowlist.** Only `http` and `https` are accepted as + page-URL schemes; `javascript:`, `data:`, `ftp:`, and similar are + refused. `file://` is allowed only when the seed itself is `file://` + or when `PARSE_SITEMAP_URLS_ALLOW_FILE_URLS=true` is set. 
+- **Bounded, validated redirects.** Redirects are capped by + `PARSE_SITEMAP_URLS_HTTP_MAX_REDIRECTS` and rejected when the target + uses a non-HTTP scheme or resolves to a loopback / RFC1918 / + link-local / multicast address (unless + `PARSE_SITEMAP_URLS_ALLOW_PRIVATE_HOSTS=true`). +- **Per-emit regex scan length cap.** `INCLUDE_REGEX` / `EXCLUDE_REGEX` + scan only the first `PARSE_SITEMAP_URLS_REGEX_INPUT_CAP` characters + of each URL, blunting catastrophic-backtracking risk on long URLs. +- **Sitemap attempt cap.** `PARSE_SITEMAP_URLS_MAX_SITEMAPS` caps the + number of sitemap fetch *attempts* (default 100), so an adversarial + sitemap-index pointing at thousands of 404 / timeout / refused + children cannot trigger that many outbound requests. + +The seed URL is also subject to the scheme + private-host gates, so a +crafted `archivebox add file:///etc/passwd` does not produce a +disclosable record unless the operator explicitly opts in. + +**DNS-rebinding caveat.** The private-host check resolves the +hostname at policy time, but `urllib` resolves it again at connect +time. A rebinding DNS record could return a public IP to the first +lookup and a private IP to the second. This plugin does not pin the +resolved IP through to the socket connect; if your threat model +includes DNS rebinding, run behind an outbound firewall that blocks +RFC1918 / loopback targets at the network layer. + +## Configuration + +| Env var | Default | Description | +| --- | --- | --- | +| `PARSE_SITEMAP_URLS_ENABLED` (`USE_PARSE_SITEMAP_URLS`, `SAVE_SITEMAP_URLS`) | `true` | Toggle the plugin. | +| `PARSE_SITEMAP_URLS_MAX_URLS` | `5000` | Hard cap on emitted `Snapshot` records per invocation. | +| `PARSE_SITEMAP_URLS_MAX_SITEMAP_DEPTH` | `5` | Max recursion depth when following sitemap-index documents. `0` walks only the seed; `1` walks seed plus one level of children. 
| +| `PARSE_SITEMAP_URLS_MAX_SITEMAPS` | `100` | Max number of sitemap fetch attempts across the entire walk (defense against adversarial sitemap-indexes pointing at thousands of empty / broken children). `0` disables the cap. | +| `PARSE_SITEMAP_URLS_TIMEOUT` (fallback: `TIMEOUT`) | `60` | Network timeout per fetch, in seconds. | +| `PARSE_SITEMAP_URLS_USER_AGENT` (fallback: `USER_AGENT`) | shared default | User-Agent for HTTP requests. | +| `PARSE_SITEMAP_URLS_INCLUDE_REGEX` | `""` | Only URLs matching this regex are emitted (scanned up to `REGEX_INPUT_CAP` chars). | +| `PARSE_SITEMAP_URLS_EXCLUDE_REGEX` | `""` | URLs matching this regex are skipped. | +| `PARSE_SITEMAP_URLS_REGEX_INPUT_CAP` | `8192` | Maximum URL prefix length scanned by the regex filters. | +| `PARSE_SITEMAP_URLS_SAME_HOST_ONLY` | `false` | Skip URLs whose host differs from the seed URL's host. | +| `PARSE_SITEMAP_URLS_DISCOVER_FROM_ROBOTS` | `true` | Probe `robots.txt` for `Sitemap:` directives. | +| `PARSE_SITEMAP_URLS_FALLBACK_PATHS` | `[/sitemap.xml, /sitemap_index.xml, /sitemap-index.xml, /wp-sitemap.xml, /sitemap.xml.gz]` | Paths to probe when no robots.txt sitemap was found. | +| `PARSE_SITEMAP_URLS_HTTP_RETRIES` | `2` | Retries on transient failures (408, 429, 5xx, network errors). | +| `PARSE_SITEMAP_URLS_HTTP_BACKOFF_SECONDS` | `1.0` | Base delay for exponential backoff between retries. | +| `PARSE_SITEMAP_URLS_HTTP_MAX_REDIRECTS` | `5` | Max HTTP redirects per fetch. The custom redirect handler rejects non-HTTP schemes and private hosts. | +| `PARSE_SITEMAP_URLS_MAX_RESPONSE_BYTES` | `52428800` | Maximum on-the-wire response size (50 MiB). | +| `PARSE_SITEMAP_URLS_MAX_DECOMPRESSED_BYTES` | `209715200` | Maximum size after gzip decompression (200 MiB). | +| `PARSE_SITEMAP_URLS_GZIP_MAX_RATIO` | `100` | Maximum decompressed/compressed ratio (gzip bomb guard); `0` disables. 
| +| `PARSE_SITEMAP_URLS_ALLOW_PRIVATE_HOSTS` | `false` | Allow fetches and redirects to loopback / RFC1918 / link-local / multicast addresses. | +| `PARSE_SITEMAP_URLS_ALLOW_FILE_URLS` | `false` | Allow `file://` URLs in fetched sitemaps when the seed is not `file://`. | +| `PARSE_SITEMAP_URLS_VERIFY_TLS` (fallback: `CHECK_SSL_VALIDITY`) | `true` | Verify TLS certificates on HTTPS fetches. | +| `PARSE_SITEMAP_URLS_ACCEPT_LANGUAGE` | `""` | Optional `Accept-Language` header value. | +| `PARSE_SITEMAP_URLS_EMIT_IMAGE_URLS` | `false` | Emit URLs from `<image:loc>` (Sitemap image extension). Subject to the same URL policy as page URLs. | +| `PARSE_SITEMAP_URLS_EMIT_VIDEO_URLS` | `false` | Emit URLs from the Sitemap video extension. | +| `PARSE_SITEMAP_URLS_EMIT_NEWS_TAG` | `false` | Emit `Tag` records from the Sitemap news extension. | +| `PARSE_SITEMAP_URLS_PRIORITY_MIN` | `0.0` | Drop URLs whose `<priority>` is below this threshold (`0.0` disables). Entries without `<priority>` pass through unless `REQUIRE_PRIORITY=true`. | +| `PARSE_SITEMAP_URLS_REQUIRE_PRIORITY` | `false` | When `PRIORITY_MIN > 0`, also drop URLs with no `<priority>` tag. | +| `PARSE_SITEMAP_URLS_CHANGEFREQ_ALLOWED` | `[]` | When non-empty, only emit URLs whose `<changefreq>` appears in this list. | +| `PARSE_SITEMAP_URLS_SORT_BY` | `url` | `url` (alpha) / `lastmod` (newest first) / `priority` (highest first) / `order` (preserve sitemap order). | +| `PARSE_SITEMAP_URLS_VERBOSE` | `false` | Emit one `fetching sitemap …` line per fetch to stderr. | + +The plugin also honours the shared `USER_AGENT`, `TIMEOUT`, +`CHECK_SSL_VALIDITY`, and `SNAP_DIR` env vars from `base/config.json`. + +## Outputs + +- **stdout** — one JSONL record per line: + - 0+ `Tag` records (when the news extension is enabled). + - 0+ `Snapshot` records (one per discovered URL, with + `depth = parent + 1`). Media extras carry `tags=sitemap-media`. + - Exactly one terminal `ArchiveResult` record. +- **`SNAP_DIR/parse_sitemap_urls/urls.jsonl`** — same `Snapshot` records, + persisted for the host's crawl frontier. 
Written atomically and + removed on `noresults` / `failed`. +- **stderr** — discovery / fetch error lines and the human summary of + the `ArchiveResult`. + +`ArchiveResult.status` follows the abx contract: + +| status | meaning | +| --- | --- | +| `succeeded` | At least one URL emitted. | +| `noresults` | No URLs (empty sitemap, or every URL filtered out). | +| `skipped` | `PARSE_SITEMAP_URLS_ENABLED=false`. | +| `failed` | Every candidate sitemap failed to fetch or parse, or a security guard tripped. | + +The summary string carries counters so logs make it obvious why nothing +emitted, e.g. +`0 URLs parsed (visited 1 sitemap(s); skipped_filter=3 skipped_host=0 skipped_priority=2 skipped_changefreq=0 skipped_scheme=1 skipped_extras=0)`. + +## Examples + +```bash +# Just give it a site root. +./on_Snapshot__76_parse_sitemap_urls.py --url=https://example.com + +# Point directly at a known sitemap. +./on_Snapshot__76_parse_sitemap_urls.py --url=https://example.com/sitemap.xml + +# Point at robots.txt (reads all Sitemap: lines). +./on_Snapshot__76_parse_sitemap_urls.py --url=https://example.com/robots.txt + +# Restrict to a subtree of a large site. +PARSE_SITEMAP_URLS_INCLUDE_REGEX="^https://example\\.com/blog/" \ + ./on_Snapshot__76_parse_sitemap_urls.py --url=https://example.com + +# Skip product pages while crawling marketing pages. +PARSE_SITEMAP_URLS_EXCLUDE_REGEX="/products/" \ + ./on_Snapshot__76_parse_sitemap_urls.py --url=https://example.com + +# Lock the crawl to the seed host (skip CDN / asset hosts). +PARSE_SITEMAP_URLS_SAME_HOST_ONLY=true \ + ./on_Snapshot__76_parse_sitemap_urls.py --url=https://example.com + +# Only crawl high-priority, daily-refreshed pages, newest first. +PARSE_SITEMAP_URLS_PRIORITY_MIN=0.7 \ +PARSE_SITEMAP_URLS_CHANGEFREQ_ALLOWED='["daily","hourly"]' \ +PARSE_SITEMAP_URLS_SORT_BY=lastmod \ + ./on_Snapshot__76_parse_sitemap_urls.py --url=https://example.com + +# Aggressive HTTP retries against a flaky server. 
+PARSE_SITEMAP_URLS_HTTP_RETRIES=5 \ +PARSE_SITEMAP_URLS_HTTP_BACKOFF_SECONDS=2.0 \ + ./on_Snapshot__76_parse_sitemap_urls.py --url=https://example.com/sitemap.xml + +# Pull image URLs out of an image sitemap as additional Snapshots. +PARSE_SITEMAP_URLS_EMIT_IMAGE_URLS=true \ + ./on_Snapshot__76_parse_sitemap_urls.py --url=https://example.com/image-sitemap.xml + +# Self-hosted intranet sitemap — explicitly allow private IPs. +PARSE_SITEMAP_URLS_ALLOW_PRIVATE_HOSTS=true \ + ./on_Snapshot__76_parse_sitemap_urls.py --url=https://intranet.local/sitemap.xml +``` + +## Running with ArchiveBox / abx-dl + +The hook follows the standard `on_Snapshot__*` contract: + +- File name `on_Snapshot__76_parse_sitemap_urls.py` places it after + `parse_dom_outlinks (75)` and before any later snapshot work. +- It depends only on the Python standard library plus `rich_click`, + `defusedxml`, and `abx_plugins.plugins.base.utils`. No binary + preflight and no `required_plugins`. +- It emits `Snapshot` records the host consumes via its normal crawl + frontier; the host applies its own `max_depth` / `max_urls` ceiling + on top of the plugin-level caps documented above. + +## Notes and non-goals + +- **JS-rendered links are out of scope.** Pair with + [`parse_dom_outlinks`](../parse_dom_outlinks/) for SPAs that don't + publish a complete sitemap. +- **Politeness is the host's job.** This hook fetches at most one + document per visited sitemap node and never crawls page content; the + host applies rate-limiting when it later fetches each discovered URL. +- **No HTTP caching between runs.** Reruns re-fetch sitemaps so updates + propagate; existing `urls.jsonl` is overwritten atomically. + +## License + +MIT — same as the parent `abx-plugins` package. 
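As a footnote to the regex scan-length cap listed under Security posture, the following is a minimal Python sketch of how such a cap might be applied. It is an illustration under stated assumptions, not the plugin's actual code: the helper name `passes_regex_filters` and the choice of `re.search` over `re.match` are hypothetical, and only the `8192` default is taken from the configuration table.

```python
import re

# Mirrors the documented default of PARSE_SITEMAP_URLS_REGEX_INPUT_CAP.
REGEX_INPUT_CAP = 8192


def passes_regex_filters(url, include=None, exclude=None, cap=REGEX_INPUT_CAP):
    """Hypothetical helper: apply include/exclude patterns to a capped URL prefix."""
    # Only this prefix is ever handed to the regex engine, so the amount of
    # input a pathological pattern can backtrack over is bounded by `cap`.
    scanned = url[:cap]
    if include is not None and not include.search(scanned):
        return False
    if exclude is not None and exclude.search(scanned):
        return False
    return True


include = re.compile(r"^https://example\.com/blog/")
exclude = re.compile(r"/drafts/")

for candidate in (
    "https://example.com/blog/post-1",
    "https://example.com/blog/drafts/post-2",
    "https://example.com/shop/item-3",
):
    print(candidate, passes_regex_filters(candidate, include, exclude))
```

One consequence of this design is that a pattern which would only match beyond the cap never fires, so an exclude rule cannot reject a URL based on text past the first `cap` characters.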
diff --git a/abx_plugins/plugins/parse_sitemap_urls/__init__.py b/abx_plugins/plugins/parse_sitemap_urls/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/abx_plugins/plugins/parse_sitemap_urls/config.json b/abx_plugins/plugins/parse_sitemap_urls/config.json new file mode 100644 index 0000000..963a8b0 --- /dev/null +++ b/abx_plugins/plugins/parse_sitemap_urls/config.json @@ -0,0 +1,211 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Parse Sitemap URLs", + "description": "Discover URLs from sitemap.xml (urlset and sitemapindex documents, gzipped sitemaps, robots.txt Sitemap: directives, and sitemap image/video/news extensions) and emit one Snapshot record per discovered URL so the host can crawl an entire site from a single seed URL.", + "type": "object", + "additionalProperties": false, + "required_plugins": [], + "required_binaries": [], + "output_mimetypes": [ + "application/x-ndjson" + ], + "properties": { + "PARSE_SITEMAP_URLS_ENABLED": { + "type": "boolean", + "default": true, + "x-aliases": [ + "USE_PARSE_SITEMAP_URLS", + "SAVE_SITEMAP_URLS" + ], + "description": "Enable sitemap.xml URL discovery" + }, + "PARSE_SITEMAP_URLS_MAX_URLS": { + "type": "integer", + "default": 5000, + "minimum": 1, + "description": "Maximum number of URLs to emit from a single sitemap invocation" + }, + "PARSE_SITEMAP_URLS_MAX_SITEMAP_DEPTH": { + "type": "integer", + "default": 5, + "minimum": 0, + "description": "Maximum recursion depth when following nested sitemap index files. 0 walks only the seed; 1 walks seed plus one level of children." + }, + "PARSE_SITEMAP_URLS_MAX_SITEMAPS": { + "type": "integer", + "default": 100, + "minimum": 0, + "description": "Maximum number of sitemap documents to fetch across the entire walk (defense against an adversarial sitemap-index pointing at thousands of empty children). 0 disables the cap." 
+ }, + "PARSE_SITEMAP_URLS_TIMEOUT": { + "type": "integer", + "default": 60, + "minimum": 1, + "x-fallback": "TIMEOUT", + "description": "Network timeout in seconds when fetching sitemap or robots.txt" + }, + "PARSE_SITEMAP_URLS_USER_AGENT": { + "type": "string", + "default": "", + "x-fallback": "USER_AGENT", + "description": "User agent string for HTTP requests" + }, + "PARSE_SITEMAP_URLS_INCLUDE_REGEX": { + "type": "string", + "default": "", + "description": "Only URLs matching this regex are emitted (empty = no filter). The first PARSE_SITEMAP_URLS_REGEX_INPUT_CAP characters of each URL are scanned to bound catastrophic regex risk." + }, + "PARSE_SITEMAP_URLS_EXCLUDE_REGEX": { + "type": "string", + "default": "", + "description": "URLs matching this regex are skipped (empty = no filter). Same scan-length cap as INCLUDE_REGEX." + }, + "PARSE_SITEMAP_URLS_REGEX_INPUT_CAP": { + "type": "integer", + "default": 8192, + "minimum": 128, + "description": "Maximum number of URL characters scanned by INCLUDE_REGEX / EXCLUDE_REGEX. Bounds catastrophic-backtracking risk." 
+ }, + "PARSE_SITEMAP_URLS_SAME_HOST_ONLY": { + "type": "boolean", + "default": false, + "description": "Skip URLs whose host differs from the seed URL's host" + }, + "PARSE_SITEMAP_URLS_DISCOVER_FROM_ROBOTS": { + "type": "boolean", + "default": true, + "description": "When the seed URL is a site root, parse robots.txt for Sitemap: directives" + }, + "PARSE_SITEMAP_URLS_FALLBACK_PATHS": { + "type": "array", + "default": [ + "/sitemap.xml", + "/sitemap_index.xml", + "/sitemap-index.xml", + "/wp-sitemap.xml", + "/sitemap.xml.gz" + ], + "items": { + "type": "string" + }, + "description": "Fallback sitemap paths to probe when no robots.txt sitemap was found" + }, + "PARSE_SITEMAP_URLS_HTTP_RETRIES": { + "type": "integer", + "default": 2, + "minimum": 0, + "description": "Number of times to retry transient HTTP failures (408, 429, 5xx, or network errors)" + }, + "PARSE_SITEMAP_URLS_HTTP_BACKOFF_SECONDS": { + "type": "number", + "default": 1.0, + "minimum": 0, + "description": "Base delay for exponential backoff between retries" + }, + "PARSE_SITEMAP_URLS_HTTP_MAX_REDIRECTS": { + "type": "integer", + "default": 5, + "minimum": 0, + "description": "Maximum HTTP redirects to follow per sitemap fetch (custom handler enforces this; rejects redirects to non-http(s) schemes and to private hosts unless ALLOW_PRIVATE_HOSTS is set)" + }, + "PARSE_SITEMAP_URLS_MAX_RESPONSE_BYTES": { + "type": "integer", + "default": 52428800, + "minimum": 1024, + "description": "Maximum on-the-wire response size, in bytes (default 50 MiB)" + }, + "PARSE_SITEMAP_URLS_MAX_DECOMPRESSED_BYTES": { + "type": "integer", + "default": 209715200, + "minimum": 1024, + "description": "Maximum size after gzip decompression, in bytes (default 200 MiB)" + }, + "PARSE_SITEMAP_URLS_GZIP_MAX_RATIO": { + "type": "integer", + "default": 100, + "minimum": 0, + "description": "Maximum decompressed/compressed ratio. Sitemaps with a larger ratio are rejected as bombs. 0 disables the check." 
+ }, + "PARSE_SITEMAP_URLS_ALLOW_PRIVATE_HOSTS": { + "type": "boolean", + "default": false, + "description": "Allow fetches / redirects to private, loopback, link-local, or multicast addresses. Off by default to prevent SSRF via crafted sitemaps." + }, + "PARSE_SITEMAP_URLS_ALLOW_FILE_URLS": { + "type": "boolean", + "default": false, + "description": "Allow file:// URLs in fetched sitemaps when the seed is not file://. Off by default; the seed is automatically permitted to be file:// for local testing." + }, + "PARSE_SITEMAP_URLS_VERIFY_TLS": { + "type": "boolean", + "default": true, + "x-fallback": "CHECK_SSL_VALIDITY", + "description": "Verify TLS certificates on HTTPS sitemap fetches" + }, + "PARSE_SITEMAP_URLS_ACCEPT_LANGUAGE": { + "type": "string", + "default": "", + "description": "Optional Accept-Language header value (empty = omit header)" + }, + "PARSE_SITEMAP_URLS_EMIT_IMAGE_URLS": { + "type": "boolean", + "default": false, + "description": "Also emit URLs from <image:loc> elements (Sitemap image extension). Media URLs are filtered through the same scheme / host / regex pipeline as page URLs." + }, + "PARSE_SITEMAP_URLS_EMIT_VIDEO_URLS": { + "type": "boolean", + "default": false, + "description": "Also emit video URLs (Sitemap video extension). Subject to the same URL policy as page URLs." + }, + "PARSE_SITEMAP_URLS_EMIT_NEWS_TAG": { + "type": "boolean", + "default": false, + "description": "Emit Tag records for values found in the Sitemap news extension." + }, + "PARSE_SITEMAP_URLS_PRIORITY_MIN": { + "type": "number", + "default": 0.0, + "minimum": 0.0, + "maximum": 1.0, + "description": "Skip URLs whose <priority> is below this threshold. Entries without a <priority> tag pass through by default; set PARSE_SITEMAP_URLS_REQUIRE_PRIORITY=true to drop them instead. 0.0 disables the filter." + }, + "PARSE_SITEMAP_URLS_REQUIRE_PRIORITY": { + "type": "boolean", + "default": false, + "description": "When PRIORITY_MIN > 0, also drop URLs that have no <priority> tag. 
Off by default — most real sitemaps omit <priority>." + }, + "PARSE_SITEMAP_URLS_CHANGEFREQ_ALLOWED": { + "type": "array", + "default": [], + "items": { + "type": "string", + "enum": [ + "always", + "hourly", + "daily", + "weekly", + "monthly", + "yearly", + "never" + ] + }, + "description": "If non-empty, only emit URLs whose <changefreq> appears in this list" + }, + "PARSE_SITEMAP_URLS_SORT_BY": { + "type": "string", + "default": "url", + "enum": [ + "url", + "lastmod", + "priority", + "order" + ], + "description": "Output ordering: url (alpha), lastmod (newest first), priority (highest first), or order (preserve sitemap order)" + }, + "PARSE_SITEMAP_URLS_VERBOSE": { + "type": "boolean", + "default": false, + "description": "Emit per-sitemap diagnostic lines to stderr" + } + } +} diff --git a/abx_plugins/plugins/parse_sitemap_urls/on_Snapshot__76_parse_sitemap_urls.py b/abx_plugins/plugins/parse_sitemap_urls/on_Snapshot__76_parse_sitemap_urls.py new file mode 100755 index 0000000..de27243 --- /dev/null +++ b/abx_plugins/plugins/parse_sitemap_urls/on_Snapshot__76_parse_sitemap_urls.py @@ -0,0 +1,1307 @@ +#!/usr/bin/env -S uv run --active --script +# /// script +# requires-python = ">=3.12" +# dependencies = [ +# "pydantic-settings", +# "jambo", +# "rich-click", +# "abx-plugins", +# "defusedxml>=0.7.1", +# ] +# /// +""" +Parse sitemap.xml (and sitemap-index, gzipped sitemaps, robots.txt +Sitemap: directives, and image/video/news extensions) and emit one +Snapshot record per discovered URL. + +This is a standalone extractor that runs without ArchiveBox. Given any +seed URL the hook tries, in order: + +1. If the URL points at a `.xml` / `.xml.gz` file, treat it as a sitemap. +2. If the URL points at a robots.txt, parse it for `Sitemap:` directives. +3. Otherwise treat the URL as a site root, probe robots.txt, then fall + back to a list of common sitemap paths. 
+ +The host (ArchiveBox or abx-dl) owns the crawl frontier; this hook only +emits Snapshot JSONL records with an incremented `depth`. The host +applies its own max_depth / max_urls / dedup logic on top. + +Security posture: every discovered URL passes through scheme allowlist, +optional same-host + private-IP guards, regex filters, and a global +visited-set. XML is parsed with `defusedxml` (no DTDs, no entities, no +external resolution). HTTP responses are size-capped before +decompression and the decompression itself is ratio-capped to neutralize +gzip bombs. Redirects are bounded and validated. +""" + +from __future__ import annotations + +import gzip +import io +import ipaddress +import json +import os +import re +import socket +import ssl +import sys +import time +import urllib.error +import urllib.request +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any +from urllib.parse import urldefrag, urljoin, urlparse, urlsplit +from urllib.request import url2pathname +from xml.etree.ElementTree import Element, ParseError + +import rich_click as click +from defusedxml.common import DefusedXmlException +from defusedxml.ElementTree import iterparse as defused_iterparse + +from abx_plugins.plugins.base.url_cleaning import sanitize_extracted_url +from abx_plugins.plugins.base.utils import ( + emit_archive_result_record, + emit_snapshot_record, + emit_tag_record, + get_extra_context, + load_config, + write_text_atomic, +) + +PLUGIN_NAME = "parse_sitemap_urls" +PLUGIN_DIR = Path(__file__).resolve().parent.name +CONFIG = load_config() +SNAP_DIR = Path(CONFIG.SNAP_DIR or ".").resolve() +OUTPUT_DIR = SNAP_DIR / PLUGIN_DIR +OUTPUT_DIR.mkdir(parents=True, exist_ok=True) +os.chdir(OUTPUT_DIR) + +URLS_FILE = Path("urls.jsonl") +NORESULTS_OUTPUT = "0 URLs parsed" + +SITEMAP_NS = { + "s": "http://www.sitemaps.org/schemas/sitemap/0.9", + "image": "http://www.google.com/schemas/sitemap-image/1.1", + "video": 
"http://www.google.com/schemas/sitemap-video/1.1", + "news": "http://www.google.com/schemas/sitemap-news/0.9", +} +GZIP_MAGIC = b"\x1f\x8b" +UTF8_BOM = b"\xef\xbb\xbf" +UTF16_LE_BOM = b"\xff\xfe" +UTF16_BE_BOM = b"\xfe\xff" +ROBOTS_SITEMAP_RE = re.compile(r"^\s*sitemap\s*:\s*(\S+)\s*$", re.IGNORECASE) +TRANSIENT_HTTP_STATUSES = frozenset({408, 429, 500, 502, 503, 504}) +ALLOWED_REMOTE_SCHEMES = frozenset({"http", "https"}) +ALLOWED_FALLBACK_PATHS = [ + "/sitemap.xml", + "/sitemap_index.xml", + "/sitemap-index.xml", + "/wp-sitemap.xml", + "/sitemap.xml.gz", +] + +# Defensive caps; configurable via env. +DEFAULT_MAX_RESPONSE_BYTES = 50 * 1024 * 1024 # 50 MiB on the wire +DEFAULT_MAX_DECOMPRESSED_BYTES = 200 * 1024 * 1024 # 200 MiB after gunzip +DEFAULT_GZIP_MAX_RATIO = 100 # decompressed / compressed +DEFAULT_REGEX_INPUT_CAP = 8192 # max URL length passed to user regex + + +# --------------------------------------------------------------------------- +# URL helpers +# --------------------------------------------------------------------------- + + +def _strip_query_and_fragment(url: str) -> str: + return url.split("?", 1)[0].split("#", 1)[0] + + +def _is_xml_url(url: str) -> bool: + lowered = _strip_query_and_fragment(url).lower() + return lowered.endswith((".xml", ".xml.gz")) + + +def _is_robots_url(url: str) -> bool: + """True when the URL path's basename is exactly `robots.txt`. + + A trailing match on `robots.txt` alone would also catch + `foo-robots.txt`; we require the path basename to be the file. 
+ """ + path = _strip_query_and_fragment(url).lower() + if not path: + return False + return path.rsplit("/", 1)[-1] == "robots.txt" + + +def _site_root(url: str) -> str: + parsed = urlparse(url) + if not parsed.scheme or not parsed.netloc: + return url + return f"{parsed.scheme}://{parsed.netloc}" + + +def _normalize_url(raw: str, *, base_url: str | None = None) -> str: + """Trim quoting/entity garbage, resolve scheme-relative URLs, drop fragments.""" + cleaned = sanitize_extracted_url(raw).strip() + if not cleaned: + return "" + if cleaned.startswith("//") and base_url: + parsed_base = urlparse(base_url) + if parsed_base.scheme: + cleaned = f"{parsed_base.scheme}:{cleaned}" + cleaned, _ = urldefrag(cleaned) + return cleaned.strip() + + +def _hosts_match(seed_host: str, candidate: str) -> bool: + parsed = urlparse(candidate) + return parsed.netloc.lower() == seed_host.lower() + + +def _ip_is_private(ip: ipaddress.IPv4Address | ipaddress.IPv6Address) -> bool: + return ( + ip.is_loopback + or ip.is_private + or ip.is_link_local + or ip.is_multicast + or ip.is_reserved + or ip.is_unspecified + ) + + +@dataclass(frozen=True) +class HostCheck: + """Result of a private-host probe; distinguishes private from unresolvable.""" + + private: bool + reason: str + + +def _classify_host(netloc: str) -> HostCheck: + """Classify a netloc for SSRF policy. + + Re-resolves DNS on every call. The check still has a TOCTOU window + against the subsequent socket connect (urllib re-resolves), so this + is best treated as a defense-in-depth layer alongside the scheme + allowlist, response-size caps, and the operator's outbound firewall + rules. A fully TOCTOU-safe design would require pinning to the + resolved IP at connect time, which is out of scope for this plugin. + """ + if not netloc: + return HostCheck(True, "empty_netloc") + # `urlsplit` correctly extracts hostnames from bracketed IPv6 forms + # like [::1]:8080. 
Falling back to split(":") would yield "[" or + # "[::1" depending on the form. + parsed = urlsplit(f"//{netloc}") + host = (parsed.hostname or "").strip() + if not host: + return HostCheck(True, "empty_host") + try: + ip = ipaddress.ip_address(host) + return HostCheck(_ip_is_private(ip), "literal_ip") + except ValueError: + pass + try: + infos = socket.getaddrinfo(host, None) + except socket.gaierror: + return HostCheck(True, "dns_unresolvable") + for _family, _type, _proto, _canon, sockaddr in infos: + try: + ip = ipaddress.ip_address(sockaddr[0]) + except ValueError: + continue + if _ip_is_private(ip): + return HostCheck(True, "resolves_to_private") + return HostCheck(False, "public") + + +def _is_private_host(netloc: str) -> bool: + """Return True if netloc resolves to loopback / private / link-local.""" + return _classify_host(netloc).private + + +def _strip_bom(payload: bytes) -> bytes: + if payload.startswith(UTF8_BOM): + return payload[len(UTF8_BOM) :] + if payload.startswith(UTF16_LE_BOM) or payload.startswith(UTF16_BE_BOM): + try: + decoded = payload.decode("utf-16") + except UnicodeDecodeError: + return payload + # Re-emit as UTF-8 and align the XML declaration so the parser doesn't + # choke on the apparent encoding/byte mismatch. + decoded = re.sub( + r'encoding\s*=\s*["\']\s*utf-?16(?:\s*-?\s*(?:le|be))?\s*["\']', + 'encoding="UTF-8"', + decoded, + count=1, + flags=re.IGNORECASE, + ) + return decoded.encode("utf-8") + return payload + + +def _safe_decompress(payload: bytes, *, max_bytes: int, max_ratio: int) -> bytes: + """Decompress gzip with hard caps. Raises ValueError on cap breach or + corrupt input. + + Wraps the underlying `gzip.GzipFile` errors (``OSError`` from + ``BadGzipFile`` / CRC failures, ``EOFError`` from truncation) so the + walker can map a single exception type to a normal `failed` + ArchiveResult. 
+ """ + compressed_size = len(payload) + if compressed_size == 0: + return payload + try: + decompressor = gzip.GzipFile(fileobj=io.BytesIO(payload)) + out = io.BytesIO() + chunk_size = 64 * 1024 + while True: + chunk = decompressor.read(chunk_size) + if not chunk: + break + out.write(chunk) + produced = out.tell() + if produced > max_bytes: + raise ValueError( + f"decompressed payload exceeded {max_bytes} bytes cap", + ) + if max_ratio > 0 and produced > compressed_size * max_ratio: + raise ValueError( + f"decompressed/compressed ratio exceeded {max_ratio}x cap", + ) + except (OSError, EOFError) as exc: + raise ValueError(f"gzip decompression failed: {exc}") from exc + return out.getvalue() + + +def _maybe_decompress( + payload: bytes, + *, + url_hint: str = "", + max_bytes: int, + max_ratio: int, +) -> bytes: + # We only need to peek for the gzip magic bytes here; the .gz URL hint is + # *not* sufficient on its own because `_fetch_bytes` may have already + # decompressed a `Content-Encoding: gzip` body, leaving us with plain XML + # whose URL still ends in `.gz`. Double-decompressing that would raise + # `gzip.BadGzipFile` outside the caller's `ValueError` handler. 
+ if not payload.startswith(GZIP_MAGIC): + return payload + _ = url_hint + return _safe_decompress(payload, max_bytes=max_bytes, max_ratio=max_ratio) + + +def _strip_ns(tag: str) -> str: + return tag.rsplit("}", 1)[-1] if "}" in tag else tag + + +def _findall_ns(parent: Element, prefix: str, local_name: str) -> list[Element]: + """Find children matching prefix:local_name in the sitemaps namespace and unnamespaced.""" + if prefix in SITEMAP_NS: + results = parent.findall(f"{prefix}:{local_name}", SITEMAP_NS) + if results: + return results + return parent.findall(local_name) + + +def _find_ns(parent: Element, prefix: str, local_name: str) -> Element | None: + if prefix in SITEMAP_NS: + found = parent.find(f"{prefix}:{local_name}", SITEMAP_NS) + if found is not None: + return found + return parent.find(local_name) + + +def _compile_optional(pattern: str) -> re.Pattern[str] | None: + if not pattern: + return None + try: + return re.compile(pattern) + except re.error as exc: + click.echo(f"WARNING: invalid regex {pattern!r}: {exc}", err=True) + return None + + +def _safe_float(raw: str | None) -> float | None: + if raw is None: + return None + try: + return float(raw.strip()) + except (ValueError, AttributeError): + return None + + +# --------------------------------------------------------------------------- +# HTTP fetch with retry, bounded redirects, body caps +# --------------------------------------------------------------------------- + + +@dataclass +class FetchOptions: + timeout: int + user_agent: str + retries: int = 2 + backoff_seconds: float = 1.0 + max_redirects: int = 5 + verify_tls: bool = True + accept_language: str = "" + max_response_bytes: int = DEFAULT_MAX_RESPONSE_BYTES + max_decompressed_bytes: int = DEFAULT_MAX_DECOMPRESSED_BYTES + gzip_max_ratio: int = DEFAULT_GZIP_MAX_RATIO + allow_private_hosts: bool = False + allow_file_urls: bool = False + + def headers(self) -> dict[str, str]: + out: dict[str, str] = { + "User-Agent": self.user_agent, + 
"Accept": "application/xml, text/xml, application/x-gzip, */*;q=0.1", + "Accept-Encoding": "gzip, identity", + } + if self.accept_language: + out["Accept-Language"] = self.accept_language + return out + + +def _build_ssl_context(verify: bool) -> ssl.SSLContext | None: + if verify: + return None + context = ssl.create_default_context() + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + return context + + +class _BoundedRedirectHandler(urllib.request.HTTPRedirectHandler): + """Cap redirects and reject targets that violate the fetch policy.""" + + def __init__(self, options: FetchOptions) -> None: + super().__init__() + self._options = options + # `HTTPRedirectHandler.max_redirections` is what the stdlib uses to + # cap total redirects in the chain. Override per-instance via setattr + # so the config knob actually takes effect; ``setattr`` keeps + # type-checkers from flagging the ClassVar-vs-instance shape. + setattr(self, "max_redirections", max(0, options.max_redirects)) + + def redirect_request( + self, + req: urllib.request.Request, + fp: Any, + code: int, + msg: str, + headers: Any, + newurl: str, + ) -> urllib.request.Request | None: + target = urlparse(newurl) + if target.scheme not in ALLOWED_REMOTE_SCHEMES: + raise urllib.error.HTTPError( + newurl, + code, + f"refusing redirect to disallowed scheme {target.scheme!r}", + headers, + fp, + ) + if not self._options.allow_private_hosts and _is_private_host(target.netloc): + raise urllib.error.HTTPError( + newurl, + code, + f"refusing redirect to private host {target.netloc!r}", + headers, + fp, + ) + return super().redirect_request(req, fp, code, msg, headers, newurl) + + +def _build_opener(options: FetchOptions) -> urllib.request.OpenerDirector: + handlers: list[urllib.request.BaseHandler] = [_BoundedRedirectHandler(options)] + ssl_context = _build_ssl_context(options.verify_tls) + handlers.append( + urllib.request.HTTPSHandler(context=ssl_context) + if ssl_context + else 
urllib.request.HTTPSHandler(), + ) + opener = urllib.request.build_opener(*handlers) + opener.addheaders = [] # we set our own headers per-request + return opener + + +def _read_capped(response: Any, max_bytes: int) -> bytes: + """Read at most max_bytes from a response. Raises ValueError on overrun. + + Reads in 64 KiB chunks, but requests one byte past `max_bytes` exactly + once so the cap stays inclusive: a payload that is precisely + `max_bytes` bytes succeeds; `max_bytes + 1` fails. + """ + buf = io.BytesIO() + while True: + remaining_quota = max_bytes - buf.tell() + if remaining_quota < 0: + raise ValueError(f"response body exceeded {max_bytes} bytes cap") + # Always ask for one byte beyond the remaining quota so we can detect + # overrun without an off-by-one. When the quota hits zero we still + # try to read one byte to confirm EOF. + chunk = response.read(min(64 * 1024, remaining_quota + 1)) + if not chunk: + return buf.getvalue() + buf.write(chunk) + if buf.tell() > max_bytes: + raise ValueError(f"response body exceeded {max_bytes} bytes cap") + + +def _fetch_bytes(url: str, options: FetchOptions) -> bytes: + """Fetch a URL with retry/backoff. Raises URLError / OSError / ValueError.""" + parsed = urlparse(url) + if parsed.scheme == "file": + if not options.allow_file_urls: + raise ValueError(f"file:// not allowed by current policy: {url}") + # url2pathname decodes percent-escapes (so file:// URLs with spaces work) + # and handles Windows drive letters consistently. 
+ local_path = url2pathname(parsed.path) + with open(local_path, "rb") as fh: + data = fh.read(options.max_response_bytes + 1) + if len(data) > options.max_response_bytes: + raise ValueError( + f"file {local_path} exceeded {options.max_response_bytes} bytes cap", + ) + return data + if parsed.scheme not in ALLOWED_REMOTE_SCHEMES: + raise ValueError(f"unsupported scheme {parsed.scheme!r} for {url}") + if not options.allow_private_hosts and _is_private_host(parsed.netloc): + raise ValueError(f"refusing fetch from private host {parsed.netloc!r}") + + last_error: BaseException | None = None + opener = _build_opener(options) + + for attempt in range(max(0, options.retries) + 1): + try: + req = urllib.request.Request(url, headers=options.headers()) + with opener.open(req, timeout=options.timeout) as response: + payload = _read_capped(response, options.max_response_bytes) + content_encoding = ( + response.headers.get("Content-Encoding") or "" + ).lower() + if content_encoding == "gzip": + payload = _safe_decompress( + payload, + max_bytes=options.max_decompressed_bytes, + max_ratio=options.gzip_max_ratio, + ) + return payload + except urllib.error.HTTPError as exc: + last_error = exc + if exc.code in TRANSIENT_HTTP_STATUSES and attempt < options.retries: + _sleep_backoff(options.backoff_seconds, attempt) + continue + raise + except (urllib.error.URLError, TimeoutError, OSError) as exc: + last_error = exc + if attempt < options.retries: + _sleep_backoff(options.backoff_seconds, attempt) + continue + raise + + assert last_error is not None # for type-narrowing + raise last_error + + +def _sleep_backoff(base: float, attempt: int) -> None: + if base <= 0: + return + delay = base * (2**attempt) + time.sleep(min(delay, 30.0)) + + +def _parse_robots_txt(payload: bytes) -> list[str]: + sitemaps: list[str] = [] + text = payload.decode("utf-8", errors="replace") + for line in text.splitlines(): + match = ROBOTS_SITEMAP_RE.match(line) + if match: + candidate = 
_normalize_url(match.group(1)) + if candidate and candidate not in sitemaps: + sitemaps.append(candidate) + return sitemaps + + +# --------------------------------------------------------------------------- +# Sitemap parsing +# --------------------------------------------------------------------------- + + +@dataclass +class PageEntry: + url: str + lastmod: str | None = None + priority: float | None = None + changefreq: str | None = None + extras: list[str] = field(default_factory=list) + extra_tags: list[str] = field(default_factory=list) + order_index: int = 0 + + +def _build_page_entry( + url_el: Element, + *, + base_url: str, + emit_image_urls: bool, + emit_video_urls: bool, + emit_news_tag: bool, + order_index: int, +) -> PageEntry | None: + loc_el = _find_ns(url_el, "s", "loc") + if loc_el is None or not loc_el.text: + return None + page_url = _normalize_url(loc_el.text, base_url=base_url) + if not page_url: + return None + entry = PageEntry(url=page_url, order_index=order_index) + + lastmod_el = _find_ns(url_el, "s", "lastmod") + if lastmod_el is not None and lastmod_el.text: + entry.lastmod = lastmod_el.text.strip() + + changefreq_el = _find_ns(url_el, "s", "changefreq") + if changefreq_el is not None and changefreq_el.text: + entry.changefreq = changefreq_el.text.strip().lower() + + priority_el = _find_ns(url_el, "s", "priority") + if priority_el is not None and priority_el.text: + entry.priority = _safe_float(priority_el.text) + + if emit_image_urls: + for image_el in _findall_ns(url_el, "image", "image"): + image_loc = _find_ns(image_el, "image", "loc") + if image_loc is not None and image_loc.text: + cleaned = _normalize_url(image_loc.text, base_url=base_url) + if cleaned: + entry.extras.append(cleaned) + + if emit_video_urls: + for video_el in _findall_ns(url_el, "video", "video"): + for video_loc_name in ("content_loc", "player_loc"): + video_loc = _find_ns(video_el, "video", video_loc_name) + if video_loc is not None and video_loc.text: + cleaned = 
_normalize_url(video_loc.text, base_url=base_url) + if cleaned: + entry.extras.append(cleaned) + + if emit_news_tag: + for news_el in _findall_ns(url_el, "news", "news"): + pub_el = _find_ns(news_el, "news", "publication") + if pub_el is None: + continue + name_el = _find_ns(pub_el, "news", "name") + if name_el is not None and name_el.text: + entry.extra_tags.append(name_el.text.strip()) + + return entry + + +def _stream_sitemap( + payload: bytes, + *, + base_url: str, + emit_image_urls: bool, + emit_video_urls: bool, + emit_news_tag: bool, + next_order_start: int, +): + """Stream `<url>` / `<sitemap>` elements out of a sitemap document. + + Yields `("page", PageEntry)` for urlset entries and + `("child", str)` for sitemapindex children. Each element is freed + immediately after it is processed AND the just-processed sibling is + detached from the root's child list, so the resident XML tree + stays at O(1) regardless of how many `<url>` elements the document + contains. Yields nothing for unknown root tags. Raises `ValueError` + on malformed XML so callers can map it to standard parse handling. 
+ """ + order_index = next_order_start + root_element: Element | None = None + root_local: str | None = None + try: + for event, elem in defused_iterparse( + io.BytesIO(_strip_bom(payload)), + events=("start", "end"), + ): + local = _strip_ns(elem.tag) + if event == "start": + if root_local is None: + root_local = local + root_element = elem + continue + # event == "end" + yielded_child = False + if local == "sitemap" and root_local == "sitemapindex": + loc_el = _find_ns(elem, "s", "loc") + if loc_el is not None and loc_el.text: + cleaned = _normalize_url(loc_el.text, base_url=base_url) + if cleaned: + yielded_child = True + yield ("child", cleaned) + elif local == "url" and root_local == "urlset": + entry = _build_page_entry( + elem, + base_url=base_url, + emit_image_urls=emit_image_urls, + emit_video_urls=emit_video_urls, + emit_news_tag=emit_news_tag, + order_index=order_index, + ) + if entry is not None: + order_index += 1 + yielded_child = True + yield ("page", entry) + # Free the element and detach it from the root's child list so + # memory stays bounded even for 500k-URL documents. + if local in {"url", "sitemap"} and root_element is not None: + elem.clear() + # `remove(elem)` is O(n) on the child list; ET stores + # children in a list. Detaching the head each time keeps + # the per-iteration cost amortised O(1). + try: + root_element.remove(elem) + except ValueError: + pass + elif local in {"urlset", "sitemapindex"}: + elem.clear() + _ = yielded_child + except (ParseError, DefusedXmlException) as exc: + # Both malformed XML and defusedxml's "no DTDs / no entities" + # guards surface here; the walker maps any ValueError into a + # standard `failed` ArchiveResult so the hook contract holds. 
+ raise ValueError(str(exc)) from exc + + +# --------------------------------------------------------------------------- +# URL acceptance policy +# --------------------------------------------------------------------------- + + +@dataclass +class UrlPolicy: + """Final gate every emitted URL must pass.""" + + seed_host: str + allow_file_urls: bool + allow_private_hosts: bool + same_host_only: bool + include_re: re.Pattern[str] | None + exclude_re: re.Pattern[str] | None + regex_input_cap: int = DEFAULT_REGEX_INPUT_CAP + + def reason_to_drop_fetch(self, url: str) -> str | None: + """Gate for URLs we are about to fetch (seeds, child sitemaps). + + Applies only scheme + host policy; never regex / same-host. The + regex filters describe which *pages* we want to emit, not which + *sitemaps* we want to read. + """ + parsed = urlparse(url) + scheme = parsed.scheme.lower() + if scheme == "file": + if not self.allow_file_urls: + return "scheme_file" + return None + if scheme not in ALLOWED_REMOTE_SCHEMES: + return f"scheme_{scheme or 'empty'}" + if not parsed.netloc: + return "no_netloc" + if not self.allow_private_hosts and _is_private_host(parsed.netloc): + return "private_host" + return None + + def reason_to_drop_emit(self, url: str) -> str | None: + """Gate for URLs we are about to emit as Snapshot records. + + Layers same-host + include/exclude regex on top of the fetch + policy. 
+ """ + fetch_drop = self.reason_to_drop_fetch(url) + if fetch_drop is not None: + return fetch_drop + if self.same_host_only and not _hosts_match(self.seed_host, url): + return "host_mismatch" + if self.include_re is not None or self.exclude_re is not None: + scan_target = url[: self.regex_input_cap] + if self.include_re is not None and not self.include_re.search(scan_target): + return "include_filter" + if self.exclude_re is not None and self.exclude_re.search(scan_target): + return "exclude_filter" + return None + + +# --------------------------------------------------------------------------- +# Walker +# --------------------------------------------------------------------------- + + +@dataclass +class WalkerOptions: + max_urls: int + max_depth: int + max_sitemaps: int + priority_min: float + changefreq_allowed: set[str] + require_priority: bool + emit_image_urls: bool + emit_video_urls: bool + emit_news_tag: bool + restrict_child_to_seed_host: bool + verbose: bool + + +class SitemapWalker: + """Walk a tree of sitemap and sitemap-index documents.""" + + def __init__( + self, + *, + fetch: FetchOptions, + options: WalkerOptions, + policy: UrlPolicy, + ) -> None: + self.fetch = fetch + self.options = options + self.policy = policy + self.visited_sitemaps: set[str] = set() + self.seen_urls: set[str] = set() + self.page_entries: list[PageEntry] = [] + self.sitemap_count = 0 + self.sitemap_attempts = 0 + self.skipped_filter = 0 + self.skipped_host = 0 + self.skipped_priority = 0 + self.skipped_changefreq = 0 + self.skipped_scheme = 0 + self.errors: list[str] = [] + self._order_counter = 0 + + def walk(self, seed_url: str) -> None: + self._walk_one(seed_url, depth=0) + + def _walk_one(self, sitemap_url: str, *, depth: int) -> None: + if depth > self.options.max_depth: + self.errors.append(f"max_depth reached at {sitemap_url}") + return + if sitemap_url in self.visited_sitemaps: + return + self.visited_sitemaps.add(sitemap_url) + if len(self.page_entries) >= 
self.options.max_urls: + return + # Cap is on fetch *attempts*, not parsed successes — otherwise an + # index pointing at thousands of 404 / timeout / refused children + # could still trigger that many network calls. + if ( + self.options.max_sitemaps > 0 + and self.sitemap_attempts >= self.options.max_sitemaps + ): + self.errors.append( + f"max_sitemaps={self.options.max_sitemaps} reached; " + f"refusing {sitemap_url}", + ) + return + self.sitemap_attempts += 1 + + if self.options.verbose: + click.echo(f"fetching sitemap {sitemap_url}", err=True) + + try: + raw = _fetch_bytes(sitemap_url, self.fetch) + except (urllib.error.URLError, OSError, ValueError) as exc: + self.errors.append(f"fetch failed for {sitemap_url}: {exc}") + return + + try: + payload = _maybe_decompress( + raw, + url_hint=sitemap_url, + max_bytes=self.fetch.max_decompressed_bytes, + max_ratio=self.fetch.gzip_max_ratio, + ) + except ValueError as exc: + self.errors.append(f"decompression failed for {sitemap_url}: {exc}") + return + + # Stream the document and apply filters / dedup / max-urls inline so + # we never materialize 50k entries when MAX_URLS is 10. 
+ deferred_children: list[str] = [] + try: + for kind, value in _stream_sitemap( + payload, + base_url=sitemap_url, + emit_image_urls=self.options.emit_image_urls, + emit_video_urls=self.options.emit_video_urls, + emit_news_tag=self.options.emit_news_tag, + next_order_start=self._order_counter, + ): + if kind == "child" and isinstance(value, str): + deferred_children.append(value) + continue + if kind != "page" or not isinstance(value, PageEntry): + continue + entry = value + self._order_counter = entry.order_index + 1 + if len(self.page_entries) >= self.options.max_urls: + break + if not self._entry_passes_filters(entry): + continue + if entry.url in self.seen_urls: + continue + self.seen_urls.add(entry.url) + self.page_entries.append(entry) + except ValueError as exc: + self.errors.append(f"not valid XML: {sitemap_url}: {exc}") + return + + # XML parsed cleanly; count the visit even if the root tag wasn't + # `<urlset>` or `<sitemapindex>` (treated as noresults, not failed). + self.sitemap_count += 1 + + for child_url in deferred_children: + if len(self.page_entries) >= self.options.max_urls: + return + if ( + self.options.max_sitemaps > 0 + and self.sitemap_attempts >= self.options.max_sitemaps + ): + self.errors.append( + f"max_sitemaps={self.options.max_sitemaps} reached; " + f"refusing {child_url}", + ) + return + drop = self.policy.reason_to_drop_fetch(child_url) + if drop is not None: + self.errors.append( + f"refusing child sitemap {child_url} ({drop})", + ) + continue + # sitemaps.org §2.2: URLs in a sitemap must share the parent + # sitemap's host. When SAME_HOST_ONLY is set we also enforce + # this at the child-sitemap fetch boundary so a sitemap-index + # on host A cannot pivot the walker onto host B. 
+ if self.options.restrict_child_to_seed_host and not _hosts_match( + self.policy.seed_host, + child_url, + ): + self.errors.append( + f"refusing child sitemap {child_url} (host_mismatch)", + ) + continue + self._walk_one(child_url, depth=depth + 1) + + def _entry_passes_filters(self, entry: PageEntry) -> bool: + drop = self.policy.reason_to_drop_emit(entry.url) + if drop is not None: + if drop in { + "scheme_file", + "scheme_javascript", + "no_netloc", + "private_host", + } or drop.startswith( + "scheme_", + ): + self.skipped_scheme += 1 + elif drop == "host_mismatch": + self.skipped_host += 1 + else: + self.skipped_filter += 1 + return False + if self.options.priority_min > 0.0: + if entry.priority is None: + if self.options.require_priority: + self.skipped_priority += 1 + return False + elif entry.priority < self.options.priority_min: + self.skipped_priority += 1 + return False + if self.options.changefreq_allowed: + if ( + entry.changefreq is None + or entry.changefreq not in self.options.changefreq_allowed + ): + self.skipped_changefreq += 1 + return False + return True + + +# --------------------------------------------------------------------------- +# Seed resolution +# --------------------------------------------------------------------------- + + +def _resolve_sitemap_seeds( + seed_url: str, + *, + fetch: FetchOptions, + discover_from_robots: bool, + fallback_paths: list[str], +) -> tuple[list[str], list[str]]: + """Return (sitemap_urls, info_messages) for a seed URL.""" + info: list[str] = [] + + if _is_xml_url(seed_url): + return [seed_url], info + + if _is_robots_url(seed_url): + try: + payload = _fetch_bytes(seed_url, fetch) + except (urllib.error.URLError, OSError, ValueError) as exc: + info.append(f"failed to fetch {seed_url}: {exc}") + return [], info + sitemaps = _parse_robots_txt(payload) + if not sitemaps: + info.append(f"robots.txt has no Sitemap: directives ({seed_url})") + return sitemaps, info + + site_root = _site_root(seed_url) + 
discovered: list[str] = [] + + if discover_from_robots: + robots_url = urljoin(site_root + "/", "robots.txt") + try: + payload = _fetch_bytes(robots_url, fetch) + robots_sitemaps = _parse_robots_txt(payload) + if robots_sitemaps: + discovered.extend(robots_sitemaps) + info.append( + f"discovered {len(robots_sitemaps)} sitemap(s) via {robots_url}", + ) + else: + info.append( + f"robots.txt found but had no Sitemap: lines ({robots_url})", + ) + except (urllib.error.URLError, OSError, ValueError) as exc: + info.append(f"robots.txt unavailable ({robots_url}): {exc}") + + if not discovered: + for path in fallback_paths: + candidate = urljoin(site_root + "/", path.lstrip("/")) + if candidate not in discovered: + discovered.append(candidate) + if discovered: + info.append( + f"falling back to {len(fallback_paths)} sitemap path(s) under {site_root}", + ) + + return discovered, info + + +# --------------------------------------------------------------------------- +# Sorting + persistence +# --------------------------------------------------------------------------- + + +def _sort_entries(entries: list[PageEntry], mode: str) -> list[PageEntry]: + if mode == "lastmod": + return sorted(entries, key=lambda e: e.lastmod or "", reverse=True) + if mode == "priority": + return sorted( + entries, + key=lambda e: e.priority if e.priority is not None else -1.0, + reverse=True, + ) + if mode == "order": + return sorted(entries, key=lambda e: e.order_index) + return sorted(entries, key=lambda e: e.url) + + +def persist_records(records: list[dict]) -> tuple[str, str]: + if records: + write_text_atomic( + URLS_FILE, + "\n".join(json.dumps(record) for record in records) + "\n", + ) + return "succeeded", f"{len(records)} URLs parsed" + URLS_FILE.unlink(missing_ok=True) + return "noresults", NORESULTS_OUTPUT + + +def emit_result(status: str, output_str: str) -> None: + emit_archive_result_record(status, output_str) + if output_str: + click.echo(output_str, err=True) + + +# 
--------------------------------------------------------------------------- +# Config helpers +# --------------------------------------------------------------------------- + + +def _cfg_str(name: str, default: str = "") -> str: + value = getattr(CONFIG, name, default) + return str(value) if value is not None else default + + +def _cfg_int(name: str, default: int) -> int: + value = getattr(CONFIG, name, default) + try: + return int(value) + except (TypeError, ValueError): + return default + + +def _cfg_float(name: str, default: float) -> float: + value = getattr(CONFIG, name, default) + try: + return float(value) + except (TypeError, ValueError): + return default + + +def _cfg_bool(name: str, default: bool) -> bool: + value = getattr(CONFIG, name, default) + if isinstance(value, bool): + return value + if isinstance(value, str): + return value.strip().lower() in {"true", "1", "yes", "on"} + return bool(value) + + +def _cfg_list(name: str, default: list[str]) -> list[str]: + value = getattr(CONFIG, name, default) + if isinstance(value, list): + return [str(item) for item in value] + return list(default) + + +def _resolve_user_agent() -> str: + explicit = _cfg_str("PARSE_SITEMAP_URLS_USER_AGENT", "") + if explicit: + return explicit + return _cfg_str("USER_AGENT", "Mozilla/5.0 (compatible; ArchiveBox/1.0)") + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + + +@click.command( + context_settings={"ignore_unknown_options": True, "allow_extra_args": True}, +) +@click.option( + "--url", + required=True, + help="Seed URL: sitemap.xml, robots.txt, or site root", +) +@click.option( + "--depth", + type=int, + default=0, + help="Current crawl depth (relative to host frontier)", +) +def main(url: str, depth: int = 0) -> None: + """Discover URLs from sitemap.xml (and friends) and emit Snapshot JSONL records.""" + extra_context = get_extra_context() + if 
"snapshot_depth" in extra_context: + depth = int(extra_context["snapshot_depth"]) + + if not _cfg_bool("PARSE_SITEMAP_URLS_ENABLED", True): + emit_result("skipped", "PARSE_SITEMAP_URLS_ENABLED=False") + sys.exit(0) + + # file:// URLs are tolerated only when the seed itself is file://. This + # blocks remote sitemap-index → file:// chains. + seed_scheme = urlparse(url).scheme.lower() + allow_file_urls = seed_scheme == "file" or _cfg_bool( + "PARSE_SITEMAP_URLS_ALLOW_FILE_URLS", + False, + ) + allow_private_hosts = _cfg_bool( + "PARSE_SITEMAP_URLS_ALLOW_PRIVATE_HOSTS", + seed_scheme == "file", + ) + + fetch = FetchOptions( + timeout=_cfg_int("PARSE_SITEMAP_URLS_TIMEOUT", _cfg_int("TIMEOUT", 60)), + user_agent=_resolve_user_agent(), + retries=_cfg_int("PARSE_SITEMAP_URLS_HTTP_RETRIES", 2), + backoff_seconds=_cfg_float("PARSE_SITEMAP_URLS_HTTP_BACKOFF_SECONDS", 1.0), + max_redirects=_cfg_int("PARSE_SITEMAP_URLS_HTTP_MAX_REDIRECTS", 5), + verify_tls=_cfg_bool("PARSE_SITEMAP_URLS_VERIFY_TLS", True), + accept_language=_cfg_str("PARSE_SITEMAP_URLS_ACCEPT_LANGUAGE", ""), + max_response_bytes=_cfg_int( + "PARSE_SITEMAP_URLS_MAX_RESPONSE_BYTES", + DEFAULT_MAX_RESPONSE_BYTES, + ), + max_decompressed_bytes=_cfg_int( + "PARSE_SITEMAP_URLS_MAX_DECOMPRESSED_BYTES", + DEFAULT_MAX_DECOMPRESSED_BYTES, + ), + gzip_max_ratio=_cfg_int( + "PARSE_SITEMAP_URLS_GZIP_MAX_RATIO", + DEFAULT_GZIP_MAX_RATIO, + ), + allow_private_hosts=allow_private_hosts, + allow_file_urls=allow_file_urls, + ) + + policy = UrlPolicy( + seed_host=urlparse(url).netloc, + allow_file_urls=allow_file_urls, + allow_private_hosts=allow_private_hosts, + same_host_only=_cfg_bool("PARSE_SITEMAP_URLS_SAME_HOST_ONLY", False), + include_re=_compile_optional( + _cfg_str("PARSE_SITEMAP_URLS_INCLUDE_REGEX", ""), + ), + exclude_re=_compile_optional( + _cfg_str("PARSE_SITEMAP_URLS_EXCLUDE_REGEX", ""), + ), + regex_input_cap=_cfg_int( + "PARSE_SITEMAP_URLS_REGEX_INPUT_CAP", + DEFAULT_REGEX_INPUT_CAP, + ), + ) + + walker_options = 
WalkerOptions( + max_urls=_cfg_int("PARSE_SITEMAP_URLS_MAX_URLS", 5000), + max_depth=_cfg_int("PARSE_SITEMAP_URLS_MAX_SITEMAP_DEPTH", 5), + max_sitemaps=_cfg_int("PARSE_SITEMAP_URLS_MAX_SITEMAPS", 100), + restrict_child_to_seed_host=_cfg_bool( + "PARSE_SITEMAP_URLS_SAME_HOST_ONLY", + False, + ), + priority_min=_cfg_float("PARSE_SITEMAP_URLS_PRIORITY_MIN", 0.0), + changefreq_allowed={ + value.lower() + for value in _cfg_list("PARSE_SITEMAP_URLS_CHANGEFREQ_ALLOWED", []) + if value + }, + require_priority=_cfg_bool("PARSE_SITEMAP_URLS_REQUIRE_PRIORITY", False), + emit_image_urls=_cfg_bool("PARSE_SITEMAP_URLS_EMIT_IMAGE_URLS", False), + emit_video_urls=_cfg_bool("PARSE_SITEMAP_URLS_EMIT_VIDEO_URLS", False), + emit_news_tag=_cfg_bool("PARSE_SITEMAP_URLS_EMIT_NEWS_TAG", False), + verbose=_cfg_bool("PARSE_SITEMAP_URLS_VERBOSE", False), + ) + + fallback_paths = _cfg_list( + "PARSE_SITEMAP_URLS_FALLBACK_PATHS", + list(ALLOWED_FALLBACK_PATHS), + ) + discover_from_robots = _cfg_bool("PARSE_SITEMAP_URLS_DISCOVER_FROM_ROBOTS", True) + + seeds, info_messages = _resolve_sitemap_seeds( + url, + fetch=fetch, + discover_from_robots=discover_from_robots, + fallback_paths=fallback_paths, + ) + for message in info_messages: + click.echo(message, err=True) + + if not seeds: + URLS_FILE.unlink(missing_ok=True) + emit_result("noresults", "No sitemap URLs to fetch") + sys.exit(0) + + walker = SitemapWalker(fetch=fetch, options=walker_options, policy=policy) + seen_seeds: set[str] = set() + for raw_seed in seeds: + # Strip fragments and re-normalize so different surface spellings of + # the same URL (CLI vs robots-derived vs fallback) deduplicate. + seed, _ = urldefrag(raw_seed.strip()) + if not seed or seed in seen_seeds: + continue + seen_seeds.add(seed) + if len(walker.page_entries) >= walker.options.max_urls: + break + # Apply fetch-time scheme/host policy to the seed. Emit-time filters + # (regex, same-host) are layered on later, per page URL. 
+ drop = policy.reason_to_drop_fetch(seed) + if drop is not None: + walker.errors.append(f"refusing seed sitemap {seed} ({drop})") + continue + walker.walk(seed) + + for error in walker.errors: + click.echo(error, err=True) + + if not walker.page_entries: + URLS_FILE.unlink(missing_ok=True) + if walker.sitemap_count == 0: + emit_result("failed", "No valid sitemaps could be fetched/parsed") + sys.exit(1) + summary = _build_summary(0, walker) + emit_result("noresults", summary) + sys.exit(0) + + sort_mode = _cfg_str("PARSE_SITEMAP_URLS_SORT_BY", "url") or "url" + ordered = _sort_entries(walker.page_entries, sort_mode) + + records: list[dict] = [] + extra_tags_seen: set[str] = set() + skipped_extras = 0 + for entry in ordered: + if len(records) >= walker_options.max_urls: + break + + record: dict = { + "type": "Snapshot", + "url": entry.url, + "plugin": PLUGIN_NAME, + "depth": depth + 1, + } + if entry.lastmod: + record["bookmarked_at"] = entry.lastmod + records.append(record) + + for extra_url in entry.extras: + if len(records) >= walker_options.max_urls: + break + if extra_url == entry.url or extra_url in walker.seen_urls: + continue + drop = policy.reason_to_drop_emit(extra_url) + if drop is not None: + skipped_extras += 1 + continue + walker.seen_urls.add(extra_url) + records.append( + { + "type": "Snapshot", + "url": extra_url, + "plugin": PLUGIN_NAME, + "depth": depth + 1, + "tags": "sitemap-media", + }, + ) + + for tag in entry.extra_tags: + if tag: + extra_tags_seen.add(tag) + + for tag in sorted(extra_tags_seen): + emit_tag_record(tag) + + for record in records: + emit_snapshot_record(record) + + status, _ = persist_records(records) + summary = _build_summary(len(records), walker, skipped_extras=skipped_extras) + emit_result(status, summary) + sys.exit(0) + + +def _build_summary( + record_count: int, + walker: SitemapWalker, + *, + skipped_extras: int = 0, +) -> str: + return ( + f"{record_count} URLs parsed (visited {walker.sitemap_count} sitemap(s); " + 
f"skipped_filter={walker.skipped_filter} " + f"skipped_host={walker.skipped_host} " + f"skipped_priority={walker.skipped_priority} " + f"skipped_changefreq={walker.skipped_changefreq} " + f"skipped_scheme={walker.skipped_scheme} " + f"skipped_extras={skipped_extras})" + ) + + +if __name__ == "__main__": + main() diff --git a/abx_plugins/plugins/parse_sitemap_urls/tests/__init__.py b/abx_plugins/plugins/parse_sitemap_urls/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/abx_plugins/plugins/parse_sitemap_urls/tests/test_parse_sitemap_urls.py b/abx_plugins/plugins/parse_sitemap_urls/tests/test_parse_sitemap_urls.py new file mode 100644 index 0000000..c35c42a --- /dev/null +++ b/abx_plugins/plugins/parse_sitemap_urls/tests/test_parse_sitemap_urls.py @@ -0,0 +1,769 @@ +#!/usr/bin/env python3 +"""Unit tests for parse_sitemap_urls extractor. + +These tests exercise the hook against: + +* in-memory `file://` sitemaps (no network) +* `pytest-httpserver` for HTTP discovery flows (robots.txt, sitemap-index, + gzip, large sitemaps, fallback path probing, retries) +* malformed input and edge cases (truncated XML, non-XML payloads, empty + sitemaps, mixed namespaces, oversized URL counts) + +The hook is run as a subprocess so we exercise the real `uv` shebang and +script-block dependency pinning that ships in production. 
+""" + +from __future__ import annotations + +import gzip +import json +import os +import subprocess +import textwrap +from pathlib import Path + +import pytest + +PLUGIN_DIR = Path(__file__).resolve().parent.parent +SCRIPT_PATH = next( + (path for path in PLUGIN_DIR.glob("on_Snapshot__*_parse_sitemap_urls.*")), + None, +) +assert SCRIPT_PATH is not None, "hook script must exist for tests to run" + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def run_hook( + url: str, + *, + cwd: Path, + env_overrides: dict[str, str] | None = None, + timeout: int = 120, + extra_args: list[str] | None = None, +) -> subprocess.CompletedProcess[str]: + """Run the hook as a subprocess, mirroring the real invocation contract. + + `pytest-httpserver` binds to localhost, which the production-default + private-host guard refuses. Tests opt-in to that target via + `PARSE_SITEMAP_URLS_ALLOW_PRIVATE_HOSTS=true` unless the caller + overrides it explicitly. + """ + env = os.environ.copy() + env["SNAP_DIR"] = str(cwd) + env.setdefault("PARSE_SITEMAP_URLS_ALLOW_PRIVATE_HOSTS", "true") + if env_overrides: + env.update(env_overrides) + cmd = [str(SCRIPT_PATH), "--url", url] + if extra_args: + cmd.extend(extra_args) + return subprocess.run( + cmd, + cwd=str(cwd), + capture_output=True, + text=True, + env=env, + timeout=timeout, + ) + + +def parse_jsonl(stdout: str) -> list[dict]: + """Parse JSONL stdout, failing on any non-JSON line. + + The hook contract requires that every non-empty stdout line is a + JSON record. A regression where the hook prints human text to + stdout instead of stderr should fail tests, not be silently + filtered. 
+ """ + records: list[dict] = [] + for raw in stdout.splitlines(): + line = raw.strip() + if not line: + continue + records.append(json.loads(line)) + return records + + +def snapshots(records: list[dict]) -> list[dict]: + return [record for record in records if record.get("type") == "Snapshot"] + + +def archive_result(records: list[dict]) -> dict | None: + return next((r for r in records if r.get("type") == "ArchiveResult"), None) + + +def write_sitemap( + path: Path, urls: list[str], *, lastmods: list[str] | None = None +) -> None: + pieces = ['<?xml version="1.0" encoding="UTF-8"?>'] + pieces.append('<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">') + for index, url in enumerate(urls): + if lastmods and index < len(lastmods): + pieces.append( + f" <url><loc>{url}</loc><lastmod>{lastmods[index]}</lastmod></url>", + ) + else: + pieces.append(f" <url><loc>{url}</loc></url>") + pieces.append("</urlset>") + path.write_text("\n".join(pieces), encoding="utf-8") + + +def write_sitemap_index(path: Path, child_urls: list[str]) -> None: + pieces = ['<?xml version="1.0" encoding="UTF-8"?>'] + pieces.append('<sitemapindex xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">') + for url in child_urls: + pieces.append(f" <sitemap><loc>{url}</loc></sitemap>") + pieces.append("</sitemapindex>") + path.write_text("\n".join(pieces), encoding="utf-8") + + +# --------------------------------------------------------------------------- +# Basic urlset parsing (file://) +# --------------------------------------------------------------------------- + + +class TestUrlsetParsing: + def test_parses_simple_urlset(self, tmp_path: Path) -> None: + sitemap = tmp_path / "sitemap.xml" + write_sitemap( + sitemap, + [ + "https://example.com/", + "https://example.com/about", + "https://example.com/contact", + ], + ) + result = run_hook(f"file://{sitemap}", cwd=tmp_path) + assert result.returncode == 0, result.stderr + records = parse_jsonl(result.stdout) + snaps = snapshots(records) + urls = sorted(s["url"] for s in snaps) + assert urls == [ + "https://example.com/", + "https://example.com/about", + "https://example.com/contact", + ] + archive = archive_result(records) + assert archive is not None + assert archive["status"] == "succeeded" + assert "3 URLs parsed" in archive["output_str"] + + def 
test_preserves_lastmod_as_bookmarked_at(self, tmp_path: Path) -> None: + sitemap = tmp_path / "sitemap.xml" + write_sitemap( + sitemap, + ["https://example.com/post-1", "https://example.com/post-2"], + lastmods=["2025-12-01", "2025-12-02T08:00:00Z"], + ) + result = run_hook(f"file://{sitemap}", cwd=tmp_path) + assert result.returncode == 0 + snaps = snapshots(parse_jsonl(result.stdout)) + by_url = {s["url"]: s for s in snaps} + assert by_url["https://example.com/post-1"]["bookmarked_at"] == "2025-12-01" + assert ( + by_url["https://example.com/post-2"]["bookmarked_at"] + == "2025-12-02T08:00:00Z" + ) + + def test_emits_depth_increment(self, tmp_path: Path) -> None: + sitemap = tmp_path / "sitemap.xml" + write_sitemap(sitemap, ["https://example.com/a"]) + result = run_hook(f"file://{sitemap}", cwd=tmp_path, extra_args=["--depth=2"]) + assert result.returncode == 0 + snaps = snapshots(parse_jsonl(result.stdout)) + assert snaps[0]["depth"] == 3 + + def test_persists_urls_jsonl_file(self, tmp_path: Path) -> None: + sitemap = tmp_path / "sitemap.xml" + write_sitemap(sitemap, ["https://example.com/x", "https://example.com/y"]) + result = run_hook(f"file://{sitemap}", cwd=tmp_path) + assert result.returncode == 0 + urls_file = tmp_path / "parse_sitemap_urls" / "urls.jsonl" + assert urls_file.exists() + lines = [line for line in urls_file.read_text().splitlines() if line.strip()] + assert len(lines) == 2 + for line in lines: + entry = json.loads(line) + assert entry["type"] == "Snapshot" + assert entry["plugin"] == "parse_sitemap_urls" + + def test_overwrites_stale_urls_jsonl(self, tmp_path: Path) -> None: + sitemap = tmp_path / "sitemap.xml" + write_sitemap(sitemap, ["https://example.com/fresh"]) + urls_dir = tmp_path / "parse_sitemap_urls" + urls_dir.mkdir() + stale = urls_dir / "urls.jsonl" + stale.write_text( + '{"type":"Snapshot","url":"https://example.com/stale"}\n', + encoding="utf-8", + ) + result = run_hook(f"file://{sitemap}", cwd=tmp_path) + assert 
result.returncode == 0 + lines = [line for line in stale.read_text().splitlines() if line.strip()] + assert len(lines) == 1 + assert json.loads(lines[0])["url"] == "https://example.com/fresh" + + def test_clears_stale_urls_jsonl_on_noresults(self, tmp_path: Path) -> None: + sitemap = tmp_path / "sitemap.xml" + sitemap.write_text( + '', + encoding="utf-8", + ) + urls_dir = tmp_path / "parse_sitemap_urls" + urls_dir.mkdir() + stale = urls_dir / "urls.jsonl" + stale.write_text("stale\n", encoding="utf-8") + result = run_hook(f"file://{sitemap}", cwd=tmp_path) + assert result.returncode == 0 + archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None and archive["status"] == "noresults" + assert not stale.exists() + + +# --------------------------------------------------------------------------- +# Sitemap index recursion +# --------------------------------------------------------------------------- + + +class TestSitemapIndex: + def test_recurses_one_level(self, tmp_path: Path) -> None: + child_a = tmp_path / "child_a.xml" + child_b = tmp_path / "child_b.xml" + write_sitemap(child_a, ["https://example.com/a1", "https://example.com/a2"]) + write_sitemap(child_b, ["https://example.com/b1"]) + index_path = tmp_path / "index.xml" + write_sitemap_index( + index_path, + [f"file://{child_a}", f"file://{child_b}"], + ) + result = run_hook(f"file://{index_path}", cwd=tmp_path) + assert result.returncode == 0, result.stderr + urls = sorted(s["url"] for s in snapshots(parse_jsonl(result.stdout))) + assert urls == [ + "https://example.com/a1", + "https://example.com/a2", + "https://example.com/b1", + ] + + def test_recurses_two_levels(self, tmp_path: Path) -> None: + leaf = tmp_path / "leaf.xml" + write_sitemap(leaf, ["https://example.com/leaf-1"]) + mid = tmp_path / "mid.xml" + write_sitemap_index(mid, [f"file://{leaf}"]) + top = tmp_path / "top.xml" + write_sitemap_index(top, [f"file://{mid}"]) + result = run_hook(f"file://{top}", cwd=tmp_path) + assert 
result.returncode == 0 + snaps = snapshots(parse_jsonl(result.stdout)) + assert {s["url"] for s in snaps} == {"https://example.com/leaf-1"} + + def test_respects_max_sitemap_depth(self, tmp_path: Path) -> None: + leaf = tmp_path / "leaf.xml" + write_sitemap(leaf, ["https://example.com/leaf"]) + mid = tmp_path / "mid.xml" + write_sitemap_index(mid, [f"file://{leaf}"]) + top = tmp_path / "top.xml" + write_sitemap_index(top, [f"file://{mid}"]) + + result = run_hook( + f"file://{top}", + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_MAX_SITEMAP_DEPTH": "1"}, + ) + assert result.returncode in (0, 1) + snaps = snapshots(parse_jsonl(result.stdout)) + assert snaps == [] + assert "max_depth" in result.stderr + + def test_handles_cyclic_sitemap_index(self, tmp_path: Path) -> None: + a = tmp_path / "a.xml" + b = tmp_path / "b.xml" + write_sitemap_index(a, [f"file://{b}"]) + write_sitemap_index(b, [f"file://{a}"]) + result = run_hook(f"file://{a}", cwd=tmp_path) + # Cycle terminates safely; no URLs to emit. 
+ assert result.returncode in (0, 1) + snaps = snapshots(parse_jsonl(result.stdout)) + assert snaps == [] + + +# --------------------------------------------------------------------------- +# Gzip + encoding +# --------------------------------------------------------------------------- + + +class TestGzip: + def test_decompresses_gzipped_sitemap(self, tmp_path: Path) -> None: + raw = tmp_path / "sitemap.xml" + write_sitemap(raw, ["https://example.com/g1", "https://example.com/g2"]) + gz_path = tmp_path / "sitemap.xml.gz" + gz_path.write_bytes(gzip.compress(raw.read_bytes())) + result = run_hook(f"file://{gz_path}", cwd=tmp_path) + assert result.returncode == 0 + urls = sorted(s["url"] for s in snapshots(parse_jsonl(result.stdout))) + assert urls == ["https://example.com/g1", "https://example.com/g2"] + + +# --------------------------------------------------------------------------- +# Filtering +# --------------------------------------------------------------------------- + + +class TestFiltering: + def test_include_regex(self, tmp_path: Path) -> None: + sitemap = tmp_path / "sitemap.xml" + write_sitemap( + sitemap, + [ + "https://example.com/blog/post-1", + "https://example.com/blog/post-2", + "https://example.com/products/x", + "https://example.com/about", + ], + ) + result = run_hook( + f"file://{sitemap}", + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_INCLUDE_REGEX": r"/blog/"}, + ) + assert result.returncode == 0 + urls = sorted(s["url"] for s in snapshots(parse_jsonl(result.stdout))) + assert urls == [ + "https://example.com/blog/post-1", + "https://example.com/blog/post-2", + ] + + def test_exclude_regex(self, tmp_path: Path) -> None: + sitemap = tmp_path / "sitemap.xml" + write_sitemap( + sitemap, + [ + "https://example.com/blog/post", + "https://example.com/products/x", + "https://example.com/products/y", + ], + ) + result = run_hook( + f"file://{sitemap}", + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_EXCLUDE_REGEX": r"/products/"}, + ) + 
assert result.returncode == 0 + snaps = snapshots(parse_jsonl(result.stdout)) + assert [s["url"] for s in snaps] == ["https://example.com/blog/post"] + + def test_same_host_only_with_file_seed(self, tmp_path: Path) -> None: + """`SAME_HOST_ONLY` against a file:// seed (empty netloc) filters every HTTPS URL. + + Documents the limitation: `SAME_HOST_ONLY` is designed for HTTP(S) + seeds where the netloc is meaningful. With a file:// seed every + emitted HTTPS URL has a non-matching host, so the filter drops all + of them. + """ + sitemap = tmp_path / "sitemap.xml" + write_sitemap( + sitemap, + [ + "https://example.com/page-a", + "https://cdn.example.com/asset", + "https://other.com/page-b", + ], + ) + result = run_hook( + f"file://{sitemap}", + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_SAME_HOST_ONLY": "true"}, + ) + assert result.returncode == 0 + snaps = snapshots(parse_jsonl(result.stdout)) + assert snaps == [] + + def test_invalid_regex_warns_and_continues(self, tmp_path: Path) -> None: + sitemap = tmp_path / "sitemap.xml" + write_sitemap(sitemap, ["https://example.com/a"]) + result = run_hook( + f"file://{sitemap}", + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_INCLUDE_REGEX": "[unclosed"}, + ) + assert result.returncode == 0 + assert "invalid regex" in result.stderr + snaps = snapshots(parse_jsonl(result.stdout)) + # Bad regex collapses to None → no filtering → URL passes. 
+ assert [s["url"] for s in snaps] == ["https://example.com/a"] + + +# --------------------------------------------------------------------------- +# Limits +# --------------------------------------------------------------------------- + + +class TestLimits: + def test_respects_max_urls(self, tmp_path: Path) -> None: + sitemap = tmp_path / "sitemap.xml" + urls = [f"https://example.com/p{index}" for index in range(50)] + write_sitemap(sitemap, urls) + result = run_hook( + f"file://{sitemap}", + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_MAX_URLS": "10"}, + ) + assert result.returncode == 0 + snaps = snapshots(parse_jsonl(result.stdout)) + assert len(snaps) == 10 + + def test_disabled_via_config(self, tmp_path: Path) -> None: + sitemap = tmp_path / "sitemap.xml" + write_sitemap(sitemap, ["https://example.com/x"]) + result = run_hook( + f"file://{sitemap}", + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_ENABLED": "false"}, + ) + assert result.returncode == 0 + archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None and archive["status"] == "skipped" + + def test_alias_use_parse_sitemap_urls_disables(self, tmp_path: Path) -> None: + sitemap = tmp_path / "sitemap.xml" + write_sitemap(sitemap, ["https://example.com/x"]) + result = run_hook( + f"file://{sitemap}", + cwd=tmp_path, + env_overrides={"USE_PARSE_SITEMAP_URLS": "false"}, + ) + archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None and archive["status"] == "skipped" + + +# --------------------------------------------------------------------------- +# Malformed input +# --------------------------------------------------------------------------- + + +class TestMalformedInput: + def test_truncated_xml(self, tmp_path: Path) -> None: + bad = tmp_path / "bad.xml" + bad.write_text( + 'https://example.com/p', + encoding="utf-8", + ) + result = run_hook(f"file://{bad}", cwd=tmp_path) + # No valid sitemap parsed → failed (zero visited count). 
+ assert result.returncode == 1 + archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None and archive["status"] == "failed" + + def test_non_xml_payload(self, tmp_path: Path) -> None: + notxml = tmp_path / "notxml.xml" + notxml.write_text("this is not xml at all", encoding="utf-8") + result = run_hook(f"file://{notxml}", cwd=tmp_path) + assert result.returncode == 1 + archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None and archive["status"] == "failed" + + def test_empty_urlset_noresults(self, tmp_path: Path) -> None: + empty = tmp_path / "empty.xml" + empty.write_text( + '', + encoding="utf-8", + ) + result = run_hook(f"file://{empty}", cwd=tmp_path) + assert result.returncode == 0 + archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None and archive["status"] == "noresults" + + def test_missing_file(self, tmp_path: Path) -> None: + result = run_hook( + f"file://{tmp_path}/does-not-exist.xml", + cwd=tmp_path, + ) + assert result.returncode in (0, 1) + archive = archive_result(parse_jsonl(result.stdout)) + # No seeds resolved to valid sitemaps → failed. + assert archive is not None + assert archive["status"] == "failed" + + def test_rejects_unknown_root_element(self, tmp_path: Path) -> None: + weird = tmp_path / "weird.xml" + weird.write_text( + 'baz', + encoding="utf-8", + ) + result = run_hook(f"file://{weird}", cwd=tmp_path) + # XML parses but root is neither urlset nor sitemapindex → 0 URLs. + archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None and archive["status"] == "noresults" + + def test_unnamespaced_sitemap_supported(self, tmp_path: Path) -> None: + # Real-world: some sitemaps omit the xmlns. 
+ plain = tmp_path / "plain.xml" + plain.write_text( + textwrap.dedent( + """ + <?xml version="1.0" encoding="UTF-8"?> + <urlset> + <url><loc>https://example.com/x</loc></url> + <url><loc>https://example.com/y</loc></url> + </urlset> + """, + ).strip(), + encoding="utf-8", + ) + result = run_hook(f"file://{plain}", cwd=tmp_path) + assert result.returncode == 0 + urls = sorted(s["url"] for s in snapshots(parse_jsonl(result.stdout))) + assert urls == ["https://example.com/x", "https://example.com/y"] + + +# --------------------------------------------------------------------------- +# robots.txt discovery +# --------------------------------------------------------------------------- + + +class TestRobotsTxtDiscovery: + def test_robots_txt_with_sitemap_directives( + self, + tmp_path: Path, + httpserver, + ) -> None: + sitemap_xml = textwrap.dedent( + """ + <?xml version="1.0" encoding="UTF-8"?> + <urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9"> + <url><loc>https://example.test/r-one</loc></url> + <url><loc>https://example.test/r-two</loc></url> + </urlset> + """, + ).strip() + httpserver.expect_request("/sitemap.xml").respond_with_data( + sitemap_xml, + content_type="application/xml", + ) + robots_body = textwrap.dedent( + f""" + User-agent: * + Disallow: + Sitemap: {httpserver.url_for("/sitemap.xml")} + """, + ).strip() + httpserver.expect_request("/robots.txt").respond_with_data( + robots_body, + content_type="text/plain", + ) + + result = run_hook(httpserver.url_for("/robots.txt"), cwd=tmp_path) + assert result.returncode == 0, result.stderr + urls = sorted(s["url"] for s in snapshots(parse_jsonl(result.stdout))) + assert urls == ["https://example.test/r-one", "https://example.test/r-two"] + + def test_root_url_falls_back_to_robots_then_sitemap_paths( + self, + tmp_path: Path, + httpserver, + ) -> None: + sitemap_xml = textwrap.dedent( + """ + <?xml version="1.0" encoding="UTF-8"?> + <urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9"> + <url><loc>https://fallback.test/page-1</loc></url> + </urlset> + """, + ).strip() + # Pretend robots.txt is empty (no Sitemap lines). 
+ httpserver.expect_request("/robots.txt").respond_with_data( + "User-agent: *\nDisallow:\n", + content_type="text/plain", + ) + httpserver.expect_request("/sitemap.xml").respond_with_data( + sitemap_xml, + content_type="application/xml", + ) + result = run_hook(httpserver.url_for("/"), cwd=tmp_path) + assert result.returncode == 0, result.stderr + urls = sorted(s["url"] for s in snapshots(parse_jsonl(result.stdout))) + assert urls == ["https://fallback.test/page-1"] + + def test_robots_discovery_disabled( + self, + tmp_path: Path, + httpserver, + ) -> None: + # robots.txt would normally provide the sitemap, but we disable that + # path: with no fallback hits the hook should fail or noresults. + httpserver.expect_request("/robots.txt").respond_with_data( + f"Sitemap: {httpserver.url_for('/sitemap.xml')}\n", + content_type="text/plain", + ) + httpserver.expect_request("/sitemap.xml").respond_with_data( + "broken-not-xml", + content_type="application/xml", + ) + result = run_hook( + httpserver.url_for("/"), + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_DISCOVER_FROM_ROBOTS": "false"}, + ) + archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None + assert archive["status"] in {"failed", "noresults"} + + +# --------------------------------------------------------------------------- +# HTTP server integration +# --------------------------------------------------------------------------- + + +class TestHttpFetching: + def test_fetches_sitemap_over_http(self, tmp_path: Path, httpserver) -> None: + sitemap_xml = textwrap.dedent( + """ + + + https://httpserver.test/page-1 + https://httpserver.test/page-2 + + """, + ).strip() + httpserver.expect_request("/sitemap.xml").respond_with_data( + sitemap_xml, + content_type="application/xml", + ) + result = run_hook(httpserver.url_for("/sitemap.xml"), cwd=tmp_path) + assert result.returncode == 0, result.stderr + urls = sorted(s["url"] for s in snapshots(parse_jsonl(result.stdout))) + assert urls == 
[ + "https://httpserver.test/page-1", + "https://httpserver.test/page-2", + ] + + def test_fetches_gzipped_sitemap_over_http( + self, + tmp_path: Path, + httpserver, + ) -> None: + sitemap_bytes = ( + textwrap.dedent( + """ + + + https://gz.test/a + https://gz.test/b + + """, + ) + .strip() + .encode("utf-8") + ) + httpserver.expect_request("/sitemap.xml.gz").respond_with_data( + gzip.compress(sitemap_bytes), + content_type="application/x-gzip", + ) + result = run_hook(httpserver.url_for("/sitemap.xml.gz"), cwd=tmp_path) + assert result.returncode == 0 + urls = sorted(s["url"] for s in snapshots(parse_jsonl(result.stdout))) + assert urls == ["https://gz.test/a", "https://gz.test/b"] + + def test_sitemap_index_over_http(self, tmp_path: Path, httpserver) -> None: + child_xml = textwrap.dedent( + """ + + + https://idx.test/leaf-1 + https://idx.test/leaf-2 + + """, + ).strip() + index_xml = textwrap.dedent( + f""" + + + {httpserver.url_for("/child.xml")} + + """, + ).strip() + httpserver.expect_request("/child.xml").respond_with_data( + child_xml, + content_type="application/xml", + ) + httpserver.expect_request("/index.xml").respond_with_data( + index_xml, + content_type="application/xml", + ) + result = run_hook(httpserver.url_for("/index.xml"), cwd=tmp_path) + assert result.returncode == 0 + urls = sorted(s["url"] for s in snapshots(parse_jsonl(result.stdout))) + assert urls == ["https://idx.test/leaf-1", "https://idx.test/leaf-2"] + + def test_http_404_failure(self, tmp_path: Path, httpserver) -> None: + httpserver.expect_request("/missing.xml").respond_with_data( + "not found", + status=404, + ) + result = run_hook(httpserver.url_for("/missing.xml"), cwd=tmp_path) + archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None + assert archive["status"] == "failed" + + def test_root_url_no_sitemap_anywhere(self, tmp_path: Path, httpserver) -> None: + httpserver.expect_request("/robots.txt").respond_with_data( + "not-found", + status=404, + ) + 
# Any fallback path also 404s by default (no handler registered). + result = run_hook(httpserver.url_for("/"), cwd=tmp_path) + archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None + assert archive["status"] == "failed" + + +# --------------------------------------------------------------------------- +# Misc: ordering & dedup +# --------------------------------------------------------------------------- + + +class TestOrderingAndDedup: + def test_emits_sorted_urls(self, tmp_path: Path) -> None: + sitemap = tmp_path / "sitemap.xml" + write_sitemap( + sitemap, + [ + "https://example.com/zebra", + "https://example.com/apple", + "https://example.com/mango", + ], + ) + result = run_hook(f"file://{sitemap}", cwd=tmp_path) + assert result.returncode == 0 + snaps = snapshots(parse_jsonl(result.stdout)) + assert [s["url"] for s in snaps] == [ + "https://example.com/apple", + "https://example.com/mango", + "https://example.com/zebra", + ] + + def test_dedupes_across_sitemap_index(self, tmp_path: Path) -> None: + leaf_a = tmp_path / "a.xml" + leaf_b = tmp_path / "b.xml" + write_sitemap(leaf_a, ["https://example.com/dup", "https://example.com/one"]) + write_sitemap(leaf_b, ["https://example.com/dup", "https://example.com/two"]) + index = tmp_path / "index.xml" + write_sitemap_index( + index, + [f"file://{leaf_a}", f"file://{leaf_b}"], + ) + result = run_hook(f"file://{index}", cwd=tmp_path) + assert result.returncode == 0 + urls = sorted(s["url"] for s in snapshots(parse_jsonl(result.stdout))) + assert urls == [ + "https://example.com/dup", + "https://example.com/one", + "https://example.com/two", + ] + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/abx_plugins/plugins/parse_sitemap_urls/tests/test_parse_sitemap_urls_advanced.py b/abx_plugins/plugins/parse_sitemap_urls/tests/test_parse_sitemap_urls_advanced.py new file mode 100644 index 0000000..c8cb863 --- /dev/null +++ 
b/abx_plugins/plugins/parse_sitemap_urls/tests/test_parse_sitemap_urls_advanced.py @@ -0,0 +1,1617 @@ +#!/usr/bin/env python3 +"""Advanced tests for parse_sitemap_urls covering sitemap extensions, +HTTP retry/redirect/encoding paths, and the broader config surface. + +Kept separate from `test_parse_sitemap_urls.py` to make the basic suite +easy to scan; this file focuses on the corners that surface only when +unusual real-world sitemaps or transient HTTP conditions come up. +""" + +from __future__ import annotations + +import gzip +import json +import os +import subprocess +import textwrap +import time +from pathlib import Path + +import pytest + +PLUGIN_DIR = Path(__file__).resolve().parent.parent +SCRIPT_PATH = next( + (path for path in PLUGIN_DIR.glob("on_Snapshot__*_parse_sitemap_urls.*")), + None, +) +assert SCRIPT_PATH is not None, "hook script must exist for tests to run" + + +def run_hook( + url: str, + *, + cwd: Path, + env_overrides: dict[str, str] | None = None, + timeout: int = 120, + extra_args: list[str] | None = None, +) -> subprocess.CompletedProcess[str]: + env = os.environ.copy() + env["SNAP_DIR"] = str(cwd) + env.setdefault("PARSE_SITEMAP_URLS_ALLOW_PRIVATE_HOSTS", "true") + if env_overrides: + env.update(env_overrides) + cmd = [str(SCRIPT_PATH), "--url", url] + if extra_args: + cmd.extend(extra_args) + return subprocess.run( + cmd, + cwd=str(cwd), + capture_output=True, + text=True, + env=env, + timeout=timeout, + ) + + +def parse_jsonl(stdout: str) -> list[dict]: + """Parse JSONL stdout, failing on any non-JSON line. + + Every non-empty stdout line from the hook must be a JSON record. + Silently filtering non-JSON would let a stdout-vs-stderr regression + slip past tests. 
+ """ + records: list[dict] = [] + for raw in stdout.splitlines(): + line = raw.strip() + if not line: + continue + records.append(json.loads(line)) + return records + + +def snapshots(records: list[dict]) -> list[dict]: + return [record for record in records if record.get("type") == "Snapshot"] + + +def tags(records: list[dict]) -> list[dict]: + return [record for record in records if record.get("type") == "Tag"] + + +def archive_result(records: list[dict]) -> dict | None: + return next((r for r in records if r.get("type") == "ArchiveResult"), None) + + +# --------------------------------------------------------------------------- +# BOM + encoding edge cases +# --------------------------------------------------------------------------- + + +class TestEncodingEdgeCases: + def test_utf8_bom_is_stripped(self, tmp_path: Path) -> None: + body = ( + textwrap.dedent( + """ + + + https://example.com/utf8-bom + + """, + ) + .strip() + .encode("utf-8") + ) + path = tmp_path / "bom.xml" + path.write_bytes(b"\xef\xbb\xbf" + body) + result = run_hook(f"file://{path}", cwd=tmp_path) + assert result.returncode == 0, result.stderr + urls = [s["url"] for s in snapshots(parse_jsonl(result.stdout))] + assert urls == ["https://example.com/utf8-bom"] + + def test_utf16_le_bom_is_handled(self, tmp_path: Path) -> None: + body = textwrap.dedent( + """ + + + https://example.com/utf16 + + """, + ).strip() + path = tmp_path / "utf16.xml" + path.write_bytes(b"\xff\xfe" + body.encode("utf-16-le")) + result = run_hook(f"file://{path}", cwd=tmp_path) + assert result.returncode == 0, result.stderr + urls = [s["url"] for s in snapshots(parse_jsonl(result.stdout))] + assert urls == ["https://example.com/utf16"] + + def test_unicode_urls_pass_through(self, tmp_path: Path) -> None: + path = tmp_path / "unicode.xml" + path.write_text( + textwrap.dedent( + """ + + + https://example.com/привет + https://example.com/日本語 + + """, + ).strip(), + encoding="utf-8", + ) + result = run_hook(f"file://{path}", 
cwd=tmp_path) + assert result.returncode == 0 + urls = sorted(s["url"] for s in snapshots(parse_jsonl(result.stdout))) + # Python sort by codepoint puts Cyrillic (U+04xx) before CJK (U+4E00+). + assert urls == [ + "https://example.com/привет", + "https://example.com/日本語", + ] + + def test_whitespace_in_loc_is_trimmed(self, tmp_path: Path) -> None: + path = tmp_path / "whitespace.xml" + path.write_text( + textwrap.dedent( + """ + + + + https://example.com/spaced + + + """, + ).strip(), + encoding="utf-8", + ) + result = run_hook(f"file://{path}", cwd=tmp_path) + assert result.returncode == 0 + urls = [s["url"] for s in snapshots(parse_jsonl(result.stdout))] + assert urls == ["https://example.com/spaced"] + + def test_schemeless_urls_resolved_against_sitemap( + self, + tmp_path: Path, + httpserver, + ) -> None: + sitemap = textwrap.dedent( + """ + + + //example.com/schemeless + https://example.com/scheme + + """, + ).strip() + httpserver.expect_request("/sitemap.xml").respond_with_data( + sitemap, + content_type="application/xml", + ) + result = run_hook(httpserver.url_for("/sitemap.xml"), cwd=tmp_path) + assert result.returncode == 0, result.stderr + urls = sorted(s["url"] for s in snapshots(parse_jsonl(result.stdout))) + assert urls == [ + "http://example.com/schemeless", + "https://example.com/scheme", + ] + + +# --------------------------------------------------------------------------- +# Priority + changefreq metadata + filters +# --------------------------------------------------------------------------- + + +class TestPriorityAndChangefreq: + @staticmethod + def _write(path: Path) -> None: + path.write_text( + textwrap.dedent( + """ + + + + https://example.com/high + 0.9 + daily + + + https://example.com/medium + 0.5 + weekly + + + https://example.com/low + 0.2 + monthly + + + https://example.com/no-priority + + + """, + ).strip(), + encoding="utf-8", + ) + + def test_priority_min_filters_keeps_missing_priority_by_default( + self, + tmp_path: Path, + ) -> 
None: + """No-priority entries are kept; only entries with an explicit priority below the threshold are dropped.""" + sitemap = tmp_path / "sitemap.xml" + self._write(sitemap) + result = run_hook( + f"file://{sitemap}", + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_PRIORITY_MIN": "0.5"}, + ) + assert result.returncode == 0 + urls = sorted(s["url"] for s in snapshots(parse_jsonl(result.stdout))) + assert urls == [ + "https://example.com/high", + "https://example.com/medium", + "https://example.com/no-priority", + ] + archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None + # `low` is the only explicit-priority entry below 0.5. + assert "skipped_priority=1" in archive["output_str"] + + def test_priority_min_with_require_priority_drops_missing( + self, + tmp_path: Path, + ) -> None: + """REQUIRE_PRIORITY=true also drops entries with no <priority> tag.""" + sitemap = tmp_path / "sitemap.xml" + self._write(sitemap) + result = run_hook( + f"file://{sitemap}", + cwd=tmp_path, + env_overrides={ + "PARSE_SITEMAP_URLS_PRIORITY_MIN": "0.5", + "PARSE_SITEMAP_URLS_REQUIRE_PRIORITY": "true", + }, + ) + assert result.returncode == 0 + urls = sorted(s["url"] for s in snapshots(parse_jsonl(result.stdout))) + assert urls == [ + "https://example.com/high", + "https://example.com/medium", + ] + archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None + assert "skipped_priority=2" in archive["output_str"] + + def test_changefreq_allowed_filters(self, tmp_path: Path) -> None: + sitemap = tmp_path / "sitemap.xml" + self._write(sitemap) + result = run_hook( + f"file://{sitemap}", + cwd=tmp_path, + env_overrides={ + "PARSE_SITEMAP_URLS_CHANGEFREQ_ALLOWED": json.dumps( + ["daily", "weekly"], + ), + }, + ) + assert result.returncode == 0 + urls = sorted(s["url"] for s in snapshots(parse_jsonl(result.stdout))) + assert urls == [ + "https://example.com/high", + "https://example.com/medium", + ] + archive = 
archive_result(parse_jsonl(result.stdout)) + assert archive is not None + assert "skipped_changefreq=2" in archive["output_str"] + + +# --------------------------------------------------------------------------- +# Sort orderings +# --------------------------------------------------------------------------- + + +class TestSortOrder: + @staticmethod + def _write(path: Path) -> None: + path.write_text( + textwrap.dedent( + """ + <?xml version="1.0" encoding="UTF-8"?> + <urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9"> + <url><loc>https://example.com/zebra</loc><priority>0.4</priority><lastmod>2024-01-01</lastmod></url> + <url><loc>https://example.com/apple</loc><priority>0.9</priority><lastmod>2025-06-15</lastmod></url> + <url><loc>https://example.com/mango</loc><priority>0.6</priority><lastmod>2025-01-01</lastmod></url> + </urlset> + """, + ).strip(), + encoding="utf-8", + ) + + def test_sort_by_url(self, tmp_path: Path) -> None: + sitemap = tmp_path / "sitemap.xml" + self._write(sitemap) + result = run_hook( + f"file://{sitemap}", + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_SORT_BY": "url"}, + ) + urls = [s["url"] for s in snapshots(parse_jsonl(result.stdout))] + assert urls == [ + "https://example.com/apple", + "https://example.com/mango", + "https://example.com/zebra", + ] + + def test_sort_by_lastmod(self, tmp_path: Path) -> None: + sitemap = tmp_path / "sitemap.xml" + self._write(sitemap) + result = run_hook( + f"file://{sitemap}", + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_SORT_BY": "lastmod"}, + ) + urls = [s["url"] for s in snapshots(parse_jsonl(result.stdout))] + # Newest lastmod first. 
+ assert urls == [ + "https://example.com/apple", + "https://example.com/mango", + "https://example.com/zebra", + ] + + def test_sort_by_priority(self, tmp_path: Path) -> None: + sitemap = tmp_path / "sitemap.xml" + self._write(sitemap) + result = run_hook( + f"file://{sitemap}", + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_SORT_BY": "priority"}, + ) + urls = [s["url"] for s in snapshots(parse_jsonl(result.stdout))] + assert urls == [ + "https://example.com/apple", + "https://example.com/mango", + "https://example.com/zebra", + ] + + def test_sort_by_order_preserves_sitemap_order(self, tmp_path: Path) -> None: + sitemap = tmp_path / "sitemap.xml" + self._write(sitemap) + result = run_hook( + f"file://{sitemap}", + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_SORT_BY": "order"}, + ) + urls = [s["url"] for s in snapshots(parse_jsonl(result.stdout))] + assert urls == [ + "https://example.com/zebra", + "https://example.com/apple", + "https://example.com/mango", + ] + + +# --------------------------------------------------------------------------- +# Sitemap image / video / news extensions +# --------------------------------------------------------------------------- + + +class TestSitemapExtensions: + IMAGE_SITEMAP = textwrap.dedent( + """ + + + + https://example.com/gallery + + https://cdn.example.com/photo-1.jpg + + + https://cdn.example.com/photo-2.jpg + + + + """, + ).strip() + + VIDEO_SITEMAP = textwrap.dedent( + """ + + + + https://example.com/watch + + https://cdn.example.com/thumb.jpg + Sample + Sample video + https://cdn.example.com/video.mp4 + https://example.com/player.html + + + + """, + ).strip() + + NEWS_SITEMAP = textwrap.dedent( + """ + + + + https://example.com/story + + + Example News + en + + 2026-05-25 + Headline + + + + """, + ).strip() + + def test_image_extension_off_by_default(self, tmp_path: Path) -> None: + sitemap = tmp_path / "sitemap.xml" + sitemap.write_text(self.IMAGE_SITEMAP, encoding="utf-8") + result = 
run_hook(f"file://{sitemap}", cwd=tmp_path) + urls = [s["url"] for s in snapshots(parse_jsonl(result.stdout))] + assert urls == ["https://example.com/gallery"] + + def test_image_extension_emits_extras(self, tmp_path: Path) -> None: + sitemap = tmp_path / "sitemap.xml" + sitemap.write_text(self.IMAGE_SITEMAP, encoding="utf-8") + result = run_hook( + f"file://{sitemap}", + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_EMIT_IMAGE_URLS": "true"}, + ) + assert result.returncode == 0 + snaps = snapshots(parse_jsonl(result.stdout)) + urls = {s["url"] for s in snaps} + assert urls == { + "https://example.com/gallery", + "https://cdn.example.com/photo-1.jpg", + "https://cdn.example.com/photo-2.jpg", + } + media_tags = {s.get("tags") for s in snaps if s["url"].endswith(".jpg")} + assert media_tags == {"sitemap-media"} + + def test_video_extension_emits_extras(self, tmp_path: Path) -> None: + sitemap = tmp_path / "sitemap.xml" + sitemap.write_text(self.VIDEO_SITEMAP, encoding="utf-8") + result = run_hook( + f"file://{sitemap}", + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_EMIT_VIDEO_URLS": "true"}, + ) + assert result.returncode == 0 + urls = {s["url"] for s in snapshots(parse_jsonl(result.stdout))} + assert urls == { + "https://example.com/watch", + "https://cdn.example.com/video.mp4", + "https://example.com/player.html", + } + + def test_news_extension_emits_tag(self, tmp_path: Path) -> None: + sitemap = tmp_path / "sitemap.xml" + sitemap.write_text(self.NEWS_SITEMAP, encoding="utf-8") + result = run_hook( + f"file://{sitemap}", + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_EMIT_NEWS_TAG": "true"}, + ) + assert result.returncode == 0 + records = parse_jsonl(result.stdout) + tag_records = tags(records) + assert [t["name"] for t in tag_records] == ["Example News"] + urls = [s["url"] for s in snapshots(records)] + assert urls == ["https://example.com/story"] + + +# --------------------------------------------------------------------------- +# HTTP retry, 
redirects, Content-Encoding gzip +# --------------------------------------------------------------------------- + + +class TestHttpResilience: + def test_retries_on_5xx_then_succeeds( + self, + tmp_path: Path, + httpserver, + ) -> None: + sitemap = textwrap.dedent( + """ + + + https://example.com/retry-ok + + """, + ).strip() + + from werkzeug.wrappers import Response + + state = {"calls": 0} + + def flaky(_request): + state["calls"] += 1 + if state["calls"] <= 2: + return Response("fail", status=503) + return Response( + sitemap, + status=200, + content_type="application/xml", + ) + + httpserver.expect_request("/sitemap.xml").respond_with_handler(flaky) + + result = run_hook( + httpserver.url_for("/sitemap.xml"), + cwd=tmp_path, + env_overrides={ + "PARSE_SITEMAP_URLS_HTTP_RETRIES": "3", + "PARSE_SITEMAP_URLS_HTTP_BACKOFF_SECONDS": "0", + }, + timeout=20, + ) + assert result.returncode == 0, result.stderr + urls = [s["url"] for s in snapshots(parse_jsonl(result.stdout))] + assert urls == ["https://example.com/retry-ok"] + assert state["calls"] == 3 + + def test_gives_up_after_exhausting_retries( + self, + tmp_path: Path, + httpserver, + ) -> None: + httpserver.expect_request("/sitemap.xml").respond_with_data( + "boom", + status=503, + ) + start = time.monotonic() + result = run_hook( + httpserver.url_for("/sitemap.xml"), + cwd=tmp_path, + env_overrides={ + "PARSE_SITEMAP_URLS_HTTP_RETRIES": "2", + "PARSE_SITEMAP_URLS_HTTP_BACKOFF_SECONDS": "0", + }, + ) + elapsed = time.monotonic() - start + # No accidental long sleeps when backoff=0. 
+ assert elapsed < 10 + archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None and archive["status"] == "failed" + + def test_follows_redirect_to_real_sitemap( + self, + tmp_path: Path, + httpserver, + ) -> None: + sitemap = textwrap.dedent( + """ + + + https://example.com/redirected + + """, + ).strip() + httpserver.expect_request("/old.xml").respond_with_data( + "", + status=301, + headers={"Location": httpserver.url_for("/new.xml")}, + ) + httpserver.expect_request("/new.xml").respond_with_data( + sitemap, + content_type="application/xml", + ) + result = run_hook(httpserver.url_for("/old.xml"), cwd=tmp_path) + assert result.returncode == 0 + urls = [s["url"] for s in snapshots(parse_jsonl(result.stdout))] + assert urls == ["https://example.com/redirected"] + + def test_decompresses_content_encoding_gzip( + self, + tmp_path: Path, + httpserver, + ) -> None: + sitemap = ( + textwrap.dedent( + """ + + + https://example.com/encoded + + """, + ) + .strip() + .encode("utf-8") + ) + httpserver.expect_request("/sitemap.xml").respond_with_data( + gzip.compress(sitemap), + content_type="application/xml", + headers={"Content-Encoding": "gzip"}, + ) + result = run_hook(httpserver.url_for("/sitemap.xml"), cwd=tmp_path) + assert result.returncode == 0 + urls = [s["url"] for s in snapshots(parse_jsonl(result.stdout))] + assert urls == ["https://example.com/encoded"] + + +# --------------------------------------------------------------------------- +# Headers + verbose mode +# --------------------------------------------------------------------------- + + +class TestHeadersAndVerbose: + def test_sets_user_agent_override( + self, + tmp_path: Path, + httpserver, + ) -> None: + captured: dict[str, str] = {} + + from werkzeug.wrappers import Response + + def capture(request): + captured["ua"] = request.headers.get("User-Agent", "") + captured["accept"] = request.headers.get("Accept", "") + captured["lang"] = request.headers.get("Accept-Language", "") + 
return Response( + textwrap.dedent( + """ + + + https://example.com/hdr + + """, + ).strip(), + status=200, + content_type="application/xml", + ) + + httpserver.expect_request("/sitemap.xml").respond_with_handler(capture) + + result = run_hook( + httpserver.url_for("/sitemap.xml"), + cwd=tmp_path, + env_overrides={ + "PARSE_SITEMAP_URLS_USER_AGENT": "SitemapBot/2.0 (+test)", + "PARSE_SITEMAP_URLS_ACCEPT_LANGUAGE": "en-US,en;q=0.9", + }, + ) + assert result.returncode == 0, result.stderr + assert captured["ua"] == "SitemapBot/2.0 (+test)" + assert captured["lang"] == "en-US,en;q=0.9" + assert "application/xml" in captured["accept"] + + def test_verbose_mode_emits_fetching_lines( + self, + tmp_path: Path, + httpserver, + ) -> None: + sitemap = textwrap.dedent( + """ + + + https://example.com/v + + """, + ).strip() + httpserver.expect_request("/sitemap.xml").respond_with_data( + sitemap, + content_type="application/xml", + ) + result = run_hook( + httpserver.url_for("/sitemap.xml"), + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_VERBOSE": "true"}, + ) + assert result.returncode == 0 + assert "fetching sitemap" in result.stderr + + +# --------------------------------------------------------------------------- +# Robots.txt with multiple sitemaps + custom fallback paths +# --------------------------------------------------------------------------- + + +class TestRobotsAndFallback: + def test_multiple_sitemap_directives_in_robots( + self, + tmp_path: Path, + httpserver, + ) -> None: + site_a = textwrap.dedent( + """ + + + https://example.com/from-a + + """, + ).strip() + site_b = textwrap.dedent( + """ + + + https://example.com/from-b + + """, + ).strip() + httpserver.expect_request("/a.xml").respond_with_data( + site_a, + content_type="application/xml", + ) + httpserver.expect_request("/b.xml").respond_with_data( + site_b, + content_type="application/xml", + ) + robots_body = textwrap.dedent( + f""" + User-agent: * + Sitemap: {httpserver.url_for("/a.xml")} + 
Sitemap: {httpserver.url_for("/b.xml")} + """, + ).strip() + httpserver.expect_request("/robots.txt").respond_with_data( + robots_body, + content_type="text/plain", + ) + result = run_hook(httpserver.url_for("/robots.txt"), cwd=tmp_path) + assert result.returncode == 0, result.stderr + urls = sorted(s["url"] for s in snapshots(parse_jsonl(result.stdout))) + assert urls == [ + "https://example.com/from-a", + "https://example.com/from-b", + ] + + def test_custom_fallback_paths( + self, + tmp_path: Path, + httpserver, + ) -> None: + sitemap = textwrap.dedent( + """ + + + https://example.com/custom + + """, + ).strip() + httpserver.expect_request("/robots.txt").respond_with_data( + "", + status=404, + ) + httpserver.expect_request("/sitemap-news.xml").respond_with_data( + sitemap, + content_type="application/xml", + ) + result = run_hook( + httpserver.url_for("/"), + cwd=tmp_path, + env_overrides={ + "PARSE_SITEMAP_URLS_FALLBACK_PATHS": json.dumps( + ["/sitemap-news.xml"], + ), + }, + ) + assert result.returncode == 0 + urls = [s["url"] for s in snapshots(parse_jsonl(result.stdout))] + assert urls == ["https://example.com/custom"] + + +# --------------------------------------------------------------------------- +# Volume + dedup at scale +# --------------------------------------------------------------------------- + + +class TestVolume: + def test_large_sitemap_within_max_urls(self, tmp_path: Path) -> None: + # Stretch test: 2000 URLs in one sitemap, MAX_URLS=2000. 
+ urls = [f"https://example.com/p{index:05d}" for index in range(2000)] + sitemap = tmp_path / "big.xml" + sitemap.write_text( + "\n".join( + [ + '', + '', + *(f" {u}" for u in urls), + "", + ], + ), + encoding="utf-8", + ) + result = run_hook( + f"file://{sitemap}", + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_MAX_URLS": "2000"}, + ) + assert result.returncode == 0, result.stderr + snaps = snapshots(parse_jsonl(result.stdout)) + assert len(snaps) == 2000 + + def test_dedup_extras_against_pages(self, tmp_path: Path) -> None: + sitemap = tmp_path / "img.xml" + sitemap.write_text( + textwrap.dedent( + """ + + + + https://example.com/gallery + + https://example.com/gallery + + + + """, + ).strip(), + encoding="utf-8", + ) + result = run_hook( + f"file://{sitemap}", + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_EMIT_IMAGE_URLS": "true"}, + ) + assert result.returncode == 0 + urls = [s["url"] for s in snapshots(parse_jsonl(result.stdout))] + # Page URL emitted once; image URL identical to page URL skipped. + assert urls == ["https://example.com/gallery"] + + +# --------------------------------------------------------------------------- +# Security hardening: scheme allowlist, file:// chains, redirect targets, +# XML entity expansion, gzip bombs, fragment normalization. 
+# --------------------------------------------------------------------------- + + +class TestSchemeAllowlist: + def test_rejects_javascript_loc(self, tmp_path: Path) -> None: + sitemap = tmp_path / "sitemap.xml" + sitemap.write_text( + textwrap.dedent( + """ + + + javascript:alert(1) + data:text/html,evil + ftp://example.com/file + https://example.com/ok + + """, + ).strip(), + encoding="utf-8", + ) + result = run_hook(f"file://{sitemap}", cwd=tmp_path) + assert result.returncode == 0 + urls = sorted(s["url"] for s in snapshots(parse_jsonl(result.stdout))) + assert urls == ["https://example.com/ok"] + archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None + assert "skipped_scheme=3" in archive["output_str"] + + def test_remote_sitemap_rejects_file_child( + self, + tmp_path: Path, + httpserver, + ) -> None: + """A remote sitemap-index linking to file:// must be refused.""" + secret = tmp_path / "secret.xml" + secret.write_text("", encoding="utf-8") + index_xml = textwrap.dedent( + f""" + + + file://{secret} + + """, + ).strip() + httpserver.expect_request("/index.xml").respond_with_data( + index_xml, + content_type="application/xml", + ) + result = run_hook(httpserver.url_for("/index.xml"), cwd=tmp_path) + # Child rejected, no URLs emitted, but the root index was a valid sitemap. 
+ archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None + assert archive["status"] == "noresults" + assert "refusing child sitemap" in result.stderr + assert "scheme_file" in result.stderr + + +class TestRedirectTargets: + def test_rejects_redirect_to_non_http_scheme( + self, + tmp_path: Path, + httpserver, + ) -> None: + """Both stdlib's HTTPRedirectHandler and our custom override reject non-HTTP redirects.""" + secret = tmp_path / "secret.xml" + secret.write_text("", encoding="utf-8") + httpserver.expect_request("/sitemap.xml").respond_with_data( + "", + status=302, + headers={"Location": f"file://{secret}"}, + ) + result = run_hook(httpserver.url_for("/sitemap.xml"), cwd=tmp_path) + archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None + assert archive["status"] == "failed" + # stdlib rejects with this exact phrase for non-HTTP redirect targets; + # the wire-level scheme guard is therefore in place even before our + # custom handler runs. 
+ assert "is not allowed" in result.stderr + + def test_rejects_seed_on_private_host_by_default( + self, + tmp_path: Path, + httpserver, + ) -> None: + """With ALLOW_PRIVATE_HOSTS=false (the production default), a localhost seed is refused.""" + httpserver.expect_request("/sitemap.xml").respond_with_data( + "", + content_type="application/xml", + ) + result = run_hook( + httpserver.url_for("/sitemap.xml"), + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_ALLOW_PRIVATE_HOSTS": "false"}, + ) + archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None + assert archive["status"] == "failed" + assert "private_host" in result.stderr + + +class TestXMLHardening: + def test_billion_laughs_blocked_by_defusedxml(self, tmp_path: Path) -> None: + """Internal entity expansion must be refused by the XML parser.""" + bomb = tmp_path / "bomb.xml" + bomb.write_text( + textwrap.dedent( + """ + + + + + ]> + + &lol3; + + """, + ).strip(), + encoding="utf-8", + ) + result = run_hook(f"file://{bomb}", cwd=tmp_path) + archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None + assert archive["status"] == "failed" + assert "not valid XML" in result.stderr + + def test_external_entity_rejected(self, tmp_path: Path) -> None: + sensitive = tmp_path / "sensitive.txt" + sensitive.write_text("topsecret", encoding="utf-8") + xxe = tmp_path / "xxe.xml" + xxe.write_text( + textwrap.dedent( + f""" + + ]> + + &x; + + """, + ).strip(), + encoding="utf-8", + ) + result = run_hook(f"file://{xxe}", cwd=tmp_path) + # The XML is rejected because defusedxml blocks DTDs altogether. + archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None + assert archive["status"] == "failed" + assert "topsecret" not in result.stdout + + +class TestGzipBomb: + def test_oversized_decompression_is_capped( + self, + tmp_path: Path, + httpserver, + ) -> None: + # Roughly tens of KiB compressed → ~10 MiB decompressed; well under our default cap. 
+ bomb = gzip.compress( + b"" + + (b" https://example.com/x\n" * 200000) + + b"" + ) + httpserver.expect_request("/big.xml.gz").respond_with_data( + bomb, + content_type="application/x-gzip", + ) + # Set a very low decompressed cap to trigger the bomb guard. + result = run_hook( + httpserver.url_for("/big.xml.gz"), + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_MAX_DECOMPRESSED_BYTES": "1024"}, + ) + archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None + assert archive["status"] == "failed" + assert "decompressed" in result.stderr + + def test_oversized_response_is_capped( + self, + tmp_path: Path, + httpserver, + ) -> None: + large = ( + b"" + + b"https://example.com/x" * 5000 + + b"" + ) + httpserver.expect_request("/big.xml").respond_with_data( + large, + content_type="application/xml", + ) + result = run_hook( + httpserver.url_for("/big.xml"), + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_MAX_RESPONSE_BYTES": "1024"}, + ) + archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None + assert archive["status"] == "failed" + assert "response body exceeded" in result.stderr + + +class TestFragmentNormalization: + def test_fragment_stripped(self, tmp_path: Path) -> None: + sitemap = tmp_path / "sitemap.xml" + sitemap.write_text( + textwrap.dedent( + """ + + + https://example.com/page#section1 + https://example.com/page#section2 + + """, + ).strip(), + encoding="utf-8", + ) + result = run_hook(f"file://{sitemap}", cwd=tmp_path) + assert result.returncode == 0 + urls = [s["url"] for s in snapshots(parse_jsonl(result.stdout))] + # Both deduped down to the fragmentless URL. 
+ assert urls == ["https://example.com/page"] + + +class TestMediaExtraPolicy: + def test_image_extras_subject_to_same_host(self, tmp_path: Path) -> None: + sitemap = tmp_path / "sitemap.xml" + sitemap.write_text( + textwrap.dedent( + """ + + + + https://example.com/gallery + https://cdn.other.com/a.jpg + https://example.com/local.jpg + + + """, + ).strip(), + encoding="utf-8", + ) + # An https:// seed would be fetched over the network, so the test reads + # the sitemap from a file:// seed and uses INCLUDE_REGEX as the host policy. + result = run_hook( + f"file://{sitemap}", + cwd=tmp_path, + env_overrides={ + "PARSE_SITEMAP_URLS_EMIT_IMAGE_URLS": "true", + "PARSE_SITEMAP_URLS_INCLUDE_REGEX": r"example\.com", + }, + ) + assert result.returncode == 0 + urls = sorted(s["url"] for s in snapshots(parse_jsonl(result.stdout))) + # The off-host CDN image is filtered out by the INCLUDE_REGEX policy + # applied to the media extra. + assert urls == [ + "https://example.com/gallery", + "https://example.com/local.jpg", + ] + + +class TestMaxDepthSemantics: + def test_depth_zero_walks_only_seed(self, tmp_path: Path) -> None: + leaf = tmp_path / "leaf.xml" + leaf.write_text( + '' + "https://example.com/leaf", + encoding="utf-8", + ) + index = tmp_path / "index.xml" + index.write_text( + f'' + f"file://{leaf}", + encoding="utf-8", + ) + result = run_hook( + f"file://{index}", + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_MAX_SITEMAP_DEPTH": "0"}, + ) + urls = [s["url"] for s in snapshots(parse_jsonl(result.stdout))] + # Depth 0 means "just the seed"; child not followed. 
+ assert urls == [] + assert "max_depth" in result.stderr + + def test_depth_one_walks_one_child_level(self, tmp_path: Path) -> None: + leaf = tmp_path / "leaf.xml" + leaf.write_text( + '' + "https://example.com/leaf", + encoding="utf-8", + ) + index = tmp_path / "index.xml" + index.write_text( + f'' + f"file://{leaf}", + encoding="utf-8", + ) + result = run_hook( + f"file://{index}", + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_MAX_SITEMAP_DEPTH": "1"}, + ) + urls = [s["url"] for s in snapshots(parse_jsonl(result.stdout))] + assert urls == ["https://example.com/leaf"] + + +# --------------------------------------------------------------------------- +# Redirect count cap + IPv6 host detection (added after audit) +# --------------------------------------------------------------------------- + + +class TestRedirectCountCap: + def test_redirect_chain_capped( + self, + tmp_path: Path, + httpserver, + ) -> None: + """A redirect chain longer than HTTP_MAX_REDIRECTS fails with status=failed.""" + # The seed must look like a sitemap (.xml suffix) so the hook treats + # it as a direct sitemap fetch instead of falling into the + # robots.txt + fallback-path probing branch. + sitemap = textwrap.dedent( + """ + + + https://example.com/ok + + """, + ).strip() + httpserver.expect_request("/final.xml").respond_with_data( + sitemap, + content_type="application/xml", + ) + # Chain: /r0.xml -> /r1.xml -> /r2.xml -> /r3.xml -> /final.xml + for index in range(4): + target = f"/r{index + 1}.xml" if index < 3 else "/final.xml" + httpserver.expect_request(f"/r{index}.xml").respond_with_data( + "", + status=302, + headers={"Location": httpserver.url_for(target)}, + ) + + # Cap at 1 — only one redirect allowed; chain of 4 fails. 
+ result_low = run_hook( + httpserver.url_for("/r0.xml"), + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_HTTP_MAX_REDIRECTS": "1"}, + ) + archive_low = archive_result(parse_jsonl(result_low.stdout)) + assert archive_low is not None + assert archive_low["status"] == "failed" + # stdlib raises HTTPError("redirect") once max_redirections is hit. + assert "redirect" in result_low.stderr.lower() + + # Cap at 10, so the chain succeeds. + result_high = run_hook( + httpserver.url_for("/r0.xml"), + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_HTTP_MAX_REDIRECTS": "10"}, + ) + archive_high = archive_result(parse_jsonl(result_high.stdout)) + assert archive_high is not None + assert archive_high["status"] == "succeeded" + + +class TestIPv6Hosts: + def test_ipv6_loopback_classified_private(self) -> None: + """[::1] must be treated as private even when wrapped in brackets and port.""" + # The hook script auto-runs main() on import via @click.command, so its + # host helper cannot be imported and unit-tested in isolation. + # Run the hook as a subprocess against a fake seed pointing at [::1]; + # the seed-host guard should refuse it with ALLOW_PRIVATE_HOSTS=false. 
+ result = subprocess.run( + [str(SCRIPT_PATH), "--url", "http://[::1]:80/sitemap.xml"], + cwd="/tmp", + capture_output=True, + text=True, + env={ + **os.environ, + "SNAP_DIR": "/tmp", + "PARSE_SITEMAP_URLS_ALLOW_PRIVATE_HOSTS": "false", + }, + timeout=60, + ) + archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None + assert archive["status"] == "failed" + assert "private_host" in result.stderr or "private host" in result.stderr + + +# --------------------------------------------------------------------------- +# Streaming: 50k-URL sitemap with low MAX_URLS exits early +# --------------------------------------------------------------------------- + + +class TestStreaming: + def test_large_sitemap_low_max_urls_returns_quickly( + self, + tmp_path: Path, + ) -> None: + """A 50k-URL sitemap should respect MAX_URLS=10 without parsing the whole tree.""" + # At roughly 3 MiB this stays under the 50 MiB default response-size cap, so no override is needed. + urls = [f"https://example.com/p{index:06d}" for index in range(50_000)] + sitemap = tmp_path / "huge.xml" + with sitemap.open("w", encoding="utf-8") as fh: + fh.write('\n') + fh.write('\n') + for url in urls: + fh.write(f" {url}\n") + fh.write("\n") + start = time.monotonic() + result = run_hook( + f"file://{sitemap}", + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_MAX_URLS": "10"}, + timeout=60, + ) + elapsed = time.monotonic() - start + assert result.returncode == 0, result.stderr + snaps = snapshots(parse_jsonl(result.stdout)) + assert len(snaps) == 10 + # iterparse with `elem.clear()` should keep this well under 5s on any + # reasonable machine; a non-streaming impl would load all 50k Elements. + assert elapsed < 15, f"streaming impl is too slow: {elapsed:.1f}s" + + def test_streaming_handles_500k_urls_in_bounded_time( + self, + tmp_path: Path, + ) -> None: + """A 500k-URL sitemap with MAX_URLS=5 must complete quickly. + + Builds a ~30 MiB document. 
A non-streaming impl would allocate + ~500k ``Element`` objects before the max_urls check fires, which + in practice runs 10-30x slower than the streaming impl. We + assert on completion + record count + wall time; per-subprocess + RSS measurement is platform-fragile (RUSAGE_CHILDREN is + cumulative-max and would let regressions through), so we treat + wall time as the operational proxy. + """ + sitemap = tmp_path / "very_huge.xml" + with sitemap.open("w", encoding="utf-8") as fh: + fh.write('\n') + fh.write('\n') + for index in range(500_000): + fh.write( + f" https://example.com/p{index:07d}\n", + ) + fh.write("\n") + start = time.monotonic() + result = run_hook( + f"file://{sitemap}", + cwd=tmp_path, + env_overrides={ + "PARSE_SITEMAP_URLS_MAX_URLS": "5", + "PARSE_SITEMAP_URLS_MAX_RESPONSE_BYTES": str(200 * 1024 * 1024), + }, + timeout=120, + ) + elapsed = time.monotonic() - start + assert result.returncode == 0, result.stderr + snaps = snapshots(parse_jsonl(result.stdout)) + assert len(snaps) == 5 + # Streaming impl exits as soon as MAX_URLS is hit; this should be + # well under 5s. A regression to whole-tree parsing of 500k URLs + # would push wall time past 30s. + assert elapsed < 30, f"streaming impl regressed: {elapsed:.1f}s" + + +# --------------------------------------------------------------------------- +# JSONL stdout contract — every non-empty line must be valid JSON. 
+# --------------------------------------------------------------------------- + + +class TestJSONLContract: + def test_every_stdout_line_is_json(self, tmp_path: Path) -> None: + sitemap = tmp_path / "sitemap.xml" + sitemap.write_text( + textwrap.dedent( + """ + + + https://example.com/a + https://example.com/b + + """, + ).strip(), + encoding="utf-8", + ) + result = run_hook(f"file://{sitemap}", cwd=tmp_path) + assert result.returncode == 0, result.stderr + for line in result.stdout.splitlines(): + stripped = line.strip() + if not stripped: + continue + # No non-JSON lines should leak onto stdout — diagnostics belong + # on stderr. + json.loads(stripped) + + +# --------------------------------------------------------------------------- +# Cross-site safety: child sitemap host policy, max_sitemaps cap, double-gzip +# --------------------------------------------------------------------------- + + +class TestChildSitemapHostPolicy: + def test_same_host_only_blocks_cross_site_child( + self, + tmp_path: Path, + httpserver, + ) -> None: + """A sitemap-index on host A linking to host B is refused when SAME_HOST_ONLY=true.""" + # We craft the index ourselves with an absolute off-host child URL. + evil_index = textwrap.dedent( + """ + + + https://attacker.example.com/sitemap.xml + + """, + ).strip() + httpserver.expect_request("/index.xml").respond_with_data( + evil_index, + content_type="application/xml", + ) + result = run_hook( + httpserver.url_for("/index.xml"), + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_SAME_HOST_ONLY": "true"}, + ) + # Index parsed, but the child fetch was refused — noresults, no fetch. 
+ archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None + assert archive["status"] == "noresults" + assert "host_mismatch" in result.stderr + + +class TestCorruptGzip: + def test_truncated_gzip_body_reported_as_failed( + self, + tmp_path: Path, + httpserver, + ) -> None: + """A truncated gzip stream must fail cleanly, not crash with a traceback.""" + valid = ( + textwrap.dedent( + """ + + + https://example.com/a + + """, + ) + .strip() + .encode("utf-8") + ) + # Hand-cut the gzip stream to leave it truncated. + corrupt = gzip.compress(valid)[:30] + httpserver.expect_request("/broken.xml.gz").respond_with_data( + corrupt, + content_type="application/x-gzip", + ) + result = run_hook(httpserver.url_for("/broken.xml.gz"), cwd=tmp_path) + archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None + assert archive["status"] == "failed" + # Either decompression-cap or BadGzipFile-derived message — both fine. + assert "decompress" in result.stderr or "gzip" in result.stderr + + +class TestMaxSitemapsCap: + def test_max_sitemaps_counts_failed_attempts( + self, + tmp_path: Path, + httpserver, + ) -> None: + """An index pointing at many 404 children must stop at MAX_SITEMAPS attempts.""" + for index in range(20): + httpserver.expect_request(f"/missing{index}.xml").respond_with_data( + "", + status=404, + ) + index_xml_parts = [ + '', + '', + ] + for index in range(20): + index_xml_parts.append( + f" {httpserver.url_for(f'/missing{index}.xml')}", + ) + index_xml_parts.append("") + httpserver.expect_request("/index.xml").respond_with_data( + "\n".join(index_xml_parts), + content_type="application/xml", + ) + + result = run_hook( + httpserver.url_for("/index.xml"), + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_MAX_SITEMAPS": "3"}, + ) + archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None + # Cap is on *attempts*: index (1) + 2 children = 3 attempts, then stop. 
+ assert "max_sitemaps=3" in result.stderr + + def test_max_sitemaps_caps_recursion( + self, + tmp_path: Path, + httpserver, + ) -> None: + """A sitemap-index pointing at many empty children stops after MAX_SITEMAPS hits.""" + leaf = textwrap.dedent( + """ + + + """, + ).strip() + for index in range(20): + httpserver.expect_request(f"/leaf{index}.xml").respond_with_data( + leaf, + content_type="application/xml", + ) + index_xml_parts = [''] + index_xml_parts.append( + '', + ) + for index in range(20): + index_xml_parts.append( + f" {httpserver.url_for(f'/leaf{index}.xml')}", + ) + index_xml_parts.append("") + httpserver.expect_request("/index.xml").respond_with_data( + "\n".join(index_xml_parts), + content_type="application/xml", + ) + + result = run_hook( + httpserver.url_for("/index.xml"), + cwd=tmp_path, + env_overrides={"PARSE_SITEMAP_URLS_MAX_SITEMAPS": "5"}, + ) + # Hit the cap on the 5th child (index + 4 children = 5 sitemaps). + assert "max_sitemaps" in result.stderr + # No emitted URLs because every child was empty, but the cap message + # confirms the guard fired. 
+ archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None + + +class TestDoubleDecompressionRegression: + def test_xml_gz_url_with_content_encoding_gzip( + self, + tmp_path: Path, + httpserver, + ) -> None: + """`.xml.gz` URL whose body is already content-encoding-gzip must parse once.""" + sitemap = ( + textwrap.dedent( + """ + + + https://example.com/decompressed-once + + """, + ) + .strip() + .encode("utf-8") + ) + httpserver.expect_request("/sitemap.xml.gz").respond_with_data( + gzip.compress(sitemap), + content_type="application/xml", + headers={"Content-Encoding": "gzip"}, + ) + result = run_hook(httpserver.url_for("/sitemap.xml.gz"), cwd=tmp_path) + assert result.returncode == 0, result.stderr + urls = [s["url"] for s in snapshots(parse_jsonl(result.stdout))] + assert urls == ["https://example.com/decompressed-once"] + + +# --------------------------------------------------------------------------- +# Direct unit test of the redirect handler. +# +# `_BoundedRedirectHandler` is exercised indirectly by +# `TestRedirectCountCap.test_redirect_chain_capped` (the `max_redirections` +# instance override is the only reason the cap takes effect at all) and by +# `TestRedirectTargets.test_rejects_redirect_to_non_http_scheme` (stdlib +# short-circuits non-HTTP redirects in the same place our handler does). +# A pure unit test of `redirect_request()` would need to import the hook +# module with its `os.chdir` and `load_config` side-effects, which makes +# the test harness fragile across Python versions; the integration paths +# above already prove the behaviour. 
+# --------------------------------------------------------------------------- + + +# --------------------------------------------------------------------------- +# robots.txt URL detection — only exact basename +# --------------------------------------------------------------------------- + + +class TestRobotsURLDetection: + def test_foo_robots_txt_is_not_robots(self, tmp_path: Path, httpserver) -> None: + """A path ending in `-robots.txt` is NOT a robots file.""" + sitemap = textwrap.dedent( + """ + + + """, + ).strip() + httpserver.expect_request("/foo-robots.txt").respond_with_data( + sitemap, + content_type="text/plain", + ) + # robots.txt fallback + sitemap fallback paths all 404 + httpserver.expect_request("/robots.txt").respond_with_data("", status=404) + result = run_hook( + httpserver.url_for("/foo-robots.txt"), + cwd=tmp_path, + ) + # If the hook had treated this as robots.txt it would have parsed + # the XML body for `Sitemap:` lines (none) and emitted noresults. + # Instead we expect it to fall through the site-root branch — + # which means it tries /robots.txt + fallback paths instead of + # the foo-robots.txt URL. + # The empty urlset returned by /foo-robots.txt is never read + # because the hook never targets that URL. + archive = archive_result(parse_jsonl(result.stdout)) + assert archive is not None + # Failed because every probed sitemap returns 404. + assert archive["status"] == "failed" + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])
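The `TestStreaming` cases above rely on the hook stopping as soon as `MAX_URLS` is reached, which their comments attribute to `iterparse` with `elem.clear()`. A minimal sketch of that pattern follows, assuming a plain `<urlset>` in the standard sitemap namespace. `iter_locs` and `build_sitemap` are hypothetical names used only for illustration, and the stdlib `xml.etree.ElementTree.iterparse` stands in for the `defusedxml` parser the README describes, so this sketch carries none of that hardening and should not be pointed at untrusted input.

```python
import io
import xml.etree.ElementTree as ET

# Standard sitemap namespace in ElementTree's "{uri}tag" form.
SITEMAP_NS = "{http://www.sitemaps.org/schemas/sitemap/0.9}"


def iter_locs(stream, max_urls):
    """Yield at most `max_urls` <loc> values from a <urlset>, parsing incrementally.

    Hypothetical helper: the real hook's function names are not shown here.
    """
    emitted = 0
    # "end" events fire once an element and all of its children are complete.
    for _event, elem in ET.iterparse(stream, events=("end",)):
        if elem.tag != SITEMAP_NS + "url":
            continue
        loc = elem.findtext(SITEMAP_NS + "loc")
        # Drop the finished <url> element's children and text so they can be freed.
        elem.clear()
        if loc and loc.strip():
            yield loc.strip()
            emitted += 1
            if emitted >= max_urls:
                # Stop before the rest of the document is read.
                return


def build_sitemap(count):
    """Build an in-memory sitemap with `count` entries (test fixture, hypothetical)."""
    rows = "".join(
        f"<url><loc>https://example.com/p{i:05d}</loc></url>" for i in range(count)
    )
    xml = f'<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">{rows}</urlset>'
    return io.BytesIO(xml.encode("utf-8"))


first = list(iter_locs(build_sitemap(1000), max_urls=3))
print(first)
```

Because `iter_locs` is a generator that returns once the cap is hit, the caller never pays for the remaining entries, which is the property the wall-time assertions in `TestStreaming` use as a proxy.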