diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index fc54d89a1a4b3..27994ed76b2af 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -4098,14 +4098,13 @@ test_that("catalog APIs, listTables, getTable, listColumns, listFunctions, funct c("name", "description", "dataType", "nullable", "isPartition", "isBucket")) expect_equal(collect(c)[[1]][[1]], "speed") expect_error(listColumns("zxwtyswklpf", "default"), - paste("Error in listColumns : analysis error - Table", - "'zxwtyswklpf' does not exist in database 'default'")) + paste("Table or view not found: spark_catalog.default.zxwtyswklpf")) f <- listFunctions() expect_true(nrow(f) >= 200) # 250 expect_equal(colnames(f), c("name", "catalog", "namespace", "description", "className", "isTemporary")) - expect_equal(take(orderBy(f, "className"), 1)$className, + expect_equal(take(orderBy(filter(f, "className IS NOT NULL"), "className"), 1)$className, "org.apache.spark.sql.catalyst.expressions.Abs") expect_error(listFunctions("zxwtyswklpf_db"), paste("Error in listFunctions : no such database - Database", diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py index 548750d712025..10c9ab5f6d239 100644 --- a/python/pyspark/sql/catalog.py +++ b/python/pyspark/sql/catalog.py @@ -164,7 +164,7 @@ def getDatabase(self, dbName: str) -> Database: Examples -------- >>> spark.catalog.getDatabase("default") - Database(name='default', catalog=None, description='default database', ... + Database(name='default', catalog='spark_catalog', description='default database', ... >>> spark.catalog.getDatabase("spark_catalog.default") Database(name='default', catalog='spark_catalog', description='default database', ... """ @@ -376,9 +376,9 @@ def getFunction(self, functionName: str) -> Function: -------- >>> func = spark.sql("CREATE FUNCTION my_func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'") >>> spark.catalog.getFunction("my_func1") - Function(name='my_func1', catalog=None, namespace=['default'], ... + Function(name='my_func1', catalog='spark_catalog', namespace=['default'], ... >>> spark.catalog.getFunction("default.my_func1") - Function(name='my_func1', catalog=None, namespace=['default'], ... + Function(name='my_func1', catalog='spark_catalog', namespace=['default'], ... >>> spark.catalog.getFunction("spark_catalog.default.my_func1") Function(name='my_func1', catalog='spark_catalog', namespace=['default'], ... 
>>> spark.catalog.getFunction("my_func2") diff --git a/python/pyspark/sql/tests/test_catalog.py b/python/pyspark/sql/tests/test_catalog.py index 7d81234bce256..24cd67251a8f7 100644 --- a/python/pyspark/sql/tests/test_catalog.py +++ b/python/pyspark/sql/tests/test_catalog.py @@ -198,7 +198,7 @@ def test_list_functions(self): self.assertTrue("to_unix_timestamp" in functions) self.assertTrue("current_database" in functions) self.assertEqual(functions["+"].name, "+") - self.assertEqual(functions["+"].description, None) + self.assertEqual(functions["+"].description, "expr1 + expr2 - Returns `expr1`+`expr2`.") self.assertEqual( functions["+"].className, "org.apache.spark.sql.catalyst.expressions.Add" ) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 16d89c9b2e48e..a0c98aac6c4ee 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -971,13 +971,17 @@ class SessionCatalog( } def lookupTempView(name: TableIdentifier): Option[View] = { - val tableName = formatTableName(name.table) - if (name.database.isEmpty) { - tempViews.get(tableName).map(getTempViewPlan) - } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) { - globalTempViewManager.get(tableName).map(getTempViewPlan) - } else { - None + lookupLocalOrGlobalRawTempView(name.database.toSeq :+ name.table).map(getTempViewPlan) + } + + /** + * Return the raw logical plan of a temporary local or global view for the given name. + */ + def lookupLocalOrGlobalRawTempView(name: Seq[String]): Option[TemporaryViewRelation] = { + name match { + case Seq(v) => getRawTempView(v) + case Seq(db, v) if isGlobalTempViewDB(db) => getRawGlobalTempView(v) + case _ => None } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala index 29b35229e9753..82ac8fd604994 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -33,28 +33,28 @@ import org.apache.spark.storage.StorageLevel abstract class Catalog { /** - * Returns the current default database in this session. + * Returns the current database (namespace) in this session. * * @since 2.0.0 */ def currentDatabase: String /** - * Sets the current default database in this session. + * Sets the current database (namespace) in this session. * * @since 2.0.0 */ def setCurrentDatabase(dbName: String): Unit /** - * Returns a list of databases available across all sessions. + * Returns a list of databases (namespaces) available within the current catalog. * * @since 2.0.0 */ def listDatabases(): Dataset[Database] /** - * Returns a list of tables/views in the current database. + * Returns a list of tables/views in the current database (namespace). * This includes all temporary views. * * @since 2.0.0 @@ -62,7 +62,8 @@ abstract class Catalog { def listTables(): Dataset[Table] /** - * Returns a list of tables/views in the specified database. + * Returns a list of tables/views in the specified database (namespace) (the name can be qualified + * with catalog). * This includes all temporary views. 
* * @since 2.0.0 @@ -71,16 +72,17 @@ abstract class Catalog { def listTables(dbName: String): Dataset[Table] /** - * Returns a list of functions registered in the current database. - * This includes all temporary functions + * Returns a list of functions registered in the current database (namespace). + * This includes all temporary functions. * * @since 2.0.0 */ def listFunctions(): Dataset[Function] /** - * Returns a list of functions registered in the specified database. - * This includes all temporary functions + * Returns a list of functions registered in the specified database (namespace) (the name can be + * qualified with catalog). + * This includes all built-in and temporary functions. * * @since 2.0.0 */ @@ -90,21 +92,22 @@ abstract class Catalog { /** * Returns a list of columns for the given table/view or temporary view. * - * @param tableName is either a qualified or unqualified name that designates a table/view. - * If no database identifier is provided, it refers to a temporary view or - * a table/view in the current database. + * @param tableName is either a qualified or unqualified name that designates a table/view. It + * follows the same resolution rule with SQL: search for temp views first then + * table/views in the current database (namespace). * @since 2.0.0 */ @throws[AnalysisException]("table does not exist") def listColumns(tableName: String): Dataset[Column] /** - * Returns a list of columns for the given table/view in the specified database. + * Returns a list of columns for the given table/view in the specified database under the Hive + * Metastore. * - * This API does not support 3 layer namespace since 3.4.0. To use 3 layer namespace, - * use listColumns(tableName) instead. + * To list columns for table/view in other catalogs, please use `listColumns(tableName)` with + * qualified table/view name instead. * - * @param dbName is a name that designates a database. + * @param dbName is an unqualified name that designates a database. * @param tableName is an unqualified name that designates a table/view. * @since 2.0.0 */ @@ -112,8 +115,8 @@ abstract class Catalog { def listColumns(dbName: String, tableName: String): Dataset[Column] /** - * Get the database with the specified name. This throws an AnalysisException when the database - * cannot be found. + * Get the database (namespace) with the specified name (can be qualified with catalog). This + * throws an AnalysisException when the database (namespace) cannot be found. * * @since 2.1.0 */ @@ -124,20 +127,20 @@ abstract class Catalog { * Get the table or view with the specified name. This table can be a temporary view or a * table/view. This throws an AnalysisException when no Table can be found. * - * @param tableName is either a qualified or unqualified name that designates a table/view. - * If no database identifier is provided, it refers to a table/view in - * the current database. + * @param tableName is either a qualified or unqualified name that designates a table/view. It + * follows the same resolution rule with SQL: search for temp views first then + * table/views in the current database (namespace). * @since 2.1.0 */ @throws[AnalysisException]("table does not exist") def getTable(tableName: String): Table /** - * Get the table or view with the specified name in the specified database. This throws an - * AnalysisException when no Table can be found. + * Get the table or view with the specified name in the specified database under the Hive + * Metastore. 
This throws an AnalysisException when no Table can be found. * - * This API does not support 3 layer namespace since 3.4.0. To use 3 layer namespace, - * use getTable(tableName) instead. + * To get table/view in other catalogs, please use `getTable(tableName)` with qualified table/view + * name instead. * * @since 2.1.0 */ @@ -148,22 +151,22 @@ abstract class Catalog { * Get the function with the specified name. This function can be a temporary function or a * function. This throws an AnalysisException when the function cannot be found. * - * @param functionName is either a qualified or unqualified name that designates a function. - * If no database identifier is provided, it refers to a temporary function - * or a function in the current database. + * @param functionName is either a qualified or unqualified name that designates a function. It + * follows the same resolution rule with SQL: search for built-in/temp + * functions first then functions in the current database (namespace). * @since 2.1.0 */ @throws[AnalysisException]("function does not exist") def getFunction(functionName: String): Function /** - * Get the function with the specified name. This throws an AnalysisException when the function - * cannot be found. + * Get the function with the specified name in the specified database under the Hive Metastore. + * This throws an AnalysisException when the function cannot be found. * - * This API does not support 3 layer namespace since 3.4.0. To use 3 layer namespace, - * use getFunction(functionName) instead. + * To get functions in other catalogs, please use `getFunction(functionName)` with qualified + * function name instead. * - * @param dbName is a name that designates a database. + * @param dbName is an unqualified name that designates a database. * @param functionName is an unqualified name that designates a function in the specified database * @since 2.1.0 */ @@ -171,7 +174,8 @@ abstract class Catalog { def getFunction(dbName: String, functionName: String): Function /** - * Check if the database with the specified name exists. + * Check if the database (namespace) with the specified name exists (the name can be qualified + * with catalog). * * @since 2.1.0 */ @@ -181,20 +185,21 @@ abstract class Catalog { * Check if the table or view with the specified name exists. This can either be a temporary * view or a table/view. * - * @param tableName is either a qualified or unqualified name that designates a table/view. - * If no database identifier is provided, it refers to a table/view in - * the current database. + * @param tableName is either a qualified or unqualified name that designates a table/view. It + * follows the same resolution rule with SQL: search for temp views first then + * table/views in the current database (namespace). * @since 2.1.0 */ def tableExists(tableName: String): Boolean /** - * Check if the table or view with the specified name exists in the specified database. + * Check if the table or view with the specified name exists in the specified database under the + * Hive Metastore. * - * This API does not support 3 layer namespace since 3.4.0. To use 3 layer namespace, - * use tableExists(tableName) instead. + * To check existence of table/view in other catalogs, please use `tableExists(tableName)` with + * qualified table/view name instead. * - * @param dbName is a name that designates a database. + * @param dbName is an unqualified name that designates a database. * @param tableName is an unqualified name that designates a table. 
* @since 2.1.0 */ @@ -204,20 +209,21 @@ abstract class Catalog { * Check if the function with the specified name exists. This can either be a temporary function * or a function. * - * @param functionName is either a qualified or unqualified name that designates a function. - * If no database identifier is provided, it refers to a function in - * the current database. + * @param functionName is either a qualified or unqualified name that designates a function. It + * follows the same resolution rule with SQL: search for built-in/temp + * functions first then functions in the current database (namespace). * @since 2.1.0 */ def functionExists(functionName: String): Boolean /** - * Check if the function with the specified name exists in the specified database. + * Check if the function with the specified name exists in the specified database under the + * Hive Metastore. * - * This API does not support 3 layer namespace since 3.4.0. To use 3 layer namespace, - * use functionExists(functionName) instead. + * To check existence of functions in other catalogs, please use `functionExists(functionName)` + * with qualified function name instead. * - * @param dbName is a name that designates a database. + * @param dbName is an unqualified name that designates a database. * @param functionName is an unqualified name that designates a function. * @since 2.1.0 */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index e11b349777e8f..657ed87e609e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -23,14 +23,14 @@ import scala.util.control.NonFatal import org.apache.spark.sql._ import org.apache.spark.sql.catalog.{Catalog, CatalogMetadata, Column, Database, Function, Table} import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdentifier, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedNonPersistentFunc, ResolvedPersistentFunc, ResolvedTable, ResolvedView, UnresolvedFunc, UnresolvedIdentifier, UnresolvedNamespace, UnresolvedTable, UnresolvedTableOrView} +import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedNonPersistentFunc, ResolvedPersistentFunc, ResolvedTable, ResolvedView, UnresolvedFunc, UnresolvedIdentifier, UnresolvedNamespace, UnresolvedTable, UnresolvedTableOrView} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, SubqueryAlias, TableSpec, View} -import org.apache.spark.sql.catalyst.util.CharVarcharUtils -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, SupportsNamespaces, TableCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, FunctionCatalog, Identifier, SupportsNamespaces, Table => V2Table, TableCatalog, V1Table} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, IdentifierHelper, MultipartIdentifierHelper, TransformHelper} import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.execution.command.ShowTablesCommand import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation} import 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.internal.connector.V1Function @@ -45,15 +45,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { private def sessionCatalog: SessionCatalog = sparkSession.sessionState.catalog - private def requireDatabaseExists(dbName: String): Unit = { - if (!sessionCatalog.databaseExists(dbName)) { - throw QueryCompilationErrors.databaseDoesNotExistError(dbName) - } + private def parseIdent(name: String): Seq[String] = { + sparkSession.sessionState.sqlParser.parseMultipartIdentifier(name) } - private def requireTableExists(dbName: String, tableName: String): Unit = { - if (!sessionCatalog.tableExists(TableIdentifier(tableName, Some(dbName)))) { - throw QueryCompilationErrors.tableDoesNotExistInDatabaseError(tableName, dbName) + private def qualifyV1Ident(nameParts: Seq[String]): Seq[String] = { + assert(nameParts.length == 1 || nameParts.length == 2) + if (nameParts.length == 1) { + Seq(CatalogManager.SESSION_CATALOG_NAME, sessionCatalog.getCurrentDatabase) ++ nameParts + } else { + CatalogManager.SESSION_CATALOG_NAME +: nameParts } } @@ -68,32 +69,27 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { */ @throws[AnalysisException]("database does not exist") override def setCurrentDatabase(dbName: String): Unit = { - // we assume dbName will not include the catalog prefix. e.g. if you call - // setCurrentDatabase("catalog.db") it will search for a database catalog.db in the catalog. - val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName) - sparkSession.sessionState.catalogManager.setCurrentNamespace(ident.toArray) + // we assume `dbName` will not include the catalog name. e.g. if you call + // `setCurrentDatabase("catalog.db")`, it will search for a database 'catalog.db' in the current + // catalog. + sparkSession.sessionState.catalogManager.setCurrentNamespace(parseIdent(dbName).toArray) } /** * Returns a list of databases available across all sessions. */ override def listDatabases(): Dataset[Database] = { - val catalog = currentCatalog() - val plan = ShowNamespaces(UnresolvedNamespace(Seq(catalog)), None) - val databases = sparkSession.sessionState.executePlan(plan).toRdd.collect() - .map(row => catalog + "." + row.getString(0)) - .map(getDatabase) + val plan = ShowNamespaces(UnresolvedNamespace(Nil), None) + val qe = sparkSession.sessionState.executePlan(plan) + val catalog = qe.analyzed.collectFirst { + case ShowNamespaces(r: ResolvedNamespace, _, _) => r.catalog + }.get + val databases = qe.toRdd.collect().map { row => + getNamespace(catalog, parseIdent(row.getString(0))) + } CatalogImpl.makeDataset(databases, sparkSession) } - private def makeDatabase(dbName: String): Database = { - val metadata = sessionCatalog.getDatabaseMetadata(dbName) - new Database( - name = metadata.name, - description = metadata.description, - locationUri = CatalogUtils.URIToString(metadata.locationUri)) - } - /** * Returns a list of tables in the current database. * This includes all temporary tables. @@ -110,74 +106,93 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { override def listTables(dbName: String): Dataset[Table] = { // `dbName` could be either a single database name (behavior in Spark 3.3 and prior) or // a qualified namespace with catalog name. We assume it's a single database name - // and check if we can find the dbName in sessionCatalog. If so we listTables under - // that database. 
Otherwise we try 3-part name parsing and locate the database. - if (sessionCatalog.databaseExists(dbName) || sessionCatalog.isGlobalTempViewDB(dbName)) { - val tables = sessionCatalog.listTables(dbName).map(makeTable) - CatalogImpl.makeDataset(tables, sparkSession) + // and check if we can find it in the sessionCatalog. If so we list tables under + // that database. Otherwise we will resolve the catalog/namespace and list tables there. + val namespace = if (sessionCatalog.databaseExists(dbName)) { + Seq(CatalogManager.SESSION_CATALOG_NAME, dbName) } else { - val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName) - val plan = ShowTables(UnresolvedNamespace(ident), None) - val ret = sparkSession.sessionState.executePlan(plan).toRdd.collect() - val tables = ret - .map(row => ident ++ Seq(row.getString(1))) - .map(makeTable) - CatalogImpl.makeDataset(tables, sparkSession) + parseIdent(dbName) } - } + val plan = ShowTables(UnresolvedNamespace(namespace), None) + val qe = sparkSession.sessionState.executePlan(plan) + val catalog = qe.analyzed.collectFirst { + case ShowTables(r: ResolvedNamespace, _, _) => r.catalog + case _: ShowTablesCommand => + sparkSession.sessionState.catalogManager.v2SessionCatalog + }.get + val tables = qe.toRdd.collect().map { row => + val tableName = row.getString(1) + val namespaceName = row.getString(0) + val isTemp = row.getBoolean(2) + if (isTemp) { + // Temp views do not belong to any catalog. We shouldn't prepend the catalog name here. + val ns = if (namespaceName.isEmpty) Nil else Seq(namespaceName) + makeTable(ns :+ tableName) + } else { + val ns = parseIdent(namespaceName) + makeTable(catalog.name() +: ns :+ tableName) + } + } + CatalogImpl.makeDataset(tables, sparkSession) + } + + private def makeTable(nameParts: Seq[String]): Table = { + sessionCatalog.lookupLocalOrGlobalRawTempView(nameParts).map { tempView => + new Table( + name = tempView.tableMeta.identifier.table, + catalog = null, + namespace = tempView.tableMeta.identifier.database.toArray, + description = tempView.tableMeta.comment.orNull, + tableType = "TEMPORARY", + isTemporary = true) + }.getOrElse { + val plan = UnresolvedIdentifier(nameParts) + sparkSession.sessionState.executePlan(plan).analyzed match { + case ResolvedIdentifier(catalog: TableCatalog, ident) => + val tableOpt = try { + loadTable(catalog, ident) + } catch { + // Even if the table exists, an error may still happen. For example, Spark can't read Hive's + // index table. We return a Table without description and tableType in this case. + case NonFatal(_) => + Some(new Table( + name = ident.name(), + catalog = catalog.name(), + namespace = ident.namespace(), + description = null, + tableType = null, + isTemporary = false)) + } + tableOpt.getOrElse(throw QueryCompilationErrors.tableOrViewNotFound(nameParts)) - /** - * Returns a Table for the given table/view or temporary view. - * - * Note that this function requires the table already exists in the Catalog.
- * - * If the table metadata retrieval failed due to any reason (e.g., table serde class - * is not accessible or the table type is not accepted by Spark SQL), this function - * still returns the corresponding Table without the description and tableType) - */ - private def makeTable(tableIdent: TableIdentifier): Table = { - val metadata = try { - Some(sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdent)) - } catch { - case NonFatal(_) => None + case _ => throw QueryCompilationErrors.tableOrViewNotFound(nameParts) + } } - val isTemp = sessionCatalog.isTempView(tableIdent) - val qualifier = - metadata.map(_.identifier.database).getOrElse(tableIdent.database).map(Array(_)).orNull - new Table( - name = tableIdent.table, - catalog = CatalogManager.SESSION_CATALOG_NAME, - namespace = qualifier, - description = metadata.map(_.comment.orNull).orNull, - tableType = if (isTemp) "TEMPORARY" else metadata.map(_.tableType.name).orNull, - isTemporary = isTemp) - } - - private def makeTable(ident: Seq[String]): Table = { - val plan = UnresolvedTableOrView(ident, "Catalog.listTables", true) - val node = sparkSession.sessionState.executePlan(plan).analyzed - node match { - case t: ResolvedTable => - val isExternal = t.table.properties().getOrDefault( + } + + private def loadTable(catalog: TableCatalog, ident: Identifier): Option[Table] = { + // TODO: support v2 view when it gets implemented. + CatalogV2Util.loadTable(catalog, ident).map { + case v1: V1Table if v1.v1Table.tableType == CatalogTableType.VIEW => + new Table( + name = v1.v1Table.identifier.table, + catalog = catalog.name(), + namespace = v1.v1Table.identifier.database.toArray, + description = v1.v1Table.comment.orNull, + tableType = "VIEW", + isTemporary = false) + case t: V2Table => + val isExternal = t.properties().getOrDefault( TableCatalog.PROP_EXTERNAL, "false").equals("true") new Table( - name = t.identifier.name(), - catalog = t.catalog.name(), - namespace = t.identifier.namespace(), - description = t.table.properties().get("comment"), + name = ident.name(), + catalog = catalog.name(), + namespace = ident.namespace(), + description = t.properties().get("comment"), tableType = if (isExternal) CatalogTableType.EXTERNAL.name else CatalogTableType.MANAGED.name, isTemporary = false) - case v: ResolvedView => - new Table( - name = v.identifier.name(), - catalog = null, - namespace = v.identifier.namespace(), - description = null, - tableType = if (v.isTemp) "TEMPORARY" else "VIEW", - isTemporary = v.isTemp) - case _ => throw QueryCompilationErrors.tableOrViewNotFound(ident) } } @@ -197,48 +212,37 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { override def listFunctions(dbName: String): Dataset[Function] = { // `dbName` could be either a single database name (behavior in Spark 3.3 and prior) or // a qualified namespace with catalog name. We assume it's a single database name - // and check if we can find the dbName in sessionCatalog. If so we listFunctions under - // that database. Otherwise we try 3-part name parsing and locate the database. - if (sessionCatalog.databaseExists(dbName)) { - val functions = sessionCatalog.listFunctions(dbName) - .map { case (functIdent, _) => makeFunction(functIdent) } - CatalogImpl.makeDataset(functions, sparkSession) + // and check if we can find it in the sessionCatalog. If so we list functions under + // that database. Otherwise we will resolve the catalog/namespace and list functions there. 
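To make the two resolution paths above concrete, here is a minimal caller-side sketch (not part of this patch; the catalog name "testcat" and its spark.sql.catalog.testcat registration are assumptions for illustration):

import org.apache.spark.sql.SparkSession

object ListFunctionsResolutionSketch {
  def main(args: Array[String]): Unit = {
    // Assumes a v2 catalog implementation has been registered as "testcat" via the
    // spark.sql.catalog.testcat configuration; otherwise the second call cannot resolve.
    val spark = SparkSession.builder().master("local[1]").getOrCreate()

    // "default" is a session-catalog (Hive Metastore) database, so user functions are
    // listed from spark_catalog.default, alongside built-in and temporary functions.
    spark.catalog.listFunctions("default").show(truncate = false)

    // "testcat.ns" is not a session-catalog database, so the name is parsed as
    // catalog.namespace and user functions are listed from that catalog instead.
    spark.sql("CREATE NAMESPACE IF NOT EXISTS testcat.ns")
    spark.catalog.listFunctions("testcat.ns").show(truncate = false)

    spark.stop()
  }
}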
+ val namespace = if (sessionCatalog.databaseExists(dbName)) { + Seq(CatalogManager.SESSION_CATALOG_NAME, dbName) } else { - val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName) - val functions = collection.mutable.ArrayBuilder.make[Function] - - // built-in functions - val plan0 = ShowFunctions(UnresolvedNamespace(ident), - userScope = false, systemScope = true, None) - sparkSession.sessionState.executePlan(plan0).toRdd.collect().foreach { row => - // `lookupBuiltinOrTempFunction` and `lookupBuiltinOrTempTableFunction` in Analyzer - // require the input identifier only contains the function name, otherwise, built-in - // functions will be skipped. - val name = row.getString(0) - functions += makeFunction(Seq(name)) - } - - // user functions - val plan1 = ShowFunctions(UnresolvedNamespace(ident), - userScope = true, systemScope = false, None) - sparkSession.sessionState.executePlan(plan1).toRdd.collect().foreach { row => - // `row.getString(0)` may contain dbName like `db.function`, so extract the function name. - val name = row.getString(0).split("\\.").last - functions += makeFunction(ident :+ name) - } + parseIdent(dbName) + } + val functions = collection.mutable.ArrayBuilder.make[Function] + + // TODO: The SHOW FUNCTIONS should tell us the function type (built-in, temp, persistent) and + // we can simplify the code below quite a bit. For now we need to list built-in functions + // separately as several built-in function names are not parsable, such as `!=`. + + // List built-in functions. We don't need to specify the namespace here as SHOW FUNCTIONS with + // only system scope does not need to know the catalog and namespace. + val plan0 = ShowFunctions(UnresolvedNamespace(Nil), userScope = false, systemScope = true, None) + sparkSession.sessionState.executePlan(plan0).toRdd.collect().foreach { row => + // Built-in functions do not belong to any catalog or namespace. We can only look them up with + // a single-part name. + val name = row.getString(0) + functions += makeFunction(Seq(name)) + } - CatalogImpl.makeDataset(functions.result(), sparkSession) + // List user functions. + val plan1 = ShowFunctions(UnresolvedNamespace(namespace), + userScope = true, systemScope = false, None) + sparkSession.sessionState.executePlan(plan1).toRdd.collect().foreach { row => + functions += makeFunction(parseIdent(row.getString(0))) } - } - private def makeFunction(funcIdent: FunctionIdentifier): Function = { - val metadata = sessionCatalog.lookupFunctionInfo(funcIdent) - new Function( - name = metadata.getName, - database = metadata.getDb, - description = null, // for now, this is always undefined - className = metadata.getClassName, - isTemporary = metadata.getDb == null) + CatalogImpl.makeDataset(functions.result(), sparkSession) } private def makeFunction(ident: Seq[String]): Function = { @@ -279,23 +283,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { */ @throws[AnalysisException]("table does not exist") override def listColumns(tableName: String): Dataset[Column] = { - // calling `sqlParser.parseTableIdentifier` to parse tableName. If it contains only table name - // and optionally contains a database name(thus a TableIdentifier), then we look up the table in - // sessionCatalog. Otherwise we try `sqlParser.parseMultipartIdentifier` to have a sequence of - // string as the qualified identifier and resolve the table through SQL analyzer.
- try { - val ident = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) - if (tableExists(ident.database.orNull, ident.table)) { - listColumns(ident) - } else { - val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) - listColumns(ident) - } - } catch { - case e: org.apache.spark.sql.catalyst.parser.ParseException => - val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) - listColumns(ident) + val parsed = parseIdent(tableName) + // For backward compatibility (Spark 3.3 and prior), we should check if the table exists in + // the Hive Metastore first. + val nameParts = if (parsed.length <= 2 && !sessionCatalog.isTempView(parsed) && + sessionCatalog.tableExists(parsed.asTableIdentifier)) { + qualifyV1Ident(parsed) + } else { + parsed } + listColumns(nameParts) } /** @@ -303,25 +300,9 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { */ @throws[AnalysisException]("database or table does not exist") override def listColumns(dbName: String, tableName: String): Dataset[Column] = { - requireTableExists(dbName, tableName) - listColumns(TableIdentifier(tableName, Some(dbName))) - } - - private def listColumns(tableIdentifier: TableIdentifier): Dataset[Column] = { - val tableMetadata = sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdentifier) - - val partitionColumnNames = tableMetadata.partitionColumnNames.toSet - val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet - val columns = tableMetadata.schema.map { c => - new Column( - name = c.name, - description = c.getComment().orNull, - dataType = CharVarcharUtils.getRawType(c.metadata).getOrElse(c.dataType).catalogString, - nullable = c.nullable, - isPartition = partitionColumnNames.contains(c.name), - isBucket = bucketColumnNames.contains(c.name)) - } - CatalogImpl.makeDataset(columns, sparkSession) + // For backward compatibility (Spark 3.3 and prior), here we always look up the table from the + // Hive Metastore. + listColumns(Seq(CatalogManager.SESSION_CATALOG_NAME, dbName, tableName)) } private def listColumns(ident: Seq[String]): Dataset[Column] = { @@ -361,41 +342,44 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { CatalogImpl.makeDataset(columns, sparkSession) } + private def getNamespace(catalog: CatalogPlugin, ns: Seq[String]): Database = catalog match { + case catalog: SupportsNamespaces => + val metadata = catalog.loadNamespaceMetadata(ns.toArray) + new Database( + name = ns.quoted, + catalog = catalog.name, + description = metadata.get(SupportsNamespaces.PROP_COMMENT), + locationUri = metadata.get(SupportsNamespaces.PROP_LOCATION)) + // If the catalog doesn't support namespaces, we assume it's an implicit namespace, which always + // exists but has no metadata. + case catalog: CatalogPlugin => + new Database( + name = ns.quoted, + catalog = catalog.name, + description = null, + locationUri = null) + case _ => new Database(name = ns.quoted, description = null, locationUri = null) + } /** * Gets the database with the specified name. This throws an `AnalysisException` when no * `Database` can be found. */ override def getDatabase(dbName: String): Database = { - // `dbName` could be either a single database name (behavior in Spark 3.3 and prior) or a - // qualified namespace with catalog name. To maintain backwards compatibility, we first assume - // it's a single database name and return the database from sessionCatalog if it exists. 
- // Otherwise we try 3-part name parsing and locate the database. If the parased identifier - // contains both catalog name and database name, we then search the database in the catalog. - if (sessionCatalog.databaseExists(dbName) || sessionCatalog.isGlobalTempViewDB(dbName)) { - makeDatabase(dbName) + // `dbName` could be either a single database name (behavior in Spark 3.3 and prior) or + // a qualified namespace with catalog name. We assume it's a single database name + // and check if we can find it in the sessionCatalog. Otherwise we will parse `dbName` and + // resolve catalog/namespace with it. + val namespace = if (sessionCatalog.databaseExists(dbName)) { + Seq(CatalogManager.SESSION_CATALOG_NAME, dbName) } else { - val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName) - val plan = UnresolvedNamespace(ident) - val resolved = sparkSession.sessionState.executePlan(plan).analyzed - resolved match { - case ResolvedNamespace(catalog: SupportsNamespaces, namespace) => - val metadata = catalog.loadNamespaceMetadata(namespace.toArray) - new Database( - name = namespace.quoted, - catalog = catalog.name, - description = metadata.get(SupportsNamespaces.PROP_COMMENT), - locationUri = metadata.get(SupportsNamespaces.PROP_LOCATION)) - // similar to databaseExists: if the catalog doesn't support namespaces, we assume it's an - // implicit namespace, which exists but has no metadata. - case ResolvedNamespace(catalog: CatalogPlugin, namespace) => - new Database( - name = namespace.quoted, - catalog = catalog.name, - description = null, - locationUri = null) - case _ => new Database(name = dbName, description = null, locationUri = null) - } + sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName) + } + val plan = UnresolvedNamespace(namespace) + sparkSession.sessionState.executePlan(plan).analyzed match { + case ResolvedNamespace(catalog, namespace) => + getNamespace(catalog, namespace) + case _ => new Database(name = dbName, description = null, locationUri = null) } } @@ -404,26 +388,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { * table/view. This throws an `AnalysisException` when no `Table` can be found. */ override def getTable(tableName: String): Table = { - // calling `sqlParser.parseTableIdentifier` to parse tableName. If it contains only table name - // and optionally contains a database name(thus a TableIdentifier), then we look up the table in - // sessionCatalog. Otherwise we try `sqlParser.parseMultipartIdentifier` to have a sequence of - // string as the qualified identifier and resolve the table through SQL analyzer. - try { - val ident = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) - if (tableExists(ident.database.orNull, ident.table)) { - makeTable(ident) - } else { - getTable3LNamespace(tableName) - } - } catch { - case e: org.apache.spark.sql.catalyst.parser.ParseException => - getTable3LNamespace(tableName) + val parsed = parseIdent(tableName) + // For backward compatibility (Spark 3.3 and prior), we should check if the table exists in + // the Hive Metastore first. 
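A hypothetical caller-side sketch of that precedence (illustrative only; the temp view and the 3-part table name are assumptions):

import org.apache.spark.sql.SparkSession

object GetTableResolutionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._

    // A 1- or 2-part name is first checked against temp views and the Hive Metastore;
    // a temp view comes back with catalog = null and tableType = "TEMPORARY".
    Seq(1, 2).toDF("i").createOrReplaceTempView("tv")
    println(spark.catalog.getTable("tv"))

    // A 3-part name bypasses the backward-compatibility path and is resolved through the
    // analyzer in the named catalog (assumes "testcat" is configured and the table exists,
    // otherwise an AnalysisException is thrown).
    println(spark.catalog.getTable("testcat.ns.tbl"))

    spark.stop()
  }
}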
+ val nameParts = if (parsed.length <= 2 && !sessionCatalog.isTempView(parsed) && + sessionCatalog.tableExists(parsed.asTableIdentifier)) { + qualifyV1Ident(parsed) + } else { + parsed } - } - - private def getTable3LNamespace(tableName: String): Table = { - val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) - makeTable(ident) + makeTable(nameParts) } /** @@ -431,10 +405,12 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { * `AnalysisException` when no `Table` can be found. */ override def getTable(dbName: String, tableName: String): Table = { - if (tableExists(dbName, tableName)) { - makeTable(TableIdentifier(tableName, Option(dbName))) + if (sessionCatalog.isGlobalTempViewDB(dbName)) { + makeTable(Seq(dbName, tableName)) } else { - throw QueryCompilationErrors.tableOrViewNotFoundInDatabaseError(tableName, dbName) + // For backward compatibility (Spark 3.3 and prior), here we always look up the table from the + // Hive Metastore. + makeTable(Seq(CatalogManager.SESSION_CATALOG_NAME, dbName, tableName)) } } @@ -443,19 +419,17 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { * function. This throws an `AnalysisException` when no `Function` can be found. */ override def getFunction(functionName: String): Function = { - // calling `sqlParser.parseFunctionIdentifier` to parse functionName. If it contains only - // function name and optionally contains a database name(thus a FunctionIdentifier), then - // we look up the function in sessionCatalog. - // Otherwise we try `sqlParser.parseMultipartIdentifier` to have a sequence of string as - // the qualified identifier and resolve the function through SQL analyzer. - try { - val ident = sparkSession.sessionState.sqlParser.parseFunctionIdentifier(functionName) - getFunction(ident.database.orNull, ident.funcName) - } catch { - case e: org.apache.spark.sql.catalyst.parser.ParseException => - val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(functionName) - makeFunction(ident) + val parsed = parseIdent(functionName) + // For backward compatibility (Spark 3.3 and prior), we should check if the function exists in + // the Hive Metastore first. + val nameParts = if (parsed.length <= 2 && + !sessionCatalog.isTemporaryFunction(parsed.asFunctionIdentifier) && + sessionCatalog.isPersistentFunction(parsed.asFunctionIdentifier)) { + qualifyV1Ident(parsed) + } else { + parsed } + makeFunction(nameParts) } /** @@ -463,7 +437,9 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { * found. */ override def getFunction(dbName: String, functionName: String): Function = { - makeFunction(FunctionIdentifier(functionName, Option(dbName))) + // For backward compatibility (Spark 3.3 and prior), here we always look up the function from + // the Hive Metastore. + makeFunction(Seq(CatalogManager.SESSION_CATALOG_NAME, dbName, functionName)) } /** @@ -471,15 +447,13 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { */ override def databaseExists(dbName: String): Boolean = { // To maintain backwards compatibility, we first treat the input is a simple dbName and check - // if sessionCatalog contains it. If no, we try to parse it as 3 part name. If the parased - // identifier contains both catalog name and database name, we then search the database in the - // catalog. + // if sessionCatalog contains it. If no, we try to parse it, resolve catalog and namespace, + // and check if namespace exists in the catalog. 
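For example, a sketch under the assumption that a v2 catalog named "testcat" is configured:

import org.apache.spark.sql.SparkSession

object DatabaseExistsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()

    // Unqualified name: answered directly by the session catalog (Hive Metastore).
    assert(spark.catalog.databaseExists("default"))

    // Qualified name: parsed into catalog and namespace, then existence is asked of
    // that catalog.
    spark.sql("CREATE NAMESPACE IF NOT EXISTS testcat.my_db")
    assert(spark.catalog.databaseExists("testcat.my_db"))

    spark.stop()
  }
}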
if (!sessionCatalog.databaseExists(dbName)) { - val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName) - val plan = sparkSession.sessionState.executePlan(UnresolvedNamespace(ident)).analyzed - plan match { - case ResolvedNamespace(catalog: SupportsNamespaces, _) => - catalog.namespaceExists(ident.slice(1, ident.size).toArray) + val plan = UnresolvedNamespace(parseIdent(dbName)) + sparkSession.sessionState.executePlan(plan).analyzed match { + case ResolvedNamespace(catalog: SupportsNamespaces, ns) => + catalog.namespaceExists(ns.toArray) case _ => true } } else { @@ -492,11 +466,18 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { * view or a table/view. */ override def tableExists(tableName: String): Boolean = { - try { - getTable(tableName) - true - } catch { - case e: AnalysisException => false + val parsed = parseIdent(tableName) + // For backward compatibility (Spark 3.3 and prior), we should check if the table exists in + // the Hive Metastore first. This also checks if it's a temp view. + (parsed.length <= 2 && { + val v1Ident = parsed.asTableIdentifier + sessionCatalog.isTempView(v1Ident) || sessionCatalog.tableExists(v1Ident) + }) || { + val plan = UnresolvedIdentifier(parsed) + sparkSession.sessionState.executePlan(plan).analyzed match { + case ResolvedIdentifier(catalog: TableCatalog, ident) => catalog.tableExists(ident) + case _ => false + } } } @@ -513,22 +494,15 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { * or a function. */ override def functionExists(functionName: String): Boolean = { - try { - val ident = sparkSession.sessionState.sqlParser.parseFunctionIdentifier(functionName) - functionExists(ident.database.orNull, ident.funcName) - } catch { - case e: org.apache.spark.sql.catalyst.parser.ParseException => - try { - val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(functionName) - val plan = UnresolvedFunc(ident, "Catalog.functionExists", false, None) - sparkSession.sessionState.executePlan(plan).analyzed match { - case _: ResolvedPersistentFunc => true - case _: ResolvedNonPersistentFunc => true - case _ => false - } - } catch { - case _: org.apache.spark.sql.AnalysisException => false - } + val parsed = parseIdent(functionName) + // For backward compatibility (Spark 3.3 and prior), we should check if the function exists in + // the Hive Metastore first. This also checks if it's a built-in/temp function. 
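A sketch of the two checks from the caller's perspective (illustrative; the temporary UDF and the unresolved 3-part name are assumptions):

import org.apache.spark.sql.SparkSession

object FunctionExistsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()

    // 1- or 2-part names go through the session catalog, which also covers
    // built-in and temporary functions.
    assert(spark.catalog.functionExists("abs"))
    spark.udf.register("plus_one", (i: Int) => i + 1)
    assert(spark.catalog.functionExists("plus_one"))

    // Fully qualified names are resolved against the named catalog instead.
    assert(!spark.catalog.functionExists("spark_catalog.default.no_such_func"))

    spark.stop()
  }
}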
+ (parsed.length <= 2 && sessionCatalog.functionExists(parsed.asFunctionIdentifier)) || { + val plan = UnresolvedIdentifier(parsed) + sparkSession.sessionState.executePlan(plan).analyzed match { + case ResolvedIdentifier(catalog: FunctionCatalog, ident) => catalog.functionExists(ident) + case _ => false + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala index 8a635807abbbb..1a9baecfa747a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala @@ -21,7 +21,6 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalog.Table import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, Join, JoinHint} -import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.StructType @@ -166,7 +165,7 @@ class GlobalTempViewSuite extends QueryTest with SharedSparkSession { assert(spark.catalog.tableExists(globalTempDB, "src")) assert(spark.catalog.getTable(globalTempDB, "src").toString == new Table( name = "src", - catalog = CatalogManager.SESSION_CATALOG_NAME, + catalog = null, namespace = Array(globalTempDB), description = null, tableType = "TEMPORARY", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index 0de48325d981e..ab26a4fcc35f3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -182,6 +182,31 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf assert(spark.catalog.listTables().collect().map(_.name).toSet == Set("my_table2")) } + test("SPARK-39828: Catalog.listTables() should respect currentCatalog") { + assert(spark.catalog.currentCatalog() == "spark_catalog") + assert(spark.catalog.listTables().collect().isEmpty) + createTable("my_table1") + assert(spark.catalog.listTables().collect().map(_.name).toSet == Set("my_table1")) + + val catalogName = "testcat" + val dbName = "my_db" + val tableName = "my_table2" + val tableSchema = new StructType().add("i", "int") + val description = "this is a test managed table" + sql(s"CREATE NAMESPACE $catalogName.$dbName") + + spark.catalog.setCurrentCatalog("testcat") + spark.catalog.setCurrentDatabase("my_db") + assert(spark.catalog.listTables().collect().isEmpty) + + createTable(tableName, dbName, catalogName, classOf[FakeV2Provider].getName, tableSchema, + Map.empty[String, String], description) + assert(spark.catalog.listTables() + .collect() + .map(t => Array(t.catalog, t.namespace.mkString("."), t.name).mkString(".")).toSet == + Set("testcat.my_db.my_table2")) + } + test("list tables with database") { assert(spark.catalog.listTables("default").collect().isEmpty) createDatabase("my_db1") @@ -229,6 +254,33 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf assert(!funcNames2.contains("my_temp_func")) } + test("SPARK-39828: Catalog.listFunctions() should respect currentCatalog") { + assert(spark.catalog.currentCatalog() == "spark_catalog") + assert(Set("+", "current_database", 
"window").subsetOf( + spark.catalog.listFunctions().collect().map(_.name).toSet)) + createFunction("my_func") + assert(spark.catalog.listFunctions().collect().map(_.name).contains("my_func")) + + sql(s"CREATE NAMESPACE testcat.ns") + spark.catalog.setCurrentCatalog("testcat") + spark.catalog.setCurrentDatabase("ns") + + val funcCatalog = spark.sessionState.catalogManager.catalog("testcat") + .asInstanceOf[InMemoryCatalog] + val function: UnboundFunction = new UnboundFunction { + override def bind(inputType: StructType): BoundFunction = new ScalarFunction[Int] { + override def inputTypes(): Array[DataType] = Array(IntegerType) + override def resultType(): DataType = IntegerType + override def name(): String = "my_bound_function" + } + override def description(): String = "my_function" + override def name(): String = "my_function" + } + assert(!spark.catalog.listFunctions().collect().map(_.name).contains("my_func")) + funcCatalog.createFunction(Identifier.of(Array("ns"), "my_func"), function) + assert(spark.catalog.listFunctions().collect().map(_.name).contains("my_func")) + } + test("list functions with database") { assert(Set("+", "current_database", "window").subsetOf( spark.catalog.listFunctions().collect().map(_.name).toSet)) @@ -283,7 +335,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf testListColumns("tab1", dbName = Some("db1")) } - test("SPARK-39615: three layer namespace compatibility - listColumns") { + test("SPARK-39615: qualified name with catalog - listColumns") { val answers = Map( "col1" -> ("int", true, false, true), "col2" -> ("string", true, false, false), @@ -637,7 +689,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf assert(errMsg.contains("my_temp_table is a temp view. 
'recoverPartitions()' expects a table")) } - test("three layer namespace compatibility - create managed table") { + test("qualified name with catalog - create managed table") { val catalogName = "testcat" val dbName = "my_db" val tableName = "my_table" @@ -656,7 +708,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf assert(table.properties().get("comment").equals(description)) } - test("three layer namespace compatibility - create external table") { + test("qualified name with catalog - create external table") { withTempDir { dir => val catalogName = "testcat" val dbName = "my_db" @@ -680,7 +732,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf } } - test("three layer namespace compatibility - list tables") { + test("qualified name with catalog - list tables") { withTempDir { dir => val catalogName = "testcat" val dbName = "my_db" @@ -729,7 +781,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf Set("my_table1", "my_table2", "my_temp_table")) } - test("three layer namespace compatibility - get table") { + test("qualified name with catalog - get table") { val catalogName = "testcat" val dbName = "default" val tableName = "my_table" @@ -757,7 +809,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf assert(t2.catalog == CatalogManager.SESSION_CATALOG_NAME) } - test("three layer namespace compatibility - table exists") { + test("qualified name with catalog - table exists") { val catalogName = "testcat" val dbName = "my_db" val tableName = "my_table" @@ -781,7 +833,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf assert(spark.catalog.tableExists(Array(catalogName, dbName, tableName).mkString("."))) } - test("three layer namespace compatibility - database exists") { + test("qualified name with catalog - database exists") { val catalogName = "testcat" val dbName = "my_db" assert(!spark.catalog.databaseExists(Array(catalogName, dbName).mkString("."))) @@ -793,7 +845,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf assert(!spark.catalog.databaseExists(Array(catalogName2, dbName).mkString("."))) } - test("SPARK-39506: three layer namespace compatibility - cache table, isCached and" + - "uncacheTable") { + test("SPARK-39506: qualified name with catalog - cache table, isCached and " + "uncacheTable") { val tableSchema = new StructType().add("i", "int") createTable("my_table", "my_db", "testcat", classOf[FakeV2Provider].getName, @@ -840,7 +892,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf } } - test("three layer namespace compatibility - get database") { + test("qualified name with catalog - get database") { val catalogsAndDatabases = Seq(("testcat", "somedb"), ("testcat", "ns.somedb"), ("spark_catalog", "somedb")) catalogsAndDatabases.foreach { case (catalog, dbName) => @@ -863,7 +915,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf intercept[AnalysisException](spark.catalog.getDatabase("randomcat.db10")) } - test("three layer namespace compatibility - get database, same in hive and testcat") { + test("qualified name with catalog - get database, same in hive and testcat") { // create 'testdb' in hive and testcat val dbName = "testdb" sql(s"CREATE NAMESPACE spark_catalog.$dbName COMMENT 'hive database'")
assert(spark.catalog.getDatabase(qualified).name === db) } - test("three layer namespace compatibility - set current database") { + test("qualified name with catalog - set current database") { spark.catalog.setCurrentCatalog("testcat") // namespace with the same name as catalog sql("CREATE NAMESPACE testcat.testcat.my_db") @@ -912,8 +964,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf assert(e3.contains("unknown_db")) } - test("SPARK-39579: Three layer namespace compatibility - " + - "listFunctions, getFunction, functionExists") { + test("SPARK-39579: qualified name with catalog - listFunctions, getFunction, functionExists") { createDatabase("my_db1") createFunction("my_func1", Some("my_db1")) @@ -931,8 +982,8 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf val func1b = spark.catalog.getFunction("spark_catalog.my_db1.my_func1") assert(func1a.name === func1b.name && func1a.namespace === func1b.namespace && func1a.className === func1b.className && func1a.isTemporary === func1b.isTemporary) - assert(func1a.catalog === null && func1b.catalog === "spark_catalog") - assert(func1a.description === null && func1b.description === "N/A.") + assert(func1a.catalog === "spark_catalog" && func1b.catalog === "spark_catalog") + assert(func1a.description === "N/A." && func1b.description === "N/A.") val function: UnboundFunction = new UnboundFunction { override def bind(inputType: StructType): BoundFunction = new ScalarFunction[Int] {