@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;

import org.apache.spark.annotation.InterfaceStability;

import java.util.List;
import java.util.Map;

/**
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
* propagate session configs with chosen key-prefixes to the particular data source.
Review comment — cloud-fan (Contributor), Dec 13, 2017:
propagate session configs with the specified key-prefix to all data source operations in this session

*/
@InterfaceStability.Evolving
public interface ConfigSupport {
Review comment — Contributor:
nit: SessionConfigSupport


/**
* The name of the data source. Spark will extract all session configs that start with
* `spark.datasource.$name`, turn `spark.datasource.$name.xxx -> yyy` into
* `xxx -> yyy`, and propagate them to all data source operations in this session.
*/
String name();
Review comment — Contributor:
how about keyPrefix

}
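
For illustration, here is a minimal sketch of how a data source might mix in this interface. The class name and key prefix below are hypothetical (and the interface may yet be renamed to `SessionConfigSupport` per the review above); it reuses `SimpleDataSourceV2` from the test suite touched by this PR:

```scala
import org.apache.spark.sql.sources.v2.ConfigSupport

// Hypothetical example: a data source that opts into session-config propagation.
// With name == "myDataSource", a session config such as
//   spark.datasource.myDataSource.host -> localhost
// is expected to reach the data source as the option host -> localhost.
class MyDataSourceWithConfig extends SimpleDataSourceV2 with ConfigSupport {
  override def name: String = "myDataSource"
}
```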
16 changes: 13 additions & 3 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -32,8 +32,9 @@ import org.apache.spark.sql.execution.datasources.{DataSource, FailureSafeParser
import org.apache.spark.sql.execution.datasources.csv._
import org.apache.spark.sql.execution.datasources.jdbc._
import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ConfigSupport
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, ReadSupport, ReadSupportWithSchema}
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String

@@ -169,6 +170,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
option("path", path).load(Seq.empty: _*) // force invocation of `load(...varargs...)`
}

import DataSourceV2ConfigSupport._

/**
* Loads input in as a `DataFrame`, for data sources that support multiple paths.
* Only works if the source is a HadoopFsRelationProvider.
@@ -184,9 +187,16 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {

val cls = DataSource.lookupDataSource(source)
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
val options = new DataSourceV2Options(extraOptions.asJava)
val dataSource = cls.newInstance()
val options = dataSource match {
case cs: ConfigSupport =>
val confs = withSessionConfig(cs.name, sparkSession.sessionState.conf)
new DataSourceV2Options((confs ++ extraOptions).asJava)
Review comment — Member:
What happened if they have duplicate names?

Review comment — Contributor Author:
Good catch! Should the confs in the extraOptions have a higher priority? WDYT @cloud-fan ?

Review comment — Contributor:
yea extraOptions needs higher priority.

case _ =>
new DataSourceV2Options(extraOptions.asJava)
}

val reader = (cls.newInstance(), userSpecifiedSchema) match {
val reader = (dataSource, userSpecifiedSchema) match {
case (ds: ReadSupportWithSchema, Some(schema)) =>
ds.createReader(schema, options)

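On the duplicate-key question in the thread above: with Scala's immutable `Map`, `++` lets entries from the right-hand operand win, so `confs ++ extraOptions` already gives the explicitly passed reader options priority over session configs with the same key. A standalone sketch of that merge behavior (the keys and values here are made up):

```scala
// Illustrates the merge priority in `confs ++ extraOptions`:
// on duplicate keys, entries from the right-hand operand win.
object MergePriorityExample extends App {
  val sessionConfs = Map("path" -> "/from/session/conf", "host" -> "confHost")
  val extraOptions = Map("path" -> "/from/reader/option")

  val merged = sessionConfs ++ extraOptions

  assert(merged("path") == "/from/reader/option") // extraOptions wins on conflict
  assert(merged("host") == "confHost")            // non-conflicting session conf survives
  println(merged)
}
```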
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import java.util.regex.Pattern

import scala.collection.JavaConverters._
import scala.collection.immutable

import org.apache.spark.internal.Logging
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.ConfigSupport

private[sql] object DataSourceV2ConfigSupport extends Logging {
Review comment — cloud-fan (Contributor), Dec 13, 2017:
how about just naming it Utils? We may add more functions in it for other purpose in the future.

Review comment — Contributor Author:
How about rename it to DataSourceV2Utils, since the name Utils may conflict with org.apache.spark.util.Utils?

Review comment — Contributor:
yea good idea


/**
* Helper method that turns session configs with config keys that start with
* `spark.datasource.$name` into k/v pairs; these k/v pairs will be used to create data source
* options.
* A session config `spark.datasource.$name.xxx -> yyy` will be transformed into
* `xxx -> yyy`.
*
* @param name the data source name
* @param conf the session conf
* @return an immutable map that contains all the extracted and transformed k/v pairs.
*/
def withSessionConfig(
name: String,
conf: SQLConf): immutable.Map[String, String] = {
Review comment — Contributor:
in Scala Map by default refers to immutable.Map

require(name != null, "The data source name can't be null.")

val pattern = Pattern.compile(s"spark\\.datasource\\.$name\\.(.*)")
Review comment — Contributor:
this can be a member variable to avoid re-compile it everytime.

Review comment — Contributor:
nit: s"^spark..." to make sure the matched string starts with spark...

Review comment — Contributor Author:
The regex contains $keyPrefix, so we are not able to remove it from the method.

val filteredConfigs = conf.getAllConfs.filterKeys { confKey =>
confKey.startsWith(s"spark.datasource.$name")
}
filteredConfigs.map { entry =>
Review comment — Contributor:
can we use a flatMap to just loop the input once?

val m = pattern.matcher(entry._1)
require(m.matches() && m.groupCount() > 0, s"Fail in matching ${entry._1} with $pattern.")
(m.group(1), entry._2)
}
}
}
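
For reference, a sketch of the single-pass flatMap variant suggested in the review above; the object name is hypothetical and this is not the code in the PR:

```scala
import java.util.regex.Pattern

import org.apache.spark.sql.internal.SQLConf

// Hypothetical single-pass variant of withSessionConfig(), folding the
// filterKeys + map steps into one flatMap over the session confs.
object DataSourceV2ConfigSupportSketch {

  def withSessionConfig(name: String, conf: SQLConf): Map[String, String] = {
    require(name != null, "The data source name can't be null.")
    val pattern = Pattern.compile(s"spark\\.datasource\\.$name\\.(.*)")
    conf.getAllConfs.flatMap { case (key, value) =>
      val m = pattern.matcher(key)
      if (m.matches() && m.groupCount() > 0) Some(m.group(1) -> value) else None
    }
  }
}
```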
@@ -24,6 +24,8 @@ import test.org.apache.spark.sql.sources.v2._
import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ConfigSupport
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{Filter, GreaterThan}
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.test.SharedSQLContext
@@ -32,6 +34,8 @@ import org.apache.spark.sql.types.StructType
class DataSourceV2Suite extends QueryTest with SharedSQLContext {
import testImplicits._

private val dsName = "userDefinedDataSource"
Review comment — Contributor:
nit: keyPrefix


test("simplest implementation") {
Seq(classOf[SimpleDataSourceV2], classOf[JavaSimpleDataSourceV2]).foreach { cls =>
withClue(cls.getName) {
@@ -43,6 +47,22 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
}
}

test("simple implementation with config support") {
Review comment — Contributor:
this doesn't need to be here, but just a new test suite for the new Utils

// Only match configs whose keys start with "spark.datasource.${dsName}".
withSQLConf(s"spark.datasource.$dsName.foo.bar" -> "false",
s"spark.datasource.$dsName.whateverConfigName" -> "123",
s"spark.sql.$dsName.config.name" -> "false",
s"spark.datasource.another.config.name" -> "123") {
val cs = classOf[DataSourceV2WithConfig].newInstance().asInstanceOf[ConfigSupport]
val confs = DataSourceV2ConfigSupport.withSessionConfig(cs.name, SQLConf.get)
assert(confs.size == 2)
assert(confs.keySet.filter(_.startsWith("spark.datasource")).size == 0)
assert(confs.keySet.filter(_.startsWith("not.exist.prefix")).size == 0)
assert(confs.keySet.contains("foo.bar"))
assert(confs.keySet.contains("whateverConfigName"))
}
}

test("advanced implementation") {
Seq(classOf[AdvancedDataSourceV2], classOf[JavaAdvancedDataSourceV2]).foreach { cls =>
withClue(cls.getName) {
@@ -179,7 +199,10 @@ class SimpleReadTask(start: Int, end: Int) extends ReadTask[Row] with DataReader
override def close(): Unit = {}
}

class DataSourceV2WithConfig extends SimpleDataSourceV2 with ConfigSupport {

override def name: String = "userDefinedDataSource"
}
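
A sketch of how the propagation would look end to end from a user's perspective (assumes a `spark` session is in scope, e.g. in the shared test context):

```scala
// Hypothetical usage: any session config under the data source's prefix is
// stripped of "spark.datasource.userDefinedDataSource." and handed to the
// reader as a data source option.
spark.conf.set("spark.datasource.userDefinedDataSource.whateverConfigName", "123")

val df = spark.read
  .format(classOf[DataSourceV2WithConfig].getName)
  .load()
```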

class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport {
