Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ statement
SET DBPROPERTIES tablePropertyList #setDatabaseProperties
| ALTER database db=errorCapturingIdentifier
SET locationSpec #setDatabaseLocation
| DROP database (IF EXISTS)? db=errorCapturingIdentifier
(RESTRICT | CASCADE)? #dropDatabase
| DROP (database | NAMESPACE) (IF EXISTS)? multipartIdentifier
(RESTRICT | CASCADE)? #dropNamespace
| SHOW (DATABASES | NAMESPACES) ((FROM | IN) multipartIdentifier)?
(LIKE? pattern=STRING)? #showNamespaces
| createTableHeader ('(' colTypeList ')')? tableProvider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
c.ifNotExists,
c.properties)

case DropNamespaceStatement(NonSessionCatalog(catalog, nameParts), ifExists, cascade) =>
DropNamespace(catalog.asNamespaceCatalog, nameParts, ifExists, cascade)

case ShowNamespacesStatement(Some(CatalogAndNamespace(catalog, namespace)), pattern) =>
ShowNamespaces(catalog.asNamespaceCatalog, namespace, pattern)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2375,6 +2375,21 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
properties)
}

/**
 * Build a [[DropNamespaceStatement]] from a parsed DROP NAMESPACE command.
 *
 * Accepted syntax:
 * {{{
 *   DROP (DATABASE|SCHEMA|NAMESPACE) [IF EXISTS] ns1.ns2 [RESTRICT|CASCADE];
 * }}}
 *
 * The `ifExists` flag is set when the EXISTS token is present and the `cascade` flag when the
 * CASCADE token is present; RESTRICT and no qualifier at all both leave `cascade` false.
 */
override def visitDropNamespace(ctx: DropNamespaceContext): LogicalPlan = withOrigin(ctx) {
  val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier)
  val ifExists = ctx.EXISTS != null
  val cascade = ctx.CASCADE != null
  DropNamespaceStatement(nameParts, ifExists, cascade)
}

/**
* Create a [[ShowNamespacesStatement]] command.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,14 @@ object CreateNamespaceStatement {
val LOCATION_PROPERTY_KEY: String = "location"
}

/**
* A DROP NAMESPACE statement, as parsed from SQL.
*
* @param namespace the parts of the multi-part namespace identifier, e.g. `a.b.c` is parsed
*                  into `Seq("a", "b", "c")`
* @param ifExists true when `IF EXISTS` was given in the statement
* @param cascade true when `CASCADE` was given; false for `RESTRICT` or when neither is given
*/
case class DropNamespaceStatement(
namespace: Seq[String],
ifExists: Boolean,
cascade: Boolean) extends ParsedStatement

/**
* A SHOW NAMESPACES statement, as parsed from SQL.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,15 @@ case class CreateNamespace(
ifNotExists: Boolean,
properties: Map[String, String]) extends Command

/**
* The logical plan of the DROP NAMESPACE command that works for v2 catalogs.
*
* @param catalog the namespace-capable catalog the namespace is dropped from
* @param namespace the name parts of the namespace to drop
* @param ifExists the `IF EXISTS` flag carried over from the parsed statement
* @param cascade the `CASCADE` flag carried over from the parsed statement
*/
case class DropNamespace(
catalog: SupportsNamespaces,
namespace: Seq[String],
ifExists: Boolean,
cascade: Boolean) extends Command

/**
* The logical plan of the SHOW NAMESPACES command that works for v2 catalogs.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,28 @@ class DDLParserSuite extends AnalysisTest {
"location" -> "/home/user/db")))
}

test("drop namespace") {
  // Every statement targets the same three-part namespace; only the flags vary.
  val nameParts = Seq("a", "b", "c")

  // (SQL text, expected ifExists, expected cascade)
  val cases = Seq(
    ("DROP NAMESPACE a.b.c", false, false),
    ("DROP NAMESPACE IF EXISTS a.b.c", true, false),
    ("DROP NAMESPACE IF EXISTS a.b.c RESTRICT", true, false),
    ("DROP NAMESPACE IF EXISTS a.b.c CASCADE", true, true),
    ("DROP NAMESPACE a.b.c CASCADE", false, true))

  cases.foreach { case (sqlText, expectIfExists, expectCascade) =>
    comparePlans(
      parsePlan(sqlText),
      DropNamespaceStatement(nameParts, ifExists = expectIfExists, cascade = expectCascade))
  }
}

test("show databases: basic") {
comparePlans(
parsePlan("SHOW DATABASES"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, TableChange, V1Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableRecoverPartitionsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, CacheTableCommand, CreateDatabaseCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowCreateTableCommand, ShowPartitionsCommand, ShowTablesCommand, TruncateTableCommand, UncacheTableCommand}
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableRecoverPartitionsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, CacheTableCommand, CreateDatabaseCommand, DescribeColumnCommand, DescribeTableCommand, DropDatabaseCommand, DropTableCommand, ShowCreateTableCommand, ShowPartitionsCommand, ShowTablesCommand, TruncateTableCommand, UncacheTableCommand}
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, RefreshTable}
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -271,6 +271,13 @@ class ResolveSessionCatalog(
CreateNamespaceStatement.LOCATION_PROPERTY_KEY
CreateDatabaseCommand(nameParts.head, c.ifNotExists, location, comment, newProperties)

case d @ DropNamespaceStatement(SessionCatalog(_, nameParts), _, _) =>
if (nameParts.length != 1) {
throw new AnalysisException(
s"The database name is not valid: ${nameParts.quoted}")
}
DropDatabaseCommand(nameParts.head, d.ifExists, d.cascade)

case ShowTablesStatement(Some(SessionCatalog(catalog, nameParts)), pattern) =>
if (nameParts.length != 1) {
throw new AnalysisException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,18 +323,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
)
}

/**
 * Build a [[DropDatabaseCommand]] from a parsed DROP DATABASE command.
 *
 * Accepted syntax:
 * {{{
 *   DROP (DATABASE|SCHEMA) [IF EXISTS] database [RESTRICT|CASCADE];
 * }}}
 */
