From 9c72aacebafe2d67ce86775368397a1627bd8338 Mon Sep 17 00:00:00 2001 From: stczwd Date: Fri, 13 Mar 2020 15:19:02 +0800 Subject: [PATCH 1/5] [SPARK-31100] Check namespace existens when setting namespace Change-Id: Ibbb3eff4e56d5cf725ebce1e35445bc1155d8d90 --- .../connector/catalog/CatalogManager.scala | 13 +++++----- .../catalog/CatalogManagerSuite.scala | 25 ++++++++++++++++++- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index a81d8f79d6fcc..77fed21f485b4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -106,13 +106,14 @@ class CatalogManager( } def setCurrentNamespace(namespace: Array[String]): Unit = synchronized { - if (currentCatalog.name() == SESSION_CATALOG_NAME) { - if (namespace.length != 1) { + currentCatalog match { + case _ if currentCatalog.name() == SESSION_CATALOG_NAME && namespace.length == 1 => + v1SessionCatalog.setCurrentDatabase(namespace.head) + case catalog: SupportsNamespaces if catalog.namespaceExists(namespace) => + logInfo(s"set current namespace to ${namespace.mkString(".")}") + _currentNamespace = Some(namespace) + case _ => throw new NoSuchNamespaceException(namespace) - } - v1SessionCatalog.setCurrentDatabase(namespace.head) - } else { - _currentNamespace = Some(namespace) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala index 17d326019f86b..c98a07ab9bb0f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala @@ -18,6 +18,9 @@ package org.apache.spark.sql.connector.catalog import java.net.URI +import java.util + +import scala.collection.JavaConverters._ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.analysis.{EmptyFunctionRegistry, FakeV2SessionCatalog, NoSuchNamespaceException} @@ -108,14 +111,34 @@ class CatalogManagerSuite extends SparkFunSuite { assert(v1SessionCatalog.getCurrentDatabase == "default") catalogManager.setCurrentNamespace(Array("test2")) assert(v1SessionCatalog.getCurrentDatabase == "default") + + intercept[NoSuchNamespaceException] { + catalogManager.setCurrentNamespace(Array("ns1", "ns2")) + } } } -class DummyCatalog extends CatalogPlugin { +class DummyCatalog extends CatalogPlugin with SupportsNamespaces { override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = { _name = name } private var _name: String = null override def name(): String = _name override def defaultNamespace(): Array[String] = Array("a", "b") + override def namespaceExists(namespace: Array[String]): Boolean = namespace match { + case Array("a") | Array("a", "b") | Array("test2") => true + case _ => false + } + // empty namespace functions + override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit = Unit + override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] = + Map.empty[String, String].asJava + override def createNamespace( + namespace: Array[String], + metadata: util.Map[String, String]): Unit = Unit + override def dropNamespace(namespace: Array[String]): Boolean = false + override def listNamespaces(): Array[Array[String]] = + Array[Array[String]]() + override def listNamespaces(namespace: Array[String]): Array[Array[String]] = + Array[Array[String]]() } From 0623c2245e32d88bd88b2583d6284d33958f1f24 Mon Sep 17 00:00:00 2001 From: stczwd Date: Wed, 1 Jul 2020 20:52:55 +0800 Subject: [PATCH 2/5] enable setCurrentNamespace on Catalog which not implements SupportsNamespaces Change-Id: I8eb09649a8ce294c257bcba1313f89dfe57c912f --- .../sql/connector/catalog/CatalogManager.scala | 12 +++++++----- .../connector/catalog/CatalogManagerSuite.scala | 14 ++++++++++++-- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index 77fed21f485b4..c6d21540f27d5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -42,6 +42,7 @@ class CatalogManager( defaultSessionCatalog: CatalogPlugin, val v1SessionCatalog: SessionCatalog) extends Logging { import CatalogManager.SESSION_CATALOG_NAME + import CatalogV2Util._ private val catalogs = mutable.HashMap.empty[String, CatalogPlugin] @@ -107,13 +108,14 @@ class CatalogManager( def setCurrentNamespace(namespace: Array[String]): Unit = synchronized { currentCatalog match { - case _ if currentCatalog.name() == SESSION_CATALOG_NAME && namespace.length == 1 => + case _ if isSessionCatalog(currentCatalog) && namespace.length == 1 => v1SessionCatalog.setCurrentDatabase(namespace.head) - case catalog: SupportsNamespaces if catalog.namespaceExists(namespace) => - logInfo(s"set current namespace to ${namespace.mkString(".")}") - _currentNamespace = Some(namespace) - case _ => + case _ if isSessionCatalog(currentCatalog) => + throw new NoSuchNamespaceException(namespace) + case catalog: SupportsNamespaces if !catalog.namespaceExists(namespace) => throw new NoSuchNamespaceException(namespace) + case _ => + _currentNamespace = Some(namespace) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala index c98a07ab9bb0f..92ce528110ce6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala @@ -112,21 +112,31 @@ class CatalogManagerSuite extends SparkFunSuite { catalogManager.setCurrentNamespace(Array("test2")) assert(v1SessionCatalog.getCurrentDatabase == "default") + // Check namespace existence if currentCatalog implements SupportsNamespaces. + conf.setConfString("spark.sql.catalog.dummyNamespace", classOf[DummyNameSpaceCatalog].getName) + catalogManager.setCurrentCatalog("dummyNamespace") + assert(v1SessionCatalog.getCurrentDatabase == "default") + catalogManager.setCurrentNamespace(Array("test3")) + assert(v1SessionCatalog.getCurrentDatabase == "default") + intercept[NoSuchNamespaceException] { catalogManager.setCurrentNamespace(Array("ns1", "ns2")) } } } -class DummyCatalog extends CatalogPlugin with SupportsNamespaces { +class DummyCatalog extends CatalogPlugin { override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = { _name = name } private var _name: String = null override def name(): String = _name override def defaultNamespace(): Array[String] = Array("a", "b") +} + +class DummyNameSpaceCatalog extends DummyCatalog with SupportsNamespaces { override def namespaceExists(namespace: Array[String]): Boolean = namespace match { - case Array("a") | Array("a", "b") | Array("test2") => true + case Array("a") | Array("a", "b") | Array("test3") => true case _ => false } // empty namespace functions From 0b5536a3ce8f4ba3adc3a7513abb278ab712e6a5 Mon Sep 17 00:00:00 2001 From: stczwd Date: Wed, 1 Jul 2020 23:36:18 +0800 Subject: [PATCH 3/5] chagne suite Change-Id: I934a73eeba768abe3dd20d06edcb4931c89dd54e --- .../catalog/CatalogManagerSuite.scala | 27 ++++--------------- 1 file changed, 5 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala index 92ce528110ce6..7dd0753fcf777 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala @@ -18,13 +18,13 @@ package org.apache.spark.sql.connector.catalog import java.net.URI -import java.util import scala.collection.JavaConverters._ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.analysis.{EmptyFunctionRegistry, FakeV2SessionCatalog, NoSuchNamespaceException} import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog} +import org.apache.spark.sql.connector.InMemoryTableCatalog import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -113,8 +113,10 @@ class CatalogManagerSuite extends SparkFunSuite { assert(v1SessionCatalog.getCurrentDatabase == "default") // Check namespace existence if currentCatalog implements SupportsNamespaces. - conf.setConfString("spark.sql.catalog.dummyNamespace", classOf[DummyNameSpaceCatalog].getName) - catalogManager.setCurrentCatalog("dummyNamespace") + conf.setConfString("spark.sql.catalog.testCatalog", classOf[InMemoryTableCatalog].getName) + catalogManager.setCurrentCatalog("testCatalog") + catalogManager.currentCatalog.asInstanceOf[InMemoryTableCatalog] + .createNamespace(Array("test3"), Map.empty[String, String].asJava) assert(v1SessionCatalog.getCurrentDatabase == "default") catalogManager.setCurrentNamespace(Array("test3")) assert(v1SessionCatalog.getCurrentDatabase == "default") @@ -133,22 +135,3 @@ class DummyCatalog extends CatalogPlugin { override def name(): String = _name override def defaultNamespace(): Array[String] = Array("a", "b") } - -class DummyNameSpaceCatalog extends DummyCatalog with SupportsNamespaces { - override def namespaceExists(namespace: Array[String]): Boolean = namespace match { - case Array("a") | Array("a", "b") | Array("test3") => true - case _ => false - } - // empty namespace functions - override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit = Unit - override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] = - Map.empty[String, String].asJava - override def createNamespace( - namespace: Array[String], - metadata: util.Map[String, String]): Unit = Unit - override def dropNamespace(namespace: Array[String]): Boolean = false - override def listNamespaces(): Array[Array[String]] = - Array[Array[String]]() - override def listNamespaces(namespace: Array[String]): Array[Array[String]] = - Array[Array[String]]() -} From 6fa00aae0938ecc449f4e72f62fdecece5c9dafc Mon Sep 17 00:00:00 2001 From: stczwd Date: Thu, 2 Jul 2020 08:20:56 +0800 Subject: [PATCH 4/5] fix test failed Change-Id: I77526c816650e368092e882e202e4a537a5b4720 --- .../org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index ba4200d84d46b..7aa92b5987b95 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -1394,6 +1394,7 @@ class DataSourceV2SQLSuite test("Use: v2 catalog is used and namespace does not exist") { // Namespaces are not required to exist for v2 catalogs. + sql("create namespace testcat.ns1.ns2") sql("USE testcat.ns1.ns2") val catalogManager = spark.sessionState.catalogManager assert(catalogManager.currentNamespace === Array("ns1", "ns2")) @@ -1418,6 +1419,7 @@ class DataSourceV2SQLSuite sql("USE testcat") testShowCurrentNamespace("testcat", "") + sql("create namespace testcat.ns1.ns2") sql("USE testcat.ns1.ns2") testShowCurrentNamespace("testcat", "ns1.ns2") } @@ -2257,6 +2259,7 @@ class DataSourceV2SQLSuite spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key) val sessionCatalogName = CatalogManager.SESSION_CATALOG_NAME + sql("create namespace testcat.ns1.ns2") sql("USE testcat.ns1.ns2") sql("CREATE TABLE t USING foo AS SELECT 1 col") checkAnswer(spark.table("t"), Row(1)) From d888906993c1d8616463a53ba2f944e004e8c425 Mon Sep 17 00:00:00 2001 From: stczwd Date: Thu, 2 Jul 2020 12:14:17 +0800 Subject: [PATCH 5/5] add suites Change-Id: I3f7a2440621747024ec239a23c014a716b9f7bec --- .../sql/connector/DataSourceV2SQLSuite.scala | 31 ++++++++++++++----- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 7aa92b5987b95..f1215ab051f28 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -1392,12 +1392,26 @@ class DataSourceV2SQLSuite assert(exception.getMessage.contains("Database 'ns1' not found")) } - test("Use: v2 catalog is used and namespace does not exist") { - // Namespaces are not required to exist for v2 catalogs. - sql("create namespace testcat.ns1.ns2") - sql("USE testcat.ns1.ns2") - val catalogManager = spark.sessionState.catalogManager - assert(catalogManager.currentNamespace === Array("ns1", "ns2")) + test("SPARK-31100: Use: v2 catalog that implements SupportsNamespaces is used " + + "and namespace not exists") { + // Namespaces are required to exist for v2 catalogs that implements SupportsNamespaces. + val exception = intercept[NoSuchNamespaceException] { + sql("USE testcat.ns1.ns2") + } + assert(exception.getMessage.contains("Namespace 'ns1.ns2' not found")) + } + + test("SPARK-31100: Use: v2 catalog that does not implement SupportsNameSpaces is used " + + "and namespace does not exist") { + // Namespaces are not required to exist for v2 catalogs + // that does not implement SupportsNamespaces. + withSQLConf("spark.sql.catalog.dummy" -> classOf[BasicInMemoryTableCatalog].getName) { + val catalogManager = spark.sessionState.catalogManager + + sql("USE dummy.ns1") + assert(catalogManager.currentCatalog.name() == "dummy") + assert(catalogManager.currentNamespace === Array("ns1")) + } } test("ShowCurrentNamespace: basic tests") { @@ -1419,7 +1433,8 @@ class DataSourceV2SQLSuite sql("USE testcat") testShowCurrentNamespace("testcat", "") - sql("create namespace testcat.ns1.ns2") + + sql("CREATE NAMESPACE testcat.ns1.ns2") sql("USE testcat.ns1.ns2") testShowCurrentNamespace("testcat", "ns1.ns2") } @@ -2259,7 +2274,7 @@ class DataSourceV2SQLSuite spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key) val sessionCatalogName = CatalogManager.SESSION_CATALOG_NAME - sql("create namespace testcat.ns1.ns2") + sql("CREATE NAMESPACE testcat.ns1.ns2") sql("USE testcat.ns1.ns2") sql("CREATE TABLE t USING foo AS SELECT 1 col") checkAnswer(spark.table("t"), Row(1))