-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdna_to_fastq_encoder.py
147 lines (114 loc) · 5.35 KB
/
dna_to_fastq_encoder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#!/usr/bin/env python3
"""
This module defines the DnaToFastqEncoder class, which is designed to decode binary-encoded DNA sequences into the
FASTQ format. The FASTQ format is a text-based format for storing both a nucleotide sequences and its corresponding
quality scores. Both the sequence and quality scores are encoded within a single binary file, where specific bits
represent different nucleotides and their quality.
Dependencies:
- Python 3.6 or later
Usage example:
from dna_to_fastq_Encoder import DnaToFastqEncoder
encoder = DnaToFastqEncoder(filepath='path/to/binary/file', l=100)
encoder.decode_to_fastq()
"""
class DnaToFastqEncoder:
""" A class used to decode DNA into FASTQ format
This class takes a binary file as input, where each byte encodes a nucleotide and its quality score, and decodes it
into human-readable FASTQ format.
Args:
filepath (str): The binary file to decode
l (int): The length of the decoded DNA sequence
"""
nucleotide_dict = {0: 'A', 1: 'C', 2: 'G', 3: 'T'}
def __init__(self, filepath, l):
"""
Args:
filepath (str): The file to decode
l (int): The length of the decoded DNA sequence
Raises:
ValueError: If the L-value is 0 or less.
"""
self.filepath = filepath
if l <= 0:
raise ValueError(f"L-value error: Cannot decode sequences of length {l}. L-value must be a positive integer")
else:
self.l = l
self._is_data_loaded = False
self.data = None
def read_from_file(self):
""" Reads the file content into a bytes object
In the object state it stores whether the data has been loaded.
Raises:
FileNotFoundError: If the file doesn't exist.
"""
try:
with open(self.filepath, 'rb') as file:
self.data = file.read()
self._is_data_loaded = True
except FileNotFoundError:
raise FileNotFoundError(f"File {self.filepath} not found") from None
def fastq_string(self, nucleotide_seq, quality_scores, i):
""" Creates string representing decoded nucleotide in FASTQ format
Args:
nucleotide_seq (str): Decoded nucleotide sequence
quality_scores (str): Decoded quality scores
i (int): Index of the record
Returns:
string: String in the FASTQ format.
"""
return f"@READ_{i}\n{nucleotide_seq}\n+READ_{i}\n{quality_scores}"
def decode_dna(self, piece):
""" Decode piece of the DNA sequence
Takes a piece of the loaded binary data and decodes ot into nucleotides and quality scores. A piece is a bytes
object.
Args:
piece (bytes): Piece of the input DNA sequence with the length of L.
Returns:
tuple: Tuple of two strings nucleotides and their corresponding quality scores.
"""
nucleotide_seq = ""
quality_scores = ""
# Each byte decodes one nucleotide, therefore for example for l-value 7 the nucleotide sequence length
# will be 7
for byte in piece:
# Get the code of the nucleotide represented by two most significant bits with bitwise shift to right:
# '11011000' >> 6 -> '00000011' represents "T"
code = byte >> 6
# Look up code in the nucleotide_dict
nucleotide_seq += self.nucleotide_dict[code]
# Get six least significant bits from byte using mask: '11011000' & '00111111' -> 00011000
quality_score = byte & 0b00111111
quality_scores += chr(quality_score + 33)
return nucleotide_seq, quality_scores
def generate_decoded_sequences(self):
""" Slices the data into L number of sequences and decodes them into nucleotide sequences and corresponding
quality scores.
Yields:
tuple: A tuple containing three elements:
- str: The decoded nucleotide sequence
- str: The decoded quality score
- int: The index of the sequences
Raises:
RuntimeError: If the input data does not contain enough bytes to form a single sequence of length L.
"""
if not self._is_data_loaded:
self.read_from_file()
# Number of pieces depends on the l-value
n_pieces = len(self.data) // self.l
if n_pieces == 0:
raise RuntimeError(f"Insufficient data to decode: The input file '{self.filepath}' does not contain enough"
f" data to form a single sequence of length {self.l}.")
for i in range(n_pieces):
# Get the piece slice. Number of bytes in piece depends on the L-value
piece = self.data[i * self.l:(i + 1) * self.l]
# Get the nucleotides and their quality scores
nucleotide_seq, quality_scores = self.decode_dna(piece)
yield nucleotide_seq, quality_scores, i
def decode_to_fastq(self):
""" Decodes the DNA sequence and returns it in FASTQ format
Yields FASTQ formatted strings for each DNA sequence and its quality score.
Yields:
string: FASTQ string
"""
for nucleotide_seq, quality_scores, i in self.generate_decoded_sequences():
yield self.fastq_string(nucleotide_seq, quality_scores, i+1)