
Commit a9a8d3a

MaxGekk authored and dongjoon-hyun committed
[SPARK-25425][SQL][BACKPORT-2.4] Extra options should override session options in DataSource V2
## What changes were proposed in this pull request?

In this PR, I propose overriding session options with extra options in DataSource V2. Extra options are more specific, since they are set via `.option()`, so they should overwrite the more generic session options.

## How was this patch tested?

Added tests for the read and write paths.

Closes #22474 from MaxGekk/session-options-2.4.

Authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 9031c78 commit a9a8d3a

File tree: 4 files changed, +45 −5 lines


sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 1 addition & 1 deletion
@@ -202,7 +202,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray)
       }
       Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
-        ds, extraOptions.toMap ++ sessionOptions + pathsOption,
+        ds, sessionOptions ++ extraOptions.toMap + pathsOption,
         userSpecifiedSchema = userSpecifiedSchema))
     } else {
       loadV1Source(paths: _*)
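The reader fix is a one-line operand swap, and it works because Scala's `Map#++` is right-biased: on a key collision, the value from the right-hand operand wins. A minimal standalone sketch of that semantics (option names here are illustrative, not from this patch):

```scala
object MapPrecedence extends App {
  // Generic options derived from the session configuration.
  val sessionOptions = Map("path" -> "/tmp/session", "compression" -> "snappy")
  // More specific options set via .option() on the reader.
  val extraOptions = Map("path" -> "/tmp/explicit")

  // Right-biased ++: the right operand wins on key collisions, so the
  // more specific extra options must appear on the right.
  val merged = sessionOptions ++ extraOptions
  assert(merged("path") == "/tmp/explicit") // extra option takes precedence
  assert(merged("compression") == "snappy") // session-only keys survive

  // The pre-fix operand order let session options silently win instead:
  val buggy = extraOptions ++ sessionOptions
  assert(buggy("path") == "/tmp/session")
}
```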

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 5 additions & 3 deletions
@@ -241,10 +241,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     val source = cls.newInstance().asInstanceOf[DataSourceV2]
     source match {
       case ws: WriteSupport =>
-        val options = extraOptions ++
-          DataSourceV2Utils.extractSessionConfigs(source, df.sparkSession.sessionState.conf)
+        val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+          source,
+          df.sparkSession.sessionState.conf)
+        val options = sessionOptions ++ extraOptions
+        val relation = DataSourceV2Relation.create(source, options)
 
-        val relation = DataSourceV2Relation.create(source, options.toMap)
         if (mode == SaveMode.Append) {
           runCommand(df.sparkSession, "save") {
             AppendData.byName(relation, df.logicalPlan)
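The writer now builds its options the same way: session configs first, extra options layered on top. `DataSourceV2Utils.extractSessionConfigs` is, roughly, a prefix filter over the session conf; the sketch below is a simplified standalone approximation (the real utility reads from the session's `SQLConf`, not a plain `Map`):

```scala
// Simplified approximation of DataSourceV2Utils.extractSessionConfigs:
// keep session conf entries under spark.datasource.<keyPrefix>. and
// strip that prefix so they become plain data source options.
def extractSessionConfigs(
    keyPrefix: String,
    sessionConf: Map[String, String]): Map[String, String] = {
  val prefix = s"spark.datasource.$keyPrefix."
  sessionConf.collect {
    case (key, value) if key.startsWith(prefix) =>
      key.stripPrefix(prefix) -> value
  }
}

// A session conf entry surfaces as the data source option "path":
val conf = Map("spark.datasource.simpleWritableDataSource.path" -> "/tmp/a")
assert(extractSessionConfigs("simpleWritableDataSource", conf) == Map("path" -> "/tmp/a"))
```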

sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala

Lines changed: 33 additions & 0 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.sources.v2
 
+import java.io.File
 import java.util.{ArrayList, List => JList}
 
 import test.org.apache.spark.sql.sources.v2._
@@ -322,6 +323,38 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
     checkCanonicalizedOutput(df, 2, 2)
     checkCanonicalizedOutput(df.select('i), 2, 1)
   }
+
+  test("SPARK-25425: extra options should override sessions options during reading") {
+    val prefix = "spark.datasource.userDefinedDataSource."
+    val optionName = "optionA"
+    withSQLConf(prefix + optionName -> "true") {
+      val df = spark
+        .read
+        .option(optionName, false)
+        .format(classOf[DataSourceV2WithSessionConfig].getName).load()
+      val options = df.queryExecution.optimizedPlan.collectFirst {
+        case d: DataSourceV2Relation => d.options
+      }
+      assert(options.get.get(optionName) == Some("false"))
+    }
+  }
+
+  test("SPARK-25425: extra options should override sessions options during writing") {
+    withTempPath { path =>
+      val sessionPath = path.getCanonicalPath
+      withSQLConf("spark.datasource.simpleWritableDataSource.path" -> sessionPath) {
+        withTempPath { file =>
+          val optionPath = file.getCanonicalPath
+          val format = classOf[SimpleWritableDataSource].getName
+
+          val df = Seq((1L, 2L)).toDF("i", "j")
+          df.write.format(format).option("path", optionPath).save()
+          assert(!new File(sessionPath).exists)
+          checkAnswer(spark.read.format(format).option("path", optionPath).load(), df)
+        }
+      }
+    }
+  }
 }
 
 class SimpleSinglePartitionSource extends DataSourceV2 with ReadSupport {
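Two details in the read-path test are worth noting. First, `.option(optionName, false)` stores the boolean as the string `"false"`, which is why the assertion compares against `Some("false")`. Second, `DataSourceV2WithSessionConfig` is a pre-existing test source that opts into session configs under the `userDefinedDataSource` prefix; its assumed shape (not part of this diff) is along these lines:

```scala
// Assumed shape of the pre-existing test helper: a trivial DataSourceV2
// that also accepts session configs via SessionConfigSupport.
class DataSourceV2WithSessionConfig extends SimpleDataSourceV2 with SessionConfigSupport {
  override def keyPrefix: String = "userDefinedDataSource"
}
```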

sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala

Lines changed: 6 additions & 1 deletion
@@ -38,10 +38,15 @@ import org.apache.spark.util.SerializableConfiguration
  * Each task writes data to `target/_temporary/jobId/$jobId-$partitionId-$attemptNumber`.
  * Each job moves files from `target/_temporary/jobId/` to `target`.
  */
-class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteSupport {
+class SimpleWritableDataSource extends DataSourceV2
+  with ReadSupport
+  with WriteSupport
+  with SessionConfigSupport {
 
   private val schema = new StructType().add("i", "long").add("j", "long")
 
+  override def keyPrefix: String = "simpleWritableDataSource"
+
   class Reader(path: String, conf: Configuration) extends DataSourceReader {
     override def readSchema(): StructType = schema
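With `SessionConfigSupport` mixed in, the `keyPrefix` ties session confs to this source: any conf named `spark.datasource.simpleWritableDataSource.<key>` is forwarded as the option `<key>`, and after this patch an explicit `.option()` wins on conflict. A short usage sketch (both paths are placeholders):

```scala
// Illustrative only; both paths are placeholders.
spark.conf.set("spark.datasource.simpleWritableDataSource.path", "/tmp/session-path")

Seq((1L, 2L)).toDF("i", "j")
  .write
  .format(classOf[SimpleWritableDataSource].getName)
  .option("path", "/tmp/option-path") // overrides the session conf after this fix
  .save()
// The data lands under /tmp/option-path; before the fix, the session
// conf would have won and it would have gone to /tmp/session-path.
```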
