Commit 3a4d15e

Feng Liu authored and gatorsmile committed
[SPARK-23518][SQL] Avoid metastore access when the users only want to read and write data frames
## What changes were proposed in this pull request?

#18944 added a patch that allowed a Spark session to be created when the Hive metastore server is down. However, it did not allow running any commands with that session. This is a problem for users who only want to read and write data frames without a metastore setup.

## How was this patch tested?

Added unit tests that read and write data frames, based on the original HiveMetastoreLazyInitializationSuite.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Feng Liu <[email protected]>

Closes #20681 from liufengdb/completely-lazy.
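For context, here is a minimal sketch (not part of this commit) of the behavior the change enables: with lazy catalog initialization, a Hive-enabled session can read and write DataFrames through the file system even when the metastore is unreachable, because the Hive client is only materialized on first catalog access. The metastore URI and output path below are placeholders.

import org.apache.spark.sql.SparkSession

object MetastoreFreeReadWrite {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      // Hypothetical, unreachable metastore; the Hive client is only built on first catalog use.
      .config("hive.metastore.uris", "thrift://unreachable-host:9083")
      .enableHiveSupport()
      .getOrCreate()

    // Pure DataFrame read/write against the file system: no metastore access needed.
    val out = "/tmp/spark-23518-demo"  // placeholder path
    spark.range(0, 10).write.mode("overwrite").parquet(out)
    println(spark.read.parquet(out).count())  // 10

    // A catalog-touching command such as spark.sql("show tables") would still fail here,
    // because it forces the Hive client to be materialized.
    spark.stop()
  }
}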
1 parent 0b6cead commit 3a4d15e

File tree

6 files changed: +34 −15 lines


R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 2 additions & 0 deletions
@@ -67,6 +67,8 @@ sparkSession <- if (windows_with_hadoop()) {
     sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
   }
 sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
+# materialize the catalog implementation
+listTables()
 
 mockLines <- c("{\"name\":\"Michael\"}",
                "{\"name\":\"Andy\", \"age\":30}",

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 7 additions & 4 deletions
@@ -54,8 +54,8 @@ object SessionCatalog {
  * This class must be thread-safe.
  */
 class SessionCatalog(
-    val externalCatalog: ExternalCatalog,
-    globalTempViewManager: GlobalTempViewManager,
+    externalCatalogBuilder: () => ExternalCatalog,
+    globalTempViewManagerBuilder: () => GlobalTempViewManager,
     functionRegistry: FunctionRegistry,
     conf: SQLConf,
     hadoopConf: Configuration,
@@ -70,8 +70,8 @@ class SessionCatalog(
       functionRegistry: FunctionRegistry,
       conf: SQLConf) {
     this(
-      externalCatalog,
-      new GlobalTempViewManager("global_temp"),
+      () => externalCatalog,
+      () => new GlobalTempViewManager("global_temp"),
       functionRegistry,
       conf,
       new Configuration(),
@@ -87,6 +87,9 @@ class SessionCatalog(
       new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true))
   }
 
+  lazy val externalCatalog = externalCatalogBuilder()
+  lazy val globalTempViewManager = globalTempViewManagerBuilder()
+
   /** List of temporary views, mapping from table name to their logical plan. */
   @GuardedBy("this")
   protected val tempViews = new mutable.HashMap[String, LogicalPlan]
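The diff above replaces eagerly constructed dependencies with builder thunks that are materialized by lazy vals, so nothing touches the external catalog until it is actually needed. A standalone sketch of that pattern, using illustrative names (Metastore, LazyCatalog) that are not Spark APIs:

class Metastore {
  println("connecting to metastore...")   // expensive; may fail if the server is down
  def listTables(): Seq[String] = Seq("t1", "t2")
}

class LazyCatalog(metastoreBuilder: () => Metastore) {
  // Nothing is built at construction time; the thunk runs on first access only.
  private lazy val metastore: Metastore = metastoreBuilder()

  def tables(): Seq[String] = metastore.listTables()
}

object LazyCatalogDemo extends App {
  val catalog = new LazyCatalog(() => new Metastore())  // no connection yet
  println("catalog created without touching the metastore")
  println(catalog.tables())                             // connection happens here
}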

sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala

Lines changed: 2 additions & 2 deletions
@@ -130,8 +130,8 @@ abstract class BaseSessionStateBuilder(
    */
   protected lazy val catalog: SessionCatalog = {
     val catalog = new SessionCatalog(
-      session.sharedState.externalCatalog,
-      session.sharedState.globalTempViewManager,
+      () => session.sharedState.externalCatalog,
+      () => session.sharedState.globalTempViewManager,
       functionRegistry,
       conf,
       SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreLazyInitializationSuite.scala

Lines changed: 14 additions & 0 deletions
@@ -38,6 +38,20 @@ class HiveMetastoreLazyInitializationSuite extends SparkFunSuite {
     // We should be able to run Spark jobs without Hive client.
     assert(spark.sparkContext.range(0, 1).count() === 1)
 
+    // We should be able to use Spark SQL if no table references.
+    assert(spark.sql("select 1 + 1").count() === 1)
+    assert(spark.range(0, 1).count() === 1)
+
+    // We should be able to use fs
+    val path = Utils.createTempDir()
+    path.delete()
+    try {
+      spark.range(0, 1).write.parquet(path.getAbsolutePath)
+      assert(spark.read.parquet(path.getAbsolutePath).count() === 1)
+    } finally {
+      Utils.deleteRecursively(path)
+    }
+
     // Make sure that we are not using the local derby metastore.
     val exceptionString = Utils.exceptionString(intercept[AnalysisException] {
       spark.sql("show tables")

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala

Lines changed: 4 additions & 4 deletions
@@ -39,17 +39,17 @@ import org.apache.spark.sql.types.{DecimalType, DoubleType}
 
 
 private[sql] class HiveSessionCatalog(
-    externalCatalog: HiveExternalCatalog,
-    globalTempViewManager: GlobalTempViewManager,
+    externalCatalogBuilder: () => HiveExternalCatalog,
+    globalTempViewManagerBuilder: () => GlobalTempViewManager,
     val metastoreCatalog: HiveMetastoreCatalog,
     functionRegistry: FunctionRegistry,
     conf: SQLConf,
     hadoopConf: Configuration,
     parser: ParserInterface,
     functionResourceLoader: FunctionResourceLoader)
   extends SessionCatalog(
-    externalCatalog,
-    globalTempViewManager,
+    externalCatalogBuilder,
+    globalTempViewManagerBuilder,
     functionRegistry,
     conf,
     hadoopConf,

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala

Lines changed: 5 additions & 5 deletions
@@ -42,17 +42,16 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
    * Create a Hive aware resource loader.
    */
   override protected lazy val resourceLoader: HiveSessionResourceLoader = {
-    val client: HiveClient = externalCatalog.client
-    new HiveSessionResourceLoader(session, client)
+    new HiveSessionResourceLoader(session, () => externalCatalog.client)
   }
 
   /**
    * Create a [[HiveSessionCatalog]].
    */
   override protected lazy val catalog: HiveSessionCatalog = {
     val catalog = new HiveSessionCatalog(
-      externalCatalog,
-      session.sharedState.globalTempViewManager,
+      () => externalCatalog,
+      () => session.sharedState.globalTempViewManager,
       new HiveMetastoreCatalog(session),
       functionRegistry,
       conf,
@@ -105,8 +104,9 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
 
 class HiveSessionResourceLoader(
     session: SparkSession,
-    client: HiveClient)
+    clientBuilder: () => HiveClient)
   extends SessionResourceLoader(session) {
+  private lazy val client = clientBuilder()
   override def addJar(path: String): Unit = {
     client.addJar(path)
     super.addJar(path)
