Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -214,16 +214,11 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case DropView(r: ResolvedView, ifExists) =>
DropTableCommand(r.identifier.asTableIdentifier, ifExists, isView = true, purge = false)

case c @ CreateNamespace(ResolvedDBObjectName(catalog, name), _, _)
if isSessionCatalog(catalog) =>
if (name.length != 1) {
throw QueryCompilationErrors.invalidDatabaseNameError(name.quoted)
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is handled in DatabaseNameInSessionCatalog.


case c @ CreateNamespace(DatabaseNameInSessionCatalog(name), _, _) if conf.useV1Command =>
val comment = c.properties.get(SupportsNamespaces.PROP_COMMENT)
val location = c.properties.get(SupportsNamespaces.PROP_LOCATION)
val newProperties = c.properties -- CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES
CreateDatabaseCommand(name.head, c.ifNotExists, location, comment, newProperties)
CreateDatabaseCommand(name, c.ifNotExists, location, comment, newProperties)

case d @ DropNamespace(DatabaseInSessionCatalog(db), _, _) =>
DropDatabaseCommand(db, d.ifExists, d.cascade)
Expand Down Expand Up @@ -609,4 +604,14 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
resolved.namespace.map(quoteIfNeeded).mkString("."))
}
}

private object DatabaseNameInSessionCatalog {
def unapply(resolved: ResolvedDBObjectName): Option[String] = resolved match {
case ResolvedDBObjectName(catalog, _) if !isSessionCatalog(catalog) => None
case ResolvedDBObjectName(_, Seq(dbName)) => Some(dbName)
case _ =>
assert(resolved.nameParts.length > 1)
throw QueryCompilationErrors.invalidDatabaseNameError(resolved.nameParts.quoted)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{AnalysisException, QueryTest}
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Util, SupportsNamespaces}
import org.apache.spark.sql.execution.command.DDLCommandTestUtils.V1_COMMAND_VERSION
import org.apache.spark.sql.internal.SQLConf

/**
Expand All @@ -47,9 +49,8 @@ trait CreateNamespaceSuiteBase extends QueryTest with DDLCommandTestUtils {

protected def namespaceArray: Array[String] = namespace.split('.')

protected def notFoundMsgPrefix: String

protected def alreadyExistErrorMessage: String = s"$notFoundMsgPrefix '$namespace' already exists"
protected def notFoundMsgPrefix: String =
if (commandVersion == V1_COMMAND_VERSION) "Database" else "Namespace"

test("basic") {
val ns = s"$catalog.$namespace"
Expand Down Expand Up @@ -88,12 +89,10 @@ trait CreateNamespaceSuiteBase extends QueryTest with DDLCommandTestUtils {
withNamespace(ns) {
sql(s"CREATE NAMESPACE $ns")

// TODO: HiveExternalCatalog throws DatabaseAlreadyExistsException, and
// non-Hive catalogs throw NamespaceAlreadyExistsException.
val e = intercept[AnalysisException] {
val e = intercept[NamespaceAlreadyExistsException] {
sql(s"CREATE NAMESPACE $ns")
}
assert(e.getMessage.contains(alreadyExistErrorMessage))
assert(e.getMessage.contains(s"$notFoundMsgPrefix '$namespace' already exists"))

// The following will be no-op since the namespace already exists.
sql(s"CREATE NAMESPACE IF NOT EXISTS $ns")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,8 @@ trait DDLCommandTestUtils extends SQLTestUtils {
part1Loc
}
}

object DDLCommandTestUtils {
val V1_COMMAND_VERSION = "V1"
val V2_COMMAND_VERSION = "V2"
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.command
import org.scalactic.source.Position
import org.scalatest.Tag

import org.apache.spark.sql.execution.command.DDLCommandTestUtils.{V1_COMMAND_VERSION, V2_COMMAND_VERSION}
import org.apache.spark.sql.internal.SQLConf

/**
Expand All @@ -34,8 +35,14 @@ trait TestsV1AndV2Commands extends DDLCommandTestUtils {
override def test(testName: String, testTags: Tag*)(testFun: => Any)
(implicit pos: Position): Unit = {
Seq(true, false).foreach { useV1Command =>
_version = if (useV1Command) "V1" else "V2"
def setCommandVersion(): Unit = {
_version = if (useV1Command) V1_COMMAND_VERSION else V2_COMMAND_VERSION
}
setCommandVersion()
super.test(testName, testTags: _*) {
// Need to set command version inside this test function so that
// the correct command version is available in each test.
setCommandVersion()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry I'm confused. We set the version right before super.test(....), do you mean it's being set again during super.test(....)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This def test doesn't run testFun when it's invoked; it only registers the test). So by the time, testFunc is actually run, _version will always be set to "V2" (useV1Command == false). So we need to capture this inside the lambda that's passed into super.test.

Also, note that we need to call setCommandVersion() before calling super.test because super.test being called is utilizing the commandVersion to set the right test name:

val testNamePrefix = s"$command using $catalogVersion catalog $commandVersion command"
super.test(s"$testNamePrefix: $testName", testTags: _*)(testFun)

withSQLConf(SQLConf.LEGACY_USE_V1_COMMAND.key -> useV1Command.toString) {
testFun
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import org.apache.spark.sql.execution.command
trait CreateNamespaceSuiteBase extends command.CreateNamespaceSuiteBase
with command.TestsV1AndV2Commands {
override def namespace: String = "db"
override def notFoundMsgPrefix: String = "Database"

test("Create namespace using default warehouse path") {
val ns = s"$catalog.$namespace"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,4 @@ import org.apache.spark.sql.execution.command
*/
class CreateNamespaceSuite extends command.CreateNamespaceSuiteBase with CommandSuiteBase {
override def namespace: String = "ns1.ns2"
override def notFoundMsgPrefix: String = "Namespace"
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
import org.apache.spark.sql.catalyst.expressions._
Expand Down Expand Up @@ -94,10 +94,22 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}

/**
* Run some code involving `client` in a [[synchronized]] block and wrap certain
* Run some code involving `client` in a [[synchronized]] block and wrap non-fatal
* exceptions thrown in the process in [[AnalysisException]].
*/
private def withClient[T](body: => T): T = synchronized {
private def withClient[T](body: => T): T = withClientWrappingException {
body
} {
_ => None // Will fallback to default wrapping strategy in withClientWrappingException.
}

/**
* Run some code involving `client` in a [[synchronized]] block and wrap non-fatal
* exceptions thrown in the process in [[AnalysisException]] using the given
* `wrapException` function.
*/
private def withClientWrappingException[T](body: => T)
(wrapException: Throwable => Option[AnalysisException]): T = synchronized {
try {
body
} catch {
Expand All @@ -108,8 +120,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
case i: InvocationTargetException => i.getCause
case o => o
}
throw new AnalysisException(
e.getClass.getCanonicalName + ": " + e.getMessage, cause = Some(e))
wrapException(e) match {
case Some(wrapped) => throw wrapped
case None => throw new AnalysisException(
e.getClass.getCanonicalName + ": " + e.getMessage, cause = Some(e))
}
}
}

Expand Down Expand Up @@ -189,8 +204,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat

override def createDatabase(
dbDefinition: CatalogDatabase,
ignoreIfExists: Boolean): Unit = withClient {
ignoreIfExists: Boolean): Unit = withClientWrappingException {
client.createDatabase(dbDefinition, ignoreIfExists)
} { exception =>
if (exception.getClass.getName.equals(
"org.apache.hadoop.hive.metastore.api.AlreadyExistsException")
&& exception.getMessage.contains(
s"Database ${dbDefinition.name} already exists")) {
Some(new DatabaseAlreadyExistsException(dbDefinition.name))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we test it in the VersionsSuite, by updating the existing $version: createDatabase test? We need to make sure this wrap exception logic works for all hive versions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the test. Thanks!

} else {
None
}
}

override def dropDatabase(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,16 @@ class VersionsSuite extends SparkFunSuite with Logging {
val tempDB = CatalogDatabase(
"temporary", description = "test create", tempDatabasePath, Map())
client.createDatabase(tempDB, ignoreIfExists = true)

try {
client.createDatabase(tempDB, ignoreIfExists = false)
assert(false, "createDatabase should throw AlreadyExistsException")
} catch {
case ex: Throwable =>
assert(ex.getClass.getName.equals(
"org.apache.hadoop.hive.metastore.api.AlreadyExistsException"))
assert(ex.getMessage.contains(s"Database ${tempDB.name} already exists"))
}
}

test(s"$version: create/get/alter database should pick right user name as owner") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,4 @@ import org.apache.spark.sql.execution.command.v1
*/
class CreateNamespaceSuite extends v1.CreateNamespaceSuiteBase with CommandSuiteBase {
override def commandVersion: String = super[CreateNamespaceSuiteBase].commandVersion
override def alreadyExistErrorMessage: String = s"$notFoundMsgPrefix $namespace already exists"
}