Skip to content

feat: pluggable RangeJob sources for repackage (cdx / sql{athena,duckdb} / csv)#73

Draft
malteos wants to merge 2 commits into
feat/warc-by-cdxfrom
feat/warc-range-sources
Draft

feat: pluggable RangeJob sources for repackage (cdx / sql{athena,duckdb} / csv)#73
malteos wants to merge 2 commits into
feat/warc-by-cdxfrom
feat/warc-range-sources

Conversation

@malteos

@malteos malteos commented Jun 12, 2026

Copy link
Copy Markdown
Collaborator

Summary

Makes the WARC-repackaging command's range-job stage pluggable behind a RangeJobSource
abstraction, and renames the command warc_by_cdxrepackage (it now repackages WARC ranges
from several sources, not just CDX).

Range jobs (a WARC file + byte range) can now come from:

--target-source --engine Where range jobs come from
cdx CDX index files (local or S3, via fsspec)
sql athena CC columnar index via AWS Athena
sql duckdb CC columnar index parquet on S3 via DuckDB (read_parquet)
csv a range-jobs CSV/TSV (e.g. produced by a previous run)

Athena and DuckDB share one SQL core (sources/sql_base.py); they differ only in the FROM clause
and execution. The pipeline orchestrator now owns queueing, the record limit, counting, and
stop-sentinel emission (in a finally) — which also fixes a latent bug where a source raising
mid-run left the WARC readers hung forever. Each source owns its own stage-1 client/connection;
WARCFilter manages only the read/write S3 clients.

Base branch: this PR targets feat/warc-by-cdx (PR #54), not main.

CLI usage

Global flags (--crawl, --limit) come before the repackage subcommand; source flags come after.

CDX files (default)

cdxt repackage \
    --target-source cdx \
    --cdx-path filtered_CC-MAIN-2024-30.cdx.gz \
    --prefix ./out/EXAMPLE \
    --warc-download-prefix https://data.commoncrawl.org
# multiple indices via glob:
cdxt repackage --target-source cdx --cdx-path s3://bucket/cdx/ --cdx-glob '*.cdx.gz' --prefix ./out/EXAMPLE

SQL — Athena (guided by hostnames/domains, pruned by crawl)

cdxt --crawl CC-MAIN-2026-17 repackage \
    --target-source sql --engine athena \
    --hostnames commoncrawl.org www.commoncrawl.org \
    --athena-database ccindex \
    --athena-s3-output s3://my-bucket/athena-results/ \
    --prefix s3://my-bucket/out/CC \
    --warc-download-prefix s3://commoncrawl

The guided filter matches on url_host_name (exact host) via --hostnames and/or
url_host_registered_domain via --domains (which also covers subdomains). They can be combined
(predicates are OR-ed):

# every host under the example.com registered domain, plus one exact extra host
cdxt --crawl CC-MAIN-2026-17 repackage \
    --target-source sql --engine duckdb \
    --domains example.com \
    --hostnames blog.example.org \
    --prefix ./out/EX

SQL — DuckDB (reads the public parquet directly; no Athena charge)

cdxt --crawl CC-MAIN-2026-17 repackage \
    --target-source sql --engine duckdb \
    --hostnames commoncrawl.org \
    --prefix ./out/CC
# requires the optional dependency:  pip install cdx_toolkit[duckdb]

SQL — raw query (power users)

The query must SELECT warc_filename, warc_record_offset, warc_record_length:

cdxt repackage \
    --target-source sql --engine athena \
    --query "SELECT warc_filename, warc_record_offset, warc_record_length
             FROM ccindex
             WHERE subset = 'warc'
               AND crawl = 'CC-MAIN-2026-17'
               AND url_host_registered_domain = 'commoncrawl.org'
               AND content_mime_type = 'application/pdf'
             LIMIT 5000" \
    --athena-database ccindex \
    --athena-s3-output s3://my-bucket/athena-results/ \
    --prefix ./out/PDFS \
    --confirm-cost
# or load it from a file with --query-file ./my_query.sql

CSV (consume a previously materialized range-jobs file)

cdxt repackage \
    --target-source csv \
    --csv-path ranges.csv \
    --prefix ./out/CC \
    --warc-download-prefix https://data.commoncrawl.org

Cost guard

SQL index scans bill by data scanned (Athena: ~$5/TB), so repackage prompts before a potentially
expensive query. It runs without a prompt only when the query is restricted to ≤ 10 crawls.
Otherwise (no --crawl → all crawls, > 10 crawls, or a raw --query whose pruning can't be
verified) it asks for confirmation; in a non-interactive shell it aborts unless --confirm-cost is
passed. cdx and csv sources never prompt.

WARNING  This duckdb query is not restricted to specific crawls ... and may scan ALL crawls.
Index SQL scans can be expensive (Athena bills per TB scanned). Proceed? [y/N]

Range-jobs CSV (materialization)

Any source can write its resolved range jobs to a CSV with --range-jobs-output. Add --no-fetch
to only produce the CSV (cheap — no WARC download), then consume it later. This decouples the
(possibly expensive) index query from extraction, makes runs reproducible/shareable, and lets the
whole reader/writer pipeline be tested without AWS.

# 1) produce the ranges once (cheap, single-crawl query, no WARC fetch)
cdxt --crawl CC-MAIN-2026-17 repackage \
    --target-source sql --engine duckdb --hostnames commoncrawl.org \
    --range-jobs-output ranges.csv --no-fetch

# 2) fetch + repackage from the CSV, as many times as you like
cdxt repackage --target-source csv --csv-path ranges.csv \
    --warc-download-prefix https://data.commoncrawl.org --prefix ./out/CC

CSV formats

Default (relative filename) — the consumer prepends --warc-download-prefix:

filename,offset,length
crawl-data/CC-MAIN-2026-17/segments/1234567890.12/warc/CC-MAIN-20260101000000-20260101010000-00000.warc.gz,111440525,9754
crawl-data/CC-MAIN-2026-17/segments/1234567890.12/warc/CC-MAIN-20260101000000-20260101010000-00001.warc.gz,98231,12044

Self-contained (--csv-self-contained) — full URLs, used as-is (no prefix needed). The reader
auto-detects the mode from the header (url vs filename):

url,offset,length
https://data.commoncrawl.org/crawl-data/CC-MAIN-2026-17/segments/1234567890.12/warc/CC-MAIN-...-00000.warc.gz,111440525,9754
s3://commoncrawl/crawl-data/CC-MAIN-2026-17/segments/1234567890.12/warc/CC-MAIN-...-00001.warc.gz,98231,12044

.tsv/.tsv.gz inputs are read as tab-delimited; .gz inputs are decompressed.

Optional dependency

DuckDB is optional: pip install cdx_toolkit[duckdb]. Without it, the other sources work unchanged;
selecting --engine duckdb raises a clear error.

Testing

  • Unit (no AWS): SQL builder per engine (incl. --domains / combined host+domain),
    escape_sql_literal injection rejection, CSV
    reader/writer round-trip in both modes + header auto-detection, the make_source factory
    validation, the confirm_cost guard, --no-fetch, and a regression test that the producer still
    releases readers (_STOP) when a source raises.
  • CSV round-trip e2e (offline produce from the CDX fixture + HTTP consume), both modes.
  • Gated e2e (skipped in CI): Athena and DuckDB querying CC-MAIN-2026-17 / commoncrawl.org
    (single partition — cheap, never an all-crawls scan), including a DuckDB --domains run.

Verified locally: full suite 205 passed; DuckDB e2e ran live and passed; all 6 S3
aioboto3 read/write tests pass
; flake8 clean. (Athena gated test skips where Athena permissions
aren't available; its code path is covered by unit tests and shares the orchestration/fetch path
proven by the DuckDB e2e.)

…}/csv)

