Skip to content

Commit 41f2080

Browse files
jiangxb1987rdblue
authored andcommitted
[SPARK-22387][SQL] Propagate session configs to data source read/write options
Introduce a new interface `SessionConfigSupport` for `DataSourceV2`, it can help to propagate session configs with the specified key-prefix to all data source operations in this session. Add new test suite `DataSourceV2UtilsSuite`. Author: Xingbo Jiang <[email protected]> Closes apache#19861 from jiangxb1987/datasource-configs.
1 parent 9249cb5 commit 41f2080

File tree

3 files changed

+146
-0
lines changed

3 files changed

+146
-0
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.sources.v2;
19+
20+
import org.apache.spark.annotation.InterfaceStability;
21+
22+
import java.util.List;
23+
import java.util.Map;
24+
25+
/**
26+
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
27+
* propagate session configs with the specified key-prefix to all data source operations in this
28+
* session.
29+
*/
30+
@InterfaceStability.Evolving
31+
public interface SessionConfigSupport {
32+
33+
/**
34+
* Key prefix of the session configs to propagate. Spark will extract all session configs that
35+
* starts with `spark.datasource.$keyPrefix`, turn `spark.datasource.$keyPrefix.xxx -&gt; yyy`
36+
* into `xxx -&gt; yyy`, and propagate them to all data source operations in this session.
37+
*/
38+
String keyPrefix();
39+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.datasources.v2
19+
20+
import java.util.regex.Pattern
21+
22+
import org.apache.spark.internal.Logging
23+
import org.apache.spark.sql.internal.SQLConf
24+
import org.apache.spark.sql.sources.v2.{DataSourceV2, SessionConfigSupport}
25+
26+
private[sql] object DataSourceV2Utils extends Logging {
27+
28+
/**
29+
* Helper method that extracts and transforms session configs into k/v pairs, the k/v pairs will
30+
* be used to create data source options.
31+
* Only extract when `ds` implements [[SessionConfigSupport]], in this case we may fetch the
32+
* specified key-prefix from `ds`, and extract session configs with config keys that start with
33+
* `spark.datasource.$keyPrefix`. A session config `spark.datasource.$keyPrefix.xxx -> yyy` will
34+
* be transformed into `xxx -> yyy`.
35+
*
36+
* @param ds a [[DataSourceV2]] object
37+
* @param conf the session conf
38+
* @return an immutable map that contains all the extracted and transformed k/v pairs.
39+
*/
40+
def extractSessionConfigs(ds: DataSourceV2, conf: SQLConf): Map[String, String] = ds match {
41+
case cs: SessionConfigSupport =>
42+
val keyPrefix = cs.keyPrefix()
43+
require(keyPrefix != null, "The data source config key prefix can't be null.")
44+
45+
val pattern = Pattern.compile(s"^spark\\.datasource\\.$keyPrefix\\.(.+)")
46+
47+
conf.getAllConfs.flatMap { case (key, value) =>
48+
val m = pattern.matcher(key)
49+
if (m.matches() && m.groupCount() > 0) {
50+
Seq((m.group(1), value))
51+
} else {
52+
Seq.empty
53+
}
54+
}
55+
56+
case _ => Map.empty
57+
}
58+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.sources.v2
19+
20+
import org.apache.spark.SparkFunSuite
21+
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
22+
import org.apache.spark.sql.internal.SQLConf
23+
24+
class DataSourceV2UtilsSuite extends SparkFunSuite {
25+
26+
private val keyPrefix = new DataSourceV2WithSessionConfig().keyPrefix
27+
28+
test("method withSessionConfig() should propagate session configs correctly") {
29+
// Only match configs with keys start with "spark.datasource.${keyPrefix}".
30+
val conf = new SQLConf
31+
conf.setConfString(s"spark.datasource.$keyPrefix.foo.bar", "false")
32+
conf.setConfString(s"spark.datasource.$keyPrefix.whateverConfigName", "123")
33+
conf.setConfString(s"spark.sql.$keyPrefix.config.name", "false")
34+
conf.setConfString("spark.datasource.another.config.name", "123")
35+
conf.setConfString(s"spark.datasource.$keyPrefix.", "123")
36+
val cs = classOf[DataSourceV2WithSessionConfig].newInstance()
37+
val confs = DataSourceV2Utils.extractSessionConfigs(cs.asInstanceOf[DataSourceV2], conf)
38+
assert(confs.size == 2)
39+
assert(confs.keySet.filter(_.startsWith("spark.datasource")).size == 0)
40+
assert(confs.keySet.filter(_.startsWith("not.exist.prefix")).size == 0)
41+
assert(confs.keySet.contains("foo.bar"))
42+
assert(confs.keySet.contains("whateverConfigName"))
43+
}
44+
}
45+
46+
class DataSourceV2WithSessionConfig extends SimpleDataSourceV2 with SessionConfigSupport {
47+
48+
override def keyPrefix: String = "userDefinedDataSource"
49+
}

0 commit comments

Comments
 (0)