Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class SessionCatalog(
CatalogDatabase(defaultName, "default database", conf.warehousePath, Map())
// Initialize default database if it doesn't already exist
createDatabase(defaultDbDefinition, ignoreIfExists = true)
defaultName
formatDatabaseName(defaultName)
}

/**
Expand All @@ -92,6 +92,13 @@ class SessionCatalog(
if (conf.caseSensitiveAnalysis) name else name.toLowerCase
}

/**
* Format database name, taking into account case sensitivity.
*/
protected[this] def formatDatabaseName(name: String): String = {
if (conf.caseSensitiveAnalysis) name else name.toLowerCase
}

/**
* This method is used to make the given path qualified before we
* store this path in the underlying external catalog. So, when a path
Expand All @@ -112,25 +119,33 @@ class SessionCatalog(

def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString
val dbName = formatDatabaseName(dbDefinition.name)
externalCatalog.createDatabase(
dbDefinition.copy(locationUri = qualifiedPath),
dbDefinition.copy(name = dbName, locationUri = qualifiedPath),
ignoreIfExists)
}

def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
externalCatalog.dropDatabase(db, ignoreIfNotExists, cascade)
val dbName = formatDatabaseName(db)
if (dbName == "default") {
throw new AnalysisException(s"Can not drop default database")
}
externalCatalog.dropDatabase(dbName, ignoreIfNotExists, cascade)
}

def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
externalCatalog.alterDatabase(dbDefinition)
val dbName = formatDatabaseName(dbDefinition.name)
externalCatalog.alterDatabase(dbDefinition.copy(name = dbName))
}

def getDatabaseMetadata(db: String): CatalogDatabase = {
externalCatalog.getDatabase(db)
val dbName = formatDatabaseName(db)
externalCatalog.getDatabase(dbName)
}

def databaseExists(db: String): Boolean = {
externalCatalog.databaseExists(db)
val dbName = formatDatabaseName(db)
externalCatalog.databaseExists(dbName)
}

def listDatabases(): Seq[String] = {
Expand All @@ -144,18 +159,19 @@ class SessionCatalog(
def getCurrentDatabase: String = synchronized { currentDb }

def setCurrentDatabase(db: String): Unit = {
if (!databaseExists(db)) {
throw new AnalysisException(s"Database '$db' does not exist.")
val dbName = formatDatabaseName(db)
if (!databaseExists(dbName)) {
throw new AnalysisException(s"Database '$dbName' does not exist.")
}
synchronized { currentDb = db }
synchronized { currentDb = dbName }
}

/**
* Get the path for creating a non-default database when database location is not provided
* by users.
*/
def getDefaultDBPath(db: String): String = {
val database = if (conf.caseSensitiveAnalysis) db else db.toLowerCase
val database = formatDatabaseName(db)
new Path(new Path(conf.warehousePath), database + ".db").toString
}

