2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -592,6 +592,8 @@ private[spark] object SparkConf extends Logging {
*
* The alternates are used in the order defined in this map. If deprecated configs are
* present in the user's configuration, a warning is logged.
*
* TODO: consolidate it with `ConfigBuilder.withAlternative`.
*/
private val configsWithAlternatives = Map[String, Seq[AlternateConfig]](
"spark.executor.userClassPathFirst" -> Seq(
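A hypothetical sketch of the consolidation the TODO describes, using the withAlternative builder API added in this PR (the entry val name and the alternative key below are illustrative, not part of this change):

// Hypothetical: declare the alternate key on the entry itself instead of
// maintaining it in the hand-written configsWithAlternatives map.
private[spark] val EXECUTOR_USER_CLASS_PATH_FIRST =
  ConfigBuilder("spark.executor.userClassPathFirst")
    .withAlternative("spark.files.userClassPathFirst")
    .booleanConf
    .createWithDefault(false)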
core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -126,8 +126,8 @@ private[spark] class TypedConfigBuilder[T](

/** Creates a [[ConfigEntry]] that does not have a default value. */
def createOptional: OptionalConfigEntry[T] = {
val entry = new OptionalConfigEntry[T](parent.key, converter, stringConverter, parent._doc,
parent._public)
val entry = new OptionalConfigEntry[T](parent.key, parent._alternatives, converter,
stringConverter, parent._doc, parent._public)
parent._onCreate.foreach(_(entry))
entry
}
@@ -140,17 +140,17 @@ private[spark] class TypedConfigBuilder[T](
createWithDefaultString(default.asInstanceOf[String])
} else {
val transformedDefault = converter(stringConverter(default))
val entry = new ConfigEntryWithDefault[T](parent.key, transformedDefault, converter,
stringConverter, parent._doc, parent._public)
val entry = new ConfigEntryWithDefault[T](parent.key, parent._alternatives,
transformedDefault, converter, stringConverter, parent._doc, parent._public)
parent._onCreate.foreach(_(entry))
entry
}
}

/** Creates a [[ConfigEntry]] with a function to determine the default value. */
def createWithDefaultFunction(defaultFunc: () => T): ConfigEntry[T] = {
val entry = new ConfigEntryWithDefaultFunction[T](parent.key, defaultFunc, converter,
stringConverter, parent._doc, parent._public)
val entry = new ConfigEntryWithDefaultFunction[T](parent.key, parent._alternatives, defaultFunc,
converter, stringConverter, parent._doc, parent._public)
parent._onCreate.foreach(_ (entry))
entry
}
@@ -160,8 +160,8 @@ private[spark] class TypedConfigBuilder[T](
* [[String]] and must be a valid value for the entry.
*/
def createWithDefaultString(default: String): ConfigEntry[T] = {
val entry = new ConfigEntryWithDefaultString[T](parent.key, default, converter, stringConverter,
parent._doc, parent._public)
val entry = new ConfigEntryWithDefaultString[T](parent.key, parent._alternatives, default,
converter, stringConverter, parent._doc, parent._public)
parent._onCreate.foreach(_(entry))
entry
}
@@ -180,6 +180,7 @@ private[spark] case class ConfigBuilder(key: String) {
private[config] var _public = true
private[config] var _doc = ""
private[config] var _onCreate: Option[ConfigEntry[_] => Unit] = None
private[config] var _alternatives = List.empty[String]

def internal(): ConfigBuilder = {
_public = false
@@ -200,6 +201,11 @@ private[spark] case class ConfigBuilder(key: String) {
this
}

def withAlternative(key: String): ConfigBuilder = {
_alternatives = _alternatives :+ key
this
}

def intConf: TypedConfigBuilder[Int] = {
new TypedConfigBuilder(this, toNumber(_, _.toInt, key, "int"))
}
@@ -229,7 +235,7 @@ }
}

def fallbackConf[T](fallback: ConfigEntry[T]): ConfigEntry[T] = {
new FallbackConfigEntry(key, _doc, _public, fallback)
new FallbackConfigEntry(key, _alternatives, _doc, _public, fallback)
}

def regexConf: TypedConfigBuilder[Regex] = {
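A minimal sketch of the fallbackConf semantics above, with hypothetical test keys: the derived entry reads its own key first and otherwise delegates to the fallback entry, which in turn applies its own default.

val base = ConfigBuilder("spark.test.base").intConf.createWithDefault(100)
val derived = ConfigBuilder("spark.test.derived").fallbackConf(base)
// With neither key set, reading `derived` yields 100 (base's default).
// Setting "spark.test.base" changes what `derived` reads, unless
// "spark.test.derived" itself is set.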
core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala
@@ -41,6 +41,7 @@ package org.apache.spark.internal.config
*/
private[spark] abstract class ConfigEntry[T] (
val key: String,
val alternatives: List[String],
val valueConverter: String => T,
val stringConverter: T => String,
val doc: String,
@@ -52,70 +53,75 @@ private[spark] abstract class ConfigEntry[T] (

def defaultValueString: String

protected def readString(reader: ConfigReader): Option[String] = {
alternatives.foldLeft(reader.get(key))((res, nextKey) => res.orElse(reader.get(nextKey)))
}

def readFrom(reader: ConfigReader): T

def defaultValue: Option[T] = None

override def toString: String = {
s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, public=$isPublic)"
}

}
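The new readString helper above fixes the precedence: the primary key wins, and alternatives are consulted in declaration order. A standalone sketch of the same foldLeft pattern over a plain Map (keys illustrative):

val settings = Map("spark.alt.one" -> "1", "spark.alt.two" -> "2")
val alternatives = List("spark.alt.one", "spark.alt.two")
val resolved = alternatives.foldLeft(settings.get("spark.primary")) {
  (res, nextKey) => res.orElse(settings.get(nextKey))
}
// resolved == Some("1"): the primary key is unset, so the first declared
// alternative wins. orElse takes its argument by name, so later keys are
// not even looked up once a value has been found.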

private class ConfigEntryWithDefault[T] (
key: String,
alternatives: List[String],
_defaultValue: T,
valueConverter: String => T,
stringConverter: T => String,
doc: String,
isPublic: Boolean)
extends ConfigEntry(key, valueConverter, stringConverter, doc, isPublic) {
extends ConfigEntry(key, alternatives, valueConverter, stringConverter, doc, isPublic) {

override def defaultValue: Option[T] = Some(_defaultValue)

override def defaultValueString: String = stringConverter(_defaultValue)

def readFrom(reader: ConfigReader): T = {
reader.get(key).map(valueConverter).getOrElse(_defaultValue)
readString(reader).map(valueConverter).getOrElse(_defaultValue)
}
}

private class ConfigEntryWithDefaultFunction[T] (
key: String,
alternatives: List[String],
_defaultFunction: () => T,
valueConverter: String => T,
stringConverter: T => String,
doc: String,
isPublic: Boolean)
extends ConfigEntry(key, valueConverter, stringConverter, doc, isPublic) {
extends ConfigEntry(key, alternatives, valueConverter, stringConverter, doc, isPublic) {

override def defaultValue: Option[T] = Some(_defaultFunction())

override def defaultValueString: String = stringConverter(_defaultFunction())

def readFrom(reader: ConfigReader): T = {
reader.get(key).map(valueConverter).getOrElse(_defaultFunction())
readString(reader).map(valueConverter).getOrElse(_defaultFunction())
}
}

private class ConfigEntryWithDefaultString[T] (
key: String,
alternatives: List[String],
_defaultValue: String,
valueConverter: String => T,
stringConverter: T => String,
doc: String,
isPublic: Boolean)
extends ConfigEntry(key, valueConverter, stringConverter, doc, isPublic) {
extends ConfigEntry(key, alternatives, valueConverter, stringConverter, doc, isPublic) {

override def defaultValue: Option[T] = Some(valueConverter(_defaultValue))

override def defaultValueString: String = _defaultValue

def readFrom(reader: ConfigReader): T = {
val value = reader.get(key).getOrElse(reader.substitute(_defaultValue))
val value = readString(reader).getOrElse(reader.substitute(_defaultValue))
valueConverter(value)
}

}


@@ -124,37 +130,39 @@ private class ConfigEntryWithDefaultString[T] (
*/
private[spark] class OptionalConfigEntry[T](
key: String,
alternatives: List[String],
val rawValueConverter: String => T,
val rawStringConverter: T => String,
doc: String,
isPublic: Boolean)
extends ConfigEntry[Option[T]](key, s => Some(rawValueConverter(s)),
extends ConfigEntry[Option[T]](key, alternatives,
s => Some(rawValueConverter(s)),
v => v.map(rawStringConverter).orNull, doc, isPublic) {

override def defaultValueString: String = "<undefined>"

override def readFrom(reader: ConfigReader): Option[T] = {
reader.get(key).map(rawValueConverter)
readString(reader).map(rawValueConverter)
}

}

/**
* A config entry whose default value is defined by another config entry.
*/
private class FallbackConfigEntry[T] (
key: String,
alternatives: List[String],
doc: String,
isPublic: Boolean,
private[config] val fallback: ConfigEntry[T])
extends ConfigEntry[T](key, fallback.valueConverter, fallback.stringConverter, doc, isPublic) {
extends ConfigEntry[T](key, alternatives,
fallback.valueConverter, fallback.stringConverter, doc, isPublic) {

override def defaultValueString: String = s"<value of ${fallback.key}>"

override def readFrom(reader: ConfigReader): T = {
reader.get(key).map(valueConverter).getOrElse(fallback.readFrom(reader))
readString(reader).map(valueConverter).getOrElse(fallback.readFrom(reader))
}

}

private[spark] object ConfigEntry {
core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala
@@ -47,28 +47,16 @@ private[spark] class MapProvider(conf: JMap[String, String]) extends ConfigProvider {
}

/**
* A config provider that only reads Spark config keys, and considers default values for known
* configs when fetching configuration values.
* A config provider that only reads Spark config keys.
*/
private[spark] class SparkConfigProvider(conf: JMap[String, String]) extends ConfigProvider {

import ConfigEntry._

override def get(key: String): Option[String] = {
if (key.startsWith("spark.")) {
Option(conf.get(key)).orElse(defaultValueString(key))
Option(conf.get(key))
} else {
None
}
}

private def defaultValueString(key: String): Option[String] = {
findEntry(key) match {
case e: ConfigEntryWithDefault[_] => Option(e.defaultValueString)
case e: ConfigEntryWithDefaultString[_] => Option(e.defaultValueString)
case e: FallbackConfigEntry[_] => get(e.fallback.key)
case _ => None
}
}

}
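With default handling moved out of the provider (see ConfigReader below), SparkConfigProvider is now a plain lookup. A sketch of the resulting behavior, with illustrative keys:

import scala.collection.JavaConverters._

val provider = new SparkConfigProvider(Map("spark.app.name" -> "demo").asJava)
provider.get("spark.app.name") // Some("demo")
provider.get("spark.not.set")  // None: defaults are no longer filled in here
provider.get("user.name")      // None: non-"spark." keys are never read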
core/src/main/scala/org/apache/spark/internal/config/ConfigReader.scala
@@ -92,7 +92,7 @@ private[spark] class ConfigReader(conf: ConfigProvider) {
require(!usedRefs.contains(ref), s"Circular reference in $input: $ref")

val replacement = bindings.get(prefix)
.flatMap(_.get(name))
.flatMap(getOrDefault(_, name))
.map { v => substitute(v, usedRefs + ref) }
.getOrElse(m.matched)
Regex.quoteReplacement(replacement)
@@ -102,4 +102,20 @@ private[spark] class ConfigReader(conf: ConfigProvider) {
}
}

/**
* Gets the value of a config from the given `ConfigProvider`. If no value is found for this
* config and the `ConfigEntry` that defines it has a default value, returns that default.
*/
private def getOrDefault(conf: ConfigProvider, key: String): Option[String] = {
conf.get(key).orElse {
ConfigEntry.findEntry(key) match {
case e: ConfigEntryWithDefault[_] => Option(e.defaultValueString)
case e: ConfigEntryWithDefaultString[_] => Option(e.defaultValueString)
case e: ConfigEntryWithDefaultFunction[_] => Option(e.defaultValueString)
case e: FallbackConfigEntry[_] => getOrDefault(conf, e.fallback.key)
case _ => None
}
}
}

}
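A sketch of what getOrDefault enables during variable substitution, assuming a hypothetical registered entry "spark.illustrative.timeout" whose default is "10s":

val reader = new ConfigReader(new SparkConfigProvider(new java.util.HashMap[String, String]()))
// The reference below used to stay unresolved when the key was unset; now
// getOrDefault consults ConfigEntry.findEntry and expands the default:
// reader.substitute("timeout=${spark.illustrative.timeout}") == "timeout=10s"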
core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -151,9 +151,11 @@ package object config {
.createOptional
// End blacklist confs

private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
private[spark] val LISTENER_BUS_EVENT_QUEUE_CAPACITY =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.capacity")
.withAlternative("spark.scheduler.listenerbus.eventqueue.size")
.intConf
.checkValue(_ > 0, "The capacity of listener bus event queue must be positive")
.createWithDefault(10000)
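A sketch of the compatibility this buys: user code that still sets the old key configures the renamed entry transparently.

val conf = new SparkConf()
  .set("spark.scheduler.listenerbus.eventqueue.size", "20000")
// The alternative key feeds the new entry:
assert(conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY) == 20000)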

// This property sets the root namespace for metrics reporting
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

import scala.util.DynamicVariable

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.SparkContext
import org.apache.spark.internal.config._
import org.apache.spark.util.Utils

@@ -34,23 +34,14 @@ import org.apache.spark.util.Utils
* is stopped when `stop()` is called, and it will drop further events after stopping.
*/
private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {

self =>

import LiveListenerBus._

// Cap the capacity of the event queue so we get an explicit error (rather than
// an OOM exception) if it's perpetually being added to more quickly than it's being drained.
private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)

private def validateAndGetQueueSize(): Int = {
val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
if (queueSize <= 0) {
throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
}
queueSize
}
private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))

// Indicate if `start()` is called
private val started = new AtomicBoolean(false)
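A sketch of the resulting behavior change (illustrative, not part of the diff): the manual queue-size check that used to throw SparkException is now enforced by the entry's checkValue, so an invalid capacity fails when the config is first read.

val conf = new SparkConf()
  .set("spark.scheduler.listenerbus.eventqueue.capacity", "0")
// conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY) now throws
// IllegalArgumentException instead of the old SparkException.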
core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
@@ -261,4 +261,31 @@ class ConfigEntrySuite extends SparkFunSuite {
data = 2
assert(conf.get(iConf) === 2)
}

test("conf entry: alternative keys") {
val conf = new SparkConf()
val iConf = ConfigBuilder(testKey("a"))
.withAlternative(testKey("b"))
.withAlternative(testKey("c"))
.intConf.createWithDefault(0)

// no key is set, return the default value.
assert(conf.get(iConf) === 0)

// the primary key is set and the alternative keys are not, return the value of the primary key.
conf.set(testKey("a"), "1")
assert(conf.get(iConf) === 1)

// the primary key and the alternative keys are all set, return the value of the primary key.
conf.set(testKey("b"), "2")
conf.set(testKey("c"), "3")
assert(conf.get(iConf) === 1)

// the primary key is not set, (some of) the alternative keys are set, return the value of the
// first alternative key that is set.
conf.remove(testKey("a"))
assert(conf.get(iConf) === 2)
conf.remove(testKey("b"))
assert(conf.get(iConf) === 3)
}
}