
Commit c3dd2a2

Marcelo Vanzin authored and gatorsmile committed
[SPARK-22779][SQL] Resolve default values for fallback configs.
SQLConf allows some callers to define a custom default value for configs, and that complicates the handling of fallback config entries a bit, since most of the default value resolution is hidden by the config code. This change peeks into the internals of these fallback configs to figure out the correct default value, and also returns the current human-readable default when showing the default value (e.g. through "set -v").

Author: Marcelo Vanzin <[email protected]>

Closes #19974 from vanzin/SPARK-22779.
1 parent f8c7c1f commit c3dd2a2
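
For context, a fallback conf takes its default from another registered conf entry. Below is a minimal sketch of the behavior this patch addresses, using the builder API exercised by the test file in this commit; the example key is invented for illustration and `spark` is assumed to be an active SparkSession in a test context:

// A conf whose default is delegated to the Parquet compression codec conf.
val example = SQLConf.buildConf("spark.sql.__example__.codec")
  .fallbackConf(SQLConf.PARQUET_COMPRESSION)

// With neither key set, reading the fallback conf now returns the parent's
// current value instead of a placeholder default string.
spark.sessionState.conf.getConfString(example.key)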

File tree

3 files changed, +47 -7 lines


core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala

Lines changed: 5 additions & 3 deletions
@@ -139,7 +139,7 @@ private[spark] class OptionalConfigEntry[T](
     s => Some(rawValueConverter(s)),
     v => v.map(rawStringConverter).orNull, doc, isPublic) {
 
-  override def defaultValueString: String = "<undefined>"
+  override def defaultValueString: String = ConfigEntry.UNDEFINED
 
   override def readFrom(reader: ConfigReader): Option[T] = {
     readString(reader).map(rawValueConverter)
 
@@ -149,12 +149,12 @@ private[spark] class OptionalConfigEntry[T](
 /**
  * A config entry whose default value is defined by another config entry.
  */
-private class FallbackConfigEntry[T] (
+private[spark] class FallbackConfigEntry[T] (
     key: String,
     alternatives: List[String],
     doc: String,
     isPublic: Boolean,
-    private[config] val fallback: ConfigEntry[T])
+    val fallback: ConfigEntry[T])
   extends ConfigEntry[T](key, alternatives,
     fallback.valueConverter, fallback.stringConverter, doc, isPublic) {
 
@@ -167,6 +167,8 @@ private class FallbackConfigEntry[T] (
 
 private[spark] object ConfigEntry {
 
+  val UNDEFINED = "<undefined>"
+
   private val knownConfigs = new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()
 
   def registerEntry(entry: ConfigEntry[_]): Unit = {
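
The visibility changes above (FallbackConfigEntry widened to private[spark], its fallback field exposed) are what let SQLConf inspect a fallback entry's parent. A hypothetical helper, not code from this patch, sketches the kind of resolution this enables:

// Walk the fallback chain until an entry with a concrete default is found.
// ConfigEntry, FallbackConfigEntry, and defaultValueString are the
// identifiers from the diff above.
def effectiveDefault(entry: ConfigEntry[_]): String = entry match {
  case f: FallbackConfigEntry[_] => effectiveDefault(f.fallback)
  case other => other.defaultValueString
}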

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 12 additions & 4 deletions
@@ -1379,7 +1379,7 @@ class SQLConf extends Serializable with Logging {
     Option(settings.get(key)).
       orElse {
         // Try to use the default value
-        Option(sqlConfEntries.get(key)).map(_.defaultValueString)
+        Option(sqlConfEntries.get(key)).map { e => e.stringConverter(e.readFrom(reader)) }
       }.
       getOrElse(throw new NoSuchElementException(key))
   }
 
@@ -1417,14 +1417,21 @@ class SQLConf extends Serializable with Logging {
    * not set yet, return `defaultValue`.
    */
   def getConfString(key: String, defaultValue: String): String = {
-    if (defaultValue != null && defaultValue != "<undefined>") {
+    if (defaultValue != null && defaultValue != ConfigEntry.UNDEFINED) {
       val entry = sqlConfEntries.get(key)
       if (entry != null) {
         // Only verify configs in the SQLConf object
         entry.valueConverter(defaultValue)
       }
     }
-    Option(settings.get(key)).getOrElse(defaultValue)
+    Option(settings.get(key)).getOrElse {
+      // If the key is not set, need to check whether the config entry is registered and is
+      // a fallback conf, so that we can check its parent.
+      sqlConfEntries.get(key) match {
+        case e: FallbackConfigEntry[_] => getConfString(e.fallback.key, defaultValue)
+        case _ => defaultValue
+      }
+    }
   }
 
@@ -1440,7 +1447,8 @@ class SQLConf extends Serializable with Logging {
    */
   def getAllDefinedConfs: Seq[(String, String, String)] = sqlConfEntries.synchronized {
     sqlConfEntries.values.asScala.filter(_.isPublic).map { entry =>
-      (entry.key, getConfString(entry.key, entry.defaultValueString), entry.doc)
+      val displayValue = Option(getConfString(entry.key, null)).getOrElse(entry.defaultValueString)
+      (entry.key, displayValue, entry.doc)
     }.toSeq
   }
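
Taken together, the getConfString changes mean an unset fallback conf now resolves through its parent, and getAllDefinedConfs ("set -v") displays the resolved value. A hedged usage sketch, where `conf` is assumed to be a SQLConf instance and the keys are the ones registered by the test file below:

// Set only the parent conf (the Parquet compression codec).
conf.setConfString("spark.sql.parquet.compression.codec", "gzip")

// The fallback conf itself is unset, so resolution recurses into the
// parent and yields "gzip" rather than a placeholder default string.
conf.getConfString("spark.sql.__test__.spark_22779")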

sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala

Lines changed: 30 additions & 0 deletions
@@ -280,4 +280,34 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
 
     spark.sessionState.conf.clear()
   }
+
+  test("SPARK-22779: correctly compute default value for fallback configs") {
+    val fallback = SQLConf.buildConf("spark.sql.__test__.spark_22779")
+      .fallbackConf(SQLConf.PARQUET_COMPRESSION)
+
+    assert(spark.sessionState.conf.getConfString(fallback.key) ===
+      SQLConf.PARQUET_COMPRESSION.defaultValue.get)
+    assert(spark.sessionState.conf.getConfString(fallback.key, "lzo") === "lzo")
+
+    val displayValue = spark.sessionState.conf.getAllDefinedConfs
+      .find { case (key, _, _) => key == fallback.key }
+      .map { case (_, v, _) => v }
+      .get
+    assert(displayValue === fallback.defaultValueString)
+
+    spark.sessionState.conf.setConf(SQLConf.PARQUET_COMPRESSION, "gzip")
+    assert(spark.sessionState.conf.getConfString(fallback.key) === "gzip")
+
+    spark.sessionState.conf.setConf(fallback, "lzo")
+    assert(spark.sessionState.conf.getConfString(fallback.key) === "lzo")
+
+    val newDisplayValue = spark.sessionState.conf.getAllDefinedConfs
+      .find { case (key, _, _) => key == fallback.key }
+      .map { case (_, v, _) => v }
+      .get
+    assert(newDisplayValue === "lzo")
+
+    SQLConf.unregister(fallback)
+  }
 }
