SetCommand.scala
@@ -172,15 +172,16 @@ object SetCommand {
 case class ResetCommand(config: Option[String]) extends RunnableCommand with IgnoreCachedData {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val conf = sparkSession.sessionState.conf
     val defaults = sparkSession.sparkContext.conf
     config match {
       case Some(key) =>
-        conf.unsetConf(key)
-        defaults.getOption(key).foreach(conf.setConfString(key, _))
+        sparkSession.conf.unset(key)
+        defaults.getOption(key).foreach(sparkSession.conf.set(key, _))
       case None =>
-        conf.clear()
-        defaults.getAll.foreach { case (k, v) => conf.setConfString(k, v) }
+        sparkSession.sessionState.conf.clear()
+        defaults.getAll.foreach { case (k, v) =>
+          sparkSession.sessionState.conf.setConfString(k, v)
+        }
     }
     Seq.empty[Row]
   }
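
In effect, RESET <key> now routes through the session's RuntimeConfig (sparkSession.conf), which rejects keys that cannot be modified at runtime, and then restores the value recorded in the SparkContext's SparkConf, e.g. one passed via spark-defaults.conf or --conf, when such a value exists. A minimal Scala sketch of the resulting behavior, assuming a fresh local session; the app name and partition values are illustrative:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("reset-demo") // hypothetical demo session
  .config("spark.sql.shuffle.partitions", "4") // recorded in the SparkContext's SparkConf
  .getOrCreate()

spark.sql("SET spark.sql.shuffle.partitions=100")
assert(spark.conf.get("spark.sql.shuffle.partitions") == "100")

// RESET restores the SparkConf value ("4") rather than the entry's built-in default
spark.sql("RESET spark.sql.shuffle.partitions")
assert(spark.conf.get("spark.sql.shuffle.partitions") == "4")
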
SQLConfSuite.scala
@@ -142,9 +142,12 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
sql(s"set ${SQLConf.GROUP_BY_ORDINAL.key}=false")
assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL) === false)
assert(sql(s"set").where(s"key = '${SQLConf.GROUP_BY_ORDINAL.key}'").count() == 1)
assert(spark.conf.get(SQLConf.OPTIMIZER_EXCLUDED_RULES).isEmpty)
sql(s"reset")
assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL))
assert(sql(s"set").where(s"key = '${SQLConf.GROUP_BY_ORDINAL.key}'").count() == 0)
assert(spark.conf.get(SQLConf.OPTIMIZER_EXCLUDED_RULES) ===
Some("org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation"))
} finally {
sql(s"set ${SQLConf.GROUP_BY_ORDINAL}=$original")
}
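
The new assertions pin down where a bare reset lands for an optional entry: spark.sql.optimizer.excludedRules has no built-in default, but the test harness's SparkConf apparently carries ConvertToLocalRelation for it, so after reset the read comes back as Some(...) rather than None. A hedged round-trip sketch of the same behavior, assuming a session launched with that conf set:

// Assumption: the session was started with
//   --conf spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
val rule = "org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation"

spark.conf.unset("spark.sql.optimizer.excludedRules") // session value gone; the entry has no default
assert(spark.conf.getOption("spark.sql.optimizer.excludedRules").isEmpty)

spark.sql("RESET") // re-applies every value captured in the SparkContext's SparkConf
assert(spark.conf.getOption("spark.sql.optimizer.excludedRules").contains(rule))
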
@@ -182,18 +185,42 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
   }
 
   test("SPARK-32406: reset - single configuration") {
     spark.sessionState.conf.clear()
     // spark core conf w/o entry registered
     val appId = spark.sparkContext.getConf.getAppId
     sql("RESET spark.app.id")
     assert(spark.conf.get("spark.app.id") === appId, "Should not change spark core ones")
+    // spark core conf w/ entry registered
+    val e1 = intercept[AnalysisException](sql("RESET spark.executor.cores"))
+    assert(e1.getMessage === "Cannot modify the value of a Spark config: spark.executor.cores;")
 
     // user defined settings
     sql("SET spark.abc=xyz")
     assert(spark.conf.get("spark.abc") === "xyz")
     sql("RESET spark.abc")
     intercept[NoSuchElementException](spark.conf.get("spark.abc"))
     sql("RESET spark.abc") // ignore nonexistent keys
+
+    // runtime sql configs
+    val original = spark.conf.get(SQLConf.GROUP_BY_ORDINAL)
+    sql(s"SET ${SQLConf.GROUP_BY_ORDINAL.key}=false")
+    sql(s"RESET ${SQLConf.GROUP_BY_ORDINAL.key}")
+    assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL) === original)
+
+    // runtime sql configs with optional defaults
+    assert(spark.conf.get(SQLConf.OPTIMIZER_EXCLUDED_RULES).isEmpty)
+    sql(s"RESET ${SQLConf.OPTIMIZER_EXCLUDED_RULES.key}")
+    assert(spark.conf.get(SQLConf.OPTIMIZER_EXCLUDED_RULES) ===
+      Some("org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation"))
+    sql(s"SET ${SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_RULES.key}=abc")
+    sql(s"RESET ${SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_RULES.key}")
+    assert(spark.conf.get(SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_RULES).isEmpty)
+
+    // static sql configs
+    val e2 = intercept[AnalysisException](sql(s"RESET ${StaticSQLConf.WAREHOUSE_PATH.key}"))
+    assert(e2.getMessage ===
+      s"Cannot modify the value of a static config: ${StaticSQLConf.WAREHOUSE_PATH.key};")
+
   }
 
   test("invalid conf value") {
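
Taken together, the rewritten test covers each class of key RESET can encounter: core configs without a registered entry pass through untouched, core and static configs with registered entries fail fast, and user-defined or runtime SQL configs snap back to their SparkConf values or entry defaults. A short sketch of the two failure modes in the same ScalaTest style as the suite; the message prefixes mirror the assertions above:

import org.apache.spark.sql.AnalysisException

// Core config with a registered entry: not modifiable per session.
val coreErr = intercept[AnalysisException](sql("RESET spark.executor.cores"))
assert(coreErr.getMessage.startsWith("Cannot modify the value of a Spark config"))

// Static SQL config (StaticSQLConf.WAREHOUSE_PATH): fixed for the session's lifetime.
val staticErr = intercept[AnalysisException](sql("RESET spark.sql.warehouse.dir"))
assert(staticErr.getMessage.startsWith("Cannot modify the value of a static config"))
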