Catalogs.java
@@ -23,6 +23,7 @@
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.util.Utils;

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -96,14 +97,14 @@ private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf conf
Map<String, String> allConfs = mapAsJavaMapConverter(conf.getAllConfs()).asJava();
Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + "\\.(.+)");

CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
HashMap<String, String> options = new HashMap<>();
for (Map.Entry<String, String> entry : allConfs.entrySet()) {
Matcher matcher = prefix.matcher(entry.getKey());
if (matcher.matches() && matcher.groupCount() > 0) {
options.put(matcher.group(1), entry.getValue());
}
}

return options;
return new CaseInsensitiveStringMap(options);
}
}
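
For orientation, a minimal sketch of where this map comes from, assuming a hypothetical catalog named "cat" (the plugin class and option keys below are made up): every SQL conf matching spark.sql.catalog.<name>.<key> has its prefix stripped, and the remainder is collected into the HashMap that is then wrapped in the now read-only CaseInsensitiveStringMap.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("demo").master("local[1]").getOrCreate()

// Illustrative settings; the catalog name "cat", class name and keys are made up.
spark.conf.set("spark.sql.catalog.cat", "com.example.MyCatalogPlugin")
spark.conf.set("spark.sql.catalog.cat.url", "jdbc:postgresql://db.example.com/meta")
spark.conf.set("spark.sql.catalog.cat.Fetch-Size", "100")

// catalogOptions("cat", ...) strips the "spark.sql.catalog.cat." prefix, so the
// resulting map holds "url" and "Fetch-Size" (looked up case-insensitively);
// the "spark.sql.catalog.cat" entry itself does not match the pattern.
```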
CaseInsensitiveStringMap.java
@@ -18,8 +18,11 @@
package org.apache.spark.sql.util;

import org.apache.spark.annotation.Experimental;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
@@ -35,16 +38,29 @@
*/
@Experimental
public class CaseInsensitiveStringMap implements Map<String, String> {
private final Logger logger = LoggerFactory.getLogger(CaseInsensitiveStringMap.class);

private String unsupportedOperationMsg = "CaseInsensitiveStringMap is read-only.";

public static CaseInsensitiveStringMap empty() {
return new CaseInsensitiveStringMap(new HashMap<>(0));
}

private final Map<String, String> original;

private final Map<String, String> delegate;

public CaseInsensitiveStringMap(Map<String, String> originalMap) {
this.delegate = new HashMap<>(originalMap.size());
putAll(originalMap);
original = new HashMap<>(originalMap);
delegate = new HashMap<>(originalMap.size());
for (Map.Entry<String, String> entry : originalMap.entrySet()) {
String key = toLowerCase(entry.getKey());
if (delegate.containsKey(key)) {
logger.warn("Converting duplicated key " + entry.getKey() +
" into CaseInsensitiveStringMap.");
}
delegate.put(key, entry.getValue());
}
}

@Override
@@ -78,24 +94,22 @@ public String get(Object key) {

@Override
public String put(String key, String value) {
return delegate.put(toLowerCase(key), value);
throw new UnsupportedOperationException(unsupportedOperationMsg);
}

@Override
public String remove(Object key) {
return delegate.remove(toLowerCase(key));
throw new UnsupportedOperationException(unsupportedOperationMsg);
}

@Override
public void putAll(Map<? extends String, ? extends String> m) {
for (Map.Entry<? extends String, ? extends String> entry : m.entrySet()) {
put(entry.getKey(), entry.getValue());
}
throw new UnsupportedOperationException(unsupportedOperationMsg);
}

@Override
public void clear() {
delegate.clear();
throw new UnsupportedOperationException(unsupportedOperationMsg);
}

@Override
@@ -157,4 +171,11 @@ public double getDouble(String key, double defaultValue) {
String value = get(key);
return value == null ? defaultValue : Double.parseDouble(value);
}

/**
* Returns the original case-sensitive map.
*/
public Map<String, String> asCaseSensitiveMap() {
return Collections.unmodifiableMap(original);
}
}
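
To make the new contract concrete, a minimal usage sketch under the behavior shown in the hunks above (the option key and value are illustrative): lookups stay case-insensitive, the mutators now throw, and asCaseSensitiveMap hands back the original keys untouched.

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

val options = new CaseInsensitiveStringMap(Map("Fetch-Size" -> "100").asJava)
options.get("fetch-size")              // "100": keys are lower-cased on construction
options.getDouble("FETCH-SIZE", 0.0)   // 100.0: the typed getters share the same lookup
// options.put("key", "value")         // would now throw UnsupportedOperationException
options.asCaseSensitiveMap()           // {Fetch-Size=100}: original casing preserved
```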
CaseInsensitiveStringMapSuite.scala
@@ -17,6 +17,8 @@

package org.apache.spark.sql.util

import java.util

import scala.collection.JavaConverters._

import org.apache.spark.SparkFunSuite
@@ -25,9 +27,16 @@ class CaseInsensitiveStringMapSuite extends SparkFunSuite {

test("put and get") {
val options = CaseInsensitiveStringMap.empty()
options.put("kEy", "valUE")
assert(options.get("key") == "valUE")
assert(options.get("KEY") == "valUE")
intercept[UnsupportedOperationException] {
options.put("kEy", "valUE")
}
}

test("clear") {
val options = new CaseInsensitiveStringMap(Map("kEy" -> "valUE").asJava)
intercept[UnsupportedOperationException] {
options.clear()
}
}

test("key and value set") {
@@ -80,4 +89,20 @@ class CaseInsensitiveStringMapSuite extends SparkFunSuite {
options.getDouble("foo", 0.1d)
}
}

test("asCaseSensitiveMap") {
val originalMap = new util.HashMap[String, String] {
put("Foo", "Bar")
put("OFO", "ABR")
put("OoF", "bar")
}

val options = new CaseInsensitiveStringMap(originalMap)
val caseSensitiveMap = options.asCaseSensitiveMap
assert(caseSensitiveMap.equals(originalMap))
// The result of `asCaseSensitiveMap` is read-only.
intercept[UnsupportedOperationException] {
caseSensitiveMap.put("kEy", "valUE")
}
}
}
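
One behavior the constructor now has that the hunks above do not exercise: keys that collapse to the same lower-cased form. A minimal sketch, with illustrative keys and values:

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// "path" and "PATH" collapse to the same lower-cased key, so the constructor
// logs the "Converting duplicated key" warning and keeps a single entry
// (which value survives depends on the input map's iteration order).
val dup = new CaseInsensitiveStringMap(Map("path" -> "/a", "PATH" -> "/b").asJava)
assert(dup.size() == 1)
```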
FileTable.scala
@@ -35,13 +35,14 @@ abstract class FileTable(
extends Table with SupportsBatchRead with SupportsBatchWrite {

lazy val fileIndex: PartitioningAwareFileIndex = {
val scalaMap = options.asScala.toMap
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(scalaMap)
val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
// Hadoop Configurations are case sensitive.
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, hadoopConf,
checkEmptyGlobPath = true, checkFilesExist = true)
val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
new InMemoryFileIndex(
sparkSession, rootPathsSpecified, scalaMap, userSpecifiedSchema, fileStatusCache)
sparkSession, rootPathsSpecified, caseSensitiveMap, userSpecifiedSchema, fileStatusCache)
}

lazy val dataSchema: StructType = userSpecifiedSchema.orElse {
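The "// Hadoop Configurations are case sensitive." comment is the point of this change: a minimal sketch of why the lower-cased keys from the old options.asScala view would be invisible to Hadoop (the filter class name is illustrative).

```scala
import org.apache.hadoop.conf.Configuration

val conf = new Configuration(false)
conf.set("mapreduce.input.pathFilter.class", "com.example.TestFileFilter")
conf.get("mapreduce.input.pathFilter.class")   // the value set above
conf.get("mapreduce.input.pathfilter.class")   // null: the lower-cased key is a different entry
```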
FileWriteBuilder.scala
@@ -64,16 +64,16 @@ abstract class FileWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[St
val sparkSession = SparkSession.active
validateInputs(sparkSession.sessionState.conf.caseSensitiveAnalysis)
val path = new Path(paths.head)
val optionsAsScala = options.asScala.toMap

val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(optionsAsScala)
val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
// Hadoop Configurations are case sensitive.
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
val job = getJobInstance(hadoopConf, path)
val committer = FileCommitProtocol.instantiate(
sparkSession.sessionState.conf.fileCommitProtocolClass,
jobId = java.util.UUID.randomUUID().toString,
outputPath = paths.head)
lazy val description =
createWriteJobDescription(sparkSession, hadoopConf, job, paths.head, optionsAsScala)
createWriteJobDescription(sparkSession, hadoopConf, job, paths.head, options.asScala.toMap)

val fs = path.getFileSystem(hadoopConf)
mode match {
OrcScanBuilder.scala
@@ -37,7 +37,11 @@ case class OrcScanBuilder(
dataSchema: StructType,
options: CaseInsensitiveStringMap)
extends FileScanBuilder(schema) with SupportsPushDownFilters {
lazy val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap)
lazy val hadoopConf = {
val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
// Hadoop Configurations are case sensitive.
sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
}

override def build(): Scan = {
OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema, readSchema, options)
OrcPartitionDiscoverySuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.orc

import java.io.File

import org.apache.hadoop.fs.{Path, PathFilter}

import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.internal.SQLConf
@@ -30,6 +32,10 @@ case class OrcParData(intField: Int, stringField: String)
// The data that also includes the partitioning key
case class OrcParDataWithKey(intField: Int, pi: Int, stringField: String, ps: String)

class TestFileFilter extends PathFilter {
override def accept(path: Path): Boolean = path.getParent.getName != "p=2"
}

abstract class OrcPartitionDiscoveryTest extends OrcTest {
val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__"

@@ -226,6 +232,23 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
}
}
}

test("SPARK-27162: handle pathfilter configuration correctly") {
withTempPath { dir =>
val path = dir.getCanonicalPath

val df = spark.range(2)
df.write.orc(path + "/p=1")
df.write.orc(path + "/p=2")
assert(spark.read.orc(path).count() === 4)

val extraOptions = Map(
"mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName,
"mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName
)
assert(spark.read.options(extraOptions).orc(path).count() === 2)
}
}
}

class OrcPartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext