From f253fad00c14376e950804849481fa6252cd8154 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Thu, 15 Oct 2020 11:33:31 +0800
Subject: [PATCH 1/6] [SPARK-32991][SQL] Use conf in shared state as the original configuration for RESET

---
 .../sql/execution/command/SetCommand.scala    |  2 +-
 .../spark/sql/internal/SharedState.scala      | 15 ++++--
 .../spark/sql/SparkSessionBuilderSuite.scala  | 49 +++++++++++++++++++
 3 files changed, 60 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
index fd89e361fe3d..61ee6d7f4a29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
@@ -172,7 +172,7 @@ object SetCommand {
 case class ResetCommand(config: Option[String]) extends RunnableCommand with IgnoreCachedData {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val defaults = sparkSession.sparkContext.conf
+    val defaults = sparkSession.sharedState.conf
     config match {
       case Some(key) =>
         sparkSession.conf.unset(key)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index ce4385d88f1e..cc23a33cf88e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -55,10 +55,11 @@ private[sql] class SharedState(
 
   SharedState.setFsUrlStreamHandlerFactory(sparkContext.conf, sparkContext.hadoopConfiguration)
 
-  private val (conf, hadoopConf) = {
+  private[sql] val (conf, hadoopConf) = {
     // Load hive-site.xml into hadoopConf and determine the warehouse path which will be set into
     // both spark conf and hadoop conf avoiding be affected by any SparkSession level options
-    SharedState.loadHiveConfFile(sparkContext.conf, sparkContext.hadoopConfiguration)
+    SharedState.loadHiveConfFile(
+      sparkContext.conf, sparkContext.hadoopConfiguration, initialConfigs)
     val confClone = sparkContext.conf.clone()
     val hadoopConfClone = new Configuration(sparkContext.hadoopConfiguration)
     // If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing
@@ -227,7 +228,8 @@ object SharedState extends Logging {
    */
   def loadHiveConfFile(
       sparkConf: SparkConf,
-      hadoopConf: Configuration): Unit = {
+      hadoopConf: Configuration,
+      initialConfigs: scala.collection.Map[String, String] = Map.empty): Unit = {
     val hiveWarehouseKey = "hive.metastore.warehouse.dir"
     val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
     if (configFile != null) {
@@ -238,11 +240,13 @@ object SharedState extends Logging {
         hadoopConf.setIfUnset(entry.getKey, entry.getValue)
       }
     }
+    val sparkWarehouse =
+      initialConfigs.get(WAREHOUSE_PATH.key).orElse(sparkConf.getOption(WAREHOUSE_PATH.key))
     // hive.metastore.warehouse.dir only stay in hadoopConf
     sparkConf.remove(hiveWarehouseKey)
     // Set the Hive metastore warehouse path to the one we use
     val hiveWarehouseDir = hadoopConf.get(hiveWarehouseKey)
-    val warehousePath = if (hiveWarehouseDir != null && !sparkConf.contains(WAREHOUSE_PATH.key)) {
+    val warehousePath = if (hiveWarehouseDir != null && sparkWarehouse.isEmpty) {
       // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
       // we will respect the value of hive.metastore.warehouse.dir.
       sparkConf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
@@ -254,9 +258,10 @@ object SharedState extends Logging {
       // the value of spark.sql.warehouse.dir.
       // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set
       // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
-      val sparkWarehouseDir = sparkConf.get(WAREHOUSE_PATH)
+      val sparkWarehouseDir = sparkWarehouse.getOrElse(sparkConf.get(WAREHOUSE_PATH))
       logInfo(s"Setting $hiveWarehouseKey ('$hiveWarehouseDir') to the value of " +
         s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').")
+      sparkConf.set(WAREHOUSE_PATH.key, sparkWarehouseDir)
       hadoopConf.set(hiveWarehouseKey, sparkWarehouseDir)
       sparkWarehouseDir
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
index 9da32d02aa72..336052a99b6a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
@@ -281,4 +281,53 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
       ()
     }
   }
+
+  test("SPARK-32991: Use conf in shared state as the original configuration for RESET") {
+    val wh = "spark.sql.warehouse.dir"
+    val td = "spark.sql.globalTempDatabase"
+    val custom = "spark.sql.custom"
+
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("SPARK-32991")
+      .set(wh, "./data1")
+      .set(td, "bob")
+
+    val sc = new SparkContext(conf)
+
+    val spark = SparkSession.builder()
+      .config(wh, "./data2")
+      .config(td, "alice")
+      .config(custom, "kyao")
+      .getOrCreate()
+
+    val sharedWH = spark.sharedState.conf.get(wh)
+    val sharedTD = spark.sharedState.conf.get(td)
+    val sharedCustom = spark.sharedState.conf.get(custom)
+    assert(sharedWH === "./data2",
+      "The warehouse dir in shared state should be determined by the 1st created spark session")
+    assert(sharedTD === "alice",
+      "Static sql configs in shared state should be determined by the 1st created spark session")
+    assert(sharedCustom === "kyao",
+      "Dynamic sql configs in shared state should be determined by the 1st created spark session")
+
+
+    assert(spark.conf.get(wh) === sharedWH)
+    assert(spark.conf.get(td) === sharedTD)
+    assert(spark.conf.get(custom) === sharedCustom)
+
+    spark.sql("RESET")
+
+    assert(spark.conf.get(wh) === sharedWH,
+      "The warehouse dir in shared state should be respected after RESET")
+    assert(spark.conf.get(td) === sharedTD,
+      "Static sql configs in shared state should be respected after RESET")
+    assert(spark.conf.get(custom) === sharedCustom,
+      "Dynamic sql configs in shared state should be respected after RESET")
+
+    val spark2 = SparkSession.builder().getOrCreate()
+    assert(spark2.conf.get(wh) === sharedWH)
+    assert(spark2.conf.get(td) === sharedTD)
+    assert(spark2.conf.get(custom) === sharedCustom)
+  }
 }

From a52c86afa3636f819989782c48346a533ca7a079 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Thu, 15 Oct 2020 15:05:47 +0800
Subject: [PATCH 2/6] fix test

---
 .../spark/sql/hive/HiveSharedStateSuite.scala | 37 +++++++++++++------
 1 file changed, 26 insertions(+), 11 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
index 78535b094b83..aaaed7a9a7dc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
-import org.apache.spark.sql.internal.SharedState
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.util.Utils
 
@@ -29,26 +29,30 @@ class HiveSharedStateSuite extends SparkFunSuite {
   test("initial configs should be passed to SharedState but not SparkContext") {
     val conf = new SparkConf().setMaster("local").setAppName("SharedState Test")
     val sc = SparkContext.getOrCreate(conf)
+    val wareHouseDir = Utils.createTempDir().toString
     val invalidPath = "invalid/path"
     val metastorePath = Utils.createTempDir()
     val tmpDb = "tmp_db"
 
     // The initial configs used to generate SharedState, none of these should affect the global
     // shared SparkContext's configurations. Especially, all these configs are passed to the cloned
-    // confs inside SharedState except metastore warehouse dir.
+    // confs inside SharedState for share.
     val initialConfigs = Map("spark.foo" -> "bar",
-      WAREHOUSE_PATH.key -> invalidPath,
-      ConfVars.METASTOREWAREHOUSE.varname -> invalidPath,
+      WAREHOUSE_PATH.key -> wareHouseDir,
+      ConfVars.METASTOREWAREHOUSE.varname -> wareHouseDir,
       CATALOG_IMPLEMENTATION.key -> "hive",
       ConfVars.METASTORECONNECTURLKEY.varname ->
        s"jdbc:derby:;databaseName=$metastorePath/metastore_db;create=true",
       GLOBAL_TEMP_DATABASE.key -> tmpDb)
 
-    val state = new SharedState(sc, initialConfigs)
-    assert(sc.conf.get(WAREHOUSE_PATH.key) !== invalidPath,
-      "warehouse conf in session options can't affect application wide spark conf")
-    assert(sc.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) !== invalidPath,
-      "warehouse conf in session options can't affect application wide hadoop conf")
+    val builder = SparkSession.builder()
+    initialConfigs.foreach { case (k, v) => builder.config(k, v) }
+    val ss = builder.getOrCreate()
+    val state = ss.sharedState
+    assert(sc.conf.get(WAREHOUSE_PATH.key) === wareHouseDir,
+      "initial warehouse conf in session options can affect application wide spark conf")
+    assert(sc.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) === wareHouseDir,
+      "initial warehouse conf in session options can affect application wide hadoop conf")
 
     assert(!state.sparkContext.conf.contains("spark.foo"),
       "static spark conf should not be affected by session")
@@ -57,9 +61,20 @@ class HiveSharedStateSuite extends SparkFunSuite {
     val client = state.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client
     assert(client.getConf("spark.foo", "") === "bar",
       "session level conf should be passed to catalog")
-    assert(client.getConf(ConfVars.METASTOREWAREHOUSE.varname, invalidPath) !== invalidPath,
-      "session level conf should be passed to catalog except warehouse dir")
+    assert(client.getConf(ConfVars.METASTOREWAREHOUSE.varname, "") === wareHouseDir,
+      "session level conf should be passed to catalog")
 
     assert(state.globalTempViewManager.database === tmpDb)
+
+    val ss2 =
+      builder.config("spark.foo", "bar2222").config(WAREHOUSE_PATH.key, invalidPath).getOrCreate()
+
+    assert(ss2.sparkContext.conf.get(WAREHOUSE_PATH.key) !== invalidPath,
+      "warehouse conf in session options can't affect application wide spark conf")
+    assert(ss2.sparkContext.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) !==
+      invalidPath, "warehouse conf in session options can't affect application wide hadoop conf")
+    assert(ss.conf.get("spark.foo") === "bar2222", "session level conf should be passed to catalog")
+    assert(ss.conf.get(WAREHOUSE_PATH) !== invalidPath,
+      "session level conf should be passed to catalog")
   }
 }

From ccbf0aefd7070e13fe998b35a1ae48796a0c8f38 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Thu, 15 Oct 2020 16:40:01 +0800
Subject: [PATCH 3/6] fix test

---
 .../org/apache/spark/sql/hive/HiveSharedStateSuite.scala | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
index aaaed7a9a7dc..c245fe7b2f3c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
@@ -26,6 +26,12 @@ import org.apache.spark.util.Utils
 
 class HiveSharedStateSuite extends SparkFunSuite {
 
+  override def beforeEach(): Unit = {
+    SparkSession.clearActiveSession()
+    SparkSession.clearDefaultSession()
+    super.beforeEach()
+  }
+
   test("initial configs should be passed to SharedState but not SparkContext") {
     val conf = new SparkConf().setMaster("local").setAppName("SharedState Test")
     val sc = SparkContext.getOrCreate(conf)

From 91c2e91b3c6133951a44214911ede8e51ffc07fa Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Fri, 16 Oct 2020 23:23:39 +0800
Subject: [PATCH 4/6] update comments and fix test

---
 .../apache/spark/sql/SparkSessionBuilderSuite.scala | 13 +++++++++----
 .../spark/sql/hive/HiveSharedStateSuite.scala       |  2 +-
 2 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
index 8536ab3e8177..23695af0f59c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
@@ -320,6 +320,8 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
       .config(custom, "kyao")
       .getOrCreate()
 
+    // When creating the first session like above, we will update the shared spark conf to the
+    // newly specified values
     val sharedWH = spark.sharedState.conf.get(wh)
     val sharedTD = spark.sharedState.conf.get(td)
     val sharedCustom = spark.sharedState.conf.get(custom)
@@ -330,10 +332,13 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(sharedCustom === "kyao",
       "Dynamic sql configs in shared state should be determined by the 1st created spark session")
 
-
-    assert(spark.conf.get(wh) === sharedWH)
-    assert(spark.conf.get(td) === sharedTD)
-    assert(spark.conf.get(custom) === sharedCustom)
+    assert(spark.conf.get(wh) === sharedWH,
+      "The warehouse dir in session conf and shared state conf should be consistent")
+    assert(spark.conf.get(td) === sharedTD,
+      "Static sql configs in session conf and shared state conf should be consistent")
+    assert(spark.conf.get(custom) === sharedCustom,
+      "Dynamic sql configs in session conf and shared state conf should be consistent before" +
+      " setting to new ones")
 
     spark.sql("RESET")
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
index c245fe7b2f3c..f47427cc8da6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.util.Utils
 class HiveSharedStateSuite extends SparkFunSuite {
 
   override def beforeEach(): Unit = {
-    SparkSession.clearActiveSession()
+    SparkSession.clearActiveSessionInternal()
     SparkSession.clearDefaultSession()
     super.beforeEach()
   }

From 0cbb4bc82360f0bc75b2186ea9c6df00b52bb84c Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Thu, 22 Oct 2020 17:39:52 +0800
Subject: [PATCH 5/6] update comment

---
 .../org/apache/spark/sql/hive/HiveSharedStateSuite.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
index f47427cc8da6..d2d4546ea18e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
@@ -41,8 +41,9 @@ class HiveSharedStateSuite extends SparkFunSuite {
     val tmpDb = "tmp_db"
 
     // The initial configs used to generate SharedState, none of these should affect the global
-    // shared SparkContext's configurations. Especially, all these configs are passed to the cloned
-    // confs inside SharedState for share.
+    // shared SparkContext's configurations, except spark.sql.warehouse.dir.
+    // Especially, all these configs are passed to the cloned confs inside SharedState for sharing
+    // across sessions.
     val initialConfigs = Map("spark.foo" -> "bar",
       WAREHOUSE_PATH.key -> wareHouseDir,
       ConfVars.METASTOREWAREHOUSE.varname -> wareHouseDir,

From d1947961a366cd76086b9996c4d9687b7c63f3f7 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Thu, 22 Oct 2020 23:38:26 +0800
Subject: [PATCH 6/6] dft val str

---
 .../scala/org/apache/spark/sql/internal/SharedState.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index cc23a33cf88e..1acdc4bd5f0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -240,13 +240,13 @@ object SharedState extends Logging {
         hadoopConf.setIfUnset(entry.getKey, entry.getValue)
       }
     }
-    val sparkWarehouse =
+    val sparkWarehouseOption =
       initialConfigs.get(WAREHOUSE_PATH.key).orElse(sparkConf.getOption(WAREHOUSE_PATH.key))
     // hive.metastore.warehouse.dir only stay in hadoopConf
     sparkConf.remove(hiveWarehouseKey)
     // Set the Hive metastore warehouse path to the one we use
     val hiveWarehouseDir = hadoopConf.get(hiveWarehouseKey)
-    val warehousePath = if (hiveWarehouseDir != null && sparkWarehouse.isEmpty) {
+    val warehousePath = if (hiveWarehouseDir != null && sparkWarehouseOption.isEmpty) {
       // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
       // we will respect the value of hive.metastore.warehouse.dir.
       sparkConf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
@@ -258,7 +258,7 @@ object SharedState extends Logging {
       // the value of spark.sql.warehouse.dir.
       // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set
       // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
-      val sparkWarehouseDir = sparkWarehouse.getOrElse(sparkConf.get(WAREHOUSE_PATH))
+      val sparkWarehouseDir = sparkWarehouseOption.getOrElse(WAREHOUSE_PATH.defaultValueString)
       logInfo(s"Setting $hiveWarehouseKey ('$hiveWarehouseDir') to the value of " +
         s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').")
       sparkConf.set(WAREHOUSE_PATH.key, sparkWarehouseDir)
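
After PATCH 1 and PATCH 6, loadHiveConfFile resolves the warehouse location with a fixed precedence: the first session's initial spark.sql.warehouse.dir option, then the application-wide SparkConf value, then hive.metastore.warehouse.dir from hive-site.xml, and finally the static default of spark.sql.warehouse.dir. The Scala sketch below is illustrative only; WarehousePathSketch and resolveWarehousePath are names invented for this note, and it leaves out the logging and the write-back into SparkConf and the Hadoop conf that the real method performs.

// Illustrative sketch, not the SharedState implementation: it only mirrors the precedence
// that spark.sql.warehouse.dir ends up with after this series.
object WarehousePathSketch {
  // initialConfigs: options of the first SparkSession; sparkConfValue: spark.sql.warehouse.dir
  // from the application-wide SparkConf, if set; hiveWarehouseDir: hive.metastore.warehouse.dir
  // loaded from hive-site.xml, if any; defaultValue: WAREHOUSE_PATH.defaultValueString.
  def resolveWarehousePath(
      initialConfigs: Map[String, String],
      sparkConfValue: Option[String],
      hiveWarehouseDir: Option[String],
      defaultValue: String): String = {
    // The first session's option wins over the application-wide SparkConf value.
    val sparkWarehouseOption =
      initialConfigs.get("spark.sql.warehouse.dir").orElse(sparkConfValue)
    if (hiveWarehouseDir.isDefined && sparkWarehouseOption.isEmpty) {
      // Only hive-site.xml sets a warehouse: respect hive.metastore.warehouse.dir.
      hiveWarehouseDir.get
    } else {
      // Otherwise spark.sql.warehouse.dir (explicit or default) overrides the Hive setting.
      sparkWarehouseOption.getOrElse(defaultValue)
    }
  }
}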
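The updated HiveSharedStateSuite also pins down which session defines the shared configuration: the options of the first created session are copied into the shared state (and, for the warehouse dir, into the application-wide SparkConf and Hadoop conf), while warehouse options passed by later sessions are not allowed to move it. The usage sketch below assumes a build with this series applied; the paths ./data1, ./data2 and ./data3 are made-up stand-ins for the test's temp directories.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

object FirstSessionWinsSketch {
  def main(args: Array[String]): Unit = {
    // Application-wide conf sets one warehouse dir up front.
    val sc = new SparkContext(new SparkConf()
      .setMaster("local")
      .setAppName("first-session-wins")
      .set("spark.sql.warehouse.dir", "./data1"))

    // The first session's initial option replaces that value in the shared state.
    val first = SparkSession.builder().config("spark.sql.warehouse.dir", "./data2").getOrCreate()

    // A later builder with a different warehouse dir reuses the existing shared state; per the
    // updated HiveSharedStateSuite its option must not leak into the application-wide conf.
    val second = SparkSession.builder().config("spark.sql.warehouse.dir", "./data3").getOrCreate()

    // Still reflects the first session's "./data2" rather than "./data1" or "./data3".
    println(sc.getConf.get("spark.sql.warehouse.dir"))
    first.stop()
  }
}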
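The headline change is what RESET falls back to: ResetCommand now reads its defaults from sparkSession.sharedState.conf instead of sparkSession.sparkContext.conf, so session-level options given when the first session was built survive a RESET. The sketch below is a usage illustration only, assuming a build with this series applied; it restates what the new SparkSessionBuilderSuite test asserts, with spark.sql.custom used as an arbitrary key exactly as in that test.

import org.apache.spark.sql.SparkSession

object ResetKeepsInitialOptions {
  def main(args: Array[String]): Unit = {
    // The first session's options are captured in the shared state when the session is created.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("SPARK-32991-sketch")
      .config("spark.sql.custom", "kyao")
      .getOrCreate()

    // Override the value at runtime, then reset the session conf.
    spark.conf.set("spark.sql.custom", "overridden")
    spark.sql("RESET")

    // Before this series RESET fell back to the SparkContext conf, which never contained the
    // builder option; with it, RESET falls back to the shared state conf, restoring "kyao".
    println(spark.conf.get("spark.sql.custom"))

    spark.stop()
  }
}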