diff --git a/CHANGELOG.md b/CHANGELOG.md index a42de91..a625012 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,10 @@ raise *"Multiple suitable storage providers found"* for any HTTP(S) URL. mtime read from `Content-Length` and `Last-Modified` response headers. Servers that do not support `HEAD` requests are handled gracefully (size and mtime default to 0). No checksum is available for generic URLs. +- Resumable downloads: interrupted transfers are continued from where they left off using + HTTP `Range` requests (`206 Partial Content`). Servers that do not support range requests + fall back to a full re-download. Partial files are preserved across retries for + connection/timeout errors, and discarded on checksum mismatches or other errors. ### Removed diff --git a/README.md b/README.md index d21cd3f..bf1f09a 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ A Snakemake storage plugin for downloading files via HTTP with local caching, ch - **Checksum verification**: Automatically verifies checksums (from Zenodo API, data.pypsa.org manifests, or GCS object metadata) - **Rate limit handling**: Automatically respects Zenodo's rate limits using `X-RateLimit-*` headers with exponential backoff retry - **Concurrent download control**: Limits simultaneous downloads to prevent overwhelming servers +- **Resumable downloads**: Interrupted transfers resume from where they left off using HTTP range requests - **Progress bars**: Shows download progress with tqdm - **Immutable URLs**: Returns mtime=0 for Zenodo and data.pypsa.org (persistent URLs); uses actual mtime for GCS and generic HTTP - **Environment variable support**: Configure via environment variables for CI/CD workflows @@ -154,6 +155,7 @@ The plugin automatically: - Uses `X-RateLimit-Reset` to calculate wait time - Retries failed requests with exponential backoff (up to 5 attempts) - Handles transient errors: HTTP errors, timeouts, checksum mismatches, and network issues +- Resumes interrupted 
downloads using `Range` requests where supported by the server ## URL Handling diff --git a/src/snakemake_storage_plugin_cached_http/__init__.py b/src/snakemake_storage_plugin_cached_http/__init__.py index c5dd22e..7e4c960 100644 --- a/src/snakemake_storage_plugin_cached_http/__init__.py +++ b/src/snakemake_storage_plugin_cached_http/__init__.py @@ -264,19 +264,23 @@ def _get_rate_limit_wait_time(self, headers: httpx.Headers) -> float | None: return wait_seconds @asynccontextmanager - async def httpr(self, method: str, url: str): + async def httpr(self, method: str, url: str, headers: dict | None = None): """ HTTP request wrapper with rate limiting and exception logging. Args: method: HTTP method (e.g., "get", "post") url: URL to request + headers: Optional additional HTTP headers Yields: httpx.Response object """ try: - async with self.client() as client, client.stream(method, url) as response: + async with ( + self.client() as client, + client.stream(method, url, headers=headers) as response, + ): wait_time = self._get_rate_limit_wait_time(response.headers) if wait_time is not None: logger.info( @@ -728,20 +732,33 @@ async def managed_retrieve(self): return try: + # Check for existing partial file to resume + offset = local_path.stat().st_size if local_path.exists() else 0 + headers = {"Range": f"bytes={offset}-"} if offset > 0 else None + # Download using a get request, rate limit errors are detected and raise # WorkflowError to trigger a retry - async with self.provider.httpr("get", query) as response: - if response.status_code != 200: + async with self.provider.httpr("get", query, headers=headers) as response: + if response.status_code == 206: + # Server supports resume - append to existing partial file + mode = "ab" + logger.info(f"Resuming {filename} from byte {offset}") + elif response.status_code == 200: + # Server doesn't support Range - discard partial and restart + mode = "wb" + offset = 0 + else: raise WorkflowError( f"Failed to download: HTTP 
{response.status_code} ({query})" ) - total_size = int(response.headers.get("content-length", 0)) + total_size = int(response.headers.get("content-length", 0)) + offset # Download to local path with progress bar - with local_path.open(mode="wb") as f: + with local_path.open(mode=mode) as f: with tqdm( total=total_size, + initial=offset, unit="B", unit_scale=True, desc=filename, @@ -758,7 +775,11 @@ async def managed_retrieve(self): if self.provider.cache: self.provider.cache.put(query, local_path) + except (TimeoutError, ConnectionError, httpx.TransportError): + # Mid-transfer interruption - keep partial file for resume on next retry + raise except: + # Any other error (wrong checksum, HTTP error, unexpected) - delete and restart if local_path.exists(): local_path.unlink() raise diff --git a/tests/test_download.py b/tests/test_download.py index be1478c..7f65d38 100644 --- a/tests/test_download.py +++ b/tests/test_download.py @@ -8,11 +8,14 @@ import logging import os import time +from contextlib import asynccontextmanager +from unittest.mock import MagicMock from urllib.parse import urlparse import pytest from snakemake_storage_plugin_cached_http import ( + FileMetadata, StorageObject, StorageProvider, StorageProviderSettings, @@ -339,3 +342,84 @@ async def test_cache_staleness_for_mutable_sources( downloaded_content = cached_path.read_bytes() assert b'"stale": true' not in downloaded_content assert downloaded_content == original_content + + +def make_mock_httpr(content: bytes, fail_at: int | None): + """ + Factory for a mock httpr context manager simulating a range-capable HTTP server. + + Args: + content: The full file content to serve. + fail_at: If set, drop the connection after serving this many bytes on the + first (non-Range) request, simulating a mid-transfer interruption. + If None, serve the full content without interruption. 
+ """ + + @asynccontextmanager + async def mock_httpr(method, request_url, headers=None): + range_header = headers.get("Range") if headers else None + mock_httpr.received_range_headers.append(range_header) + + response = MagicMock() + if range_header is None: + response.status_code = 200 + chunk = content + drop_at = fail_at + else: + response.status_code = 206 + offset = int(range_header.removeprefix("bytes=").removesuffix("-")) + chunk = content[offset:] + drop_at = None + + async def aiter_bytes(chunk_size=8192): + if drop_at is None: + yield chunk + else: + yield chunk[:drop_at] + raise ConnectionError("peer closed connection") + + response.aiter_bytes = aiter_bytes + response.headers = {"content-length": str(len(chunk))} + + yield response + + mock_httpr.received_range_headers = [] + + return mock_httpr + + +@pytest.mark.asyncio +async def test_resume_on_partial_file(storage_provider, tmp_path): + """Test that downloads resume from partial files using HTTP Range requests.""" + url = TEST_CONFIGS["zenodo"]["url"] + full_content = b'{"port": "test", "lat": 1.0, "lon": 2.0}' + + fail_at = 10 + mock_httpr = make_mock_httpr(full_content, fail_at=fail_at) + storage_provider.httpr = mock_httpr + storage_provider.cache = None + + obj = StorageObject( + query=url, + keep_local=False, + retrieve=True, + provider=storage_provider, + ) + + local_path = tmp_path / "resume_test" / "file.json" + local_path.parent.mkdir(parents=True, exist_ok=True) + obj.local_path = lambda: local_path + + async def mock_get_metadata(url): + return FileMetadata(checksum=None, size=0, mtime=0) + + obj.provider.get_metadata = mock_get_metadata + + await obj.managed_retrieve() + + assert len(mock_httpr.received_range_headers) == 2 + assert mock_httpr.received_range_headers[0] is None # first attempt: no Range header + assert mock_httpr.received_range_headers[1] == f"bytes={fail_at}-" + assert local_path.read_bytes() == full_content + +