
Commit ec9a717

simplify ConfigSupport interface.

1 parent 6b4fcab commit ec9a717

4 files changed: 32 additions & 119 deletions


sql/core/src/main/java/org/apache/spark/sql/sources/v2/ConfigSupport.java

Lines changed: 4 additions & 28 deletions
@@ -30,33 +30,9 @@
 public interface ConfigSupport {
 
   /**
-   * Create a list of key-prefixes, all session configs that match at least one of the prefixes
-   * will be propagated to the data source options.
-   * If the returned list is empty, no session config will be propagated.
+   * Name for the specified data source, will extract all session configs that starts with
+   * `spark.datasource.$name`, turn `spark.datasource.$name.xxx -> yyy` into
+   * `xxx -> yyy`, and propagate them to all data source operations in this session.
    */
-  List<String> getConfigPrefixes();
-
-  /**
-   * Create a mapping from session config names to data source option names. If a propagated
-   * session config's key doesn't exist in this mapping, the "spark.sql.${source}" prefix will
-   * be trimmed. For example, if the data source name is "parquet", perform the following config
-   * key mapping by default:
-   * "spark.sql.parquet.int96AsTimestamp" -> "int96AsTimestamp",
-   * "spark.sql.parquet.compression.codec" -> "compression.codec",
-   * "spark.sql.columnNameOfCorruptRecord" -> "columnNameOfCorruptRecord".
-   *
-   * If the mapping is specified, for example, the returned map contains an entry
-   * ("spark.sql.columnNameOfCorruptRecord" -> "colNameCorrupt"), then the session config
-   * "spark.sql.columnNameOfCorruptRecord" will be converted to "colNameCorrupt" in
-   * {@link DataSourceV2Options}.
-   */
-  Map<String, String> getConfigMapping();
-
-  /**
-   * Create a list of valid data source option names. When the list is specified, a session
-   * config will NOT be propagated if its corresponding option name is not in the list.
-   *
-   * If the returned list is empty, don't check the option names.
-   */
-  List<String> getValidOptions();
+  String name();
 }
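With the simplified interface, a data source only reports its name; everything under `spark.datasource.$name.` in the session config is propagated as options with that prefix stripped. A minimal sketch of an implementation (the class and the "myDs" name below are hypothetical, not part of this commit):

import org.apache.spark.sql.sources.v2.{ConfigSupport, DataSourceV2}

// Hypothetical source opting into session-config propagation. With
// name() = "myDs", a session config "spark.datasource.myDs.fetchSize -> 100"
// is expected to reach the source as the option "fetchSize -> 100".
class MyDataSource extends DataSourceV2 with ConfigSupport {
  override def name: String = "myDs"
}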

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 1 addition & 1 deletion
@@ -190,7 +190,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       val dataSource = cls.newInstance()
       val options = dataSource match {
         case cs: ConfigSupport =>
-          val confs = withSessionConfig(cs, source, sparkSession.sessionState.conf)
+          val confs = withSessionConfig(cs.name, sparkSession.sessionState.conf)
           new DataSourceV2Options((confs ++ extraOptions).asJava)
         case _ =>
           new DataSourceV2Options(extraOptions.asJava)
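For end users, session configs are now keyed off the data source name rather than a source-specific prefix list. A usage sketch, assuming the hypothetical MyDataSource above (the class name and option names are made up):

// Any session config under "spark.datasource.myDs." is picked up and merged
// with explicit options (confs ++ extraOptions) when the source implements
// ConfigSupport.
spark.conf.set("spark.datasource.myDs.fetchSize", "100")

val df = spark.read
  .format("com.example.MyDataSource") // hypothetical fully qualified class name
  .option("path", "/tmp/data")        // explicit options are still honored
  .load()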

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ConfigSupport.scala

Lines changed: 15 additions & 43 deletions
@@ -29,57 +29,29 @@ import org.apache.spark.sql.sources.v2.ConfigSupport
 private[sql] object DataSourceV2ConfigSupport extends Logging {
 
   /**
-   * Helper method to propagate session configs with config key that matches at least one of the
-   * given prefixes to the corresponding data source options.
+   * Helper method that turns session configs with config keys that start with
+   * `spark.datasource.$name` into k/v pairs, the k/v pairs will be used to create data source
+   * options.
+   * A session config `spark.datasource.$name.xxx -> yyy` will be transformed into
+   * `xxx -> yyy`.
    *
-   * @param cs the session config propagate help class
-   * @param source the data source format
+   * @param name the data source name
    * @param conf the session conf
-   * @return an immutable map that contains all the session configs that should be propagated to
-   *         the data source.
+   * @return an immutable map that contains all the extracted and transformed k/v pairs.
    */
   def withSessionConfig(
-      cs: ConfigSupport,
-      source: String,
+      name: String,
       conf: SQLConf): immutable.Map[String, String] = {
-    val prefixes = cs.getConfigPrefixes
-    require(prefixes != null, "The config key-prefixes cann't be null.")
-    val mapping = cs.getConfigMapping.asScala
-    val validOptions = cs.getValidOptions
-    require(validOptions != null, "The valid options list cann't be null.")
+    require(name != null, "The data source name can't be null.")
 
-    val pattern = Pattern.compile(s"spark\\.sql(\\.$source)?\\.(.*)")
+    val pattern = Pattern.compile(s"spark\\.datasource\\.$name\\.(.*)")
     val filteredConfigs = conf.getAllConfs.filterKeys { confKey =>
-      prefixes.asScala.exists(confKey.startsWith(_))
+      confKey.startsWith(s"spark.datasource.$name")
     }
-    val convertedConfigs = filteredConfigs.map{ entry =>
-      val newKey = mapping.get(entry._1).getOrElse {
-        val m = pattern.matcher(entry._1)
-        if (m.matches()) {
-          m.group(2)
-        } else {
-          // Unable to recognize the session config key.
-          logWarning(s"Unrecognizable session config name ${entry._1}.")
-          entry._1
-        }
-      }
-      (newKey, entry._2)
-    }
-    if (validOptions.size == 0) {
-      convertedConfigs
-    } else {
-      // Check whether all the valid options are propagated.
-      validOptions.asScala.foreach { optionName =>
-        if (!convertedConfigs.keySet.contains(optionName)) {
-          logWarning(s"Data source option '$optionName' is required, but not propagated from " +
-            "session config, please check the config settings.")
-        }
-      }
-
-      // Filter the valid options.
-      convertedConfigs.filterKeys { optionName =>
-        validOptions.contains(optionName)
-      }
+    filteredConfigs.map { entry =>
+      val m = pattern.matcher(entry._1)
+      require(m.matches() && m.groupCount() > 0, s"Fail in matching ${entry._1} with $pattern.")
+      (m.group(1), entry._2)
     }
   }
 }
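The helper now boils down to a prefix filter plus a regex that strips `spark.datasource.$name.` from each matching key. A self-contained sketch of that transformation without the Spark dependencies (the "myDs" name and the sample configs are made up):

import java.util.regex.Pattern

// Standalone mirror of the withSessionConfig logic: keep session configs
// under the source's namespace and strip the "spark.datasource.$name."
// prefix from their keys.
object ConfigExtraction {
  def extractConfigs(name: String, sessionConfs: Map[String, String]): Map[String, String] = {
    val pattern = Pattern.compile(s"spark\\.datasource\\.$name\\.(.*)")
    sessionConfs
      .filterKeys(_.startsWith(s"spark.datasource.$name"))
      .map { case (key, value) =>
        val m = pattern.matcher(key)
        require(m.matches(), s"Failed to match $key with $pattern.")
        (m.group(1), value)
      }
      .toMap
  }

  def main(args: Array[String]): Unit = {
    // Only the "spark.datasource.myDs.*" entry survives, with the prefix stripped.
    val confs = extractConfigs("myDs", Map(
      "spark.datasource.myDs.foo.bar" -> "false",
      "spark.sql.shuffle.partitions" -> "200"))
    println(confs) // Map(foo.bar -> false)
  }
}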

sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala

Lines changed: 12 additions & 47 deletions
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.sources.v2
 
-import java.util
 import java.util.{ArrayList, List => JList}
 
 import test.org.apache.spark.sql.sources.v2._
@@ -35,6 +34,8 @@ import org.apache.spark.sql.types.StructType
 class DataSourceV2Suite extends QueryTest with SharedSQLContext {
   import testImplicits._
 
+  private val dsName = "userDefinedDataSource"
+
   test("simplest implementation") {
     Seq(classOf[SimpleDataSourceV2], classOf[JavaSimpleDataSourceV2]).foreach { cls =>
       withClue(cls.getName) {
@@ -47,32 +48,18 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
   }
 
   test("simple implementation with config support") {
-    withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "false",
-      SQLConf.PARQUET_COMPRESSION.key -> "uncompressed",
-      SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "32",
-      SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM.key -> "10000") {
+    // Only match configs with keys start with "spark.datasource.${dsName}".
+    withSQLConf(s"spark.datasource.$dsName.foo.bar" -> "false",
+      s"spark.datasource.$dsName.whateverConfigName" -> "123",
+      s"spark.sql.$dsName.config.name" -> "false",
+      s"spark.datasource.another.config.name" -> "123") {
       val cs = classOf[DataSourceV2WithConfig].newInstance().asInstanceOf[ConfigSupport]
-      val confs = DataSourceV2ConfigSupport.withSessionConfig(cs, "parquet", SQLConf.get)
-      assert(confs.size == 3)
-      assert(confs.keySet.filter(_.startsWith("spark.sql.parquet")).size == 0)
-      assert(confs.keySet.filter(_.startsWith("not.exist.prefix")).size == 0)
-      assert(confs.keySet.contains("compressionCodec"))
-      assert(confs.keySet.contains("sources.parallelPartitionDiscovery.threshold"))
-    }
-  }
-
-  test("config support with validOptions") {
-    withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "false",
-      SQLConf.PARQUET_COMPRESSION.key -> "uncompressed",
-      SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "32",
-      SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM.key -> "10000") {
-      val cs = classOf[DataSourceV2WithValidOptions].newInstance().asInstanceOf[ConfigSupport]
-      val confs = DataSourceV2ConfigSupport.withSessionConfig(cs, "parquet", SQLConf.get)
+      val confs = DataSourceV2ConfigSupport.withSessionConfig(cs.name, SQLConf.get)
       assert(confs.size == 2)
-      assert(confs.keySet.filter(_.startsWith("spark.sql.parquet")).size == 0)
+      assert(confs.keySet.filter(_.startsWith("spark.datasource")).size == 0)
       assert(confs.keySet.filter(_.startsWith("not.exist.prefix")).size == 0)
-      assert(confs.keySet.contains("compressionCodec"))
-      assert(confs.keySet.contains("sources.parallelPartitionDiscovery.threshold"))
+      assert(confs.keySet.contains("foo.bar"))
+      assert(confs.keySet.contains("whateverConfigName"))
     }
   }
 
@@ -214,29 +201,7 @@ class SimpleReadTask(start: Int, end: Int) extends ReadTask[Row] with DataReader
 
 class DataSourceV2WithConfig extends SimpleDataSourceV2 with ConfigSupport {
 
-  override def getConfigPrefixes: JList[String] = {
-    java.util.Arrays.asList(
-      "spark.sql.parquet",
-      "spark.sql.sources.parallelPartitionDiscovery.threshold")
-  }
-
-  override def getConfigMapping: util.Map[String, String] = {
-    val configMap = new util.HashMap[String, String]()
-    configMap.put("spark.sql.parquet.compression.codec", "compressionCodec")
-    configMap
-  }
-
-  override def getValidOptions: JList[String] = new util.ArrayList[String]()
-}
-
-class DataSourceV2WithValidOptions extends DataSourceV2WithConfig {
-
-  override def getValidOptions: JList[String] = {
-    java.util.Arrays.asList(
-      "sources.parallelPartitionDiscovery.threshold",
-      "compressionCodec",
-      "not.exist.option")
-  }
+  override def name: String = "userDefinedDataSource"
 }
 
 class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport {
