diff --git a/src/dcd_mapping/lookup.py b/src/dcd_mapping/lookup.py index 34de2f0..3a5c186 100644 --- a/src/dcd_mapping/lookup.py +++ b/src/dcd_mapping/lookup.py @@ -264,35 +264,30 @@ async def get_protein_accession(transcript: str) -> str | None: async def get_transcripts( - gene_symbol: str, chromosome_ac: str, start: int, end: int -) -> list[str]: - """Get transcript accessions matching given parameters (excluding non-coding RNA). + chromosome_ac: str, start: int, end: int +) -> list[tuple[str, str]]: + """Get transcript accessions matching given parameters (excluding non-coding RNA), + returning both the transcript accession and HGNC symbol. - TODO: may be able to successfully query with only one of gene symbol/chromosome ac. - In initial testing, gene symbol doesn't seem to be a meaningful filter, but should - get further confirmation. - - :param gene_symbol: HGNC-given gene symbol (usually, but not always, equivalent to - symbols available in other nomenclatures.) :param chromosome: chromosome accession (e.g. ``"NC_000007.13"``) :param start: starting position :param end: ending position - :return: candidate transcript accessions + :return: candidate transcript accessions and HGNC symbols """ try: uta = CoolSeqToolBuilder().uta_db query = f""" - SELECT tx_ac + SELECT tx_ac, hgnc FROM {uta.schema}.tx_exon_aln_v - WHERE hgnc = '{gene_symbol}' - AND ({start} BETWEEN alt_start_i AND alt_end_i OR {end} BETWEEN alt_start_i AND alt_end_i) + WHERE ({start} BETWEEN alt_start_i AND alt_end_i OR {end} BETWEEN alt_start_i AND alt_end_i) AND alt_ac = '{chromosome_ac}' AND tx_ac NOT LIKE 'NR_%'; """ # noqa: S608 result = await uta.execute_query(query) except Exception as e: raise DataLookupError from e - return [row["tx_ac"] for row in result] + + return [(row["tx_ac"], row["hgnc"]) for row in result] # ------------------------------ Gene Normalizer ------------------------------ # diff --git a/src/dcd_mapping/transcripts.py b/src/dcd_mapping/transcripts.py index 00080d3..8af38cb 100644 --- a/src/dcd_mapping/transcripts.py +++ b/src/dcd_mapping/transcripts.py @@ -2,6 +2,7 @@ import logging import re +from Bio import Align from Bio.Data.CodonTable import IUPACData from Bio.Seq import Seq from Bio.SeqUtils import seq1 @@ -10,7 +11,6 @@ from dcd_mapping.exceptions import TxSelectError from dcd_mapping.lookup import ( get_chromosome_identifier, - get_gene_symbol, get_mane_transcripts, get_protein_accession, get_seqrepo, @@ -36,46 +36,85 @@ async def _get_compatible_transcripts( - target_gene: TargetGene, align_result: AlignmentResult -) -> list[list[str]]: - """Acquire matching transcripts + align_result: AlignmentResult, +) -> set[tuple[str, str]]: + """Acquire transcripts and their HGNC symbols which overlap with all hit subranges + of an alignment result. :param metadata: metadata for scoreset :param align_result: output of ``align()`` method - :return: List of list of compatible transcripts + :return: Set of compatible transcripts """ - if align_result.chrom.startswith("chr"): - aligned_chrom = align_result.chrom[3:] - else: - aligned_chrom = align_result.chrom + aligned_chrom = ( + align_result.chrom[3:] + if align_result.chrom.startswith("chr") + else align_result.chrom + ) chromosome = get_chromosome_identifier(aligned_chrom) - gene_symbol = get_gene_symbol(target_gene) - if not gene_symbol: - msg = ( - f"Unable to find gene symbol for target gene {target_gene.target_gene_name}" - ) - raise TxSelectError(msg) - transcript_matches = [] + + transcript_matches: set[tuple[str, str]] = set() for hit_range in align_result.hit_subranges: - matches_list = await get_transcripts( - gene_symbol, chromosome, hit_range.start, hit_range.end - ) - if matches_list: - transcript_matches.append(matches_list) + matches_list = await get_transcripts(chromosome, hit_range.start, hit_range.end) + if not transcript_matches: + transcript_matches = set(matches_list) + + transcript_matches.intersection_update(matches_list) + return transcript_matches -def _reduce_compatible_transcripts(matching_transcripts: list[list[str]]) -> list[str]: - """Reduce list of list of transcripts to a list containing only entries present - in each sublist +def _local_alignment_identity(query: str, ref: str) -> float: + """Compute local alignment percent identity between two protein sequences. - :param matching_transcripts: list of list of transcript accession IDs - :return: list of transcripts shared by all sublists + Uses Smith-Waterman local alignment with BLOSUM62; gap open -10, gap extend -0.5. + Returns alignment score, or -1000 if either sequence is empty. """ - common_transcripts_set = set(matching_transcripts[0]) - for sublist in matching_transcripts[1:]: - common_transcripts_set.intersection_update(sublist) - return list(common_transcripts_set) + if not query or not ref: + return -1000 + + aligner = Align.PairwiseAligner( + mode="local", + substitution_matrix=Align.substitution_matrices.load("BLOSUM62"), + open_gap_score=-10, + extend_gap_score=-0.5, + ) + + try: + alignments = aligner.align(query, ref) + except Exception as e: + # Do not fallback to approximate similarity; propagate failure + msg = f"Local alignment failed: {e}" + raise TxSelectError(msg) from e + + if not alignments: + return -1000 + + return alignments[0].score + + +def _choose_most_similar_transcript( + protein_sequence: str, mane_transcripts: list[TranscriptDescription] +) -> TranscriptDescription | None: + """Choose the transcript whose protein reference is most similar to the + provided sequence. + + Selects the highest similarity; ties keep first encountered (stable). + """ + if not mane_transcripts: + return None + if len(mane_transcripts) == 1: + return mane_transcripts[0] + + best: TranscriptDescription | None = None + best_score = -1.0 + for tx in mane_transcripts: + ref_seq = get_sequence(tx.refseq_prot) + score = _local_alignment_identity(protein_sequence, ref_seq) + if score > best_score: + best_score = score + best = tx + + return best def _choose_best_mane_transcript( @@ -156,47 +195,77 @@ async def _select_protein_reference( :raise TxSelectError: if no matching MANE transcripts and unable to get UniProt ID/ reference sequence """ - matching_transcripts = await _get_compatible_transcripts(target_gene, align_result) - if not matching_transcripts: - common_transcripts = None - else: - common_transcripts = _reduce_compatible_transcripts(matching_transcripts) - if not common_transcripts: - if not target_gene.target_uniprot_ref: - msg = f"Unable to find matching transcripts for target gene {target_gene.target_gene_name}" - raise TxSelectError(msg) - protein_sequence = get_uniprot_sequence(target_gene.target_uniprot_ref.id) - np_accession = target_gene.target_uniprot_ref.id - ref_sequence = get_uniprot_sequence(target_gene.target_uniprot_ref.id) - if not ref_sequence: - msg = f"Unable to grab reference sequence from uniprot.org for target gene {target_gene.target_gene_name}" - raise TxSelectError(msg) - nm_accession = None - tx_mode = None - else: - mane_transcripts = get_mane_transcripts(common_transcripts) + matching_transcripts = await _get_compatible_transcripts(align_result) + + # Map HGNC symbols to their compatible transcripts + hgnc_to_transcripts: dict[str, list[str]] = {} + for tx, hgnc in matching_transcripts: + hgnc_to_transcripts.setdefault(hgnc, []).append(tx) + + per_gene_best: list[ManeDescription | TranscriptDescription] = [] + best_tx: ManeDescription | TranscriptDescription | None = None + + # Choose one best transcript per gene (based on MANE priority, falling back to longest) + for _, transcripts in hgnc_to_transcripts.items(): + if not transcripts: + continue + + mane_transcripts = get_mane_transcripts(transcripts) best_tx = _choose_best_mane_transcript(mane_transcripts) + if not best_tx: - best_tx = await _get_longest_compatible_transcript(common_transcripts) - if not best_tx: - msg = f"Unable to find matching MANE transcripts for target gene {target_gene.target_gene_name}" + best_tx = await _get_longest_compatible_transcript(transcripts) + + if best_tx: + per_gene_best.append(best_tx) + + # If we found any per-gene best candidates, Step 2: choose the most similar among them and + # select it. + if per_gene_best: + if not target_gene.target_sequence: + msg = f"Unable to find target sequence for target gene {target_gene.target_gene_name}" raise TxSelectError(msg) + + protein_sequence = _get_protein_sequence(target_gene.target_sequence) + best_tx = _choose_most_similar_transcript(protein_sequence, per_gene_best) + + # As a fallback, pick the first candidate + if not best_tx: + best_tx = per_gene_best[0] + ref_sequence = get_sequence(best_tx.refseq_prot) - nm_accession = best_tx.refseq_nuc - np_accession = best_tx.refseq_prot - tx_mode = best_tx.transcript_priority + is_full_match = ref_sequence.find(protein_sequence) != -1 + start = ref_sequence.find(protein_sequence[:10]) + + return TxSelectResult( + nm=best_tx.refseq_nuc, + np=best_tx.refseq_prot, + start=start, + is_full_match=is_full_match, + sequence=get_sequence(best_tx.refseq_prot), + transcript_mode=best_tx.transcript_priority, + ) - protein_sequence = _get_protein_sequence(target_gene.target_sequence) - is_full_match = ref_sequence.find(protein_sequence) != -1 - start = ref_sequence.find(protein_sequence[:10]) + # If we didn't find any suitable transcript, attempt to use a provided UniProt reference + if not target_gene.target_uniprot_ref: + msg = f"Unable to find matching transcripts for target gene {target_gene.target_gene_name}" + raise TxSelectError(msg) + + uniprot_sequence = get_uniprot_sequence(target_gene.target_uniprot_ref.id) + if not uniprot_sequence: + msg = f"Unable to grab reference sequence from uniprot.org for target gene {target_gene.target_gene_name}" + raise TxSelectError(msg) + + is_full_match = uniprot_sequence.find(protein_sequence) != -1 + start = uniprot_sequence.find(protein_sequence[:10]) return TxSelectResult( - nm=nm_accession, - np=np_accession, + nm=None, + np=target_gene.target_uniprot_ref.id, start=start, is_full_match=is_full_match, sequence=protein_sequence, - transcript_mode=tx_mode, + transcript_mode=None, ) diff --git a/tests/test_transcript.py b/tests/test_transcript.py index 5d4612a..9219e39 100644 --- a/tests/test_transcript.py +++ b/tests/test_transcript.py @@ -13,10 +13,25 @@ from unittest.mock import MagicMock import pytest +from cool_seq_tool.schemas import TranscriptPriority from dcd_mapping.mavedb_data import _load_scoreset_records, get_scoreset_records -from dcd_mapping.schemas import AlignmentResult, ScoresetMetadata, TxSelectResult -from dcd_mapping.transcripts import select_transcript +from dcd_mapping.schemas import ( + AlignmentResult, + ScoresetMetadata, + SequenceRange, + TargetGene, + TargetSequenceType, + TargetType, + TxSelectResult, +) +from dcd_mapping.transcripts import ( + TranscriptDescription, + _choose_most_similar_transcript, + _percent_similarity, + _select_protein_reference, + select_transcript, +) @pytest.fixture() @@ -163,6 +178,148 @@ async def test_1_b_2( check_transcript_results_equality(actual, expected) +# --- Similarity helper tests --- + + +def make_mane(nm: str, np: str, priority: TranscriptPriority): + # Use generic TranscriptDescription for testing similarity logic + return TranscriptDescription( + refseq_nuc=nm, refseq_prot=np, transcript_priority=priority + ) + + +def test_percent_similarity_basic(): + assert _percent_similarity("AAAA", "AAAA") == 1.0 + assert _percent_similarity("AAAA", "BAAAA") == 1.0 # substring fast path + assert 0.0 <= _percent_similarity("ABCD", "WXYZ") <= 1.0 + + +def test_choose_most_similar_transcript_simple(monkeypatch): + # Query is most similar to NP_2 + query = "MKTFFV" + seqs = { + "NP_1": "MKAAAA", + "NP_2": "MKTFFV", + "NP_3": "MKTYFV", + } + + def fake_get_sequence(ac): + return seqs[ac] + + monkeypatch.setattr("dcd_mapping.transcripts.get_sequence", fake_get_sequence) + + mane_list = [ + make_mane("NM_1", "NP_1", TranscriptPriority.MANE_PLUS_CLINICAL), + make_mane("NM_2", "NP_2", TranscriptPriority.MANE_SELECT), + make_mane("NM_3", "NP_3", TranscriptPriority.MANE_PLUS_CLINICAL), + ] + + best = _choose_most_similar_transcript(query, mane_list) + assert best.refseq_prot == "NP_2" + + +def test_choose_most_similar_transcript_tie_keeps_first(monkeypatch): + # NP_1 and NP_2 have identical sequences vs query; NP_1 should win (stable tie) + query = "ABCDE" + seqs = { + "NP_1": "ABCDE", + "NP_2": "ABCDE", + "NP_3": "ABCXX", + } + + def fake_get_sequence(ac): + return seqs[ac] + + monkeypatch.setattr("dcd_mapping.transcripts.get_sequence", fake_get_sequence) + + mane_list = [ + make_mane("NM_1", "NP_1", TranscriptPriority.MANE_PLUS_CLINICAL), + make_mane("NM_2", "NP_2", TranscriptPriority.MANE_SELECT), + make_mane("NM_3", "NP_3", TranscriptPriority.MANE_PLUS_CLINICAL), + ] + + best = _choose_most_similar_transcript(query, mane_list) + assert best.refseq_prot == "NP_1" + + +@pytest.mark.asyncio(scope="module") +async def test_end_to_end_per_gene_then_similarity(monkeypatch): + """E2E: per-gene best via MANE priority, then global similarity among winners.""" + # Mock compatible transcripts grouped by HGNC symbol + compatible = { + ("NM_G1_A", "G1"), + ("NM_G1_B", "G1"), + ("NM_G2_A", "G2"), + ("NM_G2_B", "G2"), + } + + async def fake_get_compatible(_align_result): + return compatible + + monkeypatch.setattr( + "dcd_mapping.transcripts._get_compatible_transcripts", fake_get_compatible + ) + + # MANE per gene: G1 has a MANE_SELECT; G2 no select -> plus clinical wins + def fake_get_mane_transcripts(tx_list): + s = set(tx_list) + if s == {"NM_G1_A", "NM_G1_B"}: + return [ + make_mane("NM_G1_A", "NP_G1_A", TranscriptPriority.MANE_PLUS_CLINICAL), + make_mane("NM_G1_B", "NP_G1_B", TranscriptPriority.MANE_SELECT), + ] + if s == {"NM_G2_A", "NM_G2_B"}: + return [ + make_mane("NM_G2_A", "NP_G2_A", TranscriptPriority.MANE_PLUS_CLINICAL), + make_mane( + "NM_G2_B", + "NP_G2_B", + TranscriptPriority.LONGEST_COMPATIBLE_REMAINING, + ), + ] + return [] + + monkeypatch.setattr( + "dcd_mapping.transcripts.get_mane_transcripts", fake_get_mane_transcripts + ) + + # Sequence database: make query closest to NP_G2_A (G2 winner) + seqs = { + "NP_G1_A": "MKAAAA", + "NP_G1_B": "MKBBBB", + "NP_G2_A": "MKTFFV", + "NP_G2_B": "MKTYFV", + } + + def fake_get_sequence(ac): + return seqs[ac] + + monkeypatch.setattr("dcd_mapping.transcripts.get_sequence", fake_get_sequence) + + # Build target gene and minimal align result + query = "MKTFFV" + target_gene = TargetGene( + target_gene_name="Dummy", + target_gene_category=TargetType.PROTEIN_CODING, + target_sequence=query, + target_sequence_type=TargetSequenceType.PROTEIN, + ) + + align_result = AlignmentResult( + chrom="chr1", + strand=1, + coverage=None, + ident_pct=None, + query_range=SequenceRange(start=1, end=6), + query_subranges=[SequenceRange(start=1, end=6)], + hit_range=SequenceRange(start=1, end=6), + hit_subranges=[SequenceRange(start=1, end=6)], + ) + + tx = await _select_protein_reference(target_gene, align_result) + assert tx.np == "NP_G2_A" + + @pytest.mark.asyncio(scope="module") async def test_tx_src( scoreset_metadata_fixture: dict[str, ScoresetMetadata],