
Commit 0c3efb6

yaooqinn authored and cloud-fan committed
[SPARK-32991][SQL][FOLLOWUP] Reset command relies on session initials first
### What changes were proposed in this pull request?

As a follow-up of #30045, we modify the RESET command here to respect the session's initial configs first, and only then fall back to the `SharedState` conf. This lets each session maintain its own copy of initial configs for resetting.

### Why are the changes needed?

To make the RESET command saner.

### Does this PR introduce _any_ user-facing change?

Yes. RESET now respects the session's initial configs first instead of always going back to the system defaults.

### How was this patch tested?

Added new tests.

Closes #30642 from yaooqinn/SPARK-32991-F.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 205d8e4)
Signed-off-by: Wenchen Fan <[email protected]>
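A rough sketch of the behavior change, assuming a made-up non-static key `spark.sql.custom` (the same illustrative key the new tests use); the `SharedState` fallback still applies to keys the session never set initially:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative walkthrough of the new RESET semantics; "spark.sql.custom"
// is a made-up non-static key, not a real Spark config.
val spark = SparkSession.builder()
  .master("local")
  .config("spark.sql.custom", "kyao") // recorded in this session's initial options
  .getOrCreate()

spark.sql("SET spark.sql.custom=other")
assert(spark.conf.get("spark.sql.custom") == "other")

spark.sql("RESET")
// Before this change, RESET fell back straight to the SharedState conf;
// now the session's own initial value wins, with SharedState as the fallback.
assert(spark.conf.get("spark.sql.custom") == "kyao")
```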
1 parent dab9664 commit 0c3efb6

File tree

6 files changed: +104 −37 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 24 additions & 1 deletion
```diff
@@ -29,7 +29,7 @@ import scala.util.matching.Regex
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.{SparkConf, SparkContext, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.{IGNORE_MISSING_FILES => SPARK_IGNORE_MISSING_FILES}
@@ -77,6 +77,29 @@ object SQLConf {
     }
   }
 
+  /**
+   * Merge all non-static configs to the SQLConf. For example, when the 1st [[SparkSession]] and
+   * the global [[SharedState]] have been initialized, all static configs have taken affect and
+   * should not be set to other values. Other later created sessions should respect all static
+   * configs and only be able to change non-static configs.
+   */
+  private[sql] def mergeNonStaticSQLConfigs(
+      sqlConf: SQLConf,
+      configs: Map[String, String]): Unit = {
+    for ((k, v) <- configs if !staticConfKeys.contains(k)) {
+      sqlConf.setConfString(k, v)
+    }
+  }
+
+  /**
+   * Extract entries from `SparkConf` and put them in the `SQLConf`
+   */
+  private[sql] def mergeSparkConf(sqlConf: SQLConf, sparkConf: SparkConf): Unit = {
+    sparkConf.getAll.foreach { case (k, v) =>
+      sqlConf.setConfString(k, v)
+    }
+  }
+
   /**
    * Default config. Only used when there is no active SparkSession for the thread.
    * See [[get]] for more information.
```
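For reference, a minimal standalone sketch of the filtering rule `mergeNonStaticSQLConfigs` implements, with plain maps standing in for the real `SQLConf` and a hand-rolled `staticConfKeys` set (both stand-ins are assumptions for illustration):

```scala
import scala.collection.mutable

// Stand-in for SQLConf.staticConfKeys; the real set is populated as static
// configs are registered inside Spark.
val staticConfKeys = Set("spark.sql.warehouse.dir", "spark.sql.globalTempDatabase")

// Same shape as mergeNonStaticSQLConfigs: skip static keys, apply the rest.
def mergeNonStatic(target: mutable.Map[String, String], configs: Map[String, String]): Unit =
  for ((k, v) <- configs if !staticConfKeys.contains(k)) target(k) = v

val conf = mutable.Map.empty[String, String]
mergeNonStatic(conf, Map(
  "spark.sql.warehouse.dir" -> "./elsewhere",  // static: silently skipped
  "spark.sql.shuffle.partitions" -> "10"))     // non-static: applied
assert(conf == Map("spark.sql.shuffle.partitions" -> "10"))
```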

sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -83,7 +83,7 @@ class SparkSession private(
     @transient private val existingSharedState: Option[SharedState],
     @transient private val parentSessionState: Option[SessionState],
     @transient private[sql] val extensions: SparkSessionExtensions,
-    @transient private val initialSessionOptions: Map[String, String])
+    @transient private[sql] val initialSessionOptions: Map[String, String])
   extends Serializable with Closeable with Logging { self =>
 
   // The call site where this SparkSession was constructed.
```

sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala

Lines changed: 7 additions & 5 deletions
```diff
@@ -172,16 +172,18 @@ object SetCommand {
 case class ResetCommand(config: Option[String]) extends RunnableCommand with IgnoreCachedData {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val defaults = sparkSession.sharedState.conf
+    val globalInitialConfigs = sparkSession.sharedState.conf
     config match {
       case Some(key) =>
         sparkSession.conf.unset(key)
-        defaults.getOption(key).foreach(sparkSession.conf.set(key, _))
+        sparkSession.initialSessionOptions.get(key)
+          .orElse(globalInitialConfigs.getOption(key))
+          .foreach(sparkSession.conf.set(key, _))
       case None =>
         sparkSession.sessionState.conf.clear()
-        defaults.getAll.foreach { case (k, v) =>
-          sparkSession.sessionState.conf.setConfString(k, v)
-        }
+        SQLConf.mergeSparkConf(sparkSession.sessionState.conf, globalInitialConfigs)
+        SQLConf.mergeNonStaticSQLConfigs(sparkSession.sessionState.conf,
+          sparkSession.initialSessionOptions)
     }
     Seq.empty[Row]
   }
```
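To make the resolution order explicit, here is a standalone sketch of what `RESET <key>` now does, with plain maps as hypothetical stand-ins for the session conf, the session's initial options, and the `SharedState` conf:

```scala
import scala.collection.mutable

// RESET <key> after this change: drop the session value, restore it from the
// session's initial options, and only then fall back to the global initials.
def resetKey(
    sessionConf: mutable.Map[String, String],
    initialSessionOptions: Map[String, String],
    globalInitialConfigs: Map[String, String],
    key: String): Unit = {
  sessionConf.remove(key)
  initialSessionOptions.get(key)
    .orElse(globalInitialConfigs.get(key))
    .foreach(v => sessionConf(key) = v)
}
```

A bare `RESET` applies the same precedence wholesale: clear the conf, merge the `SharedState` conf back in, then layer the non-static session initials on top.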

sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala

Lines changed: 7 additions & 17 deletions
```diff
@@ -16,7 +16,6 @@
  */
 package org.apache.spark.sql.internal
 
-import org.apache.spark.SparkConf
 import org.apache.spark.annotation.Unstable
 import org.apache.spark.sql.{ExperimentalMethods, SparkSession, UDFRegistration, _}
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry, ResolveSessionCatalog}
@@ -73,15 +72,6 @@ abstract class BaseSessionStateBuilder(
    */
   protected def extensions: SparkSessionExtensions = session.extensions
 
-  /**
-   * Extract entries from `SparkConf` and put them in the `SQLConf`
-   */
-  protected def mergeSparkConf(sqlConf: SQLConf, sparkConf: SparkConf): Unit = {
-    sparkConf.getAll.foreach { case (k, v) =>
-      sqlConf.setConfString(k, v)
-    }
-  }
-
   /**
    * SQL-specific key-value configurations.
    *
@@ -92,15 +82,15 @@ abstract class BaseSessionStateBuilder(
     parentState.map { s =>
       val cloned = s.conf.clone()
       if (session.sparkContext.conf.get(StaticSQLConf.SQL_LEGACY_SESSION_INIT_WITH_DEFAULTS)) {
-        mergeSparkConf(cloned, session.sparkContext.conf)
+        SQLConf.mergeSparkConf(cloned, session.sparkContext.conf)
       }
       cloned
     }.getOrElse {
       val conf = new SQLConf
-      mergeSparkConf(conf, session.sparkContext.conf)
-      options.foreach {
-        case (k, v) => conf.setConfString(k, v)
-      }
+      SQLConf.mergeSparkConf(conf, session.sharedState.conf)
+      // the later added configs to spark conf shall be respected too
+      SQLConf.mergeNonStaticSQLConfigs(conf, session.sparkContext.conf.getAll.toMap)
+      SQLConf.mergeNonStaticSQLConfigs(conf, session.initialSessionOptions)
       conf
     }
   }
@@ -372,7 +362,7 @@ private[sql] trait WithTestConf { self: BaseSessionStateBuilder =>
     parentState.map { s =>
       val cloned = s.conf.clone()
       if (session.sparkContext.conf.get(StaticSQLConf.SQL_LEGACY_SESSION_INIT_WITH_DEFAULTS)) {
-        mergeSparkConf(conf, session.sparkContext.conf)
+        SQLConf.mergeSparkConf(conf, session.sparkContext.conf)
       }
       cloned
     }.getOrElse {
@@ -384,7 +374,7 @@ private[sql] trait WithTestConf { self: BaseSessionStateBuilder =>
           overrideConfigurations.foreach { case (key, value) => setConfString(key, value) }
         }
       }
-      mergeSparkConf(conf, session.sparkContext.conf)
+      SQLConf.mergeSparkConf(conf, session.sparkContext.conf)
       conf
     }
   }
```
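For a brand-new session (the `getOrElse` branch), the conf is now built in three layers, with later layers winning for non-static keys. A hedged restatement of the order from the hunk above, comments added by me:

```scala
val conf = new SQLConf
// 1. Everything SharedState captured when the 1st session was created,
//    including all static configs.
SQLConf.mergeSparkConf(conf, session.sharedState.conf)
// 2. Non-static entries added to the SparkConf after SharedState was built.
SQLConf.mergeNonStaticSQLConfigs(conf, session.sparkContext.conf.getAll.toMap)
// 3. Non-static options passed to this particular session's builder.
SQLConf.mergeNonStaticSQLConfigs(conf, session.initialSessionOptions)
```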

sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala

Lines changed: 4 additions & 3 deletions
```diff
@@ -67,11 +67,12 @@ private[sql] class SharedState(
       case (k, _) if k == "hive.metastore.warehouse.dir" || k == WAREHOUSE_PATH.key =>
         logWarning(s"Not allowing to set ${WAREHOUSE_PATH.key} or hive.metastore.warehouse.dir " +
           s"in SparkSession's options, it should be set statically for cross-session usages")
-      case (k, v) =>
-        logDebug(s"Applying initial SparkSession options to SparkConf/HadoopConf: $k -> $v")
+      case (k, v) if SQLConf.staticConfKeys.contains(k) =>
+        logDebug(s"Applying static initial session options to SparkConf: $k -> $v")
         confClone.set(k, v)
+      case (k, v) =>
+        logDebug(s"Applying other initial session options to HadoopConf: $k -> $v")
         hadoopConfClone.set(k, v)
-
     }
     (confClone, hadoopConfClone)
   }
```

sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala

Lines changed: 61 additions & 10 deletions
```diff
@@ -24,6 +24,7 @@ import org.apache.spark.internal.config.EXECUTOR_ALLOW_SPARK_CONTEXT
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.util.ThreadUtils
 
 /**
  * Test cases for the builder pattern of [[SparkSession]].
@@ -305,34 +306,84 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
     // newly specified values
     val sharedWH = spark.sharedState.conf.get(wh)
     val sharedTD = spark.sharedState.conf.get(td)
-    val sharedCustom = spark.sharedState.conf.get(custom)
     assert(sharedWH === "./data2",
       "The warehouse dir in shared state should be determined by the 1st created spark session")
     assert(sharedTD === "alice",
       "Static sql configs in shared state should be determined by the 1st created spark session")
-    assert(sharedCustom === "kyao",
-      "Dynamic sql configs in shared state should be determined by the 1st created spark session")
+    assert(spark.sharedState.conf.getOption(custom).isEmpty,
+      "Dynamic sql configs is session specific")
 
     assert(spark.conf.get(wh) === sharedWH,
       "The warehouse dir in session conf and shared state conf should be consistent")
     assert(spark.conf.get(td) === sharedTD,
       "Static sql configs in session conf and shared state conf should be consistent")
-    assert(spark.conf.get(custom) === sharedCustom,
-      "Dynamic sql configs in session conf and shared state conf should be consistent before" +
-        " setting to new ones")
+    assert(spark.conf.get(custom) === "kyao", "Dynamic sql configs is session specific")
 
     spark.sql("RESET")
 
     assert(spark.conf.get(wh) === sharedWH,
       "The warehouse dir in shared state should be respect after RESET")
     assert(spark.conf.get(td) === sharedTD,
       "Static sql configs in shared state should be respect after RESET")
-    assert(spark.conf.get(custom) === sharedCustom,
-      "Dynamic sql configs in shared state should be respect after RESET")
+    assert(spark.conf.get(custom) === "kyao",
+      "Dynamic sql configs in session initial map should be respect after RESET")
 
-    val spark2 = SparkSession.builder().getOrCreate()
+    val spark2 = SparkSession.builder()
+      .config(wh, "./data3")
+      .config(custom, "kyaoo").getOrCreate()
     assert(spark2.conf.get(wh) === sharedWH)
     assert(spark2.conf.get(td) === sharedTD)
-    assert(spark2.conf.get(custom) === sharedCustom)
+    assert(spark2.conf.get(custom) === "kyaoo")
+  }
+
+  test("SPARK-32991: RESET should work properly with multi threads") {
+    val wh = "spark.sql.warehouse.dir"
+    val td = "spark.sql.globalTempDatabase"
+    val custom = "spark.sql.custom"
+    val spark = ThreadUtils.runInNewThread("new session 0", false) {
+      SparkSession.builder()
+        .master("local")
+        .config(wh, "./data0")
+        .config(td, "bob")
+        .config(custom, "c0")
+        .getOrCreate()
+    }
+
+    spark.sql(s"SET $custom=c1")
+    assert(spark.conf.get(custom) === "c1")
+    spark.sql("RESET")
+    assert(spark.conf.get(wh) === "./data0",
+      "The warehouse dir in shared state should be respect after RESET")
+    assert(spark.conf.get(td) === "bob",
+      "Static sql configs in shared state should be respect after RESET")
+    assert(spark.conf.get(custom) === "c0",
+      "Dynamic sql configs in shared state should be respect after RESET")
+
+    val spark1 = ThreadUtils.runInNewThread("new session 1", false) {
+      SparkSession.builder().getOrCreate()
+    }
+
+    assert(spark === spark1)
+
+    // TODO: SPARK-33718: After clear sessions, the SharedState will be unreachable, then all
+    // the new static will take effect.
+    SparkSession.clearDefaultSession()
+    val spark2 = ThreadUtils.runInNewThread("new session 2", false) {
+      SparkSession.builder()
+        .master("local")
+        .config(wh, "./data1")
+        .config(td, "alice")
+        .config(custom, "c2")
+        .getOrCreate()
+    }
+
+    assert(spark2 !== spark)
+    spark2.sql(s"SET $custom=c1")
+    assert(spark2.conf.get(custom) === "c1")
+    spark2.sql("RESET")
+    assert(spark2.conf.get(wh) === "./data1")
+    assert(spark2.conf.get(td) === "alice")
+    assert(spark2.conf.get(custom) === "c2")
   }
 }
```
