5 changes: 0 additions & 5 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -8641,11 +8641,6 @@
"duration() called on unfinished task"
]
},
"_LEGACY_ERROR_TEMP_3027" : {
"message" : [
"Unrecognized <schedulerModeProperty>: <schedulingModeConf>"
]
},
"_LEGACY_ERROR_TEMP_3028" : {
"message" : [
"<errorMsg>"
@@ -57,6 +57,15 @@ private object ConfigHelpers {
}
}

+ def toEnum[E <: Enum[E]](s: String, enumClass: Class[E], key: String): E = {
+ enumClass.getEnumConstants.find(_.name().equalsIgnoreCase(s.trim)) match {
+ case Some(enum) => enum
+ case None =>
+ throw new IllegalArgumentException(
+ s"$key should be one of ${enumClass.getEnumConstants.mkString(", ")}, but was $s")
Contributor: should we have an error condition for it?
Contributor: ditto for the scala enum
Author: We can have a separate PR for this to make all the condition-less errors uniform
+ }
+ }

def stringToSeq[T](str: String, converter: String => T): Seq[T] = {
SparkStringUtils.stringToSeq(str).map(converter)
}
@@ -287,6 +296,11 @@ private[spark] case class ConfigBuilder(key: String) {
new TypedConfigBuilder(this, toEnum(_, e, key))
}

+ def enumConf[E <: Enum[E]](e: Class[E]): TypedConfigBuilder[E] = {
+ checkPrependConfig
+ new TypedConfigBuilder(this, toEnum(_, e, key))
+ }

def timeConf(unit: TimeUnit): TypedConfigBuilder[Long] = {
checkPrependConfig
new TypedConfigBuilder(this, timeFromString(_, unit), timeToString(_, unit))
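As a usage sketch for the new Java-enum overload above (illustrative only: the key name, the val name, and the `sparkConf` reference are hypothetical, not part of this diff):

```scala
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.shuffledb.DBBackend

// Hypothetical entry, as it would be declared inside Spark's config package object:
// the value is typed as DBBackend and validated by toEnum (trimmed, case-insensitive).
private[spark] val HYPOTHETICAL_DB_BACKEND =
  ConfigBuilder("spark.test.hypothetical.dbBackend")
    .doc("Which disk-based store to use.")
    .enumConf(classOf[DBBackend])
    .createWithDefault(DBBackend.ROCKSDB)

// Reading yields the enum constant directly; an unrecognized string fails with
// "<key> should be one of <DBBackend.values>, but was <value>".
// val backend: DBBackend = sparkConf.get(HYPOTHETICAL_DB_BACKEND)
```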
@@ -31,7 +31,6 @@ import org.apache.spark.network.crypto.AuthServerBootstrap
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.server.{TransportServer, TransportServerBootstrap}
import org.apache.spark.network.shuffle.ExternalBlockHandler
- import org.apache.spark.network.shuffledb.DBBackend
import org.apache.spark.network.util.TransportConf
import org.apache.spark.util.{ShutdownHookManager, Utils}

@@ -86,11 +85,11 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
protected def newShuffleBlockHandler(conf: TransportConf): ExternalBlockHandler = {
if (sparkConf.get(config.SHUFFLE_SERVICE_DB_ENABLED) && enabled) {
val shuffleDBName = sparkConf.get(config.SHUFFLE_SERVICE_DB_BACKEND)
- val dbBackend = DBBackend.byName(shuffleDBName)
- logInfo(log"Use ${MDC(SHUFFLE_DB_BACKEND_NAME, dbBackend.name())} as the implementation of " +
+ logInfo(
+ log"Use ${MDC(SHUFFLE_DB_BACKEND_NAME, shuffleDBName.name())} as the implementation of " +
log"${MDC(SHUFFLE_DB_BACKEND_KEY, config.SHUFFLE_SERVICE_DB_BACKEND.key)}")
new ExternalBlockHandler(conf,
- findRegisteredExecutorsDBFile(dbBackend.fileName(registeredExecutorsDB)))
+ findRegisteredExecutorsDBFile(shuffleDBName.fileName(registeredExecutorsDB)))
} else {
new ExternalBlockHandler(conf, null)
}
13 changes: 0 additions & 13 deletions core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
@@ -267,19 +267,6 @@ private[spark] object SparkCoreErrors {
new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3026")
}

- def unrecognizedSchedulerModePropertyError(
- schedulerModeProperty: String,
- schedulingModeConf: String): Throwable = {
- new SparkException(
- errorClass = "_LEGACY_ERROR_TEMP_3027",
- messageParameters = Map(
- "schedulerModeProperty" -> schedulerModeProperty,
- "schedulingModeConf" -> schedulingModeConf
- ),
- cause = null
- )
- }

def sparkError(errorMsg: String): Throwable = {
new SparkException(
errorClass = "_LEGACY_ERROR_TEMP_3028",
@@ -807,10 +807,8 @@ package object config {
.doc("Specifies a disk-based store used in shuffle service local db. " +
"ROCKSDB or LEVELDB (deprecated).")
.version("3.4.0")
- .stringConf
- .transform(_.toUpperCase(Locale.ROOT))
- .checkValues(DBBackend.values.map(_.toString).toSet)
- .createWithDefault(DBBackend.ROCKSDB.name)
+ .enumConf(classOf[DBBackend])
+ .createWithDefault(DBBackend.ROCKSDB)

private[spark] val SHUFFLE_SERVICE_PORT =
ConfigBuilder("spark.shuffle.service.port").version("1.2.0").intConf.createWithDefault(7337)
@@ -2295,9 +2293,8 @@ package object config {
private[spark] val SCHEDULER_MODE =
ConfigBuilder("spark.scheduler.mode")
.version("0.8.0")
- .stringConf
- .transform(_.toUpperCase(Locale.ROOT))
- .createWithDefault(SchedulingMode.FIFO.toString)
+ .enumConf(SchedulingMode)
+ .createWithDefault(SchedulingMode.FIFO)

private[spark] val SCHEDULER_REVIVE_INTERVAL =
ConfigBuilder("spark.scheduler.revive.interval")
@@ -89,7 +89,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext
log"${MDC(LogKeys.FILE_NAME, DEFAULT_SCHEDULER_FILE)}")
Some((is, DEFAULT_SCHEDULER_FILE))
} else {
- val schedulingMode = SchedulingMode.withName(sc.conf.get(SCHEDULER_MODE))
+ val schedulingMode = sc.conf.get(SCHEDULER_MODE)
rootPool.addSchedulable(new Pool(
DEFAULT_POOL_NAME, schedulingMode, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
logInfo(log"Fair scheduler configuration not found, created default pool: " +
@@ -181,15 +181,7 @@

private var schedulableBuilder: SchedulableBuilder = null
// default scheduler is FIFO
- private val schedulingModeConf = conf.get(SCHEDULER_MODE)
- val schedulingMode: SchedulingMode =
- try {
- SchedulingMode.withName(schedulingModeConf)
- } catch {
- case e: java.util.NoSuchElementException =>
- throw SparkCoreErrors.unrecognizedSchedulerModePropertyError(SCHEDULER_MODE_PROPERTY,
- schedulingModeConf)
- }
+ val schedulingMode: SchedulingMode = conf.get(SCHEDULER_MODE)

val rootPool: Pool = new Pool("", schedulingMode, 0, 0)

@@ -50,9 +50,7 @@ private[spark] object UIWorkloadGenerator {
val conf = new SparkConf().setMaster(args(0)).setAppName("Spark UI tester")

val schedulingMode = SchedulingMode.withName(args(1))
- if (schedulingMode == SchedulingMode.FAIR) {
- conf.set(SCHEDULER_MODE, "FAIR")
- }
+ conf.set(SCHEDULER_MODE, schedulingMode)
val nJobSet = args(2).toInt
val sc = new SparkContext(conf)

@@ -59,7 +59,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
}

test("local mode, FIFO scheduler") {
- val conf = new SparkConf().set(SCHEDULER_MODE, "FIFO")
+ val conf = new SparkConf().set(SCHEDULER_MODE.key, "FIFO")
Member: Ur, is this a breaking change, @yaooqinn ?
Member: It seems that I missed this in the Scala version PR. I'm curious if we can avoid this kind of programming behavior change.
Author: Hi @dongjoon-hyun, set with ConfigEntry is in private scope
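To make the scope point concrete (an illustrative sketch, not code from the PR): the typed `set(entry, value)` overload on `SparkConf` is `private[spark]`, so external callers have always gone through the string key, which keeps working unchanged; only Spark-internal call sites see the new value type.

```scala
// As it would appear inside the Spark code base (package org.apache.spark),
// where the private[spark] typed setter is visible:
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.SCHEDULER_MODE
import org.apache.spark.scheduler.SchedulingMode

val conf = new SparkConf()
conf.set(SCHEDULER_MODE.key, "FIFO")           // public string-key path, same as before this PR
conf.set(SCHEDULER_MODE, SchedulingMode.FIFO)  // typed path now takes the enum value, so the old
                                               // conf.set(SCHEDULER_MODE, "FIFO") no longer compiles
```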

sc = new SparkContext("local[2]", "test", conf)
testCount()
testTake()
@@ -68,7 +68,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
}

test("local mode, fair scheduler") {
- val conf = new SparkConf().set(SCHEDULER_MODE, "FAIR")
+ val conf = new SparkConf().set(SCHEDULER_MODE.key, "FAIR")
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
conf.set(SCHEDULER_ALLOCATION_FILE, xmlPath)
sc = new SparkContext("local[2]", "test", conf)
@@ -79,7 +79,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
}

test("cluster mode, FIFO scheduler") {
- val conf = new SparkConf().set(SCHEDULER_MODE, "FIFO")
+ val conf = new SparkConf().set(SCHEDULER_MODE.key, "FIFO")
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
testCount()
testTake()
@@ -88,7 +88,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
}

test("cluster mode, fair scheduler") {
- val conf = new SparkConf().set(SCHEDULER_MODE, "FAIR")
+ val conf = new SparkConf().set(SCHEDULER_MODE.key, "FAIR")
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
conf.set(SCHEDULER_ALLOCATION_FILE, xmlPath)
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
@@ -21,6 +21,7 @@ import java.util.Locale
import java.util.concurrent.TimeUnit

import org.apache.spark.{SparkConf, SparkFunSuite}
+ import org.apache.spark.network.shuffledb.DBBackend
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.util.SparkConfWithEnv

@@ -389,7 +390,7 @@
}


test("SPARK-51874: Add Enum support to ConfigBuilder") {
test("SPARK-51874: Add Scala Enumeration support to ConfigBuilder") {
object MyTestEnum extends Enumeration {
val X, Y, Z = Value
}
@@ -408,4 +409,20 @@
}
assert(e.getMessage === s"${enumConf.key} should be one of X, Y, Z, but was A")
}

test("SPARK-51896: Add Java enum support to ConfigBuilder") {
val conf = new SparkConf()
val enumConf = ConfigBuilder("spark.test.java.enum.key")
.enumConf(classOf[DBBackend])
.createWithDefault(DBBackend.LEVELDB)
assert(conf.get(enumConf) === DBBackend.LEVELDB)
conf.set(enumConf, DBBackend.ROCKSDB)
assert(conf.get(enumConf) === DBBackend.ROCKSDB)
val e = intercept[IllegalArgumentException] {
conf.set(enumConf.key, "ANYDB")
conf.get(enumConf)
}
assert(e.getMessage ===
s"${enumConf.key} should be one of ${DBBackend.values.mkString(", ")}, but was ANYDB")
}
}
@@ -415,21 +415,14 @@ object SQLConf {

private val VALID_LOG_LEVELS: Array[String] = Level.values.map(_.toString)

- private def isValidLogLevel(level: String): Boolean =
- VALID_LOG_LEVELS.contains(level.toUpperCase(Locale.ROOT))

val PLAN_CHANGE_LOG_LEVEL = buildConf("spark.sql.planChangeLog.level")
.internal()
.doc("Configures the log level for logging the change from the original plan to the new " +
s"plan after a rule or batch is applied. The value can be " +
s"${VALID_LOG_LEVELS.mkString(", ")}.")
.version("3.1.0")
- .stringConf
- .transform(_.toUpperCase(Locale.ROOT))
- .checkValue(isValidLogLevel,
- "Invalid value for 'spark.sql.planChangeLog.level'. Valid values are " +
- s"${VALID_LOG_LEVELS.mkString(", ")}.")
- .createWithDefault("trace")
+ .enumConf(classOf[Level])
Contributor: Level is from log4j?
Author: It's org.slf4j.event.Level
Contributor: If slf4j is no longer used in the future, will there be a risk of introducing breaking changes?
Author: Hi, there is no such risk since these changes are all targeting private APIs. Assuming removing slf4j does break stuff here, it will break w/ or w/o this PR, see https://github.com/apache/spark/pull/50691/files#diff-13c5b65678b327277c68d17910ae93629801af00117a0e3da007afd95b6c6764R5933
Contributor: Got it
+ .createWithDefault(Level.TRACE)

val PLAN_CHANGE_LOG_RULES = buildConf("spark.sql.planChangeLog.rules")
.internal()
@@ -461,12 +454,8 @@
"the resolved expression tree in the single-pass bottom-up Resolver. The value can be " +
s"${VALID_LOG_LEVELS.mkString(", ")}.")
.version("4.0.0")
- .stringConf
- .transform(_.toUpperCase(Locale.ROOT))
- .checkValue(isValidLogLevel,
- "Invalid value for 'spark.sql.expressionTreeChangeLog.level'. Valid values are " +
- s"${VALID_LOG_LEVELS.mkString(", ")}.")
- .createWithDefault("trace")
+ .enumConf(classOf[Level])
+ .createWithDefault(Level.TRACE)
Member: What do we get if we get this conf from PySpark (or SparkR)?
Author (@yaooqinn, Apr 25, 2025): set/get as string is still available due to the underlying T=>str/str=>T conversion
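To spell out the answer (an illustrative sketch, not part of the PR): the runtime conf surface still exchanges strings, with the entry's `String => T` / `T => String` converters applied underneath, so PySpark and SparkR callers are unaffected; only JVM-internal readers get the typed value.

```scala
// Assuming an active SparkSession named `spark`:
spark.conf.set("spark.sql.planChangeLog.level", "info")  // still settable as a string; parsed case-insensitively
val asString: String = spark.conf.get("spark.sql.planChangeLog.level")  // external callers still see a string
// Spark-internal code now reads the enum directly:
// SQLConf.get.planChangeLogLevel == org.slf4j.event.Level.INFO
```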


val LIGHTWEIGHT_PLAN_CHANGE_VALIDATION = buildConf("spark.sql.lightweightPlanChangeValidation")
.internal()
@@ -838,12 +827,8 @@ object SQLConf {
.doc("Configures the log level for adaptive execution logging of plan changes. The value " +
s"can be ${VALID_LOG_LEVELS.mkString(", ")}.")
.version("3.0.0")
- .stringConf
- .transform(_.toUpperCase(Locale.ROOT))
- .checkValue(isValidLogLevel,
- "Invalid value for 'spark.sql.adaptive.logLevel'. Valid values are " +
- s"${VALID_LOG_LEVELS.mkString(", ")}.")
- .createWithDefault("debug")
+ .enumConf(classOf[Level])
+ .createWithDefault(Level.DEBUG)

val ADVISORY_PARTITION_SIZE_IN_BYTES =
buildConf("spark.sql.adaptive.advisoryPartitionSizeInBytes")
@@ -1856,10 +1841,8 @@ object SQLConf {
.doc("The default storage level of `dataset.cache()`, `catalog.cacheTable()` and " +
"sql query `CACHE TABLE t`.")
.version("4.0.0")
- .stringConf
- .transform(_.toUpperCase(Locale.ROOT))
- .checkValues(StorageLevelMapper.values.map(_.name()).toSet)
- .createWithDefault(StorageLevelMapper.MEMORY_AND_DISK.name())
+ .enumConf(classOf[StorageLevelMapper])
+ .createWithDefault(StorageLevelMapper.MEMORY_AND_DISK)

val DATAFRAME_CACHE_LOG_LEVEL = buildConf("spark.sql.dataframeCache.logLevel")
.internal()
@@ -1868,12 +1851,8 @@ object SQLConf {
"used for debugging purposes and not in the production environment, since it generates a " +
"large amount of logs.")
.version("4.0.0")
- .stringConf
- .transform(_.toUpperCase(Locale.ROOT))
- .checkValue(isValidLogLevel,
- "Invalid value for 'spark.sql.dataframeCache.logLevel'. Valid values are " +
- s"${VALID_LOG_LEVELS.mkString(", ")}.")
- .createWithDefault("trace")
+ .enumConf(classOf[Level])
+ .createWithDefault(Level.TRACE)

val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled")
.internal()
@@ -2076,12 +2055,8 @@ object SQLConf {
.doc("Configures the log level for logging of codegen. " +
s"The value can be ${VALID_LOG_LEVELS.mkString(", ")}.")
.version("4.1.0")
- .stringConf
- .transform(_.toUpperCase(Locale.ROOT))
- .checkValue(isValidLogLevel,
- "Invalid value for 'spark.sql.codegen.logLevel'. Valid values are " +
- s"${VALID_LOG_LEVELS.mkString(", ")}.")
- .createWithDefault("DEBUG")
+ .enumConf(classOf[Level])
+ .createWithDefault(Level.DEBUG)

val CODEGEN_LOGGING_MAX_LINES = buildConf("spark.sql.codegen.logging.maxLines")
.internal()
@@ -5955,13 +5930,13 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

def optimizerInSetSwitchThreshold: Int = getConf(OPTIMIZER_INSET_SWITCH_THRESHOLD)

- def planChangeLogLevel: Level = Level.valueOf(getConf(PLAN_CHANGE_LOG_LEVEL))
+ def planChangeLogLevel: Level = getConf(PLAN_CHANGE_LOG_LEVEL)

def planChangeRules: Option[String] = getConf(PLAN_CHANGE_LOG_RULES)

def planChangeBatches: Option[String] = getConf(PLAN_CHANGE_LOG_BATCHES)

- def expressionTreeChangeLogLevel: Level = Level.valueOf(getConf(EXPRESSION_TREE_CHANGE_LOG_LEVEL))
+ def expressionTreeChangeLogLevel: Level = getConf(EXPRESSION_TREE_CHANGE_LOG_LEVEL)

def dynamicPartitionPruningEnabled: Boolean = getConf(DYNAMIC_PARTITION_PRUNING_ENABLED)

@@ -6118,7 +6093,7 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)

- def adaptiveExecutionLogLevel: String = getConf(ADAPTIVE_EXECUTION_LOG_LEVEL)
+ def adaptiveExecutionLogLevel: Level = getConf(ADAPTIVE_EXECUTION_LOG_LEVEL)

def fetchShuffleBlocksInBatch: Boolean = getConf(FETCH_SHUFFLE_BLOCKS_IN_BATCH)

@@ -6207,7 +6182,7 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

def codegenComments: Boolean = getConf(StaticSQLConf.CODEGEN_COMMENTS)

- def codegenLogLevel: Level = Level.valueOf(getConf(CODEGEN_LOG_LEVEL))
+ def codegenLogLevel: Level = getConf(CODEGEN_LOG_LEVEL)

def loggingMaxLinesForCodegen: Int = getConf(CODEGEN_LOGGING_MAX_LINES)

@@ -6434,9 +6409,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
def viewSchemaCompensation: Boolean = getConf(VIEW_SCHEMA_COMPENSATION)

def defaultCacheStorageLevel: StorageLevel =
- StorageLevel.fromString(getConf(DEFAULT_CACHE_STORAGE_LEVEL))
+ StorageLevel.fromString(getConf(DEFAULT_CACHE_STORAGE_LEVEL).name())

- def dataframeCacheLogLevel: String = getConf(DATAFRAME_CACHE_LOG_LEVEL)
+ def dataframeCacheLogLevel: Level = getConf(DATAFRAME_CACHE_LOG_LEVEL)

def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)

@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.optimizer

import org.apache.logging.log4j.Level
+ import org.slf4j.event.{Level => Slf4jLevel}

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
@@ -103,8 +104,8 @@ class OptimizerLoggingSuite extends PlanTest {
val error = intercept[IllegalArgumentException] {
withSQLConf(SQLConf.PLAN_CHANGE_LOG_LEVEL.key -> level) {}
}
- assert(error.getMessage.contains(
- "Invalid value for 'spark.sql.planChangeLog.level'."))
+ assert(error.getMessage == s"${SQLConf.PLAN_CHANGE_LOG_LEVEL.key} should be one of " +
+ s"${classOf[Slf4jLevel].getEnumConstants.mkString(", ")}, but was $level")
}
}
