Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,8 @@ private[spark] object SparkConf extends Logging {
*
* The alternates are used in the order defined in this map. If deprecated configs are
* present in the user's configuration, a warning is logged.
*
* TODO: consolidate it with `ConfigBuilder.withAlternative`.
*/
private val configsWithAlternatives = Map[String, Seq[AlternateConfig]](
"spark.executor.userClassPathFirst" -> Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ private[spark] class TypedConfigBuilder[T](

/** Creates a [[ConfigEntry]] that does not have a default value. */
def createOptional: OptionalConfigEntry[T] = {
val entry = new OptionalConfigEntry[T](parent.key, converter, stringConverter, parent._doc,
parent._public)
val entry = new OptionalConfigEntry[T](parent.key, parent._alternatives, converter,
stringConverter, parent._doc, parent._public)
parent._onCreate.foreach(_(entry))
entry
}
Expand All @@ -140,17 +140,17 @@ private[spark] class TypedConfigBuilder[T](
createWithDefaultString(default.asInstanceOf[String])
} else {
val transformedDefault = converter(stringConverter(default))
val entry = new ConfigEntryWithDefault[T](parent.key, transformedDefault, converter,
stringConverter, parent._doc, parent._public)
val entry = new ConfigEntryWithDefault[T](parent.key, parent._alternatives,
transformedDefault, converter, stringConverter, parent._doc, parent._public)
parent._onCreate.foreach(_(entry))
entry
}
}

/** Creates a [[ConfigEntry]] with a function to determine the default value */
def createWithDefaultFunction(defaultFunc: () => T): ConfigEntry[T] = {
val entry = new ConfigEntryWithDefaultFunction[T](parent.key, defaultFunc, converter,
stringConverter, parent._doc, parent._public)
val entry = new ConfigEntryWithDefaultFunction[T](parent.key, parent._alternatives, defaultFunc,
converter, stringConverter, parent._doc, parent._public)
parent._onCreate.foreach(_ (entry))
entry
}
Expand All @@ -160,8 +160,8 @@ private[spark] class TypedConfigBuilder[T](
* [[String]] and must be a valid value for the entry.
*/
def createWithDefaultString(default: String): ConfigEntry[T] = {
val entry = new ConfigEntryWithDefaultString[T](parent.key, default, converter, stringConverter,
parent._doc, parent._public)
val entry = new ConfigEntryWithDefaultString[T](parent.key, parent._alternatives, default,
converter, stringConverter, parent._doc, parent._public)
parent._onCreate.foreach(_(entry))
entry
}
Expand All @@ -180,6 +180,7 @@ private[spark] case class ConfigBuilder(key: String) {
private[config] var _public = true
private[config] var _doc = ""
private[config] var _onCreate: Option[ConfigEntry[_] => Unit] = None
private[config] var _alternatives = List.empty[String]

def internal(): ConfigBuilder = {
_public = false
Expand All @@ -200,6 +201,11 @@ private[spark] case class ConfigBuilder(key: String) {
this
}

def withAlternative(key: String): ConfigBuilder = {
_alternatives = _alternatives :+ key
this
}

def intConf: TypedConfigBuilder[Int] = {
new TypedConfigBuilder(this, toNumber(_, _.toInt, key, "int"))
}
Expand Down Expand Up @@ -229,7 +235,7 @@ private[spark] case class ConfigBuilder(key: String) {
}

def fallbackConf[T](fallback: ConfigEntry[T]): ConfigEntry[T] = {
new FallbackConfigEntry(key, _doc, _public, fallback)
new FallbackConfigEntry(key, _alternatives, _doc, _public, fallback)
}

def regexConf: TypedConfigBuilder[Regex] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ package org.apache.spark.internal.config
*/
private[spark] abstract class ConfigEntry[T] (
val key: String,
val alternatives: List[String],
val valueConverter: String => T,
val stringConverter: T => String,
val doc: String,
Expand All @@ -52,70 +53,75 @@ private[spark] abstract class ConfigEntry[T] (

def defaultValueString: String

protected def readString(reader: ConfigReader): Option[String] = {
alternatives.foldLeft(reader.get(key))((res, nextKey) => res.orElse(reader.get(nextKey)))
}

def readFrom(reader: ConfigReader): T

def defaultValue: Option[T] = None

override def toString: String = {
s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, public=$isPublic)"
}

}

private class ConfigEntryWithDefault[T] (
key: String,
alternatives: List[String],
_defaultValue: T,
valueConverter: String => T,
stringConverter: T => String,
doc: String,
isPublic: Boolean)
extends ConfigEntry(key, valueConverter, stringConverter, doc, isPublic) {
extends ConfigEntry(key, alternatives, valueConverter, stringConverter, doc, isPublic) {

override def defaultValue: Option[T] = Some(_defaultValue)

override def defaultValueString: String = stringConverter(_defaultValue)

def readFrom(reader: ConfigReader): T = {
reader.get(key).map(valueConverter).getOrElse(_defaultValue)
readString(reader).map(valueConverter).getOrElse(_defaultValue)
}
}

private class ConfigEntryWithDefaultFunction[T] (
key: String,
alternatives: List[String],
_defaultFunction: () => T,
valueConverter: String => T,
stringConverter: T => String,
doc: String,
isPublic: Boolean)
extends ConfigEntry(key, valueConverter, stringConverter, doc, isPublic) {
extends ConfigEntry(key, alternatives, valueConverter, stringConverter, doc, isPublic) {

override def defaultValue: Option[T] = Some(_defaultFunction())

override def defaultValueString: String = stringConverter(_defaultFunction())

def readFrom(reader: ConfigReader): T = {
reader.get(key).map(valueConverter).getOrElse(_defaultFunction())
readString(reader).map(valueConverter).getOrElse(_defaultFunction())
}
}

private class ConfigEntryWithDefaultString[T] (
key: String,
alternatives: List[String],
_defaultValue: String,
valueConverter: String => T,
stringConverter: T => String,
doc: String,
isPublic: Boolean)
extends ConfigEntry(key, valueConverter, stringConverter, doc, isPublic) {
extends ConfigEntry(key, alternatives, valueConverter, stringConverter, doc, isPublic) {

override def defaultValue: Option[T] = Some(valueConverter(_defaultValue))

override def defaultValueString: String = _defaultValue

def readFrom(reader: ConfigReader): T = {
val value = reader.get(key).getOrElse(reader.substitute(_defaultValue))
val value = readString(reader).getOrElse(reader.substitute(_defaultValue))
valueConverter(value)
}

}


Expand All @@ -124,37 +130,39 @@ private class ConfigEntryWithDefaultString[T] (
*/
private[spark] class OptionalConfigEntry[T](
key: String,
alternatives: List[String],
val rawValueConverter: String => T,
val rawStringConverter: T => String,
doc: String,
isPublic: Boolean)
extends ConfigEntry[Option[T]](key, s => Some(rawValueConverter(s)),
extends ConfigEntry[Option[T]](key, alternatives,
s => Some(rawValueConverter(s)),
v => v.map(rawStringConverter).orNull, doc, isPublic) {

override def defaultValueString: String = "<undefined>"

override def readFrom(reader: ConfigReader): Option[T] = {
reader.get(key).map(rawValueConverter)
readString(reader).map(rawValueConverter)
}

}

/**
* A config entry whose default value is defined by another config entry.
*/
private class FallbackConfigEntry[T] (
key: String,
alternatives: List[String],
doc: String,
isPublic: Boolean,
private[config] val fallback: ConfigEntry[T])
extends ConfigEntry[T](key, fallback.valueConverter, fallback.stringConverter, doc, isPublic) {
extends ConfigEntry[T](key, alternatives,
fallback.valueConverter, fallback.stringConverter, doc, isPublic) {

override def defaultValueString: String = s"<value of ${fallback.key}>"

override def readFrom(reader: ConfigReader): T = {
reader.get(key).map(valueConverter).getOrElse(fallback.readFrom(reader))
readString(reader).map(valueConverter).getOrElse(fallback.readFrom(reader))
}

}

private[spark] object ConfigEntry {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,28 +47,16 @@ private[spark] class MapProvider(conf: JMap[String, String]) extends ConfigProvi
}

/**
* A config provider that only reads Spark config keys, and considers default values for known
* configs when fetching configuration values.
* A config provider that only reads Spark config keys.
*/
private[spark] class SparkConfigProvider(conf: JMap[String, String]) extends ConfigProvider {

import ConfigEntry._

override def get(key: String): Option[String] = {
if (key.startsWith("spark.")) {
Option(conf.get(key)).orElse(defaultValueString(key))
Option(conf.get(key))
} else {
None
}
}

private def defaultValueString(key: String): Option[String] = {
findEntry(key) match {
case e: ConfigEntryWithDefault[_] => Option(e.defaultValueString)
case e: ConfigEntryWithDefaultString[_] => Option(e.defaultValueString)
case e: FallbackConfigEntry[_] => get(e.fallback.key)
case _ => None
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private[spark] class ConfigReader(conf: ConfigProvider) {
require(!usedRefs.contains(ref), s"Circular reference in $input: $ref")

val replacement = bindings.get(prefix)
.flatMap(_.get(name))
.flatMap(getOrDefault(_, name))
.map { v => substitute(v, usedRefs + ref) }
.getOrElse(m.matched)
Regex.quoteReplacement(replacement)
Expand All @@ -102,4 +102,16 @@ private[spark] class ConfigReader(conf: ConfigProvider) {
}
}

private def getOrDefault(conf: ConfigProvider, key: String): Option[String] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add comment for this method.

conf.get(key).orElse {
ConfigEntry.findEntry(key) match {
case e: ConfigEntryWithDefault[_] => Option(e.defaultValueString)
case e: ConfigEntryWithDefaultString[_] => Option(e.defaultValueString)
case e: ConfigEntryWithDefaultFunction[_] => Option(e.defaultValueString)
case e: FallbackConfigEntry[_] => getOrDefault(conf, e.fallback.key)
case _ => None
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,11 @@ package object config {
.createOptional
// End blacklist confs

private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
private[spark] val LISTENER_BUS_EVENT_QUEUE_CAPACITY =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.capacity")
.withAlternative("spark.scheduler.listenerbus.eventqueue.size")
.intConf
.checkValue(_ > 0, "The capacity of listener bus event queue must not be negative")
.createWithDefault(10000)

// This property sets the root namespace for metrics reporting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

import scala.util.DynamicVariable

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.SparkContext
import org.apache.spark.internal.config._
import org.apache.spark.util.Utils

Expand All @@ -34,23 +34,14 @@ import org.apache.spark.util.Utils
* is stopped when `stop()` is called, and it will drop further events after stopping.
*/
private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {

self =>

import LiveListenerBus._

// Cap the capacity of the event queue so we get an explicit error (rather than
// an OOM exception) if it's perpetually being added to more quickly than it's being drained.
private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)

private def validateAndGetQueueSize(): Int = {
val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
if (queueSize <= 0) {
throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
}
queueSize
}
private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))

// Indicate if `start()` is called
private val started = new AtomicBoolean(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,4 +261,31 @@ class ConfigEntrySuite extends SparkFunSuite {
data = 2
assert(conf.get(iConf) === 2)
}

test("conf entry: alternative keys") {
val conf = new SparkConf()
val iConf = ConfigBuilder(testKey("a"))
.withAlternative(testKey("b"))
.withAlternative(testKey("c"))
.intConf.createWithDefault(0)

// no key is set, return default value.
assert(conf.get(iConf) === 0)

// the primary key is set, the alternative keys are not set, return the value of primary key.
conf.set(testKey("a"), "1")
assert(conf.get(iConf) === 1)

// the primary key and alternative keys are all set, return the value of primary key.
conf.set(testKey("b"), "2")
conf.set(testKey("c"), "3")
assert(conf.get(iConf) === 1)

// the primary key is not set, (some of) the alternative keys are set, return the value of the
// first alternative key that is set.
conf.remove(testKey("a"))
assert(conf.get(iConf) === 2)
conf.remove(testKey("b"))
assert(conf.get(iConf) === 3)
}
}