Skip to content

Commit

Permalink
Merge pull request #85 from wilhelm-lab/feature/non_cleavable_XL
Browse files Browse the repository at this point in the history
Feature/non cleavable xl
  • Loading branch information
mostafakalhor authored Apr 17, 2024
2 parents 405f42d + 5358f58 commit 434f41e
Show file tree
Hide file tree
Showing 21 changed files with 393,284 additions and 446 deletions.
4 changes: 2 additions & 2 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ def activate_virtualenv_in_precommit_hooks(session: Session) -> None:
session's virtual environment. This allows pre-commit to locate hooks in
that environment when invoked from git.
Args:
session: The Session object.
:param session: The Session object.
"""
assert session.bin is not None # noqa: S101

Expand Down Expand Up @@ -111,6 +110,7 @@ def precommit(session: Session) -> None:
"flake8-docstrings",
"flake8-rst-docstrings",
"isort",
"darglint",
"pep8-naming",
"pre-commit",
"pre-commit-hooks",
Expand Down
246 changes: 234 additions & 12 deletions spectrum_fundamentals/annotation/annotation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pandas as pd

from spectrum_fundamentals import constants
from spectrum_fundamentals.fragments import initialize_peaks
from spectrum_fundamentals.fragments import initialize_peaks, initialize_peaks_xl

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -148,11 +148,126 @@ def annotate_spectra(
continue
raw_file_annotations.append(results)
results_df = pd.DataFrame(raw_file_annotations)
results_df.columns = ["INTENSITIES", "MZ", "CALCULATED_MASS", "removed_peaks"]

if "CROSSLINKER_TYPE" not in index_columns:
results_df.columns = ["INTENSITIES", "MZ", "CALCULATED_MASS", "removed_peaks"]
else:
results_df.columns = [
"INTENSITIES_A",
"INTENSITIES_B",
"MZ_A",
"MZ_B",
"CALCULATED_MASS_A",
"CALCULATED_MASS_B",
"removed_peaks_a",
"removed_peaks_b",
]
return results_df


def peak_pos_xl_cms2(unmod_seq: str, crosslinker_position: int) -> np.ndarray:
"""
Determines the positions of all potential normal and xl fragments within the vector generated by generate_annotation_matrix.
This function is used only for cleavable crosslinked peptides.
:param unmod_seq: Unmodified peptide sequence
:param crosslinker_position: The position of the crosslinker
:raises ValueError: if peptides exceed a length of 30
:return: position of different fragments as list
"""
xl_mask_array = np.full(constants.VEC_LENGTH_CMS2, -1.0)

if len(unmod_seq) < constants.SEQ_LEN + 1:
if crosslinker_position != 1:
# b peaks
peaks_b = np.tile([3, 4, 5], crosslinker_position - 1) + np.repeat(
np.arange(crosslinker_position - 1) * 6, 3
)
xl_mask_array[peaks_b] = 0.0

# ylong peaks
first_pos_ylong = (len(unmod_seq) - crosslinker_position) * 6 + 174 # first position for ylong
peaks_ylong = np.arange(first_pos_ylong, first_pos_ylong + 3)
peaks_ylong = np.tile(peaks_ylong, crosslinker_position - 1)
peaks_ylong += np.repeat(np.arange(crosslinker_position - 1) * 6, 3)

xl_mask_array[peaks_ylong] = 0.0

# yshort peaks
xl_mask_array[[x - 174 for x in peaks_ylong]] = 0.0

if len(unmod_seq) != crosslinker_position:

# y peaks
peaks_y = np.tile([0, 1, 2], len(unmod_seq) - crosslinker_position) + np.repeat(
np.arange(len(unmod_seq) - crosslinker_position) * 6, 3
)
xl_mask_array[peaks_y] = 0.0

# blong peaks
first_pos_blong = (crosslinker_position - 1) * 6 + 174 + 3 # first position for blong
peaks_blong = np.arange(first_pos_blong, first_pos_blong + 3)
peaks_blong = np.tile(peaks_blong, len(unmod_seq) - crosslinker_position)
peaks_blong += np.repeat(np.arange(len(unmod_seq) - crosslinker_position) * 6, 3)
xl_mask_array[peaks_blong] = 0.0

# bshort peaks
xl_mask_array[[x - 174 for x in peaks_blong]] = 0.0

else:
raise ValueError(f"Peptides exceeding a length of 30 are not supported: {len(unmod_seq)}")

return xl_mask_array


def generate_annotation_matrix_xl(
matched_peaks: pd.DataFrame, unmod_seq: str, crosslinker_position: int
) -> Tuple[np.ndarray, np.ndarray]:
"""
Generate the annotation matrix in the xl_prosit format from matched peaks.
:param matched_peaks: matched peaks needed to be converted
:param unmod_seq: unmodified peptide sequence
:param crosslinker_position: position of crosslinker
:return: numpy array of intensities and numpy array of masses
"""
intensity = peak_pos_xl_cms2(unmod_seq, crosslinker_position)
mass = intensity.copy()

ion_type = matched_peaks.columns.get_loc("ion_type")
no_col = matched_peaks.columns.get_loc("no")
charge_col = matched_peaks.columns.get_loc("charge")
intensity_col = matched_peaks.columns.get_loc("intensity")
exp_mass_col = matched_peaks.columns.get_loc("exp_mass")

for peak in matched_peaks.values:
if peak[ion_type] == "y":
peak_pos = ((peak[no_col] - 1) * 6) + (peak[charge_col] - 1)
elif peak[ion_type] == "b":
peak_pos = ((peak[no_col] - 1) * 6) + (peak[charge_col] - 1) + 3
elif peak[ion_type] == "y-short":
peak_pos = ((peak[no_col] - 1) * 6) + (peak[charge_col] - 1)
elif peak[ion_type] == "b-short":
peak_pos = ((peak[no_col] - 1) * 6) + (peak[charge_col] - 1) + 3
elif peak[ion_type] == "y-long":
peak_pos = ((peak[no_col] - 1) * 6) + (peak[charge_col] - 1) + 174
else:
peak_pos = ((peak[no_col] - 1) * 6) + (peak[charge_col] - 1) + 174 + 3

if peak_pos >= constants.VEC_LENGTH_CMS2:
continue
intensity[peak_pos] = peak[intensity_col]
mass[peak_pos] = peak[exp_mass_col]

# convert elements representing charge 3 to -1 (we do not anotate +3)
index_charge_3 = range(2, constants.VEC_LENGTH_CMS2, 3)
intensity[index_charge_3] = -1
mass[index_charge_3] = -1

return intensity, mass


def generate_annotation_matrix(
matched_peaks: pd.DataFrame, unmod_seq: str, charge: int
) -> Tuple[np.ndarray, np.ndarray]:
Expand Down Expand Up @@ -190,7 +305,7 @@ def generate_annotation_matrix(
exp_mass_col = matched_peaks.columns.get_loc("exp_mass")

for peak in matched_peaks.values:
if peak[ion_type] == "y":
if peak[ion_type].startswith("y"):
peak_pos = ((peak[no_col] - 1) * 6) + (peak[charge_col] - 1)
else:
peak_pos = ((peak[no_col] - 1) * 6) + (peak[charge_col] - 1) + 3
Expand All @@ -210,17 +325,23 @@ def generate_annotation_matrix(

def parallel_annotate(
spectrum: np.ndarray,
index_columns: dict,
index_columns: Dict[str, int],
mass_tolerance: Optional[float] = None,
unit_mass_tolerance: Optional[str] = None,
) -> Optional[Tuple[np.ndarray, np.ndarray, float, int]]:
) -> Optional[
Union[
Tuple[np.ndarray, np.ndarray, float, int],
Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, float, float, int, int],
]
]:
"""
Perform parallel annotation of a spectrum.
This function takes a spectrum and its index columns and performs parallel annotation of the spectrum. It starts by
initializing the peaks and extracting necessary data from the spectrum. It then matches the peaks to the spectrum and
generates an annotation matrix based on the matched peaks. If there are multiple matches found, it removes the redundant
matches. Finally, it returns annotated spectrum with meta data including intensity values, masses, calculated masses,
This function takes a spectrum and its index columns and performs parallel annotation of the spectrum.
It starts by initializing the peaks and extracting necessary data from the spectrum.
It then matches the peaks to the spectrum and generates an annotation matrix based on the matched peaks.
If there are multiple matches found, it removes the redundant matches.
Finally, it returns annotated spectrum with meta data including intensity values, masses, calculated masses,
and any peaks that were removed. The function is designed to run in different threads to speed up the annotation pipeline.
:param spectrum: a np.ndarray that contains the spectrum to be annotated
Expand All @@ -230,19 +351,44 @@ def parallel_annotate(
:return: a tuple containing intensity values (np.ndarray), masses (np.ndarray), calculated mass (float),
and any removed peaks (List[str])
"""
xl_type_col = index_columns.get("CROSSLINKER_TYPE")
if xl_type_col is None:
if spectrum[index_columns["PEPTIDE_LENGTH"]] > 30: # this was in initialize peaks but can be checked prior
return None
return _annotate_linear_spectrum(spectrum, index_columns, mass_tolerance, unit_mass_tolerance)

if (spectrum[index_columns["PEPTIDE_LENGTH_A"]] > 30) or (spectrum[index_columns["PEPTIDE_LENGTH_B"]] > 30):
return None
return _annotate_crosslinked_spectrum(
spectrum, index_columns, spectrum[xl_type_col], mass_tolerance, unit_mass_tolerance
)


def _annotate_linear_spectrum(
spectrum: np.ndarray,
index_columns: Dict[str, int],
mass_tolerance: Optional[float],
unit_mass_tolerance: Optional[str],
):
"""
Annotate a linear peptide spectrum.
:param spectrum: Spectrum to be annotated
:param index_columns: Index columns of the spectrum
:param mass_tolerance: Mass tolerance for calculating min and max mass
:param unit_mass_tolerance: Unit for the mass tolerance (da or ppm)
:return: Annotated spectrum
"""
mod_seq_column = "MODIFIED_SEQUENCE"
if "MODIFIED_SEQUENCE_MSA" in index_columns:
mod_seq_column = "MODIFIED_SEQUENCE_MSA"

fragments_meta_data, tmt_n_term, unmod_sequence, calc_mass = initialize_peaks(
spectrum[index_columns[mod_seq_column]],
spectrum[index_columns["MASS_ANALYZER"]],
spectrum[index_columns["PRECURSOR_CHARGE"]],
mass_tolerance,
unit_mass_tolerance,
)
if not unmod_sequence:
return None
matched_peaks = match_peaks(
fragments_meta_data,
spectrum[index_columns["INTENSITIES"]],
Expand All @@ -251,12 +397,88 @@ def parallel_annotate(
unmod_sequence,
spectrum[index_columns["PRECURSOR_CHARGE"]],
)

if len(matched_peaks) == 0:
intensity = np.full(174, 0.0)
mass = np.full(174, 0.0)
return intensity, mass, calc_mass, 0

matched_peaks, removed_peaks = handle_multiple_matches(matched_peaks)
intensities, mass = generate_annotation_matrix(
matched_peaks, unmod_sequence, spectrum[index_columns["PRECURSOR_CHARGE"]]
)
return intensities, mass, calc_mass, removed_peaks


def _annotate_crosslinked_spectrum(
spectrum: np.ndarray,
index_columns: Dict[str, int],
crosslinker_type: str,
mass_tolerance: Optional[float] = None,
unit_mass_tolerance: Optional[str] = None,
):
"""
Annotate a crosslinked peptide spectrum.
:param spectrum: Spectrum to be annotated
:param index_columns: Index columns of the spectrum
:param crosslinker_type: Type of crosslinker used
:param mass_tolerance: Mass tolerance for calculating min and max mass
:param unit_mass_tolerance: Unit for the mass tolerance (da or ppm)
:raises ValueError: if unsupported crosslinker type was supplied.
:return: Annotated spectrum
"""
if crosslinker_type in ["BS3", "DSS"]:
non_cl_xl = True
elif crosslinker_type in ["DSSO", "DSBU", "BUURBU"]:
non_cl_xl = False
else:
raise ValueError(f"Unsupported crosslinker type provided: {crosslinker_type}")

def _xl_annotation_workflow(seq_id: str, non_cl_xl: bool):
inputs = [
spectrum[index_columns[f"MODIFIED_SEQUENCE_{seq_id[0]}"]],
spectrum[index_columns["MASS_ANALYZER"]],
spectrum[index_columns[f"CROSSLINKER_POSITION_{seq_id[0]}"]],
crosslinker_type,
mass_tolerance,
unit_mass_tolerance,
]
if non_cl_xl:
inputs.append(spectrum[index_columns[f"MODIFIED_SEQUENCE_{seq_id[1]}"]])
array_size = 174

else:
array_size = 348
fragments_meta_data, tmt_n_term, unmod_sequence, calc_mass = initialize_peaks_xl(*inputs)
matched_peaks = match_peaks(
fragments_meta_data,
np.array(spectrum[index_columns["INTENSITIES"]]),
np.array(spectrum[index_columns["MZ"]]), # Convert to numpy array
tmt_n_term,
unmod_sequence,
spectrum[index_columns["PRECURSOR_CHARGE"]],
)

if len(matched_peaks) == 0:
intensities = np.full(array_size, 0.0)
mass = np.full(array_size, 0.0)
removed_peaks = 0
else:
matched_peaks, removed_peaks = handle_multiple_matches(matched_peaks)
if non_cl_xl:
intensities, mass = generate_annotation_matrix(
matched_peaks, unmod_sequence, spectrum[index_columns["PRECURSOR_CHARGE"]]
)
else:
intensities, mass = generate_annotation_matrix_xl(
matched_peaks, unmod_sequence, spectrum[index_columns[f"CROSSLINKER_POSITION_{seq_id[0]}"]]
)

return intensities, mass, removed_peaks, calc_mass

intensities_a, mass_a, removed_peaks_a, calc_mass_a = _xl_annotation_workflow(seq_id="AB", non_cl_xl=non_cl_xl)
intensities_b, mass_b, removed_peaks_b, calc_mass_b = _xl_annotation_workflow(seq_id="BA", non_cl_xl=non_cl_xl)

return intensities_a, intensities_b, mass_a, mass_b, calc_mass_a, calc_mass_b, removed_peaks_a, removed_peaks_b
Loading

0 comments on commit 434f41e

Please sign in to comment.