@@ -64,7 +64,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
// Schema evolution is not supported yet. Here we only pick a single random sample file to
// figure out the schema of the whole dataset.
val sampleFile =
if (conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, true)) {
if (AvroFileFormat.ignoreFilesWithoutExtensions(conf)) {
Member:
I tried running queries. The option avro.mapred.ignore.inputs.without.extension is not set in conf; this is a bug in spark-avro.
Please read the value from options. It would also be good to have a new test case with avro.mapred.ignore.inputs.without.extension set to true.
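A sketch of what reading the value from options (with the Hadoop conf as a fallback) might look like; the options map here is assumed to be the Map[String, String] passed to FileFormat.buildReader, and the fallback default mirrors this PR:

// Sketch only: prefer the per-read datasource option over the Hadoop conf.
val ignoreExtension: Boolean = options
  .get("avro.mapred.ignore.inputs.without.extension")
  .map(_.toBoolean)
  .getOrElse(conf.getBoolean(
    AvroFileFormat.IgnoreFilesWithoutExtensionProperty, false))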

@MaxGekk (Member, Author), Jul 14, 2018:
avro.mapred.ignore.inputs.without.extension is a Hadoop parameter. This PR aims to change the default behavior only; I would prefer not to convert the Hadoop parameter into an Avro datasource option here.

@MaxGekk (Member, Author), Jul 14, 2018:

Here is how people have been using the option so far: databricks/spark-avro#71 (comment). We should probably discuss, separately from this PR, how we could fix the "bug" without breaking backward compatibility.

@MaxGekk (Member, Author):

The Hadoop config can be changed like this:

spark
  .sqlContext
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Member:

Can we submit a separate PR to add a new option for Avro? We should not rely on hadoopConf to control the behavior of the Avro datasource.
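From the user side, such an option might look roughly like the sketch below; the option name ignoreExtension is illustrative only, with the real name and semantics left to the follow-up PR:

// Hypothetical datasource option replacing the Hadoop conf knob.
spark.read
  .option("ignoreExtension", "false")
  .avro(dir.toString)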

@MaxGekk (Member, Author):

Here is the PR: #21798. Please have a look at it.

files.find(_.getPath.getName.endsWith(".avro")).getOrElse {
throw new FileNotFoundException(
"No Avro files found. Hadoop option \"avro.mapred.ignore.inputs.without.extension\" " +
@@ -172,10 +172,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
// Doing input file filtering is improper because we may generate empty tasks that process no
// input files but stress the scheduler. We should probably add a more general input file
// filtering mechanism for `FileFormat` data sources. See SPARK-16317.
if (
conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, true) &&
!file.filePath.endsWith(".avro")
) {
if (AvroFileFormat.ignoreFilesWithoutExtensions(conf) && !file.filePath.endsWith(".avro")) {
Iterator.empty
} else {
val reader = {
@@ -286,4 +283,11 @@ private[avro] object AvroFileFormat {
value.readFields(new DataInputStream(in))
}
}

def ignoreFilesWithoutExtensions(conf: Configuration): Boolean = {
// Files without .avro extensions are not ignored by default
val defaultValue = false

conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, defaultValue)
}
}
Binary file added external/avro/src/test/resources/episodesAvro
@@ -39,6 +39,7 @@ import org.apache.spark.sql.types._
class AvroSuite extends SparkFunSuite {
val episodesFile = "src/test/resources/episodes.avro"
val testFile = "src/test/resources/test.avro"
val episodesWithoutExtension = "src/test/resources/episodesAvro"

private var spark: SparkSession = _

@@ -623,7 +624,7 @@ class AvroSuite extends SparkFunSuite {
spark.read.avro("*/*/*/*/*/*/*/something.avro")
}

intercept[FileNotFoundException] {
intercept[java.io.IOException] {
TestUtils.withTempDir { dir =>
FileUtils.touch(new File(dir, "test"))
spark.read.avro(dir.toString)
Member:

We can fix this case as:

spark.read.option("avro.mapred.ignore.inputs.without.extension", false).avro(dir.toString)

The behavior will be the same as before, and we don't need to modify the expected FileNotFoundException.

@MaxGekk (Member, Author), Jul 14, 2018:

I would actually remove this piece of code from the test. It checked the default behavior, which is now covered by a dedicated test. Explicit settings of avro.mapred.ignore.inputs.without.extension should be checked in separate tests where the config is set explicitly.
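A rough sketch of such a dedicated test, reusing this suite's existing helpers (restoring the Hadoop flag afterwards is omitted for brevity):

// Sketch: with the Hadoop flag explicitly set to true, a directory containing
// only extension-less files should again raise "No Avro files found".
test("ignore.inputs.without.extension set to true skips extension-less files") {
  spark.sparkContext.hadoopConfiguration
    .set("avro.mapred.ignore.inputs.without.extension", "true")
  intercept[FileNotFoundException] {
    TestUtils.withTempDir { dir =>
      FileUtils.touch(new File(dir, "test"))
      spark.read.avro(dir.toString)
    }
  }
}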

@@ -809,4 +810,16 @@ class AvroSuite extends SparkFunSuite {
assert(readDf.collect().sameElements(writeDf.collect()))
}
}

test("SPARK-24805: reading files without .avro extension") {
Member:

Nit: can we create a temp path and copy the original episodes.avro into it, so that we don't need two duplicate resource files?
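For example, something like this sketch (FileUtils.copyFile comes from commons-io, which the suite already uses for FileUtils.touch; the row count matches episodes.avro as asserted elsewhere in this suite):

// Sketch: copy the committed episodes.avro into a temp dir under an
// extension-less name, then read it back.
TestUtils.withTempDir { dir =>
  val fileWithoutExtension = new File(dir, "episodes")
  FileUtils.copyFile(new File(episodesFile), fileWithoutExtension)
  val df = spark.read.avro(fileWithoutExtension.toString)
  assert(df.count == 8)
}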

@MaxGekk (Member, Author), Jul 14, 2018:

Wouldn't that just introduce an unnecessary dependency here and overcomplicate the test? If you don't mind, I can create a small Avro file (with just one row) without the .avro extension specifically for this test.

val df1 = spark.read.avro(episodesWithoutExtension)
assert(df1.count == 8)

val schema = new StructType()
.add("title", StringType)
.add("air_date", StringType)
.add("doctor", IntegerType)
val df2 = spark.read.schema(schema).avro(episodesWithoutExtension)
assert(df2.count == 8)
}
}