Skip to content

Commit eebc8c1

Browse files
author
Andrew Or
committed
[SPARK-13923][SPARK-14014][SQL] Session catalog follow-ups
## What changes were proposed in this pull request? This patch addresses the remaining comments left in apache#11750 and apache#11918 after they were merged. For a full list of changes in this patch, just trace the commits. ## How was this patch tested? `SessionCatalogSuite` and `CatalogTestCases` Author: Andrew Or <[email protected]> Closes apache#12006 from andrewor14/session-catalog-followup.
1 parent 34c0638 commit eebc8c1

File tree

24 files changed

+131
-125
lines changed

24 files changed

+131
-125
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ class InMemoryCatalog extends ExternalCatalog {
155155
tableDefinition: CatalogTable,
156156
ignoreIfExists: Boolean): Unit = synchronized {
157157
requireDbExists(db)
158-
val table = tableDefinition.name.table
158+
val table = tableDefinition.identifier.table
159159
if (tableExists(db, table)) {
160160
if (!ignoreIfExists) {
161161
throw new AnalysisException(s"Table '$table' already exists in database '$db'")
@@ -182,14 +182,14 @@ class InMemoryCatalog extends ExternalCatalog {
182182
override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
183183
requireTableExists(db, oldName)
184184
val oldDesc = catalog(db).tables(oldName)
185-
oldDesc.table = oldDesc.table.copy(name = TableIdentifier(newName, Some(db)))
185+
oldDesc.table = oldDesc.table.copy(identifier = TableIdentifier(newName, Some(db)))
186186
catalog(db).tables.put(newName, oldDesc)
187187
catalog(db).tables.remove(oldName)
188188
}
189189

190190
override def alterTable(db: String, tableDefinition: CatalogTable): Unit = synchronized {
191-
requireTableExists(db, tableDefinition.name.table)
192-
catalog(db).tables(tableDefinition.name.table).table = tableDefinition
191+
requireTableExists(db, tableDefinition.identifier.table)
192+
catalog(db).tables(tableDefinition.identifier.table).table = tableDefinition
193193
}
194194

195195
override def getTable(db: String, table: String): CatalogTable = synchronized {
@@ -296,10 +296,10 @@ class InMemoryCatalog extends ExternalCatalog {
296296

297297
override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
298298
requireDbExists(db)
299-
if (functionExists(db, func.name.funcName)) {
299+
if (functionExists(db, func.identifier.funcName)) {
300300
throw new AnalysisException(s"Function '$func' already exists in '$db' database")
301301
} else {
302-
catalog(db).functions.put(func.name.funcName, func)
302+
catalog(db).functions.put(func.identifier.funcName, func)
303303
}
304304
}
305305

@@ -310,14 +310,14 @@ class InMemoryCatalog extends ExternalCatalog {
310310

311311
override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized {
312312
requireFunctionExists(db, oldName)
313-
val newFunc = getFunction(db, oldName).copy(name = FunctionIdentifier(newName, Some(db)))
313+
val newFunc = getFunction(db, oldName).copy(identifier = FunctionIdentifier(newName, Some(db)))
314314
catalog(db).functions.remove(oldName)
315315
catalog(db).functions.put(newName, newFunc)
316316
}
317317

318318
override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = synchronized {
319-
requireFunctionExists(db, funcDefinition.name.funcName)
320-
catalog(db).functions.put(funcDefinition.name.funcName, funcDefinition)
319+
requireFunctionExists(db, funcDefinition.identifier.funcName)
320+
catalog(db).functions.put(funcDefinition.identifier.funcName, funcDefinition)
321321
}
322322

323323
override def getFunction(db: String, funcName: String): CatalogFunction = synchronized {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 38 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717

1818
package org.apache.spark.sql.catalyst.catalog
1919

20-
import java.util.concurrent.ConcurrentHashMap
21-
22-
import scala.collection.JavaConverters._
20+
import scala.collection.mutable
2321

2422
import org.apache.spark.sql.AnalysisException
2523
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
@@ -31,6 +29,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
3129
* An internal catalog that is used by a Spark Session. This internal catalog serves as a
3230
* proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
3331
* tables and functions of the Spark Session that it belongs to.
32+
*
33+
* This class is not thread-safe.
3434
*/
3535
class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
3636
import ExternalCatalog._
@@ -39,8 +39,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
3939
this(externalCatalog, new SimpleCatalystConf(true))
4040
}
4141

42-
protected[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
43-
protected[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction]
42+
protected[this] val tempTables = new mutable.HashMap[String, LogicalPlan]
43+
protected[this] val tempFunctions = new mutable.HashMap[String, CatalogFunction]
4444

4545
// Note: we track current database here because certain operations do not explicitly
4646
// specify the database (e.g. DROP TABLE my_table). In these cases we must first
@@ -122,9 +122,9 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
122122
* If no such database is specified, create it in the current database.
123123
*/
124124
def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
125-
val db = tableDefinition.name.database.getOrElse(currentDb)
126-
val table = formatTableName(tableDefinition.name.table)
127-
val newTableDefinition = tableDefinition.copy(name = TableIdentifier(table, Some(db)))
125+
val db = tableDefinition.identifier.database.getOrElse(currentDb)
126+
val table = formatTableName(tableDefinition.identifier.table)
127+
val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
128128
externalCatalog.createTable(db, newTableDefinition, ignoreIfExists)
129129
}
130130

@@ -138,9 +138,9 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
138138
* this becomes a no-op.
139139
*/
140140
def alterTable(tableDefinition: CatalogTable): Unit = {
141-
val db = tableDefinition.name.database.getOrElse(currentDb)
142-
val table = formatTableName(tableDefinition.name.table)
143-
val newTableDefinition = tableDefinition.copy(name = TableIdentifier(table, Some(db)))
141+
val db = tableDefinition.identifier.database.getOrElse(currentDb)
142+
val table = formatTableName(tableDefinition.identifier.table)
143+
val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
144144
externalCatalog.alterTable(db, newTableDefinition)
145145
}
146146

@@ -164,9 +164,9 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
164164
def createTempTable(
165165
name: String,
166166
tableDefinition: LogicalPlan,
167-
ignoreIfExists: Boolean): Unit = {
167+
overrideIfExists: Boolean): Unit = {
168168
val table = formatTableName(name)
169-
if (tempTables.containsKey(table) && !ignoreIfExists) {
169+
if (tempTables.contains(table) && !overrideIfExists) {
170170
throw new AnalysisException(s"Temporary table '$name' already exists.")
171171
}
172172
tempTables.put(table, tableDefinition)
@@ -188,10 +188,11 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
188188
val db = oldName.database.getOrElse(currentDb)
189189
val oldTableName = formatTableName(oldName.table)
190190
val newTableName = formatTableName(newName.table)
191-
if (oldName.database.isDefined || !tempTables.containsKey(oldTableName)) {
191+
if (oldName.database.isDefined || !tempTables.contains(oldTableName)) {
192192
externalCatalog.renameTable(db, oldTableName, newTableName)
193193
} else {
194-
val table = tempTables.remove(oldTableName)
194+
val table = tempTables(oldTableName)
195+
tempTables.remove(oldTableName)
195196
tempTables.put(newTableName, table)
196197
}
197198
}
@@ -206,7 +207,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
206207
def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = {
207208
val db = name.database.getOrElse(currentDb)
208209
val table = formatTableName(name.table)
209-
if (name.database.isDefined || !tempTables.containsKey(table)) {
210+
if (name.database.isDefined || !tempTables.contains(table)) {
210211
externalCatalog.dropTable(db, table, ignoreIfNotExists)
211212
} else {
212213
tempTables.remove(table)
@@ -224,11 +225,11 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
224225
val db = name.database.getOrElse(currentDb)
225226
val table = formatTableName(name.table)
226227
val relation =
227-
if (name.database.isDefined || !tempTables.containsKey(table)) {
228+
if (name.database.isDefined || !tempTables.contains(table)) {
228229
val metadata = externalCatalog.getTable(db, table)
229230
CatalogRelation(db, metadata, alias)
230231
} else {
231-
tempTables.get(table)
232+
tempTables(table)
232233
}
233234
val qualifiedTable = SubqueryAlias(table, relation)
234235
// If an alias was specified by the lookup, wrap the plan in a subquery so that
@@ -247,7 +248,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
247248
def tableExists(name: TableIdentifier): Boolean = {
248249
val db = name.database.getOrElse(currentDb)
249250
val table = formatTableName(name.table)
250-
if (name.database.isDefined || !tempTables.containsKey(table)) {
251+
if (name.database.isDefined || !tempTables.contains(table)) {
251252
externalCatalog.tableExists(db, table)
252253
} else {
253254
true // it's a temporary table
@@ -266,7 +267,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
266267
val dbTables =
267268
externalCatalog.listTables(db, pattern).map { t => TableIdentifier(t, Some(db)) }
268269
val regex = pattern.replaceAll("\\*", ".*").r
269-
val _tempTables = tempTables.keys().asScala
270+
val _tempTables = tempTables.keys.toSeq
270271
.filter { t => regex.pattern.matcher(t).matches() }
271272
.map { t => TableIdentifier(t) }
272273
dbTables ++ _tempTables
@@ -290,7 +291,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
290291
* For testing only.
291292
*/
292293
private[catalog] def getTempTable(name: String): Option[LogicalPlan] = {
293-
Option(tempTables.get(name))
294+
tempTables.get(name)
294295
}
295296

296297
// ----------------------------------------------------------------------------
@@ -399,9 +400,9 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
399400
* If no such database is specified, create it in the current database.
400401
*/
401402
def createFunction(funcDefinition: CatalogFunction): Unit = {
402-
val db = funcDefinition.name.database.getOrElse(currentDb)
403+
val db = funcDefinition.identifier.database.getOrElse(currentDb)
403404
val newFuncDefinition = funcDefinition.copy(
404-
name = FunctionIdentifier(funcDefinition.name.funcName, Some(db)))
405+
identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db)))
405406
externalCatalog.createFunction(db, newFuncDefinition)
406407
}
407408

@@ -424,9 +425,9 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
424425
* this becomes a no-op.
425426
*/
426427
def alterFunction(funcDefinition: CatalogFunction): Unit = {
427-
val db = funcDefinition.name.database.getOrElse(currentDb)
428+
val db = funcDefinition.identifier.database.getOrElse(currentDb)
428429
val newFuncDefinition = funcDefinition.copy(
429-
name = FunctionIdentifier(funcDefinition.name.funcName, Some(db)))
430+
identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db)))
430431
externalCatalog.alterFunction(db, newFuncDefinition)
431432
}
432433

@@ -439,10 +440,10 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
439440
* This assumes no database is specified in `funcDefinition`.
440441
*/
441442
def createTempFunction(funcDefinition: CatalogFunction, ignoreIfExists: Boolean): Unit = {
442-
require(funcDefinition.name.database.isEmpty,
443+
require(funcDefinition.identifier.database.isEmpty,
443444
"attempted to create a temporary function while specifying a database")
444-
val name = funcDefinition.name.funcName
445-
if (tempFunctions.containsKey(name) && !ignoreIfExists) {
445+
val name = funcDefinition.identifier.funcName
446+
if (tempFunctions.contains(name) && !ignoreIfExists) {
446447
throw new AnalysisException(s"Temporary function '$name' already exists.")
447448
}
448449
tempFunctions.put(name, funcDefinition)
@@ -455,7 +456,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
455456
// Hive has DROP FUNCTION and DROP TEMPORARY FUNCTION. We may want to consolidate
456457
// dropFunction and dropTempFunction.
457458
def dropTempFunction(name: String, ignoreIfNotExists: Boolean): Unit = {
458-
if (!tempFunctions.containsKey(name) && !ignoreIfNotExists) {
459+
if (!tempFunctions.contains(name) && !ignoreIfNotExists) {
459460
throw new AnalysisException(
460461
s"Temporary function '$name' cannot be dropped because it does not exist!")
461462
}
@@ -476,11 +477,12 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
476477
throw new AnalysisException("rename does not support moving functions across databases")
477478
}
478479
val db = oldName.database.getOrElse(currentDb)
479-
if (oldName.database.isDefined || !tempFunctions.containsKey(oldName.funcName)) {
480+
if (oldName.database.isDefined || !tempFunctions.contains(oldName.funcName)) {
480481
externalCatalog.renameFunction(db, oldName.funcName, newName.funcName)
481482
} else {
482-
val func = tempFunctions.remove(oldName.funcName)
483-
val newFunc = func.copy(name = func.name.copy(funcName = newName.funcName))
483+
val func = tempFunctions(oldName.funcName)
484+
val newFunc = func.copy(identifier = func.identifier.copy(funcName = newName.funcName))
485+
tempFunctions.remove(oldName.funcName)
484486
tempFunctions.put(newName.funcName, newFunc)
485487
}
486488
}
@@ -494,10 +496,10 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
494496
*/
495497
def getFunction(name: FunctionIdentifier): CatalogFunction = {
496498
val db = name.database.getOrElse(currentDb)
497-
if (name.database.isDefined || !tempFunctions.containsKey(name.funcName)) {
499+
if (name.database.isDefined || !tempFunctions.contains(name.funcName)) {
498500
externalCatalog.getFunction(db, name.funcName)
499501
} else {
500-
tempFunctions.get(name.funcName)
502+
tempFunctions(name.funcName)
501503
}
502504
}
503505

@@ -510,7 +512,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
510512
val dbFunctions =
511513
externalCatalog.listFunctions(db, pattern).map { f => FunctionIdentifier(f, Some(db)) }
512514
val regex = pattern.replaceAll("\\*", ".*").r
513-
val _tempFunctions = tempFunctions.keys().asScala
515+
val _tempFunctions = tempFunctions.keys.toSeq
514516
.filter { f => regex.pattern.matcher(f).matches() }
515517
.map { f => FunctionIdentifier(f) }
516518
dbFunctions ++ _tempFunctions
@@ -520,7 +522,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
520522
* Return a temporary function. For testing only.
521523
*/
522524
private[catalog] def getTempFunction(name: String): Option[CatalogFunction] = {
523-
Option(tempFunctions.get(name))
525+
tempFunctions.get(name)
524526
}
525527

526528
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -169,10 +169,10 @@ abstract class ExternalCatalog {
169169
/**
170170
* A function defined in the catalog.
171171
*
172-
* @param name name of the function
172+
* @param identifier name of the function
173173
* @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc"
174174
*/
175-
case class CatalogFunction(name: FunctionIdentifier, className: String)
175+
case class CatalogFunction(identifier: FunctionIdentifier, className: String)
176176

177177

178178
/**
@@ -216,7 +216,7 @@ case class CatalogTablePartition(
216216
* future once we have a better understanding of how we want to handle skewed columns.
217217
*/
218218
case class CatalogTable(
219-
name: TableIdentifier,
219+
identifier: TableIdentifier,
220220
tableType: CatalogTableType,
221221
storage: CatalogStorageFormat,
222222
schema: Seq[CatalogColumn],
@@ -230,12 +230,12 @@ case class CatalogTable(
230230
viewText: Option[String] = None) {
231231

232232
/** Return the database this table was specified to belong to, assuming it exists. */
233-
def database: String = name.database.getOrElse {
234-
throw new AnalysisException(s"table $name did not specify database")
233+
def database: String = identifier.database.getOrElse {
234+
throw new AnalysisException(s"table $identifier did not specify database")
235235
}
236236

237237
/** Return the fully qualified name of this table, assuming the database was specified. */
238-
def qualifiedName: String = name.unquotedString
238+
def qualifiedName: String = identifier.unquotedString
239239

240240
/** Syntactic sugar to update a field in `storage`. */
241241
def withNewStorage(
@@ -290,6 +290,6 @@ case class CatalogRelation(
290290
// TODO: implement this
291291
override def output: Seq[Attribute] = Seq.empty
292292

293-
require(metadata.name.database == Some(db),
293+
require(metadata.identifier.database == Some(db),
294294
"provided database does not much the one specified in the table definition")
295295
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ trait AnalysisTest extends PlanTest {
3131
private def makeAnalyzer(caseSensitive: Boolean): Analyzer = {
3232
val conf = new SimpleCatalystConf(caseSensitive)
3333
val catalog = new SessionCatalog(new InMemoryCatalog, conf)
34-
catalog.createTempTable("TaBlE", TestRelations.testRelation, ignoreIfExists = true)
34+
catalog.createTempTable("TaBlE", TestRelations.testRelation, overrideIfExists = true)
3535
new Analyzer(catalog, EmptyFunctionRegistry, conf) {
3636
override val extendedResolutionRules = EliminateSubqueryAliases :: Nil
3737
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter {
5252
private val b: Expression = UnresolvedAttribute("b")
5353

5454
before {
55-
catalog.createTempTable("table", relation, ignoreIfExists = true)
55+
catalog.createTempTable("table", relation, overrideIfExists = true)
5656
}
5757

5858
private def checkType(expression: Expression, expectedType: DataType): Unit = {

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
210210
}
211211

212212
test("get table") {
213-
assert(newBasicCatalog().getTable("db2", "tbl1").name.table == "tbl1")
213+
assert(newBasicCatalog().getTable("db2", "tbl1").identifier.table == "tbl1")
214214
}
215215

216216
test("get table when database/table does not exist") {
@@ -452,7 +452,7 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
452452
assert(catalog.getFunction("db2", "func1").className == funcClass)
453453
catalog.renameFunction("db2", "func1", newName)
454454
intercept[AnalysisException] { catalog.getFunction("db2", "func1") }
455-
assert(catalog.getFunction("db2", newName).name.funcName == newName)
455+
assert(catalog.getFunction("db2", newName).identifier.funcName == newName)
456456
assert(catalog.getFunction("db2", newName).className == funcClass)
457457
intercept[AnalysisException] { catalog.renameFunction("db2", "does_not_exist", "me") }
458458
}
@@ -549,7 +549,7 @@ abstract class CatalogTestUtils {
549549

550550
def newTable(name: String, database: Option[String] = None): CatalogTable = {
551551
CatalogTable(
552-
name = TableIdentifier(name, database),
552+
identifier = TableIdentifier(name, database),
553553
tableType = CatalogTableType.EXTERNAL_TABLE,
554554
storage = storageFormat,
555555
schema = Seq(CatalogColumn("col1", "int"), CatalogColumn("col2", "string")),

0 commit comments

Comments (0)