ConfigEntry.scala
@@ -139,7 +139,7 @@ private[spark] class OptionalConfigEntry[T](
     s => Some(rawValueConverter(s)),
     v => v.map(rawStringConverter).orNull, doc, isPublic) {
 
-  override def defaultValueString: String = "<undefined>"
+  override def defaultValueString: String = ConfigEntry.UNDEFINED
 
   override def readFrom(reader: ConfigReader): Option[T] = {
     readString(reader).map(rawValueConverter)
@@ -149,12 +149,12 @@ private[spark] class OptionalConfigEntry[T](
 /**
  * A config entry whose default value is defined by another config entry.
  */
-private class FallbackConfigEntry[T] (
+private[spark] class FallbackConfigEntry[T] (
     key: String,
     alternatives: List[String],
     doc: String,
     isPublic: Boolean,
-    private[config] val fallback: ConfigEntry[T])
+    val fallback: ConfigEntry[T])
   extends ConfigEntry[T](key, alternatives,
     fallback.valueConverter, fallback.stringConverter, doc, isPublic) {

@@ -167,6 +167,8 @@ private class FallbackConfigEntry[T] (

 private[spark] object ConfigEntry {
 
+  val UNDEFINED = "<undefined>"
+
   private val knownConfigs = new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()
 
   def registerEntry(entry: ConfigEntry[_]): Unit = {
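For context on what the now-public fallback field enables, here is a hedged, self-contained sketch of the fallback idea: an entry reads its own key first and defers to a parent entry otherwise. Entry, SimpleEntry, and FallbackEntry are illustrative stand-ins, not Spark's actual classes.

// Illustrative model of fallback resolution (not Spark's code).
trait Entry[T] {
  def key: String
  def resolve(settings: Map[String, String]): Option[T]
}

// A plain entry resolves only from its own key.
case class SimpleEntry[T](key: String, convert: String => T) extends Entry[T] {
  def resolve(settings: Map[String, String]): Option[T] =
    settings.get(key).map(convert)
}

// A fallback entry tries its own key, then defers to its parent.
case class FallbackEntry[T](key: String, fallback: Entry[T], convert: String => T)
    extends Entry[T] {
  def resolve(settings: Map[String, String]): Option[T] =
    settings.get(key).map(convert).orElse(fallback.resolve(settings))
}

object FallbackDemo extends App {
  val parent = SimpleEntry[String]("spark.sql.parquet.compression.codec", identity)
  val child = FallbackEntry[String]("spark.sql.my.renamed.conf", parent, identity)
  // Only the parent is set, so the child resolves to the parent's value.
  println(child.resolve(Map("spark.sql.parquet.compression.codec" -> "gzip"))) // Some(gzip)
}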
SQLConf.scala
@@ -1379,7 +1379,7 @@ class SQLConf extends Serializable with Logging {
     Option(settings.get(key)).
       orElse {
         // Try to use the default value
-        Option(sqlConfEntries.get(key)).map(_.defaultValueString)
+        Option(sqlConfEntries.get(key)).map { e => e.stringConverter(e.readFrom(reader)) }
       }.
       getOrElse(throw new NoSuchElementException(key))
   }
@@ -1417,14 +1417,21 @@
    * not set yet, return `defaultValue`.
    */
   def getConfString(key: String, defaultValue: String): String = {
-    if (defaultValue != null && defaultValue != "<undefined>") {
+    if (defaultValue != null && defaultValue != ConfigEntry.UNDEFINED) {
       val entry = sqlConfEntries.get(key)
       if (entry != null) {
         // Only verify configs in the SQLConf object
         entry.valueConverter(defaultValue)
       }
     }
-    Option(settings.get(key)).getOrElse(defaultValue)
+    Option(settings.get(key)).getOrElse {
+      // If the key is not set, need to check whether the config entry is registered and is
+      // a fallback conf, so that we can check its parent.
+      sqlConfEntries.get(key) match {
+        case e: FallbackConfigEntry[_] => getConfString(e.fallback.key, defaultValue)
+        case _ => defaultValue
+      }
+    }
   }
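To make the new resolution order concrete: an explicit setting for the key wins, an unset fallback entry walks to its parent's key, and only then does the caller's defaultValue apply. Below is a minimal sketch of that precedence, with a plain key-to-parent-key map standing in for the FallbackConfigEntry lookup in sqlConfEntries (all names here are hypothetical):

object ConfStringPrecedence {
  // fallbackParents maps a fallback conf's key to its parent's key.
  def getConfString(
      settings: Map[String, String],
      fallbackParents: Map[String, String],
      key: String,
      default: String): String =
    settings.getOrElse(key, fallbackParents.get(key) match {
      // Unset fallback conf: recurse into the parent, keeping the same default.
      case Some(parentKey) => getConfString(settings, fallbackParents, parentKey, default)
      case None => default
    })

  def main(args: Array[String]): Unit = {
    val settings = Map("spark.sql.parquet.compression.codec" -> "gzip")
    val parents = Map("spark.sql.my.renamed.conf" -> "spark.sql.parquet.compression.codec")
    // The renamed key is unset, so resolution walks to the parent's value.
    assert(getConfString(settings, parents, "spark.sql.my.renamed.conf", "snappy") == "gzip")
    // An explicitly set key always wins over the fallback walk.
    val overridden = settings + ("spark.sql.my.renamed.conf" -> "lzo")
    assert(getConfString(overridden, parents, "spark.sql.my.renamed.conf", "snappy") == "lzo")
  }
}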

   /**
@@ -1440,7 +1447,8 @@
    */
   def getAllDefinedConfs: Seq[(String, String, String)] = sqlConfEntries.synchronized {
     sqlConfEntries.values.asScala.filter(_.isPublic).map { entry =>
-      (entry.key, getConfString(entry.key, entry.defaultValueString), entry.doc)
+      val displayValue = Option(getConfString(entry.key, null)).getOrElse(entry.defaultValueString)
+      (entry.key, displayValue, entry.doc)
     }.toSeq
   }

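The getAllDefinedConfs change applies the same idea to display: resolve each entry through getConfString first (passing null so an unresolvable entry yields None), and show the static default string only as a last resort. A hedged sketch of that display rule, with a toy getConfString standing in for SQLConf's (names hypothetical):

object DisplayValueRule {
  // Toy stand-in for SQLConf.getConfString(key, default): returns the given
  // default (possibly null) when the key does not resolve.
  def getConfString(settings: Map[String, String], key: String, default: String): String =
    settings.getOrElse(key, default)

  def main(args: Array[String]): Unit = {
    val settings = Map("spark.sql.shuffle.partitions" -> "400")
    // Wrapping a possibly-null result in Option mirrors the patched code.
    def display(key: String, defaultValueString: String): String =
      Option(getConfString(settings, key, null)).getOrElse(defaultValueString)

    assert(display("spark.sql.shuffle.partitions", "200") == "400") // set: show the value
    assert(display("spark.sql.some.unset.conf", "true") == "true")  // unset: show the default
  }
}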
SQLConfSuite.scala
@@ -280,4 +280,34 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {

     spark.sessionState.conf.clear()
   }
+
+  test("SPARK-22779: correctly compute default value for fallback configs") {
+    val fallback = SQLConf.buildConf("spark.sql.__test__.spark_22779")
+      .fallbackConf(SQLConf.PARQUET_COMPRESSION)

Review thread on the .fallbackConf(SQLConf.PARQUET_COMPRESSION) line:

Member:
In SQLConf, all of our confs are TypedConfigBuilder rather than ConfigBuilder. The type-safe TypedConfigBuilder is unable to call fallbackConf, right?

Contributor:
Yeah, it sounds like this PR is fixing a non-existent issue.

Contributor Author:
It's true that no existing config uses this, but Reynold was trying to fix it, so my guess is that he wanted to add one and it wasn't working as expected.

Member:
I think this is at least good for renaming: in the future, if we rename a SQLConf, we can use this mechanism.


+    assert(spark.sessionState.conf.getConfString(fallback.key) ===
+      SQLConf.PARQUET_COMPRESSION.defaultValue.get)
+    assert(spark.sessionState.conf.getConfString(fallback.key, "lzo") === "lzo")
+
+    val displayValue = spark.sessionState.conf.getAllDefinedConfs
+      .find { case (key, _, _) => key == fallback.key }
+      .map { case (_, v, _) => v }
+      .get
+    assert(displayValue === fallback.defaultValueString)
+
+    spark.sessionState.conf.setConf(SQLConf.PARQUET_COMPRESSION, "gzip")
+    assert(spark.sessionState.conf.getConfString(fallback.key) === "gzip")
+
+    spark.sessionState.conf.setConf(fallback, "lzo")
+    assert(spark.sessionState.conf.getConfString(fallback.key) === "lzo")
+
+    val newDisplayValue = spark.sessionState.conf.getAllDefinedConfs
+      .find { case (key, _, _) => key == fallback.key }
+      .map { case (_, v, _) => v }
+      .get
+    assert(newDisplayValue === "lzo")
+
+    SQLConf.unregister(fallback)
+  }
 
 }
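On the renaming use case raised in the review thread: a hedged sketch of how a renamed conf could be declared with this mechanism, modeled directly on the builder calls in the test above. The new key name is illustrative, and this fragment assumes the same internal SQLConf context the suite runs in.

// Illustrative only: declare a new key whose value falls back to the old one.
val PARQUET_COMPRESSION_RENAMED =
  SQLConf.buildConf("spark.sql.parquet.compressionCodec")
    .fallbackConf(SQLConf.PARQUET_COMPRESSION)

// With the fix, setting only the old key still takes effect under the new name:
//   conf.setConf(SQLConf.PARQUET_COMPRESSION, "gzip")
//   conf.getConfString(PARQUET_COMPRESSION_RENAMED.key) // "gzip"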