@@ -139,11 +139,8 @@ class InMemoryCatalog(
     if (catalog.contains(db)) {
       if (!cascade) {
         // If cascade is false, make sure the database is empty.
-        if (catalog(db).tables.nonEmpty) {
-          throw QueryCompilationErrors.databaseNotEmptyError(db, "tables")
-        }
-        if (catalog(db).functions.nonEmpty) {
-          throw QueryCompilationErrors.databaseNotEmptyError(db, "functions")
+        if (catalog(db).tables.nonEmpty || catalog(db).functions.nonEmpty) {
+          throw QueryCompilationErrors.cannotDropNonemptyDatabaseError(db)
         }
       }
       // Remove the database.
@@ -540,8 +540,14 @@ object QueryCompilationErrors {
       s"rename temporary view from '$oldName' to '$newName': destination view already exists")
   }
 
-  def databaseNotEmptyError(db: String, details: String): Throwable = {
-    new AnalysisException(s"Database $db is not empty. One or more $details exist.")
+  def cannotDropNonemptyDatabaseError(db: String): Throwable = {
+    new AnalysisException(s"Cannot drop a non-empty database: $db. " +
+      "Use CASCADE option to drop a non-empty database.")
   }
 
+  def cannotDropNonemptyNamespaceError(namespace: Seq[String]): Throwable = {
+    new AnalysisException(s"Cannot drop a non-empty namespace: ${namespace.quoted}. " +
+      "Use CASCADE option to drop a non-empty namespace.")
+  }
+
   def invalidNameForTableOrDatabaseError(name: String): Throwable = {
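
For reference, a rough sketch of how the two new builders read from a call site. The database name and namespace parts below are made-up examples, and quoted is assumed to be the multipart-identifier helper already used in this file, dot-joining the parts and backtick-quoting them only when needed:

  // Illustrative only; the identifiers are hypothetical.
  QueryCompilationErrors.cannotDropNonemptyDatabaseError("db1").getMessage
  //   Cannot drop a non-empty database: db1. Use CASCADE option to drop a non-empty database.
  QueryCompilationErrors.cannotDropNonemptyNamespaceError(Seq("ns1", "ns2")).getMessage
  //   Cannot drop a non-empty namespace: ns1.ns2. Use CASCADE option to drop a non-empty namespace.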
@@ -610,12 +610,6 @@ object QueryExecutionErrors {
       "Schema of v1 relation: " + v1Schema)
   }
 
-  def cannotDropNonemptyNamespaceError(namespace: Seq[String]): Throwable = {
-    new SparkException(
-      s"Cannot drop a non-empty namespace: ${namespace.quoted}. " +
-      "Use CASCADE option to drop a non-empty namespace.")
-  }
-
   def noRecordsFromEmptyDataReaderError(): Throwable = {
     new IOException("No records should be returned from EmptyDataReader")
   }
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.connector.catalog.CatalogPlugin
-import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
+import org.apache.spark.sql.errors.QueryCompilationErrors
 
 /**
  * Physical plan node for dropping a namespace.
@@ -42,12 +42,12 @@ case class DropNamespaceExec(
       if (!cascade) {
         if (catalog.asTableCatalog.listTables(ns).nonEmpty
           || nsCatalog.listNamespaces(ns).nonEmpty) {
-          throw QueryExecutionErrors.cannotDropNonemptyNamespaceError(namespace)
+          throw QueryCompilationErrors.cannotDropNonemptyNamespaceError(namespace)
         }
       }
 
       if (!nsCatalog.dropNamespace(ns)) {
-        throw QueryExecutionErrors.cannotDropNonemptyNamespaceError(namespace)
+        throw QueryCompilationErrors.cannotDropNonemptyNamespaceError(namespace)
       }
     } else if (!ifExists) {
       throw QueryCompilationErrors.noSuchNamespaceError(ns)
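
A short sketch of the user-visible effect of this change on a v2 catalog; the catalog name testcat, the table definition, and the ScalaTest-style intercept are illustrative rather than taken from the diff:

  sql("CREATE NAMESPACE testcat.ns")
  sql("CREATE TABLE testcat.ns.tbl (id INT) USING parquet")
  // The non-empty check used to surface a SparkException built by
  // QueryExecutionErrors; it now fails with an AnalysisException.
  val e = intercept[AnalysisException] {
    sql("DROP NAMESPACE testcat.ns")
  }
  assert(e.getMessage.contains("Cannot drop a non-empty namespace: ns"))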
@@ -36,7 +36,7 @@ trait DropNamespaceSuiteBase extends QueryTest with DDLCommandTestUtils {
 
   protected def builtinTopNamespaces: Seq[String] = Seq.empty
   protected def isCasePreserving: Boolean = true
-  protected def assertDropFails(): Unit
+  protected def namespaceAlias: String = "namespace"
 
   protected def checkNamespace(expected: Seq[String]) = {
     val df = spark.sql(s"SHOW NAMESPACES IN $catalog")

Review comment from the contributor author on namespaceAlias: the alias for "namespace" is "database" for v1 and "namespace" for v2.
@@ -72,7 +72,10 @@ trait DropNamespaceSuiteBase extends QueryTest with DDLCommandTestUtils {
     checkNamespace(Seq("ns") ++ builtinTopNamespaces)
 
     // $catalog.ns.table is present, thus $catalog.ns cannot be dropped.
-    assertDropFails()
+    val e = intercept[AnalysisException] {
+      sql(s"DROP NAMESPACE $catalog.ns")
+    }
+    assert(e.getMessage.contains(s"Cannot drop a non-empty $namespaceAlias: ns"))
     sql(s"DROP TABLE $catalog.ns.table")
 
     // Now that $catalog.ns is empty, it can be dropped.
@@ -31,12 +31,7 @@ import org.apache.spark.sql.execution.command
 trait DropNamespaceSuiteBase extends command.DropNamespaceSuiteBase {
   override protected def builtinTopNamespaces: Seq[String] = Seq("default")
 
-  override protected def assertDropFails(): Unit = {
-    val e = intercept[AnalysisException] {
-      sql(s"DROP NAMESPACE $catalog.ns")
-    }
-    assert(e.getMessage.contains("Database ns is not empty. One or more tables exist"))
-  }
+  override protected def namespaceAlias: String = "database"
 
   test("drop default namespace") {
     val message = intercept[AnalysisException] {
@@ -17,18 +17,9 @@
 
 package org.apache.spark.sql.execution.command.v2
 
-import org.apache.spark.SparkException
 import org.apache.spark.sql.execution.command
 
 /**
  * The class contains tests for the `DROP NAMESPACE` command to check V2 table catalogs.
  */
-class DropNamespaceSuite extends command.DropNamespaceSuiteBase with CommandSuiteBase {
-  // TODO: Unify the error that throws from v1 and v2 test suite into `AnalysisException`
-  override protected def assertDropFails(): Unit = {
-    val e = intercept[SparkException] {
-      sql(s"DROP NAMESPACE $catalog.ns")
-    }
-    assert(e.getMessage.contains("Cannot drop a non-empty namespace: ns"))
-  }
-}
+class DropNamespaceSuite extends command.DropNamespaceSuiteBase with CommandSuiteBase
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
 import org.apache.spark.sql.hive.client.HiveClient
@@ -196,7 +197,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       db: String,
       ignoreIfNotExists: Boolean,
       cascade: Boolean): Unit = withClient {
-    client.dropDatabase(db, ignoreIfNotExists, cascade)
+    try {
+      client.dropDatabase(db, ignoreIfNotExists, cascade)
+    } catch {
+      case NonFatal(exception) =>
+        if (exception.getClass.getName.equals("org.apache.hadoop.hive.ql.metadata.HiveException")
+          && exception.getMessage.contains(s"Database $db is not empty.")) {
+          throw QueryCompilationErrors.cannotDropNonemptyDatabaseError(db)
+        } else throw exception
+    }
   }
 
   /**
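
The catch block above matches the Hive failure by class name and message, then remaps it to the same compilation error the in-memory catalog now throws. A rough end-to-end sketch of what a Hive-backed session would report, assuming a test-style intercept and made-up database and table names:

  sql("CREATE DATABASE db1")
  sql("CREATE TABLE db1.t (id INT)")
  val e = intercept[AnalysisException] {
    sql("DROP DATABASE db1")  // non-empty and no CASCADE
  }
  assert(e.getMessage.contains("Cannot drop a non-empty database: db1"))
  sql("DROP DATABASE db1 CASCADE")  // succeeds and drops db1.t as well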
@@ -1235,7 +1235,7 @@ class HiveDDLSuite
       if (tableExists && !cascade) {
         assertAnalysisError(
           sqlDropDatabase,
-          s"Database $dbName is not empty. One or more tables exist.")
+          s"Cannot drop a non-empty database: $dbName.")
         // the database directory was not removed
         assert(fs.exists(new Path(expectedDBLocation)))
       } else {