diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 835ea9214481..7e5e2bad041e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -214,16 +214,11 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) case DropView(r: ResolvedView, ifExists) => DropTableCommand(r.identifier.asTableIdentifier, ifExists, isView = true, purge = false) - case c @ CreateNamespace(ResolvedDBObjectName(catalog, name), _, _) - if isSessionCatalog(catalog) => - if (name.length != 1) { - throw QueryCompilationErrors.invalidDatabaseNameError(name.quoted) - } - + case c @ CreateNamespace(DatabaseNameInSessionCatalog(name), _, _) if conf.useV1Command => val comment = c.properties.get(SupportsNamespaces.PROP_COMMENT) val location = c.properties.get(SupportsNamespaces.PROP_LOCATION) val newProperties = c.properties -- CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES - CreateDatabaseCommand(name.head, c.ifNotExists, location, comment, newProperties) + CreateDatabaseCommand(name, c.ifNotExists, location, comment, newProperties) case d @ DropNamespace(DatabaseInSessionCatalog(db), _, _) => DropDatabaseCommand(db, d.ifExists, d.cascade) @@ -609,4 +604,14 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) resolved.namespace.map(quoteIfNeeded).mkString(".")) } } + + private object DatabaseNameInSessionCatalog { + def unapply(resolved: ResolvedDBObjectName): Option[String] = resolved match { + case ResolvedDBObjectName(catalog, _) if !isSessionCatalog(catalog) => None + case ResolvedDBObjectName(_, Seq(dbName)) => Some(dbName) + case _ => + assert(resolved.nameParts.length > 1) + throw QueryCompilationErrors.invalidDatabaseNameError(resolved.nameParts.quoted) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CreateNamespaceSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CreateNamespaceSuiteBase.scala index 597ea869eb8a..7db8fba8ac36 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CreateNamespaceSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CreateNamespaceSuiteBase.scala @@ -21,9 +21,11 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.fs.Path -import org.apache.spark.sql.{AnalysisException, QueryTest} +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Util, SupportsNamespaces} +import org.apache.spark.sql.execution.command.DDLCommandTestUtils.V1_COMMAND_VERSION import org.apache.spark.sql.internal.SQLConf /** @@ -47,9 +49,8 @@ trait CreateNamespaceSuiteBase extends QueryTest with DDLCommandTestUtils { protected def namespaceArray: Array[String] = namespace.split('.') - protected def notFoundMsgPrefix: String - - protected def alreadyExistErrorMessage: String = s"$notFoundMsgPrefix '$namespace' already exists" + protected def notFoundMsgPrefix: String = + if (commandVersion == V1_COMMAND_VERSION) "Database" else "Namespace" test("basic") { val ns = s"$catalog.$namespace" @@ -88,12 +89,10 @@ trait CreateNamespaceSuiteBase extends QueryTest with DDLCommandTestUtils { withNamespace(ns) { sql(s"CREATE NAMESPACE $ns") - // TODO: HiveExternalCatalog throws DatabaseAlreadyExistsException, and - // non-Hive catalogs throw NamespaceAlreadyExistsException. - val e = intercept[AnalysisException] { + val e = intercept[NamespaceAlreadyExistsException] { sql(s"CREATE NAMESPACE $ns") } - assert(e.getMessage.contains(alreadyExistErrorMessage)) + assert(e.getMessage.contains(s"$notFoundMsgPrefix '$namespace' already exists")) // The following will be no-op since the namespace already exists. sql(s"CREATE NAMESPACE IF NOT EXISTS $ns") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandTestUtils.scala index af3e92a55868..39f2abd35c2b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandTestUtils.scala @@ -173,3 +173,8 @@ trait DDLCommandTestUtils extends SQLTestUtils { part1Loc } } + +object DDLCommandTestUtils { + val V1_COMMAND_VERSION = "V1" + val V2_COMMAND_VERSION = "V2" +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/TestsV1AndV2Commands.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/TestsV1AndV2Commands.scala index 15976f2df7c2..73e75dca4517 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/TestsV1AndV2Commands.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/TestsV1AndV2Commands.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.command import org.scalactic.source.Position import org.scalatest.Tag +import org.apache.spark.sql.execution.command.DDLCommandTestUtils.{V1_COMMAND_VERSION, V2_COMMAND_VERSION} import org.apache.spark.sql.internal.SQLConf /** @@ -34,8 +35,14 @@ trait TestsV1AndV2Commands extends DDLCommandTestUtils { override def test(testName: String, testTags: Tag*)(testFun: => Any) (implicit pos: Position): Unit = { Seq(true, false).foreach { useV1Command => - _version = if (useV1Command) "V1" else "V2" + def setCommandVersion(): Unit = { + _version = if (useV1Command) V1_COMMAND_VERSION else V2_COMMAND_VERSION + } + setCommandVersion() super.test(testName, testTags: _*) { + // Need to set command version inside this test function so that + // the correct command version is available in each test. + setCommandVersion() withSQLConf(SQLConf.LEGACY_USE_V1_COMMAND.key -> useV1Command.toString) { testFun } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/CreateNamespaceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/CreateNamespaceSuite.scala index ba3eba932355..f8ded64669b1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/CreateNamespaceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/CreateNamespaceSuite.scala @@ -36,7 +36,6 @@ import org.apache.spark.sql.execution.command trait CreateNamespaceSuiteBase extends command.CreateNamespaceSuiteBase with command.TestsV1AndV2Commands { override def namespace: String = "db" - override def notFoundMsgPrefix: String = "Database" test("Create namespace using default warehouse path") { val ns = s"$catalog.$namespace" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CreateNamespaceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CreateNamespaceSuite.scala index 66865ff0c5bd..6b5475a1e267 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CreateNamespaceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CreateNamespaceSuite.scala @@ -24,5 +24,4 @@ import org.apache.spark.sql.execution.command */ class CreateNamespaceSuite extends command.CreateNamespaceSuiteBase with CommandSuiteBase { override def namespace: String = "ns1.ns2" - override def notFoundMsgPrefix: String = "Namespace" } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 52b21598edc0..24e60529d227 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -36,7 +36,7 @@ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException +import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._ import org.apache.spark.sql.catalyst.expressions._ @@ -94,10 +94,22 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } /** - * Run some code involving `client` in a [[synchronized]] block and wrap certain + * Run some code involving `client` in a [[synchronized]] block and wrap non-fatal * exceptions thrown in the process in [[AnalysisException]]. */ - private def withClient[T](body: => T): T = synchronized { + private def withClient[T](body: => T): T = withClientWrappingException { + body + } { + _ => None // Will fallback to default wrapping strategy in withClientWrappingException. + } + + /** + * Run some code involving `client` in a [[synchronized]] block and wrap non-fatal + * exceptions thrown in the process in [[AnalysisException]] using the given + * `wrapException` function. + */ + private def withClientWrappingException[T](body: => T) + (wrapException: Throwable => Option[AnalysisException]): T = synchronized { try { body } catch { @@ -108,8 +120,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat case i: InvocationTargetException => i.getCause case o => o } - throw new AnalysisException( - e.getClass.getCanonicalName + ": " + e.getMessage, cause = Some(e)) + wrapException(e) match { + case Some(wrapped) => throw wrapped + case None => throw new AnalysisException( + e.getClass.getCanonicalName + ": " + e.getMessage, cause = Some(e)) + } } } @@ -189,8 +204,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat override def createDatabase( dbDefinition: CatalogDatabase, - ignoreIfExists: Boolean): Unit = withClient { + ignoreIfExists: Boolean): Unit = withClientWrappingException { client.createDatabase(dbDefinition, ignoreIfExists) + } { exception => + if (exception.getClass.getName.equals( + "org.apache.hadoop.hive.metastore.api.AlreadyExistsException") + && exception.getMessage.contains( + s"Database ${dbDefinition.name} already exists")) { + Some(new DatabaseAlreadyExistsException(dbDefinition.name)) + } else { + None + } } override def dropDatabase( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 03aba2db9932..14b2a51bff8c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -183,6 +183,16 @@ class VersionsSuite extends SparkFunSuite with Logging { val tempDB = CatalogDatabase( "temporary", description = "test create", tempDatabasePath, Map()) client.createDatabase(tempDB, ignoreIfExists = true) + + try { + client.createDatabase(tempDB, ignoreIfExists = false) + assert(false, "createDatabase should throw AlreadyExistsException") + } catch { + case ex: Throwable => + assert(ex.getClass.getName.equals( + "org.apache.hadoop.hive.metastore.api.AlreadyExistsException")) + assert(ex.getMessage.contains(s"Database ${tempDB.name} already exists")) + } } test(s"$version: create/get/alter database should pick right user name as owner") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/CreateNamespaceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/CreateNamespaceSuite.scala index e8f85f40e9fb..afe6f1138d30 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/CreateNamespaceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/CreateNamespaceSuite.scala @@ -25,5 +25,4 @@ import org.apache.spark.sql.execution.command.v1 */ class CreateNamespaceSuite extends v1.CreateNamespaceSuiteBase with CommandSuiteBase { override def commandVersion: String = super[CreateNamespaceSuiteBase].commandVersion - override def alreadyExistErrorMessage: String = s"$notFoundMsgPrefix $namespace already exists" }