sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -832,6 +832,17 @@ object SQLConf {
.intConf
.createWithDefault(10000)

val IGNORE_DATA_LOCALITY =
buildConf("spark.sql.sources.ignore.datalocality")

Member:
The conf naming looks a little weird... Compared with the other SQL confs, this should be renamed to spark.sql.sources.ignoreDataLocality.enabled. cc @cloud-fan

Contributor:
Yeah, please be careful about the namespaces created by new config names. "ignore" is definitely not a good namespace.

Member:
@wangshisan Could you submit a follow-up PR to rename it?

Contributor:
Sorry, you are right; I should have paid more attention to this. I have opened a PR to fix the naming: #26056

.doc("If true, Spark will not fetch the block locations for each file on " +
"listing files. This speeds up file listing, but the scheduler cannot " +
"schedule tasks to take advantage of data locality. It can be particularly " +
"useful if data is read from a remote cluster so the scheduler could never " +
"take advantage of locality anyway.")
.internal()
.booleanConf
.createWithDefault(false)
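
A hedged aside: since this conf is marked .internal() it will not appear in user-facing documentation, but it can be set like any other. Below is a minimal sketch of enabling it for a session that reads from a remote cluster; the app name and path are hypothetical, and the key is the one introduced in this diff (the review above asks for it to be renamed in #26056):

```scala
import org.apache.spark.sql.SparkSession

// Skip block-location lookups during file listing. Useful when reading from a
// remote HDFS cluster, where the scheduler could never exploit locality anyway.
val spark = SparkSession.builder()
  .appName("remote-read")
  .config("spark.sql.sources.ignore.datalocality", "true")
  .getOrCreate()

val df = spark.read.parquet("hdfs://remote-nn:8020/warehouse/events")
```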

// Whether to automatically resolve ambiguity in join conditions for self-joins.
// See SPARK-6231.
val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
@@ -2475,6 +2486,8 @@ class SQLConf extends Serializable with Logging {

def defaultV2Catalog: Option[String] = getConf(DEFAULT_V2_CATALOG)

def ignoreDataLocality: Boolean = getConf(SQLConf.IGNORE_DATA_LOCALITY)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -171,6 +171,7 @@ object InMemoryFileIndex extends Logging {
areRootPaths: Boolean): Seq[(Path, Seq[FileStatus])] = {

val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles
val ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality

// Short-circuits parallel listing when serial listing is likely to be faster.
if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
@@ -181,6 +182,7 @@
filter,
Some(sparkSession),
ignoreMissingFiles = ignoreMissingFiles,
ignoreLocality = ignoreLocality,
isRootPath = areRootPaths)
(path, leafFiles)
}
@@ -221,6 +223,7 @@
filter,
None,
ignoreMissingFiles = ignoreMissingFiles,
ignoreLocality = ignoreLocality,
isRootPath = areRootPaths)
(path, leafFiles)
}.iterator
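
A hedged aside on the short-circuit above: serial versus parallel listing is governed by spark.sql.sources.parallelPartitionDiscovery.threshold, the key behind parallelPartitionDiscoveryThreshold. A sketch of tuning it, with an illustrative value:

```scala
// List up to 64 root paths serially on the driver; beyond that, Spark
// distributes the listing as a job across the cluster.
spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", "64")
```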
@@ -287,6 +290,7 @@
filter: PathFilter,
sessionOpt: Option[SparkSession],
ignoreMissingFiles: Boolean,
ignoreLocality: Boolean,
isRootPath: Boolean): Seq[FileStatus] = {
logTrace(s"Listing $path")
val fs = path.getFileSystem(hadoopConf)
@@ -299,7 +303,7 @@
// to retrieve the file status with the file block location. The reason to still fall back
// to listStatus is because the default implementation would potentially throw a
// FileNotFoundException which is better handled by doing the lookups manually below.
case _: DistributedFileSystem =>
case _: DistributedFileSystem if !ignoreLocality =>
val remoteIter = fs.listLocatedStatus(path)
new Iterator[LocatedFileStatus]() {
def next(): LocatedFileStatus = remoteIter.next
@@ -353,6 +357,7 @@
filter,
sessionOpt,
ignoreMissingFiles = ignoreMissingFiles,
ignoreLocality = ignoreLocality,
isRootPath = false)
}
}
@@ -376,7 +381,7 @@
// - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
//   be a big deal since we always use `bulkListLeafFiles` when the number of
//   paths exceeds the threshold.
case f =>
case f if !ignoreLocality =>
// The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
// which is very slow on some file systems (RawLocalFileSystem, for example,
// launches a subprocess and parses the stdout).
@@ -400,6 +405,8 @@
missingFiles += f.getPath.toString
None
}

case f => Some(f)
}

if (missingFiles.nonEmpty) {
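
A hedged recap of what the two new guards change: with ignoreLocality set, both the DistributedFileSystem fast path and the per-file getFileBlockLocations fallback are skipped, and plain FileStatus objects flow through the new "case f => Some(f)" arm. A condensed, self-contained sketch of that dispatch (not the Spark source; resolve is a hypothetical name):

```scala
import java.io.FileNotFoundException
import org.apache.hadoop.fs.{FileStatus, FileSystem, LocatedFileStatus}

// Mirrors the match in listLeafFiles: attempt a locality lookup only when the
// caller asked for one, and treat a file deleted mid-listing as missing.
def resolve(f: FileStatus, fs: FileSystem, ignoreLocality: Boolean): Option[FileStatus] =
  f match {
    case l: LocatedFileStatus => Some(l)    // already carries block locations
    case _ if ignoreLocality  => Some(f)    // skip the NameNode round-trip entirely
    case _ =>
      try {
        val locations = fs.getFileBlockLocations(f, 0, f.getLen)
        Some(new LocatedFileStatus(f, locations))
      } catch {
        case _: FileNotFoundException => None // vanished between list and lookup
      }
  }
```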
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -416,6 +416,35 @@ class FileIndexSuite extends SharedSparkSession {
}
}

test("Add an option to ignore block locations when listing file") {
withTempDir { dir =>
val partitionDirectory = new File(dir, "a=foo")
partitionDirectory.mkdir()
for (i <- 1 to 8) {
val file = new File(partitionDirectory, i + ".txt")
stringToFile(file, "text")
}
val path = new Path(dir.getCanonicalPath)
val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None)
withSQLConf(SQLConf.IGNORE_DATA_LOCALITY.key -> "false",
"fs.file.impl" -> classOf[SpecialBlockLocationFileSystem].getName) {
val withBlockLocations = fileIndex.
listLeafFiles(Seq(new Path(partitionDirectory.getPath)))

withSQLConf(SQLConf.IGNORE_DATA_LOCALITY.key -> "true") {
val withoutBlockLocations = fileIndex.
listLeafFiles(Seq(new Path(partitionDirectory.getPath)))

assert(withBlockLocations.size == withoutBlockLocations.size)
assert(withBlockLocations.forall(b => b.isInstanceOf[LocatedFileStatus] &&
b.asInstanceOf[LocatedFileStatus].getBlockLocations.nonEmpty))
assert(withoutBlockLocations.forall(b => b.isInstanceOf[FileStatus] &&
!b.isInstanceOf[LocatedFileStatus]))
assert(withoutBlockLocations.forall(withBlockLocations.contains))
}
}
}
}
}
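
The test above registers SpecialBlockLocationFileSystem (defined elsewhere in this suite) via fs.file.impl so that local files report non-empty block locations. A hedged sketch of what such a test filesystem typically looks like; FakeBlockLocationFileSystem and its host strings are illustrative, not the suite's actual definition:

```scala
import org.apache.hadoop.fs.{BlockLocation, FileStatus, RawLocalFileSystem}

// Wraps the local filesystem and fabricates one deterministic block location
// per file, so the "with locality" branch has something non-empty to assert on.
class FakeBlockLocationFileSystem extends RawLocalFileSystem {
  override def getFileBlockLocations(
      file: FileStatus, start: Long, len: Long): Array[BlockLocation] =
    Array(new BlockLocation(Array("host0:50010"), Array("host0"), 0, file.getLen))
}
```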

object DeletionRaceFileSystem {