feat: pluggable RangeJob sources for repackage (cdx / sql{athena,duckdb} / csv)#73
Draft
malteos wants to merge 2 commits into
Draft
feat: pluggable RangeJob sources for repackage (cdx / sql{athena,duckdb} / csv)#73malteos wants to merge 2 commits into
malteos wants to merge 2 commits into
Conversation
…}/csv)
Rename the warc_by_cdx command to 'repackage' and make the range-job stage
pluggable behind a RangeJobSource abstraction:
- sources/: base (RangeJobSource + CostEstimate), sql_base (shared query
builder + crawl resolution), cdx, athena, duckdb (optional dep), csv
(reader + RangeJobCsvWriter), and a make_source factory.
- CLI: --target-source {cdx,sql,csv} with --engine {athena,duckdb}; shared
--hostnames/--query/--query-file; --duckdb-index-path; --csv-path; CSV
materialization via --range-jobs-output/--no-fetch/--csv-self-contained;
generalized --confirm-cost guard.
- WARCFilter: takes an injected source; orchestrator owns queueing, the
record limit, counting, and _STOP emission in a finally (fixes the prior
hung-readers bug when a source raised). Sources own their own stage-1
client/connection; WARCFilter manages only read/write S3 clients.
- DuckDB reads the CC columnar parquet directly via read_parquet with
per-crawl partition globbing; Athena unchanged in behaviour.
- setup.py: cdx_toolkit[duckdb] extra; conftest: requires_duckdb.
Tests: unit (sql_base, csv round-trip, make_source, confirm_cost, producer
stop-sentinel regression, --no-fetch); CSV round-trip e2e from the CDX
fixture; gated Athena/DuckDB e2e (CC-MAIN-2026-17/commoncrawl.org, single
partition). DuckDB e2e verified live; 205 passed.
Extend the guided SQL filter (athena + duckdb) to match on url_host_registered_domain in addition to url_host_name. --hostnames and --domains can be combined (OR-ed); at least one (or a raw --query) is required. TLD optimizer hint is derived from both. Also make the DuckDB source resilient to transient S3 read timeouts (http_timeout/http_retries). Tests: unit coverage for domain-only / combined host+domain query building (athena + duckdb) and factory validation; gated DuckDB domain e2e (CC-MAIN-2026-17 / commoncrawl.org, --limit 10) verified live.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Makes the WARC-repackaging command's range-job stage pluggable behind a
RangeJobSourceabstraction, and renames the command
warc_by_cdx→repackage(it now repackages WARC rangesfrom several sources, not just CDX).
Range jobs (a WARC file + byte range) can now come from:
--target-source--enginecdxsqlathenasqlduckdbread_parquet)csvAthena and DuckDB share one SQL core (
sources/sql_base.py); they differ only in theFROMclauseand execution. The pipeline orchestrator now owns queueing, the record limit, counting, and
stop-sentinel emission (in a
finally) — which also fixes a latent bug where a source raisingmid-run left the WARC readers hung forever. Each source owns its own stage-1 client/connection;
WARCFiltermanages only the read/write S3 clients.CLI usage
Global flags (
--crawl,--limit) come before therepackagesubcommand; source flags come after.CDX files (default)
cdxt repackage \ --target-source cdx \ --cdx-path filtered_CC-MAIN-2024-30.cdx.gz \ --prefix ./out/EXAMPLE \ --warc-download-prefix https://data.commoncrawl.org # multiple indices via glob: cdxt repackage --target-source cdx --cdx-path s3://bucket/cdx/ --cdx-glob '*.cdx.gz' --prefix ./out/EXAMPLESQL — Athena (guided by hostnames/domains, pruned by crawl)
cdxt --crawl CC-MAIN-2026-17 repackage \ --target-source sql --engine athena \ --hostnames commoncrawl.org www.commoncrawl.org \ --athena-database ccindex \ --athena-s3-output s3://my-bucket/athena-results/ \ --prefix s3://my-bucket/out/CC \ --warc-download-prefix s3://commoncrawlThe guided filter matches on
url_host_name(exact host) via--hostnamesand/orurl_host_registered_domainvia--domains(which also covers subdomains). They can be combined(predicates are OR-ed):
# every host under the example.com registered domain, plus one exact extra host cdxt --crawl CC-MAIN-2026-17 repackage \ --target-source sql --engine duckdb \ --domains example.com \ --hostnames blog.example.org \ --prefix ./out/EXSQL — DuckDB (reads the public parquet directly; no Athena charge)
cdxt --crawl CC-MAIN-2026-17 repackage \ --target-source sql --engine duckdb \ --hostnames commoncrawl.org \ --prefix ./out/CC # requires the optional dependency: pip install cdx_toolkit[duckdb]SQL — raw query (power users)
The query must
SELECT warc_filename, warc_record_offset, warc_record_length:cdxt repackage \ --target-source sql --engine athena \ --query "SELECT warc_filename, warc_record_offset, warc_record_length FROM ccindex WHERE subset = 'warc' AND crawl = 'CC-MAIN-2026-17' AND url_host_registered_domain = 'commoncrawl.org' AND content_mime_type = 'application/pdf' LIMIT 5000" \ --athena-database ccindex \ --athena-s3-output s3://my-bucket/athena-results/ \ --prefix ./out/PDFS \ --confirm-cost # or load it from a file with --query-file ./my_query.sqlCSV (consume a previously materialized range-jobs file)
cdxt repackage \ --target-source csv \ --csv-path ranges.csv \ --prefix ./out/CC \ --warc-download-prefix https://data.commoncrawl.orgCost guard
SQL index scans bill by data scanned (Athena: ~$5/TB), so
repackageprompts before a potentiallyexpensive query. It runs without a prompt only when the query is restricted to ≤ 10 crawls.
Otherwise (no
--crawl→ all crawls, > 10 crawls, or a raw--querywhose pruning can't beverified) it asks for confirmation; in a non-interactive shell it aborts unless
--confirm-costispassed.
cdxandcsvsources never prompt.Range-jobs CSV (materialization)
Any source can write its resolved range jobs to a CSV with
--range-jobs-output. Add--no-fetchto only produce the CSV (cheap — no WARC download), then consume it later. This decouples the
(possibly expensive) index query from extraction, makes runs reproducible/shareable, and lets the
whole reader/writer pipeline be tested without AWS.
CSV formats
Default (relative filename) — the consumer prepends
--warc-download-prefix:Self-contained (
--csv-self-contained) — full URLs, used as-is (no prefix needed). The readerauto-detects the mode from the header (
urlvsfilename):.tsv/.tsv.gzinputs are read as tab-delimited;.gzinputs are decompressed.Optional dependency
DuckDB is optional:
pip install cdx_toolkit[duckdb]. Without it, the other sources work unchanged;selecting
--engine duckdbraises a clear error.Testing
--domains/ combined host+domain),escape_sql_literalinjection rejection, CSVreader/writer round-trip in both modes + header auto-detection, the
make_sourcefactoryvalidation, the
confirm_costguard,--no-fetch, and a regression test that the producer stillreleases readers (
_STOP) when a source raises.CC-MAIN-2026-17/commoncrawl.org(single partition — cheap, never an all-crawls scan), including a DuckDB
--domainsrun.Verified locally: full suite 205 passed; DuckDB e2e ran live and passed; all 6 S3
aioboto3 read/write tests pass; flake8 clean. (Athena gated test skips where Athena permissions
aren't available; its code path is covered by unit tests and shares the orchestration/fetch path
proven by the DuckDB e2e.)