CatalogFileIndex.scala
@@ -83,7 +83,7 @@ class CatalogFileIndex(
val timeNs = System.nanoTime() - startTime
new InMemoryFileIndex(sparkSession,
rootPathsSpecified = partitionSpec.partitions.map(_.path),
parameters = Map.empty,
parameters = table.storage.properties,
userSpecifiedSchema = Some(partitionSpec.partitionColumns),
fileStatusCache = fileStatusCache,
userSpecifiedPartitionSpec = Some(partitionSpec),
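
[Note] With parameters = table.storage.properties, options stored on a catalog table now reach the InMemoryFileIndex instead of being dropped. A minimal sketch of attaching such options at table creation time, assuming the option names introduced by this patch are honored for catalog tables:

// Hypothetical illustration, not part of this patch: the OPTIONS below land in
// table.storage.properties and would now flow into InMemoryFileIndex.
spark.sql(
  """CREATE TABLE events (id INT, dt STRING)
    |USING parquet
    |PARTITIONED BY (dt)
    |OPTIONS (recursiveFileLookup 'true', inferRecursivePartition 'true')
    |""".stripMargin)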
PartitioningAwareFileIndex.scala
@@ -67,6 +67,10 @@ abstract class PartitioningAwareFileIndex(
caseInsensitiveMap.getOrElse("recursiveFileLookup", "false").toBoolean
}

protected lazy val inferRecursivePartition: Boolean = {
caseInsensitiveMap.getOrElse("inferRecursivePartition", "false").toBoolean
}

override def listFiles(
partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
def isNonEmptyFile(f: FileStatus): Boolean = {
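
[Note] inferRecursivePartition is a new datasource option read alongside the existing recursiveFileLookup flag; both default to false. A hedged usage sketch (the reader options below are placeholders for how the flag is expected to be passed):

// Sketch: list files recursively but still infer partition columns from
// directory names. Without inferRecursivePartition, recursive lookup
// skips partition inference entirely (see inferPartitioning below).
val df = spark.read
  .format("parquet")
  .option("recursiveFileLookup", "true")
  .option("inferRecursivePartition", "true")
  .load("/data/events") // placeholder path containing dt=.../ subdirectories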
@@ -106,13 +110,36 @@ abstract class PartitioningAwareFileIndex(
PartitionDirectory(InternalRow.empty, allFiles()
.filter(f => isNonEmptyFile(f) && matchFileMetadataPredicate(f))) :: Nil
} else {
if (recursiveFileLookup) {
throw new IllegalArgumentException(
"Datasource with partition do not allow recursive file loading.")
[Review comment] This was added in #24830. cc @WeichenXu123 @cloud-fan @gengliangwang FYI

val prunePartitionPaths = prunePartitions(partitionFilters, partitionSpec())

val partitionDirToChildrenFiles = if (recursiveFileLookup) {
// For a partitioned table, we need to group files by partition path
// when recursiveFileLookup is true.

val partitionDirs = prunePartitionPaths.map(_.path).toSet

def matchedPartitionDir(path: Path): Option[Path] = {
if (partitionDirs.contains(path)) {
Some(path)
} else if (path.getParent != null) {
matchedPartitionDir(path.getParent)
} else {
None
}
}

leafDirToChildrenFiles.toSeq.flatMap {
case (path, statuses) => matchedPartitionDir(path).map((_, statuses))
}.groupBy(_._1).map {
case (path, pathToStatuses) => (path, pathToStatuses.flatMap(_._2).toArray)
}
} else {
leafDirToChildrenFiles
}
prunePartitions(partitionFilters, partitionSpec()).map {

prunePartitionPaths.map {
case PartitionPath(values, path) =>
val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
val files: Seq[FileStatus] = partitionDirToChildrenFiles.get(path) match {
case Some(existingDir) =>
// Directory has children files in it, return them
existingDir.filter(f => matchPathPattern(f) && isNonEmptyFile(f) &&
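
[Note] The grouping above climbs from each leaf directory to its enclosing partition directory and merges the per-leaf file lists, so files in nested subdirectories are attributed to the right partition. A self-contained sketch of the same idea, using plain strings in place of Hadoop Paths (all names here are illustrative, not from the patch):

object RecursiveGroupingSketch {
  // Walk up from a leaf directory until we reach a known partition directory.
  @annotation.tailrec
  def matchedPartitionDir(path: String, partitionDirs: Set[String]): Option[String] =
    if (partitionDirs.contains(path)) Some(path)
    else {
      val parent = path.substring(0, math.max(path.lastIndexOf('/'), 0))
      if (parent.isEmpty) None else matchedPartitionDir(parent, partitionDirs)
    }

  def main(args: Array[String]): Unit = {
    val partitionDirs = Set("/data/a=a1", "/data/a=a2")
    val leafDirToFiles = Map(
      "/data/a=a1" -> Seq("f1.txt"),
      "/data/a=a1/subdir01" -> Seq("f2.txt"),
      "/data/a=a2/subdir02" -> Seq("f3.txt"))
    // Attribute each leaf directory's files to its partition directory,
    // then merge the per-partition file lists.
    val grouped = leafDirToFiles.toSeq
      .flatMap { case (dir, files) => matchedPartitionDir(dir, partitionDirs).map(_ -> files) }
      .groupBy(_._1)
      .map { case (dir, pairs) => dir -> pairs.flatMap(_._2) }
    grouped.foreach(println) // (/data/a=a1,List(f1.txt, f2.txt)), (/data/a=a2,List(f3.txt))
  }
}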
@@ -169,7 +196,7 @@ abstract class PartitioningAwareFileIndex(
}

protected def inferPartitioning(): PartitionSpec = {
if (recursiveFileLookup) {
if (recursiveFileLookup && !inferRecursivePartition) {
PartitionSpec.emptySpec
} else {
// We use leaf dirs containing data files to discover the schema.
PartitioningUtils.scala
@@ -148,7 +148,19 @@ object PartitioningUtils extends SQLConfHelper {

// We create pairs of (path -> path's partition value) here
// If the corresponding partition value is None, the pair will be skipped
val pathsWithPartitionValues = paths.zip(partitionValues).flatMap(x => x._2.map(x._1 -> _))
val pathsWithPartitionValues = (partitionValues, optDiscoveredBasePaths, paths).zipped.flatMap {
case (Some(partitionValue), optDiscoveredBasePath, path) =>
val partitionPath = optDiscoveredBasePath.map(basePath => {
var tmpPartitionPath = basePath
partitionValue.columnNames.zip(partitionValue.typedValues).foreach {
case (column, TypedPartValue(value, _)) => tmpPartitionPath =
new Path(tmpPartitionPath, s"$column=$value")
}
tmpPartitionPath
})
Some(partitionPath.getOrElse(path), partitionValue)
case _ => None
}.distinct

if (pathsWithPartitionValues.isEmpty) {
// This dataset is not partitioned.
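
[Note] The rewritten pairing rebuilds a canonical partition path from each discovered base path by appending column=value segments, so files found in nested subdirectories map back to the same partition path and duplicates collapse via .distinct. The core path-building step, sketched standalone (string paths and (name, value) tuples stand in for Hadoop Path and TypedPartValue):

// Sketch: fold column=value segments onto a base path.
def partitionPath(basePath: String, partCols: Seq[(String, String)]): String =
  partCols.foldLeft(basePath) { case (p, (col, value)) => s"$p/$col=$value" }

// partitionPath("/data", Seq("a" -> "a2", "b" -> "b1")) == "/data/a=a2/b=b1"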
FileIndexSuite.scala
@@ -539,6 +539,111 @@ class FileIndexSuite extends SharedSparkSession {
}
}
}

test("SPARK-40600: Support recursiveFileLookup for partitioned datasource") {
def createPartitionDirWithSubDir(basePath: File,
partitionDirs: Seq[String], subDir: String,
expectedFileList: mutable.ListBuffer[String]): Unit = {
var partitionDirectory: File = basePath
partitionDirs.foreach { dir =>
partitionDirectory = new File(partitionDirectory, dir)
partitionDirectory.mkdir()
}
val file = new File(partitionDirectory, "text.txt")
stringToFile(file, "text")
expectedFileList.append(file.getCanonicalPath)

val subDirectory = new File(partitionDirectory, subDir)
subDirectory.mkdir()
val subFile = new File(subDirectory, "subtext.txt")
stringToFile(subFile, "text")
expectedFileList.append(subFile.getCanonicalPath)
}

def checkPartitions(fileIndex: InMemoryFileIndex,
expectedPartitions: Seq[(Seq[String], String)]): Unit = {
assertResult(expectedPartitions.sortBy(_._2)) {
fileIndex.partitionSpec().partitions
.map { p =>
val values = (0 until p.values.numFields).map(p.values.getString(_))
(values, new File(p.path.toUri).getCanonicalPath)
}.sortBy(_._2)
}
}

def checkListFiles(fileIndex: InMemoryFileIndex, expectedFiles: Seq[String]): Unit = {
assertResult(expectedFiles.sorted) {
fileIndex.listFiles(Nil, Nil).flatMap(_.files)
.map(f => new File(f.getPath.toUri).getCanonicalPath).sorted
}
}

// one partition column,
// dirs: dir/a=a1/text.txt
// dir/a=a1/subdir01/subtext.txt
// if inferRecursivePartition is false, expected partitions: ()
// if inferRecursivePartition is true, expected partitions: (a=a1, dir/a=a1)
withTempDir { dir =>
val expectedFileList = mutable.ListBuffer[String]()
createPartitionDirWithSubDir(dir, Seq("a=a1"), "subdir01", expectedFileList)

val fileIndex = new InMemoryFileIndex(spark,
Seq(new Path(dir.getCanonicalPath)), Map("recursiveFileLookup" -> "false"), None)
checkPartitions(fileIndex, Seq((Seq("a1"), new File(dir, "a=a1").getCanonicalPath)))
checkListFiles(fileIndex, expectedFileList.filter(!_.contains("subdir")).toSeq)

val recursiveFileIndex1 = new InMemoryFileIndex(spark,
Seq(new Path(dir.getCanonicalPath)), Map("recursiveFileLookup" -> "true"), None)
checkPartitions(recursiveFileIndex1, Seq.empty)
checkListFiles(recursiveFileIndex1, expectedFileList.toSeq)

val recursiveFileIndex2 = new InMemoryFileIndex(spark,
Seq(new Path(dir.getCanonicalPath)),
Map("recursiveFileLookup" -> "true", "inferRecursivePartition" -> "true"),
None)
checkPartitions(recursiveFileIndex2, Seq((Seq("a1"), new File(dir, "a=a1").getCanonicalPath)))
checkListFiles(recursiveFileIndex2, expectedFileList.toSeq)
}

// two partition columns,
// dirs: dir/a=a2/b=b1/text.txt
// dir/a=a2/b=b1/subdir01/subtext.txt
// dir/a=a2/b=b2/text.txt
// dir/a=a2/b=b2/subdir02/subtext.txt
// if inferRecursivePartition is false, expected partitions:
// ()
// if inferRecursivePartition is true, expected partitions:
// (a=a2/b=b1, dir/a=a2/b=b1)
// (a=a2/b=b2, dir/a=a2/b=b2)
withTempDir { dir =>
val expectedFileList = mutable.ListBuffer[String]()
createPartitionDirWithSubDir(dir, Seq("a=a2", "b=b1"), "subdir01", expectedFileList)
createPartitionDirWithSubDir(dir, Seq("a=a2", "b=b2"), "subdir02", expectedFileList)

val fileIndex = new InMemoryFileIndex(spark,
Seq(new Path(dir.getCanonicalPath)), Map("recursiveFileLookup" -> "false"), None)
checkPartitions(fileIndex, Seq(
(Seq("a2", "b1"), new File(new File(dir, "a=a2"), "b=b1").getCanonicalPath),
(Seq("a2", "b2"), new File(new File(dir, "a=a2"), "b=b2").getCanonicalPath)
))
checkListFiles(fileIndex, expectedFileList.filter(!_.contains("subdir")).toSeq)

val recursiveFileIndex1 = new InMemoryFileIndex(spark,
Seq(new Path(dir.getCanonicalPath)), Map("recursiveFileLookup" -> "true"), None)
checkPartitions(recursiveFileIndex1, Seq.empty)
checkListFiles(recursiveFileIndex1, expectedFileList.toSeq)

val recursiveFileIndex2 = new InMemoryFileIndex(spark,
Seq(new Path(dir.getCanonicalPath)),
Map("recursiveFileLookup" -> "true", "inferRecursivePartition" -> "true"),
None)
checkPartitions(recursiveFileIndex2, Seq(
(Seq("a2", "b1"), new File(new File(dir, "a=a2"), "b=b1").getCanonicalPath),
(Seq("a2", "b2"), new File(new File(dir, "a=a2"), "b=b2").getCanonicalPath)
))
checkListFiles(recursiveFileIndex2, expectedFileList.toSeq)
}
}
}

object DeletionRaceFileSystem {