[SPARK-26794][SQL] SparkSession enableHiveSupport does not point to hive but in-memory while the SparkContext exists

## What changes were proposed in this pull request?

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class SqlDemo {
    public static void main(final String[] args) throws Exception {
        // Create a SparkContext first, so the SparkSession below has to reuse it.
        SparkConf conf = new SparkConf().setAppName("spark-sql-demo");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SparkSession ss = SparkSession.builder().enableHiveSupport().getOrCreate();
        ss.sql("show databases").show();
    }
}
```
Before https://issues.apache.org/jira/browse/SPARK-20946, the demo above pointed to the correct Hive metastore if hive-site.xml was present. But now it can only point to the default in-memory catalog.

The catalog is now a variable shared across SparkSessions and is instantiated with the SparkContext's conf. After https://issues.apache.org/jira/browse/SPARK-20946, session-level configs are no longer passed to the SparkContext's conf, so the enableHiveSupport API has no effect on the catalog instance.

You can set spark.sql.catalogImplementation=hive application-wide to work around the problem, or avoid creating a SparkContext before calling SparkSession.builder().enableHiveSupport().getOrCreate(), as in the sketch below.
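
As an illustration, here is a minimal sketch of the application-wide workaround; the object and app names are placeholders:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object CatalogWorkaroundDemo {
  def main(args: Array[String]): Unit = {
    // Setting the catalog implementation statically on the SparkConf makes the
    // SharedState (built from the SparkContext's conf) pick the Hive catalog,
    // even if enableHiveSupport() on a later session would otherwise be ignored.
    val conf = new SparkConf()
      .setAppName("spark-sql-demo")
      .set("spark.sql.catalogImplementation", "hive")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    spark.sql("show databases").show()
  }
}
```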

Here we respect the SparkSession-level configuration of the first created session when generating the catalog inside SharedState.
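
In simplified terms (the actual change is in the diff below), SharedState now clones the application-wide confs and overlays the first session's options before instantiating the external catalog:
```scala
// Simplified sketch of the idea behind the fix, not the exact committed code.
val confClone = sparkContext.conf.clone()
val hadoopConfClone = new Configuration(sparkContext.hadoopConfiguration)
initialConfigs.foreach {
  // The warehouse dir must stay application-wide, so session-level overrides are ignored.
  case (k, _) if k == "hive.metastore.warehouse.dir" || k == WAREHOUSE_PATH.key =>
  case (k, v) =>
    confClone.set(k, v)
    hadoopConfClone.set(k, v)
}
// The external catalog (in-memory vs. Hive) is then chosen from confClone rather than
// sparkContext.conf, so enableHiveSupport() on the first session takes effect.
```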

## How was this patch tested?

1. Added a unit test.
2. Verified manually:
```scala
test("enableHiveSupport has right to determine the catalog while using an existing sc") {
    val conf = new SparkConf().setMaster("local").setAppName("SharedState Test")
    val sc = SparkContext.getOrCreate(conf)
    val ss = SparkSession.builder().enableHiveSupport().getOrCreate()
    assert(ss.sharedState.externalCatalog.unwrapped.isInstanceOf[HiveExternalCatalog],
      "The catalog should be hive ")

    val ss2 = SparkSession.builder().getOrCreate()
    assert(ss2.sharedState.externalCatalog.unwrapped.isInstanceOf[HiveExternalCatalog],
      "The catalog should be shared across sessions")
  }
```

Without this fix, the above test fails.
You can add it to `org.apache.spark.sql.hive.HiveSharedStateSuite` and run
```sbt
./build/sbt  -Phadoop-2.7 -Phive  "hive/testOnly org.apache.spark.sql.hive.HiveSharedStateSuite"
```
to verify.

Closes apache#23709 from yaooqinn/SPARK-26794.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
yaooqinn authored and IceMimosa committed Jun 13, 2019
1 parent a96a1a1 commit fd460fa
Showing 4 changed files with 99 additions and 8 deletions.
@@ -118,7 +118,7 @@ class SparkSession private(
@InterfaceStability.Unstable
@transient
lazy val sharedState: SharedState = {
existingSharedState.getOrElse(new SharedState(sparkContext))
existingSharedState.getOrElse(new SharedState(sparkContext, initialSessionOptions))
}

/**
@@ -39,8 +39,14 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}

/**
* A class that holds all state shared across sessions in a given [[SQLContext]].
*
* @param sparkContext The Spark context associated with this SharedState
* @param initialConfigs The configs from the very first created SparkSession
*/
private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
private[sql] class SharedState(
val sparkContext: SparkContext,
initialConfigs: scala.collection.Map[String, String])
extends Logging {

// Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
// the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
@@ -77,6 +83,27 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
}
logInfo(s"Warehouse path is '$warehousePath'.")

// These two variables should be initialized after `warehousePath`, because we first need to load
// hive-site.xml into hadoopConf and determine the warehouse path, which is then set into both the
// Spark conf and the Hadoop conf so that it is not affected by any SparkSession-level options.
private val (conf, hadoopConf) = {
val confClone = sparkContext.conf.clone()
val hadoopConfClone = new Configuration(sparkContext.hadoopConfiguration)
// If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing
// `SharedState`, all `SparkSession` level configurations have higher priority to generate a
// `SharedState` instance. This will be done only once then shared across `SparkSession`s
initialConfigs.foreach {
case (k, _) if k == "hive.metastore.warehouse.dir" || k == WAREHOUSE_PATH.key =>
logWarning(s"Not allowing to set ${WAREHOUSE_PATH.key} or hive.metastore.warehouse.dir " +
s"in SparkSession's options, it should be set statically for cross-session usages")
case (k, v) =>
logDebug(s"Applying initial SparkSession options to SparkConf/HadoopConf: $k -> $v")
confClone.set(k, v)
hadoopConfClone.set(k, v)

}
(confClone, hadoopConfClone)
}

/**
* Class for caching query results reused in future executions.
@@ -89,7 +116,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
*/
val statusStore: SQLAppStatusStore = {
val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]
val listener = new SQLAppStatusListener(sparkContext.conf, kvStore, live = true)
val listener = new SQLAppStatusListener(conf, kvStore, live = true)
sparkContext.listenerBus.addToStatusQueue(listener)
val statusStore = new SQLAppStatusStore(kvStore, Some(listener))
sparkContext.ui.foreach(new SQLTab(statusStore, _))
@@ -101,9 +128,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
*/
lazy val externalCatalog: ExternalCatalogWithListener = {
val externalCatalog = SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
SharedState.externalCatalogClassName(sparkContext.conf),
sparkContext.conf,
sparkContext.hadoopConfiguration)
SharedState.externalCatalogClassName(conf), conf, hadoopConf)

val defaultDbDefinition = CatalogDatabase(
SessionCatalog.DEFAULT_DATABASE,
@@ -137,7 +162,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
// System preserved database should not exists in metastore. However it's hard to guarantee it
// for every session, because case-sensitivity differs. Here we always lowercase it to make our
// life easier.
val globalTempDB = sparkContext.conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT)
val globalTempDB = conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT)
if (externalCatalog.databaseExists(globalTempDB)) {
throw new SparkException(
s"$globalTempDB is a system preserved database, please rename your existing database " +
@@ -88,7 +88,7 @@ private[hive] class TestHiveExternalCatalog(
private[hive] class TestHiveSharedState(
sc: SparkContext,
hiveClient: Option[HiveClient] = None)
extends SharedState(sc) {
extends SharedState(sc, initialConfigs = Map.empty[String, String]) {

override lazy val externalCatalog: ExternalCatalogWithListener = {
new ExternalCatalogWithListener(new TestHiveExternalCatalog(
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive

import org.apache.hadoop.hive.conf.HiveConf.ConfVars

import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.sql.internal.SharedState
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.util.Utils

class HiveSharedStateSuite extends SparkFunSuite {

test("initial configs should be passed to SharedState but not SparkContext") {
val conf = new SparkConf().setMaster("local").setAppName("SharedState Test")
val sc = SparkContext.getOrCreate(conf)
val invalidPath = "invalid/path"
val metastorePath = Utils.createTempDir()
val tmpDb = "tmp_db"

// The initial configs used to generate SharedState, none of these should affect the global
// shared SparkContext's configurations. Especially, all these configs are passed to the cloned
// confs inside SharedState except metastore warehouse dir.
val initialConfigs = Map("spark.foo" -> "bar",
WAREHOUSE_PATH.key -> invalidPath,
ConfVars.METASTOREWAREHOUSE.varname -> invalidPath,
CATALOG_IMPLEMENTATION.key -> "hive",
ConfVars.METASTORECONNECTURLKEY.varname ->
s"jdbc:derby:;databaseName=$metastorePath/metastore_db;create=true",
GLOBAL_TEMP_DATABASE.key -> tmpDb)

val state = new SharedState(sc, initialConfigs)
assert(state.warehousePath !== invalidPath, "warehouse path can't determine by session options")
assert(sc.conf.get(WAREHOUSE_PATH.key) !== invalidPath,
"warehouse conf in session options can't affect application wide spark conf")
assert(sc.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) !== invalidPath,
"warehouse conf in session options can't affect application wide hadoop conf")

assert(!state.sparkContext.conf.contains("spark.foo"),
"static spark conf should not be affected by session")
assert(state.externalCatalog.unwrapped.isInstanceOf[HiveExternalCatalog],
"Initial SparkSession options can determine the catalog")
val client = state.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client
assert(client.getConf("spark.foo", "") === "bar",
"session level conf should be passed to catalog")
assert(client.getConf(ConfVars.METASTOREWAREHOUSE.varname, invalidPath) !== invalidPath,
"session level conf should be passed to catalog except warehouse dir")

assert(state.globalTempViewManager.database === tmpDb)
}
}
