diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 0ec661fc16c88..34534dd14b3f2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -832,6 +832,17 @@ object SQLConf {
       .intConf
       .createWithDefault(10000)
 
+  val IGNORE_DATA_LOCALITY =
+    buildConf("spark.sql.sources.ignore.datalocality")
+      .doc("If true, Spark will not fetch the block locations for each file on " +
+        "listing files. This speeds up file listing, but the scheduler cannot " +
+        "schedule tasks to take advantage of data locality. It can be particularly " +
+        "useful if data is read from a remote cluster so the scheduler could never " +
+        "take advantage of locality anyway.")
+      .internal()
+      .booleanConf
+      .createWithDefault(false)
+
   // Whether to automatically resolve ambiguity in join conditions for self-joins.
   // See SPARK-6231.
   val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
@@ -2475,6 +2486,8 @@ class SQLConf extends Serializable with Logging {
 
   def defaultV2Catalog: Option[String] = getConf(DEFAULT_V2_CATALOG)
 
+  def ignoreDataLocality: Boolean = getConf(SQLConf.IGNORE_DATA_LOCALITY)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index cf7a13050f66c..ed860f69d7466 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -171,6 +171,7 @@ object InMemoryFileIndex extends Logging {
       areRootPaths: Boolean): Seq[(Path, Seq[FileStatus])] = {
 
     val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles
+    val ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality
 
     // Short-circuits parallel listing when serial listing is likely to be faster.
     if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
@@ -181,6 +182,7 @@ object InMemoryFileIndex extends Logging {
           filter,
           Some(sparkSession),
           ignoreMissingFiles = ignoreMissingFiles,
+          ignoreLocality = ignoreLocality,
           isRootPath = areRootPaths)
         (path, leafFiles)
       }
@@ -221,6 +223,7 @@ object InMemoryFileIndex extends Logging {
             filter,
             None,
             ignoreMissingFiles = ignoreMissingFiles,
+            ignoreLocality = ignoreLocality,
             isRootPath = areRootPaths)
           (path, leafFiles)
         }.iterator
@@ -287,6 +290,7 @@ object InMemoryFileIndex extends Logging {
       filter: PathFilter,
       sessionOpt: Option[SparkSession],
       ignoreMissingFiles: Boolean,
+      ignoreLocality: Boolean,
       isRootPath: Boolean): Seq[FileStatus] = {
     logTrace(s"Listing $path")
     val fs = path.getFileSystem(hadoopConf)
@@ -299,7 +303,7 @@ object InMemoryFileIndex extends Logging {
         // to retrieve the file status with the file block location. The reason to still fallback
         // to listStatus is because the default implementation would potentially throw a
         // FileNotFoundException which is better handled by doing the lookups manually below.
-        case _: DistributedFileSystem =>
+        case _: DistributedFileSystem if !ignoreLocality =>
           val remoteIter = fs.listLocatedStatus(path)
           new Iterator[LocatedFileStatus]() {
             def next(): LocatedFileStatus = remoteIter.next
@@ -353,6 +357,7 @@ object InMemoryFileIndex extends Logging {
               filter,
               sessionOpt,
               ignoreMissingFiles = ignoreMissingFiles,
+              ignoreLocality = ignoreLocality,
               isRootPath = false)
           }
       }
@@ -376,7 +381,7 @@ object InMemoryFileIndex extends Logging {
       // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
      //   be a big deal since we always use to `bulkListLeafFiles` when the number of
      //   paths exceeds threshold.
-      case f =>
+      case f if !ignoreLocality =>
         // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
         // which is very slow on some file system (RawLocalFileSystem, which is launch a
         // subprocess and parse the stdout).
@@ -400,6 +405,8 @@ object InMemoryFileIndex extends Logging {
             missingFiles += f.getPath.toString
             None
         }
+
+      case f => Some(f)
     }
 
     if (missingFiles.nonEmpty) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 4b086e830e456..a7a2349a1dfb9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -416,6 +416,35 @@ class FileIndexSuite extends SharedSparkSession {
     }
   }
 
+  test("Add an option to ignore block locations when listing file") {
+    withTempDir { dir =>
+      val partitionDirectory = new File(dir, "a=foo")
+      partitionDirectory.mkdir()
+      for (i <- 1 to 8) {
+        val file = new File(partitionDirectory, i + ".txt")
+        stringToFile(file, "text")
+      }
+      val path = new Path(dir.getCanonicalPath)
+      val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None)
+      withSQLConf(SQLConf.IGNORE_DATA_LOCALITY.key -> "false",
+          "fs.file.impl" -> classOf[SpecialBlockLocationFileSystem].getName) {
+        val withBlockLocations = fileIndex.
+          listLeafFiles(Seq(new Path(partitionDirectory.getPath)))
+
+        withSQLConf(SQLConf.IGNORE_DATA_LOCALITY.key -> "true") {
+          val withoutBlockLocations = fileIndex.
+            listLeafFiles(Seq(new Path(partitionDirectory.getPath)))
+
+          assert(withBlockLocations.size == withoutBlockLocations.size)
+          assert(withBlockLocations.forall(b => b.isInstanceOf[LocatedFileStatus] &&
+            b.asInstanceOf[LocatedFileStatus].getBlockLocations.nonEmpty))
+          assert(withoutBlockLocations.forall(b => b.isInstanceOf[FileStatus] &&
+            !b.isInstanceOf[LocatedFileStatus]))
+          assert(withoutBlockLocations.forall(withBlockLocations.contains))
+        }
+      }
+    }
+  }
 }
 
 object DeletionRaceFileSystem {
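
Note: the heart of the patch is the two guarded `case` arms in `listLeafFiles`. As a minimal sketch of the control flow they introduce, under these assumptions: `LocalitySketch` and `resolve` are hypothetical names of mine, not part of the patch, and this sketch uses the short `LocatedFileStatus(FileStatus, BlockLocation[])` constructor that the real code deliberately avoids.

```scala
import org.apache.hadoop.fs.{FileStatus, FileSystem, LocatedFileStatus}

// Sketch only: a distilled view of the patched flatMap in listLeafFiles.
// `resolve` is a hypothetical helper, not a method in the patch.
object LocalitySketch {
  def resolve(fs: FileSystem, f: FileStatus, ignoreLocality: Boolean): FileStatus = f match {
    // Already carries block locations (e.g. produced by listLocatedStatus).
    case lfs: LocatedFileStatus => lfs
    // Default path: one getFileBlockLocations call (an RPC on HDFS) per file.
    case _ if !ignoreLocality =>
      val locations = fs.getFileBlockLocations(f, 0, f.getLen)
      // The patch builds LocatedFileStatus field by field to dodge a slow
      // FileStatus.getPermission() call; the short constructor here is for brevity.
      new LocatedFileStatus(f, locations)
    // New behavior behind the flag: keep the plain FileStatus, no lookup at all.
    case _ => f
  }
}
```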
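To try the flag end to end, something like the sketch below should work; it is not part of the patch. Assumptions: an existing `SparkSession` named `spark`, and a made-up remote HDFS URI. The config key is the one introduced in SQLConf above (and it is marked `.internal()`, so it is not user-facing documentation).

```scala
// Usage sketch: the key matches IGNORE_DATA_LOCALITY as defined in this patch;
// the HDFS URI below is a hypothetical example of a remote cluster.
spark.conf.set("spark.sql.sources.ignore.datalocality", "true")

// File listing now skips the per-file block-location lookup, so planning is
// faster; tasks get no locality preference, which costs nothing here because
// the data lives on a different cluster from the executors anyway.
val df = spark.read.parquet("hdfs://remote-nn:8020/warehouse/events")
```

Block locations are only a scheduling hint, so skipping them can change where tasks prefer to run, never the query result.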