diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1e05b6e2f99e5..10f60e2a3810b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.{SparkContext, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.{IGNORE_MISSING_FILES => SPARK_IGNORE_MISSING_FILES}
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.catalyst.analysis.{HintErrorLogger, Resolver}
 import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
@@ -709,7 +710,7 @@ object SQLConf {
   val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath")
     .doc("When true, check all the partition paths under the table\'s root directory " +
       "when reading data stored in HDFS. This configuration will be deprecated in the future " +
-      "releases and replaced by spark.files.ignoreMissingFiles.")
+      s"releases and replaced by ${SPARK_IGNORE_MISSING_FILES.key}.")
     .booleanConf
     .createWithDefault(false)
 
@@ -2145,6 +2146,46 @@ object SQLConf {
       "silently removed.")
     .booleanConf
     .createWithDefault(false)
+
+  /**
+   * Holds information about config keys that have been deprecated.
+   *
+   * @param key The deprecated config key.
+   * @param version The version of Spark in which the key was deprecated.
+   * @param comment Additional info regarding the deprecation, for example the reason for it
+   *                and what users should use instead.
+   */
+  case class DeprecatedConfig(key: String, version: String, comment: String)
+
+  /**
+   * Maps deprecated SQL config keys to information about the deprecation.
+   *
+   * The extra information is logged as a warning when the SQL config is present
+   * in the user's configuration.
+   */
+  val deprecatedSQLConfigs: Map[String, DeprecatedConfig] = {
+    val configs = Seq(
+      DeprecatedConfig(VARIABLE_SUBSTITUTE_DEPTH.key, "2.1",
+        "The SQL config is not used by Spark anymore."),
+      DeprecatedConfig(PANDAS_RESPECT_SESSION_LOCAL_TIMEZONE.key, "2.3",
+        "The behavior for the `false` config value is considered a bug, and " +
+          "it will be prohibited in future releases."),
+      DeprecatedConfig(PARQUET_INT64_AS_TIMESTAMP_MILLIS.key, "2.3",
+        s"Use '${PARQUET_OUTPUT_TIMESTAMP_TYPE.key}' instead."),
+      DeprecatedConfig(
+        PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_NAME.key, "2.4",
+        "The config allows switching to the behavior before Spark 2.4 " +
+          "and will be removed in future releases."),
+      DeprecatedConfig(HIVE_VERIFY_PARTITION_PATH.key, "3.0",
+        s"This config is replaced by '${SPARK_IGNORE_MISSING_FILES.key}'."),
+      DeprecatedConfig(ARROW_EXECUTION_ENABLED.key, "3.0",
+        s"Use '${ARROW_PYSPARK_EXECUTION_ENABLED.key}' instead."),
+      DeprecatedConfig(ARROW_FALLBACK_ENABLED.key, "3.0",
+        s"Use '${ARROW_PYSPARK_FALLBACK_ENABLED.key}' instead.")
+    )
+
+    Map(configs.map { cfg => cfg.key -> cfg } : _*)
+  }
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
index 87ac4b6db4ac0..e1b44b5918143 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
@@ -18,9 +18,10 @@
 package org.apache.spark.sql
 
 import org.apache.spark.annotation.Stable
+import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.RemovedConfig
+import org.apache.spark.sql.internal.SQLConf.{DeprecatedConfig, RemovedConfig}
 
 /**
  * Runtime configuration interface for Spark. To access this, use `SparkSession.conf`.
@@ -30,7 +31,7 @@ import org.apache.spark.sql.internal.SQLConf.RemovedConfig
  * @since 2.0.0
  */
 @Stable
-class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
+class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) extends Logging {
 
   /**
    * Sets the given Spark runtime configuration property.
@@ -40,6 +41,7 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
   def set(key: String, value: String): Unit = {
     requireNonStaticConf(key)
     requireDefaultValueOfRemovedConf(key, value)
+    logDeprecationWarning(key)
     sqlConf.setConfString(key, value)
   }
 
@@ -128,6 +130,7 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
    */
   def unset(key: String): Unit = {
     requireNonStaticConf(key)
+    logDeprecationWarning(key)
     sqlConf.unsetConf(key)
   }
 
@@ -168,4 +171,16 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
       }
     }
   }
+
+  /**
+   * Logs a warning message if the given config key is deprecated.
+   */
+  private def logDeprecationWarning(key: String): Unit = {
+    SQLConf.deprecatedSQLConfigs.get(key).foreach {
+      case DeprecatedConfig(configName, version, comment) =>
+        logWarning(
+          s"The SQL config '$configName' has been deprecated in Spark v$version " +
+            s"and may be removed in the future. $comment")
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index 8786696119528..23a70f912a5d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -17,7 +17,12 @@
 
 package org.apache.spark.sql.internal
 
+import scala.collection.mutable.ArrayBuffer
+import scala.language.reflectiveCalls
+
 import org.apache.hadoop.fs.Path
+import org.apache.log4j.{AppenderSkeleton, Level}
+import org.apache.log4j.spi.LoggingEvent
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.internal.StaticSQLConf._
@@ -330,4 +335,31 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
     }
     assert(e.getMessage.contains(config))
   }
+
+  test("log deprecation warnings") {
+    val logAppender = new AppenderSkeleton {
+      val loggingEvents = new ArrayBuffer[LoggingEvent]()
+
+      override def append(loggingEvent: LoggingEvent): Unit = loggingEvents.append(loggingEvent)
+      override def close(): Unit = {}
+      override def requiresLayout(): Boolean = false
+    }
+    def check(config: String): Unit = {
+      assert(logAppender.loggingEvents.exists(
+        e => e.getLevel == Level.WARN &&
+          e.getRenderedMessage.contains(config)))
+    }
+
+    val config1 = "spark.sql.hive.verifyPartitionPath"
+    withLogAppender(logAppender) {
+      spark.conf.set(config1, true)
+    }
+    check(config1)
+
+    val config2 = "spark.sql.execution.pandas.respectSessionTimeZone"
+    withLogAppender(logAppender) {
+      spark.conf.unset(config2)
+    }
+    check(config2)
+  }
 }