Rename the warc_by_cdx command to 'repackage' and make the range-job stage
pluggable behind a RangeJobSource abstraction:

- sources/: base (RangeJobSource + CostEstimate), sql_base (shared query
  builder + crawl resolution), cdx, athena, duckdb (optional dep), csv
  (reader + RangeJobCsvWriter), and a make_source factory.
- CLI: --target-source {cdx,sql,csv} with --engine {athena,duckdb}; shared
  --hostnames/--query/--query-file; --duckdb-index-path; --csv-path; CSV
  materialization via --range-jobs-output/--no-fetch/--csv-self-contained;
  generalized --confirm-cost guard.
- WARCFilter: takes an injected source; orchestrator owns queueing, the
  record limit, counting, and _STOP emission in a finally (fixes the prior
  hung-readers bug when a source raised). Sources own their own stage-1
  client/connection; WARCFilter manages only read/write S3 clients.
- DuckDB reads the CC columnar parquet directly via read_parquet with
  per-crawl partition globbing; Athena unchanged in behaviour.
- setup.py: cdx_toolkit[duckdb] extra; conftest: requires_duckdb.

Tests: unit (sql_base, csv round-trip, make_source, confirm_cost, producer
stop-sentinel regression, --no-fetch); CSV round-trip e2e from the CDX
fixture; gated Athena/DuckDB e2e (CC-MAIN-2026-17/commoncrawl.org, single
partition). DuckDB e2e verified live; 205 passed.
@malteos malteos marked this pull request as draft June 12, 2026 10:58
Extend the guided SQL filter (athena + duckdb) to match on
url_host_registered_domain in addition to url_host_name. --hostnames and
--domains can be combined (OR-ed); at least one (or a raw --query) is required.
TLD optimizer hint is derived from both. Also make the DuckDB source resilient
to transient S3 read timeouts (http_timeout/http_retries).

Tests: unit coverage for domain-only / combined host+domain query building
(athena + duckdb) and factory validation; gated DuckDB domain e2e
(CC-MAIN-2026-17 / commoncrawl.org, --limit 10) verified live.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant