Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 251 additions & 9 deletions src/anyvlm/restapi/vlm.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
"""Define route(s) for the variant-level matching (VLM) protocol"""

import gzip
import logging
import tempfile
import uuid
from pathlib import Path
from typing import Annotated
from typing import Annotated, BinaryIO, Literal

from fastapi import Query, Request
from anyvar.utils.liftover_utils import ReferenceAssembly
from fastapi import HTTPException, Query, Request, UploadFile
from ga4gh.va_spec.base.core import CohortAlleleFrequencyStudyResult
from pydantic import BaseModel

from anyvlm.anyvar.base_client import BaseAnyVarClient
from anyvlm.functions.build_vlm_response import build_vlm_response_from_caf_data
from anyvlm.functions.get_caf import get_caf
from anyvlm.functions.ingest_vcf import VcfAfColumnsError
from anyvlm.functions.ingest_vcf import ingest_vcf as ingest_vcf_function
from anyvlm.main import app
from anyvlm.schemas.vlm import (
VlmResponse,
)
from anyvlm.schemas.vlm import VlmResponse
from anyvlm.utils.types import (
ChromosomeName,
EndpointTag,
Expand All @@ -21,13 +27,249 @@
UcscAssemblyBuild,
)

# Create alias for easier mocking in tests
# NOTE(review): the upload endpoint in this module calls `ingest_vcf_function`
# directly rather than this alias -- confirm which name tests are expected to patch.
ingest_vcf = ingest_vcf_function

# Module-level logger, named after this module's import path
_logger = logging.getLogger(__name__)

# Constants
MAX_FILE_SIZE = 5 * 1024 * 1024 * 1024 # 5 GiB; upper bound checked by validate_file_size
UPLOAD_CHUNK_SIZE = 1024 * 1024 # 1 MiB; read size used when streaming an upload to disk
# INFO field IDs that validate_vcf_header requires to be declared in the VCF header
REQUIRED_INFO_FIELDS = {"AC", "AN", "AC_Het", "AC_Hom", "AC_Hemi"}


# ====================
# Response Models
# ====================


class VcfIngestionResponse(BaseModel):
    """Response model for VCF ingestion endpoint.

    Declared as the ``response_model`` of the ``/ingest_vcf`` route.
    """

    # Outcome of the request; only the literal strings "success" or "error" validate
    status: Literal["success", "error"]
    # Human-readable summary of the outcome
    message: str
    # Optional additional information; None when nothing further is reported
    details: str | None = None


# ====================
# Validation Helpers
# ====================


def validate_filename_extension(filename: str) -> None:
    """Validate that filename has a .vcf.gz extension.

    The comparison ignores letter case, so ``SAMPLE.VCF.GZ`` is accepted in the
    same way as ``sample.vcf.gz``.

    :param filename: name of uploaded file
    :raise ValueError: if extension is not .vcf.gz
    """
    # casefold() so that an upper- or mixed-case extension is not rejected
    if not filename.casefold().endswith(".vcf.gz"):
        raise ValueError("Only .vcf.gz files are accepted")


def validate_gzip_magic_bytes(file_obj: BinaryIO) -> None:
    """Validate that file has gzip magic bytes.

    The first two bytes of the stream are compared with the gzip signature
    ``1f 8b``. The stream is rewound to offset 0 both before the read (so the
    check does not depend on where a previous reader left the pointer) and
    after it (so later consumers see the complete content).

    :param file_obj: seekable binary file-like object to validate
    :raise ValueError: if file is not gzipped
    """
    file_obj.seek(0)  # Always inspect the real start of the stream
    header = file_obj.read(2)
    file_obj.seek(0)  # Reset file pointer

    if header != b"\x1f\x8b":
        raise ValueError("File is not a valid gzip file")


def validate_file_size(size: int) -> None:
"""Validate that file size is within limits.

:param size: file size in bytes
:raise ValueError: if file exceeds maximum size
"""
if size > MAX_FILE_SIZE:
max_gb = MAX_FILE_SIZE / (1024**3)
raise ValueError(f"File too large. Maximum size: {max_gb:.1f}GB")


def validate_vcf_header(file_path: Path) -> None:
"""Validate VCF file format and required INFO fields.

:param file_path: path to VCF file
:raise ValueError: if VCF is malformed or missing required fields
"""
with gzip.open(file_path, "rt") as f:
# Check first line is VCF format declaration
first_line = f.readline().strip()
if not first_line.startswith("##fileformat=VCF"):
raise ValueError("Not a valid VCF file (missing format declaration)")

# Scan headers for required INFO fields
found_fields = set()

for line in f:
if line.startswith("##INFO=<ID="):
# Extract field ID
field_id = line.split("ID=")[1].split(",")[0]
found_fields.add(field_id)
elif line.startswith("#CHROM"):
# End of headers
break

missing = REQUIRED_INFO_FIELDS - found_fields
if missing:
raise ValueError(
f"VCF missing required INFO fields: {', '.join(sorted(missing))}"
)


# ====================
# File Handling
# ====================


