@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.{IGNORE_MISSING_FILES => SPARK_IGNORE_MISSING_FILES}
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.catalyst.analysis.{HintErrorLogger, Resolver}
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
@@ -709,7 +710,7 @@ object SQLConf
val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath")
.doc("When true, check all the partition paths under the table\'s root directory " +
"when reading data stored in HDFS. This configuration will be deprecated in the future " +
"releases and replaced by spark.files.ignoreMissingFiles.")
s"releases and replaced by ${SPARK_IGNORE_MISSING_FILES.key}.")
Member Author:
@cloud-fan Regarding your comment #19868 (comment): spark.sql.hive.verifyPartitionPath can be changed at runtime, but spark.files.ignoreMissingFiles cannot. Is that a fair replacement?

Member:
If you're concerned, we can slightly reword it to say that users can use spark.files.ignoreMissingFiles, rather than calling it a replacement (a possible rewording is sketched after this config definition). I don't believe this configuration is used commonly enough for that distinction to matter, so it should be fine.

If users find this unreasonable, we can un-deprecate it later based on the feedback.

.booleanConf
.createWithDefault(false)
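
Following the reviewer's suggestion above, a softened doc string could look roughly like the sketch below. This is only an illustration of the proposed wording (reusing the SPARK_IGNORE_MISSING_FILES alias imported in this diff), not the text the PR actually merges:

// Hypothetical rewording: point users at spark.files.ignoreMissingFiles as an
// alternative instead of calling it a replacement.
val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath")
  .doc("When true, check all the partition paths under the table's root directory " +
    "when reading data stored in HDFS. This configuration is deprecated; consider " +
    s"using ${SPARK_IGNORE_MISSING_FILES.key} instead.")
  .booleanConf
  .createWithDefault(false)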

@@ -2145,6 +2146,46 @@ object SQLConf
"silently removed.")
.booleanConf
.createWithDefault(false)

/**
* Holds information about keys that have been deprecated.
*
* @param key The deprecated key.
* @param version Version of Spark where key was deprecated.
* @param comment Additional info regarding the deprecated config. For example,
* the reason for deprecation and what users should use instead of it.
*/
case class DeprecatedConfig(key: String, version: String, comment: String)

/**
* Maps deprecated SQL config keys to information about the deprecation.
*
* The extra information is logged as a warning when the SQL config is present
* in the user's configuration.
*/
val deprecatedSQLConfigs: Map[String, DeprecatedConfig] = {
val configs = Seq(
DeprecatedConfig(VARIABLE_SUBSTITUTE_DEPTH.key, "2.1",
Member Author:
I haven't found where this config is used. We can remove it, I think.

"The SQL config is not used by Spark anymore."),
DeprecatedConfig(PANDAS_RESPECT_SESSION_LOCAL_TIMEZONE.key, "2.3",
"Behavior for `false` config value is considered as a bug, and " +
"it will be prohibited in the future releases."),
DeprecatedConfig(PARQUET_INT64_AS_TIMESTAMP_MILLIS.key, "2.3",
s"Use '${PARQUET_OUTPUT_TIMESTAMP_TYPE.key}' instead of it."),
DeprecatedConfig(
PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_NAME.key, "2.4",
"The config allows to switch to the behaviour before Spark 2.4 " +
"and will be removed in the future releases."),
DeprecatedConfig(HIVE_VERIFY_PARTITION_PATH.key, "3.0",
s"This config is replaced by '${SPARK_IGNORE_MISSING_FILES.key}'."),
DeprecatedConfig(ARROW_EXECUTION_ENABLED.key, "3.0",
s"Use '${ARROW_PYSPARK_EXECUTION_ENABLED.key}' instead of it."),
DeprecatedConfig(ARROW_FALLBACK_ENABLED.key, "3.0",
s"Use '${ARROW_PYSPARK_FALLBACK_ENABLED.key}' instead of it.")
)

Map(configs.map { cfg => cfg.key -> cfg } : _*)
}
}

/**
19 changes: 17 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
@@ -18,9 +18,10 @@
package org.apache.spark.sql

import org.apache.spark.annotation.Stable
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
import org.apache.spark.sql.internal.SQLConf
- import org.apache.spark.sql.internal.SQLConf.RemovedConfig
+ import org.apache.spark.sql.internal.SQLConf.{DeprecatedConfig, RemovedConfig}

/**
* Runtime configuration interface for Spark. To access this, use `SparkSession.conf`.
@@ -30,7 +31,7 @@ import org.apache.spark.sql.internal.SQLConf.RemovedConfig
* @since 2.0.0
*/
@Stable
- class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
+ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) extends Logging {

/**
* Sets the given Spark runtime configuration property.
@@ -40,6 +41,7 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
def set(key: String, value: String): Unit = {
requireNonStaticConf(key)
requireDefaultValueOfRemovedConf(key, value)
logDeprecationWarning(key)
sqlConf.setConfString(key, value)
}

@@ -128,6 +130,7 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
*/
def unset(key: String): Unit = {
requireNonStaticConf(key)
logDeprecationWarning(key)
sqlConf.unsetConf(key)
}

@@ -168,4 +171,16 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
}
}
}

/**
* Logs a warning message if the given config key is deprecated.
*/
private def logDeprecationWarning(key: String): Unit = {
SQLConf.deprecatedSQLConfigs.get(key).foreach {
case DeprecatedConfig(configName, version, comment) =>
logWarning(
s"The SQL config '$configName' has been deprecated in Spark v$version " +
s"and may be removed in the future. $comment")
}
}
}
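
Taken together, the map in SQLConf and the hook above mean that touching a deprecated key through spark.conf now emits a warning built from the corresponding map entry. A minimal usage sketch, assuming a running SparkSession named spark:

// Setting a key listed in SQLConf.deprecatedSQLConfigs triggers logDeprecationWarning,
// which logs a WARN message built from the map entry, e.g.:
//   The SQL config 'spark.sql.hive.verifyPartitionPath' has been deprecated in Spark v3.0
//   and may be removed in the future. This config is replaced by 'spark.files.ignoreMissingFiles'.
spark.conf.set("spark.sql.hive.verifyPartitionPath", "true")

// Unsetting a deprecated key goes through the same check:
spark.conf.unset("spark.sql.execution.pandas.respectSessionTimeZone")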
@@ -17,7 +17,12 @@

package org.apache.spark.sql.internal

import scala.collection.mutable.ArrayBuffer
import scala.language.reflectiveCalls

import org.apache.hadoop.fs.Path
import org.apache.log4j.{AppenderSkeleton, Level}
import org.apache.log4j.spi.LoggingEvent

import org.apache.spark.sql._
import org.apache.spark.sql.internal.StaticSQLConf._
@@ -330,4 +335,31 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
}
assert(e.getMessage.contains(config))
}

test("log deprecation warnings") {
val logAppender = new AppenderSkeleton {
Member:
nit: The same kind of logAppender class seems to be defined in several other places (see the grep output below), so can we define a helper method for this test purpose somewhere (e.g., TestUtils)?

$ grep -nr "extends AppenderSkeleton" .
./catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala:36:  class MockAppender extends AppenderSkeleton {
./catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala:525:    class MockAppender extends AppenderSkeleton {
./catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala:42:  class MockAppender extends AppenderSkeleton {
./core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala:1766:    class TestAppender extends AppenderSkeleton {
./core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala:41:  class MockAppender extends AppenderSkeleton {

Member Author:
It is slightly orthogonal to this PR, but if you think it makes sense, I will do it here.

Member:
Yeah, I think it's OK to do in a follow-up (a possible shape of such a helper is sketched after the test below).

val loggingEvents = new ArrayBuffer[LoggingEvent]()

override def append(loggingEvent: LoggingEvent): Unit = loggingEvents.append(loggingEvent)
override def close(): Unit = {}
override def requiresLayout(): Boolean = false
}
def check(config: String): Unit = {
assert(logAppender.loggingEvents.exists(
e => e.getLevel == Level.WARN &&
e.getRenderedMessage.contains(config)))
}

val config1 = "spark.sql.hive.verifyPartitionPath"
withLogAppender(logAppender) {
spark.conf.set(config1, true)
}
check(config1)

val config2 = "spark.sql.execution.pandas.respectSessionTimeZone"
withLogAppender(logAppender) {
spark.conf.unset(config2)
}
check(config2)
}
}
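
The test above relies on a withLogAppender helper, and the review thread suggests factoring the ad-hoc AppenderSkeleton subclasses into a shared utility as a follow-up. Below is a minimal sketch of what such a helper could look like, assuming log4j 1.x as used in the test; the names LogAppender and TestLogHelpers and their placement are illustrative assumptions, not what the PR or its follow-up actually adds:

import scala.collection.mutable.ArrayBuffer

import org.apache.log4j.{AppenderSkeleton, Logger}
import org.apache.log4j.spi.LoggingEvent

object TestLogHelpers {
  // Reusable in-memory appender that records every logging event it receives.
  class LogAppender extends AppenderSkeleton {
    val loggingEvents = new ArrayBuffer[LoggingEvent]()

    override def append(loggingEvent: LoggingEvent): Unit = loggingEvents.append(loggingEvent)
    override def close(): Unit = {}
    override def requiresLayout(): Boolean = false
  }

  // Attaches the appender to the root logger for the duration of `f` and detaches it
  // afterwards, so individual suites no longer need their own AppenderSkeleton subclasses.
  def withLogAppender(appender: AppenderSkeleton)(f: => Unit): Unit = {
    val rootLogger = Logger.getRootLogger
    rootLogger.addAppender(appender)
    try f finally rootLogger.removeAppender(appender)
  }
}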