
feat(waterdata): Add async parallel chunker over httpx.AsyncClient#285

Draft
thodson-usgs wants to merge 2 commits into DOI-USGS:main from thodson-usgs:httpx-migration

Collaborator

thodson-usgs commented May 21, 2026

Summary

Adds a parallel fan-out path to multi_value_chunked. When API_USGS_CONCURRENT resolves to >1 (default 16), the decorator runs the sub-requests of an over-budget plan concurrently under one shared httpx.AsyncClient instead of issuing them serially. Falls back to the serial sync path (with a one-time UserWarning) when no async fetch sibling is wired or when an asyncio event loop is already running (Jupyter / IPython kernels, async apps — asyncio.run would otherwise raise).

Note

Rebased onto main now that the httpx upgrade (#289) has merged — this PR is a single commit containing only the async chunker.

Benchmark: ~6.7× speedup on a 6-state get_daily discharge pull over 17,298 stream sites (chunked into 64 sub-requests): 3.11s parallel at API_USGS_CONCURRENT=16 versus 20.94s serial at API_USGS_CONCURRENT=1 (~1,920 rows each). Each run used a distinct date window so the server cache could not bias either one; both authenticated via API_USGS_PAT.

API_USGS_CONCURRENT

| Value | Behavior |
|---|---|
| unset / blank | parallel, cap = 16 (_CONCURRENCY_DEFAULT) |
| ≥ 2 | parallel, semaphore-capped at that value |
| 1 | serial (sync ChunkedCall.resume() path) |
| unbounded | parallel, no per-call cap; caller owns the burst risk |
| 0, negative, malformed | ValueError at call time |
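
The table above can be read as a small parsing function. The following is a minimal sketch, not the PR's code: `resolve_concurrency` is a hypothetical name, and treating `unbounded` as an exact, case-sensitive match is an assumption. Only the default of 16, the `unbounded` keyword, and the `ValueError` cases come from the table.

```python
import os
import sys

_CONCURRENCY_DEFAULT = 16  # default named in the table above


def resolve_concurrency(raw=None):
    """Map an API_USGS_CONCURRENT value to a per-call cap (hypothetical helper)."""
    if raw is None:
        raw = os.environ.get("API_USGS_CONCURRENT", "")
    text = raw.strip()
    if not text:
        return _CONCURRENCY_DEFAULT  # unset / blank
    if text == "unbounded":
        return sys.maxsize  # no per-call cap; caller owns the burst risk
    try:
        value = int(text)
    except ValueError:
        raise ValueError(
            f"API_USGS_CONCURRENT must be a positive integer or 'unbounded', got {raw!r}"
        ) from None
    if value < 1:
        raise ValueError(f"API_USGS_CONCURRENT must be >= 1, got {value}")
    return value  # 1 selects the serial path, >= 2 the parallel path
```

A caller would then branch on the result: a value of 1 stays on the serial sync path, anything larger sizes the semaphore.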

All sub-requests of a single chunked call share one connection pool via the _chunked_client (sync) / _chunked_async_client (async) ContextVars: _walk_pages / _walk_pages_async check them before opening a fresh client.

Retries (API_USGS_RETRIES)

Each sub-request is retried on a transient failure with exponential backoff + full jitter, honoring a server Retry-After, so a large fan-out completes through the AWS API Gateway's burst throttling and the occasional backend straggler instead of aborting on the first 429/5xx/timeout. .resume() remains the escape hatch once retries are exhausted.

| Value | Behavior |
|---|---|
| unset / blank | retry, cap = 4 (_RETRIES_DEFAULT) |
| N ≥ 1 | up to N retries per sub-request |
| 0 | no retry; first transient surfaces as a resumable ChunkInterrupted |
| negative, malformed | ValueError at call time |

  • RetryPolicy — a frozen value object owning the timing decisions (from_env / should_retry / backoff). Full jitter (random.uniform(0, ceiling)) de-correlates the ≤16 concurrent retries so a throttle event doesn't make them re-burst in lockstep.
  • Retry-After split. A short hint is slept off inline; one longer than retry_after_cap (60s) — e.g. a multi-minute quota-window reset — escalates immediately to the resumable interruption rather than blocking the call.
  • _retryable is narrower than the resumability classifier: it retries RateLimited / ServiceUnavailable / httpx.TransportError (connect/read timeouts) but not httpx.InvalidURL (a too-long cursor won't fix on retry).
  • The async retry runs inside the semaphore, so a backing-off chunk holds its slot — effective concurrency shrinks under throttling (a free, partial adaptive-concurrency effect). Backoffs surface on the progress line (retrying (attempt N, waiting Ns)).
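
The retry behavior described above could look roughly like the sketch below. It is an assumption-laden illustration, not the PR's implementation: the method signatures, the `base` and `max_backoff` values, the `retry_sync` driver, and the `retry_after` attribute on exceptions are all hypothetical. The default of 4 retries, the 60s `retry_after_cap`, and full jitter via `random.uniform(0, ceiling)` come from the description.

```python
import random
import time
from dataclasses import dataclass


@dataclass(frozen=True)
class RetryPolicy:
    max_retries: int = 4           # _RETRIES_DEFAULT in the table above
    base: float = 0.5              # assumed first-retry ceiling, in seconds
    max_backoff: float = 30.0      # assumed upper bound on the ceiling
    retry_after_cap: float = 60.0  # longer server hints escalate instead

    def should_retry(self, attempt, retry_after=None):
        """`attempt` is the 1-based count of failures seen so far."""
        if attempt > self.max_retries:
            return False
        if retry_after is not None and retry_after > self.retry_after_cap:
            return False  # e.g. a multi-minute quota-window reset
        return True

    def backoff(self, attempt, retry_after=None, rng=random):
        """Seconds to sleep: the server hint if given, else full jitter."""
        if retry_after is not None:
            return float(retry_after)
        ceiling = min(self.max_backoff, self.base * 2 ** (attempt - 1))
        return rng.uniform(0, ceiling)


def retry_sync(fetch, policy, is_retryable, sleep=time.sleep):
    """Call `fetch` until it succeeds or the policy gives up (hypothetical driver)."""
    attempt = 0
    while True:
        try:
            return fetch()
        except Exception as exc:
            attempt += 1
            hint = getattr(exc, "retry_after", None)  # assumed attribute
            if not is_retryable(exc) or not policy.should_retry(attempt, hint):
                raise  # the caller wraps this as the resumable interruption
            sleep(policy.backoff(attempt, hint))
```

Passing `sleep` as a parameter mirrors the test plan's approach of patching the sleep call to a no-op, so retry tests run instantly.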

Architecture

_fan_out_async(plan, fetch_once, fetch_async, *, max_concurrent) is the orchestrator: it dispatches every sub-request concurrently via asyncio.gather(return_exceptions=True). Completed pairs survive a sibling's transient failure, so partial state stays recoverable through ChunkedCall.resume() on the sync path.

Failure precedence in the gather

  1. Cancellation / interrupt signals (CancelledError, KeyboardInterrupt, SystemExit) propagate unmodified — never wrapped as transients. Cancellation is asyncio's abort signal; rewriting it as ChunkInterrupted would silently consume the user's stop request.
  2. Recognized transients (RateLimited, ServiceUnavailable, bare httpx.HTTPError) wrap as ChunkInterrupted so the user gets a resumable handle even when a non-transient bug landed earlier in submission order.
  3. Otherwise raise the first failure in submission order, preserving its type.
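
The three precedence rules above can be sketched as follows. This is a simplified stand-in, assuming locally defined exception classes in place of the library's `RateLimited`, `ServiceUnavailable`, and `ChunkInterrupted`, and a hypothetical `fan_out` signature; it omits the retry layer and the bare `httpx.HTTPError` case.

```python
import asyncio


class RateLimited(Exception):
    """Stand-in for the library's transient 429 error."""


class ServiceUnavailable(Exception):
    """Stand-in for the library's transient 5xx error."""


class ChunkInterrupted(Exception):
    """Stand-in resumable error carrying the results that did complete."""

    def __init__(self, completed, cause):
        super().__init__(f"{len(completed)} sub-request(s) completed before {cause!r}")
        self.completed = completed
        self.cause = cause


TRANSIENT = (RateLimited, ServiceUnavailable)
ABORT = (asyncio.CancelledError, KeyboardInterrupt, SystemExit)


async def fan_out(plan, fetch_async, *, max_concurrent):
    sem = asyncio.Semaphore(max_concurrent)

    async def track(item):
        async with sem:  # at most max_concurrent fetches in flight
            return await fetch_async(item)

    results = await asyncio.gather(
        *(track(item) for item in plan), return_exceptions=True
    )
    failures = [r for r in results if isinstance(r, BaseException)]
    completed = {
        i: r for i, r in enumerate(results) if not isinstance(r, BaseException)
    }
    for exc in failures:  # 1. abort signals propagate unmodified
        if isinstance(exc, ABORT):
            raise exc
    for exc in failures:  # 2. any transient yields a resumable handle
        if isinstance(exc, TRANSIENT):
            raise ChunkInterrupted(completed, exc) from exc
    if failures:  # 3. otherwise the first failure in submission order
        raise failures[0]
    return list(results)
```

Because `gather` returns results in submission order, rule 3 needs no extra bookkeeping: the first entry in `failures` is the earliest-submitted failure.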

Sync ↔ async bridge

_execute_in_parallel owns the asyncio.run dispatch with two recoverable-misconfig fallbacks (each emits a one-time UserWarning, then runs serial):

  • Event-loop detection. asyncio.run() raises inside an already-running loop. The bridge calls asyncio.get_running_loop() first and, when one is active, falls back to the serial path so Jupyter / IPython users don't see a confusing RuntimeError.
  • Missing-fetch_async warning. If API_USGS_CONCURRENT requests parallel but the decorator wasn't wired with fetch_async=, the wrapper warns + runs serial rather than silently no-op'ing the env var.
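
A minimal sketch of the two fallbacks, assuming a hypothetical `execute` function that takes a zero-argument serial callable and an optional zero-argument coroutine function. The warning text and the `_warn_once` helper are illustrative; only the `asyncio.get_running_loop()` probe and the warn-once-then-serial behavior come from the description.

```python
import asyncio
import warnings

_warned = set()  # keys of warnings already emitted in this process


def _warn_once(key, message):
    if key not in _warned:
        _warned.add(key)
        warnings.warn(message, UserWarning, stacklevel=3)


def execute(run_serial, run_parallel=None):
    """Run the parallel path when possible, else fall back to serial."""
    if run_parallel is None:
        _warn_once("no-async", "parallel requested but no async fetch is wired; running serially")
        return run_serial()
    try:
        asyncio.get_running_loop()  # raises RuntimeError when no loop is running
    except RuntimeError:
        return asyncio.run(run_parallel())  # safe: this thread has no active loop
    _warn_once("running-loop", "an event loop is already running; running serially")
    return run_serial()
```

Probing with `get_running_loop()` before creating the coroutine avoids leaving an un-awaited coroutine behind on the fallback path.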

The async paginate path (_paginate_async, _walk_pages_async, _client_for_async, _fetch_once_async) mirrors the sync path exactly, sharing the parse_response / follow_up callbacks and the _ogc_parse_response parser, so there is one pagination contract for both.
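
To illustrate what a shared pagination contract means in practice, here is a hedged sketch in which the sync and async walkers differ only in how they send a request. The signatures are hypothetical and much simpler than the real `_walk_pages` / `_walk_pages_async`; the point is that both call the same `parse_response` and `follow_up` callables.

```python
import asyncio


def walk_pages(send, request, parse_response, follow_up):
    """Sync walker: send, parse, then ask follow_up for the next request."""
    parsed = []
    while request is not None:
        response = send(request)
        parsed.append(parse_response(response))
        request = follow_up(response)  # None ends the walk
    return parsed


async def walk_pages_async(send_async, request, parse_response, follow_up):
    """Async sibling: identical loop, only the send step is awaited."""
    parsed = []
    while request is not None:
        response = await send_async(request)
        parsed.append(parse_response(response))
        request = follow_up(response)
    return parsed
```

Keeping the callbacks synchronous in both walkers is what lets one parser serve both paths.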

Test plan

  • 435 mocked tests pass, 2 skipped; ruff check clean.
  • Async-path coverage in tests/waterdata_chunking_test.py:
    • one-call-per-sub-request happy path
    • mid-fan-out transient yields resumable ChunkInterrupted whose .call.resume() re-issues only the unfinished indices via the sync path
    • fallback-to-serial parametrized over running-loop and missing-fetch_async
    • CancelledError-wins-over-transient-sibling regression test (asyncio's cancellation contract takes precedence over the chunker's retry semantics)
  • Retry coverage: RetryPolicy math + full-jitter bounds, _retryable taxonomy (incl. InvalidURL not retried), sync + async transient-then-success, exhausted-still-resumable, and long-Retry-After escalation — all with _SLEEP/_ASLEEP patched to no-ops.
  • Async-progress integration tests in tests/waterdata_progress_test.py (reporter calls from _paginate_async and _fan_out_async; note_retry render/clear).
  • _walk_pages_async initial-parse-error test in tests/waterdata_utils_test.py.
  • Live-API CI sweep.

Design notes / out of scope

  • No pre-emptive quota gating. An earlier revision raised a RequestExceedsQuota before a burst that the x-ratelimit-remaining header said couldn't fit; it was dropped as not worth the complexity. The reactive retry-with-backoff above is the chosen alternative — absorb transients rather than predict them — and a transient that outlives the retries still surfaces as a resumable ChunkInterrupted.
  • Adaptive concurrency (AIMD) — a client-side token-bucket / additive-increase-multiplicative-decrease rate limiter that converges to the gateway's steady-state rate would reduce retries further. The semaphore-held-during-backoff behavior is a partial version; a full controller is a separate change.
  • High-concurrency memory: _fan_out_async materializes all (df, response) pairs before combining. Consider streaming-combine via asyncio.as_completed if users push concurrency very high.
  • asyncio.TaskGroup (3.11+): would replace gather once the 3.9 floor is dropped, though the partial-completion contract fights TaskGroup's cancel-on-first-failure default — the gather form may stay the right shape regardless.
  • NEWS.md entry — left for the merger to draft.
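
For the high-concurrency memory note above, a streaming combine via `asyncio.as_completed` might look like the sketch below. It is an exploration of the idea, not part of this PR: `fan_out_streaming`, `fold`, and `initial` are hypothetical names, results are folded in completion order rather than submission order, and on failure the sketch cancels the remaining tasks, which is a different contract from the partial-completion behavior of the `gather` form.

```python
import asyncio


async def fan_out_streaming(plan, fetch_async, fold, initial, *, max_concurrent):
    """Fold each result into an accumulator as soon as it arrives."""
    sem = asyncio.Semaphore(max_concurrent)

    async def track(index, item):
        async with sem:
            return index, await fetch_async(item)

    tasks = [asyncio.ensure_future(track(i, item)) for i, item in enumerate(plan)]
    acc = initial
    try:
        for next_done in asyncio.as_completed(tasks):
            index, result = await next_done
            acc = fold(acc, index, result)  # result is not retained afterwards
    finally:
        for task in tasks:
            task.cancel()  # no effect on tasks that already finished
    return acc
```

The `index` passed to `fold` lets a caller restore submission order if the combined output depends on it.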

🤖 Generated with Claude Code

thodson-usgs force-pushed the httpx-migration branch 18 times, most recently from c618ea8 to 06d43aa (May 24, 2026 14:34)
thodson-usgs force-pushed the httpx-migration branch 9 times, most recently from 57ff5bd to 6d1d5db (May 25, 2026 15:38)
thodson-usgs changed the title from "feat(waterdata): Migrate to httpx and add async parallel chunker" to "feat(waterdata): Add async parallel chunker over httpx.AsyncClient" (May 25, 2026)
thodson-usgs added the enhancement (New feature or request) label (May 25, 2026)
thodson-usgs force-pushed the httpx-migration branch 2 times, most recently from 57bf0ee to 2ec3f51 (May 26, 2026 20:51)

Adds a parallel fan-out path to `multi_value_chunked`. When
`API_USGS_CONCURRENT` resolves to >1 (default: 16), the decorator
runs the sub-requests of an over-budget plan concurrently under
one shared `httpx.AsyncClient`, instead of issuing them serially.
Falls back to the serial sync path (with a one-time UserWarning)
when no async fetch sibling is wired or when an asyncio event
loop is already running (Jupyter, IPython, async apps —
`asyncio.run` would otherwise raise).

Architecture (`dataretrieval/waterdata/chunking.py`):

* `_fan_out_async(plan, fetch_once, fetch_async, *, max_concurrent)`
  is the orchestrator: it dispatches every sub-request concurrently
  via `asyncio.gather(return_exceptions=True)`. Completed pairs
  survive a sibling's transient failure, so partial state stays
  recoverable through `ChunkedCall.resume()` on the sync path.
* Failure precedence in the gather:
    1. Cancellation/interrupt signals (CancelledError,
       KeyboardInterrupt, SystemExit) propagate unmodified — never
       wrapped as transients. Cancellation is asyncio's abort
       signal; rewriting it as ChunkInterrupted would silently
       consume the user's stop request.
    2. Recognized transients (RateLimited, ServiceUnavailable, bare
       httpx.HTTPError) wrap as ChunkInterrupted so the user gets
       a resumable handle even when a non-transient bug landed
       earlier in submission order.
    3. Otherwise raise the first failure in submission order,
       preserving its type.
* `_execute_in_parallel` owns the sync→async bridge: `asyncio.run`
  dispatch with the `fetch_async is None` and running-event-loop
  fallbacks (each a one-time UserWarning, then serial).
* `_publish_async_client` / `get_active_async_client` /
  `_chunked_async_client` ContextVar let async paginated-loop
  helpers (`_walk_pages_async`, `_paginate_async`) reuse one
  `AsyncClient` connection pool across every concurrent
  sub-request.

Wiring (`dataretrieval/waterdata/utils.py`):

* `_walk_pages_async`, `_paginate_async`, `_client_for_async`,
  `_fetch_once_async` — async siblings of the sync paginate path,
  sharing the same `parse_response` / `follow_up` callbacks and
  the `_ogc_parse_response` parser.
* The `@chunking.multi_value_chunked(fetch_async=_fetch_once_async)`
  decorator on `_fetch_once` wires the async sibling so the
  parallel path is available to every Water Data OGC getter.
* `ChunkedCall.record()` encapsulates the completion write so the
  serial loop and the parallel fan-out share it; `_chunks` is a
  sparse index map so a parallel partial-failure resumes correctly
  via the sync path.

Concurrency cap (`API_USGS_CONCURRENT`):

* Integer N >= 2: bounded fan-out, semaphore-gated at N. N = 1 forces
  the serial sync path.
* `unbounded`: no per-call cap (`Semaphore(sys.maxsize)`).
* Unset: default 16, the server-friendly sweet spot.

Retries (`API_USGS_RETRIES`, default 4; `0` disables): each
sub-request is retried on a transient failure with exponential
backoff + full jitter, so a large fan-out completes through the
AWS API Gateway's burst throttling and the occasional backend
straggler instead of aborting on the first 429/5xx/timeout.

* `RetryPolicy` — a frozen value object owning the timing decisions
  (`from_env`, `should_retry`, `backoff`). Full jitter
  (`random.uniform(0, ceiling)`) de-correlates the concurrent
  retries so they don't re-burst in lockstep. A server `Retry-After`
  overrides the computed backoff, unless it exceeds `retry_after_cap`
  (60s) — a multi-minute quota-window reset escalates to the
  resumable interruption instead of blocking the call inline.
* `_retryable` — chain-walking predicate, narrower than
  `_classify_chunk_error`: retries `RateLimited` / `ServiceUnavailable`
  / `httpx.TransportError` but NOT `httpx.InvalidURL`.
* `_retry_sync` / `_retry_async` drivers wrap the per-sub-request
  fetch at both seams (`ChunkedCall._issue`, `_fan_out_async.track`);
  the async retry runs inside the semaphore, so a backing-off chunk
  holds its slot and effective concurrency shrinks under throttling.
  On exhaustion the last exception re-raises into the existing
  `wrap_failure` path, so `.resume()` stays the escape hatch.
* `ProgressReporter.note_retry` surfaces the backoff on the status
  line ("retrying (attempt N, waiting Ns)"), cleared by the next page.

Test scaffolding: `tests/conftest.py` extends the `_serial_chunker`
autouse fixture to pin `API_USGS_CONCURRENT=1` and `API_USGS_RETRIES=0`
so the existing mocked suite stays on the deterministic serial path
with transients surfacing immediately; async and retry tests opt back
in by re-setting the env vars inside their body.

Tests: async-path coverage in `tests/waterdata_chunking_test.py`
(one-call-per-sub-request, mid-fan-out transient yields resumable
ChunkInterrupted, fallback-to-serial parametrized over
running-loop and missing-fetch_async, cancellation-wins-over-
transient-sibling regression), plus retry coverage (policy
math/jitter bounds, `_retryable` taxonomy, sync+async
transient-then-success, exhausted-still-resumable, long-`Retry-After`
escalation). `tests/waterdata_progress_test.py` adds progress
integration for `_fan_out_async` / `_paginate_async` and the
`note_retry` render/clear. `tests/waterdata_utils_test.py` adds a
`_walk_pages_async` initial-parse-error test.

Test suite: 435 passing, 2 skipped (mocked); ruff clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>