Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte
.exists(_.contains("catalog comment"))
assert(createCommentWarning === false)

catalog.dropNamespace(Array("foo"))
catalog.dropNamespace(Array("foo"), cascade = false)
assert(catalog.namespaceExists(Array("foo")) === false)
assert(catalog.listNamespaces() === builtinNamespaces)
val msg = intercept[AnalysisException] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,10 @@ public void alterNamespace(
}

@Override
public boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException {
return asNamespaceCatalog().dropNamespace(namespace);
public boolean dropNamespace(
String[] namespace,
boolean cascade) throws NoSuchNamespaceException, NonEmptyNamespaceException {
return asNamespaceCatalog().dropNamespace(namespace, cascade);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException;

import java.util.Map;

Expand Down Expand Up @@ -136,15 +137,20 @@ void alterNamespace(
NamespaceChange... changes) throws NoSuchNamespaceException;

/**
* Drop a namespace from the catalog, recursively dropping all objects within the namespace.
* Drop a namespace from the catalog with cascade mode, recursively dropping all objects
* within the namespace if cascade is true.
* <p>
* If the catalog implementation does not support this operation, it may throw
* {@link UnsupportedOperationException}.
*
* @param namespace a multi-part namespace
* @param cascade When true, deletes all objects under the namespace
* @return true if the namespace was dropped
* @throws NoSuchNamespaceException If the namespace does not exist (optional)
* @throws NonEmptyNamespaceException If the namespace is non-empty and cascade is false
* @throws UnsupportedOperationException If drop is not a supported operation
*/
boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException;
boolean dropNamespace(
String[] namespace,
boolean cascade) throws NoSuchNamespaceException, NonEmptyNamespaceException;
Copy link
Member

@HyukjinKwon HyukjinKwon Jan 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait... this API was added in 3.0.0 by SPARK-27661. Shouldn't we have kept an overloaded version to preserve binary compatibility?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean we should keep the old API dropNamespace(String[] namespace) and add a new API dropNamespace(String[] namespace, boolean cascade)? @HyukjinKwon

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, otherwise downstream data sources compiled against Spark 3.2 would not work with Spark 3.3 and would fail with an exception like "method does not exist". The MiMa plugin we use usually detects such binary incompatibilities, but it seems it didn't for some reason.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I create a PR to revert the change to the dropNamespace API and add a new one instead? WDYT? @cloud-fan @HyukjinKwon

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We changed the implementation of DropNamespaceExec to not call listTables at the beginning, and the catalog implementation must deal with the cascade parameter properly. It's better to break binary compatibility here, instead of silently calling the old dropNamespace implementation which may drop all the tables in the namespace.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we keep the binary compatibility but change the old behavior?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's possible, unless we add versioning to the DS v2 APIs, so that we know which version the catalog implementation is.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we implement a default method for dropNamespace(String[] namespace) that calls dropNamespace(namespace, cascade = false) or dropNamespace(namespace, cascade = true), while deprecating it and pointing to the alternative? It already worked with cascading before, and that's what we documented. This API was added in Spark 3.0.0, and I don't think we should break binary compatibility unless we must.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we think we shouldn't provide an implementation, we should keep the signature and throw an exception, so that we can show a nicer error message instead of "method not found".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we implement default method of dropNamespace(String[] namespace)

This doesn't help, because the new dropNamespace(String[] namespace, boolean cascade) is not implemented and will break all the existing implementations.

A common way to keep backward compatibility is to add default implementation for the new API. e.g.

boolean dropNamespace(String[] namespace, boolean cascade) {
  if (cascade) dropNamespace(namespace) else throw ...
}

However, DROP DATABASE is much more commonly used than DROP DATABASE ... CASCADE. So this doesn't help either.

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._


/**
* Thrown by a catalog when a namespace cannot be dropped because it is not empty, i.e. the
* namespace still contains objects and the drop was requested without cascade.
*
* This is an [[org.apache.spark.sql.AnalysisException]], so callers can catch it either
* specifically or as a generic analysis failure.
*
* @param message the error message describing which namespace is non-empty
* @param cause the underlying cause, if any
*/
case class NonEmptyNamespaceException(
override val message: String,
override val cause: Option[Throwable] = None)
extends AnalysisException(message, cause = cause) {

/**
* Builds the exception from a multi-part namespace, rendering it into the standard message.
*
* NOTE(review): `quoted` comes from `CatalogV2Implicits`; presumably it joins the parts into
* a single quoted identifier -- confirm against that helper.
*
* @param namespace the multi-part namespace that could not be dropped
*/
def this(namespace: Array[String]) = {
this(s"Namespace '${namespace.quoted}' is non empty.")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ class CatalogSuite extends SparkFunSuite {

assert(catalog.namespaceExists(testNs) === false)

val ret = catalog.dropNamespace(testNs)
val ret = catalog.dropNamespace(testNs, cascade = false)

assert(ret === false)
}
Expand All @@ -833,7 +833,7 @@ class CatalogSuite extends SparkFunSuite {
assert(catalog.namespaceExists(testNs) === true)
assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> "value"))

val ret = catalog.dropNamespace(testNs)
val ret = catalog.dropNamespace(testNs, cascade = false)

assert(ret === true)
assert(catalog.namespaceExists(testNs) === false)
Expand All @@ -845,7 +845,7 @@ class CatalogSuite extends SparkFunSuite {
catalog.createNamespace(testNs, Map("property" -> "value").asJava)
catalog.createTable(testIdent, schema, Array.empty, emptyProps)

assert(catalog.dropNamespace(testNs))
assert(catalog.dropNamespace(testNs, cascade = true))

assert(!catalog.namespaceExists(testNs))
intercept[NoSuchNamespaceException](catalog.listTables(testNs))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NonEmptyNamespaceException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
import org.apache.spark.sql.connector.expressions.{SortOrder, Transform}
import org.apache.spark.sql.types.StructType
Expand Down Expand Up @@ -213,10 +213,16 @@ class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamesp
namespaces.put(namespace.toList, CatalogV2Util.applyNamespaceChanges(metadata, changes))
}

override def dropNamespace(namespace: Array[String]): Boolean = {
listNamespaces(namespace).foreach(dropNamespace)
override def dropNamespace(namespace: Array[String], cascade: Boolean): Boolean = {
try {
listTables(namespace).foreach(dropTable)
if (!cascade) {
if (listTables(namespace).nonEmpty || listNamespaces(namespace).nonEmpty) {
throw new NonEmptyNamespaceException(namespace)
}
} else {
listNamespaces(namespace).foreach(namespace => dropNamespace(namespace, cascade))
listTables(namespace).foreach(dropTable)
}
} catch {
case _: NoSuchNamespaceException =>
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.CatalogPlugin
import org.apache.spark.sql.errors.QueryCompilationErrors
Expand All @@ -37,17 +38,11 @@ case class DropNamespaceExec(
val nsCatalog = catalog.asNamespaceCatalog
val ns = namespace.toArray
if (nsCatalog.namespaceExists(ns)) {
// The default behavior of `SupportsNamespace.dropNamespace()` is cascading,
// so make sure the namespace to drop is empty.
if (!cascade) {
if (catalog.asTableCatalog.listTables(ns).nonEmpty
|| nsCatalog.listNamespaces(ns).nonEmpty) {
try {
nsCatalog.dropNamespace(ns, cascade)
} catch {
case _: NonEmptyNamespaceException =>
throw QueryCompilationErrors.cannotDropNonemptyNamespaceError(namespace)
}
}

if (!nsCatalog.dropNamespace(ns)) {
throw QueryCompilationErrors.cannotDropNonemptyNamespaceError(namespace)
}
} else if (!ifExists) {
throw QueryCompilationErrors.noSuchNamespaceError(ns)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,12 +286,11 @@ class V2SessionCatalog(catalog: SessionCatalog)
}
}

override def dropNamespace(namespace: Array[String]): Boolean = namespace match {
override def dropNamespace(
namespace: Array[String],
cascade: Boolean): Boolean = namespace match {
case Array(db) if catalog.databaseExists(db) =>
if (catalog.listTables(db).nonEmpty) {
throw QueryExecutionErrors.namespaceNotEmptyError(namespace)
}
catalog.dropDatabase(db, ignoreIfNotExists = false, cascade = false)
catalog.dropDatabase(db, ignoreIfNotExists = false, cascade)
true

case Array(_) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,9 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
}
}

override def dropNamespace(namespace: Array[String]): Boolean = namespace match {
override def dropNamespace(
namespace: Array[String],
cascade: Boolean): Boolean = namespace match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@beliefer do you know how JDBC supports DROP DATABASE CASCADE?

Copy link
Contributor

@beliefer beliefer Jan 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DROP DATABASE fails if any table exists in the database.
DROP DATABASE CASCADE drops all the tables in the database and the database itself.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you create a followup PR to fix JDBC source? We need to respect the cascade parameter. thanks!

Copy link
Contributor

@beliefer beliefer Jan 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan Thank you for the ping. I will try to fix it.

case Array(db) if namespaceExists(namespace) =>
if (listTables(Array(db)).nonEmpty) {
throw QueryExecutionErrors.namespaceNotEmptyError(namespace)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {

override protected def afterAll(): Unit = {
val catalog = newCatalog()
catalog.dropNamespace(Array("db"))
catalog.dropNamespace(Array("db2"))
catalog.dropNamespace(Array("ns"))
catalog.dropNamespace(Array("ns2"))
catalog.dropNamespace(Array("db"), cascade = true)
catalog.dropNamespace(Array("db2"), cascade = true)
catalog.dropNamespace(Array("ns"), cascade = true)
catalog.dropNamespace(Array("ns2"), cascade = true)
super.afterAll()
}

Expand Down Expand Up @@ -811,7 +811,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
assert(catalog.listNamespaces(Array()) === Array(testNs, defaultNs))
assert(catalog.listNamespaces(testNs) === Array())

catalog.dropNamespace(testNs)
catalog.dropNamespace(testNs, cascade = false)
}

test("listNamespaces: fail if missing namespace") {
Expand Down Expand Up @@ -849,7 +849,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
assert(catalog.namespaceExists(testNs) === true)
checkMetadata(metadata.asScala, Map("property" -> "value"))

catalog.dropNamespace(testNs)
catalog.dropNamespace(testNs, cascade = false)
}

test("loadNamespaceMetadata: empty metadata") {
Expand All @@ -864,7 +864,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
assert(catalog.namespaceExists(testNs) === true)
checkMetadata(metadata.asScala, emptyProps.asScala)

catalog.dropNamespace(testNs)
catalog.dropNamespace(testNs, cascade = false)
}

test("createNamespace: basic behavior") {
Expand All @@ -884,7 +884,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
checkMetadata(metadata, Map("property" -> "value"))
assert(expectedPath === metadata("location"))

catalog.dropNamespace(testNs)
catalog.dropNamespace(testNs, cascade = false)
}

test("createNamespace: initialize location") {
Expand All @@ -900,7 +900,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
checkMetadata(metadata, Map.empty)
assert(expectedPath === metadata("location"))

catalog.dropNamespace(testNs)
catalog.dropNamespace(testNs, cascade = false)
}

test("createNamespace: relative location") {
Expand All @@ -917,7 +917,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
checkMetadata(metadata, Map.empty)
assert(expectedPath === metadata("location"))

catalog.dropNamespace(testNs)
catalog.dropNamespace(testNs, cascade = false)
}

test("createNamespace: fail if namespace already exists") {
Expand All @@ -933,7 +933,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
assert(catalog.namespaceExists(testNs) === true)
checkMetadata(catalog.loadNamespaceMetadata(testNs).asScala, Map("property" -> "value"))

catalog.dropNamespace(testNs)
catalog.dropNamespace(testNs, cascade = false)
}

test("createNamespace: fail nested namespace") {
Expand All @@ -948,7 +948,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {

assert(exc.getMessage.contains("Invalid namespace name: db.nested"))

catalog.dropNamespace(Array("db"))
catalog.dropNamespace(Array("db"), cascade = false)
}

test("createTable: fail if namespace does not exist") {
Expand All @@ -969,7 +969,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {

assert(catalog.namespaceExists(testNs) === false)

val ret = catalog.dropNamespace(testNs)
val ret = catalog.dropNamespace(testNs, cascade = false)

assert(ret === false)
}
Expand All @@ -981,7 +981,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {

assert(catalog.namespaceExists(testNs) === true)

val ret = catalog.dropNamespace(testNs)
val ret = catalog.dropNamespace(testNs, cascade = false)

assert(ret === true)
assert(catalog.namespaceExists(testNs) === false)
Expand All @@ -993,16 +993,16 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
catalog.createNamespace(testNs, Map("property" -> "value").asJava)
catalog.createTable(testIdent, schema, Array.empty, emptyProps)

val exc = intercept[IllegalStateException] {
catalog.dropNamespace(testNs)
val exc = intercept[AnalysisException] {
catalog.dropNamespace(testNs, cascade = false)
}

assert(exc.getMessage.contains(testNs.quoted))
assert(catalog.namespaceExists(testNs) === true)
checkMetadata(catalog.loadNamespaceMetadata(testNs).asScala, Map("property" -> "value"))

catalog.dropTable(testIdent)
catalog.dropNamespace(testNs)
catalog.dropNamespace(testNs, cascade = false)
}

test("alterNamespace: basic behavior") {
Expand All @@ -1027,7 +1027,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
catalog.loadNamespaceMetadata(testNs).asScala,
Map("property" -> "value"))

catalog.dropNamespace(testNs)
catalog.dropNamespace(testNs, cascade = false)
}

test("alterNamespace: update namespace location") {
Expand All @@ -1050,7 +1050,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
catalog.alterNamespace(testNs, NamespaceChange.setProperty("location", "relativeP"))
assert(newRelativePath === spark.catalog.getDatabase(testNs(0)).locationUri)

catalog.dropNamespace(testNs)
catalog.dropNamespace(testNs, cascade = false)
}

test("alterNamespace: update namespace comment") {
Expand All @@ -1065,7 +1065,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {

assert(newComment === spark.catalog.getDatabase(testNs(0)).description)

catalog.dropNamespace(testNs)
catalog.dropNamespace(testNs, cascade = false)
}

test("alterNamespace: fail if namespace doesn't exist") {
Expand All @@ -1092,6 +1092,6 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
assert(exc.getMessage.contains(s"Cannot remove reserved property: $p"))

}
catalog.dropNamespace(testNs)
catalog.dropNamespace(testNs, cascade = false)
}
}