Commit ec5723c

Move function withSessionConfig from DataFrameReader into DataSourceV2ConfigSupport.

1 parent eaa6cae

File tree

3 files changed: 51 additions & 27 deletions

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ConfigSupport.scala
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 2 additions & 25 deletions

@@ -20,7 +20,6 @@ package org.apache.spark.sql
 import java.util.{Locale, Properties}
 
 import scala.collection.JavaConverters._
-import scala.collection.immutable
 
 import org.apache.spark.Partition
 import org.apache.spark.annotation.InterfaceStability
@@ -33,8 +32,8 @@ import org.apache.spark.sql.execution.datasources.{DataSource, FailureSafeParser
 import org.apache.spark.sql.execution.datasources.csv._
 import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ConfigSupport
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
@@ -171,7 +170,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     option("path", path).load(Seq.empty: _*) // force invocation of `load(...varargs...)`
   }
 
-  import DataFrameReader._
+  import DataSourceV2ConfigSupport._
 
   /**
    * Loads input in as a `DataFrame`, for data sources that support multiple paths.
@@ -743,25 +742,3 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   private val extraOptions = new scala.collection.mutable.HashMap[String, String]
 
 }
-
-private[sql] object DataFrameReader {
-
-  /**
-   * Helper method to filter session configs with config key that matches at least one of the given
-   * prefixes.
-   *
-   * @param cs the config key-prefixes that should be filtered.
-   * @param conf the session conf
-   * @return an immutable map that contains all the session configs that should be propagated to
-   *         the data source.
-   */
-  def withSessionConfig(
-      cs: ConfigSupport,
-      conf: SQLConf): immutable.Map[String, String] = {
-    val prefixes = cs.getConfigPrefixes
-    require(prefixes != null, "The config key-prefixes cann't be null.")
-    conf.getAllConfs.filterKeys { confKey =>
-      prefixes.asScala.exists(confKey.startsWith(_))
-    }
-  }
-}
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ConfigSupport.scala

Lines changed: 46 additions & 0 deletions
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable
+
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.ConfigSupport
+
+private[sql] object DataSourceV2ConfigSupport {
+
+  /**
+   * Helper method to filter session configs with config key that matches at least one of the given
+   * prefixes.
+   *
+   * @param cs the config key-prefixes that should be filtered.
+   * @param conf the session conf
+   * @return an immutable map that contains all the session configs that should be propagated to
+   *         the data source.
+   */
+  def withSessionConfig(
+      cs: ConfigSupport,
+      conf: SQLConf): immutable.Map[String, String] = {
+    val prefixes = cs.getConfigPrefixes
+    require(prefixes != null, "The config key-prefixes cann't be null.")
+    conf.getAllConfs.filterKeys { confKey =>
+      prefixes.asScala.exists(confKey.startsWith(_))
+    }
+  }
+}
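
For context on what the moved helper does: withSessionConfig keeps exactly those session configs whose key starts with at least one of the prefixes a source declares. Below is a minimal, self-contained Scala sketch of that same prefix-matching logic, with a plain Map standing in for SQLConf.getAllConfs and a hard-coded java.util.List standing in for cs.getConfigPrefixes; the stand-ins and the object name are illustrative assumptions, not part of this commit.

import scala.collection.JavaConverters._

object WithSessionConfigSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for SQLConf.getAllConfs: the full session-config map.
    val allConfs = Map(
      "spark.sql.parquet.compression.codec" -> "snappy",
      "spark.sql.parquet.mergeSchema" -> "false",
      "spark.sql.shuffle.partitions" -> "200")

    // Stand-in for cs.getConfigPrefixes: a java.util.List, mirroring the
    // asScala conversion and the null check in the real helper.
    val prefixes = java.util.Arrays.asList("spark.sql.parquet")
    require(prefixes != null, "The config key-prefixes can't be null.")

    // Keep a config entry iff its key starts with one of the prefixes.
    val propagated = allConfs.filterKeys { confKey =>
      prefixes.asScala.exists(p => confKey.startsWith(p))
    }

    propagated.foreach(println) // prints only the two spark.sql.parquet.* entries
  }
}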

sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala

Lines changed: 3 additions & 2 deletions

@@ -22,8 +22,9 @@ import java.util.{ArrayList, List => JList}
 import test.org.apache.spark.sql.sources.v2._
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, DataFrameReader, QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ConfigSupport
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{Filter, GreaterThan}
 import org.apache.spark.sql.sources.v2.reader._
@@ -50,7 +51,7 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
       SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "32",
       SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM.key -> "10000") {
       val cs = classOf[DataSourceV2WithConfig].newInstance().asInstanceOf[ConfigSupport]
-      val confs = DataFrameReader.withSessionConfig(cs, SQLConf.get)
+      val confs = DataSourceV2ConfigSupport.withSessionConfig(cs, SQLConf.get)
       assert(confs.size == 3)
       assert(confs.keySet.filter(_.startsWith("spark.sql.parquet")).size == 2)
       assert(confs.keySet.filter(
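
The fixture DataSourceV2WithConfig used above is defined elsewhere in the test sources and is not part of this diff, so its prefix list is not visible here. As a hedged illustration only, a ConfigSupport implementation shaped like the one the test exercises might look as follows, assuming getConfigPrefixes returns a java.util.List[String] (which the asScala call in withSessionConfig implies); the class name and prefixes below are hypothetical.

import java.util.{Arrays, List => JList}

import org.apache.spark.sql.sources.v2.{ConfigSupport, DataSourceV2}

// Hypothetical v2 source declaring which session-config key prefixes
// withSessionConfig should propagate to it.
class ExampleConfigSource extends DataSourceV2 with ConfigSupport {
  override def getConfigPrefixes: JList[String] =
    Arrays.asList("spark.sql.parquet", "spark.sql.example")
}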
