Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not read contig and filter header lines on VCF ingest #189

Merged
merged 2 commits into from
Apr 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/io/projectglow/vcf/VCFFileFormat.scala
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ private[vcf] object SchemaDelegate {
val infoHeaderLines = ArrayBuffer[VCFInfoHeaderLine]()
val formatHeaderLines = ArrayBuffer[VCFFormatHeaderLine]()
VCFHeaderUtils
.readHeaderLines(spark, files.map(_.getPath.toString))
.readHeaderLines(spark, files.map(_.getPath.toString), getNonSchemaHeaderLines = false)
.foreach {
case i: VCFInfoHeaderLine => infoHeaderLines += i
case f: VCFFormatHeaderLine => formatHeaderLines += f
Expand Down
40 changes: 30 additions & 10 deletions core/src/main/scala/io/projectglow/vcf/VCFHeaderUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -103,20 +103,30 @@ object VCFHeaderUtils extends GlowLogging {
}

/**
* Find the unique header lines from an RDD of VCF headers.
* Find the unique desired header lines from an RDD of VCF headers.
* If lines of the same class found with the same ID, we pick one unless they are incompatible.
* If there are incompatible lines, [[IllegalArgumentException]] is thrown.
* Incompatible lines are:
* - FORMAT or INFO lines with the same ID but different types or counts
* - contig lines with the same ID but different lengths
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a brief comment about the meaning of getNonSchemaHeaderLines.

*
* @param headers VCF headers to parse header lines from
* @param getNonSchemaHeaderLines If false, parses only INFO and FORMAT lines.
* If true, also parses contig and filter lines.
*/
def getUniqueHeaderLines(headers: RDD[VCFHeader]): Seq[VCFHeaderLine] = {
def getUniqueHeaderLines(
headers: RDD[VCFHeader],
getNonSchemaHeaderLines: Boolean): Seq[VCFHeaderLine] = {
headers.flatMap { header =>
val infoHeaderLines = header.getInfoHeaderLines.asScala
val formatHeaderLines = header.getFormatHeaderLines.asScala
val contigHeaderLines = header.getContigLines.asScala
val filterHeaderLines = header.getFilterLines.asScala
infoHeaderLines ++ formatHeaderLines ++ contigHeaderLines ++ filterHeaderLines
val schemaHeaderLines = header.getInfoHeaderLines.asScala ++ header
.getFormatHeaderLines
.asScala
val nonSchemaHeaderLines = if (getNonSchemaHeaderLines) {
header.getContigLines.asScala ++ header.getFilterLines.asScala
} else {
Seq.empty
}
schemaHeaderLines ++ nonSchemaHeaderLines
}.keyBy(line => (line.getClass.getName, line.getID))
.reduceByKey {
case (line1: VCFCompoundHeaderLine, line2: VCFCompoundHeaderLine) =>
Expand All @@ -138,16 +148,26 @@ object VCFHeaderUtils extends GlowLogging {
s"and $line2. Header lines with the same ID must have the same length.")
}
case (line1: VCFFilterHeaderLine, _: VCFFilterHeaderLine) => line1
case (line1, _) =>
throw new IllegalArgumentException(
s"Collected unexpected header line type: ${line1.getClass.getName}")
}
.values
.collect()
}

/**
* A convenience function to parse the headers from a set of VCF files and return the unique
* A convenience function to parse the headers from a set of VCF files and return the desired
* header lines.
*
* @param files VCF files to parse header lines from
* @param getNonSchemaHeaderLines If false, parses only INFO and FORMAT lines.
* If true, also parses contig and filter lines.
*/
def readHeaderLines(spark: SparkSession, files: Seq[String]): Seq[VCFHeaderLine] = {
getUniqueHeaderLines(createHeaderRDD(spark, files))
def readHeaderLines(
spark: SparkSession,
files: Seq[String],
getNonSchemaHeaderLines: Boolean): Seq[VCFHeaderLine] = {
getUniqueHeaderLines(createHeaderRDD(spark, files), getNonSchemaHeaderLines)
}
}
11 changes: 11 additions & 0 deletions core/src/test/scala/io/projectglow/vcf/VCFDatasourceSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,17 @@ class VCFDatasourceSuite extends GlowBaseTest {
test("Do not break when reading directory with index files") {
spark.read.format(sourceName).load(s"$testDataHome/tabix-test-vcf")
}

test("Do not break when reading VCFs with contig lines missing length") {
// Read two copies of the same file to trigger a header line merge
// May break if we parse contig header lines missing length
spark
.read
.format(sourceName)
.load(
s"$testDataHome/vcf/missing_contig_length.vcf",
s"$testDataHome/vcf/missing_contig_length.vcf")
}
}

// For testing only: schema based on CEUTrio VCF header
Expand Down
30 changes: 19 additions & 11 deletions core/src/test/scala/io/projectglow/vcf/VCFHeaderUtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class VCFHeaderUtilsSuite extends GlowBaseTest {
}
}

test("merge header lines") {
gridTest("merge all header lines")(Seq(true, false)) { getNonSchemaHeaderLines =>
val file1 =
s"""
|##fileformat=VCFv4.2
Expand All @@ -155,28 +155,35 @@ class VCFHeaderUtilsSuite extends GlowBaseTest {
|##contig=<ID=21,length=48129895>
""".stripMargin
val paths = writeVCFHeaders(Seq(file1, file2))
val lines = VCFHeaderUtils.readHeaderLines(spark, paths)
val lines = VCFHeaderUtils.readHeaderLines(spark, paths, getNonSchemaHeaderLines)

val expectedLines = Set(
val expectedSchemaLines = Set(
new VCFInfoHeaderLine("animal", 1, VCFHeaderLineType.String, "monkey"),
new VCFInfoHeaderLine("color", VCFHeaderLineCount.G, VCFHeaderLineType.String, ""),
new VCFFormatHeaderLine("AD", VCFHeaderLineCount.R, VCFHeaderLineType.Integer, ""),
new VCFFormatHeaderLine("DP", 1, VCFHeaderLineType.Integer, ""),
new VCFFilterHeaderLine("LowQual", "Low Quality"),
new VCFContigHeaderLine("<ID=20,length=63025520>", VCFHeaderVersion.VCF4_2, "contig", 0),
new VCFContigHeaderLine("<ID=21,length=48129895>", VCFHeaderVersion.VCF4_2, "contig", 1)
new VCFFormatHeaderLine("DP", 1, VCFHeaderLineType.Integer, "")
)
val expectedNonSchemaLines = if (getNonSchemaHeaderLines) {
Set(
new VCFFilterHeaderLine("LowQual", "Low Quality"),
new VCFContigHeaderLine("<ID=20,length=63025520>", VCFHeaderVersion.VCF4_2, "contig", 0),
new VCFContigHeaderLine("<ID=21,length=48129895>", VCFHeaderVersion.VCF4_2, "contig", 1)
)
} else {
Set.empty
}

// We compare the string-encoded versions of the header lines to avoid direct object comparisons
val sortedLines = lines.map(_.toString).toSet
val sortedExpectedLines = expectedLines.map(_.toString)
val sortedExpectedLines = (expectedSchemaLines ++ expectedNonSchemaLines).map(_.toString)
assert(lines.size == sortedLines.size)
assert(sortedLines == sortedExpectedLines)
}

def checkLinesIncompatible(file1: String, file2: String): Unit = {
val paths = writeVCFHeaders(Seq(file1, file2))
val ex = intercept[SparkException](VCFHeaderUtils.readHeaderLines(spark, paths))
val ex = intercept[SparkException](
VCFHeaderUtils.readHeaderLines(spark, paths, getNonSchemaHeaderLines = true))
assert(ex.getCause.isInstanceOf[IllegalArgumentException])
}

Expand Down Expand Up @@ -226,15 +233,16 @@ class VCFHeaderUtilsSuite extends GlowBaseTest {
|##FORMAT=<ID=animal,Number=2,Type=String,Description="monkey">
""".stripMargin
val paths = writeVCFHeaders(Seq(file1, file2))
VCFHeaderUtils.readHeaderLines(spark, paths) // no exception
VCFHeaderUtils.readHeaderLines(spark, paths, getNonSchemaHeaderLines = true) // no exception
}

test("does not try to read tabix indices") {
assert(
VCFHeaderUtils
.readHeaderLines(
spark,
Seq(s"$testDataHome/tabix-test-vcf/NA12878_21_10002403NoTbi.vcf.gz.tbi"))
Seq(s"$testDataHome/tabix-test-vcf/NA12878_21_10002403NoTbi.vcf.gz.tbi"),
getNonSchemaHeaderLines = true)
.isEmpty)
}
}
7 changes: 7 additions & 0 deletions test-data/vcf/missing_contig_length.vcf
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
##fileformat=VCFv4.2
##FORMAT=<ID=AD,Number=1,Type=Integer,Description="Allelic depths for the ref and alt alleles in theorder listed">
##INFO=<ID=DP,Number=1,Type=Integer,Description="Approximate read depth; some reads may have beenfiltered">
##contig=<ID=1>
##contig=<ID=2>
#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT SAMPLE1
1 1 a A G . . . AD 1,2