From 0cd7ab1aa8bdd6d1dbfcab1cf857f37f16e158b3 Mon Sep 17 00:00:00 2001 From: dch nguyen Date: Mon, 17 Jan 2022 18:21:08 +0700 Subject: [PATCH 1/7] initial commit --- .../sql/jdbc/v2/V2JDBCNamespaceTest.scala | 2 +- .../catalog/DelegatingCatalogExtension.java | 6 ++- .../connector/catalog/SupportsNamespaces.java | 5 ++- .../sql/connector/catalog/CatalogSuite.scala | 6 +-- .../catalog/InMemoryTableCatalog.scala | 4 +- .../datasources/v2/DropNamespaceExec.scala | 11 +---- .../datasources/v2/V2SessionCatalog.scala | 9 ++--- .../v2/jdbc/JDBCTableCatalog.scala | 4 +- .../v2/V2SessionCatalogSuite.scala | 40 +++++++++---------- 9 files changed, 41 insertions(+), 46 deletions(-) diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala index 284b05c1cc12..0c6b2701c92b 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala @@ -52,7 +52,7 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte .exists(_.contains("catalog comment")) assert(createCommentWarning === false) - catalog.dropNamespace(Array("foo")) + catalog.dropNamespace(Array("foo"), cascade = false) assert(catalog.namespaceExists(Array("foo")) === false) assert(catalog.listNamespaces() === builtinNamespaces) val msg = intercept[AnalysisException] { diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java index 48a859a4159f..98045f3a7e53 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java @@ -155,8 +155,10 @@ public void alterNamespace( } @Override - public boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException { - return asNamespaceCatalog().dropNamespace(namespace); + public boolean dropNamespace( + String[] namespace, + boolean cascade) throws NoSuchNamespaceException { + return asNamespaceCatalog().dropNamespace(namespace, cascade); } @Override diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java index f70746b612e9..e0a69984fec7 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java @@ -136,7 +136,8 @@ void alterNamespace( NamespaceChange... changes) throws NoSuchNamespaceException; /** - * Drop a namespace from the catalog, recursively dropping all objects within the namespace. + * Drop a namespace from the catalog with cascade mode, recursively dropping all objects + * within the namespace if cascade is true. *

* If the catalog implementation does not support this operation, it may throw * {@link UnsupportedOperationException}. @@ -146,5 +147,5 @@ void alterNamespace( * @throws NoSuchNamespaceException If the namespace does not exist (optional) * @throws UnsupportedOperationException If drop is not a supported operation */ - boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException; + boolean dropNamespace(String[] namespace, boolean cascade) throws NoSuchNamespaceException; } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala index 0cca1cc9bebf..312943f78a6b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala @@ -820,7 +820,7 @@ class CatalogSuite extends SparkFunSuite { assert(catalog.namespaceExists(testNs) === false) - val ret = catalog.dropNamespace(testNs) + val ret = catalog.dropNamespace(testNs, cascade = false) assert(ret === false) } @@ -833,7 +833,7 @@ class CatalogSuite extends SparkFunSuite { assert(catalog.namespaceExists(testNs) === true) assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> "value")) - val ret = catalog.dropNamespace(testNs) + val ret = catalog.dropNamespace(testNs, cascade = false) assert(ret === true) assert(catalog.namespaceExists(testNs) === false) @@ -845,7 +845,7 @@ class CatalogSuite extends SparkFunSuite { catalog.createNamespace(testNs, Map("property" -> "value").asJava) catalog.createTable(testIdent, schema, Array.empty, emptyProps) - assert(catalog.dropNamespace(testNs)) + assert(catalog.dropNamespace(testNs, cascade = false)) assert(!catalog.namespaceExists(testNs)) intercept[NoSuchNamespaceException](catalog.listTables(testNs)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala index d8e6bc4149d9..9ba32133571f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala @@ -213,8 +213,8 @@ class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamesp namespaces.put(namespace.toList, CatalogV2Util.applyNamespaceChanges(metadata, changes)) } - override def dropNamespace(namespace: Array[String]): Boolean = { - listNamespaces(namespace).foreach(dropNamespace) + override def dropNamespace(namespace: Array[String], cascade: Boolean): Boolean = { + listNamespaces(namespace).foreach(namespace => dropNamespace(namespace, cascade)) try { listTables(namespace).foreach(dropTable) } catch { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropNamespaceExec.scala index 9a9d8e1d4d57..a0b4ad94bc19 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropNamespaceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropNamespaceExec.scala @@ -37,16 +37,7 @@ case class DropNamespaceExec( val nsCatalog = catalog.asNamespaceCatalog val ns = namespace.toArray if (nsCatalog.namespaceExists(ns)) { - // The default behavior of `SupportsNamespace.dropNamespace()` is cascading, - // so make sure the namespace to drop is empty. - if (!cascade) { - if (catalog.asTableCatalog.listTables(ns).nonEmpty - || nsCatalog.listNamespaces(ns).nonEmpty) { - throw QueryCompilationErrors.cannotDropNonemptyNamespaceError(namespace) - } - } - - if (!nsCatalog.dropNamespace(ns)) { + if (!nsCatalog.dropNamespace(ns, cascade)) { throw QueryCompilationErrors.cannotDropNonemptyNamespaceError(namespace) } } else if (!ifExists) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 3ea7d0f578b3..d9cfe0aa04dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -286,12 +286,11 @@ class V2SessionCatalog(catalog: SessionCatalog) } } - override def dropNamespace(namespace: Array[String]): Boolean = namespace match { + override def dropNamespace( + namespace: Array[String], + cascade: Boolean): Boolean = namespace match { case Array(db) if catalog.databaseExists(db) => - if (catalog.listTables(db).nonEmpty) { - throw QueryExecutionErrors.namespaceNotEmptyError(namespace) - } - catalog.dropDatabase(db, ignoreIfNotExists = false, cascade = false) + catalog.dropDatabase(db, ignoreIfNotExists = false, cascade) true case Array(_) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala index 566706486d3f..1658f0dce7fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala @@ -278,7 +278,9 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging } } - override def dropNamespace(namespace: Array[String]): Boolean = namespace match { + override def dropNamespace( + namespace: Array[String], + cascade: Boolean): Boolean = namespace match { case Array(db) if namespaceExists(namespace) => if (listTables(Array(db)).nonEmpty) { throw QueryExecutionErrors.namespaceNotEmptyError(namespace) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala index 86f4dc467638..6fdb0de9f31c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala @@ -67,10 +67,10 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { override protected def afterAll(): Unit = { val catalog = newCatalog() - catalog.dropNamespace(Array("db")) - catalog.dropNamespace(Array("db2")) - catalog.dropNamespace(Array("ns")) - catalog.dropNamespace(Array("ns2")) + catalog.dropNamespace(Array("db"), cascade = true) + catalog.dropNamespace(Array("db2"), cascade = true) + catalog.dropNamespace(Array("ns"), cascade = true) + catalog.dropNamespace(Array("ns2"), cascade = true) super.afterAll() } @@ -811,7 +811,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { assert(catalog.listNamespaces(Array()) === Array(testNs, defaultNs)) assert(catalog.listNamespaces(testNs) === Array()) - catalog.dropNamespace(testNs) + catalog.dropNamespace(testNs, cascade = false) } test("listNamespaces: fail if missing namespace") { @@ -849,7 +849,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { assert(catalog.namespaceExists(testNs) === true) checkMetadata(metadata.asScala, Map("property" -> "value")) - catalog.dropNamespace(testNs) + catalog.dropNamespace(testNs, cascade = false) } test("loadNamespaceMetadata: empty metadata") { @@ -864,7 +864,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { assert(catalog.namespaceExists(testNs) === true) checkMetadata(metadata.asScala, emptyProps.asScala) - catalog.dropNamespace(testNs) + catalog.dropNamespace(testNs, cascade = false) } test("createNamespace: basic behavior") { @@ -884,7 +884,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { checkMetadata(metadata, Map("property" -> "value")) assert(expectedPath === metadata("location")) - catalog.dropNamespace(testNs) + catalog.dropNamespace(testNs, cascade = false) } test("createNamespace: initialize location") { @@ -900,7 +900,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { checkMetadata(metadata, Map.empty) assert(expectedPath === metadata("location")) - catalog.dropNamespace(testNs) + catalog.dropNamespace(testNs, cascade = false) } test("createNamespace: relative location") { @@ -917,7 +917,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { checkMetadata(metadata, Map.empty) assert(expectedPath === metadata("location")) - catalog.dropNamespace(testNs) + catalog.dropNamespace(testNs, cascade = false) } test("createNamespace: fail if namespace already exists") { @@ -933,7 +933,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { assert(catalog.namespaceExists(testNs) === true) checkMetadata(catalog.loadNamespaceMetadata(testNs).asScala, Map("property" -> "value")) - catalog.dropNamespace(testNs) + catalog.dropNamespace(testNs, cascade = false) } test("createNamespace: fail nested namespace") { @@ -948,7 +948,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { assert(exc.getMessage.contains("Invalid namespace name: db.nested")) - catalog.dropNamespace(Array("db")) + catalog.dropNamespace(Array("db"), cascade = false) } test("createTable: fail if namespace does not exist") { @@ -969,7 +969,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { assert(catalog.namespaceExists(testNs) === false) - val ret = catalog.dropNamespace(testNs) + val ret = catalog.dropNamespace(testNs, cascade = false) assert(ret === false) } @@ -981,7 +981,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { assert(catalog.namespaceExists(testNs) === true) - val ret = catalog.dropNamespace(testNs) + val ret = catalog.dropNamespace(testNs, cascade = false) assert(ret === true) assert(catalog.namespaceExists(testNs) === false) @@ -994,7 +994,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { catalog.createTable(testIdent, schema, Array.empty, emptyProps) val exc = intercept[IllegalStateException] { - catalog.dropNamespace(testNs) + catalog.dropNamespace(testNs, cascade = false) } assert(exc.getMessage.contains(testNs.quoted)) @@ -1002,7 +1002,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { checkMetadata(catalog.loadNamespaceMetadata(testNs).asScala, Map("property" -> "value")) catalog.dropTable(testIdent) - catalog.dropNamespace(testNs) + catalog.dropNamespace(testNs, cascade = false) } test("alterNamespace: basic behavior") { @@ -1027,7 +1027,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { catalog.loadNamespaceMetadata(testNs).asScala, Map("property" -> "value")) - catalog.dropNamespace(testNs) + catalog.dropNamespace(testNs, cascade = false) } test("alterNamespace: update namespace location") { @@ -1050,7 +1050,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { catalog.alterNamespace(testNs, NamespaceChange.setProperty("location", "relativeP")) assert(newRelativePath === spark.catalog.getDatabase(testNs(0)).locationUri) - catalog.dropNamespace(testNs) + catalog.dropNamespace(testNs, cascade = false) } test("alterNamespace: update namespace comment") { @@ -1065,7 +1065,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { assert(newComment === spark.catalog.getDatabase(testNs(0)).description) - catalog.dropNamespace(testNs) + catalog.dropNamespace(testNs, cascade = false) } test("alterNamespace: fail if namespace doesn't exist") { @@ -1092,6 +1092,6 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { assert(exc.getMessage.contains(s"Cannot remove reserved property: $p")) } - catalog.dropNamespace(testNs) + catalog.dropNamespace(testNs, cascade = false) } } From ff685edf6520f2b9d743c3274ec02af027c1f791 Mon Sep 17 00:00:00 2001 From: dch nguyen Date: Tue, 18 Jan 2022 15:38:57 +0700 Subject: [PATCH 2/7] fix error in V2SessionCatalogSuite test --- .../sql/execution/datasources/v2/V2SessionCatalogSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala index 6fdb0de9f31c..646eccb4cdd7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala @@ -993,7 +993,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { catalog.createNamespace(testNs, Map("property" -> "value").asJava) catalog.createTable(testIdent, schema, Array.empty, emptyProps) - val exc = intercept[IllegalStateException] { + val exc = intercept[AnalysisException] { catalog.dropNamespace(testNs, cascade = false) } From e72d0c74ef607410049800770b1ab6994b0cb4e0 Mon Sep 17 00:00:00 2001 From: dch nguyen Date: Wed, 19 Jan 2022 08:50:40 +0700 Subject: [PATCH 3/7] handle with cascade in InMemoryTableCatalog --- .../spark/sql/connector/catalog/CatalogSuite.scala | 2 +- .../sql/connector/catalog/InMemoryTableCatalog.scala | 11 +++++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala index 312943f78a6b..d00bc31e07f1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala @@ -845,7 +845,7 @@ class CatalogSuite extends SparkFunSuite { catalog.createNamespace(testNs, Map("property" -> "value").asJava) catalog.createTable(testIdent, schema, Array.empty, emptyProps) - assert(catalog.dropNamespace(testNs, cascade = false)) + assert(catalog.dropNamespace(testNs, cascade = true)) assert(!catalog.namespaceExists(testNs)) intercept[NoSuchNamespaceException](catalog.listTables(testNs)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala index 9ba32133571f..d60562ff3c0b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala @@ -25,6 +25,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.connector.distributions.{Distribution, Distributions} import org.apache.spark.sql.connector.expressions.{SortOrder, Transform} +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -216,7 +217,13 @@ class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamesp override def dropNamespace(namespace: Array[String], cascade: Boolean): Boolean = { listNamespaces(namespace).foreach(namespace => dropNamespace(namespace, cascade)) try { - listTables(namespace).foreach(dropTable) + if (!cascade) { + if (listTables(namespace).nonEmpty || listNamespaces(namespace).nonEmpty) { + throw QueryCompilationErrors.cannotDropNonemptyNamespaceError(namespace) + } + } else { + listTables(namespace).foreach(dropTable) + } } catch { case _: NoSuchNamespaceException => } @@ -224,7 +231,7 @@ class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamesp } override def listTables(namespace: Array[String]): Array[Identifier] = { - if (namespace.isEmpty || namespaceExists(namespace)) { + if (namespace.isEmpty || namespaceExists(namespace)) { super.listTables(namespace) } else { throw new NoSuchNamespaceException(namespace) From 56addfbc9b315e62b7a15a381e7567f401dd21f0 Mon Sep 17 00:00:00 2001 From: dch nguyen Date: Wed, 19 Jan 2022 15:42:46 +0700 Subject: [PATCH 4/7] reformat code --- .../spark/sql/connector/catalog/InMemoryTableCatalog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala index d60562ff3c0b..d4a81641c622 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala @@ -231,7 +231,7 @@ class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamesp } override def listTables(namespace: Array[String]): Array[Identifier] = { - if (namespace.isEmpty || namespaceExists(namespace)) { + if (namespace.isEmpty || namespaceExists(namespace)) { super.listTables(namespace) } else { throw new NoSuchNamespaceException(namespace) From f0a7f31c0cd131a5f87eba74d6bbcfd1bbc9bb54 Mon Sep 17 00:00:00 2001 From: dch nguyen Date: Thu, 20 Jan 2022 14:46:59 +0700 Subject: [PATCH 5/7] create new exception class NonEmptyNamespace and update caller, tests --- .../catalog/DelegatingCatalogExtension.java | 2 +- .../connector/catalog/SupportsNamespaces.java | 8 ++++- .../catalyst/analysis/NonEmptyException.scala | 36 +++++++++++++++++++ .../catalog/InMemoryTableCatalog.scala | 5 ++- .../datasources/v2/DropNamespaceExec.scala | 8 +++-- 5 files changed, 52 insertions(+), 7 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java index 98045f3a7e53..865ac553199a 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java @@ -157,7 +157,7 @@ public void alterNamespace( @Override public boolean dropNamespace( String[] namespace, - boolean cascade) throws NoSuchNamespaceException { + boolean cascade) throws NoSuchNamespaceException, NonEmptyNamespaceException { return asNamespaceCatalog().dropNamespace(namespace, cascade); } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java index e0a69984fec7..5b2f06abdac1 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java @@ -20,6 +20,7 @@ import org.apache.spark.annotation.Evolving; import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException; import java.util.Map; @@ -143,9 +144,14 @@ void alterNamespace( * {@link UnsupportedOperationException}. * * @param namespace a multi-part namespace + * @param cascade a boolean flag that deletes all namespaces and tables under the namespace + * if it is set true * @return true if the namespace was dropped * @throws NoSuchNamespaceException If the namespace does not exist (optional) + * @throws NonEmptyNamespaceException If the namespace is non-empty * @throws UnsupportedOperationException If drop is not a supported operation */ - boolean dropNamespace(String[] namespace, boolean cascade) throws NoSuchNamespaceException; + boolean dropNamespace( + String[] namespace, + boolean cascade) throws NoSuchNamespaceException, NonEmptyNamespaceException; } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala new file mode 100644 index 000000000000..f3ff28f74fcc --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + +/** + * Thrown by a catalog when an item already exists. The analyzer will rethrow the exception + * as an [[org.apache.spark.sql.AnalysisException]] with the correct position information. + */ +case class NonEmptyNamespaceException( + override val message: String, + override val cause: Option[Throwable] = None) + extends AnalysisException(message, cause = cause) { + + def this(namespace: Array[String]) = { + this(s"Namespace '${namespace.quoted}' is non empty.") + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala index d4a81641c622..a9a0dd02f4f3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala @@ -22,10 +22,9 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ -import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} +import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NonEmptyNamespaceException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.connector.distributions.{Distribution, Distributions} import org.apache.spark.sql.connector.expressions.{SortOrder, Transform} -import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -219,7 +218,7 @@ class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamesp try { if (!cascade) { if (listTables(namespace).nonEmpty || listNamespaces(namespace).nonEmpty) { - throw QueryCompilationErrors.cannotDropNonemptyNamespaceError(namespace) + throw new NonEmptyNamespaceException(namespace) } } else { listTables(namespace).foreach(dropTable) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropNamespaceExec.scala index a0b4ad94bc19..5d302055e7d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropNamespaceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropNamespaceExec.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.connector.catalog.CatalogPlugin import org.apache.spark.sql.errors.QueryCompilationErrors @@ -37,8 +38,11 @@ case class DropNamespaceExec( val nsCatalog = catalog.asNamespaceCatalog val ns = namespace.toArray if (nsCatalog.namespaceExists(ns)) { - if (!nsCatalog.dropNamespace(ns, cascade)) { - throw QueryCompilationErrors.cannotDropNonemptyNamespaceError(namespace) + try { + nsCatalog.dropNamespace(ns, cascade) + } catch { + case _: NonEmptyNamespaceException => + throw QueryCompilationErrors.cannotDropNonemptyNamespaceError(namespace) } } else if (!ifExists) { throw QueryCompilationErrors.noSuchNamespaceError(ns) From fe3a3deefce481f63821fd0946a2df8be8f108fe Mon Sep 17 00:00:00 2001 From: dch nguyen Date: Fri, 21 Jan 2022 10:14:43 +0700 Subject: [PATCH 6/7] update doc and function --- .../spark/sql/connector/catalog/SupportsNamespaces.java | 5 ++--- .../spark/sql/connector/catalog/InMemoryTableCatalog.scala | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java index 5b2f06abdac1..e2faeabef28e 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java @@ -144,11 +144,10 @@ void alterNamespace( * {@link UnsupportedOperationException}. * * @param namespace a multi-part namespace - * @param cascade a boolean flag that deletes all namespaces and tables under the namespace - * if it is set true + * @param cascade a boolean flag that deletes all object under the namespace if it is set true * @return true if the namespace was dropped * @throws NoSuchNamespaceException If the namespace does not exist (optional) - * @throws NonEmptyNamespaceException If the namespace is non-empty + * @throws NonEmptyNamespaceException If the namespace is non-empty and cascade is false * @throws UnsupportedOperationException If drop is not a supported operation */ boolean dropNamespace( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala index a9a0dd02f4f3..428aec703674 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala @@ -214,13 +214,13 @@ class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamesp } override def dropNamespace(namespace: Array[String], cascade: Boolean): Boolean = { - listNamespaces(namespace).foreach(namespace => dropNamespace(namespace, cascade)) try { if (!cascade) { if (listTables(namespace).nonEmpty || listNamespaces(namespace).nonEmpty) { throw new NonEmptyNamespaceException(namespace) } } else { + listNamespaces(namespace).foreach(namespace => dropNamespace(namespace, cascade)) listTables(namespace).foreach(dropTable) } } catch { From 005d0caf411757133260c931576f6dcf578bef9b Mon Sep 17 00:00:00 2001 From: dch nguyen Date: Fri, 21 Jan 2022 10:16:26 +0700 Subject: [PATCH 7/7] update doc --- .../apache/spark/sql/connector/catalog/SupportsNamespaces.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java index e2faeabef28e..c1a4960068d2 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java @@ -144,7 +144,7 @@ void alterNamespace( * {@link UnsupportedOperationException}. * * @param namespace a multi-part namespace - * @param cascade a boolean flag that deletes all object under the namespace if it is set true + * @param cascade When true, deletes all objects under the namespace * @return true if the namespace was dropped * @throws NoSuchNamespaceException If the namespace does not exist (optional) * @throws NonEmptyNamespaceException If the namespace is non-empty and cascade is false