[SPARK-22387][SQL] Propagate session configs to data source read/write options #19861
ConfigSupport.java (new file):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;

import java.util.List;

/**
 * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
 * propagate session configs with chosen key-prefixes to the particular data source.
 */
@InterfaceStability.Evolving
public interface ConfigSupport {

  /**
   * Create a list of key-prefixes; all session configs that match at least one of the prefixes
   * will be propagated to the data source options.
   */
  List<String> getConfigPrefixes();
}
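For illustration, here is a minimal Scala sketch of a source opting in to this contract (not part of this PR; the class name and prefix are hypothetical):

import java.util.{Arrays => JArrays, List => JList}

import org.apache.spark.sql.sources.v2.{ConfigSupport, DataSourceV2, DataSourceV2Options, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader

// Declares one prefix, so every session config whose key starts with
// "spark.sql.parquet" is copied into the options passed to createReader.
class PrefixAwareSource extends DataSourceV2 with ReadSupport with ConfigSupport {

  override def getConfigPrefixes: JList[String] =
    JArrays.asList("spark.sql.parquet")

  override def createReader(options: DataSourceV2Options): DataSourceV2Reader = {
    // Propagated session configs arrive here as ordinary options, mixed with
    // any options passed explicitly via DataFrameReader.option(...).
    throw new UnsupportedOperationException("sketch only")
  }
}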
DataFrameReader.scala:

@@ -20,6 +20,7 @@ package org.apache.spark.sql
 import java.util.{Locale, Properties}

 import scala.collection.JavaConverters._
+import scala.collection.immutable

 import org.apache.spark.Partition
 import org.apache.spark.annotation.InterfaceStability

@@ -33,7 +34,8 @@ import org.apache.spark.sql.execution.datasources.csv._
 import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, ReadSupport, ReadSupportWithSchema}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.unsafe.types.UTF8String

@@ -169,6 +171,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     option("path", path).load(Seq.empty: _*) // force invocation of `load(...varargs...)`
   }

+  import DataFrameReader._
+
   /**
    * Loads input in as a `DataFrame`, for data sources that support multiple paths.
    * Only works if the source is a HadoopFsRelationProvider.

@@ -184,9 +188,16 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     val cls = DataSource.lookupDataSource(source)
     if (classOf[DataSourceV2].isAssignableFrom(cls)) {
-      val options = new DataSourceV2Options(extraOptions.asJava)
+      val dataSource = cls.newInstance()
+      val options = dataSource match {
+        case cs: ConfigSupport =>
+          val confs = withSessionConfig(cs, sparkSession.sessionState.conf)
+          new DataSourceV2Options((confs ++ extraOptions).asJava)
+        case _ =>
+          new DataSourceV2Options(extraOptions.asJava)
+      }

-      val reader = (cls.newInstance(), userSpecifiedSchema) match {
+      val reader = (dataSource, userSpecifiedSchema) match {
         case (ds: ReadSupportWithSchema, Some(schema)) =>
           ds.createReader(schema, options)

@@ -732,3 +743,25 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   private val extraOptions = new scala.collection.mutable.HashMap[String, String]
 }

+private[sql] object DataFrameReader {
+
+  /**
+   * Helper method to filter session configs whose key matches at least one of the given
+   * prefixes.
+   *
+   * @param cs the ConfigSupport instance that provides the config key-prefixes.
+   * @param conf the session conf
+   * @return an immutable map that contains all the session configs that should be propagated to
+   *         the data source.
+   */
+  def withSessionConfig(
+      cs: ConfigSupport,
+      conf: SQLConf): immutable.Map[String, String] = {
+    val prefixes = cs.getConfigPrefixes
+    require(prefixes != null, "The config key-prefixes can't be null.")
+    conf.getAllConfs.filterKeys { confKey =>
+      prefixes.asScala.exists(confKey.startsWith(_))
+    }
+  }
+}
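To make the filtering rule concrete, here is a hedged sketch of what withSessionConfig returns, reusing the hypothetical PrefixAwareSource above (the helper is private[sql], so this would have to run inside the org.apache.spark.sql package, e.g. in a test):

// Suppose the session currently holds:
//   spark.sql.parquet.filterPushdown   -> "true"
//   spark.sql.parquet.int96AsTimestamp -> "true"
//   spark.sql.shuffle.partitions       -> "8"
val cs: ConfigSupport = new PrefixAwareSource  // declares prefix "spark.sql.parquet"
val propagated = DataFrameReader.withSessionConfig(cs, sparkSession.sessionState.conf)

// Matching is a plain startsWith over the declared prefixes, so only the two
// parquet keys survive; spark.sql.shuffle.partitions is dropped.
assert(propagated.keySet == Set(
  "spark.sql.parquet.filterPushdown",
  "spark.sql.parquet.int96AsTimestamp"))

Note also that load() builds the options as (confs ++ extraOptions), so a reader option set explicitly via .option(...) overrides a propagated session config with the same key.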
DataSourceV2Suite.scala:

@@ -22,8 +22,9 @@ import java.util.{ArrayList, List => JList}
 import test.org.apache.spark.sql.sources.v2._

 import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, DataFrameReader, QueryTest, Row}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{Filter, GreaterThan}
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.test.SharedSQLContext

@@ -43,6 +44,21 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
     }
   }

+  test("simple implementation with config support") {
+    withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "false",
+        SQLConf.PARQUET_INT96_AS_TIMESTAMP.key -> "true",
+        SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "32",
+        SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM.key -> "10000") {
+      val cs = classOf[DataSourceV2WithConfig].newInstance().asInstanceOf[ConfigSupport]
+      val confs = DataFrameReader.withSessionConfig(cs, SQLConf.get)
+      assert(confs.size == 3)
+      assert(confs.keySet.filter(_.startsWith("spark.sql.parquet")).size == 2)
+      assert(confs.keySet.filter(
+        _.startsWith("spark.sql.sources.parallelPartitionDiscovery.threshold")).size == 1)
+      assert(confs.keySet.filter(_.startsWith("not.exist.prefix")).size == 0)
+    }
+  }
+
   test("advanced implementation") {
     Seq(classOf[AdvancedDataSourceV2], classOf[JavaAdvancedDataSourceV2]).foreach { cls =>
       withClue(cls.getName) {

@@ -179,7 +195,14 @@ class SimpleReadTask(start: Int, end: Int) extends ReadTask[Row] with DataReader
   override def close(): Unit = {}
 }

+class DataSourceV2WithConfig extends SimpleDataSourceV2 with ConfigSupport {
+
+  override def getConfigPrefixes: JList[String] = {
+    java.util.Arrays.asList(
+      "spark.sql.parquet",
+      "spark.sql.sources.parallelPartitionDiscovery.threshold")
+  }
+}
+
 class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport {
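An end-to-end sketch of how propagation surfaces to a source, reusing the test source above (assumes a SparkSession named spark; the exact key handling inside DataSourceV2Options is an implementation detail, so treat the look-up behavior as an assumption):

// Session configs matching the declared prefixes travel into the options
// that load() hands to createReader.
spark.conf.set("spark.sql.parquet.mergeSchema", "false")

val df = spark.read
  .format(classOf[DataSourceV2WithConfig].getName)
  .load()

// DataSourceV2WithConfig inherits createReader from SimpleDataSourceV2, which
// ignores its options; a source that cared would find the propagated entry in
// the DataSourceV2Options it receives, while configs outside the declared
// prefixes (e.g. spark.sql.shuffle.partitions) never reach it.
df.show()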
Review comment: "propagate session configs with the specified key-prefix to all data source operations in this session"