Expand All @@ -177,7 +193,7 @@ class SessionCatalog(
* If no such database is specified, create it in the current database.
*/
def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
val db = tableDefinition.identifier.database.getOrElse(getCurrentDatabase)
val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableDefinition.identifier.table)
val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
externalCatalog.createTable(db, newTableDefinition, ignoreIfExists)
Expand All @@ -193,7 +209,7 @@ class SessionCatalog(
* this becomes a no-op.
*/
def alterTable(tableDefinition: CatalogTable): Unit = {
val db = tableDefinition.identifier.database.getOrElse(getCurrentDatabase)
val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableDefinition.identifier.table)
val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
externalCatalog.alterTable(db, newTableDefinition)
Expand All @@ -205,7 +221,7 @@ class SessionCatalog(
* If the specified table is not found in the database then an [[AnalysisException]] is thrown.
*/
def getTableMetadata(name: TableIdentifier): CatalogTable = {
val db = name.database.getOrElse(getCurrentDatabase)
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
val table = formatTableName(name.table)
externalCatalog.getTable(db, table)
}
Expand All @@ -216,7 +232,7 @@ class SessionCatalog(
* If the specified table is not found in the database then return None if it doesn't exist.
*/
def getTableMetadataOption(name: TableIdentifier): Option[CatalogTable] = {
val db = name.database.getOrElse(getCurrentDatabase)
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
val table = formatTableName(name.table)
externalCatalog.getTableOption(db, table)
}
Expand All @@ -231,7 +247,7 @@ class SessionCatalog(
loadPath: String,
isOverwrite: Boolean,
holdDDLTime: Boolean): Unit = {
val db = name.database.getOrElse(getCurrentDatabase)
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
val table = formatTableName(name.table)
externalCatalog.loadTable(db, table, loadPath, isOverwrite, holdDDLTime)
}
Expand All @@ -249,14 +265,14 @@ class SessionCatalog(
holdDDLTime: Boolean,
inheritTableSpecs: Boolean,
isSkewedStoreAsSubdir: Boolean): Unit = {
val db = name.database.getOrElse(getCurrentDatabase)
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
val table = formatTableName(name.table)
externalCatalog.loadPartition(db, table, loadPath, partition, isOverwrite, holdDDLTime,
inheritTableSpecs, isSkewedStoreAsSubdir)
}

def defaultTablePath(tableIdent: TableIdentifier): String = {
val dbName = tableIdent.database.getOrElse(getCurrentDatabase)
val dbName = formatDatabaseName(tableIdent.database.getOrElse(getCurrentDatabase))
val dbLocation = getDatabaseMetadata(dbName).locationUri

new Path(new Path(dbLocation), formatTableName(tableIdent.table)).toString
Expand Down Expand Up @@ -290,8 +306,8 @@ class SessionCatalog(
* This assumes the database specified in `oldName` matches the one specified in `newName`.
*/
def renameTable(oldName: TableIdentifier, newName: TableIdentifier): Unit = synchronized {
val db = oldName.database.getOrElse(currentDb)
val newDb = newName.database.getOrElse(currentDb)
val db = formatDatabaseName(oldName.database.getOrElse(currentDb))
val newDb = formatDatabaseName(newName.database.getOrElse(currentDb))
if (db != newDb) {
throw new AnalysisException(
s"RENAME TABLE source and destination databases do not match: '$db' != '$newDb'")
Expand All @@ -315,7 +331,7 @@ class SessionCatalog(
* the same name, then, if that does not exist, drop the table from the current database.
*/
def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = synchronized {
val db = name.database.getOrElse(currentDb)
val db = formatDatabaseName(name.database.getOrElse(currentDb))
val table = formatTableName(name.table)
if (name.database.isDefined || !tempTables.contains(table)) {
// When ignoreIfNotExists is false, no exception is issued when the table does not exist.
Expand All @@ -339,7 +355,7 @@ class SessionCatalog(
*/
def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = {
synchronized {
val db = name.database.getOrElse(currentDb)
val db = formatDatabaseName(name.database.getOrElse(currentDb))
val table = formatTableName(name.table)
val relation =
if (name.database.isDefined || !tempTables.contains(table)) {
Expand All @@ -364,7 +380,7 @@ class SessionCatalog(
* contain the table.
*/
def tableExists(name: TableIdentifier): Boolean = synchronized {
val db = name.database.getOrElse(currentDb)
val db = formatDatabaseName(name.database.getOrElse(currentDb))
val table = formatTableName(name.table)
if (name.database.isDefined || !tempTables.contains(table)) {
externalCatalog.tableExists(db, table)
Expand All @@ -386,14 +402,15 @@ class SessionCatalog(
/**
* List all tables in the specified database, including temporary tables.
*/
def listTables(db: String): Seq[TableIdentifier] = listTables(db, "*")
def listTables(db: String): Seq[TableIdentifier] = listTables(formatDatabaseName(db), "*")

/**
* List all matching tables in the specified database, including temporary tables.
*/
def listTables(db: String, pattern: String): Seq[TableIdentifier] = {
val dbName = formatDatabaseName(db)
val dbTables =
externalCatalog.listTables(db, pattern).map { t => TableIdentifier(t, Some(db)) }
externalCatalog.listTables(dbName, pattern).map { t => TableIdentifier(t, Some(dbName)) }
synchronized {
val _tempTables = StringUtils.filterPattern(tempTables.keys.toSeq, pattern)
.map { t => TableIdentifier(t) }
Expand Down Expand Up @@ -449,7 +466,7 @@ class SessionCatalog(
tableName: TableIdentifier,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = {
val db = tableName.database.getOrElse(getCurrentDatabase)
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
externalCatalog.createPartitions(db, table, parts, ignoreIfExists)
}
Expand All @@ -462,7 +479,7 @@ class SessionCatalog(
tableName: TableIdentifier,
parts: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean): Unit = {
val db = tableName.database.getOrElse(getCurrentDatabase)
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
externalCatalog.dropPartitions(db, table, parts, ignoreIfNotExists)
}
Expand All @@ -477,7 +494,7 @@ class SessionCatalog(
tableName: TableIdentifier,
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit = {
val db = tableName.database.getOrElse(getCurrentDatabase)
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
externalCatalog.renamePartitions(db, table, specs, newSpecs)
}
Expand All @@ -492,7 +509,7 @@ class SessionCatalog(
* this becomes a no-op.
*/
def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = {
val db = tableName.database.getOrElse(getCurrentDatabase)
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
externalCatalog.alterPartitions(db, table, parts)
}
Expand All @@ -502,7 +519,7 @@ class SessionCatalog(
* If no database is specified, assume the table is in the current database.
*/
def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = {
val db = tableName.database.getOrElse(getCurrentDatabase)
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
externalCatalog.getPartition(db, table, spec)
}
Expand All @@ -517,7 +534,7 @@ class SessionCatalog(
def listPartitions(
tableName: TableIdentifier,
partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = {
val db = tableName.database.getOrElse(getCurrentDatabase)
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
externalCatalog.listPartitions(db, table, partialSpec)
}
Expand All @@ -540,7 +557,7 @@ class SessionCatalog(
* If no such database is specified, create it in the current database.
*/
def createFunction(funcDefinition: CatalogFunction, ignoreIfExists: Boolean): Unit = {
val db = funcDefinition.identifier.database.getOrElse(getCurrentDatabase)
val db = formatDatabaseName(funcDefinition.identifier.database.getOrElse(getCurrentDatabase))
val identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db))
val newFuncDefinition = funcDefinition.copy(identifier = identifier)
if (!functionExists(identifier)) {
Expand All @@ -555,7 +572,7 @@ class SessionCatalog(
* If no database is specified, assume the function is in the current database.
*/
def dropFunction(name: FunctionIdentifier, ignoreIfNotExists: Boolean): Unit = {
val db = name.database.getOrElse(getCurrentDatabase)
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
val identifier = name.copy(database = Some(db))
if (functionExists(identifier)) {
// TODO: registry should just take in FunctionIdentifier for type safety
Expand All @@ -579,15 +596,15 @@ class SessionCatalog(
* If no database is specified, this will return the function in the current database.
*/
def getFunctionMetadata(name: FunctionIdentifier): CatalogFunction = {
val db = name.database.getOrElse(getCurrentDatabase)
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
externalCatalog.getFunction(db, name.funcName)
}

/**
* Check if the specified function exists.
*/
def functionExists(name: FunctionIdentifier): Boolean = {
val db = name.database.getOrElse(getCurrentDatabase)
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
functionRegistry.functionExists(name.unquotedString) ||
externalCatalog.functionExists(db, name.funcName)
}
Expand Down Expand Up @@ -654,7 +671,8 @@ class SessionCatalog(
*/
private[spark] def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = synchronized {
// TODO: just make function registry take in FunctionIdentifier instead of duplicating this
val qualifiedName = name.copy(database = name.database.orElse(Some(currentDb)))
val database = name.database.orElse(Some(currentDb)).map(formatDatabaseName)
val qualifiedName = name.copy(database = database)
functionRegistry.lookupFunction(name.funcName)
.orElse(functionRegistry.lookupFunction(qualifiedName.unquotedString))
.getOrElse {
Expand Down Expand Up @@ -693,7 +711,8 @@ class SessionCatalog(
}

// If the name itself is not qualified, add the current database to it.
val qualifiedName = if (name.database.isEmpty) name.copy(database = Some(currentDb)) else name
val database = name.database.orElse(Some(currentDb)).map(formatDatabaseName)
val qualifiedName = name.copy(database = database)

if (functionRegistry.functionExists(qualifiedName.unquotedString)) {
// This function has been already loaded into the function registry.
Expand Down Expand Up @@ -733,8 +752,9 @@ class SessionCatalog(
* List all matching functions in the specified database, including temporary functions.
*/
def listFunctions(db: String, pattern: String): Seq[FunctionIdentifier] = {
val dbFunctions =
externalCatalog.listFunctions(db, pattern).map { f => FunctionIdentifier(f, Some(db)) }
val dbName = formatDatabaseName(db)
val dbFunctions = externalCatalog.listFunctions(dbName, pattern)
.map { f => FunctionIdentifier(f, Some(dbName)) }
val loadedFunctions = StringUtils.filterPattern(functionRegistry.listFunction(), pattern)
.map { f => FunctionIdentifier(f) }
dbFunctions ++ loadedFunctions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,16 +605,16 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {

checkAnswer(
sql("SHOW DATABASES LIKE '*db1A'"),
Row("showdb1A") :: Nil)
Row("showdb1a") :: Nil)

checkAnswer(
sql("SHOW DATABASES LIKE 'showdb1A'"),
Row("showdb1A") :: Nil)
Row("showdb1a") :: Nil)

checkAnswer(
sql("SHOW DATABASES LIKE '*db1A|*db2B'"),
Row("showdb1A") ::
Row("showdb2B") :: Nil)
Row("showdb1a") ::
Row("showdb2b") :: Nil)

checkAnswer(
sql("SHOW DATABASES LIKE 'non-existentdb'"),
Expand Down Expand Up @@ -939,4 +939,24 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
Row("Usage: a ^ b - Bitwise exclusive OR.") :: Nil
)
}

test("drop default database") {
Seq("true", "false").foreach { caseSensitive =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) {
var message = intercept[AnalysisException] {
sql("DROP DATABASE default")
}.getMessage
assert(message.contains("Can not drop default database"))

message = intercept[AnalysisException] {
sql("DROP DATABASE DeFault")
}.getMessage
if (caseSensitive == "true") {
assert(message.contains("Database 'DeFault' does not exist"))
} else {
assert(message.contains("Can not drop default database"))
}
}
}
}
}
Loading