From abbc8468ec5dfb22dbda185283eeb53ba8a37229 Mon Sep 17 00:00:00 2001 From: Benjamin Capodanno Date: Thu, 18 Dec 2025 14:06:38 -0800 Subject: [PATCH] feat: add ClinGen namespace to CSV export functionality Additionally, adds a few tests for existing namespaces to increase coverage of namespaced CSV export. --- src/mavedb/lib/score_sets.py | 17 +++++-- src/mavedb/routers/score_sets.py | 6 +-- tests/routers/test_score_set.py | 78 ++++++++++++++++++++++++++++++++ 3 files changed, 95 insertions(+), 6 deletions(-) diff --git a/src/mavedb/lib/score_sets.py b/src/mavedb/lib/score_sets.py index 190d7b42..0e01c9cd 100644 --- a/src/mavedb/lib/score_sets.py +++ b/src/mavedb/lib/score_sets.py @@ -502,7 +502,7 @@ def find_publish_or_private_superseded_score_set_tail( def get_score_set_variants_as_csv( db: Session, score_set: ScoreSet, - namespaces: List[Literal["scores", "counts", "vep", "gnomad"]], + namespaces: List[Literal["scores", "counts", "vep", "gnomad", "clingen"]], namespaced: Optional[bool] = None, start: Optional[int] = None, limit: Optional[int] = None, @@ -519,8 +519,8 @@ def get_score_set_variants_as_csv( The database session to use. score_set : ScoreSet The score set to get the variants from. - namespaces : List[Literal["scores", "counts", "vep", "gnomad"]] - The namespaces for data. Now there are only scores, counts, VEP, and gnomAD. ClinVar will be added in the future. + namespaces : List[Literal["scores", "counts", "vep", "gnomad", "clingen"]] + The namespaces for data. Now there are only scores, counts, VEP, gnomAD, and ClinGen. ClinVar will be added in the future. namespaced: Optional[bool] = None Whether namespace the columns or not. start : int, optional @@ -569,6 +569,8 @@ def get_score_set_variants_as_csv( namespaced_score_set_columns["vep"].append("vep_functional_consequence") if "gnomad" in namespaced_score_set_columns: namespaced_score_set_columns["gnomad"].append("gnomad_af") + if "clingen" in namespaced_score_set_columns: + namespaced_score_set_columns["clingen"].append("clingen_allele_id") variants: Sequence[Variant] = [] mappings: Optional[list[Optional[MappedVariant]]] = None gnomad_data: Optional[list[Optional[GnomADVariant]]] = None @@ -841,6 +843,15 @@ def variant_to_csv_row( value = na_rep key = f"gnomad.{column_key}" if namespaced else column_key row[key] = value + for column_key in columns.get("clingen", []): + if column_key == "clingen_allele_id": + clingen_allele_id = mapping.clingen_allele_id if mapping else None + if clingen_allele_id is not None: + value = str(clingen_allele_id) + else: + value = na_rep + key = f"clingen.{column_key}" if namespaced else column_key + row[key] = value return row diff --git a/src/mavedb/routers/score_sets.py b/src/mavedb/routers/score_sets.py index 959f9133..0b64502c 100644 --- a/src/mavedb/routers/score_sets.py +++ b/src/mavedb/routers/score_sets.py @@ -706,8 +706,8 @@ def get_score_set_variants_csv( urn: str, start: int = Query(default=None, description="Start index for pagination"), limit: int = Query(default=None, description="Maximum number of variants to return"), - namespaces: List[Literal["scores", "counts", "vep", "gnomad"]] = Query( - default=["scores"], description="One or more data types to include: scores, counts, clinVar, gnomAD, VEP" + namespaces: List[Literal["scores", "counts", "vep", "gnomad", "clingen"]] = Query( + default=["scores"], description="One or more data types to include: scores, counts, ClinGen, gnomAD, VEP" ), drop_na_columns: Optional[bool] = None, include_custom_columns: Optional[bool] = None, @@ -732,7 +732,7 @@ def get_score_set_variants_csv( The index to start from. If None, starts from the beginning. limit : Optional[int] The maximum number of variants to return. If None, returns all variants. - namespaces: List[Literal["scores", "counts", "vep", "gnomad"]] + namespaces: List[Literal["scores", "counts", "vep", "gnomad", "clingen"]] The namespaces of all columns except for accession, hgvs_nt, hgvs_pro, and hgvs_splice. We may add ClinVar in the future. drop_na_columns : bool, optional diff --git a/tests/routers/test_score_set.py b/tests/routers/test_score_set.py index 09a2c25b..4d4c2e94 100644 --- a/tests/routers/test_score_set.py +++ b/tests/routers/test_score_set.py @@ -53,6 +53,7 @@ TEST_SAVED_GENERIC_CLINICAL_CONTROL, TEST_SAVED_GNOMAD_VARIANT, TEST_USER, + VALID_CLINGEN_CA_ID, ) from tests.helpers.dependency_overrider import DependencyOverrider from tests.helpers.util.common import ( @@ -2853,6 +2854,83 @@ def test_download_scores_counts_and_post_mapped_variants_file( ) +# Additional namespace export tests: VEP, ClinGen, gnomAD +def test_download_vep_file_in_variant_data_path(session, data_provider, client, setup_router_db, data_files): + experiment = create_experiment(client) + score_set = create_seq_score_set(client, experiment["urn"]) + score_set = mock_worker_variant_insertion( + client, session, data_provider, score_set, data_files / "scores.csv", data_files / "counts.csv" + ) + # Create mapped variants with VEP consequence populated + create_mapped_variants_for_score_set(session, score_set["urn"], TEST_MAPPED_VARIANT_WITH_HGVS_G_EXPRESSION) + + with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: + published_score_set = publish_score_set(client, score_set["urn"]) + worker_queue.assert_called_once() + + response = client.get( + f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=vep&include_post_mapped_hgvs=true&drop_na_columns=true" + ) + assert response.status_code == 200 + reader = csv.DictReader(StringIO(response.text)) + assert "vep.vep_functional_consequence" in reader.fieldnames + # At least one row should contain the test consequence value + rows = list(reader) + assert any(row.get("vep.vep_functional_consequence") == "missense_variant" for row in rows) + + +def test_download_clingen_file_in_variant_data_path(session, data_provider, client, setup_router_db, data_files): + experiment = create_experiment(client) + score_set = create_seq_score_set(client, experiment["urn"]) + score_set = mock_worker_variant_insertion( + client, session, data_provider, score_set, data_files / "scores.csv", data_files / "counts.csv" + ) + # Create mapped variants then set ClinGen allele id for first mapped variant + create_mapped_variants_for_score_set(session, score_set["urn"], TEST_MAPPED_VARIANT_WITH_HGVS_G_EXPRESSION) + db_score_set = session.query(ScoreSetDbModel).filter(ScoreSetDbModel.urn == score_set["urn"]).one() + first_mapped_variant = db_score_set.variants[0].mapped_variants[0] + first_mapped_variant.clingen_allele_id = VALID_CLINGEN_CA_ID + session.add(first_mapped_variant) + session.commit() + + with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: + published_score_set = publish_score_set(client, score_set["urn"]) + worker_queue.assert_called_once() + + response = client.get( + f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=clingen&include_post_mapped_hgvs=true&drop_na_columns=true" + ) + assert response.status_code == 200 + reader = csv.DictReader(StringIO(response.text)) + assert "clingen.clingen_allele_id" in reader.fieldnames + rows = list(reader) + assert rows[0].get("clingen.clingen_allele_id") == VALID_CLINGEN_CA_ID + + +def test_download_gnomad_file_in_variant_data_path(session, data_provider, client, setup_router_db, data_files): + experiment = create_experiment(client) + score_set = create_seq_score_set(client, experiment["urn"]) + score_set = mock_worker_variant_insertion( + client, session, data_provider, score_set, data_files / "scores.csv", data_files / "counts.csv" + ) + # Link a gnomAD variant to the first mapped variant (version may not match export filter) + score_set = create_seq_score_set_with_mapped_variants( + client, session, data_provider, experiment["urn"], data_files / "scores.csv" + ) + link_gnomad_variants_to_mapped_variants(session, score_set) + + with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: + published_score_set = publish_score_set(client, score_set["urn"]) + worker_queue.assert_called_once() + + response = client.get( + f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=gnomad&drop_na_columns=true" + ) + assert response.status_code == 200 + reader = csv.DictReader(StringIO(response.text)) + assert "gnomad.gnomad_af" in reader.fieldnames + + ######################################################################################################################## # Fetching clinical controls and control options for a score set ########################################################################################################################