From cd3e0c1d6e0c9a202de15e4ac839683e64ae1068 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Fri, 25 Oct 2019 14:26:33 -0700 Subject: [PATCH 1/3] initial checkin --- .../spark/sql/catalyst/parser/SqlBase.g4 | 4 +- .../catalyst/analysis/ResolveCatalogs.scala | 3 ++ .../sql/catalyst/parser/AstBuilder.scala | 15 ++++++ .../catalyst/plans/logical/statements.scala | 8 ++++ .../catalyst/plans/logical/v2Commands.scala | 9 ++++ .../sql/catalyst/parser/DDLParserSuite.scala | 22 +++++++++ .../analysis/ResolveSessionCatalog.scala | 9 +++- .../spark/sql/execution/SparkSqlParser.scala | 12 ----- .../datasources/v2/DataSourceV2Strategy.scala | 5 +- .../datasources/v2/DropNamespaceExec.scala | 47 +++++++++++++++++++ .../sql/connector/DataSourceV2SQLSuite.scala | 35 +++++++++++++- 11 files changed, 152 insertions(+), 17 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropNamespaceExec.scala diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index c97eb3c935be..457a49686a64 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -95,8 +95,8 @@ statement SET DBPROPERTIES tablePropertyList #setDatabaseProperties | ALTER database db=errorCapturingIdentifier SET locationSpec #setDatabaseLocation - | DROP database (IF EXISTS)? db=errorCapturingIdentifier - (RESTRICT | CASCADE)? #dropDatabase + | DROP (database | NAMESPACE) (IF EXISTS)? multipartIdentifier + (RESTRICT | CASCADE)? #dropNamespace | SHOW (DATABASES | NAMESPACES) ((FROM | IN) multipartIdentifier)? (LIKE? pattern=STRING)? #showNamespaces | createTableHeader ('(' colTypeList ')')? 
tableProvider diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index 9803fda0678f..7bf0e2515880 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -178,6 +178,9 @@ class ResolveCatalogs(val catalogManager: CatalogManager) c.ifNotExists, c.properties) + case DropNamespaceStatement(NonSessionCatalog(catalog, nameParts), ifExists, cascade) => + DropNamespace(catalog.asNamespaceCatalog, nameParts, ifExists, cascade) + case ShowNamespacesStatement(Some(CatalogAndNamespace(catalog, namespace)), pattern) => ShowNamespaces(catalog.asNamespaceCatalog, namespace, pattern) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 4fa479f083e1..f213b2ac54a0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -2375,6 +2375,21 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging properties) } + /** + * Create a [[DropNamespaceStatement]] command. + * + * For example: + * {{{ + * DROP (DATABASE|SCHEMA|NAMESPACE) [IF EXISTS] ns1.ns2 [RESTRICT|CASCADE]; + * }}} + */ + override def visitDropNamespace(ctx: DropNamespaceContext): LogicalPlan = withOrigin(ctx) { + DropNamespaceStatement( + visitMultipartIdentifier(ctx.multipartIdentifier), + ctx.EXISTS != null, + ctx.CASCADE != null) + } + /** * Create a [[ShowNamespacesStatement]] command. 
*/ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index 655e87fce4e2..e6f0b0f92322 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -295,6 +295,14 @@ object CreateNamespaceStatement { val LOCATION_PROPERTY_KEY: String = "location" } +/** + * A DROP NAMESPACE statement, as parsed from SQL. + */ +case class DropNamespaceStatement( + namespace: Seq[String], + ifExists: Boolean, + cascade: Boolean) extends ParsedStatement + /** * A SHOW NAMESPACES statement, as parsed from SQL. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index d80c1c034a86..f587ee2928fc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -246,6 +246,15 @@ case class CreateNamespace( ifNotExists: Boolean, properties: Map[String, String]) extends Command +/** + * The logical plan of the DROP NAMESPACE command that works for v2 catalogs. + */ +case class DropNamespace( + catalog: SupportsNamespaces, + namespace: Seq[String], + ifExists: Boolean, + cascade: Boolean) extends Command + /** * The logical plan of the SHOW NAMESPACES command that works for v2 catalogs. 
*/ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index da01c612b350..4e0686828786 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -929,6 +929,28 @@ class DDLParserSuite extends AnalysisTest { "location" -> "/home/user/db"))) } + test("drop namespace") { + comparePlans( + parsePlan("DROP NAMESPACE a.b.c"), + DropNamespaceStatement(Seq("a", "b", "c"), ifExists = false, cascade = false)) + + comparePlans( + parsePlan("DROP NAMESPACE IF EXISTS a.b.c"), + DropNamespaceStatement(Seq("a", "b", "c"), ifExists = true, cascade = false)) + + comparePlans( + parsePlan("DROP NAMESPACE IF EXISTS a.b.c RESTRICT"), + DropNamespaceStatement(Seq("a", "b", "c"), ifExists = true, cascade = false)) + + comparePlans( + parsePlan("DROP NAMESPACE IF EXISTS a.b.c CASCADE"), + DropNamespaceStatement(Seq("a", "b", "c"), ifExists = true, cascade = true)) + + comparePlans( + parsePlan("DROP NAMESPACE a.b.c CASCADE"), + DropNamespaceStatement(Seq("a", "b", "c"), ifExists = false, cascade = true)) + } + test("show databases: basic") { comparePlans( parsePlan("SHOW DATABASES"), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index e7e34b1ef312..98bf7f32c4f4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, 
TableChange, V1Table} import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableRecoverPartitionsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, CacheTableCommand, CreateDatabaseCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowCreateTableCommand, ShowPartitionsCommand, ShowTablesCommand, TruncateTableCommand, UncacheTableCommand} +import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableRecoverPartitionsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, CacheTableCommand, CreateDatabaseCommand, DescribeColumnCommand, DescribeTableCommand, DropDatabaseCommand, DropTableCommand, ShowCreateTableCommand, ShowPartitionsCommand, ShowTablesCommand, TruncateTableCommand, UncacheTableCommand} import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, RefreshTable} import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 import org.apache.spark.sql.internal.SQLConf @@ -271,6 +271,13 @@ class ResolveSessionCatalog( CreateNamespaceStatement.LOCATION_PROPERTY_KEY CreateDatabaseCommand(nameParts.head, c.ifNotExists, location, comment, newProperties) + case d @ DropNamespaceStatement(SessionCatalog(_, nameParts), _, _) => + if (nameParts.length != 1) { + throw new AnalysisException( + s"The database name is not valid: ${nameParts.quoted}") + } + DropDatabaseCommand(nameParts.head, d.ifExists, d.cascade) + case ShowTablesStatement(Some(SessionCatalog(catalog, nameParts)), pattern) => if (nameParts.length != 1) { throw new AnalysisException( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 20894b39ce5d..9274bcb8fa1e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -323,18 +323,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { ) } - /** - * Create a [[DropDatabaseCommand]] command. - * - * For example: - * {{{ - * DROP (DATABASE|SCHEMA) [IF EXISTS] database [RESTRICT|CASCADE]; - * }}} - */ - override def visitDropDatabase(ctx: DropDatabaseContext): LogicalPlan = withOrigin(ctx) { - DropDatabaseCommand(ctx.db.getText, ctx.EXISTS != null, ctx.CASCADE != null) - } - /** * Create a [[DescribeDatabaseCommand]] command. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 4a7cb7db45de..3041e9e82d59 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -23,7 +23,7 @@ import scala.collection.mutable import org.apache.spark.sql.{AnalysisException, Strategy} import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowNamespaces, ShowTables} +import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, 
CreateV2Table, DeleteFromTable, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowNamespaces, ShowTables} import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability} import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns} import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream} @@ -295,6 +295,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { case CreateNamespace(catalog, namespace, ifNotExists, properties) => CreateNamespaceExec(catalog, namespace, ifNotExists, properties) :: Nil + case DropNamespace(catalog, namespace, ifExists, cascade) => + DropNamespaceExec(catalog, namespace, ifExists, cascade) :: Nil + case r: ShowNamespaces => ShowNamespacesExec(r.output, r.catalog, r.namespace, r.pattern) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropNamespaceExec.scala new file mode 100644 index 000000000000..e408b0fd32ca --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropNamespaceExec.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.SupportsNamespaces + +/** + * Physical plan node for creating a namespace. + */ +case class DropNamespaceExec( + catalog: SupportsNamespaces, + namespace: Seq[String], + ifExists: Boolean, + cascade: Boolean) + extends V2CommandExec { + override protected def run(): Seq[InternalRow] = { + // TODO: How to handle when cascade is true? 
+ val ns = namespace.toArray + if (catalog.namespaceExists(ns)) { + catalog.dropNamespace(ns) + } else if (!ifExists) { + throw new NoSuchNamespaceException(ns) + } + + Seq.empty + } + + override def output: Seq[Attribute] = Seq.empty +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index b8a8acbba57c..1cffb2b561a4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.connector import scala.collection.JavaConverters._ import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException} +import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME import org.apache.spark.sql.internal.SQLConf @@ -790,6 +790,39 @@ class DataSourceV2SQLSuite sql("CREATE NAMESPACE IF NOT EXISTS testcat.ns1") } + test("DropNamespace: basic tests") { + // Session catalog is used. + sql("CREATE NAMESPACE ns") + testShowNamespaces("SHOW NAMESPACES", Seq("default", "ns")) + sql("DROP NAMESPACE ns") + testShowNamespaces("SHOW NAMESPACES", Seq("default")) + + // V2 non-session catalog is used. 
+ sql("CREATE NAMESPACE testcat.ns1") + testShowNamespaces("SHOW NAMESPACES IN testcat", Seq("ns1")) + sql("DROP NAMESPACE testcat.ns1") + testShowNamespaces("SHOW NAMESPACES IN testcat", Seq()) + } + + test("DropNamespace: non-empty namespace") { + sql("CREATE TABLE testcat.ns1.table (id bigint) USING foo") + testShowNamespaces("SHOW NAMESPACES IN testcat", Seq("ns1")) + + val exception = intercept[IllegalStateException] { + sql("DROP NAMESPACE testcat.ns1") + } + assert(exception.getMessage.contains("Cannot delete non-empty namespace: ns1")) + } + + test("DropNamespace: test handling of 'IF EXISTS'") { + sql("DROP NAMESPACE IF EXISTS testcat.unknown") + + val exception = intercept[NoSuchNamespaceException] { + sql("DROP NAMESPACE testcat.ns1") + } + assert(exception.getMessage.contains("Namespace 'ns1' not found")) + } + test("ShowNamespaces: show root namespaces with default v2 catalog") { spark.conf.set("spark.sql.default.catalog", "testcat") From 588b2cea6b20c99b3940c74642c6bac024a2fadd Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Fri, 25 Oct 2019 14:32:57 -0700 Subject: [PATCH 2/3] Remove irrelevant tests --- .../execution/command/DDLParserSuite.scala | 45 ------------------- 1 file changed, 45 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index a9b94bea9517..fca835927d98 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -74,51 +74,6 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession { }.head } - test("drop database") { - val sql1 = "DROP DATABASE IF EXISTS database_name RESTRICT" - val sql2 = "DROP DATABASE IF EXISTS database_name CASCADE" - val sql3 = "DROP SCHEMA IF EXISTS database_name RESTRICT" - val sql4 = "DROP SCHEMA IF EXISTS database_name 
CASCADE" - // The default is restrict=true - val sql5 = "DROP DATABASE IF EXISTS database_name" - // The default is ifExists=false - val sql6 = "DROP DATABASE database_name" - val sql7 = "DROP DATABASE database_name CASCADE" - - val parsed1 = parser.parsePlan(sql1) - val parsed2 = parser.parsePlan(sql2) - val parsed3 = parser.parsePlan(sql3) - val parsed4 = parser.parsePlan(sql4) - val parsed5 = parser.parsePlan(sql5) - val parsed6 = parser.parsePlan(sql6) - val parsed7 = parser.parsePlan(sql7) - - val expected1 = DropDatabaseCommand( - "database_name", - ifExists = true, - cascade = false) - val expected2 = DropDatabaseCommand( - "database_name", - ifExists = true, - cascade = true) - val expected3 = DropDatabaseCommand( - "database_name", - ifExists = false, - cascade = false) - val expected4 = DropDatabaseCommand( - "database_name", - ifExists = false, - cascade = true) - - comparePlans(parsed1, expected1) - comparePlans(parsed2, expected2) - comparePlans(parsed3, expected1) - comparePlans(parsed4, expected2) - comparePlans(parsed5, expected1) - comparePlans(parsed6, expected3) - comparePlans(parsed7, expected4) - } - test("alter database set dbproperties") { // ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...) 
val sql1 = "ALTER DATABASE database_name SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')" From 714e0c523bbdcb2d2f986ad48d2a90c4940a0555 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Mon, 28 Oct 2019 08:38:06 -0700 Subject: [PATCH 3/3] address PR comments --- .../sql/catalyst/plans/logical/statements.scala | 2 +- .../datasources/v2/DropNamespaceExec.scala | 14 ++++++++++---- .../spark/sql/connector/DataSourceV2SQLSuite.scala | 13 ++++++++++--- 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index e6f0b0f92322..a34ae3cad065 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -297,7 +297,7 @@ object CreateNamespaceStatement { /** * A DROP NAMESPACE statement, as parsed from SQL. - */ + */ case class DropNamespaceStatement( namespace: Seq[String], ifExists: Boolean, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropNamespaceExec.scala index e408b0fd32ca..cd63ca6628ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropNamespaceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropNamespaceExec.scala @@ -17,25 +17,31 @@ package org.apache.spark.sql.execution.datasources.v2 +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.connector.catalog.SupportsNamespaces /** - * Physical plan node for creating a namespace. 
+ * Physical plan node for dropping a namespace. */ case class DropNamespaceExec( catalog: SupportsNamespaces, namespace: Seq[String], ifExists: Boolean, cascade: Boolean) - extends V2CommandExec { + extends V2CommandExec { override protected def run(): Seq[InternalRow] = { - // TODO: How to handle when cascade is true? val ns = namespace.toArray if (catalog.namespaceExists(ns)) { - catalog.dropNamespace(ns) + try { + catalog.dropNamespace(ns) + } catch { + case e: IllegalStateException if cascade => + throw new SparkException( + "Cascade option for dropping namespace is not supported in V2 catalog", e) + } } else if (!ifExists) { throw new NoSuchNamespaceException(ns) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 1cffb2b561a4..6002fb905bf1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.connector import scala.collection.JavaConverters._ +import org.apache.spark.SparkException import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.connector.catalog._ @@ -804,14 +805,20 @@ class DataSourceV2SQLSuite testShowNamespaces("SHOW NAMESPACES IN testcat", Seq()) } - test("DropNamespace: non-empty namespace") { + test("DropNamespace: drop non-empty namespace") { sql("CREATE TABLE testcat.ns1.table (id bigint) USING foo") testShowNamespaces("SHOW NAMESPACES IN testcat", Seq("ns1")) - val exception = intercept[IllegalStateException] { + val e1 = intercept[IllegalStateException] { sql("DROP NAMESPACE testcat.ns1") } - 
assert(exception.getMessage.contains("Cannot delete non-empty namespace: ns1")) + assert(e1.getMessage.contains("Cannot delete non-empty namespace: ns1")) + + val e2 = intercept[SparkException] { + sql("DROP NAMESPACE testcat.ns1 CASCADE") + } + assert(e2.getMessage.contains( + "Cascade option for dropping namespace is not supported in V2 catalog")) } test("DropNamespace: test handling of 'IF EXISTS'") {