async def save_upload_file_temp(upload_file: UploadFile) -> Path:
    """Save uploaded file to temporary location using streaming.

    The upload is read in ``UPLOAD_CHUNK_SIZE`` pieces and written to a new
    ``anyvlm_<uuid4>.vcf.gz`` file in the system temporary directory. If
    anything interrupts the copy, the partially written file is deleted before
    the error is re-raised. After a successful return the caller owns the file
    and is responsible for deleting it.

    :param upload_file: FastAPI UploadFile object
    :return: path to saved temporary file
    :raise: Any exceptions during file operations (re-raised after cleanup)
    """
    temp_path = Path(tempfile.gettempdir()) / f"anyvlm_{uuid.uuid4()}.vcf.gz"

    try:
        # Stream upload to disk (memory efficient)
        with open(temp_path, "wb") as f:
            while chunk := await upload_file.read(UPLOAD_CHUNK_SIZE):
                f.write(chunk)
    except BaseException:
        # BaseException rather than Exception: asyncio.CancelledError derives
        # from BaseException, so a cancelled task would otherwise leave the
        # partial file behind (the caller never receives the path to remove it).
        temp_path.unlink(missing_ok=True)
        raise
    return temp_path


# ====================
# Endpoints
# ====================


@app.post(
    "/ingest_vcf",
    summary="Upload and ingest VCF file",
    description=(
        "Upload a compressed VCF file (.vcf.gz) to register variants and store allele frequency data. "
        "**Requirements:** File must be gzip-compressed (.vcf.gz), contain required INFO fields "
        "(AC, AN, AC_Het, AC_Hom, AC_Hemi), and be under 5GB. "
        "Processing is synchronous with a 30-minute timeout."
    ),
    tags=[EndpointTag.SEARCH],
    response_model=VcfIngestionResponse,
)
async def ingest_vcf_endpoint(
    request: Request,
    file: UploadFile,
    assembly: Annotated[
        ReferenceAssembly,
        Query(..., description="Reference genome assembly (GRCh37 or GRCh38)"),
    ],
) -> VcfIngestionResponse:
    """Accept a VCF upload, validate it, and hand it to the ingestion function.

    Checks run in this order, each failing fast: filename present and ending in
    ``.vcf.gz`` (400), declared content type (400), gzip magic bytes (400),
    size limit (400), then -- after the upload is written to a temporary file --
    the VCF header (422). Ingestion errors map to 422 for missing allele
    frequency columns and 500 for anything else. The temporary file is removed
    on every exit path.

    NOTE(review): the route description advertises a 30-minute timeout; no
    timeout is applied in this function -- confirm it is enforced elsewhere.

    :param request: FastAPI request object; the AnyVar client is read from
        ``request.app.state``
    :param file: uploaded VCF file
    :param assembly: reference assembly used in VCF
    :return: ingestion status response
    :raise HTTPException: with status 400, 422 or 500 as described above
    """

    def _bad_request(err: ValueError) -> HTTPException:
        # Validation helpers signal failure with ValueError; surface it as 400
        return HTTPException(status_code=400, detail=str(err))

    accepted_content_types = (
        "application/gzip",
        "application/x-gzip",
        "application/octet-stream",
    )
    temp_path: Path | None = None

    try:
        # --- filename -----------------------------------------------------
        if not file.filename:
            raise HTTPException(status_code=400, detail="Filename is required")

        try:
            validate_filename_extension(file.filename)
        except ValueError as err:
            raise _bad_request(err) from err

        # --- content type (only checked when the client declared one) ------
        declared_type = file.content_type
        if declared_type and declared_type not in accepted_content_types:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid content type: {declared_type}",
            )

        # --- gzip signature -----------------------------------------------
        stream = file.file
        try:
            validate_gzip_magic_bytes(stream)
        except ValueError as err:
            raise _bad_request(err) from err

        # --- size: jump to the end, note the offset, rewind ----------------
        stream.seek(0, 2)
        upload_size = stream.tell()
        stream.seek(0)

        try:
            validate_file_size(upload_size)
        except ValueError as err:
            raise _bad_request(err) from err

        # --- persist to disk ------------------------------------------------
        _logger.info("Saving uploaded file %s (%d bytes)", file.filename, upload_size)
        temp_path = await save_upload_file_temp(file)

        # --- VCF header -----------------------------------------------------
        try:
            validate_vcf_header(temp_path)
        except ValueError as err:
            raise HTTPException(
                status_code=422,
                detail=f"VCF validation failed: {err}",
            ) from err

        # --- ingestion ------------------------------------------------------
        client = request.app.state.anyvar_client
        _logger.info("Starting VCF ingestion for %s", file.filename)

        try:
            ingest_vcf_function(temp_path, client, assembly)
        except VcfAfColumnsError as err:
            _logger.exception("VCF missing required INFO columns")
            raise HTTPException(
                status_code=422, detail=f"VCF validation failed: {err}"
            ) from err
        except Exception as err:
            _logger.exception("VCF ingestion failed")
            raise HTTPException(
                status_code=500, detail=f"Ingestion failed: {err}"
            ) from err
        else:
            _logger.info("Successfully ingested VCF: %s", file.filename)
            return VcfIngestionResponse(
                status="success",
                message=f"Successfully ingested {file.filename}",
            )

    except HTTPException:
        # Already carries the intended status code; pass it through untouched
        raise
    except Exception as err:
        _logger.exception("Unexpected error during VCF upload")
        raise HTTPException(status_code=500, detail=f"Upload failed: {err}") from err
    finally:
        # Remove the temporary copy regardless of how the request ended
        if temp_path is not None and temp_path.exists():
            _logger.debug("Cleaning up temporary file: %s", temp_path)
            temp_path.unlink()


@app.get(
Expand Down
Binary file added tests/data/vcf/malformed_header.vcf.gz
Binary file not shown.
Binary file added tests/data/vcf/missing_info_fields.vcf.gz
Binary file not shown.
Binary file added tests/data/vcf/not_a_vcf.txt.gz
Binary file not shown.
Binary file added tests/data/vcf/valid_small.vcf.gz
Binary file not shown.
Loading
Loading