Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
* - This will throw an exception is the config is not optional and the value is not set.
*/
private[spark] def get[T](entry: ConfigEntry[T]): T = {
entry.readFrom(this)
entry.readFrom(settings, getenv)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private[spark] class TypedConfigBuilder[T](
/** Creates a [[ConfigEntry]] that does not have a default value. */
def createOptional: OptionalConfigEntry[T] = {
val entry = new OptionalConfigEntry[T](parent.key, converter, stringConverter, parent._doc,
parent._public)
parent._public, parent._expandVars)
parent._onCreate.foreach(_(entry))
entry
}
Expand All @@ -118,7 +118,7 @@ private[spark] class TypedConfigBuilder[T](
def createWithDefault(default: T): ConfigEntry[T] = {
val transformedDefault = converter(stringConverter(default))
val entry = new ConfigEntryWithDefault[T](parent.key, transformedDefault, converter,
stringConverter, parent._doc, parent._public)
stringConverter, parent._doc, parent._public, parent._expandVars)
parent._onCreate.foreach(_(entry))
entry
}
Expand All @@ -130,7 +130,7 @@ private[spark] class TypedConfigBuilder[T](
def createWithDefaultString(default: String): ConfigEntry[T] = {
val typedDefault = converter(default)
val entry = new ConfigEntryWithDefault[T](parent.key, typedDefault, converter, stringConverter,
parent._doc, parent._public)
parent._doc, parent._public, parent._expandVars)
parent._onCreate.foreach(_(entry))
entry
}
Expand All @@ -149,6 +149,7 @@ private[spark] case class ConfigBuilder(key: String) {
private[config] var _public = true
private[config] var _doc = ""
private[config] var _onCreate: Option[ConfigEntry[_] => Unit] = None
private[config] var _expandVars = false

def internal(): ConfigBuilder = {
_public = false
Expand All @@ -160,6 +161,30 @@ private[spark] case class ConfigBuilder(key: String) {
this
}

/**
* Enable variable expansion in config values. If the config value contains variable references
* of the form "${prefix:variableName}", the reference will be replaced with the value of the
* variable depending on the prefix. The prefix can be one of:
*
* - no prefix: looks for the value in the Spark configuration
* - system: looks for the value in the system properties
* - env: looks for the value in the environment
*
* So referencing "${spark.master}" will look for the value of "spark.master" in the Spark
* configuration, while referencing "${env:MASTER}" will read the value from the "MASTER"
* environment variable.
*
* For known Spark configuration keys (i.e. those created using `ConfigBuilder`), references
* will also consider the default value when it exists.
*
* If the reference cannot be resolved, the original string will be retained. Variable expansion
* only applies to user-provided values, not to default values.
*/
def withVariableExpansion(): ConfigBuilder = {
_expandVars = true
this
}

/**
* Registers a callback for when the config entry is finally instantiated. Currently used by
* SQLConf to keep track of SQL configuration entries.
Expand Down Expand Up @@ -198,7 +223,10 @@ private[spark] case class ConfigBuilder(key: String) {
}

def fallbackConf[T](fallback: ConfigEntry[T]): ConfigEntry[T] = {
new FallbackConfigEntry(key, _doc, _public, fallback)
new FallbackConfigEntry(key, _doc, _public, fallback, _expandVars)
}

/** A path config entry has type String and allows variable expansion. */
def pathConf: TypedConfigBuilder[String] = withVariableExpansion.stringConf

}
119 changes: 104 additions & 15 deletions core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.internal.config

import java.util.{Map => JMap}

import org.apache.spark.SparkConf

/**
Expand All @@ -40,19 +42,32 @@ private[spark] abstract class ConfigEntry[T] (
val valueConverter: String => T,
val stringConverter: T => String,
val doc: String,
val isPublic: Boolean) {
val isPublic: Boolean,
private val expandVars: Boolean) {

import ConfigEntry._

registerEntry(this)

def defaultValueString: String

def readFrom(conf: SparkConf): T
def readFrom(conf: JMap[String, String], getenv: String => String): T

// This is used by SQLConf, since it doesn't use SparkConf to store settings and thus cannot
// use readFrom().
def defaultValue: Option[T] = None

override def toString: String = {
s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, public=$isPublic)"
}

protected def readAndExpand(
conf: JMap[String, String],
getenv: String => String,
usedRefs: Set[String] = Set()): Option[String] = {
Option(conf.get(key)).map { value =>
if (expandVars) expand(value, conf, getenv, usedRefs) else value
}
}

}

private class ConfigEntryWithDefault[T] (
Expand All @@ -61,15 +76,16 @@ private class ConfigEntryWithDefault[T] (
valueConverter: String => T,
stringConverter: T => String,
doc: String,
isPublic: Boolean)
extends ConfigEntry(key, valueConverter, stringConverter, doc, isPublic) {
isPublic: Boolean,
private val expandVars: Boolean)
extends ConfigEntry(key, valueConverter, stringConverter, doc, isPublic, expandVars) {

override def defaultValue: Option[T] = Some(_defaultValue)

override def defaultValueString: String = stringConverter(_defaultValue)

override def readFrom(conf: SparkConf): T = {
conf.getOption(key).map(valueConverter).getOrElse(_defaultValue)
def readFrom(conf: JMap[String, String], getenv: String => String): T = {
readAndExpand(conf, getenv).map(valueConverter).getOrElse(_defaultValue)
}

}
Expand All @@ -82,13 +98,16 @@ private[spark] class OptionalConfigEntry[T](
val rawValueConverter: String => T,
val rawStringConverter: T => String,
doc: String,
isPublic: Boolean)
isPublic: Boolean,
private val expandVars: Boolean)
extends ConfigEntry[Option[T]](key, s => Some(rawValueConverter(s)),
v => v.map(rawStringConverter).orNull, doc, isPublic) {
v => v.map(rawStringConverter).orNull, doc, isPublic, expandVars) {

override def defaultValueString: String = "<undefined>"

override def readFrom(conf: SparkConf): Option[T] = conf.getOption(key).map(rawValueConverter)
override def readFrom(conf: JMap[String, String], getenv: String => String): Option[T] = {
readAndExpand(conf, getenv).map(rawValueConverter)
}

}

Expand All @@ -99,13 +118,83 @@ private class FallbackConfigEntry[T] (
key: String,
doc: String,
isPublic: Boolean,
private val fallback: ConfigEntry[T])
extends ConfigEntry[T](key, fallback.valueConverter, fallback.stringConverter, doc, isPublic) {
private[config] val fallback: ConfigEntry[T],
private val expandVars: Boolean)
extends ConfigEntry[T](
key,
fallback.valueConverter,
fallback.stringConverter,
doc,
isPublic,
expandVars) {

override def defaultValueString: String = s"<value of ${fallback.key}>"

override def readFrom(conf: SparkConf): T = {
conf.getOption(key).map(valueConverter).getOrElse(fallback.readFrom(conf))
override def readFrom(conf: JMap[String, String], getenv: String => String): T = {
Option(conf.get(key)).map(valueConverter).getOrElse(fallback.readFrom(conf, getenv))
}

}

private object ConfigEntry {

private val knownConfigs = new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()

private val REF_RE = "\\$\\{(?:(\\w+?):)?(\\S+?)\\}".r.pattern

def registerEntry(entry: ConfigEntry[_]): Unit = {
val existing = knownConfigs.putIfAbsent(entry.key, entry)
require(existing == null, s"Config entry ${entry.key} already registered!")
}

def findEntry(key: String): ConfigEntry[_] = knownConfigs.get(key)

/**
* Expand the `value` according to the rules explained in `ConfigBuilder.withVariableExpansion`.
*/
def expand(
value: String,
conf: JMap[String, String],
getenv: String => String,
usedRefs: Set[String]): String = {
val matcher = REF_RE.matcher(value)
val result = new StringBuilder()
var end = 0

while (end < value.length() && matcher.find(end)) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can use Regex#replaceAllIn here instead of a loop.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That looks interesting, let me take a look.

val prefix = matcher.group(1)
val name = matcher.group(2)

result.append(value.substring(end, matcher.start()))

val replacement = prefix match {
case null =>
require(!usedRefs.contains(name), s"Circular reference in $value: $name")
Option(findEntry(name))
.flatMap(_.readAndExpand(conf, getenv, usedRefs = usedRefs + name))
.orElse(Option(conf.get(name)))
.orElse(defaultValueString(name))
case "system" => sys.props.get(name)
case "env" => Option(getenv(name))
case _ => throw new IllegalArgumentException(s"Invalid prefix: $prefix")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this throw?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I have to take a look at this. Throwing might actually break some existing code in SparkHadoopUtil.

}

result.append(replacement.getOrElse(matcher.group(0)))
end = matcher.end()
}

if (end < value.length()) {
result.append(value.substring(end, value.length()))
}
result.toString()
}

private def defaultValueString(key: String): Option[String] = {
findEntry(key) match {
case e: ConfigEntryWithDefault[_] => Some(e.defaultValueString)
case e: FallbackConfigEntry[_] => defaultValueString(e.fallback.key)
case _ => None
}
}

}
Loading