diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
index efae26636a4b..aa4cbfcef8f5 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java
@@ -23,6 +23,7 @@
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.apache.spark.util.Utils;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -96,7 +97,7 @@ private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf conf
     Map<String, String> allConfs = mapAsJavaMapConverter(conf.getAllConfs()).asJava();
     Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + "\\.(.+)");
 
-    CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
+    HashMap<String, String> options = new HashMap<>();
     for (Map.Entry<String, String> entry : allConfs.entrySet()) {
       Matcher matcher = prefix.matcher(entry.getKey());
       if (matcher.matches() && matcher.groupCount() > 0) {
@@ -104,6 +105,6 @@ private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf conf
       }
     }
 
-    return options;
+    return new CaseInsensitiveStringMap(options);
   }
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
index 704d90ed60ad..da41346d7ce7 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
@@ -18,8 +18,11 @@
 package org.apache.spark.sql.util;
 
 import org.apache.spark.annotation.Experimental;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
@@ -35,16 +38,29 @@
  */
 @Experimental
 public class CaseInsensitiveStringMap implements Map<String, String> {
+  private final Logger logger = LoggerFactory.getLogger(CaseInsensitiveStringMap.class);
+
+  private String unsupportedOperationMsg = "CaseInsensitiveStringMap is read-only.";
 
   public static CaseInsensitiveStringMap empty() {
     return new CaseInsensitiveStringMap(new HashMap<>(0));
   }
 
+  private final Map<String, String> original;
+
   private final Map<String, String> delegate;
 
   public CaseInsensitiveStringMap(Map<String, String> originalMap) {
-    this.delegate = new HashMap<>(originalMap.size());
-    putAll(originalMap);
+    original = new HashMap<>(originalMap);
+    delegate = new HashMap<>(originalMap.size());
+    for (Map.Entry<String, String> entry : originalMap.entrySet()) {
+      String key = toLowerCase(entry.getKey());
+      if (delegate.containsKey(key)) {
+        logger.warn("Converting duplicated key " + entry.getKey() +
+          " into CaseInsensitiveStringMap.");
+      }
+      delegate.put(key, entry.getValue());
+    }
   }
 
   @Override
@@ -78,24 +94,22 @@ public String get(Object key) {
 
   @Override
   public String put(String key, String value) {
-    return delegate.put(toLowerCase(key), value);
+    throw new UnsupportedOperationException(unsupportedOperationMsg);
   }
 
   @Override
   public String remove(Object key) {
-    return delegate.remove(toLowerCase(key));
+    throw new UnsupportedOperationException(unsupportedOperationMsg);
   }
 
   @Override
   public void putAll(Map<? extends String, ? extends String> m) {
-    for (Map.Entry<? extends String, ? extends String> entry : m.entrySet()) {
-      put(entry.getKey(), entry.getValue());
-    }
+    throw new UnsupportedOperationException(unsupportedOperationMsg);
   }
 
   @Override
   public void clear() {
-    delegate.clear();
+    throw new UnsupportedOperationException(unsupportedOperationMsg);
   }
 
   @Override
@@ -157,4 +171,11 @@ public double getDouble(String key, double defaultValue) {
     String value = get(key);
     return value == null ? defaultValue : Double.parseDouble(value);
   }
+
+  /**
+   * Returns the original case-sensitive map.
+   */
+  public Map<String, String> asCaseSensitiveMap() {
+    return Collections.unmodifiableMap(original);
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala
index 623ddeb14025..0accb471cada 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.util
 
+import java.util
+
 import scala.collection.JavaConverters._
 
 import org.apache.spark.SparkFunSuite
@@ -25,9 +27,16 @@ class CaseInsensitiveStringMapSuite extends SparkFunSuite {
 
   test("put and get") {
     val options = CaseInsensitiveStringMap.empty()
-    options.put("kEy", "valUE")
-    assert(options.get("key") == "valUE")
-    assert(options.get("KEY") == "valUE")
+    intercept[UnsupportedOperationException] {
+      options.put("kEy", "valUE")
+    }
+  }
+
+  test("clear") {
+    val options = new CaseInsensitiveStringMap(Map("kEy" -> "valUE").asJava)
+    intercept[UnsupportedOperationException] {
+      options.clear()
+    }
   }
 
   test("key and value set") {
@@ -80,4 +89,20 @@
       options.getDouble("foo", 0.1d)
     }
   }
+
+  test("asCaseSensitiveMap") {
+    val originalMap = new util.HashMap[String, String] {
+      put("Foo", "Bar")
+      put("OFO", "ABR")
+      put("OoF", "bar")
+    }
+
+    val options = new CaseInsensitiveStringMap(originalMap)
+    val caseSensitiveMap = options.asCaseSensitiveMap
+    assert(caseSensitiveMap.equals(originalMap))
+    // The result of `asCaseSensitiveMap` is read-only.
+    intercept[UnsupportedOperationException] {
+      caseSensitiveMap.put("kEy", "valUE")
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index 9423fe95fb97..68667faa9bb6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -35,13 +35,14 @@ abstract class FileTable(
     extends Table with SupportsBatchRead with SupportsBatchWrite {
 
   lazy val fileIndex: PartitioningAwareFileIndex = {
-    val scalaMap = options.asScala.toMap
-    val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(scalaMap)
+    val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+    // Hadoop Configurations are case sensitive.
+    val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
     val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, hadoopConf,
       checkEmptyGlobPath = true, checkFilesExist = true)
     val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
     new InMemoryFileIndex(
-      sparkSession, rootPathsSpecified, scalaMap, userSpecifiedSchema, fileStatusCache)
+      sparkSession, rootPathsSpecified, caseSensitiveMap, userSpecifiedSchema, fileStatusCache)
   }
 
   lazy val dataSchema: StructType = userSpecifiedSchema.orElse {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
index 0d07f5a02c25..bb4a428e4066 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
@@ -64,16 +64,16 @@ abstract class FileWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[St
     val sparkSession = SparkSession.active
     validateInputs(sparkSession.sessionState.conf.caseSensitiveAnalysis)
     val path = new Path(paths.head)
-    val optionsAsScala = options.asScala.toMap
-
-    val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(optionsAsScala)
+    val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+    // Hadoop Configurations are case sensitive.
+    val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
     val job = getJobInstance(hadoopConf, path)
     val committer = FileCommitProtocol.instantiate(
       sparkSession.sessionState.conf.fileCommitProtocolClass,
       jobId = java.util.UUID.randomUUID().toString,
       outputPath = paths.head)
     lazy val description =
-      createWriteJobDescription(sparkSession, hadoopConf, job, paths.head, optionsAsScala)
+      createWriteJobDescription(sparkSession, hadoopConf, job, paths.head, options.asScala.toMap)
 
     val fs = path.getFileSystem(hadoopConf)
     mode match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
index 4767f21bb029..8ac56aa5f64b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
@@ -37,7 +37,11 @@ case class OrcScanBuilder(
     dataSchema: StructType,
     options: CaseInsensitiveStringMap)
   extends FileScanBuilder(schema) with SupportsPushDownFilters {
-  lazy val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap)
+  lazy val hadoopConf = {
+    val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+    // Hadoop Configurations are case sensitive.
+    sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
+  }
 
   override def build(): Scan = {
     OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema, readSchema, options)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
index bc5a30e97fae..e1d02540a745 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.orc
 
 import java.io.File
 
+import org.apache.hadoop.fs.{Path, PathFilter}
+
 import org.apache.spark.SparkConf
 import org.apache.spark.sql._
 import org.apache.spark.sql.internal.SQLConf
@@ -30,6 +32,10 @@ case class OrcParData(intField: Int, stringField: String)
 // The data that also includes the partitioning key
 case class OrcParDataWithKey(intField: Int, pi: Int, stringField: String, ps: String)
 
+class TestFileFilter extends PathFilter {
+  override def accept(path: Path): Boolean = path.getParent.getName != "p=2"
+}
+
 abstract class OrcPartitionDiscoveryTest extends OrcTest {
   val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__"
 
@@ -226,6 +232,23 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
       }
     }
   }
+
+  test("SPARK-27162: handle pathfilter configuration correctly") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      val df = spark.range(2)
+      df.write.orc(path + "/p=1")
+      df.write.orc(path + "/p=2")
+      assert(spark.read.orc(path).count() === 4)
+
+      val extraOptions = Map(
+        "mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName,
+        "mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName
+      )
+      assert(spark.read.options(extraOptions).orc(path).count() === 2)
+    }
+  }
 }
 
 class OrcPartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext