|
| 1 | +# stdlib |
| 2 | +from itertools import chain |
| 3 | +from typing import Iterable, Iterator, Tuple |
| 4 | + |
| 5 | +# 3rd party |
| 6 | +from chemistry_tools.formulae import Formula |
| 7 | +from domdf_python_tools.words import word_join |
| 8 | +from pyms.BillerBiemann import get_maxima_indices, num_ions_threshold |
| 9 | +from pyms.eic import ExtractedIntensityMatrix, build_extracted_intensity_matrix |
| 10 | +from pyms.IntensityMatrix import IntensityMatrix |
| 11 | +from pyms.Noise.Analysis import window_analyzer |
| 12 | +from pyms.Peak import Peak |
| 13 | + |
| 14 | +# this package |
| 15 | +from pyms_lc_esi.adducts import Adduct, get_adduct_spectra |
| 16 | + |
| 17 | +__all__ = ["make_im_for_adducts", "peak_finder", "peaks_from_maxima", "sum_area"] |
| 18 | + |
| 19 | + |
| 20 | +def sum_area( |
| 21 | + apex_index: int, |
| 22 | + e_im: ExtractedIntensityMatrix, |
| 23 | + ) -> Tuple[float, int, int]: |
| 24 | + """ |
| 25 | + Returns the area and absolute bounds (as scans) for the peak with the apex at ``apex_index``. |
| 26 | +
|
| 27 | + .. TODO:: explain how bounds are determined |
| 28 | +
|
| 29 | + :param apex_index: The scan index of the apex of the peak. |
| 30 | + :param e_im: |
| 31 | + """ |
| 32 | + |
| 33 | + # print(peak, peak.rt / 60) |
| 34 | + |
| 35 | + # print(f"Apexes at {apex_index}") |
| 36 | + # print("Intensity at apex:") |
| 37 | + |
| 38 | + apex_intensity = sum(e_im.intensity_array[apex_index]) |
| 39 | + rhs_area = lhs_area = last_intensity = apex_intensity |
| 40 | + left_bound = right_bound = apex_index |
| 41 | + bound_area_tolerance = 0.0005 / 2 # half of 0.05 % |
| 42 | + |
| 43 | + for idx_offset, scan in enumerate(e_im.intensity_array[apex_index + 1:]): |
| 44 | + scan_intensity = sum(scan) |
| 45 | + |
| 46 | + if scan_intensity <= last_intensity and scan_intensity > (rhs_area * bound_area_tolerance): |
| 47 | + last_intensity = scan_intensity |
| 48 | + rhs_area += scan_intensity |
| 49 | + right_bound = idx_offset + apex_index + 1 |
| 50 | + else: |
| 51 | + break |
| 52 | + |
| 53 | + last_intensity = apex_intensity |
| 54 | + |
| 55 | + for idx_offset, scan in enumerate(reversed(e_im.intensity_array[:apex_index])): |
| 56 | + scan_intensity = sum(scan) |
| 57 | + |
| 58 | + if scan_intensity <= last_intensity and scan_intensity > (lhs_area * bound_area_tolerance): |
| 59 | + last_intensity = scan_intensity |
| 60 | + lhs_area += scan_intensity |
| 61 | + left_bound = (apex_index - idx_offset) - 1 |
| 62 | + else: |
| 63 | + break |
| 64 | + |
| 65 | + area = lhs_area + rhs_area - apex_intensity # apex intensity was counted for each half |
| 66 | + |
| 67 | + # print("Peak area:") |
| 68 | + # print(area) |
| 69 | + |
| 70 | + # print(f"right bound: {right_bound}", eic.get_time_at_index(right_bound) / 60) |
| 71 | + # print(f"left bound: {left_bound}", eic.get_time_at_index(left_bound) / 60) |
| 72 | + return area, left_bound, right_bound |
| 73 | + |
| 74 | + |
| 75 | +def peaks_from_maxima(e_im: ExtractedIntensityMatrix, points=3): |
| 76 | + """ |
| 77 | +
|
| 78 | + :param e_im: |
| 79 | + :param points: |
| 80 | + """ |
| 81 | + |
| 82 | + # TODO: combine close peaks, via scans param. |
| 83 | + intensity_list = [] |
| 84 | + peak_list = [] |
| 85 | + |
| 86 | + for row in e_im._intensity_array: |
| 87 | + intensity_list.append(sum(row)) |
| 88 | + |
| 89 | + for apex_idx in get_maxima_indices(intensity_list, points=points): |
| 90 | + rt = e_im.get_time_at_index(apex_idx) |
| 91 | + ms = e_im.get_ms_at_index(apex_idx) |
| 92 | + peak = Peak(rt, ms) |
| 93 | + peak.bounds = (0, apex_idx, 0) |
| 94 | + |
| 95 | + peak_list.append(peak) |
| 96 | + |
| 97 | + return peak_list |
| 98 | + |
| 99 | + |
| 100 | +def make_im_for_adducts( |
| 101 | + im: IntensityMatrix, |
| 102 | + analyte: Formula, |
| 103 | + adducts: Iterable[Adduct], |
| 104 | + left_bound: float = 0.1, |
| 105 | + right_bound: float = 0.1, |
| 106 | + ): |
| 107 | + """ |
| 108 | + Conxtructs a :class:`pyms.eic.ExtractedIntensityMatrix` for the given adducts of the analyte. |
| 109 | +
|
| 110 | + :param im: |
| 111 | + :param analyte: |
| 112 | + :param adducts: |
| 113 | + :param left_bound: |
| 114 | + :param right_bound: |
| 115 | + """ |
| 116 | + |
| 117 | + # Compile a list of masses for the adducts |
| 118 | + spectra = get_adduct_spectra(analyte, adducts) |
| 119 | + |
| 120 | + print(spectra) |
| 121 | + all_masses = sorted(set(chain.from_iterable(spectrum.mass_list for spectrum in spectra.values()))) |
| 122 | + |
| 123 | + print(f"Constructing ExtractedIntensityMatrix for m/z {word_join(map(str, all_masses))}") |
| 124 | + |
| 125 | + # Construct the extracted intensity matrix for the adducts |
| 126 | + return build_extracted_intensity_matrix(im, masses=all_masses, left_bound=left_bound, right_bound=right_bound) |
| 127 | + |
| 128 | + |
| 129 | +def peak_finder(e_im: ExtractedIntensityMatrix, points: int = 3) -> Iterator[Peak]: |
| 130 | + |
| 131 | + # Find peaks |
| 132 | + # peaks = BillerBiemann(e_im, points=8, scans=3) |
| 133 | + peaks = peaks_from_maxima(e_im, points=points) |
| 134 | + |
| 135 | + eic = e_im.eic |
| 136 | + |
| 137 | + # Filter small peaks from peak list |
| 138 | + noise_level = window_analyzer(eic) |
| 139 | + print(f"Filtering peaks with fewer than {2}/{len(e_im.mass_list)} masses.") |
| 140 | + print("noise_level:", noise_level) |
| 141 | + filtered_peak_list = num_ions_threshold(peaks, n=2, cutoff=noise_level) |
| 142 | + # filtered_peak_list = num_ions_threshold(peaks, n=1, cutoff=noise_level) |
| 143 | + # filtered_peak_list = num_ions_threshold(peaks, n=3, cutoff=5000) |
| 144 | + # filtered_peak_list = peaks |
| 145 | + |
| 146 | + for peak in filtered_peak_list[::-1]: |
| 147 | + apex_index = eic.get_index_at_time(peak.rt) |
| 148 | + |
| 149 | + # Estimate peak area |
| 150 | + peak.area, left_bound, right_bound = sum_area(apex_index, e_im) |
| 151 | + |
| 152 | + # Assign bounds to peak as offsets. |
| 153 | + peak.bounds = [apex_index - left_bound, apex_index, right_bound - apex_index] |
| 154 | + |
| 155 | + if (right_bound - left_bound) > 3 and peak.area > 1000: |
| 156 | + # TODO: make 1000 dependent on data |
| 157 | + |
| 158 | + yield peak |
| 159 | + |
| 160 | + |
| 161 | +# def fill_peak(eic: ExtractedIonChromatogram, peak: Peak, ax: Optional[Axes] = None) -> PolyCollection: |
| 162 | +# if ax is None: |
| 163 | +# # 3rd party |
| 164 | +# import matplotlib.pyplot |
| 165 | +# ax = matplotlib.pyplot.gca() |
| 166 | + |
| 167 | +# left_bound, apex, right_bound = peak.bounds |
| 168 | +# left_bound = apex - left_bound |
| 169 | +# right_bound = apex + right_bound |
| 170 | + |
| 171 | +# # Fill in the peak |
| 172 | +# time_range = [eic.get_time_at_index(idx) / 60 for idx in (range(left_bound, right_bound + 1))] |
| 173 | +# intensities = eic.intensity_array.tolist()[left_bound:right_bound + 1] |
| 174 | +# return ax.fill_between(time_range, intensities) |
0 commit comments