153 changes: 140 additions & 13 deletions pyprophet/io/export/osw.py
@@ -1,25 +1,24 @@
import os
import pickle
from shutil import copyfile
import sqlite3
from typing import Literal, Tuple
import re
import sqlite3
from typing import Tuple

import click
import duckdb
import pandas as pd
import numpy as np
import click
import pandas as pd
from loguru import logger

from ..._config import ExportIOConfig
from .._base import BaseOSWReader, BaseOSWWriter
from ..util import (
check_sqlite_table,
check_duckdb_table,
unimod_to_codename,
write_scores_sql_command,
load_sqlite_scanner,
get_table_columns,
get_table_columns_with_types,
load_sqlite_scanner,
unimod_to_codename,
write_scores_sql_command,
)
from .._base import BaseOSWReader, BaseOSWWriter
from ..._config import ExportIOConfig


class OSWReader(BaseOSWReader):
@@ -167,10 +166,67 @@ def _check_alignment_presence(self, con):
con, "SCORE_ALIGNMENT"
)

    def _has_im_boundaries(self, con) -> bool:
        """Return True if the FEATURE table contains IM boundary columns.

        Older OSW files may not have these columns; this helper centralises the
        PRAGMA check so callers don't duplicate the logic.
        """
        try:
            cols = [
                r[1] for r in con.execute("PRAGMA table_info('FEATURE')").fetchall()
            ]
        except Exception:
            return False
        return "EXP_IM_LEFTWIDTH" in cols and "EXP_IM_RIGHTWIDTH" in cols

    def _has_im(self, con) -> bool:
        """Return True if the FEATURE table contains the EXP_IM column.

        Older OSW files may not have this column; centralise the PRAGMA
        check so callers don't duplicate the logic.
        """
        try:
            cols = [
                r[1] for r in con.execute("PRAGMA table_info('FEATURE')").fetchall()
            ]
        except Exception:
            return False
        return "EXP_IM" in cols

    def _read_unscored_data(self, con):
        """Read data from unscored files."""
        score_sql = self._build_score_sql(con)

        # IM columns may or may not be present; centralised checks
        has_im_boundaries = self._has_im_boundaries(con)
        has_im = self._has_im(con)

        # Compose EXP_IM (or NULL) plus IM boundary columns (or NULLs)
        im_cols_sql = (
            (
                """FEATURE.EXP_IM AS EXP_IM,
                FEATURE.EXP_IM_LEFTWIDTH AS IM_leftWidth,
                FEATURE.EXP_IM_RIGHTWIDTH AS IM_rightWidth"""
            )
            if has_im and has_im_boundaries
            else (
                """FEATURE.EXP_IM AS EXP_IM,
                NULL AS IM_leftWidth,
                NULL AS IM_rightWidth"""
            )
            if has_im and not has_im_boundaries
            else (
                """NULL AS EXP_IM,
                FEATURE.EXP_IM_LEFTWIDTH AS IM_leftWidth,
                FEATURE.EXP_IM_RIGHTWIDTH AS IM_rightWidth"""
            )
            if (not has_im) and has_im_boundaries
            else """NULL AS EXP_IM,
            NULL AS IM_leftWidth,
            NULL AS IM_rightWidth"""
        )

query = f"""
SELECT
RUN.ID AS id_run,
@@ -191,7 +247,8 @@ def _read_unscored_data(self, con):
FEATURE_MS1.AREA_INTENSITY AS aggr_prec_Peak_Area,
FEATURE_MS1.APEX_INTENSITY AS aggr_prec_Peak_Apex,
FEATURE.LEFT_WIDTH AS leftWidth,
FEATURE.RIGHT_WIDTH AS rightWidth
FEATURE.RIGHT_WIDTH AS rightWidth,
{im_cols_sql}
{score_sql}
FROM PRECURSOR
INNER JOIN PRECURSOR_PEPTIDE_MAPPING ON PRECURSOR.ID = PRECURSOR_PEPTIDE_MAPPING.PRECURSOR_ID
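
The same `(has_im, has_im_boundaries)` dispatch recurs in each reader method below. A sketch of how it could be expressed once — the helper name and the `trailing_comma` flag are hypothetical, not part of this diff:

```python
# Refactoring sketch only: the dispatch implemented by the repeated ternary
# chains, factored into a single helper.
def im_cols_sql(has_im: bool, has_im_boundaries: bool, trailing_comma: bool = False) -> str:
    exp_im = "FEATURE.EXP_IM AS EXP_IM" if has_im else "NULL AS EXP_IM"
    if has_im_boundaries:
        left = "FEATURE.EXP_IM_LEFTWIDTH AS IM_leftWidth"
        right = "FEATURE.EXP_IM_RIGHTWIDTH AS IM_rightWidth"
    else:
        left = "NULL AS IM_leftWidth"
        right = "NULL AS IM_rightWidth"
    sql = ",\n".join([exp_im, left, right])
    return sql + "," if trailing_comma else sql
```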
@@ -224,6 +281,34 @@ def _read_peptidoform_data(self, con, cfg):
"""Read data with peptidoform IPF information."""
score_ms1_pep, link_ms1 = self._get_ms1_score_info(con)

# IM columns may or may not be present; centralised checks
has_im_boundaries = self._has_im_boundaries(con)
has_im = self._has_im(con)

im_cols_sql = (
(
"""FEATURE.EXP_IM AS EXP_IM,
FEATURE.EXP_IM_LEFTWIDTH AS IM_leftWidth,
FEATURE.EXP_IM_RIGHTWIDTH AS IM_rightWidth,"""
)
if has_im and has_im_boundaries
else (
"""FEATURE.EXP_IM AS EXP_IM,
NULL AS IM_leftWidth,
NULL AS IM_rightWidth,"""
)
if has_im and not has_im_boundaries
else (
"""NULL AS EXP_IM,
FEATURE.EXP_IM_LEFTWIDTH AS IM_leftWidth,
FEATURE.EXP_IM_RIGHTWIDTH AS IM_rightWidth,"""
)
if (not has_im) and has_im_boundaries
else """NULL AS EXP_IM,
NULL AS IM_leftWidth,
NULL AS IM_rightWidth,"""
)

query = f"""
SELECT RUN.ID AS id_run,
PEPTIDE.ID AS id_peptide,
@@ -247,6 +332,7 @@ def _read_peptidoform_data(self, con, cfg):
FEATURE_MS1.APEX_INTENSITY AS aggr_prec_Peak_Apex,
FEATURE.LEFT_WIDTH AS leftWidth,
FEATURE.RIGHT_WIDTH AS rightWidth,
{im_cols_sql}
{score_ms1_pep} AS ms1_pep,
SCORE_MS2.PEP AS ms2_pep,
SCORE_IPF.PRECURSOR_PEAKGROUP_PEP AS precursor_pep,
@@ -275,6 +361,34 @@ def _read_augmented_data(self, con, cfg):
"""Read standard data augmented with IPF information."""
score_ms1_pep, link_ms1 = self._get_ms1_score_info(con)

# IM columns may or may not be present; centralised checks
has_im_boundaries = self._has_im_boundaries(con)
has_im = self._has_im(con)

im_cols_sql = (
(
"""FEATURE.EXP_IM AS EXP_IM,
FEATURE.EXP_IM_LEFTWIDTH AS IM_leftWidth,
FEATURE.EXP_IM_RIGHTWIDTH AS IM_rightWidth,"""
)
if has_im and has_im_boundaries
else (
"""FEATURE.EXP_IM AS EXP_IM,
NULL AS IM_leftWidth,
NULL AS IM_rightWidth,"""
)
if has_im and not has_im_boundaries
else (
"""NULL AS EXP_IM,
FEATURE.EXP_IM_LEFTWIDTH AS IM_leftWidth,
FEATURE.EXP_IM_RIGHTWIDTH AS IM_rightWidth,"""
)
if (not has_im) and has_im_boundaries
else """NULL AS EXP_IM,
NULL AS IM_leftWidth,
NULL AS IM_rightWidth,"""
)

query = f"""
SELECT RUN.ID AS id_run,
PEPTIDE.ID AS id_peptide,
@@ -298,6 +412,7 @@ def _read_augmented_data(self, con, cfg):
FEATURE_MS1.APEX_INTENSITY AS aggr_prec_Peak_Apex,
FEATURE.LEFT_WIDTH AS leftWidth,
FEATURE.RIGHT_WIDTH AS rightWidth,
{im_cols_sql}
SCORE_MS2.RANK AS peak_group_rank,
SCORE_MS2.SCORE AS d_score,
SCORE_MS2.QVALUE AS m_score,
@@ -326,6 +441,17 @@ def _read_standard_data(self, con, cfg):
        # Check if we should attempt alignment integration
        use_alignment = cfg.use_alignment and self._check_alignment_presence(con)

        # IM boundary columns may or may not be present; centralised check
        has_im_boundaries = self._has_im_boundaries(con)

        im_cols_sql = (
            """FEATURE.EXP_IM_LEFTWIDTH AS IM_leftWidth,
            FEATURE.EXP_IM_RIGHTWIDTH AS IM_rightWidth,"""
            if has_im_boundaries
            else """NULL AS IM_leftWidth,
            NULL AS IM_rightWidth,"""
        )

# First, get features that pass MS2 QVALUE threshold
query = f"""
SELECT RUN.ID AS id_run,
@@ -350,6 +476,7 @@ def _read_standard_data(self, con, cfg):
FEATURE_MS1.APEX_INTENSITY AS aggr_prec_Peak_Apex,
FEATURE.LEFT_WIDTH AS leftWidth,
FEATURE.RIGHT_WIDTH AS rightWidth,
{im_cols_sql}
SCORE_MS2.RANK AS peak_group_rank,
SCORE_MS2.SCORE AS d_score,
SCORE_MS2.QVALUE AS m_score,
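Because every variant of `im_cols_sql` selects the same three aliases (real columns or NULL literals), the exported frame always carries `EXP_IM`, `IM_leftWidth` and `IM_rightWidth`. A small downstream sketch of what that buys; `df` is only a stand-in for the DataFrame a reader method returns:

```python
import numpy as np
import pandas as pd

# Stand-in for the reader's output when the OSW file has no ion-mobility data:
# the columns exist but hold only NULL/NaN values.
df = pd.DataFrame(
    {
        "EXP_IM": [np.nan, np.nan],
        "IM_leftWidth": [np.nan, np.nan],
        "IM_rightWidth": [np.nan, np.nan],
    }
)
# Downstream code can therefore test for populated values rather than for the
# presence of the columns themselves.
im_present = df[["EXP_IM", "IM_leftWidth", "IM_rightWidth"]].notna().any().any()
print(f"ion-mobility values present: {im_present}")
```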
88 changes: 87 additions & 1 deletion pyprophet/io/export/parquet.py
@@ -95,6 +95,26 @@ def _read_unscored_data(self, con) -> pd.DataFrame:
"""
feature_vars_sql = self._build_feature_vars_sql()

# IM columns may or may not be present in the parquet file
has_im = "EXP_IM" in self._columns
has_im_boundaries = (
"IM_leftWidth" in self._columns and "IM_rightWidth" in self._columns
)

im_cols_sql = (
(
"EXP_IM AS EXP_IM, IM_leftWidth AS IM_leftWidth, IM_rightWidth AS IM_rightWidth"
)
if has_im and has_im_boundaries
else ("EXP_IM AS EXP_IM, NULL AS IM_leftWidth, NULL AS IM_rightWidth")
if has_im and not has_im_boundaries
else (
"NULL AS EXP_IM, IM_leftWidth AS IM_leftWidth, IM_rightWidth AS IM_rightWidth"
)
if (not has_im) and has_im_boundaries
else "NULL AS EXP_IM, NULL AS IM_leftWidth, NULL AS IM_rightWidth"
)

query = f"""
SELECT
RUN_ID AS id_run,
@@ -115,7 +135,8 @@ def _read_unscored_data(self, con) -> pd.DataFrame:
FEATURE_MS1_AREA_INTENSITY AS aggr_prec_Peak_Area,
FEATURE_MS1_APEX_INTENSITY AS aggr_prec_Peak_Apex,
LEFT_WIDTH AS leftWidth,
RIGHT_WIDTH AS rightWidth
RIGHT_WIDTH AS rightWidth,
{im_cols_sql}
{feature_vars_sql}
FROM data
WHERE PROTEIN_ID IS NOT NULL -- Filter to precursor rows
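
The parquet reader keys the same decision off membership in `self._columns`. How `_columns` is populated is not shown in this diff; as an assumption, one cheap way to get the column names without reading row data is to load only the parquet schema:

```python
import pyarrow.parquet as pq

# Sketch with a placeholder file name; reads only the schema, no row data.
columns = pq.read_schema("precursors_export.parquet").names
has_im = "EXP_IM" in columns
has_im_boundaries = "IM_leftWidth" in columns and "IM_rightWidth" in columns
print(has_im, has_im_boundaries)
```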
Expand All @@ -129,6 +150,26 @@ def _read_peptidoform_data(self, con) -> pd.DataFrame:
"""
score_ms1_pep, _link_ms1 = self._get_ms1_score_info()

# IM columns may or may not be present in the parquet file
has_im = "EXP_IM" in self._columns
has_im_boundaries = (
"IM_leftWidth" in self._columns and "IM_rightWidth" in self._columns
)

im_cols_sql = (
(
"EXP_IM AS EXP_IM, IM_leftWidth AS IM_leftWidth, IM_rightWidth AS IM_rightWidth"
)
if has_im and has_im_boundaries
else ("EXP_IM AS EXP_IM, NULL AS IM_leftWidth, NULL AS IM_rightWidth")
if has_im and not has_im_boundaries
else (
"NULL AS EXP_IM, IM_leftWidth AS IM_leftWidth, IM_rightWidth AS IM_rightWidth"
)
if (not has_im) and has_im_boundaries
else "NULL AS EXP_IM, NULL AS IM_leftWidth, NULL AS IM_rightWidth"
)

query = f"""
SELECT
RUN_ID AS id_run,
@@ -153,6 +194,7 @@ def _read_peptidoform_data(self, con) -> pd.DataFrame:
FEATURE_MS1_APEX_INTENSITY AS aggr_prec_Peak_Apex,
LEFT_WIDTH AS leftWidth,
RIGHT_WIDTH AS rightWidth,
{im_cols_sql}
{score_ms1_pep} AS ms1_pep,
SCORE_MS2_PEP AS ms2_pep,
SCORE_IPF_PRECURSOR_PEAKGROUP_PEP AS precursor_pep,
Expand All @@ -175,6 +217,26 @@ def _read_augmented_data(self, con) -> pd.DataFrame:
"""
score_ms1_pep, _link_ms1 = self._get_ms1_score_info()

# IM columns may or may not be present in the parquet file
has_im = "EXP_IM" in self._columns
has_im_boundaries = (
"IM_leftWidth" in self._columns and "IM_rightWidth" in self._columns
)

im_cols_sql = (
(
"EXP_IM AS EXP_IM, IM_leftWidth AS IM_leftWidth, IM_rightWidth AS IM_rightWidth"
)
if has_im and has_im_boundaries
else ("EXP_IM AS EXP_IM, NULL AS IM_leftWidth, NULL AS IM_rightWidth")
if has_im and not has_im_boundaries
else (
"NULL AS EXP_IM, IM_leftWidth AS IM_leftWidth, IM_rightWidth AS IM_rightWidth"
)
if (not has_im) and has_im_boundaries
else "NULL AS EXP_IM, NULL AS IM_leftWidth, NULL AS IM_rightWidth"
)

# First get main data
query = f"""
SELECT
@@ -200,6 +262,7 @@ def _read_augmented_data(self, con) -> pd.DataFrame:
FEATURE_MS1_APEX_INTENSITY AS aggr_prec_Peak_Apex,
LEFT_WIDTH AS leftWidth,
RIGHT_WIDTH AS rightWidth,
{im_cols_sql}
SCORE_MS2_PEAK_GROUP_RANK AS peak_group_rank,
SCORE_MS2_SCORE AS d_score,
SCORE_MS2_Q_VALUE AS m_score,
@@ -262,6 +325,26 @@ def _read_standard_data(self, con) -> pd.DataFrame:
        use_alignment = self.config.use_alignment and self._has_alignment

        # First, get features that pass MS2 QVALUE threshold
        # IM columns may or may not be present in the parquet file
        has_im = "EXP_IM" in self._columns
        has_im_boundaries = (
            "IM_leftWidth" in self._columns and "IM_rightWidth" in self._columns
        )

        im_cols_sql = (
            (
                "EXP_IM AS EXP_IM, IM_leftWidth AS IM_leftWidth, IM_rightWidth AS IM_rightWidth,"
            )
            if has_im and has_im_boundaries
            else ("EXP_IM AS EXP_IM, NULL AS IM_leftWidth, NULL AS IM_rightWidth,")
            if has_im and not has_im_boundaries
            else (
                "NULL AS EXP_IM, IM_leftWidth AS IM_leftWidth, IM_rightWidth AS IM_rightWidth,"
            )
            if (not has_im) and has_im_boundaries
            else "NULL AS EXP_IM, NULL AS IM_leftWidth, NULL AS IM_rightWidth,"
        )

query = f"""
SELECT
RUN_ID AS id_run,
@@ -286,6 +369,7 @@ def _read_standard_data(self, con) -> pd.DataFrame:
FEATURE_MS1_APEX_INTENSITY AS aggr_prec_Peak_Apex,
LEFT_WIDTH AS leftWidth,
RIGHT_WIDTH AS rightWidth,
{im_cols_sql}
SCORE_MS2_PEAK_GROUP_RANK AS peak_group_rank,
SCORE_MS2_SCORE AS d_score,
SCORE_MS2_Q_VALUE AS m_score,
@@ -334,6 +418,7 @@ def _read_standard_data(self, con) -> pd.DataFrame:
aligned_ids_df = pd.DataFrame({"id": new_aligned_ids})
con.register("aligned_ids_temp", aligned_ids_df)

# For recovered aligned features include IM columns the same way
aligned_query = f"""
SELECT
RUN_ID AS id_run,
@@ -358,6 +443,7 @@ def _read_standard_data(self, con) -> pd.DataFrame:
FEATURE_MS1_APEX_INTENSITY AS aggr_prec_Peak_Apex,
LEFT_WIDTH AS leftWidth,
RIGHT_WIDTH AS rightWidth,
{im_cols_sql}
SCORE_MS2_PEAK_GROUP_RANK AS peak_group_rank,
SCORE_MS2_SCORE AS d_score,
SCORE_MS2_Q_VALUE AS m_score
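Selecting literal NULLs under the expected aliases keeps the DuckDB result schema stable even when the parquet data predates the IM columns. A self-contained sketch against a registered stand-in relation; the tiny frame and the `data` name only mirror how the queries above reference the parquet contents:

```python
import duckdb
import pandas as pd

# Stand-in frame registered under the same name ("data") the queries use.
df = pd.DataFrame({"RUN_ID": [1], "LEFT_WIDTH": [10.0], "RIGHT_WIDTH": [12.5]})

con = duckdb.connect()
con.register("data", df)
out = con.execute(
    """
    SELECT
        RUN_ID AS id_run,
        LEFT_WIDTH AS leftWidth,
        RIGHT_WIDTH AS rightWidth,
        NULL AS EXP_IM, NULL AS IM_leftWidth, NULL AS IM_rightWidth
    FROM data
    """
).df()
# The IM columns are present in the output even though the source had none.
print(out.columns.tolist())
```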