From f567c0abc116794c1a2d3378be0ec6c33dc943e5 Mon Sep 17 00:00:00 2001 From: tfenne Date: Thu, 22 Aug 2019 09:09:01 -0600 Subject: [PATCH] Attempt to make the GATK tasks a) auto-detect GATK version and b) switch command lines accordingly. --- .../main/scala/dagr/tasks/gatk/GatkTask.scala | 56 ++++++++++++++++--- .../scala/dagr/tasks/gatk/GenotypeGvcfs.scala | 6 +- .../dagr/tasks/gatk/HaplotypeCaller.scala | 32 ++++++----- 3 files changed, 71 insertions(+), 23 deletions(-) diff --git a/tasks/src/main/scala/dagr/tasks/gatk/GatkTask.scala b/tasks/src/main/scala/dagr/tasks/gatk/GatkTask.scala index 16b75316..ec8247d1 100644 --- a/tasks/src/main/scala/dagr/tasks/gatk/GatkTask.scala +++ b/tasks/src/main/scala/dagr/tasks/gatk/GatkTask.scala @@ -23,18 +23,43 @@ */ package dagr.tasks.gatk -import java.nio.file.Path +import java.nio.file.{Files, Path} +import java.util.jar.JarInputStream import dagr.core.config.Configuration import dagr.core.execsystem.{Cores, Memory} import dagr.core.tasksystem.{FixedResources, ProcessTask} -import dagr.tasks.{DagrDef, JarTask} -import DagrDef.{PathToFasta, PathToIntervals} +import dagr.tasks.JarTask +import com.fulcrumgenomics.commons.CommonsDef._ import scala.collection.mutable.ListBuffer object GatkTask { val GatkJarPathConfigKey = "gatk.jar" + + /** Attempts to determine the major version from the GATK Jar. */ + def majorVersion(jar: FilePath): Int = { + val in = new JarInputStream(Files.newInputStream(jar)) + val attrs = in.getManifest.getMainAttributes + in.close() + + if (attrs.containsKey("Implementation-Version")) { + attrs.getValue("Implementation-Version").takeWhile(_ != '.').toInt + } + else { + attrs.getValue("Main-Class") match { + case "org.broadinstitute.sting.gatk.CommandLineGATK" => 1 + case "org.broadinstitute.gatk.engine.CommandLineGATK" => 3 + case x => throw new IllegalArgumentException(s"Couldn't determind GATK version from jar $jar") + } + } + } + + /** The path to the "standard" GATK jar. */ + lazy val GatkJarPath: FilePath = Configuration.configure[Path](GatkTask.GatkJarPathConfigKey) + + /** The major version of the "standard" GATK jar. */ + lazy val GatkMajorVersion: Int = majorVersion(GatkJarPath) } @@ -49,12 +74,18 @@ abstract class GatkTask(val walker: String, requires(Cores(1), Memory("4g")) override def args: Seq[Any] = { - val buffer = ListBuffer[Any]() - buffer.appendAll(jarArgs(this.gatkJar, jvmMemory=this.resources.memory)) - buffer.append("-T", this.walker) + val buffer = ListBuffer[Any]() + val jvmArgs = if (gatkMajorVersion >= 4) bamCompression.map(c => s"-Dsamjdk.compression_level=$c") else None + buffer.appendAll(jarArgs(this.gatkJar, jvmMemory=this.resources.memory, jvmArgs=jvmArgs)) + + if (gatkMajorVersion < 4) buffer += "-T" + buffer += this.walker + + if (gatkMajorVersion < 4) bamCompression.foreach(c => buffer.append("--bam_compression", c)) + buffer.append("-R", this.ref.toAbsolutePath.toString) intervals.foreach(il => buffer.append("-L", il.toAbsolutePath.toString)) - bamCompression.foreach(c => buffer.append("--bam_compression", c)) + addWalkerArgs(buffer) buffer.toSeq } @@ -62,5 +93,16 @@ abstract class GatkTask(val walker: String, /** Can be overridden to use a specific GATK jar. */ protected def gatkJar: Path = configure[Path](GatkTask.GatkJarPathConfigKey) + /** The version of GATK being used by this task. */ + protected lazy val gatkMajorVersion: Int = { + val jar = gatkJar + if (jar == GatkTask.GatkJarPath) GatkTask.GatkMajorVersion + else GatkTask.majorVersion(jar) + } + + /** Adds arguments specific to the walker. */ protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit + + /** Helper function to select an argument name for pre vs. post V4 naming. */ + protected def either(preV4Name: String, postV4Name: String): String = if (gatkMajorVersion < 4) preV4Name else postV4Name } diff --git a/tasks/src/main/scala/dagr/tasks/gatk/GenotypeGvcfs.scala b/tasks/src/main/scala/dagr/tasks/gatk/GenotypeGvcfs.scala index d675690b..0357cb38 100644 --- a/tasks/src/main/scala/dagr/tasks/gatk/GenotypeGvcfs.scala +++ b/tasks/src/main/scala/dagr/tasks/gatk/GenotypeGvcfs.scala @@ -50,9 +50,13 @@ class GenotypeGvcfs private (ref: PathToFasta, val dbSnpVcf: Option[PathToVcf] = None) extends GatkTask("GenotypeGVCFs", ref, intervals=intervals) { + if (gatkMajorVersion >= 4 && gvcfs.length > 1) throw new IllegalStateException( + "GenotypeGVCFs only supports one GVCF at a time with GATK version 4+." + ) + override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = { dbSnpVcf.foreach(v => buffer.append("--dbsnp", v.toAbsolutePath.toString)) gvcfs.foreach(gvcf => buffer.append("-V", gvcf.toAbsolutePath.toString)) - buffer.append("-o", vcf.toAbsolutePath.toString) + buffer.append(either("--out", "--output"), vcf.toAbsolutePath.toString) } } diff --git a/tasks/src/main/scala/dagr/tasks/gatk/HaplotypeCaller.scala b/tasks/src/main/scala/dagr/tasks/gatk/HaplotypeCaller.scala index e20c2367..2307dfc7 100644 --- a/tasks/src/main/scala/dagr/tasks/gatk/HaplotypeCaller.scala +++ b/tasks/src/main/scala/dagr/tasks/gatk/HaplotypeCaller.scala @@ -47,25 +47,27 @@ class HaplotypeCaller(ref: PathToFasta, extends GatkTask("HaplotypeCaller", ref, intervals=intervals) { override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = { - buffer.append("--minPruning", minPruning) - buffer.append("--maxNumHaplotypesInPopulation", "200") - buffer.append("-variant_index_parameter", "128000") - buffer.append("-variant_index_type", "LINEAR") - buffer.append("--emitRefConfidence", "GVCF") - ploidy.foreach(p => buffer.append("--sample_ploidy", p)) - buffer.append("--max_alternate_alleles", maxAlternateAlleles) - maxReadsInRegionPerSample.foreach(n => buffer.append("--maxReadsInRegionPerSample", n)) - minReadsPerAlignmentStart.foreach(n => buffer.append("--minReadsPerAlignmentStart", n)) - buffer.append("--contamination_fraction_to_filter", contaminationFraction) buffer.append("-I", bam.toAbsolutePath) buffer.append("-o", vcf.toAbsolutePath) + buffer.append(either("--minPruning", "--min-pruning"), minPruning) + buffer.append(either("--maxNumHaplotypesInPopulation", "--max-num-haplotypes-in-population"), "200") + buffer.append(either("--emitRefConfidence", "--emit-ref-confidence"), "GVCF") + buffer.append(either("--max_alternate_alleles", "--max-alternate-alleles"), maxAlternateAlleles) + buffer.append(either("--contamination_fraction_to_filter", "--contamination-fraction-to-filter"), contaminationFraction) + ploidy.foreach(p => buffer.append(either("--sample_ploidy", "--sample-ploidy"), p)) + if (useNativePairHmm) buffer.append("-pairHMM", either("VECTOR_LOGLESS_CACHING", "FASTEST_AVAILABLE")) - if (rnaMode) { - buffer.append("-dontUseSoftClippedBases") - buffer.append("-stand_call_conf", 20) - buffer.append("-stand_emit_conf", 20) + if (gatkMajorVersion < 4) { + buffer.append("-variant_index_parameter", "128000") + buffer.append("-variant_index_type", "LINEAR") + maxReadsInRegionPerSample.foreach(n => buffer.append("--maxReadsInRegionPerSample", n)) + minReadsPerAlignmentStart.foreach(n => buffer.append("--minReadsPerAlignmentStart", n)) } - if (useNativePairHmm) buffer.append("-pairHMM", "VECTOR_LOGLESS_CACHING") + if (rnaMode) { + buffer.append(either("-dontUseSoftClippedBases", "--dont-use-soft-clipped-bases")) + buffer.append(either("-stand_call_conf", "-stand-call-conf"), 20) + if (gatkMajorVersion < 4) buffer.append("-stand_emit_conf", 20) + } } }