Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import scala.collection.mutable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

import org.apache.spark.SparkContext
Expand Down Expand Up @@ -274,7 +275,21 @@ object InMemoryFileIndex extends Logging {
// [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
// Note that statuses only include FileStatus for the files and dirs directly under path,
// and does not include anything else recursively.
val statuses = try fs.listStatus(path) catch {
val statuses: Array[FileStatus] = try {
fs match {
// DistributedFileSystem overrides listLocatedStatus to make 1 single call to namenode
// to retrieve the file status with the file block location. The reason to still fallback
// to listStatus is because the default implementation would potentially throw a
// FileNotFoundException which is better handled by doing the lookups manually below.
case _: DistributedFileSystem =>
val remoteIter = fs.listLocatedStatus(path)
new Iterator[LocatedFileStatus]() {
def next(): LocatedFileStatus = remoteIter.next
def hasNext(): Boolean = remoteIter.hasNext
}.toArray
case _ => fs.listStatus(path)
}
} catch {
case _: FileNotFoundException =>
logWarning(s"The directory $path was not found. Was it deleted very recently?")
Array.empty[FileStatus]
Expand Down