From 47e47a33ba43d4c6d3d386239364b7231c7a633b Mon Sep 17 00:00:00 2001
From: HyukjinKwon
Date: Mon, 20 Jul 2020 20:37:16 +0900
Subject: [PATCH] Options in PartitioningAwareFileIndex should respect case insensitivity

---
 .../datasources/PartitioningAwareFileIndex.scala | 11 +++++++----
 .../spark/sql/FileBasedDataSourceSuite.scala     | 16 +++++++++-------
 .../execution/datasources/FileIndexSuite.scala   | 16 +++++++++-------
 3 files changed, 25 insertions(+), 18 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 2e09c729529a6..5341e22f5e670 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -56,14 +56,17 @@ abstract class PartitioningAwareFileIndex(
 
   protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]
 
-  protected lazy val pathGlobFilter = parameters.get("pathGlobFilter").map(new GlobFilter(_))
+  private val caseInsensitiveMap = CaseInsensitiveMap(parameters)
+
+  protected lazy val pathGlobFilter: Option[GlobFilter] =
+    caseInsensitiveMap.get("pathGlobFilter").map(new GlobFilter(_))
 
   protected def matchGlobPattern(file: FileStatus): Boolean = {
     pathGlobFilter.forall(_.accept(file.getPath))
   }
 
-  protected lazy val recursiveFileLookup = {
-    parameters.getOrElse("recursiveFileLookup", "false").toBoolean
+  protected lazy val recursiveFileLookup: Boolean = {
+    caseInsensitiveMap.getOrElse("recursiveFileLookup", "false").toBoolean
   }
 
   override def listFiles(
@@ -215,7 +218,7 @@ abstract class PartitioningAwareFileIndex(
    * and the returned DataFrame will have the column of `something`.
    */
   private def basePaths: Set[Path] = {
-    parameters.get(BASE_PATH_PARAM).map(new Path(_)) match {
+    caseInsensitiveMap.get(BASE_PATH_PARAM).map(new Path(_)) match {
       case Some(userDefinedBasePath) =>
         val fs = userDefinedBasePath.getFileSystem(hadoopConf)
         if (!fs.isDirectory(userDefinedBasePath)) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 231a8f2aa7ddd..e9bff64d72fc3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -633,13 +633,15 @@ class FileBasedDataSourceSuite extends QueryTest
 
       assert(fileList.toSet === expectedFileList.toSet)
 
-      val fileList2 = spark.read.format("binaryFile")
-        .option("recursiveFileLookup", true)
-        .option("pathGlobFilter", "*.bin")
-        .load(dataPath)
-        .select("path").collect().map(_.getString(0))
-
-      assert(fileList2.toSet === expectedFileList.filter(_.endsWith(".bin")).toSet)
+      withClue("SPARK-32368: 'recursiveFileLookup' and 'pathGlobFilter' can be case insensitive") {
+        val fileList2 = spark.read.format("binaryFile")
+          .option("RecuRsivefileLookup", true)
+          .option("PaThglobFilter", "*.bin")
+          .load(dataPath)
+          .select("path").collect().map(_.getString(0))
+
+        assert(fileList2.toSet === expectedFileList.filter(_.endsWith(".bin")).toSet)
+      }
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 040996276063b..02be8c9221704 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -367,13 +367,15 @@ class FileIndexSuite extends SharedSparkSession {
       val wrongBasePath = new File(dir, "unknown")
       // basePath must be a directory
       wrongBasePath.mkdir()
-      val parameters = Map("basePath" -> wrongBasePath.getCanonicalPath)
-      val fileIndex = new InMemoryFileIndex(spark, Seq(path), parameters, None)
-      val msg = intercept[IllegalArgumentException] {
-        // trigger inferPartitioning()
-        fileIndex.partitionSpec()
-      }.getMessage
-      assert(msg === s"Wrong basePath ${wrongBasePath.getCanonicalPath} for the root path: $path")
+      withClue("SPARK-32368: 'basePath' can be case insensitive") {
+        val parameters = Map("bAsepAtH" -> wrongBasePath.getCanonicalPath)
+        val fileIndex = new InMemoryFileIndex(spark, Seq(path), parameters, None)
+        val msg = intercept[IllegalArgumentException] {
+          // trigger inferPartitioning()
+          fileIndex.partitionSpec()
+        }.getMessage
+        assert(msg === s"Wrong basePath ${wrongBasePath.getCanonicalPath} for the root path: $path")
+      }
     }
   }
 