[SPARK-32991] Use conf in shared state as the original configuration for RESET
Closed · Changes from 4 commits
@@ -172,7 +172,7 @@ object SetCommand {
 case class ResetCommand(config: Option[String]) extends RunnableCommand with IgnoreCachedData {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val defaults = sparkSession.sparkContext.conf
+    val defaults = sparkSession.sharedState.conf
     config match {
       case Some(key) =>
         sparkSession.conf.unset(key)
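For context, a minimal sketch of the behavioral change, using plain `Map`s as stand-ins for `SparkConf`/`RuntimeConfig` (the names below are illustrative, not the real Spark internals): before this patch, `RESET` restored session configs from the application-wide `SparkContext` conf; after it, the baseline is the `SharedState` conf, which also captures the options passed to the first `SparkSession`.

```scala
object ResetSketch extends App {
  // Application-wide conf, fixed when the SparkContext was created.
  val sparkContextConf = Map("spark.sql.warehouse.dir" -> "./data1")
  // SharedState snapshot: the SparkContext conf overlaid with the first
  // session's initial options.
  val sharedStateConf = sparkContextConf ++ Map("spark.sql.warehouse.dir" -> "./data2")

  // Session conf after some SET commands.
  var sessionConf = Map("spark.sql.warehouse.dir" -> "./data3")

  // Old behavior: RESET fell back to the SparkContext conf, silently dropping
  // the first session's initial options.
  sessionConf = sparkContextConf
  assert(sessionConf("spark.sql.warehouse.dir") == "./data1")

  // New behavior: RESET falls back to the SharedState snapshot, so the first
  // session's initial options survive RESET.
  sessionConf = sharedStateConf
  assert(sessionConf("spark.sql.warehouse.dir") == "./data2")
}
```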
@@ -55,10 +55,11 @@ private[sql] class SharedState(

   SharedState.setFsUrlStreamHandlerFactory(sparkContext.conf, sparkContext.hadoopConfiguration)
 
-  private val (conf, hadoopConf) = {
+  private[sql] val (conf, hadoopConf) = {
     // Load hive-site.xml into hadoopConf and determine the warehouse path, which will be set into
     // both spark conf and hadoop conf, avoiding being affected by any SparkSession-level options
-    SharedState.loadHiveConfFile(sparkContext.conf, sparkContext.hadoopConfiguration)
+    SharedState.loadHiveConfFile(
+      sparkContext.conf, sparkContext.hadoopConfiguration, initialConfigs)
     val confClone = sparkContext.conf.clone()
     val hadoopConfClone = new Configuration(sparkContext.hadoopConfiguration)
     // If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing
@@ -227,7 +228,8 @@ object SharedState extends Logging {
    */
   def loadHiveConfFile(
       sparkConf: SparkConf,
-      hadoopConf: Configuration): Unit = {
+      hadoopConf: Configuration,
+      initialConfigs: scala.collection.Map[String, String] = Map.empty): Unit = {
     val hiveWarehouseKey = "hive.metastore.warehouse.dir"
     val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
     if (configFile != null) {
@@ -238,11 +240,13 @@
         hadoopConf.setIfUnset(entry.getKey, entry.getValue)
       }
     }
+    val sparkWarehouse =
+      initialConfigs.get(WAREHOUSE_PATH.key).orElse(sparkConf.getOption(WAREHOUSE_PATH.key))
     // hive.metastore.warehouse.dir only stays in hadoopConf
     sparkConf.remove(hiveWarehouseKey)
     // Set the Hive metastore warehouse path to the one we use
     val hiveWarehouseDir = hadoopConf.get(hiveWarehouseKey)
-    val warehousePath = if (hiveWarehouseDir != null && !sparkConf.contains(WAREHOUSE_PATH.key)) {
+    val warehousePath = if (hiveWarehouseDir != null && sparkWarehouse.isEmpty) {
       // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
       // we will respect the value of hive.metastore.warehouse.dir.
       sparkConf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
@@ -254,9 +258,10 @@
       // the value of spark.sql.warehouse.dir.
       // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set
       // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
-      val sparkWarehouseDir = sparkConf.get(WAREHOUSE_PATH)
+      val sparkWarehouseDir = sparkWarehouse.getOrElse(sparkConf.get(WAREHOUSE_PATH))
Contributor:
sparkWarehouse already has .orElse(sparkConf.getOption(WAREHOUSE_PATH.key)). Do you mean sparkWarehouse.getOrElse(WAREHOUSE_PATH.defaultValueString)?

Member (Author):
Yes, this will finally fall back to the default value string if nothing matched before.

Contributor:
Can we change it to sparkWarehouse.getOrElse(WAREHOUSE_PATH.defaultValueString) so that it's more explicit?

Member (Author):
SGTM
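A quick, self-contained illustration of the fallback chain being discussed (values are illustrative; `defaultValueString` here is a stand-in for `WAREHOUSE_PATH.defaultValueString`):

```scala
object WarehouseFallbackSketch extends App {
  // Precedence under discussion: session initial configs > SparkConf > built-in default.
  val whKey = "spark.sql.warehouse.dir"
  val initialConfigs: Map[String, String] = Map.empty // no session-level option set
  val confValue: Option[String] = None                // stand-in for sparkConf.getOption(whKey)
  val defaultValueString = "spark-warehouse"          // stand-in for the built-in default

  val sparkWarehouse = initialConfigs.get(whKey).orElse(confValue)

  // sparkWarehouse has already folded in the SparkConf lookup, so the only case
  // left for getOrElse is the built-in default -- hence the suggestion to spell it
  // sparkWarehouse.getOrElse(WAREHOUSE_PATH.defaultValueString) rather than
  // re-reading the conf via sparkConf.get(WAREHOUSE_PATH).
  val sparkWarehouseDir = sparkWarehouse.getOrElse(defaultValueString)
  assert(sparkWarehouseDir == "spark-warehouse")
}
```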

logInfo(s"Setting $hiveWarehouseKey ('$hiveWarehouseDir') to the value of " +
s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').")
sparkConf.set(WAREHOUSE_PATH.key, sparkWarehouseDir)
hadoopConf.set(hiveWarehouseKey, sparkWarehouseDir)
sparkWarehouseDir
}
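To make the data flow above concrete, a hedged sketch of how the first session's initial options end up in the conf that `SharedState` exposes, modeled as a `Map` overlay (simplified: the real constructor clones `sparkContext.conf`, runs `loadHiveConfFile`, and special-cases the warehouse keys, none of which is reproduced here):

```scala
object SharedConfSketch extends App {
  // Sketch only: SharedState's conf snapshot as an overlay of the first
  // session's options on the application-wide conf.
  def buildSharedConf(
      sparkContextConf: Map[String, String],
      initialConfigs: Map[String, String]): Map[String, String] = {
    // Clone the application-wide conf, then overlay the first session's initial
    // options; later sessions never get to change this snapshot.
    sparkContextConf ++ initialConfigs
  }

  val shared = buildSharedConf(
    Map("spark.sql.warehouse.dir" -> "./data1", "spark.sql.globalTempDatabase" -> "bob"),
    Map("spark.sql.warehouse.dir" -> "./data2", "spark.sql.custom" -> "kyao"))

  assert(shared("spark.sql.warehouse.dir") == "./data2") // the first session wins
  assert(shared("spark.sql.custom") == "kyao")           // dynamic confs are captured too
}
```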
@@ -300,4 +300,53 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
       session.stop()
     }
   }
+
+  test("SPARK-32991: Use conf in shared state as the original configuration for RESET") {
+    val wh = "spark.sql.warehouse.dir"
+    val td = "spark.sql.globalTempDatabase"
+    val custom = "spark.sql.custom"
+
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("SPARK-32991")
+      .set(wh, "./data1")
+      .set(td, "bob")
+
+    val sc = new SparkContext(conf)
+
+    val spark = SparkSession.builder()
+      .config(wh, "./data2")
+      .config(td, "alice")
+      .config(custom, "kyao")
+      .getOrCreate()
+
+    val sharedWH = spark.sharedState.conf.get(wh)
+    val sharedTD = spark.sharedState.conf.get(td)
+    val sharedCustom = spark.sharedState.conf.get(custom)
+    assert(sharedWH === "./data2",
+      "The warehouse dir in shared state should be determined by the 1st created spark session")
+    assert(sharedTD === "alice",
+      "Static sql configs in shared state should be determined by the 1st created spark session")
+    assert(sharedCustom === "kyao",
+      "Dynamic sql configs in shared state should be determined by the 1st created spark session")
+
+    assert(spark.conf.get(wh) === sharedWH)
+    assert(spark.conf.get(td) === sharedTD)
+    assert(spark.conf.get(custom) === sharedCustom)
+
+    spark.sql("RESET")
+
+    assert(spark.conf.get(wh) === sharedWH,
+      "The warehouse dir in shared state should be respected after RESET")
+    assert(spark.conf.get(td) === sharedTD,
+      "Static sql configs in shared state should be respected after RESET")
+    assert(spark.conf.get(custom) === sharedCustom,
+      "Dynamic sql configs in shared state should be respected after RESET")
+
+    val spark2 = SparkSession.builder().getOrCreate()
+    assert(spark2.conf.get(wh) === sharedWH)
+    assert(spark2.conf.get(td) === sharedTD)
+    assert(spark2.conf.get(custom) === sharedCustom)
+  }
 }
@@ -20,35 +20,45 @@ package org.apache.spark.sql.hive
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
-import org.apache.spark.sql.internal.SharedState
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.util.Utils
 
 class HiveSharedStateSuite extends SparkFunSuite {
 
+  override def beforeEach(): Unit = {
+    SparkSession.clearActiveSession()
+    SparkSession.clearDefaultSession()
+    super.beforeEach()
+  }
+
   test("initial configs should be passed to SharedState but not SparkContext") {
     val conf = new SparkConf().setMaster("local").setAppName("SharedState Test")
     val sc = SparkContext.getOrCreate(conf)
+    val wareHouseDir = Utils.createTempDir().toString
     val invalidPath = "invalid/path"
     val metastorePath = Utils.createTempDir()
     val tmpDb = "tmp_db"
 
     // The initial configs used to generate SharedState, none of these should affect the global
Contributor:
This is wrong now, as the warehouse conf is an exception.
     // shared SparkContext's configurations. Especially, all these configs are passed to the cloned
-    // confs inside SharedState except metastore warehouse dir.
+    // confs inside SharedState for sharing.
     val initialConfigs = Map("spark.foo" -> "bar",
-      WAREHOUSE_PATH.key -> invalidPath,
-      ConfVars.METASTOREWAREHOUSE.varname -> invalidPath,
+      WAREHOUSE_PATH.key -> wareHouseDir,
+      ConfVars.METASTOREWAREHOUSE.varname -> wareHouseDir,
       CATALOG_IMPLEMENTATION.key -> "hive",
       ConfVars.METASTORECONNECTURLKEY.varname ->
         s"jdbc:derby:;databaseName=$metastorePath/metastore_db;create=true",
       GLOBAL_TEMP_DATABASE.key -> tmpDb)
 
-    val state = new SharedState(sc, initialConfigs)
-    assert(sc.conf.get(WAREHOUSE_PATH.key) !== invalidPath,
-      "warehouse conf in session options can't affect application wide spark conf")
-    assert(sc.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) !== invalidPath,
-      "warehouse conf in session options can't affect application wide hadoop conf")
+    val builder = SparkSession.builder()
+    initialConfigs.foreach { case (k, v) => builder.config(k, v) }
+    val ss = builder.getOrCreate()
+    val state = ss.sharedState
+    assert(sc.conf.get(WAREHOUSE_PATH.key) === wareHouseDir,
+      "initial warehouse conf in session options can affect application wide spark conf")
+    assert(sc.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) === wareHouseDir,
+      "initial warehouse conf in session options can affect application wide hadoop conf")
 
     assert(!state.sparkContext.conf.contains("spark.foo"),
       "static spark conf should not be affected by session")
@@ -57,9 +67,20 @@ class HiveSharedStateSuite extends SparkFunSuite {
     val client = state.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client
     assert(client.getConf("spark.foo", "") === "bar",
       "session level conf should be passed to catalog")
-    assert(client.getConf(ConfVars.METASTOREWAREHOUSE.varname, invalidPath) !== invalidPath,
-      "session level conf should be passed to catalog except warehouse dir")
+    assert(client.getConf(ConfVars.METASTOREWAREHOUSE.varname, "") === wareHouseDir,
+      "session level conf should be passed to catalog")
 
     assert(state.globalTempViewManager.database === tmpDb)
 
+    val ss2 =
+      builder.config("spark.foo", "bar2222").config(WAREHOUSE_PATH.key, invalidPath).getOrCreate()
+
+    assert(ss2.sparkContext.conf.get(WAREHOUSE_PATH.key) !== invalidPath,
+      "warehouse conf in session options can't affect application wide spark conf")
+    assert(ss2.sparkContext.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) !==
+      invalidPath, "warehouse conf in session options can't affect application wide hadoop conf")
+    assert(ss.conf.get("spark.foo") === "bar2222", "session level conf should be passed to catalog")
+    assert(ss.conf.get(WAREHOUSE_PATH) !== invalidPath,
+      "session level conf should be passed to catalog")
   }
 }