Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ raise *"Multiple suitable storage providers found"* for any HTTP(S) URL.
mtime read from `Content-Length` and `Last-Modified` response headers. Servers that do not
support `HEAD` requests are handled gracefully (size and mtime default to 0). No checksum
is available for generic URLs.
- Resumable downloads: interrupted transfers are continued from where they left off using
HTTP `Range` requests (`206 Partial Content`). Servers that do not support range requests
fall back to a full re-download. Partial files are preserved across retries for
connection/timeout errors, and discarded on checksum mismatches or other errors.

### Removed

Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ A Snakemake storage plugin for downloading files via HTTP with local caching, ch
- **Checksum verification**: Automatically verifies checksums (from Zenodo API, data.pypsa.org manifests, or GCS object metadata)
- **Rate limit handling**: Automatically respects Zenodo's rate limits using `X-RateLimit-*` headers with exponential backoff retry
- **Concurrent download control**: Limits simultaneous downloads to prevent overwhelming servers
- **Resumable downloads**: Interrupted transfers resume from where they left off using HTTP range requests
- **Progress bars**: Shows download progress with tqdm
- **Immutable URLs**: Returns mtime=0 for Zenodo and data.pypsa.org (persistent URLs); uses actual mtime for GCS and generic HTTP
- **Environment variable support**: Configure via environment variables for CI/CD workflows
Expand Down Expand Up @@ -154,6 +155,7 @@ The plugin automatically:
- Uses `X-RateLimit-Reset` to calculate wait time
- Retries failed requests with exponential backoff (up to 5 attempts)
- Handles transient errors: HTTP errors, timeouts, checksum mismatches, and network issues
- Resumes interrupted downloads using `Range` requests where supported by the server

## URL Handling

Expand Down
33 changes: 27 additions & 6 deletions src/snakemake_storage_plugin_cached_http/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,19 +264,23 @@ def _get_rate_limit_wait_time(self, headers: httpx.Headers) -> float | None:
return wait_seconds

@asynccontextmanager
async def httpr(self, method: str, url: str):
async def httpr(self, method: str, url: str, headers: dict | None = None):
"""
HTTP request wrapper with rate limiting and exception logging.

Args:
method: HTTP method (e.g., "get", "post")
url: URL to request
headers: Optional additional HTTP headers

Yields:
httpx.Response object
"""
try:
async with self.client() as client, client.stream(method, url) as response:
async with (
self.client() as client,
client.stream(method, url, headers=headers) as response,
):
wait_time = self._get_rate_limit_wait_time(response.headers)
if wait_time is not None:
logger.info(
Expand Down Expand Up @@ -728,20 +732,33 @@ async def managed_retrieve(self):
return

try:
# Check for existing partial file to resume
offset = local_path.stat().st_size if local_path.exists() else 0
headers = {"Range": f"bytes={offset}-"} if offset > 0 else None

# Download using a get request, rate limit errors are detected and raise
# WorkflowError to trigger a retry
async with self.provider.httpr("get", query) as response:
if response.status_code != 200:
async with self.provider.httpr("get", query, headers=headers) as response:
if response.status_code == 206:
# Server supports resume - append to existing partial file
mode = "ab"
logger.info(f"Resuming {filename} from byte {offset}")
elif response.status_code == 200:
# Server doesn't support Range - discard partial and restart
mode = "wb"
offset = 0
else:
raise WorkflowError(
f"Failed to download: HTTP {response.status_code} ({query})"
)

total_size = int(response.headers.get("content-length", 0))
total_size = int(response.headers.get("content-length", 0)) + offset

# Download to local path with progress bar
with local_path.open(mode="wb") as f:
with local_path.open(mode=mode) as f:
with tqdm(
total=total_size,
initial=offset,
unit="B",
unit_scale=True,
desc=filename,
Expand All @@ -758,7 +775,11 @@ async def managed_retrieve(self):
if self.provider.cache:
self.provider.cache.put(query, local_path)

except (TimeoutError, ConnectionError, httpx.TransportError):
# Mid-transfer interruption - keep partial file for resume on next retry
raise
except BaseException:
# Any other error (wrong checksum, HTTP error, unexpected) - delete and restart
if local_path.exists():
local_path.unlink()
raise
84 changes: 84 additions & 0 deletions tests/test_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@
import logging
import os
import time
from contextlib import asynccontextmanager
from unittest.mock import MagicMock
from urllib.parse import urlparse

import pytest

from snakemake_storage_plugin_cached_http import (
FileMetadata,
StorageObject,
StorageProvider,
StorageProviderSettings,
Expand Down Expand Up @@ -339,3 +342,84 @@ async def test_cache_staleness_for_mutable_sources(
downloaded_content = cached_path.read_bytes()
assert b'"stale": true' not in downloaded_content
assert downloaded_content == original_content


def make_mock_httpr(content: bytes, fail_at: int | None):
"""
Factory for a mock httpr context manager simulating a range-capable HTTP server.

Args:
content: The full file content to serve.
fail_at: If set, drop the connection after serving this many bytes on the
first (non-Range) request, simulating a mid-transfer interruption.
If None, serve the full content without interruption.
"""

@asynccontextmanager
async def mock_httpr(method, request_url, headers=None):
range_header = headers.get("Range") if headers else None
mock_httpr.received_range_headers.append(range_header)

response = MagicMock()
if range_header is None:
response.status_code = 200
chunk = content
drop_at = fail_at
else:
response.status_code = 206
offset = int(range_header.removeprefix("bytes=").removesuffix("-"))
chunk = content[offset:]
drop_at = None

async def aiter_bytes(chunk_size=8192):
if drop_at is None:
yield chunk
else:
yield chunk[:drop_at]
raise ConnectionError("peer closed connection")

response.aiter_bytes = aiter_bytes
response.headers = {"content-length": str(len(chunk))}

yield response

mock_httpr.received_range_headers = []

return mock_httpr


@pytest.mark.asyncio
async def test_resume_on_partial_file(storage_provider, tmp_path):
    """
    Verify that an interrupted download resumes via an HTTP Range request.

    The mocked server drops the connection after ``interrupt_at`` bytes on the
    first (plain) request; the retry must ask for ``bytes=<interrupt_at>-`` and
    the reassembled file must equal the full payload.
    """
    url = TEST_CONFIGS["zenodo"]["url"]
    payload = b'{"port": "test", "lat": 1.0, "lon": 2.0}'
    interrupt_at = 10

    mock_httpr = make_mock_httpr(payload, fail_at=interrupt_at)
    storage_provider.httpr = mock_httpr
    storage_provider.cache = None  # bypass the cache so the download path runs

    # Metadata with no checksum/size so verification does not interfere.
    async def fake_metadata(_url):
        return FileMetadata(checksum=None, size=0, mtime=0)

    target = tmp_path / "resume_test" / "file.json"
    target.parent.mkdir(parents=True, exist_ok=True)

    obj = StorageObject(
        query=url,
        keep_local=False,
        retrieve=True,
        provider=storage_provider,
    )
    obj.local_path = lambda: target
    obj.provider.get_metadata = fake_metadata

    await obj.managed_retrieve()

    # Exactly two requests: a plain GET, then a resume carrying a Range header.
    assert mock_httpr.received_range_headers == [None, f"bytes={interrupt_at}-"]
    assert target.read_bytes() == payload