From 0cc174160bf04bc97de62c438d571b27224c53a0 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 15 Sep 2018 17:24:11 -0700
Subject: [PATCH 1/4] [SPARK-25425][SQL] Extra options should override session options in DataSource V2

In the PR, I propose to override session options with extra options in DataSource V2. Extra options are more specific, since they are set via `.option()`, and should overwrite the more generic session options. Entries from the second map override entries with the same key from the first map, for example:

```Scala
scala> Map("option" -> false) ++ Map("option" -> true)
res0: scala.collection.immutable.Map[String,Boolean] = Map(option -> true)
```
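The DataSourceV2 read and write paths below apply the same rule by putting `extraOptions` on the right-hand side of `++`. A minimal sketch of the intended precedence, using plain Scala maps (the variable names mirror the patch; the concrete keys and values are made up for illustration):

```Scala
// Session options are extracted from spark.datasource.<keyPrefix>.* confs;
// extra options come from .option() calls on DataFrameReader/DataFrameWriter.
val sessionOptions = Map("path" -> "/tmp/session-default", "optionA" -> "true")
val extraOptions = Map("optionA" -> "false")

// With extraOptions on the right, it wins on key conflicts,
// while keys set only at the session level still get through.
val merged = sessionOptions ++ extraOptions
assert(merged("optionA") == "false")
assert(merged("path") == "/tmp/session-default")
```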
Added a test for checking which option is propagated to a data source in `load()`.

Closes #22413 from MaxGekk/session-options.

Lead-authored-by: Maxim Gekk
Co-authored-by: Dongjoon Hyun
Co-authored-by: Maxim Gekk
Signed-off-by: Dongjoon Hyun
---
 .../apache/spark/sql/DataFrameReader.scala    |  8 ++---
 .../apache/spark/sql/DataFrameWriter.scala    |  8 ++---
 .../sql/sources/v2/DataSourceV2Suite.scala    | 32 +++++++++++++++++++
 .../sources/v2/SimpleWritableDataSource.scala |  7 +++-
 4 files changed, 46 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 395e1c999f02..f8e27e935d5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -190,10 +190,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
     if (classOf[DataSourceV2].isAssignableFrom(cls)) {
       val ds = cls.newInstance()
-      val options = new DataSourceOptions((extraOptions ++
-        DataSourceV2Utils.extractSessionConfigs(
-          ds = ds.asInstanceOf[DataSourceV2],
-          conf = sparkSession.sessionState.conf)).asJava)
+      val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+        ds = ds.asInstanceOf[DataSourceV2],
+        conf = sparkSession.sessionState.conf)
+      val options = new DataSourceOptions((sessionOptions ++ extraOptions)).asJava)
 
       // Streaming also uses the data source V2 API. So it may be that the data source implements
       // v2, but has no v2 implementation for batch reads. In that case, we fall back to loading
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 6c9fb52290e7..3fcefb1b6656 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -243,10 +243,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       val ds = cls.newInstance()
       ds match {
         case ws: WriteSupport =>
-          val options = new DataSourceOptions((extraOptions ++
-            DataSourceV2Utils.extractSessionConfigs(
-              ds = ds.asInstanceOf[DataSourceV2],
-              conf = df.sparkSession.sessionState.conf)).asJava)
+          val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+            ds = ds.asInstanceOf[DataSourceV2],
+            conf = df.sparkSession.sessionState.conf)
+          val options = new DataSourceOptions((sessionOptions ++ extraOptions).asJava)
 
           // Using a timestamp and a random UUID to distinguish different writing jobs. This is good
           // enough as there won't be tons of writing jobs created at the same second.
           val jobId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index 6ad0e5f79bc4..d9e9c5380b02 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -315,6 +315,38 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
     checkCanonicalizedOutput(df, 2)
     checkCanonicalizedOutput(df.select('i), 1)
   }
+
+  test("SPARK-25425: extra options should override sessions options during reading") {
+    val prefix = "spark.datasource.userDefinedDataSource."
+    val optionName = "optionA"
+    withSQLConf(prefix + optionName -> "true") {
+      val df = spark
+        .read
+        .option(optionName, false)
+        .format(classOf[DataSourceV2WithSessionConfig].getName).load()
+      val options = df.queryExecution.optimizedPlan.collectFirst {
+        case d: DataSourceV2Relation => d.options
+      }
+      assert(options.get.get(optionName) == Some("false"))
+    }
+  }
+
+  test("SPARK-25425: extra options should override sessions options during writing") {
+    withTempPath { path =>
+      val sessionPath = path.getCanonicalPath
+      withSQLConf("spark.datasource.simpleWritableDataSource.path" -> sessionPath) {
+        withTempPath { file =>
+          val optionPath = file.getCanonicalPath
+          val format = classOf[SimpleWritableDataSource].getName
+
+          val df = Seq((1L, 2L)).toDF("i", "j")
+          df.write.format(format).option("path", optionPath).save()
+          assert(!new File(sessionPath).exists)
+          checkAnswer(spark.read.format(format).option("path", optionPath).load(), df)
+        }
+      }
+    }
+  }
 }
 
 class SimpleDataSourceV2 extends DataSourceV2 with ReadSupport {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index a131b16953e3..ea93fb4e1228 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -38,10 +38,15 @@ import org.apache.spark.util.SerializableConfiguration
  * Each task writes data to `target/_temporary/jobId/$jobId-$partitionId-$attemptNumber`.
  * Each job moves files from `target/_temporary/jobId/` to `target`.
  */
-class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteSupport {
+class SimpleWritableDataSource extends DataSourceV2
+  with ReadSupport
+  with WriteSupport
+  with SessionConfigSupport {
 
   private val schema = new StructType().add("i", "long").add("j", "long")
 
+  override def keyPrefix: String = "simpleWritableDataSource"
+
   class Reader(path: String, conf: Configuration) extends DataSourceReader {
     override def readSchema(): StructType = schema
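For context on the test fixture changed above: `SessionConfigSupport` is what lets a V2 source pick up session confs under `spark.datasource.<keyPrefix>.*`. A minimal sketch of that wiring, with a hypothetical source name and paths used purely for illustration:

```Scala
import org.apache.spark.sql.sources.v2.{DataSourceV2, SessionConfigSupport}

// With this keyPrefix, DataSourceV2Utils.extractSessionConfigs maps the
// session conf spark.datasource.mySource.path=/tmp/defaults to the
// data source option "path" -> "/tmp/defaults".
class MySource extends DataSourceV2 with SessionConfigSupport {
  override def keyPrefix: String = "mySource"
}

// For a source that also implements WriteSupport, an explicit .option()
// now beats the session-level default:
//   spark.conf.set("spark.datasource.mySource.path", "/tmp/defaults")
//   df.write.format(classOf[MySource].getName).option("path", "/tmp/override").save()
// writes under /tmp/override, exactly as the writing test asserts for
// SimpleWritableDataSource.
```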
From 744c5b8b0c8fbc7caa6d03902ed323df5c73986b Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Wed, 19 Sep 2018 21:40:25 +0200
Subject: [PATCH 2/4] Adding missing import after merge to 2.4

---
 .../org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index d9e9c5380b02..c2db9100681e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.sources.v2
 
+import java.io.File
 import java.util.{ArrayList, List => JList}
 
 import test.org.apache.spark.sql.sources.v2._

From c87a6de4a1de82514fad590c68f35fbfaf54d825 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Wed, 19 Sep 2018 22:20:21 +0200
Subject: [PATCH 3/4] Fix merge

---
 .../src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index f8e27e935d5d..1d74b35d9210 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -193,7 +193,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
         ds = ds.asInstanceOf[DataSourceV2],
         conf = sparkSession.sessionState.conf)
-      val options = new DataSourceOptions((sessionOptions ++ extraOptions)).asJava)
+      val options = new DataSourceOptions((sessionOptions ++ extraOptions).asJava)
 
       // Streaming also uses the data source V2 API. So it may be that the data source implements
       // v2, but has no v2 implementation for batch reads. In that case, we fall back to loading
From f8b5aa6b3d1fc9e953bcfef8f23aeb6f3ab7818d Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 20 Sep 2018 11:47:57 +0200
Subject: [PATCH 4/4] Adding options to Reader

---
 .../sql/sources/v2/DataSourceV2Suite.scala    | 21 ++++++++++---------
 1 file changed, 11 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index c2db9100681e..ec81e89d9216 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -326,9 +326,9 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
         .option(optionName, false)
         .format(classOf[DataSourceV2WithSessionConfig].getName).load()
       val options = df.queryExecution.optimizedPlan.collectFirst {
-        case d: DataSourceV2Relation => d.options
+        case DataSourceV2Relation(_, SimpleDataSourceV2Reader(options)) => options
       }
-      assert(options.get.get(optionName) == Some("false"))
+      assert(options.get.getBoolean(optionName, true) == false)
     }
   }
 
@@ -350,17 +350,18 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
   }
 }
 
-class SimpleDataSourceV2 extends DataSourceV2 with ReadSupport {
-
-  class Reader extends DataSourceReader {
-    override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
+case class SimpleDataSourceV2Reader(options: DataSourceOptions) extends DataSourceReader {
+  override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
 
-    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
-      java.util.Arrays.asList(new SimpleDataReaderFactory(0, 5), new SimpleDataReaderFactory(5, 10))
-    }
+  override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
+    java.util.Arrays.asList(new SimpleDataReaderFactory(0, 5), new SimpleDataReaderFactory(5, 10))
   }
+}
 
-  override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
+class SimpleDataSourceV2 extends DataSourceV2 with ReadSupport {
+  override def createReader(options: DataSourceOptions): DataSourceReader = {
+    SimpleDataSourceV2Reader(options)
+  }
 }
 
 class SimpleDataReaderFactory(start: Int, end: Int)
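A short sketch of what the revised assertion relies on: `DataSourceOptions` stores keys case-insensitively, and `getBoolean` falls back to the supplied default when the key is absent (the map contents here are illustrative):

```Scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.v2.DataSourceOptions

val options = new DataSourceOptions(Map("optionA" -> "false").asJava)
// Lookups are case-insensitive, so key casing doesn't matter.
assert(options.getBoolean("optiona", true) == false)
// Absent keys fall back to the default, which is why the test passes `true`:
// the assertion can only succeed if "optionA" was actually propagated as "false".
assert(options.getBoolean("missing", true) == true)
```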