diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index ce3bc3dd4832..ff5fe093f9df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 import scala.language.{existentials, implicitConversions}
 import scala.util.{Failure, Success, Try}
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -103,6 +104,9 @@ case class DataSource(
       bucket.sortColumnNames, "in the sort definition", equality)
   }
 
+  private def newHadoopConfiguration(): Configuration =
+    sparkSession.sessionState.newHadoopConfWithOptions(options)
+
   /**
    * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer
    * it. In the read path, only managed tables by Hive provide the partition columns properly when
@@ -220,7 +224,7 @@ case class DataSource(
         // once the streaming job starts and some upstream source starts dropping data.
         val hdfsPath = new Path(path)
         if (!SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
-          val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+          val fs = hdfsPath.getFileSystem(newHadoopConfiguration())
           if (!fs.exists(hdfsPath)) {
             throw new AnalysisException(s"Path does not exist: $path")
           }
@@ -331,7 +335,7 @@ case class DataSource(
       case (format: FileFormat, _)
           if FileStreamSink.hasMetadata(
             caseInsensitiveOptions.get("path").toSeq ++ paths,
-            sparkSession.sessionState.newHadoopConf()) =>
+            newHadoopConfiguration()) =>
         val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
         val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath, userSpecifiedSchema)
         val dataSchema = userSpecifiedSchema.orElse {
@@ -421,7 +425,7 @@ case class DataSource(
     val allPaths = paths ++ caseInsensitiveOptions.get("path")
     val outputPath = if (allPaths.length == 1) {
       val path = new Path(allPaths.head)
-      val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+      val fs = path.getFileSystem(newHadoopConfiguration())
       path.makeQualified(fs.getUri, fs.getWorkingDirectory)
     } else {
       throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
@@ -541,7 +545,7 @@ case class DataSource(
       checkEmptyGlobPath: Boolean,
       checkFilesExist: Boolean): Seq[Path] = {
     val allPaths = caseInsensitiveOptions.get("path") ++ paths
-    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    val hadoopConf = newHadoopConfiguration()
     allPaths.flatMap { path =>
       val hdfsPath = new Path(path)
       val fs = hdfsPath.getFileSystem(hadoopConf)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 94f163708832..dd6128ac167d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -478,6 +478,22 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
     }
   }
 
+  test("SPARK-31935: Hadoop file system config should be effective in data source options") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      val defaultFs = "nonexistFS://nonexistFS"
+      val expectMessage = "No FileSystem for scheme: nonexistFS"
+      val message1 = intercept[java.io.IOException] {
+        spark.range(10).write.option("fs.defaultFS", defaultFs).parquet(path)
+      }.getMessage
+      assert(message1 == expectMessage)
+      val message2 = intercept[java.io.IOException] {
+        spark.read.option("fs.defaultFS", defaultFs).parquet(path)
+      }.getMessage
+      assert(message2 == expectMessage)
+    }
+  }
+
   test("SPARK-25237 compute correct input metrics in FileScanRDD") {
     withTempPath { p =>
       val path = p.getAbsolutePath
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 9f6553ed4474..34e3f4e42bd2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -534,6 +534,18 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("SPARK-31935: Hadoop file system config should be effective in data source options") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      val defaultFs = "nonexistFS://nonexistFS"
+      val expectMessage = "No FileSystem for scheme: nonexistFS"
+      val message = intercept[java.io.IOException] {
+        spark.readStream.option("fs.defaultFS", defaultFs).text(path)
+      }.getMessage
+      assert(message == expectMessage)
+    }
+  }
+
   // =============== JSON file stream tests ================
 
   test("read from json files") {