core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala

@@ -129,7 +129,7 @@ private[spark] class TypedConfigBuilder[T](
   def createOptional: OptionalConfigEntry[T] = {
     val entry = new OptionalConfigEntry[T](parent.key, parent._prependedKey,
       parent._prependSeparator, parent._alternatives, converter, stringConverter, parent._doc,
-      parent._public)
+      parent._public, parent._version)
     parent._onCreate.foreach(_(entry))
     entry
   }
@@ -144,7 +144,7 @@ private[spark] class TypedConfigBuilder[T](
     val transformedDefault = converter(stringConverter(default))
     val entry = new ConfigEntryWithDefault[T](parent.key, parent._prependedKey,
       parent._prependSeparator, parent._alternatives, transformedDefault, converter,
-      stringConverter, parent._doc, parent._public)
+      stringConverter, parent._doc, parent._public, parent._version)
     parent._onCreate.foreach(_(entry))
     entry
   }
@@ -154,7 +154,7 @@ private[spark] class TypedConfigBuilder[T](
   def createWithDefaultFunction(defaultFunc: () => T): ConfigEntry[T] = {
     val entry = new ConfigEntryWithDefaultFunction[T](parent.key, parent._prependedKey,
       parent._prependSeparator, parent._alternatives, defaultFunc, converter, stringConverter,
-      parent._doc, parent._public)
+      parent._doc, parent._public, parent._version)
     parent._onCreate.foreach(_ (entry))
     entry
   }
@@ -166,7 +166,7 @@ private[spark] class TypedConfigBuilder[T](
   def createWithDefaultString(default: String): ConfigEntry[T] = {
     val entry = new ConfigEntryWithDefaultString[T](parent.key, parent._prependedKey,
       parent._prependSeparator, parent._alternatives, default, converter, stringConverter,
-      parent._doc, parent._public)
+      parent._doc, parent._public, parent._version)
     parent._onCreate.foreach(_(entry))
     entry
   }
@@ -186,6 +186,7 @@ private[spark] case class ConfigBuilder(key: String) {
   private[config] var _prependSeparator: String = ""
   private[config] var _public = true
   private[config] var _doc = ""
+  private[config] var _version = ""
   private[config] var _onCreate: Option[ConfigEntry[_] => Unit] = None
   private[config] var _alternatives = List.empty[String]
 
@@ -199,6 +200,11 @@ private[spark] case class ConfigBuilder(key: String) {
     this
   }
 
+  def version(v: String): ConfigBuilder = {
+    _version = v
+    this
+  }
+
   /**
    * Registers a callback for when the config entry is finally instantiated. Currently used by
    * SQLConf to keep track of SQL configuration entries.
@@ -255,7 +261,7 @@ private[spark] case class ConfigBuilder(key: String) {
 
   def fallbackConf[T](fallback: ConfigEntry[T]): ConfigEntry[T] = {
     val entry = new FallbackConfigEntry(key, _prependedKey, _prependSeparator, _alternatives, _doc,
-      _public, fallback)
+      _public, _version, fallback)
     _onCreate.foreach(_(entry))
     entry
   }
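For orientation, here is a minimal sketch of how a definition reads once the builder exposes `version`. Everything below is hypothetical (`ConfigBuilder` is `private[spark]`, so a real entry would sit inside `org.apache.spark.internal.config`):

```scala
// Hypothetical entry, for illustration only; not part of this PR.
private[spark] val EXAMPLE_TIMEOUT = ConfigBuilder("spark.example.timeout")
  .doc("Hypothetical timeout, shown only to illustrate the builder chain.")
  .version("3.1.0") // called before .intConf, since version() lives on ConfigBuilder
  .intConf
  .createWithDefault(120)
```

The resulting `ConfigEntry[Int]` then carries `version` alongside `doc` and `isPublic`.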
core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala

@@ -39,6 +39,7 @@ package org.apache.spark.internal.config
  * @param doc the documentation for the configuration
  * @param isPublic if this configuration is public to the user. If it's `false`, this
  *                 configuration is only used internally and we should not expose it to users.
+ * @param version the spark version when the configuration was released.
  * @tparam T the value type
  */
 private[spark] abstract class ConfigEntry[T] (
@@ -49,7 +50,8 @@ private[spark] abstract class ConfigEntry[T] (
     val valueConverter: String => T,
     val stringConverter: T => String,
     val doc: String,
-    val isPublic: Boolean) {
+    val isPublic: Boolean,
+    val version: String) {
 
   import ConfigEntry._
 
@@ -74,7 +76,8 @@ private[spark] abstract class ConfigEntry[T] (
   def defaultValue: Option[T] = None
 
   override def toString: String = {
-    s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, public=$isPublic)"
+    s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, " +
+      s"public=$isPublic, version=$version)"
   }
 }
 
@@ -87,7 +90,8 @@ private class ConfigEntryWithDefault[T] (
     valueConverter: String => T,
     stringConverter: T => String,
     doc: String,
-    isPublic: Boolean)
+    isPublic: Boolean,
+    version: String)
   extends ConfigEntry(
     key,
     prependedKey,
@@ -96,7 +100,8 @@ private class ConfigEntryWithDefault[T] (
     valueConverter,
     stringConverter,
     doc,
-    isPublic
+    isPublic,
+    version
   ) {
 
   override def defaultValue: Option[T] = Some(_defaultValue)
@@ -117,7 +122,8 @@ private class ConfigEntryWithDefaultFunction[T] (
     valueConverter: String => T,
     stringConverter: T => String,
     doc: String,
-    isPublic: Boolean)
+    isPublic: Boolean,
+    version: String)
   extends ConfigEntry(
     key,
     prependedKey,
@@ -126,7 +132,8 @@ private class ConfigEntryWithDefaultFunction[T] (
     valueConverter,
     stringConverter,
     doc,
-    isPublic
+    isPublic,
+    version
   ) {
 
   override def defaultValue: Option[T] = Some(_defaultFunction())
@@ -147,7 +154,8 @@ private class ConfigEntryWithDefaultString[T] (
     valueConverter: String => T,
     stringConverter: T => String,
     doc: String,
-    isPublic: Boolean)
+    isPublic: Boolean,
+    version: String)
   extends ConfigEntry(
     key,
     prependedKey,
@@ -156,7 +164,8 @@ private class ConfigEntryWithDefaultString[T] (
     valueConverter,
     stringConverter,
     doc,
-    isPublic
+    isPublic,
+    version
   ) {
 
   override def defaultValue: Option[T] = Some(valueConverter(_defaultValue))
@@ -181,7 +190,8 @@ private[spark] class OptionalConfigEntry[T](
     val rawValueConverter: String => T,
     val rawStringConverter: T => String,
     doc: String,
-    isPublic: Boolean)
+    isPublic: Boolean,
+    version: String)
   extends ConfigEntry[Option[T]](
     key,
     prependedKey,
@@ -190,7 +200,8 @@ private[spark] class OptionalConfigEntry[T](
     s => Some(rawValueConverter(s)),
     v => v.map(rawStringConverter).orNull,
     doc,
-    isPublic
+    isPublic,
+    version
   ) {
 
   override def defaultValueString: String = ConfigEntry.UNDEFINED
@@ -210,6 +221,7 @@ private[spark] class FallbackConfigEntry[T] (
     alternatives: List[String],
     doc: String,
     isPublic: Boolean,
+    version: String,
     val fallback: ConfigEntry[T])
   extends ConfigEntry[T](
     key,
@@ -219,7 +231,8 @@ private[spark] class FallbackConfigEntry[T] (
     fallback.valueConverter,
     fallback.stringConverter,
     doc,
-    isPublic
+    isPublic,
+    version
   ) {
 
   override def defaultValueString: String = s"<value of ${fallback.key}>"
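Worth noting from the `FallbackConfigEntry` change: a fallback entry records its own version, not that of the entry it delegates to. A hedged sketch, reusing the hypothetical `EXAMPLE_TIMEOUT` from above:

```scala
// Hypothetical alias, for illustration only; not part of this PR.
// The alias records the release that introduced it ("3.2.0" here);
// reads of the new key fall back to EXAMPLE_TIMEOUT's value when unset.
private[spark] val EXAMPLE_TIMEOUT_ALIAS = ConfigBuilder("spark.example.timeoutAlias")
  .version("3.2.0")
  .fallbackConf(EXAMPLE_TIMEOUT)
```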
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/Deploy.scala
@@ -19,48 +19,59 @@ package org.apache.spark.internal.config
 
 private[spark] object Deploy {
   val RECOVERY_MODE = ConfigBuilder("spark.deploy.recoveryMode")
+    .version("0.8.1")

@beliefer (Contributor Author, Feb 24, 2020): No JIRA ID, commit ID: d66c01f

     .stringConf
     .createWithDefault("NONE")
 
   val RECOVERY_MODE_FACTORY = ConfigBuilder("spark.deploy.recoveryMode.factory")
+    .version("1.2.0")

@beliefer (Contributor Author, Feb 24, 2020): SPARK-1830, commit ID: deefd9d. This configuration appears in branch-1.3, but the version number in the pom.xml file corresponding to the commit is 1.2.0-SNAPSHOT.

     .stringConf
     .createWithDefault("")
 
   val RECOVERY_DIRECTORY = ConfigBuilder("spark.deploy.recoveryDirectory")
+    .version("0.8.1")

@beliefer (Contributor Author, Feb 24, 2020): No JIRA ID, commit ID: d66c01f

     .stringConf
     .createWithDefault("")
 
   val ZOOKEEPER_URL = ConfigBuilder("spark.deploy.zookeeper.url")
     .doc(s"When `${RECOVERY_MODE.key}` is set to ZOOKEEPER, this " +
       "configuration is used to set the zookeeper URL to connect to.")
+    .version("0.8.1")

@beliefer (Contributor Author, Feb 24, 2020): No JIRA ID, commit ID: d66c01f

     .stringConf
     .createOptional
 
   val ZOOKEEPER_DIRECTORY = ConfigBuilder("spark.deploy.zookeeper.dir")
+    .version("0.8.1")

@beliefer (Contributor Author, Feb 24, 2020): No JIRA ID, commit ID: d66c01f

     .stringConf
     .createOptional
 
   val RETAINED_APPLICATIONS = ConfigBuilder("spark.deploy.retainedApplications")
+    .version("0.8.0")

@beliefer (Contributor Author, Feb 24, 2020): No JIRA ID, commit ID: 46eecd1

     .intConf
     .createWithDefault(200)
 
   val RETAINED_DRIVERS = ConfigBuilder("spark.deploy.retainedDrivers")
+    .version("1.1.0")

@beliefer (Contributor Author, Feb 24, 2020): No JIRA ID, commit ID: 7446f5f

     .intConf
     .createWithDefault(200)
 
   val REAPER_ITERATIONS = ConfigBuilder("spark.dead.worker.persistence")
+    .version("0.8.0")

@beliefer (Contributor Author, Feb 24, 2020): No JIRA ID, commit ID: 46eecd1

     .intConf
     .createWithDefault(15)
 
   val MAX_EXECUTOR_RETRIES = ConfigBuilder("spark.deploy.maxExecutorRetries")
+    .version("1.6.3")

@beliefer (Contributor Author, Feb 24, 2020): SPARK-16956, commit ID: ace458f

     .intConf
     .createWithDefault(10)
 
   val SPREAD_OUT_APPS = ConfigBuilder("spark.deploy.spreadOut")
+    .version("0.6.1")

@beliefer (Contributor Author, Feb 24, 2020): No JIRA ID, commit ID: bb2b9ff

     .booleanConf
     .createWithDefault(true)
 
   val DEFAULT_CORES = ConfigBuilder("spark.deploy.defaultCores")
+    .version("0.9.0")

@beliefer (Contributor Author, Feb 24, 2020): No JIRA ID, commit ID: d8bcc8e

     .intConf
     .createWithDefault(Int.MaxValue)
 
5 changes: 4 additions & 1 deletion docs/configuration.md

@@ -2597,22 +2597,25 @@ Spark subsystems.
 ### Deploy
 
 <table class="table">
-<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
 <tr>
   <td><code>spark.deploy.recoveryMode</code></td>
   <td>NONE</td>
   <td>The recovery mode setting to recover submitted Spark jobs with cluster mode when it failed and relaunches.
     This is only applicable for cluster mode when running with Standalone or Mesos.</td>
+  <td>0.8.1</td>
 </tr>
 <tr>
   <td><code>spark.deploy.zookeeper.url</code></td>
   <td>None</td>
   <td>When `spark.deploy.recoveryMode` is set to ZOOKEEPER, this configuration is used to set the zookeeper URL to connect to.</td>
+  <td>0.8.1</td>
 </tr>
 <tr>
   <td><code>spark.deploy.zookeeper.dir</code></td>
   <td>None</td>
   <td>When `spark.deploy.recoveryMode` is set to ZOOKEEPER, this configuration is used to set the zookeeper directory to store recovery state.</td>
+  <td>0.8.1</td>
 </tr>
 </table>
 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

@@ -2857,10 +2857,10 @@ class SQLConf extends Serializable with Logging {
    * Return all the configuration definitions that have been defined in [[SQLConf]]. Each
    * definition contains key, defaultValue and doc.
    */
-  def getAllDefinedConfs: Seq[(String, String, String)] = sqlConfEntries.synchronized {
+  def getAllDefinedConfs: Seq[(String, String, String, String)] = sqlConfEntries.synchronized {
     sqlConfEntries.values.asScala.filter(_.isPublic).map { entry =>
       val displayValue = Option(getConfString(entry.key, null)).getOrElse(entry.defaultValueString)
-      (entry.key, displayValue, entry.doc)
+      (entry.key, displayValue, entry.doc, entry.version)
     }.toSeq
   }
 
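Callers of `getAllDefinedConfs` now destructure four fields instead of three. A minimal sketch of consuming the new shape, assuming a fresh `SQLConf` the way `PythonSQLUtils` below creates one:

```scala
import org.apache.spark.sql.internal.SQLConf

// Print every public SQL config together with the release that introduced it.
val conf = new SQLConf()
conf.getAllDefinedConfs.foreach { case (key, value, _, version) =>
  val since = if (version.isEmpty) "<unknown>" else version
  println(s"$key (since $since): $value")
}
```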
sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala

@@ -40,7 +40,7 @@ private[sql] object PythonSQLUtils {
     FunctionRegistry.functionSet.flatMap(f => FunctionRegistry.builtin.lookupFunction(f)).toArray
   }
 
-  def listSQLConfigs(): Array[(String, String, String)] = {
+  def listSQLConfigs(): Array[(String, String, String, String)] = {
     val conf = new SQLConf()
     // Py4J doesn't seem to translate Seq well, so we convert to an Array.
     conf.getAllDefinedConfs.toArray
sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala

@@ -115,14 +115,19 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
     case Some(("-v", None)) =>
       val runFunc = (sparkSession: SparkSession) => {
         sparkSession.sessionState.conf.getAllDefinedConfs.sorted.map {
-          case (key, defaultValue, doc) =>
-            Row(key, Option(defaultValue).getOrElse("<undefined>"), doc)
+          case (key, defaultValue, doc, version) =>
+            Row(
+              key,
+              Option(defaultValue).getOrElse("<undefined>"),
+              doc,
+              Option(version).getOrElse("<unknown>"))
         }
       }
       val schema = StructType(
         StructField("key", StringType, nullable = false) ::
           StructField("value", StringType, nullable = false) ::
-          StructField("meaning", StringType, nullable = false) :: Nil)
+          StructField("meaning", StringType, nullable = false) ::
+          StructField("Since version", StringType, nullable = false) :: Nil)
       (schema.toAttributes, runFunc)
 
     // Queries the deprecated "mapred.reduce.tasks" property.
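At the SQL surface the change appears as a fourth output column. A quick spark-shell check (a sketch; actual rows depend on the build):

```scala
// SET -v now returns four columns: key, value, meaning, "Since version".
spark.sql("SET -v").show(5, truncate = false)
```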
sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala

@@ -285,8 +285,8 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
     assert(spark.sessionState.conf.getConfString(fallback.key, "lzo") === "lzo")
 
     val displayValue = spark.sessionState.conf.getAllDefinedConfs
-      .find { case (key, _, _) => key == fallback.key }
-      .map { case (_, v, _) => v }
+      .find { case (key, _, _, _) => key == fallback.key }
+      .map { case (_, v, _, _) => v }
       .get
     assert(displayValue === fallback.defaultValueString)
 
@@ -297,8 +297,8 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
     assert(spark.sessionState.conf.getConfString(fallback.key) === "lzo")
 
     val newDisplayValue = spark.sessionState.conf.getAllDefinedConfs
-      .find { case (key, _, _) => key == fallback.key }
-      .map { case (_, v, _) => v }
+      .find { case (key, _, _, _) => key == fallback.key }
+      .map { case (_, v, _, _) => v }
       .get
     assert(newDisplayValue === "lzo")
 