diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index a81d8f79d6fcc..c6d21540f27d5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -42,6 +42,7 @@ class CatalogManager( defaultSessionCatalog: CatalogPlugin, val v1SessionCatalog: SessionCatalog) extends Logging { import CatalogManager.SESSION_CATALOG_NAME + import CatalogV2Util._ private val catalogs = mutable.HashMap.empty[String, CatalogPlugin] @@ -106,13 +107,15 @@ class CatalogManager( } def setCurrentNamespace(namespace: Array[String]): Unit = synchronized { - if (currentCatalog.name() == SESSION_CATALOG_NAME) { - if (namespace.length != 1) { + currentCatalog match { + case _ if isSessionCatalog(currentCatalog) && namespace.length == 1 => + v1SessionCatalog.setCurrentDatabase(namespace.head) + case _ if isSessionCatalog(currentCatalog) => throw new NoSuchNamespaceException(namespace) - } - v1SessionCatalog.setCurrentDatabase(namespace.head) - } else { - _currentNamespace = Some(namespace) + case catalog: SupportsNamespaces if !catalog.namespaceExists(namespace) => + throw new NoSuchNamespaceException(namespace) + case _ => + _currentNamespace = Some(namespace) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala index 17d326019f86b..7dd0753fcf777 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala @@ -19,9 +19,12 @@ package org.apache.spark.sql.connector.catalog import java.net.URI +import scala.collection.JavaConverters._ + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.analysis.{EmptyFunctionRegistry, FakeV2SessionCatalog, NoSuchNamespaceException} import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog} +import org.apache.spark.sql.connector.InMemoryTableCatalog import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -108,6 +111,19 @@ class CatalogManagerSuite extends SparkFunSuite { assert(v1SessionCatalog.getCurrentDatabase == "default") catalogManager.setCurrentNamespace(Array("test2")) assert(v1SessionCatalog.getCurrentDatabase == "default") + + // Check namespace existence if currentCatalog implements SupportsNamespaces. + conf.setConfString("spark.sql.catalog.testCatalog", classOf[InMemoryTableCatalog].getName) + catalogManager.setCurrentCatalog("testCatalog") + catalogManager.currentCatalog.asInstanceOf[InMemoryTableCatalog] + .createNamespace(Array("test3"), Map.empty[String, String].asJava) + assert(v1SessionCatalog.getCurrentDatabase == "default") + catalogManager.setCurrentNamespace(Array("test3")) + assert(v1SessionCatalog.getCurrentDatabase == "default") + + intercept[NoSuchNamespaceException] { + catalogManager.setCurrentNamespace(Array("ns1", "ns2")) + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index ba4200d84d46b..f1215ab051f28 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -1392,11 +1392,26 @@ class DataSourceV2SQLSuite assert(exception.getMessage.contains("Database 'ns1' not found")) } - test("Use: v2 catalog is used and namespace does not exist") { - // Namespaces are not required to exist for v2 catalogs. - sql("USE testcat.ns1.ns2") - val catalogManager = spark.sessionState.catalogManager - assert(catalogManager.currentNamespace === Array("ns1", "ns2")) + test("SPARK-31100: Use: v2 catalog that implements SupportsNamespaces is used " + + "and namespace not exists") { + // Namespaces are required to exist for v2 catalogs that implements SupportsNamespaces. + val exception = intercept[NoSuchNamespaceException] { + sql("USE testcat.ns1.ns2") + } + assert(exception.getMessage.contains("Namespace 'ns1.ns2' not found")) + } + + test("SPARK-31100: Use: v2 catalog that does not implement SupportsNameSpaces is used " + + "and namespace does not exist") { + // Namespaces are not required to exist for v2 catalogs + // that does not implement SupportsNamespaces. + withSQLConf("spark.sql.catalog.dummy" -> classOf[BasicInMemoryTableCatalog].getName) { + val catalogManager = spark.sessionState.catalogManager + + sql("USE dummy.ns1") + assert(catalogManager.currentCatalog.name() == "dummy") + assert(catalogManager.currentNamespace === Array("ns1")) + } } test("ShowCurrentNamespace: basic tests") { @@ -1418,6 +1433,8 @@ class DataSourceV2SQLSuite sql("USE testcat") testShowCurrentNamespace("testcat", "") + + sql("CREATE NAMESPACE testcat.ns1.ns2") sql("USE testcat.ns1.ns2") testShowCurrentNamespace("testcat", "ns1.ns2") } @@ -2257,6 +2274,7 @@ class DataSourceV2SQLSuite spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key) val sessionCatalogName = CatalogManager.SESSION_CATALOG_NAME + sql("CREATE NAMESPACE testcat.ns1.ns2") sql("USE testcat.ns1.ns2") sql("CREATE TABLE t USING foo AS SELECT 1 col") checkAnswer(spark.table("t"), Row(1))