From 7d711eb1f8d2df8a1ee7426cc17d8a12bc4be7c3 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 14 Mar 2019 19:46:50 +0800 Subject: [PATCH 1/9] maintain original map in CaseInsensitiveStringMap --- .../sql/util/CaseInsensitiveStringMap.java | 13 +++++++++ .../util/CaseInsensitiveStringMapSuite.scala | 28 +++++++++++++++++++ .../execution/datasources/v2/FileTable.scala | 4 +-- .../datasources/v2/FileWriteBuilder.scala | 6 ++-- .../datasources/v2/orc/OrcScanBuilder.scala | 2 +- .../spark/sql/internal/SessionState.scala | 9 +++++- .../orc/OrcPartitionDiscoverySuite.scala | 23 +++++++++++++++ 7 files changed, 77 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java index 704d90ed60ad..47030768d762 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java @@ -40,9 +40,12 @@ public static CaseInsensitiveStringMap empty() { return new CaseInsensitiveStringMap(new HashMap<>(0)); } + private final Map originalMap; + private final Map delegate; public CaseInsensitiveStringMap(Map originalMap) { + this.originalMap = new HashMap<>(originalMap); this.delegate = new HashMap<>(originalMap.size()); putAll(originalMap); } @@ -78,11 +81,13 @@ public String get(Object key) { @Override public String put(String key, String value) { + originalMap.put(key, value); return delegate.put(toLowerCase(key), value); } @Override public String remove(Object key) { + originalMap.remove(key); return delegate.remove(toLowerCase(key)); } @@ -95,6 +100,7 @@ public void putAll(Map m) { @Override public void clear() { + originalMap.clear(); delegate.clear(); } @@ -157,4 +163,11 @@ public double getDouble(String key, double defaultValue) { String value = get(key); return value == null ? defaultValue : Double.parseDouble(value); } + + /** + * Returns the original case-sensitive map. + */ + public Map getOriginalMap() { + return originalMap; + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala index 623ddeb14025..405be15959c6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.util +import java.util + import scala.collection.JavaConverters._ import org.apache.spark.SparkFunSuite @@ -80,4 +82,30 @@ class CaseInsensitiveStringMapSuite extends SparkFunSuite { options.getDouble("foo", 0.1d) } } + + test("getOriginalMap") { + val originalMap = new util.HashMap[String, String] { + put("Foo", "Bar") + put("OFO", "ABR") + put("OoF", "bar") + } + + val options = new CaseInsensitiveStringMap(originalMap) + assert(options.getOriginalMap.equals(originalMap)) + + val key = "Key" + val value = "value" + originalMap.put(key, value) + options.put(key, value) + originalMap.put(key, value) + assert(options.getOriginalMap.equals(originalMap)) + + val removedKey = "OFO" + originalMap.remove(removedKey) + options.remove(removedKey) + assert(options.getOriginalMap.equals(originalMap)) + + options.clear() + assert(options.getOriginalMap.isEmpty) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala index 9423fe95fb97..b2514f4a6aa2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala @@ -35,11 +35,11 @@ abstract class FileTable( extends Table with SupportsBatchRead with SupportsBatchWrite { lazy val fileIndex: PartitioningAwareFileIndex = { - val scalaMap = options.asScala.toMap - val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(scalaMap) + val hadoopConf = sparkSession.sessionState.newHadoopConfWithCaseInsensitiveOptions(options) val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, hadoopConf, checkEmptyGlobPath = true, checkFilesExist = true) val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) + val scalaMap = options.getOriginalMap.asScala.toMap new InMemoryFileIndex( sparkSession, rootPathsSpecified, scalaMap, userSpecifiedSchema, fileStatusCache) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala index 0d07f5a02c25..ccc9b2c88d00 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala @@ -64,16 +64,14 @@ abstract class FileWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[St val sparkSession = SparkSession.active validateInputs(sparkSession.sessionState.conf.caseSensitiveAnalysis) val path = new Path(paths.head) - val optionsAsScala = options.asScala.toMap - - val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(optionsAsScala) + val hadoopConf = sparkSession.sessionState.newHadoopConfWithCaseInsensitiveOptions(options) val job = getJobInstance(hadoopConf, path) val committer = FileCommitProtocol.instantiate( sparkSession.sessionState.conf.fileCommitProtocolClass, jobId = java.util.UUID.randomUUID().toString, outputPath = paths.head) lazy val description = - createWriteJobDescription(sparkSession, hadoopConf, job, paths.head, optionsAsScala) + createWriteJobDescription(sparkSession, hadoopConf, job, paths.head, options.asScala.toMap) val fs = path.getFileSystem(hadoopConf) mode match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala index 4767f21bb029..33bac336c558 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala @@ -37,7 +37,7 @@ case class OrcScanBuilder( dataSchema: StructType, options: CaseInsensitiveStringMap) extends FileScanBuilder(schema) with SupportsPushDownFilters { - lazy val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap) + lazy val hadoopConf = sparkSession.sessionState.newHadoopConfWithCaseInsensitiveOptions(options) override def build(): Scan = { OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema, readSchema, options) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index b34db581ca2c..47988326cfd7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.internal import java.io.File +import scala.collection.JavaConverters._ + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -32,7 +34,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution._ import org.apache.spark.sql.streaming.StreamingQueryManager -import org.apache.spark.sql.util.{ExecutionListenerManager, QueryExecutionListener} +import org.apache.spark.sql.util.{CaseInsensitiveStringMap, ExecutionListenerManager, QueryExecutionListener} /** * A class that holds all session-specific state in a given [[SparkSession]]. @@ -96,6 +98,11 @@ private[sql] class SessionState( hadoopConf } + def newHadoopConfWithCaseInsensitiveOptions(options: CaseInsensitiveStringMap): Configuration = { + // Hadoop configurations are case sensitive. + newHadoopConfWithOptions(options.getOriginalMap.asScala.toMap) + } + /** * Get an identical copy of the `SessionState` and associate it with the given `SparkSession` */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala index bc5a30e97fae..e1d02540a745 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.orc import java.io.File +import org.apache.hadoop.fs.{Path, PathFilter} + import org.apache.spark.SparkConf import org.apache.spark.sql._ import org.apache.spark.sql.internal.SQLConf @@ -30,6 +32,10 @@ case class OrcParData(intField: Int, stringField: String) // The data that also includes the partitioning key case class OrcParDataWithKey(intField: Int, pi: Int, stringField: String, ps: String) +class TestFileFilter extends PathFilter { + override def accept(path: Path): Boolean = path.getParent.getName != "p=2" +} + abstract class OrcPartitionDiscoveryTest extends OrcTest { val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__" @@ -226,6 +232,23 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest { } } } + + test("SPARK-27162: handle pathfilter configuration correctly") { + withTempPath { dir => + val path = dir.getCanonicalPath + + val df = spark.range(2) + df.write.orc(path + "/p=1") + df.write.orc(path + "/p=2") + assert(spark.read.orc(path).count() === 4) + + val extraOptions = Map( + "mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName, + "mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName + ) + assert(spark.read.options(extraOptions).orc(path).count() === 2) + } + } } class OrcPartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext From 66792a536cd83e950f060a099090945641c1165f Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 15 Mar 2019 00:28:05 +0800 Subject: [PATCH 2/9] rename --- .../spark/sql/util/CaseInsensitiveStringMap.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java index 47030768d762..cf10730b29ed 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java @@ -40,12 +40,12 @@ public static CaseInsensitiveStringMap empty() { return new CaseInsensitiveStringMap(new HashMap<>(0)); } - private final Map originalMap; + private final Map original; private final Map delegate; public CaseInsensitiveStringMap(Map originalMap) { - this.originalMap = new HashMap<>(originalMap); + this.original = new HashMap<>(originalMap); this.delegate = new HashMap<>(originalMap.size()); putAll(originalMap); } @@ -81,13 +81,13 @@ public String get(Object key) { @Override public String put(String key, String value) { - originalMap.put(key, value); + original.put(key, value); return delegate.put(toLowerCase(key), value); } @Override public String remove(Object key) { - originalMap.remove(key); + original.remove(key); return delegate.remove(toLowerCase(key)); } @@ -100,7 +100,7 @@ public void putAll(Map m) { @Override public void clear() { - originalMap.clear(); + original.clear(); delegate.clear(); } @@ -168,6 +168,6 @@ public double getDouble(String key, double defaultValue) { * Returns the original case-sensitive map. */ public Map getOriginalMap() { - return originalMap; + return original; } } From 33a15fd2966cd56f0c2068cc51ec1b8f40cda6bb Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 15 Mar 2019 17:51:52 +0800 Subject: [PATCH 3/9] address one comment --- .../org/apache/spark/sql/util/CaseInsensitiveStringMap.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java index cf10730b29ed..d5544500657a 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java @@ -45,7 +45,7 @@ public static CaseInsensitiveStringMap empty() { private final Map delegate; public CaseInsensitiveStringMap(Map originalMap) { - this.original = new HashMap<>(originalMap); + this.original = new HashMap<>(originalMap.size()); this.delegate = new HashMap<>(originalMap.size()); putAll(originalMap); } From f0f59e3ac368ac24b032af3b05ddf3cc9255051e Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 18 Mar 2019 14:35:34 +0800 Subject: [PATCH 4/9] address comments --- .../sql/util/CaseInsensitiveStringMap.java | 23 ++++++------- .../util/CaseInsensitiveStringMapSuite.scala | 32 +++++++------------ .../execution/datasources/v2/FileTable.scala | 7 ++-- .../datasources/v2/FileWriteBuilder.scala | 4 ++- .../datasources/v2/orc/OrcScanBuilder.scala | 6 +++- .../spark/sql/internal/SessionState.scala | 5 --- 6 files changed, 34 insertions(+), 43 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java index d5544500657a..bd1d247ea0ac 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java @@ -45,9 +45,11 @@ public static CaseInsensitiveStringMap empty() { private final Map delegate; public CaseInsensitiveStringMap(Map originalMap) { - this.original = new HashMap<>(originalMap.size()); - this.delegate = new HashMap<>(originalMap.size()); - putAll(originalMap); + original = new HashMap<>(originalMap); + delegate = new HashMap<>(originalMap.size()); + for (Map.Entry entry : originalMap.entrySet()) { + delegate.put(toLowerCase(entry.getKey()), entry.getValue()); + } } @Override @@ -81,27 +83,22 @@ public String get(Object key) { @Override public String put(String key, String value) { - original.put(key, value); - return delegate.put(toLowerCase(key), value); + throw new UnsupportedOperationException(); } @Override public String remove(Object key) { - original.remove(key); - return delegate.remove(toLowerCase(key)); + throw new UnsupportedOperationException(); } @Override public void putAll(Map m) { - for (Map.Entry entry : m.entrySet()) { - put(entry.getKey(), entry.getValue()); - } + throw new UnsupportedOperationException(); } @Override public void clear() { - original.clear(); - delegate.clear(); + throw new UnsupportedOperationException(); } @Override @@ -167,7 +164,7 @@ public double getDouble(String key, double defaultValue) { /** * Returns the original case-sensitive map. */ - public Map getOriginalMap() { + public Map asCaseSensitiveMap() { return original; } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala index 405be15959c6..1d22910f837b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala @@ -27,9 +27,16 @@ class CaseInsensitiveStringMapSuite extends SparkFunSuite { test("put and get") { val options = CaseInsensitiveStringMap.empty() - options.put("kEy", "valUE") - assert(options.get("key") == "valUE") - assert(options.get("KEY") == "valUE") + intercept[UnsupportedOperationException] { + options.put("kEy", "valUE") + } + } + + test("clear") { + val options = new CaseInsensitiveStringMap(Map("kEy" -> "valUE").asJava) + intercept[UnsupportedOperationException] { + options.clear() + } } test("key and value set") { @@ -83,7 +90,7 @@ class CaseInsensitiveStringMapSuite extends SparkFunSuite { } } - test("getOriginalMap") { + test("asCaseSensitiveMap") { val originalMap = new util.HashMap[String, String] { put("Foo", "Bar") put("OFO", "ABR") @@ -91,21 +98,6 @@ class CaseInsensitiveStringMapSuite extends SparkFunSuite { } val options = new CaseInsensitiveStringMap(originalMap) - assert(options.getOriginalMap.equals(originalMap)) - - val key = "Key" - val value = "value" - originalMap.put(key, value) - options.put(key, value) - originalMap.put(key, value) - assert(options.getOriginalMap.equals(originalMap)) - - val removedKey = "OFO" - originalMap.remove(removedKey) - options.remove(removedKey) - assert(options.getOriginalMap.equals(originalMap)) - - options.clear() - assert(options.getOriginalMap.isEmpty) + assert(options.asCaseSensitiveMap.equals(originalMap)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala index b2514f4a6aa2..68667faa9bb6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala @@ -35,13 +35,14 @@ abstract class FileTable( extends Table with SupportsBatchRead with SupportsBatchWrite { lazy val fileIndex: PartitioningAwareFileIndex = { - val hadoopConf = sparkSession.sessionState.newHadoopConfWithCaseInsensitiveOptions(options) + val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap + // Hadoop Configurations are case sensitive. + val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap) val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, hadoopConf, checkEmptyGlobPath = true, checkFilesExist = true) val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) - val scalaMap = options.getOriginalMap.asScala.toMap new InMemoryFileIndex( - sparkSession, rootPathsSpecified, scalaMap, userSpecifiedSchema, fileStatusCache) + sparkSession, rootPathsSpecified, caseSensitiveMap, userSpecifiedSchema, fileStatusCache) } lazy val dataSchema: StructType = userSpecifiedSchema.orElse { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala index ccc9b2c88d00..bb4a428e4066 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala @@ -64,7 +64,9 @@ abstract class FileWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[St val sparkSession = SparkSession.active validateInputs(sparkSession.sessionState.conf.caseSensitiveAnalysis) val path = new Path(paths.head) - val hadoopConf = sparkSession.sessionState.newHadoopConfWithCaseInsensitiveOptions(options) + val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap + // Hadoop Configurations are case sensitive. + val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap) val job = getJobInstance(hadoopConf, path) val committer = FileCommitProtocol.instantiate( sparkSession.sessionState.conf.fileCommitProtocolClass, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala index 33bac336c558..8ac56aa5f64b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala @@ -37,7 +37,11 @@ case class OrcScanBuilder( dataSchema: StructType, options: CaseInsensitiveStringMap) extends FileScanBuilder(schema) with SupportsPushDownFilters { - lazy val hadoopConf = sparkSession.sessionState.newHadoopConfWithCaseInsensitiveOptions(options) + lazy val hadoopConf = { + val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap + // Hadoop Configurations are case sensitive. + sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap) + } override def build(): Scan = { OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema, readSchema, options) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index 47988326cfd7..bd56b3065be0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -98,11 +98,6 @@ private[sql] class SessionState( hadoopConf } - def newHadoopConfWithCaseInsensitiveOptions(options: CaseInsensitiveStringMap): Configuration = { - // Hadoop configurations are case sensitive. - newHadoopConfWithOptions(options.getOriginalMap.asScala.toMap) - } - /** * Get an identical copy of the `SessionState` and associate it with the given `SparkSession` */ From e153f0351ec2a5196c15d038fd3a00f557a49fcd Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 18 Mar 2019 16:00:52 +0800 Subject: [PATCH 5/9] address comments --- .../sql/util/CaseInsensitiveStringMap.java | 22 ++++++++++++++----- .../spark/sql/internal/SessionState.scala | 2 +- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java index bd1d247ea0ac..b56dbfd9f332 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java @@ -18,6 +18,8 @@ package org.apache.spark.sql.util; import org.apache.spark.annotation.Experimental; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.HashMap; @@ -35,6 +37,9 @@ */ @Experimental public class CaseInsensitiveStringMap implements Map { + private final Logger logger = LoggerFactory.getLogger(CaseInsensitiveStringMap.class); + + private String unsupportedOperationMsg = "CaseInsensitiveStringMap is read-only."; public static CaseInsensitiveStringMap empty() { return new CaseInsensitiveStringMap(new HashMap<>(0)); @@ -47,8 +52,13 @@ public static CaseInsensitiveStringMap empty() { public CaseInsensitiveStringMap(Map originalMap) { original = new HashMap<>(originalMap); delegate = new HashMap<>(originalMap.size()); - for (Map.Entry entry : originalMap.entrySet()) { - delegate.put(toLowerCase(entry.getKey()), entry.getValue()); + for (Map.Entry entry : originalMap.entrySet()) { + String key = toLowerCase(entry.getKey()); + if (delegate.containsKey(key)) { + logger.warn("Converting duplicated key " + entry.getKey() + + " into CaseInsensitiveStringMap."); + } + delegate.put(key, entry.getValue()); } } @@ -83,22 +93,22 @@ public String get(Object key) { @Override public String put(String key, String value) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException(unsupportedOperationMsg); } @Override public String remove(Object key) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException(unsupportedOperationMsg); } @Override public void putAll(Map m) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException(unsupportedOperationMsg); } @Override public void clear() { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException(unsupportedOperationMsg); } @Override diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index bd56b3065be0..ec29e1223670 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution._ import org.apache.spark.sql.streaming.StreamingQueryManager -import org.apache.spark.sql.util.{CaseInsensitiveStringMap, ExecutionListenerManager, QueryExecutionListener} +import org.apache.spark.sql.util.{ExecutionListenerManager, QueryExecutionListener} /** * A class that holds all session-specific state in a given [[SparkSession]]. From d2522110d6d4f0a5ee58846ace53fa6685e91601 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 18 Mar 2019 18:02:25 +0800 Subject: [PATCH 6/9] revise --- .../main/scala/org/apache/spark/sql/internal/SessionState.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index ec29e1223670..b34db581ca2c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql.internal import java.io.File -import scala.collection.JavaConverters._ - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path From 08ab550f2f01a6f6403a1da81f792b5295162023 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 19 Mar 2019 01:54:17 +0800 Subject: [PATCH 7/9] address comment --- .../apache/spark/sql/util/CaseInsensitiveStringMap.java | 8 ++------ .../spark/sql/util/CaseInsensitiveStringMapSuite.scala | 7 ++++++- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java index b56dbfd9f332..4a76cabe21ed 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java @@ -21,11 +21,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collection; -import java.util.HashMap; -import java.util.Locale; -import java.util.Map; -import java.util.Set; +import java.util.*; /** * Case-insensitive map of string keys to string values. @@ -175,6 +171,6 @@ public double getDouble(String key, double defaultValue) { * Returns the original case-sensitive map. */ public Map asCaseSensitiveMap() { - return original; + return Collections.unmodifiableMap(original); } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala index 1d22910f837b..0accb471cada 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala @@ -98,6 +98,11 @@ class CaseInsensitiveStringMapSuite extends SparkFunSuite { } val options = new CaseInsensitiveStringMap(originalMap) - assert(options.asCaseSensitiveMap.equals(originalMap)) + val caseSensitiveMap = options.asCaseSensitiveMap + assert(caseSensitiveMap.equals(originalMap)) + // The result of `asCaseSensitiveMap` is read-only. + intercept[UnsupportedOperationException] { + caseSensitiveMap.put("kEy", "valUE") + } } } From 2245766922c8e7e5cff1940960b2e796608b57f1 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 19 Mar 2019 03:16:54 +0800 Subject: [PATCH 8/9] fix test failure --- .../main/java/org/apache/spark/sql/catalog/v2/Catalogs.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java index efae26636a4b..aa4cbfcef8f5 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java @@ -23,6 +23,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.apache.spark.util.Utils; +import java.util.HashMap; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -96,7 +97,7 @@ private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf conf Map allConfs = mapAsJavaMapConverter(conf.getAllConfs()).asJava(); Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + "\\.(.+)"); - CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty(); + HashMap options = new HashMap<>(); for (Map.Entry entry : allConfs.entrySet()) { Matcher matcher = prefix.matcher(entry.getKey()); if (matcher.matches() && matcher.groupCount() > 0) { @@ -104,6 +105,6 @@ private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf conf } } - return options; + return new CaseInsensitiveStringMap(options); } } From 28e05f206bdfcd0e8477d54cf0f3584d6d94133d Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Tue, 19 Mar 2019 04:20:52 +0800 Subject: [PATCH 9/9] avoid using wildcard --- .../apache/spark/sql/util/CaseInsensitiveStringMap.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java index 4a76cabe21ed..da41346d7ce7 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java @@ -21,7 +21,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.Set; /** * Case-insensitive map of string keys to string values.