|
1 |
| -""" |
2 |
| -Tests for sequencing error model in models |
3 |
| -""" |
4 |
| - |
5 | 1 | import pytest
|
6 | 2 | import numpy as np
|
7 | 3 |
|
8 |
| -from Bio.SeqRecord import SeqRecord |
9 |
| -from Bio.Seq import Seq |
10 |
| - |
11 | 4 | from neat.models import FragmentLengthModel
|
12 |
| -from neat.variants import SingleNucleotideVariant, Insertion, Deletion |
13 | 5 | from neat.read_simulator.utils import Options, cover_dataset
|
14 | 6 |
|
15 | 7 |
|
def test_cover_dataset():
    """Test that a cover is successfully generated for different coverage values"""
    span_length = 100
    read_pool = [10] * 2000
    target_vector = np.full(span_length, fill_value=10, dtype=int)

    options = Options(rng_seed=0)
    options.read_len = 101
    options.paired_ended = True
    options.fragment_mean = 250
    options.fragment_st_dev = 100
    options.output.overwrite_output = True
    fragment_model = FragmentLengthModel(rng=options.rng)

    for coverage in (1, 2, 5, 10, 25, 50):
        options.coverage = coverage
        read1, _read2 = cover_dataset(read_pool, span_length, target_vector, options, fragment_model)

        # Only the read1 intervals are counted here, even though the options
        # above request paired-ended reads. Each read is indexed as
        # (start, end) and treated as the half-open interval [start, end).
        depth_per_position = [
            sum(1 for read in read1 if position in range(read[0], read[1]))
            for position in range(span_length)
        ]

        # The mean depth over the span must exceed the requested coverage.
        mean_depth = sum(depth_per_position) / len(depth_per_position)
        assert mean_depth > coverage, f"Coverage check failed for coverage {coverage}"
34 | 31 |
|
35 | 32 |
|
def test_paired_cover_dataset():
    """Test that a cover is successfully generated for different coverage values"""
    span_length = 100
    read_pool = [10] * 2000
    target_vector = np.full(span_length, fill_value=10, dtype=int)

    options = Options(rng_seed=0)
    options.read_len = 101
    options.paired_ended = True
    options.fragment_mean = 250
    options.fragment_st_dev = 100
    options.output.overwrite_output = True
    # NOTE(review): the explicit model parameters below (mean 20, std 2) differ
    # from options.fragment_mean / options.fragment_st_dev (250 / 100); which
    # of the two cover_dataset actually uses is not visible here -- confirm.
    fragment_model = FragmentLengthModel(
        fragment_mean=20,
        fragment_std=2,
        fragment_min=10,
        fragment_max=30,
        rng=options.rng,
    )

    for coverage in (1, 2, 5, 10, 25, 50):
        options.coverage = coverage
        read1, read2 = cover_dataset(read_pool, span_length, target_vector, options, fragment_model)

        # Paired-ended test: both mates contribute to the depth at a position.
        all_reads = read1 + read2
        depth_per_position = [
            sum(1 for read in all_reads if position in range(read[0], read[1]))
            for position in range(span_length)
        ]

        # The mean depth over the span must exceed the requested coverage.
        mean_depth = sum(depth_per_position) / len(depth_per_position)
        assert mean_depth > coverage, f"Coverage check failed for coverage {coverage}"
| 56 | + |
| 57 | + |
def test_various_read_lengths():
    """Test cover_dataset with various read lengths to ensure no errors.

    Read lengths 10, 20, ..., 250 are each run through cover_dataset with the
    same options object and fragment model. Failures are collected and
    reported together, so a single run lists every read length that raised
    instead of stopping at the first one.
    """
    read_pool = [10] * 2000
    span_length = 100
    target_vector = np.full(span_length, fill_value=10, dtype=int)
    options = Options(rng_seed=0)
    options.paired_ended = True
    options.coverage = 10
    options.fragment_mean = 250
    options.fragment_st_dev = 100
    options.output.overwrite_output = True
    fragment_model = FragmentLengthModel(rng=options.rng)

    failures = []
    for read_len in range(10, 251, 10):
        options.read_len = read_len
        try:
            # The result itself is not inspected; the two-way unpacking only
            # checks that a (read1, read2) pair comes back.
            _read1, _read2 = cover_dataset(read_pool, span_length, target_vector, options, fragment_model)
        except Exception as e:
            # Deliberately broad: any error for this read length is a failure.
            failures.append(f"Test failed for read_len={read_len} with exception: {e}")

    if failures:
        pytest.fail("\n".join(failures))
| 77 | + |
| 78 | + |
def test_fragment_mean_st_dev_combinations():
    """Test cover_dataset with combinations of fragment mean and standard deviation to ensure no errors.

    Every (mean, st_dev) pair from the two lists below is tried with a fresh
    FragmentLengthModel. Failures raised by cover_dataset are collected and
    reported together, so a single run lists every failing combination
    instead of stopping at the first one.
    """
    read_pool = [10] * 2000
    span_length = 100
    target_vector = np.full(span_length, fill_value=10, dtype=int)
    options = Options(rng_seed=0)
    options.paired_ended = True
    options.read_len = 50
    options.coverage = 10
    options.output.overwrite_output = True

    fragment_means = [1, 2, 5, 10, 25, 50, 100, 150, 200, 250, 300, 350, 400, 450, 500, 750, 1000]
    fragment_st_devs = [1, 2, 5, 10, 25, 50, 100, 150, 200, 250, 300, 350, 400, 450, 500, 750, 1000]

    failures = []
    for mean in fragment_means:
        for st_dev in fragment_st_devs:
            options.fragment_mean = mean
            options.fragment_st_dev = st_dev
            # Model construction stays outside the try block, as before: an
            # error raised here propagates directly rather than being
            # recorded as a cover_dataset failure.
            fragment_model = FragmentLengthModel(fragment_mean=mean, fragment_std=st_dev, rng=options.rng)
            try:
                # The result itself is not inspected; the two-way unpacking
                # only checks that a (read1, read2) pair comes back.
                _read1, _read2 = cover_dataset(read_pool, span_length, target_vector, options, fragment_model)
            except Exception as e:
                # Deliberately broad: any error for this pair is a failure.
                failures.append(f"Test failed for mean={mean}, st_dev={st_dev} with exception: {e}")

    if failures:
        pytest.fail("\n".join(failures))
0 commit comments