Commit 78cbcbd

Author: Andrew Or

Fix tests
The biggest change here is moving HiveMetastoreCatalog from HiveCatalog (the external catalog) to HiveSessionCatalog (the session-specific catalog). This is needed because HiveMetastoreCatalog depends on a lot of session-specific state, e.g. for creating data source tables, and the old arrangement was failing tests that work with multiple sessions, e.g. HiveQuerySuite.
1 parent 9f5154f commit 78cbcbd
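
To make the restructuring concrete, here is a minimal before/after sketch of the ownership change described above. Every type below is a simplified stand-in invented for illustration (not the real Spark classes); the actual signatures are in the diffs that follow.

// Toy model only: stub types standing in for HiveClient, HiveContext and
// HiveMetastoreCatalog. Names mirror the diff, but nothing here is real Spark code.
class HiveClientStub { def newSession(): HiveClientStub = new HiveClientStub }
class HiveContextStub
class MetastoreCatalogStub(client: HiveClientStub, ctx: HiveContextStub)

// Before: the shared external catalog owned the session-dependent helper and had
// to be initialized after the HiveContext existed, so all sessions shared it.
class ExternalCatalogBefore(client: HiveClientStub) {
  private var metastoreCatalog: MetastoreCatalogStub = _
  def initialize(ctx: HiveContextStub): Unit = {
    metastoreCatalog = new MetastoreCatalogStub(client, ctx)
  }
}

// After: the shared external catalog only swaps in the new session's client...
class ExternalCatalogAfter(var client: HiveClientStub) {
  def newSession(newClient: HiveClientStub): this.type = { client = newClient; this }
}

// ...and each session-level catalog builds its own metastore helper from
// session-specific state, so sessions no longer step on each other.
class SessionCatalogAfter(
    externalCatalog: ExternalCatalogAfter,
    client: HiveClientStub,
    ctx: HiveContextStub) {
  private val metastoreCatalog = new MetastoreCatalogStub(client, ctx)
}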

16 files changed (+198, -160 lines)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala

Lines changed: 1 addition & 1 deletion
@@ -34,7 +34,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
   errors.TreeNodeException(tree, s"Invalid call to $function on unresolved object", null)

 /**
- * Holds the name of a relation that has yet to be looked up in a [[Catalog]].
+ * Holds the name of a relation that has yet to be looked up in a catalog.
  */
 case class UnresolvedRelation(
     tableIdentifier: TableIdentifier,

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

Lines changed: 5 additions & 5 deletions
@@ -201,17 +201,17 @@ class InMemoryCatalog extends ExternalCatalog {
   }

   override def listTables(db: String): Seq[String] = synchronized {
-    requireDbExists(db)
-    catalog(db).tables.keySet.toSeq
+    if (databaseExists(db)) {
+      catalog(db).tables.keySet.toSeq
+    } else {
+      Seq()
+    }
   }

   override def listTables(db: String, pattern: String): Seq[String] = synchronized {
-    requireDbExists(db)
     filterPattern(listTables(db), pattern)
   }

-  override def refreshTable(db: String, table: String): Unit = { /* no-op */ }
-
   // --------------------------------------------------------------------------
   // Partitions
   // --------------------------------------------------------------------------

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 9 additions & 7 deletions
@@ -41,7 +41,13 @@ class SessionCatalog(externalCatalog: ExternalCatalog, caseSensitiveAnalysis: Bo
   // specify the database (e.g. DROP TABLE my_table). In these cases we must first
   // check whether the temporary table or function exists, then, if not, operate on
   // the corresponding item in the current database.
-  protected[this] var currentDb = "default"
+  protected[this] var currentDb = {
+    val defaultName = "default"
+    val defaultDbDefinition = CatalogDatabase(defaultName, "default database", "", Map())
+    // Initialize default database if it doesn't already exist
+    createDatabase(defaultDbDefinition, ignoreIfExists = true)
+    defaultName
+  }

   /**
    * Format table name, taking into account case sensitivity.
@@ -239,7 +245,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, caseSensitiveAnalysis: Bo
     if (name.database.isDefined || !tempTables.containsKey(table)) {
       externalCatalog.tableExists(db, table)
     } else {
-      true
+      true // it's a temporary table
     }
   }

@@ -264,11 +270,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, caseSensitiveAnalysis: Bo
   /**
    * Refresh the cache entry for a metastore table, if any.
    */
-  def refreshTable(name: TableIdentifier): Unit = {
-    val db = name.database.getOrElse(currentDb)
-    val table = formatTableName(name.table)
-    externalCatalog.refreshTable(db, table)
-  }
+  def refreshTable(name: TableIdentifier): Unit = { /* no-op */ }

   /**
    * Return a temporary table exactly as it was stored.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 0 additions & 2 deletions
@@ -97,8 +97,6 @@ abstract class ExternalCatalog {

   def listTables(db: String, pattern: String): Seq[String]

-  def refreshTable(db: String, table: String)
-
   // --------------------------------------------------------------------------
   // Partitions
   // --------------------------------------------------------------------------

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala

Lines changed: 1 addition & 5 deletions
@@ -161,14 +161,10 @@ class AnalysisSuite extends AnalysisTest {
   }

   test("resolve relations") {
-    assertAnalysisError(
-      UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq("Table not found: tAbLe"))
-
+    assertAnalysisError(UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq())
     checkAnalysis(UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation)
-
     checkAnalysis(
       UnresolvedRelation(TableIdentifier("tAbLe"), None), testRelation, caseSensitive = false)
-
     checkAnalysis(
       UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation, caseSensitive = false)
   }

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala

Lines changed: 5 additions & 89 deletions
@@ -27,24 +27,16 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchItemException
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.types.StructType


 /**
  * A persistent implementation of the system catalog using Hive.
  * All public methods must be synchronized for thread-safety.
  */
-private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog with Logging {
+private[spark] class HiveCatalog(var client: HiveClient) extends ExternalCatalog with Logging {
   import ExternalCatalog._

-  // Legacy catalog for handling data source tables.
-  // TODO: integrate this in a better way; it's confusing to have a catalog in a catalog.
-  private var metastoreCatalog: HiveMetastoreCatalog = _
-
   // Exceptions thrown by the hive client that we would like to wrap
   private val clientExceptions = Set(
     classOf[HiveException].getCanonicalName,
@@ -93,19 +85,14 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit
     withClient { getTable(db, table) }
   }

-  private def requireInitialized(): Unit = {
-    require(metastoreCatalog != null, "catalog not yet initialized!")
-  }
-
   /**
-   * Initialize [[HiveMetastoreCatalog]] when the [[HiveContext]] is ready.
-   * This is needed to avoid initialization order cycles with [[HiveContext]].
+   * Switch our client to one that belongs to the new session.
    */
-  def initialize(hiveContext: HiveContext): Unit = {
-    metastoreCatalog = new HiveMetastoreCatalog(client, hiveContext)
+  def newSession(newClient: HiveClient): this.type = {
+    client = newClient
+    this
   }

-
   // --------------------------------------------------------------------------
   // Databases
   // --------------------------------------------------------------------------
@@ -207,19 +194,13 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit
   }

   override def listTables(db: String): Seq[String] = withClient {
-    requireDbExists(db)
     client.listTables(db)
   }

   override def listTables(db: String, pattern: String): Seq[String] = withClient {
-    requireDbExists(db)
     client.listTables(db, pattern)
   }

-  override def refreshTable(db: String, table: String): Unit = {
-    refreshTable(TableIdentifier(table, Some(db)))
-  }
-
   // --------------------------------------------------------------------------
   // Partitions
   // --------------------------------------------------------------------------
@@ -319,69 +300,4 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit
     client.listFunctions(db, pattern)
   }

-
-  // ----------------------------------------------------------------
-  // | Methods and fields for interacting with HiveMetastoreCatalog |
-  // ----------------------------------------------------------------
-
-  lazy val ParquetConversions: Rule[LogicalPlan] = {
-    requireInitialized()
-    metastoreCatalog.ParquetConversions
-  }
-
-  lazy val CreateTables: Rule[LogicalPlan] = {
-    requireInitialized()
-    metastoreCatalog.CreateTables
-  }
-
-  lazy val PreInsertionCasts: Rule[LogicalPlan] = {
-    requireInitialized()
-    metastoreCatalog.PreInsertionCasts
-  }
-
-  def refreshTable(table: TableIdentifier): Unit = {
-    requireInitialized()
-    metastoreCatalog.refreshTable(table)
-  }
-
-  def invalidateTable(table: TableIdentifier): Unit = {
-    requireInitialized()
-    metastoreCatalog.invalidateTable(table)
-  }
-
-  def invalidateCache(): Unit = {
-    requireInitialized()
-    metastoreCatalog.cachedDataSourceTables.invalidateAll()
-  }
-
-  def createDataSourceTable(
-      table: TableIdentifier,
-      userSpecifiedSchema: Option[StructType],
-      partitionColumns: Array[String],
-      bucketSpec: Option[BucketSpec],
-      provider: String,
-      options: Map[String, String],
-      isExternal: Boolean): Unit = {
-    requireInitialized()
-    metastoreCatalog.createDataSourceTable(
-      table, userSpecifiedSchema, partitionColumns, bucketSpec, provider, options, isExternal)
-  }
-
-  def lookupRelation(table: TableIdentifier, alias: Option[String]): LogicalPlan = {
-    requireInitialized()
-    metastoreCatalog.lookupRelation(table, alias)
-  }
-
-  def hiveDefaultTableFilePath(table: TableIdentifier): String = {
-    requireInitialized()
-    metastoreCatalog.hiveDefaultTableFilePath(table)
-  }
-
-  // For testing only
-  private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
-    requireInitialized()
-    val key = metastoreCatalog.getQualifiedTableName(table)
-    metastoreCatalog.cachedDataSourceTables.getIfPresent(key)
-  }
-
 }

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 7 additions & 8 deletions
@@ -114,23 +114,22 @@ class HiveContext private[hive](

   logDebug("create HiveContext")

-  // Initialize catalog after context creation to avoid initialization ordering issues
-  hiveCatalog.initialize(this)
-
   /**
    * Returns a new HiveContext as new session, which will have separated SQLConf, UDF/UDAF,
    * temporary tables and SessionState, but sharing the same CacheManager, IsolatedClientLoader
    * and Hive client (both of execution and metadata) with existing HiveContext.
    */
   override def newSession(): HiveContext = {
+    val newExecutionHive = executionHive.newSession()
+    val newMetadataHive = metadataHive.newSession()
     new HiveContext(
       sc = sc,
       cacheManager = cacheManager,
       listener = listener,
-      executionHive = executionHive.newSession(),
-      metadataHive = metadataHive.newSession(),
+      executionHive = newExecutionHive,
+      metadataHive = newMetadataHive,
       isRootContext = false,
-      hiveCatalog = hiveCatalog)
+      hiveCatalog = hiveCatalog.newSession(newMetadataHive))
   }

   @transient
@@ -211,12 +210,12 @@ class HiveContext private[hive](
    */
   def refreshTable(tableName: String): Unit = {
     val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
-    hiveCatalog.refreshTable(tableIdent)
+    sessionState.sessionCatalog.refreshTable(tableIdent)
   }

   protected[hive] def invalidateTable(tableName: String): Unit = {
     val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
-    hiveCatalog.invalidateTable(tableIdent)
+    sessionState.sessionCatalog.invalidateTable(tableIdent)
   }

   /**

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala

Lines changed: 64 additions & 2 deletions
@@ -20,15 +20,29 @@ package org.apache.spark.sql.hive
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.BucketSpec
+import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.types.StructType


-class HiveSessionCatalog(hiveCatalog: HiveCatalog) extends SessionCatalog(hiveCatalog) {
+class HiveSessionCatalog(
+    externalCatalog: HiveCatalog,
+    client: HiveClient,
+    context: HiveContext,
+    caseSensitiveAnalysis: Boolean)
+  extends SessionCatalog(externalCatalog, caseSensitiveAnalysis) {
+
+  override def setCurrentDatabase(db: String): Unit = {
+    super.setCurrentDatabase(db)
+    client.setCurrentDatabase(db)
+  }

   override def lookupRelation(name: TableIdentifier, alias: Option[String]): LogicalPlan = {
     val table = formatTableName(name.table)
     if (name.database.isDefined || !tempTables.containsKey(table)) {
       val newName = name.copy(table = table)
-      hiveCatalog.lookupRelation(newName, alias)
+      metastoreCatalog.lookupRelation(newName, alias)
     } else {
       val relation = tempTables.get(table)
       val tableWithQualifiers = SubqueryAlias(table, relation)
@@ -38,4 +52,52 @@ class HiveSessionCatalog(hiveCatalog: HiveCatalog) extends SessionCatalog(hiveCa
     }
   }

+  // ----------------------------------------------------------------
+  // | Methods and fields for interacting with HiveMetastoreCatalog |
+  // ----------------------------------------------------------------
+
+  // Catalog for handling data source tables. TODO: This really doesn't belong here since it is
+  // essentially a cache for metastore tables. However, it relies on a lot of session-specific
+  // things so it would be a lot of work to split its functionality between HiveSessionCatalog
+  // and HiveCatalog. We should still do it at some point...
+  private val metastoreCatalog = new HiveMetastoreCatalog(client, context)
+
+  val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
+  val CreateTables: Rule[LogicalPlan] = metastoreCatalog.CreateTables
+  val PreInsertionCasts: Rule[LogicalPlan] = metastoreCatalog.PreInsertionCasts
+
+  override def refreshTable(name: TableIdentifier): Unit = {
+    metastoreCatalog.refreshTable(name)
+  }
+
+  def invalidateTable(name: TableIdentifier): Unit = {
+    metastoreCatalog.invalidateTable(name)
+  }
+
+  def invalidateCache(): Unit = {
+    metastoreCatalog.cachedDataSourceTables.invalidateAll()
+  }
+
+  def createDataSourceTable(
+      name: TableIdentifier,
+      userSpecifiedSchema: Option[StructType],
+      partitionColumns: Array[String],
+      bucketSpec: Option[BucketSpec],
+      provider: String,
+      options: Map[String, String],
+      isExternal: Boolean): Unit = {
+    metastoreCatalog.createDataSourceTable(
+      name, userSpecifiedSchema, partitionColumns, bucketSpec, provider, options, isExternal)
+  }
+
+  def hiveDefaultTableFilePath(name: TableIdentifier): String = {
+    metastoreCatalog.hiveDefaultTableFilePath(name)
+  }
+
+  // For testing only
+  private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
+    val key = metastoreCatalog.getQualifiedTableName(table)
+    metastoreCatalog.cachedDataSourceTables.getIfPresent(key)
+  }
+
 }

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala

Lines changed: 7 additions & 4 deletions
@@ -37,7 +37,10 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx)
   /**
    * Internal catalog for managing table and database states.
    */
-  override lazy val sessionCatalog = new HiveSessionCatalog(ctx.hiveCatalog)
+  override lazy val sessionCatalog = {
+    new HiveSessionCatalog(
+      ctx.hiveCatalog, ctx.metadataHive, ctx, caseSensitiveAnalysis = false)
+  }

   /**
    * Internal catalog for managing functions registered by the user.
@@ -53,9 +56,9 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx)
   override lazy val analyzer: Analyzer = {
     new Analyzer(sessionCatalog, functionRegistry, conf) {
       override val extendedResolutionRules =
-        ctx.hiveCatalog.ParquetConversions ::
-        ctx.hiveCatalog.CreateTables ::
-        ctx.hiveCatalog.PreInsertionCasts ::
+        sessionCatalog.ParquetConversions ::
+        sessionCatalog.CreateTables ::
+        sessionCatalog.PreInsertionCasts ::
         python.ExtractPythonUDFs ::
         PreInsertCastAndRename ::
         DataSourceAnalysis ::
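
For context, a rough sketch of what the new per-session wiring implies (again with made-up stub types, not Spark's real API): every session gets its own session catalog, and therefore its own metastore helper and analysis rules, while the external catalog instance stays shared across sessions.

object MultiSessionSketch extends App {
  class ClientStub { def newSession(): ClientStub = new ClientStub }
  class SharedCatalogStub(var client: ClientStub) {
    // Mirrors HiveCatalog.newSession above: swap the client, keep the instance.
    def newSession(newClient: ClientStub): this.type = { client = newClient; this }
  }
  // Owns the session-specific helper, analogous to HiveSessionCatalog above.
  class SessionCatalogStub(shared: SharedCatalogStub, client: ClientStub)
  class SessionStateStub(shared: SharedCatalogStub, client: ClientStub) {
    lazy val sessionCatalog = new SessionCatalogStub(shared, client)
  }

  val shared = new SharedCatalogStub(new ClientStub)
  val session1 = new SessionStateStub(shared, new ClientStub)
  // A new session reuses the shared catalog (with a fresh client) but builds
  // its own session catalog, so session-local state is not shared.
  val session2 = new SessionStateStub(shared.newSession(new ClientStub), new ClientStub)
  assert(session1.sessionCatalog ne session2.sessionCatalog)
}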
