DataSource.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 import scala.language.{existentials, implicitConversions}
 import scala.util.{Failure, Success, Try}
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -103,6 +104,9 @@ case class DataSource(
       bucket.sortColumnNames, "in the sort definition", equality)
   }
 
+  private def newHadoopConfiguration(): Configuration =
+    sparkSession.sessionState.newHadoopConfWithOptions(options)
+
   /**
    * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer
    * it. In the read path, only managed tables by Hive provide the partition columns properly when
@@ -220,7 +224,7 @@
         // once the streaming job starts and some upstream source starts dropping data.
         val hdfsPath = new Path(path)
         if (!SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
-          val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+          val fs = hdfsPath.getFileSystem(newHadoopConfiguration())
           if (!fs.exists(hdfsPath)) {
             throw new AnalysisException(s"Path does not exist: $path")
           }
@@ -331,7 +335,7 @@
       case (format: FileFormat, _)
           if FileStreamSink.hasMetadata(
             caseInsensitiveOptions.get("path").toSeq ++ paths,
-            sparkSession.sessionState.newHadoopConf()) =>
+            newHadoopConfiguration()) =>
         val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
         val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath, userSpecifiedSchema)
         val dataSchema = userSpecifiedSchema.orElse {
@@ -421,7 +425,7 @@
     val allPaths = paths ++ caseInsensitiveOptions.get("path")
     val outputPath = if (allPaths.length == 1) {
       val path = new Path(allPaths.head)
-      val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+      val fs = path.getFileSystem(newHadoopConfiguration())
       path.makeQualified(fs.getUri, fs.getWorkingDirectory)
     } else {
       throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
@@ -541,7 +545,7 @@
       checkEmptyGlobPath: Boolean,
       checkFilesExist: Boolean): Seq[Path] = {
     val allPaths = caseInsensitiveOptions.get("path") ++ paths
-    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    val hadoopConf = newHadoopConfiguration()
     allPaths.flatMap { path =>
       val hdfsPath = new Path(path)
       val fs = hdfsPath.getFileSystem(hadoopConf)
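The hunks above route the data source `options` map into the Hadoop `Configuration` used at each path-resolution call site in `DataSource` (via `newHadoopConfWithOptions`), so file-system settings can be scoped to a single read or write instead of the whole session. A minimal usage sketch, assuming a running `SparkSession` named `spark` and an S3A-capable Hadoop client on the classpath; the endpoint, credentials, and path below are placeholders, not part of this change:

  // Hypothetical example: Hadoop FS settings supplied as data source options
  // now reach the Configuration that DataSource uses to resolve and qualify
  // paths, so they apply only to this read, not to the shared session conf.
  val df = spark.read
    .option("fs.s3a.endpoint", "http://localhost:9000")   // placeholder endpoint
    .option("fs.s3a.access.key", "ACCESS_KEY")            // placeholder credential
    .option("fs.s3a.secret.key", "SECRET_KEY")            // placeholder credential
    .parquet("s3a://example-bucket/events")                // placeholder path

The tests added below assert the same wiring from the negative side: an unresolvable scheme passed through the `fs.defaultFS` option must surface as an `IOException` from the file-system lookup.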
FileBasedDataSourceSuite.scala
@@ -478,6 +478,22 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
     }
   }
 
+  test("SPARK-31935: Hadoop file system config should be effective in data source options") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      val defaultFs = "nonexistFS://nonexistFS"
+      val expectMessage = "No FileSystem for scheme: nonexistFS"
+      val message1 = intercept[java.io.IOException] {
+        spark.range(10).write.option("fs.defaultFS", defaultFs).parquet(path)
+      }.getMessage
+      assert(message1 == expectMessage)
+      val message2 = intercept[java.io.IOException] {
+        spark.read.option("fs.defaultFS", defaultFs).parquet(path)
+      }.getMessage
+      assert(message2 == expectMessage)
+    }
+  }
+
   test("SPARK-25237 compute correct input metrics in FileScanRDD") {
     withTempPath { p =>
       val path = p.getAbsolutePath
FileStreamSourceSuite.scala
@@ -534,6 +534,18 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("SPARK-31935: Hadoop file system config should be effective in data source options") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      val defaultFs = "nonexistFS://nonexistFS"
+      val expectMessage = "No FileSystem for scheme: nonexistFS"
+      val message = intercept[java.io.IOException] {
+        spark.readStream.option("fs.defaultFS", defaultFs).text(path)
+      }.getMessage
+      assert(message == expectMessage)
+    }
+  }
+
   // =============== JSON file stream tests ================
 
   test("read from json files") {