diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index 727b33018fbc..202bdfd421b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -83,7 +83,7 @@ class CatalogFileIndex(
       val timeNs = System.nanoTime() - startTime
       new InMemoryFileIndex(sparkSession,
         rootPathsSpecified = partitionSpec.partitions.map(_.path),
-        parameters = Map.empty,
+        parameters = table.storage.properties,
         userSpecifiedSchema = Some(partitionSpec.partitionColumns),
         fileStatusCache = fileStatusCache,
         userSpecifiedPartitionSpec = Some(partitionSpec),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index d70c4b11bc0d..36fd5638b399 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -67,6 +67,10 @@ abstract class PartitioningAwareFileIndex(
     caseInsensitiveMap.getOrElse("recursiveFileLookup", "false").toBoolean
   }
 
+  protected lazy val inferRecursivePartition: Boolean = {
+    caseInsensitiveMap.getOrElse("inferRecursivePartition", "false").toBoolean
+  }
+
   override def listFiles(
       partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
     def isNonEmptyFile(f: FileStatus): Boolean = {
@@ -106,13 +110,36 @@ abstract class PartitioningAwareFileIndex(
       PartitionDirectory(InternalRow.empty, allFiles()
         .filter(f => isNonEmptyFile(f) && matchFileMetadataPredicate(f))) :: Nil
     } else {
-      if (recursiveFileLookup) {
-        throw new IllegalArgumentException(
-          "Datasource with partition do not allow recursive file loading.")
+      val prunePartitionPaths = prunePartitions(partitionFilters, partitionSpec())
+
+      val partitionDirToChildrenFiles = if (recursiveFileLookup) {
+        // For a partitioned table, we need to group files by partition path
+        // when recursiveFileLookup is true.
+
+        val partitionDirs = prunePartitionPaths.map(_.path).toSet
+
+        def matchedPartitionDir(path: Path): Option[Path] = {
+          if (partitionDirs.contains(path)) {
+            Some(path)
+          } else if (path.getParent != null) {
+            matchedPartitionDir(path.getParent)
+          } else {
+            None
+          }
+        }
+
+        leafDirToChildrenFiles.toSeq.flatMap {
+          case (path, statuses) => matchedPartitionDir(path).map((_, statuses))
+        }.groupBy(_._1).map {
+          case (path, pathToStatuses) => (path, pathToStatuses.flatMap(_._2).toArray)
+        }
+      } else {
+        leafDirToChildrenFiles
       }
-      prunePartitions(partitionFilters, partitionSpec()).map {
+
+      prunePartitionPaths.map {
         case PartitionPath(values, path) =>
-          val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
+          val files: Seq[FileStatus] = partitionDirToChildrenFiles.get(path) match {
             case Some(existingDir) =>
               // Directory has children files in it, return them
               existingDir.filter(f => matchPathPattern(f) && isNonEmptyFile(f) &&
@@ -169,7 +196,7 @@ abstract class PartitioningAwareFileIndex(
   }
 
   protected def inferPartitioning(): PartitionSpec = {
-    if (recursiveFileLookup) {
+    if (recursiveFileLookup && !inferRecursivePartition) {
       PartitionSpec.emptySpec
     } else {
       // We use leaf dirs containing data files to discover the schema.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index d4989606927b..3ef96bf43e1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -148,7 +148,19 @@ object PartitioningUtils extends SQLConfHelper {
 
     // We create pairs of (path -> path's partition value) here
     // If the corresponding partition value is None, the pair will be skipped
-    val pathsWithPartitionValues = paths.zip(partitionValues).flatMap(x => x._2.map(x._1 -> _))
+    val pathsWithPartitionValues = (partitionValues, optDiscoveredBasePaths, paths).zipped.flatMap {
+      case (Some(partitionValue), optDiscoveredBasePath, path) =>
+        val partitionPath = optDiscoveredBasePath.map(basePath => {
+          var tmpPartitionPath = basePath
+          partitionValue.columnNames.zip(partitionValue.typedValues).foreach {
+            case (column, TypedPartValue(value, _)) => tmpPartitionPath =
+              new Path(tmpPartitionPath, s"$column=$value")
+          }
+          tmpPartitionPath
+        })
+        Some(partitionPath.getOrElse(path), partitionValue)
+      case _ => None
+    }.distinct
 
     if (pathsWithPartitionValues.isEmpty) {
       // This dataset is not partitioned.
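
Aside (not part of the patch): the grouping logic added to PartitioningAwareFileIndex above can be hard to follow inside a diff, so here is a minimal standalone sketch of the same idea. Each leaf directory is mapped to its nearest enclosing partition directory by walking up the parent chain, and leaf directories are then grouped per partition directory. It only assumes hadoop-common on the classpath; the paths are illustrative, not taken from the patch.

import org.apache.hadoop.fs.Path

object RecursivePartitionGroupingSketch {
  def main(args: Array[String]): Unit = {
    // Pretend these are the pruned partition directories of a table.
    val partitionDirs = Set(new Path("/data/a=a1"), new Path("/data/a=a2"))

    // Walk up from a leaf directory until a known partition directory (or the root) is reached.
    def matchedPartitionDir(path: Path): Option[Path] =
      if (partitionDirs.contains(path)) Some(path)
      else if (path.getParent != null) matchedPartitionDir(path.getParent)
      else None

    // Leaf directories as a recursive listing would see them, including nested subdirectories.
    val leafDirs = Seq(
      new Path("/data/a=a1"),
      new Path("/data/a=a1/subdir01"),
      new Path("/data/a=a2/subdir02/nested"))

    // Group every leaf directory under its partition directory, analogous to how the patched
    // listFiles regroups leafDirToChildrenFiles.
    val grouped = leafDirs.flatMap(d => matchedPartitionDir(d).map(_ -> d)).groupBy(_._1)
    grouped.foreach { case (partition, dirs) =>
      println(s"$partition -> ${dirs.map(_._2).mkString(", ")}")
    }
  }
}
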
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 1897a347ef17..fa09ee3b83be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -539,6 +539,111 @@ class FileIndexSuite extends SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-40600: Support recursiveFileLookup for partitioned datasource") {
+    def createPartitionDirWithSubDir(basePath: File,
+        partitionDirs: Seq[String], subDir: String,
+        expectedFileList: mutable.ListBuffer[String]): Unit = {
+      var partitionDirectory: File = basePath
+      partitionDirs.foreach { dir =>
+        partitionDirectory = new File(partitionDirectory, dir)
+        partitionDirectory.mkdir()
+      }
+      val file = new File(partitionDirectory, "text.txt")
+      stringToFile(file, "text")
+      expectedFileList.append(file.getCanonicalPath)
+
+      val subDirectory = new File(partitionDirectory, subDir)
+      subDirectory.mkdir()
+      val subFile = new File(subDirectory, "subtext.txt")
+      stringToFile(subFile, "text")
+      expectedFileList.append(subFile.getCanonicalPath)
+    }
+
+    def checkPartitions(fileIndex: InMemoryFileIndex,
+        expectedPartitions: Seq[(Seq[String], String)]): Unit = {
+      assertResult(expectedPartitions.sortBy(_._2)) {
+        fileIndex.partitionSpec().partitions
+          .map { p =>
+            val values = (0 until p.values.numFields).map(p.values.getString(_))
+            (values, new File(p.path.toUri).getCanonicalPath)
+          }.sortBy(_._2)
+      }
+    }
+
+    def checkListFiles(fileIndex: InMemoryFileIndex, expectedFiles: Seq[String]): Unit = {
+      assertResult(expectedFiles.sorted) {
+        fileIndex.listFiles(Nil, Nil).flatMap(_.files)
+          .map(f => new File(f.getPath.toUri).getCanonicalPath).sorted
+      }
+    }
+
+    // one partition path,
+    // dirs: dir/a=a1/text.txt
+    //       dir/a=a1/subdir01/subtext.txt
+    // if inferRecursivePartition is false, expected partitions: ()
+    // if inferRecursivePartition is true, expected partitions: (a=a1, dir/a=a1)
+    withTempDir { dir =>
+      val expectedFileList = mutable.ListBuffer[String]()
+      createPartitionDirWithSubDir(dir, Seq("a=a1"), "subdir01", expectedFileList)
+
+      val fileIndex = new InMemoryFileIndex(spark,
+        Seq(new Path(dir.getCanonicalPath)), Map("recursiveFileLookup" -> "false"), None)
+      checkPartitions(fileIndex, Seq((Seq("a1"), new File(dir, "a=a1").getCanonicalPath)))
+      checkListFiles(fileIndex, expectedFileList.filter(!_.contains("subdir")).toSeq)
+
+      val recursiveFileIndex1 = new InMemoryFileIndex(spark,
+        Seq(new Path(dir.getCanonicalPath)), Map("recursiveFileLookup" -> "true"), None)
+      checkPartitions(recursiveFileIndex1, Seq.empty)
+      checkListFiles(recursiveFileIndex1, expectedFileList.toSeq)
+
+      val recursiveFileIndex2 = new InMemoryFileIndex(spark,
+        Seq(new Path(dir.getCanonicalPath)),
+        Map("recursiveFileLookup" -> "true", "inferRecursivePartition" -> "true"),
+        None)
+      checkPartitions(recursiveFileIndex2, Seq((Seq("a1"), new File(dir, "a=a1").getCanonicalPath)))
+      checkListFiles(recursiveFileIndex2, expectedFileList.toSeq)
+    }
+
+    // two partition paths,
+    // dirs: dir/a=a2/b=b1/text.txt
+    //       dir/a=a2/b=b1/subdir01/subtext.txt
+    //       dir/a=a2/b=b2/text.txt
+    //       dir/a=a2/b=b2/subdir02/subtext.txt
+    // if inferRecursivePartition is false, expected partitions:
+    //   ()
+    // if inferRecursivePartition is true, expected partitions:
+    //   (a=a2/b=b1, dir/a=a2/b=b1)
+    //   (a=a2/b=b2, dir/a=a2/b=b2)
+    withTempDir { dir =>
+      val expectedFileList = mutable.ListBuffer[String]()
+      createPartitionDirWithSubDir(dir, Seq("a=a2", "b=b1"), "subdir01", expectedFileList)
+      createPartitionDirWithSubDir(dir, Seq("a=a2", "b=b2"), "subdir02", expectedFileList)
+
+      val fileIndex = new InMemoryFileIndex(spark,
+        Seq(new Path(dir.getCanonicalPath)), Map("recursiveFileLookup" -> "false"), None)
+      checkPartitions(fileIndex, Seq(
+        (Seq("a2", "b1"), new File(new File(dir, "a=a2"), "b=b1").getCanonicalPath),
+        (Seq("a2", "b2"), new File(new File(dir, "a=a2"), "b=b2").getCanonicalPath)
+      ))
+      checkListFiles(fileIndex, expectedFileList.filter(!_.contains("subdir")).toSeq)
+
+      val recursiveFileIndex1 = new InMemoryFileIndex(spark,
+        Seq(new Path(dir.getCanonicalPath)), Map("recursiveFileLookup" -> "true"), None)
+      checkPartitions(recursiveFileIndex1, Seq.empty)
+      checkListFiles(recursiveFileIndex1, expectedFileList.toSeq)
+
+      val recursiveFileIndex2 = new InMemoryFileIndex(spark,
+        Seq(new Path(dir.getCanonicalPath)),
+        Map("recursiveFileLookup" -> "true", "inferRecursivePartition" -> "true"),
+        None)
+      checkPartitions(recursiveFileIndex2, Seq(
+        (Seq("a2", "b1"), new File(new File(dir, "a=a2"), "b=b1").getCanonicalPath),
+        (Seq("a2", "b2"), new File(new File(dir, "a=a2"), "b=b2").getCanonicalPath)
+      ))
+      checkListFiles(recursiveFileIndex2, expectedFileList.toSeq)
+    }
+  }
 }
 
 object DeletionRaceFileSystem {
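
For context, a minimal usage sketch of the new option (not part of the patch): assuming the read options are forwarded from DataFrameReader to the file index, as they are for built-in file sources, a partitioned directory that also contains nested subdirectories could be read as below. The path, format, and partition column name are illustrative only.

import org.apache.spark.sql.SparkSession

object InferRecursivePartitionExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("example").master("local[*]").getOrCreate()

    // Hypothetical layout: /tmp/events/a=a1/part-0.json and /tmp/events/a=a1/subdir01/part-1.json
    val df = spark.read
      .format("json")
      .option("recursiveFileLookup", "true")     // existing option: also pick up files in nested dirs
      .option("inferRecursivePartition", "true") // option introduced by this patch
      .load("/tmp/events")                       // hypothetical path

    df.printSchema()                // with the patch, the partition column `a` is still inferred
    df.groupBy("a").count().show()

    spark.stop()
  }
}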