diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 462daaa3fe745..8962cc3821f36 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -8641,11 +8641,6 @@
       "duration() called on unfinished task"
     ]
   },
-  "_LEGACY_ERROR_TEMP_3027" : {
-    "message" : [
-      "Unrecognized <schedulerModeProperty>: <schedulingModeConf>"
-    ]
-  },
   "_LEGACY_ERROR_TEMP_3028" : {
     "message" : [
       "<errorMsg>"
diff --git a/common/utils/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala b/common/utils/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
index f68ced0695056..0f8a6b5fe334d 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -57,6 +57,15 @@ private object ConfigHelpers {
     }
   }
 
+  def toEnum[E <: Enum[E]](s: String, enumClass: Class[E], key: String): E = {
+    enumClass.getEnumConstants.find(_.name().equalsIgnoreCase(s.trim)) match {
+      case Some(enum) => enum
+      case None =>
+        throw new IllegalArgumentException(
+          s"$key should be one of ${enumClass.getEnumConstants.mkString(", ")}, but was $s")
+    }
+  }
+
   def stringToSeq[T](str: String, converter: String => T): Seq[T] = {
     SparkStringUtils.stringToSeq(str).map(converter)
   }
@@ -287,6 +296,11 @@ private[spark] case class ConfigBuilder(key: String) {
     new TypedConfigBuilder(this, toEnum(_, e, key))
   }
 
+  def enumConf[E <: Enum[E]](e: Class[E]): TypedConfigBuilder[E] = {
+    checkPrependConfig
+    new TypedConfigBuilder(this, toEnum(_, e, key))
+  }
+
   def timeConf(unit: TimeUnit): TypedConfigBuilder[Long] = {
     checkPrependConfig
     new TypedConfigBuilder(this, timeFromString(_, unit), timeToString(_, unit))
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 57b0647e59fd9..e21c772c00779 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -31,7 +31,6 @@ import org.apache.spark.network.crypto.AuthServerBootstrap
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.server.{TransportServer, TransportServerBootstrap}
 import org.apache.spark.network.shuffle.ExternalBlockHandler
-import org.apache.spark.network.shuffledb.DBBackend
 import org.apache.spark.network.util.TransportConf
 import org.apache.spark.util.{ShutdownHookManager, Utils}
 
@@ -86,11 +85,11 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
   protected def newShuffleBlockHandler(conf: TransportConf): ExternalBlockHandler = {
     if (sparkConf.get(config.SHUFFLE_SERVICE_DB_ENABLED) && enabled) {
       val shuffleDBName = sparkConf.get(config.SHUFFLE_SERVICE_DB_BACKEND)
-      val dbBackend = DBBackend.byName(shuffleDBName)
-      logInfo(log"Use ${MDC(SHUFFLE_DB_BACKEND_NAME, dbBackend.name())} as the implementation of " +
+      logInfo(
+        log"Use ${MDC(SHUFFLE_DB_BACKEND_NAME, shuffleDBName.name())} as the implementation of " +
         log"${MDC(SHUFFLE_DB_BACKEND_KEY, config.SHUFFLE_SERVICE_DB_BACKEND.key)}")
       new ExternalBlockHandler(conf,
-        findRegisteredExecutorsDBFile(dbBackend.fileName(registeredExecutorsDB)))
+        findRegisteredExecutorsDBFile(shuffleDBName.fileName(registeredExecutorsDB)))
     } else {
       new ExternalBlockHandler(conf, null)
     }
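Note (illustrative, not part of the diff): the new `enumConf[E <: Enum[E]](Class[E])` overload above lets a config entry carry a Java enum value directly, which is what the `ExternalShuffleService` hunk relies on when it drops the `DBBackend.byName` call. A minimal sketch of declaring and reading such an entry follows; the key `spark.test.shuffle.db.backend`, the object name, and the package are made up, and the code sits under `org.apache.spark` only because `ConfigBuilder` and `SparkConf.get(entry)` are `private[spark]`.

```scala
package org.apache.spark.demo // hypothetical location inside the spark package

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.shuffledb.DBBackend

object JavaEnumConfSketch {
  // Hypothetical entry built with the new Java-enum overload of enumConf.
  private val DB_BACKEND = ConfigBuilder("spark.test.shuffle.db.backend")
    .enumConf(classOf[DBBackend])
    .createWithDefault(DBBackend.ROCKSDB)

  def main(args: Array[String]): Unit = {
    // The stored string is parsed by ConfigHelpers.toEnum when the entry is read,
    // so the value comes back as a DBBackend constant, not a String.
    val conf = new SparkConf().set(DB_BACKEND.key, "LEVELDB")
    val backend: DBBackend = conf.get(DB_BACKEND)
    assert(backend == DBBackend.LEVELDB)
  }
}
```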
diff --git a/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
index 8a790291b4e72..30660a177416d 100644
--- a/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
+++ b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
@@ -267,19 +267,6 @@ private[spark] object SparkCoreErrors {
     new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3026")
   }
 
-  def unrecognizedSchedulerModePropertyError(
-      schedulerModeProperty: String,
-      schedulingModeConf: String): Throwable = {
-    new SparkException(
-      errorClass = "_LEGACY_ERROR_TEMP_3027",
-      messageParameters = Map(
-        "schedulerModeProperty" -> schedulerModeProperty,
-        "schedulingModeConf" -> schedulingModeConf
-      ),
-      cause = null
-    )
-  }
-
   def sparkError(errorMsg: String): Throwable = {
     new SparkException(
       errorClass = "_LEGACY_ERROR_TEMP_3028",
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 3ce374d0477d8..039387cba719f 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -807,10 +807,8 @@ package object config {
      .doc("Specifies a disk-based store used in shuffle service local db. " +
        "ROCKSDB or LEVELDB (deprecated).")
      .version("3.4.0")
-      .stringConf
-      .transform(_.toUpperCase(Locale.ROOT))
-      .checkValues(DBBackend.values.map(_.toString).toSet)
-      .createWithDefault(DBBackend.ROCKSDB.name)
+      .enumConf(classOf[DBBackend])
+      .createWithDefault(DBBackend.ROCKSDB)
 
   private[spark] val SHUFFLE_SERVICE_PORT =
     ConfigBuilder("spark.shuffle.service.port").version("1.2.0").intConf.createWithDefault(7337)
@@ -2295,9 +2293,8 @@ package object config {
   private[spark] val SCHEDULER_MODE =
     ConfigBuilder("spark.scheduler.mode")
       .version("0.8.0")
-      .stringConf
-      .transform(_.toUpperCase(Locale.ROOT))
-      .createWithDefault(SchedulingMode.FIFO.toString)
+      .enumConf(SchedulingMode)
+      .createWithDefault(SchedulingMode.FIFO)
 
   private[spark] val SCHEDULER_REVIVE_INTERVAL =
     ConfigBuilder("spark.scheduler.revive.interval")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index 6f64dff3f39d6..bea49fb279ee3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -89,7 +89,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext
           log"${MDC(LogKeys.FILE_NAME, DEFAULT_SCHEDULER_FILE)}")
         Some((is, DEFAULT_SCHEDULER_FILE))
       } else {
-        val schedulingMode = SchedulingMode.withName(sc.conf.get(SCHEDULER_MODE))
+        val schedulingMode = sc.conf.get(SCHEDULER_MODE)
         rootPool.addSchedulable(new Pool(
           DEFAULT_POOL_NAME, schedulingMode, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
         logInfo(log"Fair scheduler configuration not found, created default pool: " +
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 63b784c47d15a..13018da5bc274 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -181,15 +181,7 @@ private[spark] class TaskSchedulerImpl(
   private var schedulableBuilder: SchedulableBuilder = null
 
   // default scheduler is FIFO
-  private val schedulingModeConf = conf.get(SCHEDULER_MODE)
-  val schedulingMode: SchedulingMode =
-    try {
-      SchedulingMode.withName(schedulingModeConf)
-    } catch {
-      case e: java.util.NoSuchElementException =>
-        throw SparkCoreErrors.unrecognizedSchedulerModePropertyError(SCHEDULER_MODE_PROPERTY,
-          schedulingModeConf)
-    }
+  val schedulingMode: SchedulingMode = conf.get(SCHEDULER_MODE)
 
   val rootPool: Pool = new Pool("", schedulingMode, 0, 0)
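Note (illustrative, not part of the diff): `spark.scheduler.mode` is now declared with the Scala `Enumeration` overload of `enumConf`, so readers such as `TaskSchedulerImpl` get a `SchedulingMode` value back directly and the old `withName`/`NoSuchElementException` handling (and `_LEGACY_ERROR_TEMP_3027`) becomes unnecessary. A minimal sketch, with a made-up object name and placed under `org.apache.spark` because the entry is `private[spark]`:

```scala
package org.apache.spark.scheduler // hypothetical placement

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.SCHEDULER_MODE

object SchedulerModeSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().set(SCHEDULER_MODE.key, "FAIR")
    // conf.get now returns a SchedulingMode value; an unrecognized name fails
    // inside the entry's converter instead of surfacing a NoSuchElementException
    // from SchedulingMode.withName.
    val mode: SchedulingMode.SchedulingMode = conf.get(SCHEDULER_MODE)
    assert(mode == SchedulingMode.FAIR)
  }
}
```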
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index 948acb7112c8e..5999ce02bb43d 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -50,9 +50,7 @@ private[spark] object UIWorkloadGenerator {
     val conf = new SparkConf().setMaster(args(0)).setAppName("Spark UI tester")
 
     val schedulingMode = SchedulingMode.withName(args(1))
-    if (schedulingMode == SchedulingMode.FAIR) {
-      conf.set(SCHEDULER_MODE, "FAIR")
-    }
+    conf.set(SCHEDULER_MODE, schedulingMode)
     val nJobSet = args(2).toInt
     val sc = new SparkContext(conf)
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 65ed2684a5b00..da6b57a0bccb9 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -59,7 +59,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
   }
 
   test("local mode, FIFO scheduler") {
-    val conf = new SparkConf().set(SCHEDULER_MODE, "FIFO")
+    val conf = new SparkConf().set(SCHEDULER_MODE.key, "FIFO")
     sc = new SparkContext("local[2]", "test", conf)
     testCount()
     testTake()
@@ -68,7 +68,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
   }
 
   test("local mode, fair scheduler") {
-    val conf = new SparkConf().set(SCHEDULER_MODE, "FAIR")
+    val conf = new SparkConf().set(SCHEDULER_MODE.key, "FAIR")
     val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
     conf.set(SCHEDULER_ALLOCATION_FILE, xmlPath)
     sc = new SparkContext("local[2]", "test", conf)
@@ -79,7 +79,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
   }
 
   test("cluster mode, FIFO scheduler") {
-    val conf = new SparkConf().set(SCHEDULER_MODE, "FIFO")
+    val conf = new SparkConf().set(SCHEDULER_MODE.key, "FIFO")
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     testCount()
     testTake()
@@ -88,7 +88,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
   }
 
   test("cluster mode, fair scheduler") {
-    val conf = new SparkConf().set(SCHEDULER_MODE, "FAIR")
+    val conf = new SparkConf().set(SCHEDULER_MODE.key, "FAIR")
     val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
     conf.set(SCHEDULER_ALLOCATION_FILE, xmlPath)
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
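Note (illustrative, not part of the diff): because the entry is typed now, `SparkConf.set(SCHEDULER_MODE, ...)` expects a `SchedulingMode` value, which is why `UIWorkloadGenerator` passes the parsed enum and `JobCancellationSuite` switches to the raw `.key` when it wants to keep using string literals. A small sketch of the two equivalent forms (hypothetical object name, placed under `org.apache.spark` for `private[spark]` access):

```scala
package org.apache.spark // hypothetical placement

import org.apache.spark.internal.config.SCHEDULER_MODE
import org.apache.spark.scheduler.SchedulingMode

object SetSchedulerModeSketch {
  def main(args: Array[String]): Unit = {
    // Typed form: the value is a SchedulingMode, as in UIWorkloadGenerator above.
    val typed = new SparkConf().set(SCHEDULER_MODE, SchedulingMode.FAIR)
    // String form: set the raw key and let the entry parse it on read,
    // as the JobCancellationSuite tests above now do.
    val byKey = new SparkConf().set(SCHEDULER_MODE.key, "FIFO")
    assert(typed.get(SCHEDULER_MODE) == SchedulingMode.FAIR)
    assert(byKey.get(SCHEDULER_MODE) == SchedulingMode.FIFO)
  }
}
```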
diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
index 5aa542a0b9856..457e92b062808 100644
--- a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
+++ b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
@@ -21,6 +21,7 @@ import java.util.Locale
 import java.util.concurrent.TimeUnit
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.network.shuffledb.DBBackend
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.util.SparkConfWithEnv
 
@@ -389,7 +390,7 @@ class ConfigEntrySuite extends SparkFunSuite {
 
   }
 
-  test("SPARK-51874: Add Enum support to ConfigBuilder") {
+  test("SPARK-51874: Add Scala Enumeration support to ConfigBuilder") {
     object MyTestEnum extends Enumeration {
       val X, Y, Z = Value
     }
@@ -408,4 +409,20 @@ class ConfigEntrySuite extends SparkFunSuite {
     }
     assert(e.getMessage === s"${enumConf.key} should be one of X, Y, Z, but was A")
   }
+
+  test("SPARK-51896: Add Java enum support to ConfigBuilder") {
+    val conf = new SparkConf()
+    val enumConf = ConfigBuilder("spark.test.java.enum.key")
+      .enumConf(classOf[DBBackend])
+      .createWithDefault(DBBackend.LEVELDB)
+    assert(conf.get(enumConf) === DBBackend.LEVELDB)
+    conf.set(enumConf, DBBackend.ROCKSDB)
+    assert(conf.get(enumConf) === DBBackend.ROCKSDB)
+    val e = intercept[IllegalArgumentException] {
+      conf.set(enumConf.key, "ANYDB")
+      conf.get(enumConf)
+    }
+    assert(e.getMessage ===
+      s"${enumConf.key} should be one of ${DBBackend.values.mkString(", ")}, but was ANYDB")
+  }
 }
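Note (illustrative, not part of the diff): the `toEnum` helper added in `ConfigBuilder.scala` matches with `equalsIgnoreCase` on the trimmed input, so a Java-enum entry should accept mixed-case values without the `.transform(_.toUpperCase(Locale.ROOT))` step the old string configs needed. A sketch of that behaviour, complementing the new test above (made-up key and object name, placed under `org.apache.spark` for `private[spark]` access):

```scala
package org.apache.spark // hypothetical placement

import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.shuffledb.DBBackend

object EnumParsingSketch {
  def main(args: Array[String]): Unit = {
    val entry = ConfigBuilder("spark.test.java.enum.parsing") // made-up key
      .enumConf(classOf[DBBackend])
      .createWithDefault(DBBackend.ROCKSDB)
    val conf = new SparkConf()
    // Mixed case and surrounding whitespace resolve to the same constant,
    // because toEnum compares with equalsIgnoreCase(s.trim).
    conf.set(entry.key, " rocksDb ")
    assert(conf.get(entry) == DBBackend.ROCKSDB)
  }
}
```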
Valid values are " + - s"${VALID_LOG_LEVELS.mkString(", ")}.") - .createWithDefault("debug") + .enumConf(classOf[Level]) + .createWithDefault(Level.DEBUG) val ADVISORY_PARTITION_SIZE_IN_BYTES = buildConf("spark.sql.adaptive.advisoryPartitionSizeInBytes") @@ -1856,10 +1841,8 @@ object SQLConf { .doc("The default storage level of `dataset.cache()`, `catalog.cacheTable()` and " + "sql query `CACHE TABLE t`.") .version("4.0.0") - .stringConf - .transform(_.toUpperCase(Locale.ROOT)) - .checkValues(StorageLevelMapper.values.map(_.name()).toSet) - .createWithDefault(StorageLevelMapper.MEMORY_AND_DISK.name()) + .enumConf(classOf[StorageLevelMapper]) + .createWithDefault(StorageLevelMapper.MEMORY_AND_DISK) val DATAFRAME_CACHE_LOG_LEVEL = buildConf("spark.sql.dataframeCache.logLevel") .internal() @@ -1868,12 +1851,8 @@ object SQLConf { "used for debugging purposes and not in the production environment, since it generates a " + "large amount of logs.") .version("4.0.0") - .stringConf - .transform(_.toUpperCase(Locale.ROOT)) - .checkValue(isValidLogLevel, - "Invalid value for 'spark.sql.dataframeCache.logLevel'. Valid values are " + - s"${VALID_LOG_LEVELS.mkString(", ")}.") - .createWithDefault("trace") + .enumConf(classOf[Level]) + .createWithDefault(Level.TRACE) val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled") .internal() @@ -2076,12 +2055,8 @@ object SQLConf { .doc("Configures the log level for logging of codegen. " + s"The value can be ${VALID_LOG_LEVELS.mkString(", ")}.") .version("4.1.0") - .stringConf - .transform(_.toUpperCase(Locale.ROOT)) - .checkValue(isValidLogLevel, - "Invalid value for 'spark.sql.codegen.logLevel'. Valid values are " + - s"${VALID_LOG_LEVELS.mkString(", ")}.") - .createWithDefault("DEBUG") + .enumConf(classOf[Level]) + .createWithDefault(Level.DEBUG) val CODEGEN_LOGGING_MAX_LINES = buildConf("spark.sql.codegen.logging.maxLines") .internal() @@ -5955,13 +5930,13 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def optimizerInSetSwitchThreshold: Int = getConf(OPTIMIZER_INSET_SWITCH_THRESHOLD) - def planChangeLogLevel: Level = Level.valueOf(getConf(PLAN_CHANGE_LOG_LEVEL)) + def planChangeLogLevel: Level = getConf(PLAN_CHANGE_LOG_LEVEL) def planChangeRules: Option[String] = getConf(PLAN_CHANGE_LOG_RULES) def planChangeBatches: Option[String] = getConf(PLAN_CHANGE_LOG_BATCHES) - def expressionTreeChangeLogLevel: Level = Level.valueOf(getConf(EXPRESSION_TREE_CHANGE_LOG_LEVEL)) + def expressionTreeChangeLogLevel: Level = getConf(EXPRESSION_TREE_CHANGE_LOG_LEVEL) def dynamicPartitionPruningEnabled: Boolean = getConf(DYNAMIC_PARTITION_PRUNING_ENABLED) @@ -6118,7 +6093,7 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED) - def adaptiveExecutionLogLevel: String = getConf(ADAPTIVE_EXECUTION_LOG_LEVEL) + def adaptiveExecutionLogLevel: Level = getConf(ADAPTIVE_EXECUTION_LOG_LEVEL) def fetchShuffleBlocksInBatch: Boolean = getConf(FETCH_SHUFFLE_BLOCKS_IN_BATCH) @@ -6207,7 +6182,7 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def codegenComments: Boolean = getConf(StaticSQLConf.CODEGEN_COMMENTS) - def codegenLogLevel: Level = Level.valueOf(getConf(CODEGEN_LOG_LEVEL)) + def codegenLogLevel: Level = getConf(CODEGEN_LOG_LEVEL) def loggingMaxLinesForCodegen: Int = getConf(CODEGEN_LOGGING_MAX_LINES) @@ -6434,9 +6409,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def viewSchemaCompensation: Boolean = 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
index 95b55797b294c..083c522287cab 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.logging.log4j.Level
+import org.slf4j.event.{Level => Slf4jLevel}
 
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
@@ -103,8 +104,8 @@ class OptimizerLoggingSuite extends PlanTest {
       val error = intercept[IllegalArgumentException] {
         withSQLConf(SQLConf.PLAN_CHANGE_LOG_LEVEL.key -> level) {}
       }
-      assert(error.getMessage.contains(
-        "Invalid value for 'spark.sql.planChangeLog.level'."))
+      assert(error.getMessage == s"${SQLConf.PLAN_CHANGE_LOG_LEVEL.key} should be one of " +
+        s"${classOf[Slf4jLevel].getEnumConstants.mkString(", ")}, but was $level")
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 8fe7565c902a5..ef2b5c1e19cd5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 
-import org.apache.spark.internal.{LogEntry, Logging, MDC}
+import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
@@ -485,14 +485,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
 }
 
 object CacheManager extends Logging {
-  def logCacheOperation(f: => LogEntry): Unit = {
-    SQLConf.get.dataframeCacheLogLevel match {
-      case "TRACE" => logTrace(f)
-      case "DEBUG" => logDebug(f)
-      case "INFO" => logInfo(f)
-      case "WARN" => logWarning(f)
-      case "ERROR" => logError(f)
-      case _ => logTrace(f)
-    }
+  def logCacheOperation(f: => MessageWithContext): Unit = {
+    logBasedOnLevel(SQLConf.get.dataframeCacheLogLevel)(f)
   }
 }
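Note (illustrative, not part of the diff): `logBasedOnLevel` is only referenced here, not defined; the diff assumes it is available from Spark's `Logging` trait now that `dataframeCacheLogLevel` is a typed `Level`. The standalone sketch below shows the kind of level-based dispatch it is assumed to perform, using a plain `String` message in place of `MessageWithContext`:

```scala
import org.slf4j.event.Level
import org.slf4j.{Logger, LoggerFactory}

object LogBasedOnLevelSketch {
  private val log: Logger = LoggerFactory.getLogger(getClass)

  // Hypothetical equivalent of the Logging helper: pick the logger method
  // that matches the configured Level, instead of matching on strings.
  def logBasedOnLevel(level: Level)(msg: => String): Unit = level match {
    case Level.TRACE => if (log.isTraceEnabled) log.trace(msg)
    case Level.DEBUG => if (log.isDebugEnabled) log.debug(msg)
    case Level.INFO  => if (log.isInfoEnabled) log.info(msg)
    case Level.WARN  => log.warn(msg)
    case Level.ERROR => log.error(msg)
  }

  def main(args: Array[String]): Unit = {
    logBasedOnLevel(Level.INFO)("cache operation logged at the configured level")
  }
}
```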
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 07d215f8a186f..996e01a0ea936 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -77,13 +77,7 @@ case class AdaptiveSparkPlanExec(
   @transient private val lock = new Object()
 
   @transient private val logOnLevel: ( => MessageWithContext) => Unit =
-    conf.adaptiveExecutionLogLevel match {
-      case "TRACE" => logTrace(_)
-      case "INFO" => logInfo(_)
-      case "WARN" => logWarning(_)
-      case "ERROR" => logError(_)
-      case _ => logDebug(_)
-    }
+    logBasedOnLevel(conf.adaptiveExecutionLogLevel)
 
   @transient private val planChangeLogger = new PlanChangeLogger[SparkPlan]()
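Note (illustrative, not part of the diff): `logBasedOnLevel(conf.adaptiveExecutionLogLevel)` partially applies the helper, so `logOnLevel` keeps its old shape `(=> MessageWithContext) => Unit` and the existing call sites (not touched by this diff) stay as they were. A self-contained sketch of that currying, with `String` standing in for `MessageWithContext` and a made-up object name:

```scala
import org.slf4j.event.Level

object CurriedLogSketch {
  // Hypothetical stand-in for Logging.logBasedOnLevel; it only prints here.
  private def logBasedOnLevel(level: Level)(msg: => String): Unit =
    println(s"[$level] $msg")

  def main(args: Array[String]): Unit = {
    // Mirrors: private val logOnLevel = logBasedOnLevel(conf.adaptiveExecutionLogLevel)
    val logOnLevel: (=> String) => Unit = logBasedOnLevel(Level.DEBUG)
    logOnLevel("Plan changed") // call sites keep passing a message, unchanged
  }
}
```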