src/main/scala/com/databricks/spark/avro/DefaultSource.scala (10 additions, 6 deletions)

```diff
@@ -22,7 +22,7 @@ import java.util.zip.Deflater
 
 import scala.util.control.NonFatal
 
-import com.databricks.spark.avro.DefaultSource.{AvroSchema, IgnoreFilesWithoutExtensionProperty, SerializableConfiguration}
+import com.databricks.spark.avro.DefaultSource.{AvroSchema, SerializableConfiguration}
 import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
 import com.esotericsoftware.kryo.io.{Input, Output}
 import org.apache.avro.{Schema, SchemaBuilder}
@@ -63,7 +63,7 @@ private[avro] class DefaultSource extends FileFormat with DataSourceRegister {
 
     // Schema evolution is not supported yet. Here we only pick a single random sample file to
     // figure out the schema of the whole dataset.
-    val sampleFile = if (conf.getBoolean(IgnoreFilesWithoutExtensionProperty, true)) {
+    val sampleFile = if (DefaultSource.ignoreFilesWithoutExtensions(conf)) {
       files.find(_.getPath.getName.endsWith(".avro")).getOrElse {
         throw new FileNotFoundException(
           "No Avro files found. Hadoop option \"avro.mapred.ignore.inputs.without.extension\" is " +
@@ -169,10 +169,7 @@ private[avro] class DefaultSource extends FileFormat with DataSourceRegister {
       // Doing input file filtering is improper because we may generate empty tasks that process no
       // input files but stress the scheduler. We should probably add a more general input file
       // filtering mechanism for `FileFormat` data sources. See SPARK-16317.
-      if (
-        conf.getBoolean(IgnoreFilesWithoutExtensionProperty, true) &&
-        !file.filePath.endsWith(".avro")
-      ) {
+      if (DefaultSource.ignoreFilesWithoutExtensions(conf) && !file.filePath.endsWith(".avro")) {
         Iterator.empty
       } else {
         val reader = {
@@ -283,4 +280,11 @@ private[avro] object DefaultSource {
       value.readFields(new DataInputStream(in))
     }
   }
+
+  def ignoreFilesWithoutExtensions(conf: Configuration): Boolean = {
+    // Files without .avro extensions are not ignored by default
+    val defaultValue = false
+
+    conf.getBoolean(IgnoreFilesWithoutExtensionProperty, defaultValue)
+  }
 }
```
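
The net effect of the change above: `avro.mapred.ignore.inputs.without.extension` now defaults to `false`, so Avro files without a `.avro` extension are no longer silently skipped, and the flag is consulted through the Hadoop configuration rather than a per-read option. A minimal sketch of how a caller could opt back into the old filtering behavior (the directory path is a placeholder; `spark.read.avro` is the reader method provided by `import com.databricks.spark.avro._`):

```scala
import com.databricks.spark.avro._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")  // placeholder master for a local sketch
  .appName("avro-extension-filtering")
  .getOrCreate()

// The flag is read from the session-wide Hadoop configuration, so setting
// it here affects every subsequent Avro read in this SparkSession.
spark.sparkContext.hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

// With the flag set, input files lacking the ".avro" extension are skipped;
// with the new default (false), they would be read as Avro.
val df = spark.read.avro("/path/to/avro-dir")  // placeholder path
```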
src/test/resources/episodesAvro (binary file added; contents not shown)
src/test/scala/com/databricks/spark/avro/AvroSuite.scala (40 additions, 7 deletions)

```diff
@@ -17,7 +17,8 @@
 package com.databricks.spark.avro
 
 import java.io._
-import java.nio.file.Files
+import java.net.URL
+import java.nio.file.{Files, Path, Paths}
 import java.sql.{Date, Timestamp}
 import java.util.{TimeZone, UUID}
 
@@ -38,6 +39,7 @@ import org.apache.spark.sql.types._
 class AvroSuite extends FunSuite with BeforeAndAfterAll {
   val episodesFile = "src/test/resources/episodes.avro"
   val testFile = "src/test/resources/test.avro"
+  val episodesWithoutExtension = "src/test/resources/episodesAvro"
 
   private var spark: SparkSession = _
 
@@ -625,7 +627,12 @@
     intercept[FileNotFoundException] {
       TestUtils.withTempDir { dir =>
         FileUtils.touch(new File(dir, "test"))
-        spark.read.avro(dir.toString)
+        val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration
+        try {
+          hadoopConf.set(DefaultSource.IgnoreFilesWithoutExtensionProperty, "true")
+          spark.read.avro(dir.toString)
+        } finally {
+          hadoopConf.unset(DefaultSource.IgnoreFilesWithoutExtensionProperty) }
       }
     }
 
@@ -687,12 +694,18 @@
 
       Files.createFile(new File(tempSaveDir, "non-avro").toPath)
 
-      val newDf = spark
-        .read
-        .option(DefaultSource.IgnoreFilesWithoutExtensionProperty, "true")
-        .avro(tempSaveDir)
+      val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration
+      val count = try {
+        hadoopConf.set(DefaultSource.IgnoreFilesWithoutExtensionProperty, "true")
+        val newDf = spark
+          .read
+          .avro(tempSaveDir)
+        newDf.count()
+      } finally {
+        hadoopConf.unset(DefaultSource.IgnoreFilesWithoutExtensionProperty)
+      }
 
-      assert(newDf.count == 8)
+      assert(count == 8)
     }
   }
 
@@ -808,4 +821,24 @@
       assert(readDf.collect().sameElements(writeDf.collect()))
     }
   }
+
+  test("do not ignore files without .avro extension by default") {
+    TestUtils.withTempDir { dir =>
+      dir.mkdirs()
+      Files.copy(
+        Paths.get(episodesWithoutExtension),
+        Paths.get(dir.getCanonicalPath, "episodes"))
+
+      val fileWithoutExtension = s"${dir.getCanonicalPath}/episodes"
+      val df1 = spark.read.avro(fileWithoutExtension)
+      assert(df1.count == 8)
+
+      val schema = new StructType()
+        .add("title", StringType)
+        .add("air_date", StringType)
+        .add("doctor", IntegerType)
+      val df2 = spark.read.schema(schema).avro(fileWithoutExtension)
+      assert(df2.count == 8)
+    }
+  }
 }
```
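
Both reworked tests mutate the shared Hadoop configuration and clear the flag in a `finally` block so the setting cannot leak into later tests. A hypothetical helper (not part of this PR) that factors out that set/restore pattern might look like:

```scala
import org.apache.hadoop.conf.Configuration

// Hypothetical test utility, not part of this PR: runs `body` with a Hadoop
// configuration key temporarily set, then restores the previous state.
def withHadoopConfKey[T](conf: Configuration, key: String, value: String)(body: => T): T = {
  val previous = Option(conf.get(key))  // remember the prior value, if any
  conf.set(key, value)
  try {
    body
  } finally {
    previous match {
      case Some(v) => conf.set(key, v)  // restore the earlier value
      case None => conf.unset(key)      // key was unset before; unset it again
    }
  }
}
```

With such a helper, each test body would shrink to something like `withHadoopConfKey(hadoopConf, DefaultSource.IgnoreFilesWithoutExtensionProperty, "true") { spark.read.avro(dir.toString) }`.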