Skip to content

Commit a249fcb

Browse files
jlfleejaywei
authored and committed
AL-4757 when refreshing InMemoryFileIndex, use recursiveDirChildrenFiles (apache#566)
* AL-4757: when refreshing InMemoryFileIndex, if recursiveFileLookup is true, use recursiveDirChildrenFiles. * AL-4757: add unit test.
1 parent 78adf7c commit a249fcb

File tree

3 files changed

+65
-1
lines changed

3 files changed

+65
-1
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 13 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -1484,6 +1484,17 @@ object SQLConf {
14841484
.booleanConf
14851485
.createWithDefault(false)
14861486

1487+
val RECURSIVE_FILE_LOOKUP =
1488+
buildConf("spark.sql.sources.recursiveFileLookup")
1489+
.doc("If dynamic partitioning is used to write the output of UNION or UNION ALL " +
1490+
"queries into table files with hive.execution.engine=tez, HIVE-17275 add a sub dir " +
1491+
"in table or partition location. If true, Spark will fetch all files on each location" +
1492+
" dir and sub dir. Default value is false, Spark just fetch files on each location dir")
1493+
.version("3.2.0")
1494+
.internal()
1495+
.booleanConf
1496+
.createWithDefault(false)
1497+
14871498
// Whether to automatically resolve ambiguity in join conditions for self-joins.
14881499
// See SPARK-6231.
14891500
val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
@@ -4542,6 +4553,8 @@ class SQLConf extends Serializable with Logging {
45424553

45434554
def ignoreDataLocality: Boolean = getConf(SQLConf.IGNORE_DATA_LOCALITY)
45444555

4556+
def recursiveFileLookup: Boolean = getConf(SQLConf.RECURSIVE_FILE_LOOKUP)
4557+
45454558
def csvFilterPushDown: Boolean = getConf(CSV_FILTER_PUSHDOWN_ENABLED)
45464559

45474560
def jsonFilterPushDown: Boolean = getConf(JSON_FILTER_PUSHDOWN_ENABLED)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala

Lines changed: 20 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -94,10 +94,29 @@ class InMemoryFileIndex(
9494
val files = listLeafFiles(rootPaths)
9595
cachedLeafFiles =
9696
new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
97-
cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
97+
cachedLeafDirToChildrenFiles = if (sparkSession.sessionState.conf.recursiveFileLookup) {
98+
recursiveDirChildrenFiles(files)
99+
} else {
100+
files.toArray.groupBy(_.getPath.getParent)
101+
}
98102
cachedPartitionSpec = null
99103
}
100104

105+
def recursiveDirChildrenFiles(files: mutable.LinkedHashSet[FileStatus])
106+
: Map[Path, Array[FileStatus]] = {
107+
// rootPaths is table / partition location
108+
val rootParents = rootPaths.map(_.getParent).toSet
109+
val reorganized = new mutable.LinkedHashMap[Path, mutable.LinkedHashSet[FileStatus]]()
110+
files.foreach { f => // f is a file, not a directory.
111+
var parent = f.getPath.getParent
112+
while (parent != null && !rootParents.contains(parent)) {
113+
reorganized.getOrElseUpdate(parent, new mutable.LinkedHashSet[FileStatus]()).add(f)
114+
parent = parent.getParent
115+
}
116+
}
117+
reorganized.mapValues(_.toArray).toMap
118+
}
119+
101120
override def equals(other: Any): Boolean = other match {
102121
case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
103122
case _ => false

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala

Lines changed: 32 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -477,6 +477,38 @@ class FileIndexSuite extends SharedSparkSession {
477477
}
478478
}
479479

480+
test("recursive dir children files") {
481+
withTempDir { dir =>
482+
val fs = new Path(dir.getPath).getFileSystem(
483+
spark.sessionState.newHadoopConfWithOptions(Map.empty))
484+
val allFiles = new mutable.LinkedHashSet[FileStatus]()
485+
val partitionDirectory = new File(dir, "a=foo")
486+
partitionDirectory.mkdir()
487+
for (i <- 1 to 8) {
488+
val file0 = new File(partitionDirectory, i + ".txt")
489+
stringToFile(file0, "text")
490+
allFiles.add(fs.getFileStatus(new Path(file0.getPath)))
491+
if (i % 2 == 0) {
492+
val subDirectory = new File(partitionDirectory, "i=" + i)
493+
subDirectory.mkdir()
494+
for (ii <- 1 to 8) {
495+
val file1 = new File(subDirectory, ii + ".txt")
496+
stringToFile(file1, "text")
497+
allFiles.add(fs.getFileStatus(new Path(file1.getPath)))
498+
}
499+
}
500+
}
501+
val path = fs.getFileStatus(new Path(partitionDirectory.getPath)).getPath
502+
val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None)
503+
val files = fileIndex.listLeafFiles(Seq(path))
504+
val leafDirToChildrenFiles = fileIndex.recursiveDirChildrenFiles(files)
505+
assert(leafDirToChildrenFiles.size == 5)
506+
val actualFiles = leafDirToChildrenFiles(path)
507+
assert(allFiles.size == actualFiles.length)
508+
assert(actualFiles.forall(actualFiles.contains))
509+
}
510+
}
511+
480512
test("SPARK-31047 - Improve file listing for ViewFileSystem") {
481513
val path = mock(classOf[Path])
482514
val dfs = mock(classOf[ViewFileSystem])

0 commit comments

Comments (0)