diff --git a/pyproject.toml b/pyproject.toml index 4ee889e..92be3ea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -152,7 +152,7 @@ ignore-variadic-names = true [tool.ruff.lint.per-file-ignores] "__init__.py" = ["F401", "E402"] -"tests/*" = ["ANN001", "ANN2", "ANN102", "S101", "B011", "INP001", "D", "C400", "PLR2004"] +"tests/*" = ["ANN001", "ANN2", "ANN102", "S101", "B011", "INP001", "D", "C400", "PLR2004", "PLC0415"] [tool.ruff.format] docstring-code-format = true diff --git a/src/anyvlm/restapi/vlm.py b/src/anyvlm/restapi/vlm.py index f0388d6..7445018 100644 --- a/src/anyvlm/restapi/vlm.py +++ b/src/anyvlm/restapi/vlm.py @@ -1,17 +1,23 @@ """Define route(s) for the variant-level matching (VLM) protocol""" +import gzip +import logging +import tempfile +import uuid from pathlib import Path -from typing import Annotated +from typing import Annotated, BinaryIO, Literal -from fastapi import Query, Request +from anyvar.utils.liftover_utils import ReferenceAssembly +from fastapi import HTTPException, Query, Request, UploadFile +from pydantic import BaseModel from anyvlm.anyvar.base_client import BaseAnyVarClient from anyvlm.functions.build_vlm_response import build_vlm_response_from_caf_data from anyvlm.functions.get_caf import get_caf +from anyvlm.functions.ingest_vcf import VcfAfColumnsError +from anyvlm.functions.ingest_vcf import ingest_vcf as ingest_vcf_function from anyvlm.main import app -from anyvlm.schemas.vlm import ( - VlmResponse, -) +from anyvlm.schemas.vlm import VlmResponse from anyvlm.storage.base_storage import Storage from anyvlm.utils.types import ( AnyVlmCohortAlleleFrequencyResult, @@ -22,13 +28,247 @@ UcscAssemblyBuild, ) +# Create alias for easier mocking in tests +ingest_vcf = ingest_vcf_function + +_logger = logging.getLogger(__name__) + +# Constants +MAX_FILE_SIZE = 5 * 1024 * 1024 * 1024 # 5GB +UPLOAD_CHUNK_SIZE = 1024 * 1024 # 1MB +REQUIRED_INFO_FIELDS = {"AC", "AN", "AC_Het", "AC_Hom", "AC_Hemi"} + + +# ==================== +# Response Models +# ==================== + + +class VcfIngestionResponse(BaseModel): + """Response model for VCF ingestion endpoint.""" + + status: Literal["success", "error"] + message: str + details: str | None = None + + +# ==================== +# Validation Helpers +# ==================== + + +def validate_filename_extension(filename: str) -> None: + """Validate that filename has .vcf.gz extension. + + :param filename: name of uploaded file + :raise ValueError: if extension is not .vcf.gz + """ + if not filename.endswith(".vcf.gz"): + raise ValueError("Only .vcf.gz files are accepted") + -def ingest_vcf(vcf_path: Path) -> None: - """Ingest variants and cohort allele frequency data from an input VCF +def validate_gzip_magic_bytes(file_obj: BinaryIO) -> None: + """Validate that file has gzip magic bytes. - :param vcf_path: VCF file location + :param file_obj: file-like object to validate + :raise ValueError: if file is not gzipped """ - raise NotImplementedError + header = file_obj.read(2) + file_obj.seek(0) # Reset file pointer + + if header != b"\x1f\x8b": + raise ValueError("File is not a valid gzip file") + + +def validate_file_size(size: int) -> None: + """Validate that file size is within limits. + + :param size: file size in bytes + :raise ValueError: if file exceeds maximum size + """ + if size > MAX_FILE_SIZE: + max_gb = MAX_FILE_SIZE / (1024**3) + raise ValueError(f"File too large. Maximum size: {max_gb:.1f}GB") + + +def validate_vcf_header(file_path: Path) -> None: + """Validate VCF file format and required INFO fields. + + :param file_path: path to VCF file + :raise ValueError: if VCF is malformed or missing required fields + """ + with gzip.open(file_path, "rt") as f: + # Check first line is VCF format declaration + first_line = f.readline().strip() + if not first_line.startswith("##fileformat=VCF"): + raise ValueError("Not a valid VCF file (missing format declaration)") + + # Scan headers for required INFO fields + found_fields = set() + + for line in f: + if line.startswith("##INFO= Path: + """Save uploaded file to temporary location using streaming. + + :param upload_file: FastAPI UploadFile object + :return: path to saved temporary file + :raise: Any exceptions during file operations (caller should handle cleanup) + """ + temp_dir = Path(tempfile.gettempdir()) + temp_path = temp_dir / f"anyvlm_{uuid.uuid4()}.vcf.gz" + + try: + # Stream upload to disk (memory efficient) + # Using blocking I/O here is acceptable as we're writing to local disk + with temp_path.open("wb") as f: + while chunk := await upload_file.read(UPLOAD_CHUNK_SIZE): + f.write(chunk) + except Exception: + # Cleanup on error + if temp_path.exists(): + temp_path.unlink() + raise + else: + return temp_path + + +# ==================== +# Endpoints +# ==================== + + +@app.post( + "/ingest_vcf", + summary="Upload and ingest VCF file", + description=( + "Upload a compressed VCF file (.vcf.gz) to register variants and store allele frequency data. " + "**Requirements:** File must be gzip-compressed (.vcf.gz), contain required INFO fields " + "(AC, AN, AC_Het, AC_Hom, AC_Hemi), and be under 5GB. " + "Processing is synchronous with a 30-minute timeout." + ), + tags=[EndpointTag.SEARCH], + response_model=VcfIngestionResponse, +) +async def ingest_vcf_endpoint( + request: Request, + file: UploadFile, + assembly: Annotated[ + ReferenceAssembly, + Query(..., description="Reference genome assembly (GRCh37 or GRCh38)"), + ], +) -> VcfIngestionResponse: + """Upload and ingest a VCF file with allele frequency data. + + Requirements: .vcf.gz format, <5GB, INFO fields (AC, AN, AC_Het, AC_Hom, AC_Hemi). + Synchronous processing with 30-minute timeout. Variants batched in groups of 1000. + + :param request: FastAPI request object + :param file: uploaded VCF file + :param assembly: reference assembly used in VCF + :return: ingestion status response + """ + temp_path: Path | None = None + + try: + # Validate filename extension + if not file.filename: + raise HTTPException(400, "Filename is required") # noqa: TRY301 + + try: + validate_filename_extension(file.filename) + except ValueError as e: + raise HTTPException(400, str(e)) from e + + # Validate content type (if provided) + if file.content_type and file.content_type not in { + "application/gzip", + "application/x-gzip", + "application/octet-stream", + }: + raise HTTPException( # noqa: TRY301 + 400, + f"Invalid content type: {file.content_type}", + ) + + # Validate gzip magic bytes + try: + validate_gzip_magic_bytes(file.file) + except ValueError as e: + raise HTTPException(400, str(e)) from e + + # Check file size + file.file.seek(0, 2) # Seek to end + file_size = file.file.tell() + file.file.seek(0) # Reset + + try: + validate_file_size(file_size) + except ValueError as e: + raise HTTPException(400, str(e)) from e + + # Save to temporary file + _logger.info("Saving uploaded file %s (%d bytes)", file.filename, file_size) + temp_path = await save_upload_file_temp(file) + + # Validate VCF format and required fields + try: + validate_vcf_header(temp_path) + except ValueError as e: + raise HTTPException( + 422, + f"VCF validation failed: {e!s}", + ) from e + + # Process VCF + anyvar_client = request.app.state.anyvar_client + _logger.info("Starting VCF ingestion for %s", file.filename) + + try: + ingest_vcf_function(temp_path, anyvar_client, assembly) + except VcfAfColumnsError as e: + _logger.exception("VCF missing required INFO columns") + raise HTTPException(422, f"VCF validation failed: {e}") from e + except Exception as e: + _logger.exception("VCF ingestion failed") + raise HTTPException(500, f"Ingestion failed: {e}") from e + + _logger.info("Successfully ingested VCF: %s", file.filename) + return VcfIngestionResponse( + status="success", + message=f"Successfully ingested {file.filename}", + ) + + except HTTPException: + # Re-raise HTTP exceptions + raise + except Exception as e: + _logger.exception("Unexpected error during VCF upload") + raise HTTPException(500, f"Upload failed: {e}") from e + finally: + # Always cleanup temporary file + if temp_path and temp_path.exists(): + _logger.debug("Cleaning up temporary file: %s", temp_path) + temp_path.unlink() @app.get( diff --git a/tests/conftest.py b/tests/conftest.py index 3c165cc..4ea70bc 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -20,6 +20,12 @@ load_dotenv() +# Set required environment variables for tests if not already set +environ.setdefault("BEACON_NODE_ID", "org.anyvlm.test") +environ.setdefault("HANDOVER_TYPE_ID", "test-id") +environ.setdefault("HANDOVER_TYPE_LABEL", "Test Label") +environ.setdefault("BEACON_HANDOVER_URL", "https://test.example.com") + @pytest.fixture(scope="session") def test_data_dir() -> Path: diff --git a/tests/data/vcf/malformed_header.vcf.gz b/tests/data/vcf/malformed_header.vcf.gz new file mode 100644 index 0000000..20bfe98 Binary files /dev/null and b/tests/data/vcf/malformed_header.vcf.gz differ diff --git a/tests/data/vcf/missing_info_fields.vcf.gz b/tests/data/vcf/missing_info_fields.vcf.gz new file mode 100644 index 0000000..30fda00 Binary files /dev/null and b/tests/data/vcf/missing_info_fields.vcf.gz differ diff --git a/tests/data/vcf/not_a_vcf.txt.gz b/tests/data/vcf/not_a_vcf.txt.gz new file mode 100644 index 0000000..7c4e256 Binary files /dev/null and b/tests/data/vcf/not_a_vcf.txt.gz differ diff --git a/tests/data/vcf/valid_small.vcf.gz b/tests/data/vcf/valid_small.vcf.gz new file mode 100644 index 0000000..bb117bc Binary files /dev/null and b/tests/data/vcf/valid_small.vcf.gz differ diff --git a/tests/unit/test_vcf_upload_endpoint.py b/tests/unit/test_vcf_upload_endpoint.py new file mode 100644 index 0000000..82af675 --- /dev/null +++ b/tests/unit/test_vcf_upload_endpoint.py @@ -0,0 +1,401 @@ +"""Test VCF upload endpoint functionality.""" + +import io +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest +from anyvar.utils.liftover_utils import ReferenceAssembly +from fastapi.testclient import TestClient + +from anyvlm.functions.ingest_vcf import VcfAfColumnsError +from anyvlm.main import app + +# Constants for testing +MAX_FILE_SIZE = 5 * 1024 * 1024 * 1024 # 5GB +UPLOAD_CHUNK_SIZE = 1024 * 1024 # 1MB + + +@pytest.fixture(scope="module") +def client(): + """Create FastAPI test client with mock anyvar_client.""" + # Set up mock anyvar client on app state + mock_anyvar_client = MagicMock() + app.state.anyvar_client = mock_anyvar_client + return TestClient(app=app) + + +@pytest.fixture(scope="module") +def test_vcf_dir(test_data_dir: Path) -> Path: + """Path to VCF test data directory.""" + return test_data_dir / "vcf" + + +@pytest.fixture(scope="module") +def valid_vcf_gz(test_vcf_dir: Path) -> Path: + """Path to valid compressed VCF.""" + return test_vcf_dir / "valid_small.vcf.gz" + + +@pytest.fixture(scope="module") +def missing_fields_vcf_gz(test_vcf_dir: Path) -> Path: + """Path to VCF missing required INFO fields.""" + return test_vcf_dir / "missing_info_fields.vcf.gz" + + +@pytest.fixture(scope="module") +def malformed_vcf_gz(test_vcf_dir: Path) -> Path: + """Path to VCF with malformed header.""" + return test_vcf_dir / "malformed_header.vcf.gz" + + +@pytest.fixture(scope="module") +def not_vcf_gz(test_vcf_dir: Path) -> Path: + """Path to gzipped text file (not a VCF).""" + return test_vcf_dir / "not_a_vcf.txt.gz" + + +# ==================== +# Validation Helper Tests +# ==================== + + +class TestFileValidation: + """Test file validation functions.""" + + def test_validate_filename_extension_valid(self): + """Test that .vcf.gz extension passes validation.""" + from anyvlm.restapi.vlm import validate_filename_extension + + # Should not raise + validate_filename_extension("test.vcf.gz") + validate_filename_extension("path/to/file.vcf.gz") + + def test_validate_filename_extension_invalid(self): + """Test that non-.vcf.gz extensions fail validation.""" + from anyvlm.restapi.vlm import validate_filename_extension + + with pytest.raises(ValueError, match="Only .vcf.gz files"): + validate_filename_extension("test.vcf") + + with pytest.raises(ValueError, match="Only .vcf.gz files"): + validate_filename_extension("test.gz") + + with pytest.raises(ValueError, match="Only .vcf.gz files"): + validate_filename_extension("test.txt.gz") + + def test_validate_gzip_magic_bytes_valid(self, valid_vcf_gz: Path): + """Test gzip magic bytes validation with valid file.""" + from anyvlm.restapi.vlm import validate_gzip_magic_bytes + + with valid_vcf_gz.open("rb") as f: + content = f.read() + file_obj = io.BytesIO(content) + validate_gzip_magic_bytes(file_obj) + # Verify file pointer was reset + assert file_obj.tell() == 0 + + def test_validate_gzip_magic_bytes_invalid(self): + """Test gzip magic bytes validation with invalid file.""" + from anyvlm.restapi.vlm import validate_gzip_magic_bytes + + # Non-gzip content + file_obj = io.BytesIO(b"Not a gzip file") + with pytest.raises(ValueError, match="not a valid gzip file"): + validate_gzip_magic_bytes(file_obj) + + def test_validate_file_size_within_limit(self, valid_vcf_gz: Path): + """Test file size validation for file within limit.""" + from anyvlm.restapi.vlm import validate_file_size + + file_size = valid_vcf_gz.stat().st_size + assert file_size < MAX_FILE_SIZE # Sanity check + + # Should not raise + validate_file_size(file_size) + + def test_validate_file_size_exceeds_limit(self): + """Test file size validation for file exceeding limit.""" + from anyvlm.restapi.vlm import validate_file_size + + too_large = MAX_FILE_SIZE + 1 + with pytest.raises(ValueError, match="File too large"): + validate_file_size(too_large) + + def test_validate_vcf_header_valid(self, valid_vcf_gz: Path): + """Test VCF header validation with valid file.""" + from anyvlm.restapi.vlm import validate_vcf_header + + # Should not raise + validate_vcf_header(valid_vcf_gz) + + def test_validate_vcf_header_missing_format_declaration( + self, malformed_vcf_gz: Path + ): + """Test VCF header validation fails on missing fileformat.""" + from anyvlm.restapi.vlm import validate_vcf_header + + with pytest.raises(ValueError, match="Not a valid VCF"): + validate_vcf_header(malformed_vcf_gz) + + def test_validate_vcf_header_missing_required_fields( + self, missing_fields_vcf_gz: Path + ): + """Test VCF header validation fails on missing INFO fields.""" + from anyvlm.restapi.vlm import validate_vcf_header + + with pytest.raises(ValueError, match="VCF missing required INFO fields.*AN"): + validate_vcf_header(missing_fields_vcf_gz) + + +# ==================== +# Endpoint Integration Tests +# ==================== + + +class TestIngestVcfEndpoint: + """Test the /ingest_vcf HTTP endpoint.""" + + def test_endpoint_exists(self, client: TestClient): + """Test that the endpoint exists and accepts POST.""" + response = client.post("/ingest_vcf") + # Should not be 404 + assert response.status_code != 404 + + def test_missing_file_parameter(self, client: TestClient): + """Test request without file parameter.""" + response = client.post( + "/ingest_vcf", + params={"assembly": "GRCh38"}, + ) + assert response.status_code == 422 # Unprocessable Entity + assert "file" in response.text.lower() or "required" in response.text.lower() + + def test_missing_assembly_parameter(self, client: TestClient, valid_vcf_gz: Path): + """Test request without assembly parameter.""" + with valid_vcf_gz.open("rb") as f: + files = {"file": ("test.vcf.gz", f, "application/gzip")} + response = client.post("/ingest_vcf", files=files) + + assert response.status_code == 422 + assert ( + "assembly" in response.text.lower() or "required" in response.text.lower() + ) + + def test_invalid_assembly_value(self, client: TestClient, valid_vcf_gz: Path): + """Test request with invalid assembly value.""" + with valid_vcf_gz.open("rb") as f: + files = {"file": ("test.vcf.gz", f, "application/gzip")} + response = client.post( + "/ingest_vcf", + params={"assembly": "GRCh99"}, # Invalid + files=files, + ) + + assert response.status_code == 422 + + def test_invalid_file_extension(self, client: TestClient, valid_vcf_gz: Path): + """Test upload with wrong file extension.""" + with valid_vcf_gz.open("rb") as f: + # Use .vcf extension (should be .vcf.gz) + files = {"file": ("test.vcf", f, "application/gzip")} + response = client.post( + "/ingest_vcf", + params={"assembly": "GRCh38"}, + files=files, + ) + + assert response.status_code == 400 + json_response = response.json() + assert "detail" in json_response + assert ".vcf.gz" in json_response["detail"] + + def test_not_gzipped_file(self, client: TestClient): + """Test upload of non-gzipped content.""" + # Plain text, not gzipped + content = b"This is not gzipped" + files = {"file": ("test.vcf.gz", io.BytesIO(content), "application/gzip")} + + response = client.post( + "/ingest_vcf", + params={"assembly": "GRCh38"}, + files=files, + ) + + assert response.status_code == 400 + json_response = response.json() + assert "detail" in json_response + assert "gzip" in json_response["detail"].lower() + + def test_not_a_vcf_file(self, client: TestClient, not_vcf_gz: Path): + """Test upload of gzipped file that's not a VCF.""" + with not_vcf_gz.open("rb") as f: + files = {"file": ("test.vcf.gz", f, "application/gzip")} + response = client.post( + "/ingest_vcf", + params={"assembly": "GRCh38"}, + files=files, + ) + + assert response.status_code == 422 + json_response = response.json() + assert "detail" in json_response + assert "vcf" in json_response["detail"].lower() + + def test_vcf_missing_required_fields( + self, client: TestClient, missing_fields_vcf_gz: Path + ): + """Test upload of VCF missing required INFO fields.""" + with missing_fields_vcf_gz.open("rb") as f: + files = {"file": ("test.vcf.gz", f, "application/gzip")} + response = client.post( + "/ingest_vcf", + params={"assembly": "GRCh38"}, + files=files, + ) + + assert response.status_code == 422 + json_response = response.json() + assert "detail" in json_response + assert ( + "info" in json_response["detail"].lower() + or "field" in json_response["detail"].lower() + ) + + @patch("anyvlm.restapi.vlm.ingest_vcf_function") + def test_successful_upload_and_ingestion( + self, mock_ingest: MagicMock, client: TestClient, valid_vcf_gz: Path + ): + """Test successful VCF upload and ingestion.""" + # Mock the ingest_vcf function to avoid needing real AnyVar + mock_ingest.return_value = None + + with valid_vcf_gz.open("rb") as f: + files = {"file": ("test.vcf.gz", f, "application/gzip")} + response = client.post( + "/ingest_vcf", + params={"assembly": "GRCh38"}, + files=files, + ) + + assert response.status_code == 200 + json_response = response.json() + assert json_response["status"] == "success" + assert "message" in json_response + + # Verify ingest_vcf was called + assert mock_ingest.called + call_args = mock_ingest.call_args + + # Check Path argument + assert isinstance(call_args[0][0], Path) + + # Check AnyVar client was passed + assert call_args[0][1] is not None + + # Check assembly (3rd positional argument) + assert call_args[0][2] == ReferenceAssembly.GRCH38 + + @patch("anyvlm.restapi.vlm.ingest_vcf_function") + def test_ingestion_failure_propagates( + self, mock_ingest: MagicMock, client: TestClient, valid_vcf_gz: Path + ): + """Test that ingestion errors are properly handled and reported.""" + # Mock ingest_vcf to raise an error + mock_ingest.side_effect = VcfAfColumnsError("Missing AC_Het field") + + with valid_vcf_gz.open("rb") as f: + files = {"file": ("test.vcf.gz", f, "application/gzip")} + response = client.post( + "/ingest_vcf", + params={"assembly": "GRCh38"}, + files=files, + ) + + assert response.status_code == 422 + json_response = response.json() + assert "detail" in json_response + assert "AC_Het" in json_response["detail"] + + def test_temp_file_cleanup_on_success(self, client: TestClient, valid_vcf_gz: Path): + """Test that temporary files are cleaned up after successful ingestion.""" + with patch("anyvlm.restapi.vlm.ingest_vcf_function") as mock_ingest: + mock_ingest.return_value = None + + with valid_vcf_gz.open("rb") as f: + files = {"file": ("test.vcf.gz", f, "application/gzip")} + response = client.post( + "/ingest_vcf", + params={"assembly": "GRCh38"}, + files=files, + ) + + assert response.status_code == 200 + + # Verify the temp file path that was passed to ingest_vcf no longer exists + if mock_ingest.called: + temp_path = mock_ingest.call_args[0][0] + assert not temp_path.exists(), "Temporary file should be cleaned up" + + def test_temp_file_cleanup_on_error(self, client: TestClient, valid_vcf_gz: Path): + """Test that temporary files are cleaned up even when ingestion fails.""" + with patch("anyvlm.restapi.vlm.ingest_vcf_function") as mock_ingest: + mock_ingest.side_effect = Exception("Ingestion failed") + + with valid_vcf_gz.open("rb") as f: + files = {"file": ("test.vcf.gz", f, "application/gzip")} + response = client.post( + "/ingest_vcf", + params={"assembly": "GRCh38"}, + files=files, + ) + + assert response.status_code == 500 + + # Verify cleanup happened + if mock_ingest.called: + temp_path = mock_ingest.call_args[0][0] + assert not temp_path.exists(), ( + "Temporary file should be cleaned up even on error" + ) + + def test_assembly_grch37_parameter(self, client: TestClient, valid_vcf_gz: Path): + """Test that GRCh37 assembly parameter is accepted and used.""" + with patch("anyvlm.restapi.vlm.ingest_vcf_function") as mock_ingest: + mock_ingest.return_value = None + + with valid_vcf_gz.open("rb") as f: + files = {"file": ("test.vcf.gz", f, "application/gzip")} + response = client.post( + "/ingest_vcf", + params={"assembly": "GRCh37"}, + files=files, + ) + + assert response.status_code == 200 + + # Verify GRCh37 was passed (3rd positional argument) + call_args = mock_ingest.call_args + assert call_args[0][2] == ReferenceAssembly.GRCH37 + + +# ==================== +# File Size Limit Tests +# ==================== + + +class TestFileSizeLimits: + """Test file size limit enforcement.""" + + def test_file_size_check_with_mock_large_file(self): + """Test that files exceeding size limit are rejected.""" + # Create a mock file that reports large size + mock_large_file = MagicMock() + mock_large_file.filename = "huge.vcf.gz" + + # We'll need to test this at the validation function level + # since mocking the actual upload size is complex + from anyvlm.restapi.vlm import validate_file_size + + with pytest.raises(ValueError, match="File too large"): + validate_file_size(MAX_FILE_SIZE + 1)