Configurable approaches to prep alignment splits
- Add a configuration option to explore alternatives to our current
  bgzip/grabix preparation of fastqs, to see if we can improve speed.
- Add an indexing option using rtg SDF files, which improves speed at the
  cost of a ~3x larger disk footprint.
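As an illustration of where the new knob sits, a minimal sample-configuration sketch; the sample name, file paths and split size are made up, and align_prep_method defaults to the existing grabix behaviour:

```yaml
# Illustrative sample configuration; names, paths and sizes are made up.
details:
  - description: Test1
    analysis: variant2
    files: [sample_1.fastq.gz, sample_2.fastq.gz]
    algorithm:
      aligner: bwa
      align_split_size: 5000000
      align_prep_method: rtg   # or grabix (the default bgzip/grabix indexing)
```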
chapmanb committed Jan 6, 2016
1 parent 5da92ae commit c08aba1
Showing 8 changed files with 164 additions and 35 deletions.
2 changes: 2 additions & 0 deletions HISTORY.md
@@ -11,6 +11,8 @@
representation and pick best metric for plotting ROC scores.
- Remove `--sudo` flag from installer. bcbio requires install into a directory
structure with user permissions.
- Add ability to tweak fastq preparation for alignment splitting so we can
  explore alternative approaches to bgzip and grabix indexing.
- Re-enable `stringtie` as an expression caller.
- Allow `stringtie` as a transcript assembler.
- Replace the `assemble_transcriptome` option with `transcript_assembler`, which
67 changes: 45 additions & 22 deletions bcbio/ngsalign/alignprep.py
@@ -2,6 +2,7 @@
"""
import collections
import copy
import glob
import os
import shutil
import subprocess
@@ -14,6 +15,7 @@
from bcbio.distributed import objectstore
from bcbio.distributed.multi import run_multicore, zeromq_aware_logging
from bcbio.distributed.transaction import file_transaction
from bcbio.ngsalign import rtg
from bcbio.pipeline import config_utils, tools
from bcbio.pipeline import datadict as dd
from bcbio.provenance import do
@@ -22,46 +24,65 @@ def create_inputs(data):
"""Index input reads and prepare groups of reads to process concurrently.
Allows parallelization of alignment beyond processors available on a single
machine. Uses bgzip and grabix to prepare an indexed fastq file.
machine. Prepares an rtg SDF format file with built-in indexes for retrieving
sections of files.
Retains backwards compatibility with the bgzip/grabix approach.
"""
aligner = tz.get_in(("config", "algorithm", "aligner"), data)
# CRAM files must be converted to bgzipped fastq, unless not aligning.
# Also need to prep and download remote files.
if not ("files" in data and data["files"] and aligner and (_is_cram_input(data["files"]) or
objectstore.is_remote(data["files"][0]))):
# skip indexing on samples without input files or not doing alignment
# skip if we're not BAM and not doing alignment splitting
if ("files" not in data or not data["files"] or data["files"][0] is None or not aligner
or _no_index_needed(data)):
if ("files" not in data or not data["files"] or data["files"][0] is None or not aligner):
return [[data]]
ready_files = _prep_grabix_indexes(data["files"], data["dirs"], data)
data["files"] = ready_files
# bgzip preparation takes care of converting illumina into sanger format
approach = "grabix" if _has_grabix_indices(data) else dd.get_align_prep_method(data)
if approach == "rtg":
data["files"] = [rtg.to_sdf(data["files"], data)]
else:
data["files"] = _prep_grabix_indexes(data["files"], data["dirs"], data)
# preparation converts illumina into sanger format
data["config"]["algorithm"]["quality_format"] = "standard"
if tz.get_in(["config", "algorithm", "align_split_size"], data):
splits = _find_read_splits(ready_files[0], data["config"]["algorithm"]["align_split_size"])
else:
splits = [None]
if len(splits) == 1:
return [[data]]
else:
out = []
if approach == "rtg":
splits = rtg.calculate_splits(data["files"][0], data["config"]["algorithm"]["align_split_size"])
else:
splits = _find_read_splits(data["files"][0], data["config"]["algorithm"]["align_split_size"])
for split in splits:
cur_data = copy.deepcopy(data)
cur_data["align_split"] = "-".join([str(x) for x in split])
cur_data["align_split"] = split
out.append([cur_data])
return out
else:
return [[data]]

def _no_index_needed(data):
return (not data["files"][0].endswith(".bam")
and data["config"]["algorithm"].get("align_split_size") is None)
def _has_grabix_indices(data):
"""Back compatibility with existing runs, look for grabix indexes.
"""
work_dir = (os.path.join(data["dirs"]["work"], "align_prep"))
return len(glob.glob(os.path.join(work_dir, "*.gbi"))) > 0

def split_namedpipe_cl(in_file, data):
def split_namedpipe_cls(pair1_file, pair2_file, data):
"""Create a commandline suitable for use as a named pipe with reads in a given region.
"""
grabix = config_utils.get_program("grabix", data["config"])
start, end = [int(x) for x in data["align_split"].split("-")]
return "<({grabix} grab {in_file} {start} {end})".format(**locals())
if "align_split" in data:
start, end = [int(x) for x in data["align_split"].split("-")]
else:
start, end = None, None
if pair1_file.endswith(".sdf"):
assert not pair2_file, pair2_file
return rtg.to_fastq_apipe_cl(pair1_file, start, end)
else:
out = []
for in_file in pair1_file, pair2_file:
if in_file:
assert os.path.exists(in_file + ".gbi"), "Need grabix index for %s" % in_file
out.append("<(grabix grab {in_file} {start} {end})".format(**locals()))
else:
out.append(None)
return out

def fastq_convert_pipe_cl(in_file, data):
"""Create an anonymous pipe converting Illumina 1.3-1.7 to Sanger.
@@ -89,6 +110,8 @@ def parallel_multiplier(items):
def setup_combine(final_file, data):
"""Setup the data and outputs to allow merging data back together.
"""
if "align_split" not in data:
return final_file, data
align_dir = os.path.dirname(final_file)
base, ext = os.path.splitext(os.path.basename(final_file))
start, end = [int(x) for x in data["align_split"].split("-")]
@@ -149,7 +172,7 @@ def _find_read_splits(in_file, split_size):
new = last + split_lines - 1
chunks.append((last, min(new, num_lines)))
last = new + 1
return chunks
return ["%s-%s" % (s, e) for s, e in chunks]

# ## bgzip and grabix

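To make the named-pipe handling above concrete, this is roughly the shape of command that split_namedpipe_cls produces for a grabix-prepared pair; a sketch only, with illustrative paths and line ranges rather than the exact bcbio command line:

```bash
# One alignment split: grabix pulls a line range from the bgzipped, indexed
# fastqs and feeds it to bwa through process substitution (paths illustrative).
bwa mem -M genome.fa \
  <(grabix grab sample_1.fastq.gz 1 2000000) \
  <(grabix grab sample_2.fastq.gz 1 2000000) \
  > sample-split1.sam
```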
20 changes: 12 additions & 8 deletions bcbio/ngsalign/bwa.py
@@ -9,7 +9,7 @@
from bcbio import bam, utils
from bcbio.distributed import objectstore
from bcbio.distributed.transaction import file_transaction, tx_tmpdir
from bcbio.ngsalign import alignprep, novoalign, postalign
from bcbio.ngsalign import alignprep, novoalign, postalign, rtg
from bcbio.provenance import do
from bcbio.rnaseq import gtf
import bcbio.pipeline.datadict as dd
@@ -84,12 +84,16 @@ def _get_bwa_mem_cmd(data, out_file, ref_file, fastq1, fastq2=""):
"{ref_file} {fastq1} {fastq2} ")
return (bwa_cmd + alt_cmd).format(**locals())

def _can_use_mem(fastq_file, data):
def _can_use_mem(fastq_file, data, read_min_size=None):
"""bwa-mem handle longer (> 70bp) reads with improved piping.
Randomly samples 5000 reads from the first two million.
Default to no piping if more than 75% of the sampled reads are small.
If we've previously calculated minimum read sizes (from rtg SDF output)
we can skip the formal check.
"""
min_size = 70
if read_min_size and read_min_size >= min_size:
return True
thresh = 0.75
head_count = 8000000
tocheck = 5000
@@ -115,12 +119,13 @@ def align_pipe(fastq_file, pair_file, ref_file, names, align_dir, data):
pair_file = pair_file if pair_file else ""
out_file = os.path.join(align_dir, "{0}-sort.bam".format(names["lane"]))
qual_format = data["config"]["algorithm"].get("quality_format", "").lower()
if data.get("align_split"):
min_size = None
if data.get("align_split") or fastq_file.endswith(".sdf"):
if fastq_file.endswith(".sdf"):
min_size = rtg.min_read_size(fastq_file)
final_file = out_file
out_file, data = alignprep.setup_combine(final_file, data)
fastq_file = alignprep.split_namedpipe_cl(fastq_file, data)
if pair_file:
pair_file = alignprep.split_namedpipe_cl(pair_file, data)
fastq_file, pair_file = alignprep.split_namedpipe_cls(fastq_file, pair_file, data)
else:
final_file = None
if qual_format == "illumina":
@@ -131,7 +136,7 @@ def align_pipe(fastq_file, pair_file, ref_file, names, align_dir, data):
if not utils.file_exists(out_file) and (final_file is None or not utils.file_exists(final_file)):
# If we cannot do piping, use older bwa aln approach
if ("bwa-mem" not in dd.get_tools_on(data) and
("bwa-mem" in dd.get_tools_off(data) or not _can_use_mem(fastq_file, data))):
("bwa-mem" in dd.get_tools_off(data) or not _can_use_mem(fastq_file, data, min_size))):
out_file = _align_backtrack(fastq_file, pair_file, ref_file, out_file,
names, rg_info, data)
else:
@@ -152,7 +157,6 @@ def _align_mem(fastq_file, pair_file, ref_file, out_file, names, rg_info, data):
def _align_backtrack(fastq_file, pair_file, ref_file, out_file, names, rg_info, data):
"""Perform a BWA alignment using 'aln' backtrack algorithm.
"""
assert not data.get("align_split"), "Do not handle split alignments with non-piped bwa"
bwa = config_utils.get_program("bwa", data["config"])
config = data["config"]
sai1_file = "%s_1.sai" % os.path.splitext(out_file)[0]
6 changes: 2 additions & 4 deletions bcbio/ngsalign/novoalign.py
@@ -55,12 +55,10 @@ def align_pipe(fastq_file, pair_file, ref_file, names, align_dir, data):
"""
pair_file = pair_file if pair_file else ""
out_file = os.path.join(align_dir, "{0}-sort.bam".format(names["lane"]))
if data.get("align_split"):
if data.get("align_split") or fastq_file.endswith(".sdf"):
final_file = out_file
out_file, data = alignprep.setup_combine(final_file, data)
fastq_file = alignprep.split_namedpipe_cl(fastq_file, data)
if pair_file:
pair_file = alignprep.split_namedpipe_cl(pair_file, data)
fastq_file, pair_file = alignprep.split_namedpipe_cls(fastq_file, pair_file, data)
else:
final_file = None
samtools = config_utils.get_program("samtools", data["config"])
100 changes: 100 additions & 0 deletions bcbio/ngsalign/rtg.py
@@ -0,0 +1,100 @@
"""Provide indexing and retrieval of files using Real Time Genomics SDF format.
Prepares a sdf representation of reads suitable for indexed retrieval,
normalizing many different input types.
https://github.com/RealTimeGenomics/rtg-tools
"""
import os
import subprocess

from bcbio import bam, utils
from bcbio.distributed.transaction import file_transaction
from bcbio.pipeline import datadict as dd
from bcbio.provenance import do

def to_sdf(files, data):
"""Convert a fastq or BAM input into a SDF indexed file.
"""
# BAM
if len(files) == 1 and files[0].endswith(".bam"):
qual = []
format = ["-f", "sam-pe" if bam.is_paired(files[0]) else "sam-se"]
inputs = [files[0]]
# fastq
else:
qual = ["-q", "illumina" if dd.get_quality_format(data).lower() == "illumina" else "sanger"]
format = ["-f", "fastq"]
if len(files) == 2:
inputs = ["-l", files[0], "-r", files[1]]
else:
assert len(files) == 1
inputs = [files[0]]
work_dir = utils.safe_makedir(os.path.join(data["dirs"]["work"], "align_prep"))
out_file = os.path.join(work_dir,
"%s.sdf" % utils.splitext_plus(os.path.basename(os.path.commonprefix(files)))[0])
if not utils.file_exists(out_file):
with file_transaction(data, out_file) as tx_out_file:
cmd = _rtg_cmd(["rtg", "format", "-o", tx_out_file] + format + qual + inputs)
do.run(cmd, "Format inputs to indexed SDF")
return out_file

def _rtg_cmd(cmd):
return "export RTG_JAVA_OPTS='-Xms500m' && export RTG_MEM=2g && " + " ".join(cmd)

def calculate_splits(sdf_file, split_size):
"""Retrieve
"""
counts = _sdfstats(sdf_file)["counts"]
splits = []
cur = 0
for i in range(counts // split_size + (0 if counts % split_size == 0 else 1)):
splits.append("%s-%s" % (cur, min(counts, cur + split_size)))
cur += split_size
return splits

def _sdfstats(sdf_file):
cmd = ["rtg", "sdfstats", sdf_file]
pairs = []
counts = []
lengths = []
for line in subprocess.check_output(_rtg_cmd(cmd), shell=True).split("\n"):
if line.startswith("Paired arm"):
pairs.append(line.strip().split()[-1])
elif line.startswith("Number of sequences"):
counts.append(int(line.strip().split()[-1]))
elif line.startswith("Minimum length"):
lengths.append(int(line.strip().split()[-1]))
assert len(set(counts)) == 1, counts
return {"counts": counts[0], "pairs": pairs, "min_size": min(lengths)}

def min_read_size(sdf_file):
"""Retrieve minimum read size from SDF statistics.
"""
return _sdfstats(sdf_file)["min_size"]

def is_paired(sdf_file):
"""Check if we have paired inputs in a SDF file.
"""
pairs = _sdfstats(sdf_file)["pairs"]
return len(set(pairs)) > 1

def to_fastq_apipe_cl(sdf_file, start=None, end=None):
"""Return a command lines to provide streaming fastq input.
For paired end, returns a forward and reverse command line. For
single end returns a single command line and None for the pair.
"""
cmd = ["rtg", "sdf2fastq", "--no-gzip", "-o", "-"]
if start is not None:
cmd += ["--start-id=%s" % start]
if end is not None:
cmd += ["--end-id=%s" % end]
if is_paired(sdf_file):
out = []
for ext in ["left", "right"]:
out.append("<(%s)" % _rtg_cmd(cmd + ["-i", os.path.join(sdf_file, ext)]))
return out
else:
cmd += ["-i", sdf_file]
return ["<(%s)" % _rtg_cmd(cmd), None]
1 change: 1 addition & 0 deletions bcbio/pipeline/datadict.py
@@ -119,6 +119,7 @@
"coverage": {"keys": ["config", "algorithm", "coverage"]},
"deduped_bam": {"keys": ["deduped_bam"]},
"align_bam": {"keys": ["align_bam"]},
"align_prep_method": {"keys": ["config", "algorithm", "align_prep_method"], "default": "grabix"},
"tools_off": {"keys": ["config", "algorithm", "tools_off"], "default": []},
"tools_on": {"keys": ["config", "algorithm", "tools_on"], "default": []},
"cwl_reporting": {"keys": ["config", "algorithm", "cwl_reporting"]},
2 changes: 1 addition & 1 deletion bcbio/pipeline/run_info.py
@@ -355,7 +355,7 @@ def _check_for_misplaced(xs, subkey, other_keys):

ALGORITHM_KEYS = set(["platform", "aligner", "bam_clean", "bam_sort",
"trim_reads", "adapters", "custom_trim", "species", "kraken",
"align_split_size", "quality_bin", "transcriptome_align",
"align_split_size", "align_prep_method", "quality_bin", "transcriptome_align",
"quality_format", "write_summary", "merge_bamprep",
"coverage", "coverage_interval", "ploidy", "indelcaller",
"variantcaller", "jointcaller", "variant_regions", "peakcaller",
1 change: 1 addition & 0 deletions tests/data/automated/run_info-bam.yaml
@@ -21,6 +21,7 @@ details:
#quality_bin: [prealignment, postrecal]
#archive: cram
align_split_size: 15000
#align_prep_method: grabix
aligner: bwa
#aligner: false
#bam_clean: picard

10 comments on commit c08aba1

@schelhorn

One thing I added in my preprocessing is piped quality format conversion during bgzip re-compression using seqtk. Basically I am reading the first 10k fastq entries with seqtk and extracting the average quality, and then parametrizing a streamed conversion to Sanger format with seqtk|bgzip if the quality range indicates that a file is in Illumina format. That simplifies things a lot downstream (and during configuration, since I do not have to know the quality format beforehand), and I have to touch the fastqs anyway.
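A rough sketch of that idea, assuming detection from the minimum quality character over the first 10k records rather than the average, and with made-up file names and thresholds:

```bash
# Sample the first 10k records (40k lines), find the lowest quality byte, and
# convert offset-64 (Illumina 1.3-1.7) qualities to Sanger while re-compressing.
min_q=$(zcat sample_1.fastq.gz | head -40000 | awk 'NR % 4 == 0' | tr -d '\n' | \
        od -An -tu1 -v | awk 'BEGIN{m=999} {for(i=1;i<=NF;i++) if($i<m) m=$i} END{print m}')
if [ "$min_q" -ge 59 ]; then
    # nothing below ';' in the sample, so treat as offset-64 and shift to Sanger
    seqtk seq -Q64 -V sample_1.fastq.gz | bgzip -c > prep/sample_1.fastq.gz
else
    zcat sample_1.fastq.gz | bgzip -c > prep/sample_1.fastq.gz
fi
```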

@chapmanb
Member Author

Sven-Eric;
Thanks for this. As normal, you're reading my mind. We already do the quality conversion as part of bgzip and indexing so everything downstream can assume sanger:

https://github.com/chapmanb/bcbio-nextgen/blob/master/bcbio/ngsalign/alignprep.py#L434
https://github.com/chapmanb/bcbio-nextgen/blob/master/bcbio/ngsalign/alignprep.py#L87

The goal here is to try and make the preparation fast enough that we can always do it, which will enable more parallelization and hopefully better runs as well. I was hoping SDF indices from RTG would be a good solution. They are faster to generate but take ~3x more space than bgzipped fastqs. @Lenbok, are there ways to compress RTG fastq file storage to make them more disk friendly? I love the built in read indexing and retrieval.

We do have auto-detection of format, although only as a check instead of automatically setting it:

https://github.com/chapmanb/bcbio-nextgen/blob/master/bcbio/ngsalign/alignprep.py#L87

Maybe we could add a quality_format: auto option to guess and set quality formats if folks want to trust bcbio to figure it out. Would that be helpful and save upstream processing?

@schelhorn

Great; automatic conversion in bcbio would definitely be a plus. I guess it just depends on the number of records you're looking at to infer the format. 10k were always okay in my experience. I wasn't using the bcbio implementation since I also wanted to parallelize the S3 download and bgzip/conversion over the cluster (bcbio was only using one job for this some time ago). Perhaps that has changed in the meantime. Regarding the RTG fastqs, I'd be fine with larger indexes but not with overall larger data files. In fact, I'd even vote for unaligned 8-binned CRAM if that is a reasonable alternative to fastq for all the external tools (but I guess it isn't yet). I'd assume CRAM would also include the ability for random access, but I'm not so sure if samtools/cramtools/scramble support that nicely enough for bcbio to use that everywhere.

@Lenbok commented on c08aba1 Jan 7, 2016

@chapmanb short answer, no, not unless you count the use of --no-names or --no-quality to remove some of the data altogether. Longer answer:

The SDF format uses a fixed-bit-packed representation for storage of nucleotides/amino acids/q-scores (and q-scores are always stored in sanger encoding) in order to get all the parsing/normalization out of the way once and allow direct access to read subsets (or to pull out subsequences of longer data such as chromosomes, e.g. rtg sdfsubseq). While it's better than storing the data uncompressed, there's definitely a large gap between it and a compressed storage format. A couple of years ago we did some experiments with compression (there's even an arithmetic coder implementation in the source) -- it was too slow to be practical, but that was in the context of looking for in-memory RAM reduction during mapping.

The margin is larger now people have started quantizing the q-scores, which compression techniques naturally take advantage of but which SDF still stores at a little over 6 bits per score. Looking at the q-score distribution of a typical fastq we have here which seems to have been binned to 7 distinct values, a 0-order compression model should be able to get this to about 1.9 bits per score. That alone would take the 3x down to 2x, more could be obtained from compression of name or base data.
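For anyone who wants to reproduce that estimate, the 0-order figure is just the Shannon entropy of the q-score histogram; a minimal sketch with hypothetical bin frequencies for a 7-value binned file (not the actual data measured above):

```python
import math

# Hypothetical frequencies for 7 quality bins after quantization; a real file
# would use its own measured q-score histogram.
freqs = [0.55, 0.20, 0.10, 0.06, 0.04, 0.03, 0.02]

# 0-order (context-free) entropy: the best a memoryless coder can do, in bits/score
h = -sum(p * math.log2(p) for p in freqs)
print("%.2f bits per score" % h)  # ~1.96 here, versus the 6+ bits SDF stores now
```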

Thanks for the feedback!

@chapmanb
Member Author

Len -- thanks for the overview. That helps a ton. The SDF indexing and access is really nice and I understand the space tradeoff.

Sven-Eric -- thanks for the feedback. I'll look at adding in auto-detection as an option. We have the RTG SDF option available but currently default to bgzip and grabix indexed fastqs. They are a bit slower to generate, but parallel bgzip helps quite a bit to make it comparable with RTG's preparation process as long as there are multiple cores to use. I don't know of any CRAM approaches for pulling blocks of reads from unaligned inputs, but would be happy to try them out if you know a practical approach. Thanks again.

@schelhorn

Well, samtools can read CRAM by now and is using htslib internally (https://github.com/samtools/htslib).

HTSlib implements a generalized BAM index, with file extension .csi (coordinate-sorted index). The HTSlib file reader first looks for the new index and then for the old if the new index is absent. This project also includes the popular tabix indexer, which indexes both .tbi and .csi formats, and the bgzip compression utility.

Somehow this sounds as if extracting blocks of reads should work, in principle.

@schelhorn

So, further following up on the CRAM proposal (since CRAM will be the dominant read format soon anyway, with support for lossy and greedy reference-based compression also for unaligned reads), obviously one would need someone to implement extraction of unaligned reads by blocks (i.e., give me the first ends of all read pairs with index 1001-2000). Perhaps we could approach these guys: https://github.com/jwalabroad/VariantBam/blob/master/README.md That tool builds on htslib and looks useful also for other areas of bcbio. Given their powerful rule engine it might be comparatively easy for them to implement paired chunking of reads. Shall we ask?

@schelhorn

Guys and gals, I should have said - at least two ladies are on the team.

@chapmanb
Member Author

Sven-Eric;
Nice find with VariantBam, that looks really useful. The ability to downsample by coverage is incredible and something I've looked for in a tool for a while. I don't know about the ability to extend the current CRAM indexes to pull out read blocks, but it definitely seems worth asking if that's of interest to the team. Thanks again.

@schelhorn

I approached the htslib people first, since they should know most about the block structure of the CRAM format. samtools/htslib#316
