diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 395e1c999f02..1d74b35d9210 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -190,10 +190,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
     if (classOf[DataSourceV2].isAssignableFrom(cls)) {
       val ds = cls.newInstance()
-      val options = new DataSourceOptions((extraOptions ++
-        DataSourceV2Utils.extractSessionConfigs(
-          ds = ds.asInstanceOf[DataSourceV2],
-          conf = sparkSession.sessionState.conf)).asJava)
+      val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+        ds = ds.asInstanceOf[DataSourceV2],
+        conf = sparkSession.sessionState.conf)
+      val options = new DataSourceOptions((sessionOptions ++ extraOptions).asJava)
 
       // Streaming also uses the data source V2 API. So it may be that the data source implements
       // v2, but has no v2 implementation for batch reads. In that case, we fall back to loading
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 6c9fb52290e7..3fcefb1b6656 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -243,10 +243,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       val ds = cls.newInstance()
       ds match {
         case ws: WriteSupport =>
-          val options = new DataSourceOptions((extraOptions ++
-            DataSourceV2Utils.extractSessionConfigs(
-              ds = ds.asInstanceOf[DataSourceV2],
-              conf = df.sparkSession.sessionState.conf)).asJava)
+          val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+            ds = ds.asInstanceOf[DataSourceV2],
+            conf = df.sparkSession.sessionState.conf)
+          val options = new DataSourceOptions((sessionOptions ++ extraOptions).asJava)
           // Using a timestamp and a random UUID to distinguish different writing jobs. This is good
           // enough as there won't be tons of writing jobs created at the same second.
           val jobId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index 6ad0e5f79bc4..ec81e89d9216 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.sources.v2
 
+import java.io.File
 import java.util.{ArrayList, List => JList}
 
 import test.org.apache.spark.sql.sources.v2._
@@ -315,19 +316,52 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
     checkCanonicalizedOutput(df, 2)
     checkCanonicalizedOutput(df.select('i), 1)
   }
-}
-
-class SimpleDataSourceV2 extends DataSourceV2 with ReadSupport {
 
-  class Reader extends DataSourceReader {
-    override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
+  test("SPARK-25425: extra options should override sessions options during reading") {
+    val prefix = "spark.datasource.userDefinedDataSource."
+    val optionName = "optionA"
+    withSQLConf(prefix + optionName -> "true") {
+      val df = spark
+        .read
+        .option(optionName, false)
+        .format(classOf[DataSourceV2WithSessionConfig].getName).load()
+      val options = df.queryExecution.optimizedPlan.collectFirst {
+        case DataSourceV2Relation(_, SimpleDataSourceV2Reader(options)) => options
+      }
+      assert(options.get.getBoolean(optionName, true) == false)
+    }
+  }
 
-    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
-      java.util.Arrays.asList(new SimpleDataReaderFactory(0, 5), new SimpleDataReaderFactory(5, 10))
+  test("SPARK-25425: extra options should override sessions options during writing") {
+    withTempPath { path =>
+      val sessionPath = path.getCanonicalPath
+      withSQLConf("spark.datasource.simpleWritableDataSource.path" -> sessionPath) {
+        withTempPath { file =>
+          val optionPath = file.getCanonicalPath
+          val format = classOf[SimpleWritableDataSource].getName
+
+          val df = Seq((1L, 2L)).toDF("i", "j")
+          df.write.format(format).option("path", optionPath).save()
+          assert(!new File(sessionPath).exists)
+          checkAnswer(spark.read.format(format).option("path", optionPath).load(), df)
+        }
+      }
     }
   }
+}
 
-  override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
+case class SimpleDataSourceV2Reader(options: DataSourceOptions) extends DataSourceReader {
+  override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
+
+  override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
+    java.util.Arrays.asList(new SimpleDataReaderFactory(0, 5), new SimpleDataReaderFactory(5, 10))
+  }
+}
+
+class SimpleDataSourceV2 extends DataSourceV2 with ReadSupport {
+  override def createReader(options: DataSourceOptions): DataSourceReader = {
+    SimpleDataSourceV2Reader(options)
+  }
 }
 
 class SimpleDataReaderFactory(start: Int, end: Int)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index a131b16953e3..ea93fb4e1228 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -38,10 +38,15 @@ import org.apache.spark.util.SerializableConfiguration
  * Each task writes data to `target/_temporary/jobId/$jobId-$partitionId-$attemptNumber`.
  * Each job moves files from `target/_temporary/jobId/` to `target`.
  */
-class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteSupport {
+class SimpleWritableDataSource extends DataSourceV2
+  with ReadSupport
+  with WriteSupport
+  with SessionConfigSupport {
 
   private val schema = new StructType().add("i", "long").add("j", "long")
 
+  override def keyPrefix: String = "simpleWritableDataSource"
+
   class Reader(path: String, conf: Configuration) extends DataSourceReader {
     override def readSchema(): StructType = schema
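A minimal usage sketch of the precedence the new tests pin down, written against the test-only SimpleWritableDataSource that this patch extends with SessionConfigSupport (keyPrefix "simpleWritableDataSource"). It assumes an active SparkSession named spark with its implicits in scope, as inside the suite, and the /tmp paths are placeholders.

// Sketch only: `spark` and the /tmp paths are assumptions, not part of the patch.
import org.apache.spark.sql.sources.v2.SimpleWritableDataSource
import spark.implicits._  // for toDF

// Session-wide default for this source, keyed as spark.datasource.<keyPrefix>.<option>.
spark.conf.set("spark.datasource.simpleWritableDataSource.path", "/tmp/session-path")

// DataSourceOptions is now built as (sessionOptions ++ extraOptions), so the explicit
// .option("path", ...) overrides the session value: data lands under /tmp/option-path
// and nothing is written to /tmp/session-path.
Seq((1L, 2L)).toDF("i", "j")
  .write
  .format(classOf[SimpleWritableDataSource].getName)
  .option("path", "/tmp/option-path")
  .save()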