|
| 1 | +""" |
| 2 | +Classes for the error models used in the simulation. This will generate errors of the type contained in |
| 3 | +variant_models.py, each of which is based on a type from the variants submodule. |
| 4 | +Also contained is a simple container for storing errors, and a subclass for the simulation type to be performed. |
| 5 | +Currently, only the traditional type is supported, but we are working on adding a Markov chain based model in the near |
| 6 | +future. |
| 7 | +""" |
| 8 | + |
| 9 | +import logging |
| 10 | + |
| 11 | +from numpy.random import Generator |
| 12 | +from Bio.Seq import Seq |
| 13 | +from Bio import SeqRecord |
| 14 | + |
| 15 | +from neat import variants |
| 16 | + |
| 17 | +from ..common import ALLOWED_NUCL, NUC_IND |
| 18 | +from .default_mutation_model import * |
| 19 | +from .default_sequencing_error_model import * |
| 20 | +from .variant_models import InsertionModel, DeletionModel, SnvModel |
| 21 | + |
| 22 | +__all__ = [ |
| 23 | + "ErrorContainer" |
| 24 | +] |
| 25 | + |
| 26 | +_LOG = logging.getLogger(__name__) |
| 27 | + |
| 28 | + |
| 29 | +class TraditionalModel: |
| 30 | + """ |
| 31 | + This is the traditional NEAT model for generating quality scores. This will be replaced by |
| 32 | + a Markov model or an optional markov model |
| 33 | +
|
| 34 | + :param transition_matrix: 2x2 matrix that gives the probability of each base transitioning to another. |
| 35 | + :param quality_scores: numpy array of ints of the PHRED quality scores possible from the sequencing machine |
| 36 | + :param qual_score_probs: At each position along the read_length, this gives the mean and standard deviation of |
| 37 | + quality scores read from the dataset used to construct the model. |
| 38 | + :param is_uniform: Some machines use uniform quality scores. This makes simulation a little easier. |
| 39 | + """ |
| 40 | + |
| 41 | + def __init__(self, |
| 42 | + transition_matrix: np.ndarray = default_error_transition_matrix, |
| 43 | + quality_scores: np.ndarray = default_quality_scores, |
| 44 | + qual_score_probs: np.ndarray = default_qual_score_probs, |
| 45 | + is_uniform: bool = False, |
| 46 | + ): |
| 47 | + |
| 48 | + self.transition_matrix = transition_matrix |
| 49 | + self.quality_scores = quality_scores |
| 50 | + self.quality_score_probabilities = qual_score_probs |
| 51 | + self.is_uniform = is_uniform |
| 52 | + |
| 53 | + # pre-compute the error rate for each quality score. This is the inverse of the phred score equation |
| 54 | + self.quality_score_error_rate: dict[int, float] = {x: 10. ** (-x / 10) for x in self.quality_scores} |
| 55 | + |
| 56 | + self.uniform_quality_score = None |
| 57 | + if self.is_uniform: |
| 58 | + # Set score to the lowest of the max of the quality scores and the input avg error. |
| 59 | + self.uniform_quality_score = min([max(self.quality_scores), int(-10. * np.log10(self.average_error) + 0.5)]) |
| 60 | + |
| 61 | + def get_quality_scores(self, |
| 62 | + input_read_length: int, |
| 63 | + orig_read_len: int, |
| 64 | + rng |
| 65 | + ) -> list: |
| 66 | + """ |
| 67 | + Takes a read_length and rng and returns an array of quality scores |
| 68 | +
|
| 69 | + :param input_read_length: The desired length of the quality score array |
| 70 | + :param orig_read_len: the original read length for the model |
| 71 | + :param rng: random number generator. |
| 72 | + :return: An array of quality scores. |
| 73 | + """ |
| 74 | + if self.uniform_quality_score: |
| 75 | + return [self.uniform_quality_score] * input_read_length |
| 76 | + else: |
| 77 | + quality_index_map = self.quality_index_remap(input_read_length, orig_read_len) |
| 78 | + temp_qual_array = [] |
| 79 | + for i in quality_index_map: |
| 80 | + score = rng.normal( |
| 81 | + self.quality_score_probabilities[i][0], |
| 82 | + scale=self.quality_score_probabilities[i][1] |
| 83 | + ) |
| 84 | + # make sure score is in range and an int |
| 85 | + score = round(score) |
| 86 | + if score > 42: |
| 87 | + score = 42 |
| 88 | + if score < 1: |
| 89 | + score = 1 |
| 90 | + |
| 91 | + temp_qual_array.append(score) |
| 92 | + |
| 93 | + return temp_qual_array |
| 94 | + |
| 95 | + def quality_index_remap(self, input_read_length, original_read_length): |
| 96 | + """ |
| 97 | + Adjusts the quality map to the suitable read length. |
| 98 | +
|
| 99 | + :param input_read_length: The desired length for the current read. |
| 100 | + :param original_read_length: The set read-length for the model. |
| 101 | + :return: An index map from the default read length to the new one. |
| 102 | + """ |
| 103 | + if input_read_length == original_read_length: |
| 104 | + return np.arange(original_read_length) |
| 105 | + else: |
| 106 | + # This is basically a way to evenly spread the distribution across the number of bases in the read |
| 107 | + return np.array([max([0, original_read_length * n // input_read_length]) for n in range(input_read_length)]) |
| 108 | + |
| 109 | + |
| 110 | +class SequencingErrorModel(SnvModel, DeletionModel, InsertionModel, TraditionalModel): |
| 111 | + """ |
| 112 | + This is a SequencingErrorModel class, based on the old SequencingError. This covers both errors and quality |
| 113 | + scores, since they are intimately related. There are three types of |
| 114 | + errors possible: substitutions, insertions, and deletions, similar to mutations, but |
| 115 | + simpler. Note that the three input probabilities must add up to 1.0 and the length the list of indel lengths |
| 116 | + must be equal to the length of its corresponding probabilities. |
| 117 | +
|
| 118 | + :param read_length: The read length derived from real data. |
| 119 | + :param rescale_qualities: If set to true, NEAT will attempt to rescale the qualities based on the input error |
| 120 | + model, rather than using the qualities derived from the real data. |
| 121 | + :param variant_probs: Probability dict for each valid variant type |
| 122 | + :param indel_len_model: Similar to mutation model, but simpler because errors tend not to be complicated. The |
| 123 | + three possible variant types for errors are Insertion, Deletion, and SNV |
| 124 | + :param rng: optional random number generator. For generating this model, no RNG is needed. But for a run, |
| 125 | + we'll need the rng to perform certain methods. |
| 126 | + :param avg_seq_error: A float giving the average rate of sequencing errors, |
| 127 | + either defined by data or user input. |
| 128 | + """ |
| 129 | + |
| 130 | + def __init__(self, |
| 131 | + read_length: int = default_read_length, |
| 132 | + variant_probs: dict[variants: float] = default_error_variant_probs, |
| 133 | + indel_len_model: dict[int: float] = default_indel_len_model, |
| 134 | + insertion_model: np.ndarray = default_insertion_model, |
| 135 | + rescale_qualities: bool = False, |
| 136 | + avg_seq_error: float = default_avg_seq_error, |
| 137 | + rng: Generator = None): |
| 138 | + |
| 139 | + SnvModel.__init__(self) |
| 140 | + InsertionModel.__init__(self, indel_len_model) |
| 141 | + DeletionModel.__init__(self, indel_len_model) |
| 142 | + |
| 143 | + self.variant_probs = variant_probs |
| 144 | + |
| 145 | + self.read_length = read_length |
| 146 | + self.read_length = read_length |
| 147 | + self.rescale_qualities = rescale_qualities |
| 148 | + self.insertion_model = insertion_model |
| 149 | + self.average_error = avg_seq_error |
| 150 | + self.rng = rng |
| 151 | + |
| 152 | + def get_sequencing_errors(self, |
| 153 | + read_length: int, |
| 154 | + padding: int, |
| 155 | + reference_segment: SeqRecord, |
| 156 | + quality_scores: np.ndarray, |
| 157 | + rng |
| 158 | + ): |
| 159 | + """ |
| 160 | + Inserts errors of type substitution, insertion, or deletion into read_data, and assigns a quality score |
| 161 | + based on the container model. |
| 162 | + :param read_length: The length of the read to generate errors for. |
| 163 | + :param padding: this is the amount of space we have in the read for deletions. |
| 164 | + :param reference_segment: The section of the reference from which the read is drawn |
| 165 | + :param quality_scores: Array of quality scores for the read |
| 166 | + :return: Modified sequence and associated quality scores |
| 167 | + :param rng: random number generator. |
| 168 | + """ |
| 169 | + |
| 170 | + error_indexes = [] |
| 171 | + introduced_errors = [] |
| 172 | + |
| 173 | + # The use case here would be someone running a simulation where they want no sequencing errors. |
| 174 | + # No need to run any loops in this case. |
| 175 | + if self.average_error == 0: |
| 176 | + return introduced_errors |
| 177 | + else: |
| 178 | + for i in range(read_length): |
| 179 | + if rng.random() < self.quality_score_error_rate[quality_scores[i]]: |
| 180 | + error_indexes.append(i) |
| 181 | + |
| 182 | + total_indel_length = 0 |
| 183 | + # To prevent deletion collisions |
| 184 | + del_blacklist = [] |
| 185 | + |
| 186 | + for index in error_indexes[::-1]: |
| 187 | + # determine error type. Most will be SNVs |
| 188 | + error_type = SingleNucleotideVariant |
| 189 | + |
| 190 | + # Not too sure about how realistic it is to model errors as indels, but I'm leaving the code in for now. |
| 191 | + |
| 192 | + # This is to prevent deletion error collisions and to keep there from being too many indel errors. |
| 193 | + if 0 < index < self.read_length - max( |
| 194 | + self.deletion_len_model) and total_indel_length > self.read_length // 4: |
| 195 | + error_type = self.rng.choice(a=list(self.variant_probs), p=list(self.variant_probs.values())) |
| 196 | + |
| 197 | + # Deletion error |
| 198 | + if error_type == Deletion: |
| 199 | + deletion_length = self.get_deletion_length() |
| 200 | + if padding - deletion_length < 0: |
| 201 | + # No space in this read to add this deletion |
| 202 | + continue |
| 203 | + deletion_reference = reference_segment.seq[index: index + deletion_length + 1] |
| 204 | + deletion_alternate = deletion_reference[0] |
| 205 | + introduced_errors.append( |
| 206 | + ErrorContainer(Deletion, index, deletion_length, deletion_reference, deletion_alternate) |
| 207 | + ) |
| 208 | + total_indel_length += deletion_length |
| 209 | + |
| 210 | + del_blacklist.extend(list(range(index, index + deletion_length))) |
| 211 | + padding -= deletion_length |
| 212 | + |
| 213 | + elif error_type == Insertion: |
| 214 | + insertion_length = self.get_insertion_length() |
| 215 | + insertion_reference = reference_segment[index] |
| 216 | + insert_string = ''.join(self.rng.choice(ALLOWED_NUCL, size=insertion_length)) |
| 217 | + insertion_alternate = insertion_reference + insert_string |
| 218 | + introduced_errors.append( |
| 219 | + ErrorContainer(Insertion, index, insertion_length, insertion_reference, insertion_alternate) |
| 220 | + ) |
| 221 | + total_indel_length += insertion_length |
| 222 | + |
| 223 | + # Insert substitution error |
| 224 | + # Programmer note: if you add new error types, they can be added as elifs above, leaving the final |
| 225 | + # else dedicated to SNVs. |
| 226 | + else: |
| 227 | + snv_reference = reference_segment[index] |
| 228 | + nuc_index = NUC_IND[snv_reference] |
| 229 | + # take the zero index because this returns a list of length 1. |
| 230 | + snv_alt = rng.choice(ALLOWED_NUCL, p=self.transition_matrix[nuc_index]) |
| 231 | + introduced_errors.append( |
| 232 | + ErrorContainer(SingleNucleotideVariant, index, 1, snv_reference, snv_alt) |
| 233 | + ) |
| 234 | + |
| 235 | + # Remove blacklisted errors |
| 236 | + for i in range(len(introduced_errors) - 1, -1, -1): |
| 237 | + if introduced_errors[i].location in del_blacklist: |
| 238 | + del introduced_errors[i] |
| 239 | + |
| 240 | + return introduced_errors, max(padding, 0) |
| 241 | + |
| 242 | + def generate_quality_scores( |
| 243 | + self, |
| 244 | + inp_read_len: int, |
| 245 | + ): |
| 246 | + if self.uniform_quality_score: |
| 247 | + return np.array([self.uniform_quality_score] * inp_read_len) |
| 248 | + else: |
| 249 | + |
| 250 | + temp_quality_array = self.get_quality_scores( |
| 251 | + inp_read_len, |
| 252 | + self.read_length, |
| 253 | + self.rng |
| 254 | + ) |
| 255 | + |
| 256 | + if self.rescale_qualities: |
| 257 | + # Note that rescaling won't have much effect on binned quality scores. |
| 258 | + # First we rescale the quality score literals then convert back to phred score (with the 0.5 |
| 259 | + # causing borderline cases to take the next highest number). |
| 260 | + rescaled_quals = [max([0, int(-10. * np.log10(self.average_error * |
| 261 | + self.quality_score_error_rate[n]) + 0.5)]) |
| 262 | + for n in temp_quality_array] |
| 263 | + # Now rebin the quality scores. |
| 264 | + temp_qual_array = np.array(rescaled_quals) |
| 265 | + else: |
| 266 | + temp_qual_array = np.array(temp_quality_array) |
| 267 | + |
| 268 | + return temp_qual_array[:inp_read_len] |
| 269 | + |
| 270 | + |
| 271 | +class ErrorContainer: |
| 272 | + """ |
| 273 | + Holds data for a single error |
| 274 | +
|
| 275 | + :param error_type - the type of error this is |
| 276 | + :param location - the index of the start position of the variant in 0-based coordinates |
| 277 | + :param length - the length of the error |
| 278 | + :param ref - the reference sequence of the error includes base before insertion or deletion, as applicable, |
| 279 | + which is the same notation used in a VCF file. |
| 280 | + :param alt - the alternate sequence of the error (i.e., the error itself) |
| 281 | + """ |
| 282 | + def __init__(self, |
| 283 | + error_type: VariantTypes, |
| 284 | + location: int, |
| 285 | + length: int, |
| 286 | + ref: str or Seq, |
| 287 | + alt: str or Seq): |
| 288 | + self.error_type = error_type |
| 289 | + self.location = location |
| 290 | + self.length = length |
| 291 | + self.ref = ref |
| 292 | + self.alt = alt |
0 commit comments