@@ -40,11 +40,16 @@ public static CaseInsensitiveStringMap empty() {
return new CaseInsensitiveStringMap(new HashMap<>(0));
}

private final Map<String, String> original;

private final Map<String, String> delegate;

public CaseInsensitiveStringMap(Map<String, String> originalMap) {
this.delegate = new HashMap<>(originalMap.size());
putAll(originalMap);
original = new HashMap<>(originalMap);
delegate = new HashMap<>(originalMap.size());
for (Map.Entry<? extends String, ? extends String> entry : originalMap.entrySet()) {
Review comment (Contributor): is the ? extends String required? Can we just use String?

delegate.put(toLowerCase(entry.getKey()), entry.getValue());
}
}
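Since the constructor parameter is already declared as Map<String, String>, the wildcard does not appear to be required here. A minimal sketch of the same loop with plain String entries (same delegate and toLowerCase as above, same behavior):

    // keys are lower-cased once, at construction time
    for (Map.Entry<String, String> entry : originalMap.entrySet()) {
      delegate.put(toLowerCase(entry.getKey()), entry.getValue());
    }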

@Override
@@ -78,24 +83,22 @@ public String get(Object key) {

@Override
public String put(String key, String value) {
return delegate.put(toLowerCase(key), value);
throw new UnsupportedOperationException();
}

@Override
public String remove(Object key) {
return delegate.remove(toLowerCase(key));
throw new UnsupportedOperationException();
}

@Override
public void putAll(Map<? extends String, ? extends String> m) {
for (Map.Entry<? extends String, ? extends String> entry : m.entrySet()) {
put(entry.getKey(), entry.getValue());
}
throw new UnsupportedOperationException();
}

@Override
public void clear() {
delegate.clear();
throw new UnsupportedOperationException();
}
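With put, remove, putAll, and clear all throwing, the map becomes effectively read-only after construction, so callers build a fresh instance rather than mutate an existing one. A hypothetical caller sketch (names and values illustrative only):

    // construct with the desired entries up front; later mutation throws
    Map<String, String> opts = new HashMap<>();
    opts.put("Path", "/tmp/data");
    CaseInsensitiveStringMap cism = new CaseInsensitiveStringMap(opts);
    cism.get("pAtH");        // "/tmp/data" -- lookups are case-insensitive
    // cism.put("k", "v");   // would throw UnsupportedOperationException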

@Override
@@ -157,4 +160,11 @@ public double getDouble(String key, double defaultValue) {
String value = get(key);
return value == null ? defaultValue : Double.parseDouble(value);
}

/**
* Returns the original case-sensitive map.
*/
public Map<String, String> asCaseSensitiveMap() {
return original;
Review comment (Contributor): This should return a read-only version of original. You can use Collections.unmodifiableMap.

}
}
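Following the reviewer's suggestion, one possible shape for the accessor, assuming java.util.Collections is imported; this is a sketch, not necessarily the final patch:

    /**
     * Returns the original case-sensitive map, wrapped so callers cannot modify it.
     */
    public Map<String, String> asCaseSensitiveMap() {
      return Collections.unmodifiableMap(original);
    }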
@@ -17,6 +17,8 @@

package org.apache.spark.sql.util

import java.util

import scala.collection.JavaConverters._

import org.apache.spark.SparkFunSuite
@@ -25,9 +27,16 @@ class CaseInsensitiveStringMapSuite extends SparkFunSuite {

test("put and get") {
val options = CaseInsensitiveStringMap.empty()
options.put("kEy", "valUE")
assert(options.get("key") == "valUE")
assert(options.get("KEY") == "valUE")
intercept[UnsupportedOperationException] {
options.put("kEy", "valUE")
}
}

test("clear") {
val options = new CaseInsensitiveStringMap(Map("kEy" -> "valUE").asJava)
intercept[UnsupportedOperationException] {
options.clear()
}
}

test("key and value set") {
@@ -80,4 +89,15 @@ class CaseInsensitiveStringMapSuite extends SparkFunSuite {
options.getDouble("foo", 0.1d)
}
}

test("asCaseSensitiveMap") {
val originalMap = new util.HashMap[String, String] {
put("Foo", "Bar")
put("OFO", "ABR")
put("OoF", "bar")
}

val options = new CaseInsensitiveStringMap(originalMap)
assert(options.asCaseSensitiveMap.equals(originalMap))
}
}
@@ -35,13 +35,14 @@ abstract class FileTable(
extends Table with SupportsBatchRead with SupportsBatchWrite {

lazy val fileIndex: PartitioningAwareFileIndex = {
val scalaMap = options.asScala.toMap
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(scalaMap)
val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
// Hadoop Configurations are case sensitive.
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, hadoopConf,
checkEmptyGlobPath = true, checkFilesExist = true)
val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
new InMemoryFileIndex(
sparkSession, rootPathsSpecified, scalaMap, userSpecifiedSchema, fileStatusCache)
sparkSession, rootPathsSpecified, caseSensitiveMap, userSpecifiedSchema, fileStatusCache)
}

lazy val dataSchema: StructType = userSpecifiedSchema.orElse {
@@ -64,16 +64,16 @@ abstract class FileWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[St
val sparkSession = SparkSession.active
validateInputs(sparkSession.sessionState.conf.caseSensitiveAnalysis)
val path = new Path(paths.head)
val optionsAsScala = options.asScala.toMap

val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(optionsAsScala)
val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
// Hadoop Configurations are case sensitive.
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
val job = getJobInstance(hadoopConf, path)
val committer = FileCommitProtocol.instantiate(
sparkSession.sessionState.conf.fileCommitProtocolClass,
jobId = java.util.UUID.randomUUID().toString,
outputPath = paths.head)
lazy val description =
createWriteJobDescription(sparkSession, hadoopConf, job, paths.head, optionsAsScala)
createWriteJobDescription(sparkSession, hadoopConf, job, paths.head, options.asScala.toMap)

val fs = path.getFileSystem(hadoopConf)
mode match {
@@ -37,7 +37,11 @@ case class OrcScanBuilder(
dataSchema: StructType,
options: CaseInsensitiveStringMap)
extends FileScanBuilder(schema) with SupportsPushDownFilters {
lazy val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap)
lazy val hadoopConf = {
val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
// Hadoop Configurations are case sensitive.
sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
}

override def build(): Scan = {
OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema, readSchema, options)
@@ -19,6 +19,8 @@ package org.apache.spark.sql.internal

import java.io.File

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

@@ -32,7 +34,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.streaming.StreamingQueryManager
import org.apache.spark.sql.util.{ExecutionListenerManager, QueryExecutionListener}
import org.apache.spark.sql.util.{CaseInsensitiveStringMap, ExecutionListenerManager, QueryExecutionListener}

/**
* A class that holds all session-specific state in a given [[SparkSession]].
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.orc

import java.io.File

import org.apache.hadoop.fs.{Path, PathFilter}

import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.internal.SQLConf
@@ -30,6 +32,10 @@ case class OrcParData(intField: Int, stringField: String)
// The data that also includes the partitioning key
case class OrcParDataWithKey(intField: Int, pi: Int, stringField: String, ps: String)

class TestFileFilter extends PathFilter {
override def accept(path: Path): Boolean = path.getParent.getName != "p=2"
}

abstract class OrcPartitionDiscoveryTest extends OrcTest {
val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__"

@@ -226,6 +232,23 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
}
}
}

test("SPARK-27162: handle pathfilter configuration correctly") {
withTempPath { dir =>
val path = dir.getCanonicalPath

val df = spark.range(2)
df.write.orc(path + "/p=1")
df.write.orc(path + "/p=2")
assert(spark.read.orc(path).count() === 4)

val extraOptions = Map(
"mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName,
"mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName
)
assert(spark.read.options(extraOptions).orc(path).count() === 2)
}
}
}

class OrcPartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext