
Commit

Changes for Hadoop-BAM and htsjdk upgrades, Spark tools.
cmnbroad committed Feb 5, 2016
1 parent 10d9ec1 commit a9debad
Showing 16 changed files with 190 additions and 70 deletions.
3 changes: 2 additions & 1 deletion build.gradle
@@ -39,6 +39,7 @@ task downloadGsaLibFile(type: Download) {

repositories {
mavenCentral()
mavenLocal()
jcenter()

maven {
@@ -88,7 +89,7 @@ dependencies {
compile files("${System.properties['java.home']}/../lib/tools.jar")

compile 'com.google.guava:guava:18.0'
compile 'com.github.samtools:htsjdk:2.0.1'
compile 'com.github.samtools:htsjdk:2.1.0'
compile ('com.google.cloud.genomics:google-genomics-dataflow:v1beta2-0.15') {
// an upstream dependency includes guava-jdk5, but we want the newer version instead.
exclude module: 'guava-jdk5'
@@ -1,5 +1,8 @@
package org.broadinstitute.hellbender.cmdline;

import htsjdk.samtools.ValidationStringency;
import htsjdk.samtools.util.Log;

/**
* A set of String constants in which the name of the constant (minus the _SHORT_NAME suffix)
* is the standard long Option name, and the value of the constant is the standard shortName.
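Aside: the convention described above pairs each *_SHORT_NAME constant with an implied long option name. A minimal illustration of the pattern; the names and values below are assumptions for this note, not constants taken from this commit:

    // Hypothetical constants following the documented convention: the constant name minus the
    // _SHORT_NAME suffix is the long option name, and the constant's value is the short name.
    public static final String INPUT_SHORT_NAME = "I";      // long option: "input"
    public static final String REFERENCE_SHORT_NAME = "R";  // long option: "reference"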
@@ -4,6 +4,7 @@
import org.broadinstitute.hellbender.cmdline.Argument;
import org.broadinstitute.hellbender.cmdline.ArgumentCollectionDefinition;
import org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions;
import org.broadinstitute.hellbender.utils.read.ReadConstants;

import java.io.File;
import java.util.List;
@@ -23,7 +24,7 @@ public abstract class ReadInputArgumentCollection implements ArgumentCollectionDefinition
"do not otherwise need to be decoded.",
common=true,
optional=true)
public ValidationStringency readValidationStringency = ValidationStringency.SILENT;
public ValidationStringency readValidationStringency = ReadConstants.DEFAULT_READ_VALIDATION_STRINGENCY;

/**
* Get the list of BAM/SAM/CRAM files specified at the command line
@@ -103,9 +103,8 @@ void initializeReference() {
* May be overridden by traversals that require custom initialization of the reads data source.
*/
void initializeReads() {
SamReaderFactory factory = null;
if (! readArguments.getReadFiles().isEmpty()) {
factory = SamReaderFactory.makeDefault().validationStringency(readArguments.getReadValidationStringency());
SamReaderFactory factory = SamReaderFactory.makeDefault().validationStringency(readArguments.getReadValidationStringency());
if (hasReference()) { // pass in reference if available, because CRAM files need it
factory = factory.referenceSequence(referenceArguments.getReferenceFile());
}
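For context on why the reference is wired into the factory, here is a minimal htsjdk sketch of opening a CRAM with a SamReaderFactory configured the same way. The file paths are hypothetical and this is an illustration, not code from this commit:

    import htsjdk.samtools.SamReader;
    import htsjdk.samtools.SamReaderFactory;
    import htsjdk.samtools.ValidationStringency;
    import java.io.File;
    import java.io.IOException;

    public class CramReaderSketch {
        public static void main(final String[] args) throws IOException {
            // CRAM decoding needs the reference, so it is handed to the factory up front,
            // mirroring the initializeReads() change above. Paths are placeholders.
            final SamReaderFactory factory = SamReaderFactory.makeDefault()
                    .validationStringency(ValidationStringency.SILENT)
                    .referenceSequence(new File("reference.fasta"));
            try (final SamReader reader = factory.open(new File("reads.cram"))) {
                reader.forEach(record -> System.out.println(record.getReadName()));
            }
        }
    }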
@@ -10,6 +10,7 @@
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.utils.iterators.SAMRecordToReadIterator;
import org.broadinstitute.hellbender.utils.read.GATKRead;
import org.broadinstitute.hellbender.utils.read.ReadConstants;

import java.io.File;
import java.io.IOException;
@@ -112,7 +113,7 @@ public ReadsDataSource( final List<File> samFiles, SamReaderFactory customSamReaderFactory

final SamReaderFactory samReaderFactory =
customSamReaderFactory == null ?
SamReaderFactory.makeDefault().validationStringency(ValidationStringency.SILENT) :
SamReaderFactory.makeDefault().validationStringency(ReadConstants.DEFAULT_READ_VALIDATION_STRINGENCY) :
customSamReaderFactory;

for ( final File samFile : samFiles ) {
@@ -313,17 +313,17 @@ protected void runPipeline( JavaSparkContext sparkContext ) {
/**
* Initialize standard tool inputs.
*/
private void initializeToolInputs(JavaSparkContext sparkContext) {
initializeReads(sparkContext);
private void initializeToolInputs(final JavaSparkContext sparkContext) {
initializeReference();
initializeReads(sparkContext); // in order to initialize reads, the reference must have been initialized
initializeIntervals();
}

/**
* Initializes our reads source (but does not yet load the reads into a {@link JavaRDD}).
* Does nothing if no reads inputs are present.
*/
private void initializeReads(JavaSparkContext sparkContext) {
private void initializeReads(final JavaSparkContext sparkContext) {
if ( readArguments.getReadFilesNames().isEmpty() ) {
return;
}
@@ -333,8 +333,11 @@ private void initializeReads(JavaSparkContext sparkContext) {
}

readInput = readArguments.getReadFilesNames().get(0);
readsSource = new ReadsSparkSource(sparkContext);
readsHeader = ReadsSparkSource.getHeader(sparkContext, readInput, getAuthHolder());
readsSource = new ReadsSparkSource(sparkContext, readArguments.getReadValidationStringency());
readsHeader = readsSource.getHeader(
readInput,
hasReference() ? referenceArguments.getReferenceFile().getAbsolutePath() : null,
getAuthHolder());
}

/**
@@ -10,6 +10,8 @@
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.parquet.avro.AvroParquetInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
@@ -24,12 +26,14 @@
import org.broadinstitute.hellbender.utils.io.IOUtils;
import org.broadinstitute.hellbender.utils.read.BDGAlignmentRecordToGATKReadAdapter;
import org.broadinstitute.hellbender.utils.read.GATKRead;
import org.broadinstitute.hellbender.utils.read.ReadConstants;
import org.broadinstitute.hellbender.utils.read.SAMRecordToGATKReadAdapter;
import org.seqdoop.hadoop_bam.AnySAMInputFormat;
import org.seqdoop.hadoop_bam.CRAMInputFormat;
import org.seqdoop.hadoop_bam.SAMRecordWritable;
import org.seqdoop.hadoop_bam.util.SAMHeaderReader;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
@@ -42,8 +46,16 @@ public final class ReadsSparkSource implements Serializable {
private static final String HADOOP_PART_PREFIX = "part-";

private transient final JavaSparkContext ctx;
public ReadsSparkSource(JavaSparkContext ctx) {
private ValidationStringency validationStringency = ReadConstants.DEFAULT_READ_VALIDATION_STRINGENCY;

protected final Logger logger = LogManager.getLogger(ReadsSparkSource.class);

public ReadsSparkSource(final JavaSparkContext ctx) { this.ctx = ctx; }

public ReadsSparkSource(final JavaSparkContext ctx, final ValidationStringency validationStringency)
{
this.ctx = ctx;
this.validationStringency = validationStringency;
}


@@ -69,40 +81,24 @@ public JavaRDD<GATKRead> getParallelReads(final String readFileName, final Strin
* use the default split size (determined by the Hadoop input format, typically the size of one HDFS block).
* @return RDD of (SAMRecord-backed) GATKReads from the file.
*/
public JavaRDD<GATKRead> getParallelReads(final String readFileName, final String referencePath, final List<SimpleInterval> intervals, long splitSize) {
final Configuration conf = new Configuration();
public JavaRDD<GATKRead> getParallelReads(final String readFileName, final String referencePath, final List<SimpleInterval> intervals, final long splitSize) {
final Configuration conf = ctx.hadoopConfiguration();
if (splitSize > 0) {
conf.set("mapreduce.input.fileinputformat.split.maxsize", Long.toString(splitSize));
}

final JavaPairRDD<LongWritable, SAMRecordWritable> rdd2;

//Note: in Hadoop-bam AnySAMInputFormat does not support CRAM https://github.com/HadoopGenomics/Hadoop-BAM/issues/35
//The workaround is to use CRAMInputFormat
if (IOUtils.isCramFileName(readFileName)) {
if (referencePath == null){
throw new UserException.MissingReference("A reference file is required when using CRAM files.");
}
//Note: cram input requires a reference and reference is passed by this property.
conf.set(CRAMInputFormat.REFERENCE_SOURCE_PATH_PROPERTY, referencePath);
setHadoopBAMConfigurationProperties(readFileName, referencePath);

rdd2 = ctx.newAPIHadoopFile(
readFileName, CRAMInputFormat.class, LongWritable.class, SAMRecordWritable.class,
conf);
} else {
rdd2 = ctx.newAPIHadoopFile(
rdd2 = ctx.newAPIHadoopFile(
readFileName, AnySAMInputFormat.class, LongWritable.class, SAMRecordWritable.class,
conf);
}

return rdd2.map(v1 -> {
SAMRecord sam = v1._2().get();
if (samRecordOverlaps(sam, intervals)) {
try {
return (GATKRead)SAMRecordToGATKReadAdapter.headerlessReadAdapter(sam);
} catch (SAMException e) {
// TODO: add stringency
}
return (GATKRead)SAMRecordToGATKReadAdapter.headerlessReadAdapter(sam);
}
return null;

@@ -130,7 +126,7 @@ public JavaRDD<GATKRead> getParallelReads(final String readFileName, final Strin
* @return RDD of (SAMRecord-backed) GATKReads from the file.
*/
public JavaRDD<GATKRead> getParallelReads(final String readFileName, final String referencePath, int splitSize) {
final SAMFileHeader readsHeader = getHeader(ctx, readFileName, null);
final SAMFileHeader readsHeader = getHeader(readFileName, referencePath, null);
List<SimpleInterval> intervals = IntervalUtils.getAllIntervalsForReference(readsHeader.getSequenceDictionary());
return getParallelReads(readFileName, referencePath, intervals, splitSize);
}
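A hedged usage sketch for this overload, assuming an existing JavaSparkContext named ctx and an illustrative local BAM path (neither is taken from this commit):

    // Illustrative fragment only; ctx is assumed to be a live JavaSparkContext.
    final ReadsSparkSource source = new ReadsSparkSource(ctx, ValidationStringency.LENIENT);
    // A non-positive splitSize leaves the Hadoop input format's default split size in place.
    final JavaRDD<GATKRead> reads = source.getParallelReads("file:///data/sample.bam", null, 0);
    System.out.println("reads loaded: " + reads.count());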
@@ -161,15 +157,16 @@ public JavaRDD<GATKRead> getADAMReads(final String inputPath, final List<SimpleI
/**
* Loads the header using Hadoop-BAM.
* @param filePath path to the bam.
* @param referencePath path to the reference
* @param auth authentication information if using GCS.
* @return the header for the bam.
*/
public static SAMFileHeader getHeader(final JavaSparkContext ctx, final String filePath, final AuthHolder auth) {
public SAMFileHeader getHeader(final String filePath, final String referencePath, final AuthHolder auth) {
// GCS case
if (BucketUtils.isCloudStorageUrl(filePath)) {
try {
Storage.Objects storageClient = auth.makeStorageClient();
try (final SamReader reader = BAMIO.openBAM(storageClient, filePath, ValidationStringency.DEFAULT_STRINGENCY)) {
try (final SamReader reader = BAMIO.openBAM(storageClient, filePath, validationStringency)) {
return reader.getFileHeader();
}
} catch (Exception e) {
@@ -194,12 +191,48 @@ public boolean accept(Path path) {
}
path = bamFiles[0].getPath(); // Hadoop-BAM writes the same header to each shard, so use the first one
}
setHadoopBAMConfigurationProperties(path.getName(), referencePath);
return SAMHeaderReader.readSAMHeaderFrom(path, ctx.hadoopConfiguration());
} catch (IOException e) {
throw new UserException("Failed to read bam header from " + filePath + "\n Caused by:" + e.getMessage(), e);
}
}

/**
* Propagate any values that need to be passed to Hadoop-BAM through configuration properties:
*
* - the validation stringency property is always set
* - if the input file is a CRAM file, the reference value, which must be a URI which includes
* a scheme, will also be set
* - if the input file is not CRAM, the reference property is *unset* to prevent Hadoop-BAM
* from passing a stale value through to htsjdk when multiple read calls are made serially
* with different inputs but the same Spark context
*/
private void setHadoopBAMConfigurationProperties(final String inputName, final String referenceName) {
final Configuration conf = ctx.hadoopConfiguration();
conf.set(SAMHeaderReader.VALIDATION_STRINGENCY_PROPERTY, validationStringency.name());

if (!IOUtils.isCramFileName(inputName)) { // only set the reference for CRAM input
conf.unset(CRAMInputFormat.REFERENCE_SOURCE_PATH_PROPERTY);
}
else {
if (null == referenceName) {
throw new UserException.MissingReference("A reference is required for CRAM input");
}
else {
if (ReferenceTwoBitSource.isTwoBit(referenceName)) { // htsjdk can't handle 2bit reference files
throw new UserException("A 2bit file cannot be used as a CRAM file reference");
}
else { // Hadoop-BAM requires the reference to be a URI, including scheme
String referenceURI = null == new Path(referenceName).toUri().getScheme() ?
"file://" + new File(referenceName).getAbsolutePath() :
referenceName;
conf.set(CRAMInputFormat.REFERENCE_SOURCE_PATH_PROPERTY, referenceURI);
}
}
}
}
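To make the URI rule above concrete, a small standalone sketch of the same scheme check; the reference paths are hypothetical:

    import org.apache.hadoop.fs.Path;
    import java.io.File;

    public class ReferenceUriSketch {
        // Mirrors the handling above: a bare filesystem path gains a file:// scheme, while a
        // name that already carries a scheme (hdfs://, gs://, ...) is passed through unchanged.
        static String toReferenceURI(final String referenceName) {
            return new Path(referenceName).toUri().getScheme() == null
                    ? "file://" + new File(referenceName).getAbsolutePath()
                    : referenceName;
        }

        public static void main(final String[] args) {
            System.out.println(toReferenceURI("/data/human_g1k_v37.fasta"));                // file:///data/human_g1k_v37.fasta
            System.out.println(toReferenceURI("hdfs://namenode/refs/human_g1k_v37.fasta")); // unchanged
        }
    }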

/**
* Tests if a given SAMRecord overlaps any interval in a collection.
*/
@@ -114,7 +114,7 @@ protected Object doWork() {
validator.setBisulfiteSequenced(IS_BISULFITE_SEQUENCED);
}
if (VALIDATE_INDEX) {
validator.setValidateIndex(VALIDATE_INDEX);
validator.setIndexValidationStringency(BamIndexValidator.IndexValidationStringency.EXHAUSTIVE);
}
if (IOUtil.isRegularPath(INPUT)) {
// Do not check termination if reading from a stream
@@ -88,7 +88,7 @@ protected void runPipeline( JavaSparkContext ctx ) {

final ReferenceMultiSource rds = new ReferenceMultiSource(auth, referenceURL, BaseRecalibrationEngine.BQSR_REFERENCE_WINDOW_FUNCTION);

SAMFileHeader readsHeader = ReadsSparkSource.getHeader(ctx, bam, auth);
SAMFileHeader readsHeader = new ReadsSparkSource(ctx, readArguments.getReadValidationStringency()).getHeader(bam, referenceURL, auth);
final SAMSequenceDictionary readsDictionary = readsHeader.getSequenceDictionary();
final SAMSequenceDictionary refDictionary = rds.getReferenceSequenceDictionary(readsDictionary);
final CountingReadFilter readFilterToApply = BaseRecalibrator.getStandardBQSRReadFilter(readsHeader);
@@ -41,7 +41,7 @@ final public class CompareBaseQualitiesSpark extends GATKSparkTool {
@Override
protected void runTool(final JavaSparkContext ctx) {
JavaRDD<GATKRead> firstReads = getReads();
ReadsSparkSource readsSource2 = new ReadsSparkSource(ctx);
ReadsSparkSource readsSource2 = new ReadsSparkSource(ctx, readArguments.getReadValidationStringency());
JavaRDD<GATKRead> secondReads = readsSource2.getParallelReads(input2, null, getIntervals(), bamPartitionSplitSize);

long firstBamSize = firstReads.count();
@@ -54,7 +54,7 @@ protected void runTool(final JavaSparkContext ctx) {

JavaRDD<GATKRead> firstReads = filteredReads(getReads(), readArguments.getReadFilesNames().get(0));

ReadsSparkSource readsSource2 = new ReadsSparkSource(ctx);
ReadsSparkSource readsSource2 = new ReadsSparkSource(ctx, readArguments.getReadValidationStringency());
JavaRDD<GATKRead> secondReads = filteredReads(readsSource2.getParallelReads(input2, null, getIntervals(), bamPartitionSplitSize), input2);

// Start by verifying that we have same number of reads and duplicates in each BAM.
@@ -1,10 +1,17 @@
package org.broadinstitute.hellbender.utils.read;

import htsjdk.samtools.ValidationStringency;

/**
* Constants for use with the GATKRead interface
*/
public final class ReadConstants {

/**
* Value used as the default validation stringency for all read input
*/
public static ValidationStringency DEFAULT_READ_VALIDATION_STRINGENCY = ValidationStringency.SILENT;

/**
* Value used to represent the absence of a defined start/end position in a read
*/
@@ -4,6 +4,7 @@
import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SAMRecord;
import htsjdk.samtools.SAMRecordCoordinateComparator;
import htsjdk.samtools.ValidationStringency;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -70,9 +71,9 @@ public void readsSinkTest(String inputBam, String outputFileName, String outputF
final File outputFile = createTempFile(outputFileName, outputFileExtension);
JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();

ReadsSparkSource readSource = new ReadsSparkSource(ctx);
ReadsSparkSource readSource = new ReadsSparkSource(ctx, ValidationStringency.STRICT);
JavaRDD<GATKRead> rddParallelReads = readSource.getParallelReads(inputBam, null);
SAMFileHeader header = ReadsSparkSource.getHeader(ctx, inputBam, null);
SAMFileHeader header = readSource.getHeader(inputBam, null, null);

ReadsSparkSink.writeReads(ctx, outputFile.getAbsolutePath(), rddParallelReads, header, ReadsWriteFormat.SINGLE);

@@ -96,10 +97,10 @@ public void readsSinkShardedTest(String inputBam, String outputFileName, String
final File outputFile = createTempFile(outputFileName, outputFileExtension);
JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();

ReadsSparkSource readSource = new ReadsSparkSource(ctx);
ReadsSparkSource readSource = new ReadsSparkSource(ctx, ValidationStringency.STRICT);
JavaRDD<GATKRead> rddParallelReads = readSource.getParallelReads(inputBam, null);
rddParallelReads = rddParallelReads.repartition(2); // ensure that the output is in two shards
SAMFileHeader header = ReadsSparkSource.getHeader(ctx, inputBam, null);
SAMFileHeader header = readSource.getHeader(inputBam, null, null);

ReadsSparkSink.writeReads(ctx, outputFile.getAbsolutePath(), rddParallelReads, header, ReadsWriteFormat.SHARDED);
int shards = outputFile.listFiles((dir, name) -> !name.startsWith(".") && !name.startsWith("_")).length;
@@ -123,9 +124,9 @@ public void readsSinkADAMTest(String inputBam, String outputDirectoryName) throw

JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();

ReadsSparkSource readSource = new ReadsSparkSource(ctx);
ReadsSparkSource readSource = new ReadsSparkSource(ctx, ValidationStringency.STRICT);
JavaRDD<GATKRead> rddParallelReads = readSource.getParallelReads(inputBam, null);
SAMFileHeader header = ReadsSparkSource.getHeader(ctx, inputBam, null);
SAMFileHeader header = readSource.getHeader(inputBam, null, null);

ReadsSparkSink.writeReads(ctx, outputDirectory.getAbsolutePath(), rddParallelReads, header, ReadsWriteFormat.ADAM);
