diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala index 284b05c1cc12..0c6b2701c92b 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala @@ -52,7 +52,7 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte .exists(_.contains("catalog comment")) assert(createCommentWarning === false) - catalog.dropNamespace(Array("foo")) + catalog.dropNamespace(Array("foo"), cascade = false) assert(catalog.namespaceExists(Array("foo")) === false) assert(catalog.listNamespaces() === builtinNamespaces) val msg = intercept[AnalysisException] { diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java index 48a859a4159f..865ac553199a 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java @@ -155,8 +155,10 @@ public void alterNamespace( } @Override - public boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException { - return asNamespaceCatalog().dropNamespace(namespace); + public boolean dropNamespace( + String[] namespace, + boolean cascade) throws NoSuchNamespaceException, NonEmptyNamespaceException { + return asNamespaceCatalog().dropNamespace(namespace, cascade); } @Override diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java index f70746b612e9..c1a4960068d2 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java @@ -20,6 +20,7 @@ import org.apache.spark.annotation.Evolving; import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException; import java.util.Map; @@ -136,15 +137,20 @@ void alterNamespace( NamespaceChange... changes) throws NoSuchNamespaceException; /** - * Drop a namespace from the catalog, recursively dropping all objects within the namespace. + * Drop a namespace from the catalog with cascade mode, recursively dropping all objects + * within the namespace if cascade is true. *
* If the catalog implementation does not support this operation, it may throw * {@link UnsupportedOperationException}. * * @param namespace a multi-part namespace + * @param cascade When true, deletes all objects under the namespace * @return true if the namespace was dropped * @throws NoSuchNamespaceException If the namespace does not exist (optional) + * @throws NonEmptyNamespaceException If the namespace is non-empty and cascade is false * @throws UnsupportedOperationException If drop is not a supported operation */ - boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException; + boolean dropNamespace( + String[] namespace, + boolean cascade) throws NoSuchNamespaceException, NonEmptyNamespaceException; } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala new file mode 100644 index 000000000000..f3ff28f74fcc --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + +/** + * Thrown by a catalog when an item already exists. The analyzer will rethrow the exception + * as an [[org.apache.spark.sql.AnalysisException]] with the correct position information. + */ +case class NonEmptyNamespaceException( + override val message: String, + override val cause: Option[Throwable] = None) + extends AnalysisException(message, cause = cause) { + + def this(namespace: Array[String]) = { + this(s"Namespace '${namespace.quoted}' is non empty.") + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala index 0cca1cc9bebf..d00bc31e07f1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala @@ -820,7 +820,7 @@ class CatalogSuite extends SparkFunSuite { assert(catalog.namespaceExists(testNs) === false) - val ret = catalog.dropNamespace(testNs) + val ret = catalog.dropNamespace(testNs, cascade = false) assert(ret === false) } @@ -833,7 +833,7 @@ class CatalogSuite extends SparkFunSuite { assert(catalog.namespaceExists(testNs) === true) assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> "value")) - val ret = catalog.dropNamespace(testNs) + val ret = catalog.dropNamespace(testNs, cascade = false) assert(ret === true) assert(catalog.namespaceExists(testNs) === false) @@ -845,7 +845,7 @@ class CatalogSuite extends SparkFunSuite { catalog.createNamespace(testNs, Map("property" -> "value").asJava) catalog.createTable(testIdent, schema, Array.empty, emptyProps) - assert(catalog.dropNamespace(testNs)) + assert(catalog.dropNamespace(testNs, cascade = true)) assert(!catalog.namespaceExists(testNs)) intercept[NoSuchNamespaceException](catalog.listTables(testNs)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala index d8e6bc4149d9..428aec703674 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala @@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ -import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} +import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NonEmptyNamespaceException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.connector.distributions.{Distribution, Distributions} import org.apache.spark.sql.connector.expressions.{SortOrder, Transform} import org.apache.spark.sql.types.StructType @@ -213,10 +213,16 @@ class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamesp namespaces.put(namespace.toList, CatalogV2Util.applyNamespaceChanges(metadata, changes)) } - override def dropNamespace(namespace: Array[String]): Boolean = { - listNamespaces(namespace).foreach(dropNamespace) + override def dropNamespace(namespace: Array[String], cascade: Boolean): Boolean = { try { - listTables(namespace).foreach(dropTable) + if (!cascade) { + if (listTables(namespace).nonEmpty || listNamespaces(namespace).nonEmpty) { + throw new NonEmptyNamespaceException(namespace) + } + } else { + listNamespaces(namespace).foreach(namespace => dropNamespace(namespace, cascade)) + listTables(namespace).foreach(dropTable) + } } catch { case _: NoSuchNamespaceException => } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropNamespaceExec.scala index 9a9d8e1d4d57..5d302055e7d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropNamespaceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropNamespaceExec.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.connector.catalog.CatalogPlugin import org.apache.spark.sql.errors.QueryCompilationErrors @@ -37,17 +38,11 @@ case class DropNamespaceExec( val nsCatalog = catalog.asNamespaceCatalog val ns = namespace.toArray if (nsCatalog.namespaceExists(ns)) { - // The default behavior of `SupportsNamespace.dropNamespace()` is cascading, - // so make sure the namespace to drop is empty. - if (!cascade) { - if (catalog.asTableCatalog.listTables(ns).nonEmpty - || nsCatalog.listNamespaces(ns).nonEmpty) { + try { + nsCatalog.dropNamespace(ns, cascade) + } catch { + case _: NonEmptyNamespaceException => throw QueryCompilationErrors.cannotDropNonemptyNamespaceError(namespace) - } - } - - if (!nsCatalog.dropNamespace(ns)) { - throw QueryCompilationErrors.cannotDropNonemptyNamespaceError(namespace) } } else if (!ifExists) { throw QueryCompilationErrors.noSuchNamespaceError(ns) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 3ea7d0f578b3..d9cfe0aa04dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -286,12 +286,11 @@ class V2SessionCatalog(catalog: SessionCatalog) } } - override def dropNamespace(namespace: Array[String]): Boolean = namespace match { + override def dropNamespace( + namespace: Array[String], + cascade: Boolean): Boolean = namespace match { case Array(db) if catalog.databaseExists(db) => - if (catalog.listTables(db).nonEmpty) { - throw QueryExecutionErrors.namespaceNotEmptyError(namespace) - } - catalog.dropDatabase(db, ignoreIfNotExists = false, cascade = false) + catalog.dropDatabase(db, ignoreIfNotExists = false, cascade) true case Array(_) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala index 566706486d3f..1658f0dce7fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala @@ -278,7 +278,9 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging } } - override def dropNamespace(namespace: Array[String]): Boolean = namespace match { + override def dropNamespace( + namespace: Array[String], + cascade: Boolean): Boolean = namespace match { case Array(db) if namespaceExists(namespace) => if (listTables(Array(db)).nonEmpty) { throw QueryExecutionErrors.namespaceNotEmptyError(namespace) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala index 86f4dc467638..646eccb4cdd7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala @@ -67,10 +67,10 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { override protected def afterAll(): Unit = { val catalog = newCatalog() - catalog.dropNamespace(Array("db")) - catalog.dropNamespace(Array("db2")) - catalog.dropNamespace(Array("ns")) - catalog.dropNamespace(Array("ns2")) + catalog.dropNamespace(Array("db"), cascade = true) + catalog.dropNamespace(Array("db2"), cascade = true) + catalog.dropNamespace(Array("ns"), cascade = true) + catalog.dropNamespace(Array("ns2"), cascade = true) super.afterAll() } @@ -811,7 +811,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { assert(catalog.listNamespaces(Array()) === Array(testNs, defaultNs)) assert(catalog.listNamespaces(testNs) === Array()) - catalog.dropNamespace(testNs) + catalog.dropNamespace(testNs, cascade = false) } test("listNamespaces: fail if missing namespace") { @@ -849,7 +849,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { assert(catalog.namespaceExists(testNs) === true) checkMetadata(metadata.asScala, Map("property" -> "value")) - catalog.dropNamespace(testNs) + catalog.dropNamespace(testNs, cascade = false) } test("loadNamespaceMetadata: empty metadata") { @@ -864,7 +864,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { assert(catalog.namespaceExists(testNs) === true) checkMetadata(metadata.asScala, emptyProps.asScala) - catalog.dropNamespace(testNs) + catalog.dropNamespace(testNs, cascade = false) } test("createNamespace: basic behavior") { @@ -884,7 +884,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { checkMetadata(metadata, Map("property" -> "value")) assert(expectedPath === metadata("location")) - catalog.dropNamespace(testNs) + catalog.dropNamespace(testNs, cascade = false) } test("createNamespace: initialize location") { @@ -900,7 +900,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { checkMetadata(metadata, Map.empty) assert(expectedPath === metadata("location")) - catalog.dropNamespace(testNs) + catalog.dropNamespace(testNs, cascade = false) } test("createNamespace: relative location") { @@ -917,7 +917,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { checkMetadata(metadata, Map.empty) assert(expectedPath === metadata("location")) - catalog.dropNamespace(testNs) + catalog.dropNamespace(testNs, cascade = false) } test("createNamespace: fail if namespace already exists") { @@ -933,7 +933,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { assert(catalog.namespaceExists(testNs) === true) checkMetadata(catalog.loadNamespaceMetadata(testNs).asScala, Map("property" -> "value")) - catalog.dropNamespace(testNs) + catalog.dropNamespace(testNs, cascade = false) } test("createNamespace: fail nested namespace") { @@ -948,7 +948,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { assert(exc.getMessage.contains("Invalid namespace name: db.nested")) - catalog.dropNamespace(Array("db")) + catalog.dropNamespace(Array("db"), cascade = false) } test("createTable: fail if namespace does not exist") { @@ -969,7 +969,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { assert(catalog.namespaceExists(testNs) === false) - val ret = catalog.dropNamespace(testNs) + val ret = catalog.dropNamespace(testNs, cascade = false) assert(ret === false) } @@ -981,7 +981,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { assert(catalog.namespaceExists(testNs) === true) - val ret = catalog.dropNamespace(testNs) + val ret = catalog.dropNamespace(testNs, cascade = false) assert(ret === true) assert(catalog.namespaceExists(testNs) === false) @@ -993,8 +993,8 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { catalog.createNamespace(testNs, Map("property" -> "value").asJava) catalog.createTable(testIdent, schema, Array.empty, emptyProps) - val exc = intercept[IllegalStateException] { - catalog.dropNamespace(testNs) + val exc = intercept[AnalysisException] { + catalog.dropNamespace(testNs, cascade = false) } assert(exc.getMessage.contains(testNs.quoted)) @@ -1002,7 +1002,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { checkMetadata(catalog.loadNamespaceMetadata(testNs).asScala, Map("property" -> "value")) catalog.dropTable(testIdent) - catalog.dropNamespace(testNs) + catalog.dropNamespace(testNs, cascade = false) } test("alterNamespace: basic behavior") { @@ -1027,7 +1027,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { catalog.loadNamespaceMetadata(testNs).asScala, Map("property" -> "value")) - catalog.dropNamespace(testNs) + catalog.dropNamespace(testNs, cascade = false) } test("alterNamespace: update namespace location") { @@ -1050,7 +1050,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { catalog.alterNamespace(testNs, NamespaceChange.setProperty("location", "relativeP")) assert(newRelativePath === spark.catalog.getDatabase(testNs(0)).locationUri) - catalog.dropNamespace(testNs) + catalog.dropNamespace(testNs, cascade = false) } test("alterNamespace: update namespace comment") { @@ -1065,7 +1065,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { assert(newComment === spark.catalog.getDatabase(testNs(0)).description) - catalog.dropNamespace(testNs) + catalog.dropNamespace(testNs, cascade = false) } test("alterNamespace: fail if namespace doesn't exist") { @@ -1092,6 +1092,6 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { assert(exc.getMessage.contains(s"Cannot remove reserved property: $p")) } - catalog.dropNamespace(testNs) + catalog.dropNamespace(testNs, cascade = false) } }