override def visitDropDatabase(ctx: DropDatabaseContext): LogicalPlan = withOrigin(ctx) {
  val databaseName = ctx.db.getText
  val ifExists = ctx.EXISTS != null
  val cascade = ctx.CASCADE != null
  DropDatabaseCommand(databaseName, ifExists, cascade)
}

/**
* Create a [[DescribeDatabaseCommand]] command.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.collection.mutable
import org.apache.spark.sql.{AnalysisException, Strategy}
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowNamespaces, ShowTables}
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowNamespaces, ShowTables}
import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability}
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
Expand Down Expand Up @@ -295,6 +295,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
case CreateNamespace(catalog, namespace, ifNotExists, properties) =>
CreateNamespaceExec(catalog, namespace, ifNotExists, properties) :: Nil

case DropNamespace(catalog, namespace, ifExists, cascade) =>
DropNamespaceExec(catalog, namespace, ifExists, cascade) :: Nil

case r: ShowNamespaces =>
ShowNamespacesExec(r.output, r.catalog, r.namespace, r.pattern) :: Nil

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.SupportsNamespaces

/**
* Physical plan node for dropping a namespace.
*/
case class DropNamespaceExec(
catalog: SupportsNamespaces,
namespace: Seq[String],
ifExists: Boolean,
cascade: Boolean)
extends V2CommandExec {
override protected def run(): Seq[InternalRow] = {
// TODO: How to handle when cascade is true?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdblue / @cloud-fan did we discuss how we want to drop namespaces if cascade is set for v2 catalogs?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the v2 API, namespaces that are not empty can be rejected by throwing IllegalStateException. I'd say catch that if cascade is set and throw an error that cascade isn't supported.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea this is a good workaround now. I'm wondering if we should support cascade before 3.0, as this may break DS v2 APIs, e.g. we may need to add a cascade parameter to the dropNamespace API. DROP TABLE has the same problem.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion. I updated with a workaround solution. And +1 for adding cascade option to dropNamespace API. I initially tried to implement cascading option with existing APIs, but it was a bit cumbersome.

val ns = namespace.toArray
if (catalog.namespaceExists(ns)) {
catalog.dropNamespace(ns)
} else if (!ifExists) {
throw new NoSuchNamespaceException(ns)
}

Seq.empty
}

override def output: Seq[Attribute] = Seq.empty
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.connector
import scala.collection.JavaConverters._

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -790,6 +790,39 @@ class DataSourceV2SQLSuite
sql("CREATE NAMESPACE IF NOT EXISTS testcat.ns1")
}

test("DropNamespace: basic tests") {
  // Creates `ns`, checks the listing, drops `ns`, then checks the listing again.
  def createThenDrop(
      ns: String,
      showSql: String,
      afterCreate: Seq[String],
      afterDrop: Seq[String]): Unit = {
    sql(s"CREATE NAMESPACE $ns")
    testShowNamespaces(showSql, afterCreate)
    sql(s"DROP NAMESPACE $ns")
    testShowNamespaces(showSql, afterDrop)
  }

  // Session catalog is used.
  createThenDrop("ns", "SHOW NAMESPACES", Seq("default", "ns"), Seq("default"))

  // V2 non-session catalog is used.
  createThenDrop("testcat.ns1", "SHOW NAMESPACES IN testcat", Seq("ns1"), Seq())
}

test("DropNamespace: non-empty namespace") {
  // Creating a table under ns1 makes the namespace non-empty.
  sql("CREATE TABLE testcat.ns1.table (id bigint) USING foo")
  testShowNamespaces("SHOW NAMESPACES IN testcat", Seq("ns1"))

  // Dropping the non-empty namespace is expected to fail with IllegalStateException.
  val message = intercept[IllegalStateException](sql("DROP NAMESPACE testcat.ns1")).getMessage
  assert(message.contains("Cannot delete non-empty namespace: ns1"))
}

test("DropNamespace: test handling of 'IF EXISTS'") {
  // With IF EXISTS, dropping a namespace that does not exist must not throw.
  sql("DROP NAMESPACE IF EXISTS testcat.unknown")

  // Without IF EXISTS, the missing namespace is reported as an error.
  val message =
    intercept[NoSuchNamespaceException](sql("DROP NAMESPACE testcat.ns1")).getMessage
  assert(message.contains("Namespace 'ns1' not found"))
}

test("ShowNamespaces: show root namespaces with default v2 catalog") {
spark.conf.set("spark.sql.default.catalog", "testcat")

Expand Down