Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,11 @@ import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
import org.apache.spark.sql.hive.client.HiveClient
Expand Down Expand Up @@ -94,22 +93,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}

/**
* Run some code involving `client` in a [[synchronized]] block and wrap non-fatal
* Run some code involving `client` in a [[synchronized]] block and wrap certain
* exceptions thrown in the process in [[AnalysisException]].
*/
private def withClient[T](body: => T): T = withClientWrappingException {
body
} {
_ => None // Will fallback to default wrapping strategy in withClientWrappingException.
}

/**
* Run some code involving `client` in a [[synchronized]] block and wrap non-fatal
* exceptions thrown in the process in [[AnalysisException]] using the given
* `wrapException` function.
*/
private def withClientWrappingException[T](body: => T)
(wrapException: Throwable => Option[AnalysisException]): T = synchronized {
private def withClient[T](body: => T): T = synchronized {
try {
body
} catch {
Expand All @@ -120,11 +107,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
case i: InvocationTargetException => i.getCause
case o => o
}
wrapException(e) match {
case Some(wrapped) => throw wrapped
case None => throw new AnalysisException(
e.getClass.getCanonicalName + ": " + e.getMessage, cause = Some(e))
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is reverting changes made in #35113.

throw new AnalysisException(
e.getClass.getCanonicalName + ": " + e.getMessage, cause = Some(e))
}
}

Expand Down Expand Up @@ -204,32 +188,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat

override def createDatabase(
dbDefinition: CatalogDatabase,
ignoreIfExists: Boolean): Unit = withClientWrappingException {
ignoreIfExists: Boolean): Unit = withClient {
client.createDatabase(dbDefinition, ignoreIfExists)
} { exception =>
if (exception.getClass.getName.equals(
"org.apache.hadoop.hive.metastore.api.AlreadyExistsException")
&& exception.getMessage.contains(
s"Database ${dbDefinition.name} already exists")) {
Some(new DatabaseAlreadyExistsException(dbDefinition.name))
} else {
None
}
}

override def dropDatabase(
db: String,
ignoreIfNotExists: Boolean,
cascade: Boolean): Unit = withClient {
try {
client.dropDatabase(db, ignoreIfNotExists, cascade)
} catch {
case NonFatal(exception) =>
if (exception.getClass.getName.equals("org.apache.hadoop.hive.ql.metadata.HiveException")
&& exception.getMessage.contains(s"Database $db is not empty.")) {
throw QueryCompilationErrors.cannotDropNonemptyDatabaseError(db)
} else throw exception
}
client.dropDatabase(db, ignoreIfNotExists, cascade)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, NoSuchTableException, PartitionAlreadyExistsException, PartitionsAlreadyExistException}
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, NoSuchTableException, PartitionAlreadyExistsException, PartitionsAlreadyExistException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Expression
Expand Down Expand Up @@ -332,14 +332,24 @@ private[hive] class HiveClientImpl(
database: CatalogDatabase,
ignoreIfExists: Boolean): Unit = withHiveState {
val hiveDb = toHiveDatabase(database, Some(userName))
shim.createDatabase(client, hiveDb, ignoreIfExists)
try {
shim.createDatabase(client, hiveDb, ignoreIfExists)
} catch {
case _: AlreadyExistsException =>
throw new DatabaseAlreadyExistsException(database.name)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if we need to do something similar as:

try {
shim.createPartitions(client, db, table, parts, ignoreIfExists)
} catch {
case e: InvocationTargetException => replaceExistException(e.getCause)
case e: Throwable => replaceExistException(e)
}

@MaxGekk do you happen to know?

}
}

override def dropDatabase(
name: String,
ignoreIfNotExists: Boolean,
cascade: Boolean): Unit = withHiveState {
shim.dropDatabase(client, name, true, ignoreIfNotExists, cascade)
try {
shim.dropDatabase(client, name, true, ignoreIfNotExists, cascade)
} catch {
case e: HiveException if e.getMessage.contains(s"Database $name is not empty") =>
throw QueryCompilationErrors.cannotDropNonemptyDatabaseError(name)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

}
}

override def alterDatabase(database: CatalogDatabase): Unit = withHiveState {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPermanentFunctionException, PartitionsAlreadyExistException}
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchDatabaseException, NoSuchPermanentFunctionException, PartitionsAlreadyExistException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.catalyst.util.quietly
Expand Down Expand Up @@ -184,14 +184,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
"temporary", description = "test create", tempDatabasePath, Map())
client.createDatabase(tempDB, ignoreIfExists = true)

try {
intercept[DatabaseAlreadyExistsException] {
client.createDatabase(tempDB, ignoreIfExists = false)
assert(false, "createDatabase should throw AlreadyExistsException")
} catch {
case ex: Throwable =>
assert(ex.getClass.getName.equals(
"org.apache.hadoop.hive.metastore.api.AlreadyExistsException"))
assert(ex.getMessage.contains(s"Database ${tempDB.name} already exists"))
}
}

Expand Down Expand Up @@ -275,6 +269,14 @@ class VersionsSuite extends SparkFunSuite with Logging {

test(s"$version: dropDatabase") {
assert(client.databaseExists("temporary"))

client.createTable(table("temporary", tableName = "tbl"), ignoreIfExists = false)
val ex = intercept[AnalysisException] {
client.dropDatabase("temporary", ignoreIfNotExists = false, cascade = false)
assert(false, "dropDatabase should throw HiveException")
}
assert(ex.message.contains("Cannot drop a non-empty database: temporary."))

client.dropDatabase("temporary", ignoreIfNotExists = false, cascade = true)
assert(client.databaseExists("temporary") == false)
}
Expand Down