-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-37636][SQL] Migrate CREATE NAMESPACE to use V2 command by default #35113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -28,14 +28,21 @@ import org.apache.spark.sql.internal.SQLConf | |||||
| trait TestsV1AndV2Commands extends DDLCommandTestUtils { | ||||||
| private var _version: String = "" | ||||||
| override def commandVersion: String = _version | ||||||
| def runningV1Command: Boolean = commandVersion == "V1" | ||||||
|
|
||||||
| // Tests using V1 catalogs will run with `spark.sql.legacy.useV1Command` on and off | ||||||
| // to test both V1 and V2 commands. | ||||||
| override def test(testName: String, testTags: Tag*)(testFun: => Any) | ||||||
| (implicit pos: Position): Unit = { | ||||||
| Seq(true, false).foreach { useV1Command => | ||||||
| _version = if (useV1Command) "V1" else "V2" | ||||||
| def setCommandVersion(): Unit = { | ||||||
| _version = if (useV1Command) "V1" else "V2" | ||||||
| } | ||||||
| setCommandVersion() | ||||||
| super.test(testName, testTags: _*) { | ||||||
| // Need to set command version inside this test function so that | ||||||
| // the correct command version is available in each test. | ||||||
| setCommandVersion() | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sorry I'm confused. We set the version right before
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This Also, note that we need to call spark/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandTestUtils.scala Lines 56 to 57 in 527e842
|
||||||
| withSQLConf(SQLConf.LEGACY_USE_V1_COMMAND.key -> useV1Command.toString) { | ||||||
| testFun | ||||||
| } | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.command | |
| trait CreateNamespaceSuiteBase extends command.CreateNamespaceSuiteBase | ||
| with command.TestsV1AndV2Commands { | ||
| override def namespace: String = "db" | ||
| override def notFoundMsgPrefix: String = "Database" | ||
| override def notFoundMsgPrefix: String = if (runningV1Command) "Database" else "Namespace" | ||
|
||
|
|
||
| test("Create namespace using default warehouse path") { | ||
| val ns = s"$catalog.$namespace" | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,7 +36,7 @@ import org.apache.spark.{SparkConf, SparkException} | |
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.sql.AnalysisException | ||
| import org.apache.spark.sql.catalyst.TableIdentifier | ||
| import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException | ||
| import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, TableAlreadyExistsException} | ||
| import org.apache.spark.sql.catalyst.catalog._ | ||
| import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._ | ||
| import org.apache.spark.sql.catalyst.expressions._ | ||
|
|
@@ -94,10 +94,22 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat | |
| } | ||
|
|
||
| /** | ||
| * Run some code involving `client` in a [[synchronized]] block and wrap certain | ||
| * Run some code involving `client` in a [[synchronized]] block and wrap non-fatal | ||
| * exceptions thrown in the process in [[AnalysisException]]. | ||
| */ | ||
| private def withClient[T](body: => T): T = synchronized { | ||
| private def withClient[T](body: => T): T = withClientWrappingException{ | ||
imback82 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| body | ||
| } { | ||
| _ => None // Will fallback to default wrapping strategy in withClientWrappingException. | ||
| } | ||
|
|
||
| /** | ||
| * Run some code involving `client` in a [[synchronized]] block and wrap non-fatal | ||
| * exceptions thrown in the process in [[AnalysisException]] using the given | ||
| * `wrapException` function. | ||
| */ | ||
| private def withClientWrappingException[T](body: => T) | ||
| (wrapException: Throwable => Option[AnalysisException]): T = synchronized { | ||
| try { | ||
| body | ||
| } catch { | ||
|
|
@@ -108,8 +120,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat | |
| case i: InvocationTargetException => i.getCause | ||
| case o => o | ||
| } | ||
| throw new AnalysisException( | ||
| e.getClass.getCanonicalName + ": " + e.getMessage, cause = Some(e)) | ||
| wrapException(e) match { | ||
| case Some(wrapped) => throw wrapped | ||
| case None => throw new AnalysisException( | ||
| e.getClass.getCanonicalName + ": " + e.getMessage, cause = Some(e)) | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -189,8 +204,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat | |
|
|
||
| override def createDatabase( | ||
| dbDefinition: CatalogDatabase, | ||
| ignoreIfExists: Boolean): Unit = withClient { | ||
| ignoreIfExists: Boolean): Unit = withClientWrappingException { | ||
| client.createDatabase(dbDefinition, ignoreIfExists) | ||
| } { exception => | ||
| if (exception.getClass.getName.equals( | ||
| "org.apache.hadoop.hive.metastore.api.AlreadyExistsException") | ||
| && exception.getMessage.contains( | ||
| s"Database ${dbDefinition.name} already exists")) { | ||
| Some(new DatabaseAlreadyExistsException(dbDefinition.name)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we test it in the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated the test. Thanks! |
||
| } else { | ||
| None | ||
| } | ||
| } | ||
|
|
||
| override def dropDatabase( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This check is handled in
DatabaseNameInSessionCatalog.