diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index e6c2cba79841..fe69f252d43e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -202,7 +202,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray)
       }
       Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
-        ds, extraOptions.toMap ++ sessionOptions + pathsOption,
+        ds, sessionOptions ++ extraOptions.toMap + pathsOption,
         userSpecifiedSchema = userSpecifiedSchema))
     } else {
       loadV1Source(paths: _*)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index dfb8c4718550..188fce72efac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -241,10 +241,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     val source = cls.newInstance().asInstanceOf[DataSourceV2]
     source match {
       case provider: BatchWriteSupportProvider =>
-        val options = extraOptions ++
-          DataSourceV2Utils.extractSessionConfigs(source, df.sparkSession.sessionState.conf)
+        val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+          source,
+          df.sparkSession.sessionState.conf)
+        val options = sessionOptions ++ extraOptions
 
-        val relation = DataSourceV2Relation.create(source, options.toMap)
+        val relation = DataSourceV2Relation.create(source, options)
         if (mode == SaveMode.Append) {
           runCommand(df.sparkSession, "save") {
             AppendData.byName(relation, df.logicalPlan)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index f6c3e0ce82e3..7cc8abc9f042 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.sources.v2
 
+import java.io.File
+
 import test.org.apache.spark.sql.sources.v2._
 
 import org.apache.spark.SparkException
@@ -317,6 +319,38 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
     checkCanonicalizedOutput(df, 2, 2)
     checkCanonicalizedOutput(df.select('i), 2, 1)
   }
+
+  test("SPARK-25425: extra options should override sessions options during reading") {
+    val prefix = "spark.datasource.userDefinedDataSource."
+    val optionName = "optionA"
+    withSQLConf(prefix + optionName -> "true") {
+      val df = spark
+        .read
+        .option(optionName, false)
+        .format(classOf[DataSourceV2WithSessionConfig].getName).load()
+      val options = df.queryExecution.optimizedPlan.collectFirst {
+        case d: DataSourceV2Relation => d.options
+      }
+      assert(options.get.get(optionName) == Some("false"))
+    }
+  }
+
+  test("SPARK-25425: extra options should override sessions options during writing") {
+    withTempPath { path =>
+      val sessionPath = path.getCanonicalPath
+      withSQLConf("spark.datasource.simpleWritableDataSource.path" -> sessionPath) {
+        withTempPath { file =>
+          val optionPath = file.getCanonicalPath
+          val format = classOf[SimpleWritableDataSource].getName
+
+          val df = Seq((1L, 2L)).toDF("i", "j")
+          df.write.format(format).option("path", optionPath).save()
+          assert(!new File(sessionPath).exists)
+          checkAnswer(spark.read.format(format).option("path", optionPath).load(), df)
+        }
+      }
+    }
+  }
 }
 
 
@@ -385,7 +419,6 @@ class SimpleDataSourceV2 extends DataSourceV2 with BatchReadSupportProvider {
   }
 }
 
-
 class AdvancedDataSourceV2 extends DataSourceV2 with BatchReadSupportProvider {
 
   class ReadSupport extends SimpleReadSupport {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index 952241b0b6be..a0f4404f4614 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -39,10 +39,14 @@ import org.apache.spark.util.SerializableConfiguration
  * Each job moves files from `target/_temporary/queryId/` to `target`.
  */
 class SimpleWritableDataSource extends DataSourceV2
-  with BatchReadSupportProvider with BatchWriteSupportProvider {
+  with BatchReadSupportProvider
+  with BatchWriteSupportProvider
+  with SessionConfigSupport {
 
   private val schema = new StructType().add("i", "long").add("j", "long")
 
+  override def keyPrefix: String = "simpleWritableDataSource"
+
   class ReadSupport(path: String, conf: Configuration) extends SimpleReadSupport {
 
     override def fullSchema(): StructType = schema
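
A minimal usage sketch (not part of the patch) of the precedence this change establishes: with SessionConfigSupport, a session config named spark.datasource.<keyPrefix>.<key> supplies a default for option <key>, and an explicit .option(...) on the reader or writer now takes precedence over it. The local SparkSession, the /tmp/... paths, and the availability of the test-scope SimpleWritableDataSource on the classpath are assumptions for illustration only.

import org.apache.spark.sql.SparkSession

object OptionPrecedenceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // Session-level default for the source's "path" option, via SessionConfigSupport
    // (keyPrefix "simpleWritableDataSource", as added in the patch above).
    spark.conf.set("spark.datasource.simpleWritableDataSource.path", "/tmp/session-path")

    val format = "org.apache.spark.sql.sources.v2.SimpleWritableDataSource"
    val df = Seq((1L, 2L)).toDF("i", "j")

    // With this change, the per-write option wins, so data lands under
    // /tmp/option-path rather than the session-configured /tmp/session-path.
    df.write.format(format).option("path", "/tmp/option-path").save()

    // Reading back with the same explicit option returns the original rows.
    spark.read.format(format).option("path", "/tmp/option-path").load().show()
  }
}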