From a9debad8e2c2cb910edbd6ba5ef8a5a395f1f51c Mon Sep 17 00:00:00 2001 From: Chris Norman Date: Wed, 3 Feb 2016 17:49:25 -0500 Subject: [PATCH] Changes for Hadoop-BAM and htsjdk upgrades, Spark tools. --- build.gradle | 3 +- .../cmdline/StandardArgumentDefinitions.java | 3 + .../ReadInputArgumentCollection.java | 3 +- .../hellbender/engine/GATKTool.java | 3 +- .../hellbender/engine/ReadsDataSource.java | 3 +- .../engine/spark/GATKSparkTool.java | 13 +- .../spark/datasources/ReadsSparkSource.java | 83 +++++++++---- .../tools/picard/sam/ValidateSamFile.java | 2 +- .../spark/BaseRecalibratorSparkSharded.java | 2 +- .../validation/CompareBaseQualitiesSpark.java | 2 +- .../validation/CompareDuplicatesSpark.java | 2 +- .../hellbender/utils/read/ReadConstants.java | 7 ++ .../datasources/ReadsSparkSinkUnitTest.java | 13 +- .../datasources/ReadsSparkSourceUnitTest.java | 111 ++++++++++++++---- .../picard/sam/SortSamIntegrationTest.java | 5 +- .../MarkDuplicatesSparkUnitTest.java | 5 +- 16 files changed, 190 insertions(+), 70 deletions(-) diff --git a/build.gradle b/build.gradle index 73b5da86f72..6d25ef359f3 100644 --- a/build.gradle +++ b/build.gradle @@ -39,6 +39,7 @@ task downloadGsaLibFile(type: Download) { repositories { mavenCentral() + mavenLocal() jcenter() maven { @@ -88,7 +89,7 @@ dependencies { compile files("${System.properties['java.home']}/../lib/tools.jar") compile 'com.google.guava:guava:18.0' - compile 'com.github.samtools:htsjdk:2.0.1' + compile 'com.github.samtools:htsjdk:2.1.0' compile ('com.google.cloud.genomics:google-genomics-dataflow:v1beta2-0.15') { // an upstream dependency includes guava-jdk5, but we want the newer version instead. exclude module: 'guava-jdk5' diff --git a/src/main/java/org/broadinstitute/hellbender/cmdline/StandardArgumentDefinitions.java b/src/main/java/org/broadinstitute/hellbender/cmdline/StandardArgumentDefinitions.java index 80d14d2788c..aec3e4f6164 100644 --- a/src/main/java/org/broadinstitute/hellbender/cmdline/StandardArgumentDefinitions.java +++ b/src/main/java/org/broadinstitute/hellbender/cmdline/StandardArgumentDefinitions.java @@ -1,5 +1,8 @@ package org.broadinstitute.hellbender.cmdline; +import htsjdk.samtools.ValidationStringency; +import htsjdk.samtools.util.Log; + /** * A set of String constants in which the name of the constant (minus the _SHORT_NAME suffix) * is the standard long Option name, and the value of the constant is the standard shortName. 
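Note: the hunks below replace the hard-coded ValidationStringency.SILENT with a shared default (ReadConstants.DEFAULT_READ_VALIDATION_STRINGENCY, introduced later in this patch). For reference, a minimal sketch of how such a stringency value is consumed when an htsjdk reader factory is built, mirroring GATKTool/ReadsDataSource below (STRICT fails on malformed records, LENIENT logs and continues, SILENT suppresses validation errors):

    // Illustrative only; mirrors the factory construction in the hunks that follow.
    final SamReaderFactory factory = SamReaderFactory.makeDefault()
            .validationStringency(ReadConstants.DEFAULT_READ_VALIDATION_STRINGENCY); // SILENT by default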
diff --git a/src/main/java/org/broadinstitute/hellbender/cmdline/argumentcollections/ReadInputArgumentCollection.java b/src/main/java/org/broadinstitute/hellbender/cmdline/argumentcollections/ReadInputArgumentCollection.java index 12ff02f8738..294ca06e639 100644 --- a/src/main/java/org/broadinstitute/hellbender/cmdline/argumentcollections/ReadInputArgumentCollection.java +++ b/src/main/java/org/broadinstitute/hellbender/cmdline/argumentcollections/ReadInputArgumentCollection.java @@ -4,6 +4,7 @@ import org.broadinstitute.hellbender.cmdline.Argument; import org.broadinstitute.hellbender.cmdline.ArgumentCollectionDefinition; import org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions; +import org.broadinstitute.hellbender.utils.read.ReadConstants; import java.io.File; import java.util.List; @@ -23,7 +24,7 @@ public abstract class ReadInputArgumentCollection implements ArgumentCollectionD "do not otherwise need to be decoded.", common=true, optional=true) - public ValidationStringency readValidationStringency = ValidationStringency.SILENT; + public ValidationStringency readValidationStringency = ReadConstants.DEFAULT_READ_VALIDATION_STRINGENCY; /** * Get the list of BAM/SAM/CRAM files specified at the command line diff --git a/src/main/java/org/broadinstitute/hellbender/engine/GATKTool.java b/src/main/java/org/broadinstitute/hellbender/engine/GATKTool.java index 54fb6e0f17a..d331251bc6a 100644 --- a/src/main/java/org/broadinstitute/hellbender/engine/GATKTool.java +++ b/src/main/java/org/broadinstitute/hellbender/engine/GATKTool.java @@ -103,9 +103,8 @@ void initializeReference() { * May be overridden by traversals that require custom initialization of the reads data source. */ void initializeReads() { - SamReaderFactory factory = null; if (! readArguments.getReadFiles().isEmpty()) { - factory = SamReaderFactory.makeDefault().validationStringency(readArguments.getReadValidationStringency()); + SamReaderFactory factory = SamReaderFactory.makeDefault().validationStringency(readArguments.getReadValidationStringency()); if (hasReference()) { // pass in reference if available, because CRAM files need it factory = factory.referenceSequence(referenceArguments.getReferenceFile()); } diff --git a/src/main/java/org/broadinstitute/hellbender/engine/ReadsDataSource.java b/src/main/java/org/broadinstitute/hellbender/engine/ReadsDataSource.java index c76bea10d8c..5f2c95b6495 100644 --- a/src/main/java/org/broadinstitute/hellbender/engine/ReadsDataSource.java +++ b/src/main/java/org/broadinstitute/hellbender/engine/ReadsDataSource.java @@ -10,6 +10,7 @@ import org.broadinstitute.hellbender.exceptions.UserException; import org.broadinstitute.hellbender.utils.iterators.SAMRecordToReadIterator; import org.broadinstitute.hellbender.utils.read.GATKRead; +import org.broadinstitute.hellbender.utils.read.ReadConstants; import java.io.File; import java.io.IOException; @@ -112,7 +113,7 @@ public ReadsDataSource( final List samFiles, SamReaderFactory customSamRea final SamReaderFactory samReaderFactory = customSamReaderFactory == null ? 
- SamReaderFactory.makeDefault().validationStringency(ValidationStringency.SILENT) : + SamReaderFactory.makeDefault().validationStringency(ReadConstants.DEFAULT_READ_VALIDATION_STRINGENCY) : customSamReaderFactory; for ( final File samFile : samFiles ) { diff --git a/src/main/java/org/broadinstitute/hellbender/engine/spark/GATKSparkTool.java b/src/main/java/org/broadinstitute/hellbender/engine/spark/GATKSparkTool.java index 000d40f4928..60b67a16320 100644 --- a/src/main/java/org/broadinstitute/hellbender/engine/spark/GATKSparkTool.java +++ b/src/main/java/org/broadinstitute/hellbender/engine/spark/GATKSparkTool.java @@ -313,9 +313,9 @@ protected void runPipeline( JavaSparkContext sparkContext ) { /** * Initialize standard tool inputs. */ - private void initializeToolInputs(JavaSparkContext sparkContext) { - initializeReads(sparkContext); + private void initializeToolInputs(final JavaSparkContext sparkContext) { initializeReference(); + initializeReads(sparkContext); // in order to initialize reads, the reference must have been initialized initializeIntervals(); } @@ -323,7 +323,7 @@ private void initializeToolInputs(JavaSparkContext sparkContext) { * Initializes our reads source (but does not yet load the reads into a {@link JavaRDD}). * Does nothing if no reads inputs are present. */ - private void initializeReads(JavaSparkContext sparkContext) { + private void initializeReads(final JavaSparkContext sparkContext) { if ( readArguments.getReadFilesNames().isEmpty() ) { return; } @@ -333,8 +333,11 @@ private void initializeReads(JavaSparkContext sparkContext) { } readInput = readArguments.getReadFilesNames().get(0); - readsSource = new ReadsSparkSource(sparkContext); - readsHeader = ReadsSparkSource.getHeader(sparkContext, readInput, getAuthHolder()); + readsSource = new ReadsSparkSource(sparkContext, readArguments.getReadValidationStringency()); + readsHeader = readsSource.getHeader( + readInput, + hasReference() ?
referenceArguments.getReferenceFile().getAbsolutePath() : null, + getAuthHolder()); } /** diff --git a/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSource.java b/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSource.java index 781de1bf03d..aa2294b07ab 100644 --- a/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSource.java +++ b/src/main/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSource.java @@ -10,6 +10,8 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Job; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.parquet.avro.AvroParquetInputFormat; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; @@ -24,12 +26,14 @@ import org.broadinstitute.hellbender.utils.io.IOUtils; import org.broadinstitute.hellbender.utils.read.BDGAlignmentRecordToGATKReadAdapter; import org.broadinstitute.hellbender.utils.read.GATKRead; +import org.broadinstitute.hellbender.utils.read.ReadConstants; import org.broadinstitute.hellbender.utils.read.SAMRecordToGATKReadAdapter; import org.seqdoop.hadoop_bam.AnySAMInputFormat; import org.seqdoop.hadoop_bam.CRAMInputFormat; import org.seqdoop.hadoop_bam.SAMRecordWritable; import org.seqdoop.hadoop_bam.util.SAMHeaderReader; +import java.io.File; import java.io.IOException; import java.io.Serializable; import java.util.List; @@ -42,8 +46,16 @@ public final class ReadsSparkSource implements Serializable { private static final String HADOOP_PART_PREFIX = "part-"; private transient final JavaSparkContext ctx; - public ReadsSparkSource(JavaSparkContext ctx) { + private ValidationStringency validationStringency = ReadConstants.DEFAULT_READ_VALIDATION_STRINGENCY; + + protected final Logger logger = LogManager.getLogger(ReadsSparkSource.class); + + public ReadsSparkSource(final JavaSparkContext ctx) { this.ctx = ctx; } + + public ReadsSparkSource(final JavaSparkContext ctx, final ValidationStringency validationStringency) + { this.ctx = ctx; + this.validationStringency = validationStringency; } @@ -69,40 +81,24 @@ public JavaRDD getParallelReads(final String readFileName, final Strin * use the default split size (determined by the Hadoop input format, typically the size of one HDFS block). * @return RDD of (SAMRecord-backed) GATKReads from the file. */ - public JavaRDD getParallelReads(final String readFileName, final String referencePath, final List intervals, long splitSize) { - final Configuration conf = new Configuration(); + public JavaRDD getParallelReads(final String readFileName, final String referencePath, final List intervals, final long splitSize) { + final Configuration conf = ctx.hadoopConfiguration(); if (splitSize > 0) { conf.set("mapreduce.input.fileinputformat.split.maxsize", Long.toString(splitSize)); } final JavaPairRDD rdd2; - //Note: in Hadoop-bam AnySAMInputFormat does not support CRAM https://github.com/HadoopGenomics/Hadoop-BAM/issues/35 - //The workaround is to use CRAMInputFormat - if (IOUtils.isCramFileName(readFileName)) { - if (referencePath == null){ - throw new UserException.MissingReference("A reference file is required when using CRAM files."); - } - //Note: cram input requires a reference and reference is passed by this property. 
- conf.set(CRAMInputFormat.REFERENCE_SOURCE_PATH_PROPERTY, referencePath); + setHadoopBAMConfigurationProperties(readFileName, referencePath); - rdd2 = ctx.newAPIHadoopFile( - readFileName, CRAMInputFormat.class, LongWritable.class, SAMRecordWritable.class, - conf); - } else { - rdd2 = ctx.newAPIHadoopFile( + rdd2 = ctx.newAPIHadoopFile( readFileName, AnySAMInputFormat.class, LongWritable.class, SAMRecordWritable.class, conf); - } return rdd2.map(v1 -> { SAMRecord sam = v1._2().get(); if (samRecordOverlaps(sam, intervals)) { - try { - return (GATKRead)SAMRecordToGATKReadAdapter.headerlessReadAdapter(sam); - } catch (SAMException e) { - // TODO: add stringency - } + return (GATKRead)SAMRecordToGATKReadAdapter.headerlessReadAdapter(sam); } return null; @@ -130,7 +126,7 @@ public JavaRDD getParallelReads(final String readFileName, final Strin * @return RDD of (SAMRecord-backed) GATKReads from the file. */ public JavaRDD getParallelReads(final String readFileName, final String referencePath, int splitSize) { - final SAMFileHeader readsHeader = getHeader(ctx, readFileName, null); + final SAMFileHeader readsHeader = getHeader(readFileName, referencePath, null); List intervals = IntervalUtils.getAllIntervalsForReference(readsHeader.getSequenceDictionary()); return getParallelReads(readFileName, referencePath, intervals, splitSize); } @@ -161,15 +157,16 @@ public JavaRDD getADAMReads(final String inputPath, final List firstReads = getReads(); - ReadsSparkSource readsSource2 = new ReadsSparkSource(ctx); + ReadsSparkSource readsSource2 = new ReadsSparkSource(ctx, readArguments.getReadValidationStringency()); JavaRDD secondReads = readsSource2.getParallelReads(input2, null, getIntervals(), bamPartitionSplitSize); long firstBamSize = firstReads.count(); diff --git a/src/main/java/org/broadinstitute/hellbender/tools/spark/validation/CompareDuplicatesSpark.java b/src/main/java/org/broadinstitute/hellbender/tools/spark/validation/CompareDuplicatesSpark.java index aae1e466576..ad9460d4a64 100644 --- a/src/main/java/org/broadinstitute/hellbender/tools/spark/validation/CompareDuplicatesSpark.java +++ b/src/main/java/org/broadinstitute/hellbender/tools/spark/validation/CompareDuplicatesSpark.java @@ -54,7 +54,7 @@ protected void runTool(final JavaSparkContext ctx) { JavaRDD firstReads = filteredReads(getReads(), readArguments.getReadFilesNames().get(0)); - ReadsSparkSource readsSource2 = new ReadsSparkSource(ctx); + ReadsSparkSource readsSource2 = new ReadsSparkSource(ctx, readArguments.getReadValidationStringency()); JavaRDD secondReads = filteredReads(readsSource2.getParallelReads(input2, null, getIntervals(), bamPartitionSplitSize), input2); // Start by verifying that we have same number of reads and duplicates in each BAM. 
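Note: the body of setHadoopBAMConfigurationProperties is not shown in this patch. A minimal sketch of what it presumably does, reconstructed from the CRAM handling removed from getParallelReads above and the new validationStringency field; the use of SAMHeaderReader.VALIDATION_STRINGENCY_PROPERTY is an assumption:

    // Sketch only, not the actual method body from this change.
    private void setHadoopBAMConfigurationProperties(final String inputName, final String referencePath) {
        final Configuration conf = ctx.hadoopConfiguration();
        // Assumed: forward the requested stringency to Hadoop-BAM's header/record parsing.
        if (validationStringency != null) {
            conf.set(SAMHeaderReader.VALIDATION_STRINGENCY_PROPERTY, validationStringency.name());
        }
        if (IOUtils.isCramFileName(inputName)) {
            // CRAM decoding requires a reference; Hadoop-BAM's CRAMInputFormat reads it from this property.
            if (referencePath == null) {
                throw new UserException.MissingReference("A reference file is required when using CRAM files.");
            }
            conf.set(CRAMInputFormat.REFERENCE_SOURCE_PATH_PROPERTY, referencePath);
        }
    }

The new testReject2BitCRAMReference test below further suggests that non-FASTA (.2bit) references are rejected for CRAM inputs; that check is omitted from this sketch.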
diff --git a/src/main/java/org/broadinstitute/hellbender/utils/read/ReadConstants.java b/src/main/java/org/broadinstitute/hellbender/utils/read/ReadConstants.java index 73e6d50d377..3b23eb347ba 100644 --- a/src/main/java/org/broadinstitute/hellbender/utils/read/ReadConstants.java +++ b/src/main/java/org/broadinstitute/hellbender/utils/read/ReadConstants.java @@ -1,10 +1,17 @@ package org.broadinstitute.hellbender.utils.read; +import htsjdk.samtools.ValidationStringency; + /** * Constants for use with the GATKRead interface */ public final class ReadConstants { + /** + * Value used as the default validation stringency for all read input + */ + public static ValidationStringency DEFAULT_READ_VALIDATION_STRINGENCY = ValidationStringency.SILENT; + /** * Value used to represent the absence of a defined start/end position in a read */ diff --git a/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSinkUnitTest.java b/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSinkUnitTest.java index 30d98e6ad23..c1ec3865170 100644 --- a/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSinkUnitTest.java +++ b/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSinkUnitTest.java @@ -4,6 +4,7 @@ import htsjdk.samtools.SAMFileHeader; import htsjdk.samtools.SAMRecord; import htsjdk.samtools.SAMRecordCoordinateComparator; +import htsjdk.samtools.ValidationStringency; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -70,9 +71,9 @@ public void readsSinkTest(String inputBam, String outputFileName, String outputF final File outputFile = createTempFile(outputFileName, outputFileExtension); JavaSparkContext ctx = SparkContextFactory.getTestSparkContext(); - ReadsSparkSource readSource = new ReadsSparkSource(ctx); + ReadsSparkSource readSource = new ReadsSparkSource(ctx, ValidationStringency.STRICT); JavaRDD rddParallelReads = readSource.getParallelReads(inputBam, null); - SAMFileHeader header = ReadsSparkSource.getHeader(ctx, inputBam, null); + SAMFileHeader header = readSource.getHeader(inputBam, null, null); ReadsSparkSink.writeReads(ctx, outputFile.getAbsolutePath(), rddParallelReads, header, ReadsWriteFormat.SINGLE); @@ -96,10 +97,10 @@ public void readsSinkShardedTest(String inputBam, String outputFileName, String final File outputFile = createTempFile(outputFileName, outputFileExtension); JavaSparkContext ctx = SparkContextFactory.getTestSparkContext(); - ReadsSparkSource readSource = new ReadsSparkSource(ctx); + ReadsSparkSource readSource = new ReadsSparkSource(ctx, ValidationStringency.STRICT); JavaRDD rddParallelReads = readSource.getParallelReads(inputBam, null); rddParallelReads = rddParallelReads.repartition(2); // ensure that the output is in two shards - SAMFileHeader header = ReadsSparkSource.getHeader(ctx, inputBam, null); + SAMFileHeader header = readSource.getHeader(inputBam, null, null); ReadsSparkSink.writeReads(ctx, outputFile.getAbsolutePath(), rddParallelReads, header, ReadsWriteFormat.SHARDED); int shards = outputFile.listFiles((dir, name) -> !name.startsWith(".") && !name.startsWith("_")).length; @@ -123,9 +124,9 @@ public void readsSinkADAMTest(String inputBam, String outputDirectoryName) throw JavaSparkContext ctx = SparkContextFactory.getTestSparkContext(); - ReadsSparkSource readSource = new ReadsSparkSource(ctx); + ReadsSparkSource readSource = new ReadsSparkSource(ctx, 
ValidationStringency.STRICT); JavaRDD rddParallelReads = readSource.getParallelReads(inputBam, null); - SAMFileHeader header = ReadsSparkSource.getHeader(ctx, inputBam, null); + SAMFileHeader header = readSource.getHeader(inputBam, null, null); ReadsSparkSink.writeReads(ctx, outputDirectory.getAbsolutePath(), rddParallelReads, header, ReadsWriteFormat.ADAM); diff --git a/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSourceUnitTest.java b/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSourceUnitTest.java index 201873a84fa..d9de0434032 100644 --- a/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSourceUnitTest.java +++ b/src/test/java/org/broadinstitute/hellbender/engine/spark/datasources/ReadsSparkSourceUnitTest.java @@ -2,6 +2,7 @@ import com.google.common.collect.Lists; import htsjdk.samtools.SAMFileHeader; +import htsjdk.samtools.SAMFormatException; import htsjdk.samtools.SamReaderFactory; import htsjdk.samtools.ValidationStringency; import org.apache.hadoop.conf.Configuration; @@ -11,6 +12,7 @@ import org.apache.spark.api.java.JavaSparkContext; import org.broadinstitute.hellbender.engine.ReadsDataSource; import org.broadinstitute.hellbender.engine.spark.SparkContextFactory; +import org.broadinstitute.hellbender.exceptions.UserException; import org.broadinstitute.hellbender.utils.IntervalUtils; import org.broadinstitute.hellbender.utils.SimpleInterval; import org.broadinstitute.hellbender.utils.read.GATKRead; @@ -20,6 +22,7 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import javax.validation.Valid; import java.io.File; import java.io.IOException; import java.util.List; @@ -33,14 +36,13 @@ public class ReadsSparkSourceUnitTest extends BaseTest { @DataProvider(name = "loadReads") public Object[][] loadReads() { return new Object[][]{ - {NA12878_chr17_1k_BAM, null}, + {NA12878_chr17_1k_BAM, null, ValidationStringency.SILENT}, - {dirBQSR + "HiSeq.1mb.1RG.2k_lines.alternate.bam", null}, - {dirBQSR + "expected.HiSeq.1mb.1RG.2k_lines.alternate.recalibrated.DIQ.bam", null}, + {dirBQSR + "HiSeq.1mb.1RG.2k_lines.alternate.bam", null, ValidationStringency.STRICT}, + {dirBQSR + "expected.HiSeq.1mb.1RG.2k_lines.alternate.recalibrated.DIQ.bam", null, ValidationStringency.STRICT}, - //reading CRAM on Spark needs a way to relax validation stringency https://github.com/broadinstitute/gatk/issues/1261 -// {NA12878_chr17_1k_CRAM, new File(v37_chr17_1Mb_Reference)}, - {dir + "valid.cram", dir + "valid.fasta"} + {NA12878_chr17_1k_CRAM, v37_chr17_1Mb_Reference, ValidationStringency.SILENT}, + {dir + "valid.cram", dir + "valid.fasta", ValidationStringency.STRICT} }; } @@ -52,12 +54,19 @@ public Object[][] loadShardedReads() { }; } - @Test(dataProvider = "loadReads", groups = "spark") - public void readsSparkSourceTest(String bam, String referencePath) { + @DataProvider(name = "loadReadsValidation") // these require validation lenient or silent + public Object[][] loadReadsValidation() { + return new Object[][]{ + {NA12878_chr17_1k_BAM, null}, + {NA12878_chr17_1k_CRAM, v37_chr17_1Mb_Reference}, + }; + } + + private void doLoadReadsTest(String bam, String referencePath, ValidationStringency validationStringency) { JavaSparkContext ctx = SparkContextFactory.getTestSparkContext(); - ReadsSparkSource readSource = new ReadsSparkSource(ctx); - JavaRDD rddSerialReads = getSerialReads(ctx, bam, referencePath); + ReadsSparkSource readSource = new ReadsSparkSource(ctx, 
validationStringency); + JavaRDD rddSerialReads = getSerialReads(ctx, bam, referencePath, validationStringency); JavaRDD rddParallelReads = readSource.getParallelReads(bam, referencePath); List serialReads = rddSerialReads.collect(); @@ -65,12 +74,32 @@ public void readsSparkSourceTest(String bam, String referencePath) { Assert.assertEquals(serialReads.size(), parallelReads.size()); } + @Test(dataProvider = "loadReads", groups = "spark") + public void readsSparkSourceTest(String bam, String referencePath, ValidationStringency validationStringency) { + doLoadReadsTest(bam, referencePath, validationStringency); + } + + @Test(dataProvider = "loadReadsValidation", groups = "spark", expectedExceptions = SAMFormatException.class) + public void readsSparkSourceTestStrict(String bam, String referencePath) { + doLoadReadsTest(bam, referencePath, ValidationStringency.STRICT); + } + + @Test(dataProvider = "loadReadsValidation", groups = "spark") + public void readsSparkSourceTestLenient(String bam, String referencePath) { + doLoadReadsTest(bam, referencePath, ValidationStringency.LENIENT); + } + + @Test(dataProvider = "loadReadsValidation", groups = "spark") + public void readsSparkSourceTestSilent(String bam, String referencePath) { + doLoadReadsTest(bam, referencePath, ValidationStringency.SILENT); + } + @Test(dataProvider = "loadShardedReads", groups = "spark") public void shardedReadsSparkSourceTest(String expectedBam, String shardedBam, String referencePath) { JavaSparkContext ctx = SparkContextFactory.getTestSparkContext(); - ReadsSparkSource readSource = new ReadsSparkSource(ctx); - JavaRDD rddSerialReads = getSerialReads(ctx, expectedBam, referencePath); + ReadsSparkSource readSource = new ReadsSparkSource(ctx, ValidationStringency.STRICT); + JavaRDD rddSerialReads = getSerialReads(ctx, expectedBam, referencePath, ValidationStringency.DEFAULT_STRINGENCY); JavaRDD rddParallelReads = readSource.getParallelReads(shardedBam, referencePath); List serialReads = rddSerialReads.collect(); @@ -81,7 +110,7 @@ public void shardedReadsSparkSourceTest(String expectedBam, String shardedBam, S @Test(groups = "spark") public void testHeadersAreStripped() { JavaSparkContext ctx = SparkContextFactory.getTestSparkContext(); - ReadsSparkSource readSource = new ReadsSparkSource(ctx); + ReadsSparkSource readSource = new ReadsSparkSource(ctx, ValidationStringency.STRICT); final List reads = readSource.getParallelReads(dirBQSR + "HiSeq.1mb.1RG.2k_lines.alternate.bam", null).collect(); for ( final GATKRead read : reads ) { @@ -89,13 +118,23 @@ public void testHeadersAreStripped() { } } + @Test(groups = "spark", expectedExceptions=UserException.class) + public void testReject2BitCRAMReference() { + doLoadReadsTest(NA12878_chr17_1k_CRAM, b37_2bit_reference_20_21, ValidationStringency.STRICT); + } + + @Test(groups = "spark", expectedExceptions=UserException.class) + public void testCRAMReferenceRequired() { + doLoadReadsTest(NA12878_chr17_1k_CRAM, null, ValidationStringency.STRICT); + } + @Test public void testPartitionSizing(){ String bam = dirBQSR + "HiSeq.1mb.1RG.2k_lines.alternate.bam"; //file is ~220 kB JavaSparkContext ctx = SparkContextFactory.getTestSparkContext(); - ReadsSparkSource readSource = new ReadsSparkSource(ctx); + ReadsSparkSource readSource = new ReadsSparkSource(ctx, ValidationStringency.STRICT); JavaRDD allInOnePartition = readSource.getParallelReads(bam, null); JavaRDD smallPartitions = readSource.getParallelReads(bam, null, 100 * 1024); // 100 kB 
Assert.assertEquals(allInOnePartition.partitions().size(), 1); @@ -116,7 +155,7 @@ public void testReadFromFileAndHDFS() throws IOException { cluster.getFileSystem().copyFromLocalFile(new Path(bai.toURI()), baiPath); final JavaSparkContext ctx = SparkContextFactory.getTestSparkContext(); - final ReadsSparkSource readsSparkSource = new ReadsSparkSource(ctx); + final ReadsSparkSource readsSparkSource = new ReadsSparkSource(ctx, ValidationStringency.STRICT); final List localReads = readsSparkSource.getParallelReads(bam.toURI().toString(), null).collect(); final List hdfsReads = readsSparkSource.getParallelReads(bamPath.toUri().toString(), null).collect(); @@ -125,7 +164,39 @@ public void testReadFromFileAndHDFS() throws IOException { } finally { if (cluster != null) { - cluster.shutdown(); + cluster.shutdown(true); // delete hdfs dir on shutdown + } + } + } + + @Test + public void testCRAMReferenceFromHDFS() throws IOException { + final File cram = new File(NA12878_chr17_1k_CRAM); + final File reference = new File(v37_chr17_1Mb_Reference); + final File referenceIndex = new File(v37_chr17_1Mb_Reference + ".fai"); + + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster.Builder(new Configuration()).build(); + final Path workingDirectory = cluster.getFileSystem().getWorkingDirectory(); + final Path cramPath = new Path(workingDirectory,"hdfs.cram"); + final Path refPath = new Path(workingDirectory, "hdfs.fasta"); + final Path refIndexPath = new Path(workingDirectory, "hdfs.fasta.fai"); + cluster.getFileSystem().copyFromLocalFile(new Path(cram.toURI()), cramPath); + cluster.getFileSystem().copyFromLocalFile(new Path(reference.toURI()), refPath); + cluster.getFileSystem().copyFromLocalFile(new Path(referenceIndex.toURI()), refIndexPath); + + final JavaSparkContext ctx = SparkContextFactory.getTestSparkContext(); + final ReadsSparkSource readsSparkSource = new ReadsSparkSource(ctx); + final List localReads = readsSparkSource.getParallelReads(cram.toURI().toString(), reference.toURI().toString()).collect(); + final List hdfsReads = readsSparkSource.getParallelReads(cramPath.toUri().toString(), refPath.toUri().toString()).collect(); + + Assert.assertFalse(localReads.isEmpty()); + Assert.assertEquals(localReads, hdfsReads); + } finally { + + if (cluster != null) { + cluster.shutdown(true); // delete hdfs dir on shutdown } } } @@ -135,16 +206,16 @@ public void testReadFromFileAndHDFS() throws IOException { * @param bam file to load * @return RDD of (SAMRecord-backed) GATKReads from the file. 
*/ - public JavaRDD getSerialReads(final JavaSparkContext ctx, final String bam, final String referencePath) { - final SAMFileHeader readsHeader = ReadsSparkSource.getHeader(ctx, bam, null); + public JavaRDD getSerialReads(final JavaSparkContext ctx, final String bam, final String referencePath, final ValidationStringency validationStringency) { + final SAMFileHeader readsHeader = new ReadsSparkSource(ctx, validationStringency).getHeader(bam, referencePath, null); List intervals = IntervalUtils.getAllIntervalsForReference(readsHeader.getSequenceDictionary()); final SamReaderFactory samReaderFactory; if (referencePath != null) { final File reference = new File(referencePath); - samReaderFactory = SamReaderFactory.makeDefault().validationStringency(ValidationStringency.SILENT).referenceSequence(reference); + samReaderFactory = SamReaderFactory.makeDefault().validationStringency(validationStringency).referenceSequence(reference); } else { - samReaderFactory = SamReaderFactory.makeDefault().validationStringency(ValidationStringency.SILENT); + samReaderFactory = SamReaderFactory.makeDefault().validationStringency(validationStringency); } ReadsDataSource bam2 = new ReadsDataSource(new File(bam), samReaderFactory); diff --git a/src/test/java/org/broadinstitute/hellbender/tools/picard/sam/SortSamIntegrationTest.java b/src/test/java/org/broadinstitute/hellbender/tools/picard/sam/SortSamIntegrationTest.java index 629a8432aff..afaba04efe4 100644 --- a/src/test/java/org/broadinstitute/hellbender/tools/picard/sam/SortSamIntegrationTest.java +++ b/src/test/java/org/broadinstitute/hellbender/tools/picard/sam/SortSamIntegrationTest.java @@ -18,9 +18,8 @@ public Object[][] sortBAMData() { {"count_reads.bam", "count_reads_sorted.bam", "count_reads.fasta", ".cram", "coordinate"}, {"count_reads.bam", "count_reads.bam", null, ".bam", "queryname"}, {"count_reads.cram", "count_reads_sorted.cram", "count_reads.fasta", ".cram", "coordinate"}, - {"count_reads.cram", "count_reads_sorted.cram", "count_reads.fasta", ".bam", "coordinate"} - // requires fix in https://github.com/samtools/htsjdk/issues/404 - //{"count_reads.cram", "count_reads.cram", "count_reads.fasta", ".cram", "queryname"} + {"count_reads.cram", "count_reads_sorted.cram", "count_reads.fasta", ".bam", "coordinate"}, + {"count_reads.cram", "count_reads.cram", "count_reads.fasta", ".cram", "queryname"} }; } diff --git a/src/test/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSparkUnitTest.java b/src/test/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSparkUnitTest.java index 6831c74cda0..eff474d1a7e 100644 --- a/src/test/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSparkUnitTest.java +++ b/src/test/java/org/broadinstitute/hellbender/tools/spark/transforms/markduplicates/MarkDuplicatesSparkUnitTest.java @@ -1,6 +1,7 @@ package org.broadinstitute.hellbender.tools.spark.transforms.markduplicates; import htsjdk.samtools.SAMFileHeader; +import htsjdk.samtools.ValidationStringency; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.broadinstitute.hellbender.cmdline.argumentcollections.OpticalDuplicatesArgumentCollection; @@ -32,11 +33,11 @@ public Object[][] loadReads() { public void markDupesTest(final String input, final long totalExpected, final long dupsExpected) throws IOException { JavaSparkContext ctx = SparkContextFactory.getTestSparkContext(); - ReadsSparkSource readSource = new 
ReadsSparkSource(ctx); + ReadsSparkSource readSource = new ReadsSparkSource(ctx, ValidationStringency.STRICT); JavaRDD reads = readSource.getParallelReads(input, null); Assert.assertEquals(reads.count(), totalExpected); - SAMFileHeader header = ReadsSparkSource.getHeader(ctx, input, null); + SAMFileHeader header = readSource.getHeader(input, null, null); OpticalDuplicatesArgumentCollection opticalDuplicatesArgumentCollection = new OpticalDuplicatesArgumentCollection(); final OpticalDuplicateFinder finder = opticalDuplicatesArgumentCollection.READ_NAME_REGEX != null ? new OpticalDuplicateFinder(opticalDuplicatesArgumentCollection.READ_NAME_REGEX, opticalDuplicatesArgumentCollection.OPTICAL_DUPLICATE_PIXEL_DISTANCE, null) : null;
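Note: taken together, the changes above mean callers now construct the reads source with an explicit validation stringency and obtain the header from the instance (passing a reference path for CRAM) instead of the old static ReadsSparkSource.getHeader(ctx, ...). A minimal usage sketch against the new API; the file paths are placeholders:

    // Illustrative only; argument names follow the hunks above.
    final JavaSparkContext ctx = SparkContextFactory.getTestSparkContext();
    final ReadsSparkSource readsSource = new ReadsSparkSource(ctx, ValidationStringency.LENIENT);
    // For CRAM inputs the reference path is required for header parsing and record decoding;
    // the third argument is the optional auth holder (null for local files).
    final SAMFileHeader header = readsSource.getHeader("input.cram", "reference.fasta", null);
    final JavaRDD<GATKRead> reads = readsSource.getParallelReads("input.cram", "reference.fasta");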