From ebd1431a5ae3ddaedcf9c1b6d02c02d4ace9987e Mon Sep 17 00:00:00 2001 From: rrusso2007 Date: Sat, 25 May 2019 15:49:30 -0700 Subject: [PATCH] [SPARK-27801][SQL] Improve performance of InMemoryFileIndex.listLeafFiles for HDFS directories with many files ## What changes were proposed in this pull request? InMemoryFileIndex.listLeafFiles should use listLocatedStatus for DistributedFileSystem. DistributedFileSystem overrides the listLocatedStatus method in order to do it with 1 single namenode call thus saving thousands of calls to getBlockLocations. Currently in InMemoryFileIndex, all directory listings are done using FileSystem.listStatus following by individual calls to FileSystem.getFileBlockLocations. This is painstakingly slow for folders that have large numbers of files because this process happens serially and parallelism is only applied at the folder level, not the file level. FileSystem also provides another API listLocatedStatus which returns the LocatedFileStatus objects that already have the block locations. In FileSystem main class this just delegates to listStatus and getFileBlockLocations similarly to the way Spark does it. However when HDFS specifically is the backing file system, DistributedFileSystem overrides this method and simply makes one single call to the namenode to retrieve the directory listing with the block locations. This avoids potentially thousands or more calls to namenode and also is more consistent because files will either exist with locations or not exist instead of having the FileNotFoundException exception case. For our example directory with 6500 files, the load time of spark.read.parquet was reduced 96x from 76 seconds to .8 seconds. This savings only goes up with the number of files in the directory. In the pull request instead of using this method always which could lead to a FileNotFoundException that could be tough to decipher in the default FileSystem implementation, this method is only used when the FileSystem is a DistributedFileSystem and otherwise the old logic still applies. ## How was this patch tested? test suite ran Closes #24672 from rrusso2007/master. Authored-by: rrusso2007 Signed-off-by: Dongjoon Hyun --- .../datasources/InMemoryFileIndex.scala | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index db55a06c10822..45cf924266257 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -23,6 +23,7 @@ import scala.collection.mutable import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ +import org.apache.hadoop.hdfs.DistributedFileSystem import org.apache.hadoop.mapred.{FileInputFormat, JobConf} import org.apache.spark.SparkContext @@ -274,7 +275,21 @@ object InMemoryFileIndex extends Logging { // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist // Note that statuses only include FileStatus for the files and dirs directly under path, // and does not include anything else recursively. - val statuses = try fs.listStatus(path) catch { + val statuses: Array[FileStatus] = try { + fs match { + // DistributedFileSystem overrides listLocatedStatus to make 1 single call to namenode + // to retrieve the file status with the file block location. The reason to still fallback + // to listStatus is because the default implementation would potentially throw a + // FileNotFoundException which is better handled by doing the lookups manually below. + case _: DistributedFileSystem => + val remoteIter = fs.listLocatedStatus(path) + new Iterator[LocatedFileStatus]() { + def next(): LocatedFileStatus = remoteIter.next + def hasNext(): Boolean = remoteIter.hasNext + }.toArray + case _ => fs.listStatus(path) + } + } catch { case _: FileNotFoundException => logWarning(s"The directory $path was not found. Was it deleted very recently?") Array.empty[FileStatus]