diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index a7443e71c0ca3..625ef2153c711 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -198,7 +198,6 @@ class Analyzer( ResolveTableValuedFunctions :: new ResolveCatalogs(catalogManager) :: ResolveInsertInto :: - ResolveTables :: ResolveRelations :: ResolveReferences :: ResolveCreateNamedStruct :: @@ -666,12 +665,26 @@ class Analyzer( } /** - * Resolve table relations with concrete relations from v2 catalog. + * Resolve relations to temp views. This is not an actual rule, and is only called by + * [[ResolveTables]]. + */ + object ResolveTempViews extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { + case u @ UnresolvedRelation(Seq(part1)) => + v1SessionCatalog.lookupTempView(part1).getOrElse(u) + case u @ UnresolvedRelation(Seq(part1, part2)) => + v1SessionCatalog.lookupGlobalTempView(part1, part2).getOrElse(u) + } + } + + /** + * Resolve table relations with concrete relations from v2 catalog. This is not an actual rule, + * and is only called by [[ResolveRelations]]. * * [[ResolveRelations]] still resolves v1 tables. */ object ResolveTables extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { + def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp { case u: UnresolvedRelation => lookupV2Relation(u.multipartIdentifier) .getOrElse(u) @@ -733,10 +746,6 @@ class Analyzer( // Note this is compatible with the views defined by older versions of Spark(before 2.2), which // have empty defaultDatabase and all the relations in viewText have database part defined. 
def resolveRelation(plan: LogicalPlan): LogicalPlan = plan match { - case u @ UnresolvedRelation(AsTemporaryViewIdentifier(ident)) - if v1SessionCatalog.isTemporaryTable(ident) => - resolveRelation(lookupTableFromCatalog(ident, u, AnalysisContext.get.defaultDatabase)) - case u @ UnresolvedRelation(AsTableIdentifier(ident)) if !isRunningDirectlyOnFiles(ident) => val defaultDatabase = AnalysisContext.get.defaultDatabase val foundRelation = lookupTableFromCatalog(ident, u, defaultDatabase) @@ -767,7 +776,7 @@ class Analyzer( case _ => plan } - def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { + def apply(plan: LogicalPlan): LogicalPlan = ResolveTables(plan).resolveOperatorsUp { case i @ InsertIntoStatement(u @ UnresolvedRelation(AsTableIdentifier(ident)), _, child, _, _) if child.resolved => EliminateSubqueryAliases(lookupTableFromCatalog(ident, u)) match { @@ -2839,7 +2848,6 @@ class Analyzer( private def lookupV2RelationAndCatalog( identifier: Seq[String]): Option[(DataSourceV2Relation, CatalogPlugin, Identifier)] = identifier match { - case AsTemporaryViewIdentifier(ti) if v1SessionCatalog.isTemporaryTable(ti) => None case CatalogObjectIdentifier(catalog, ident) if !CatalogV2Util.isSessionCatalog(catalog) => CatalogV2Util.loadTable(catalog, ident) match { case Some(table) => Some((DataSourceV2Relation.create(table), catalog, ident)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index e77f3c70f0bdf..96ca1ac73e043 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -767,6 +767,25 @@ class SessionCatalog( } } + def lookupTempView(table: String): Option[SubqueryAlias] = { + val formattedTable = formatTableName(table) + getTempView(formattedTable).map { view => + 
SubqueryAlias(formattedTable, view) + } + } + + def lookupGlobalTempView(db: String, table: String): Option[SubqueryAlias] = { + val formattedDB = formatDatabaseName(db) + if (formattedDB == globalTempViewManager.database) { + val formattedTable = formatTableName(table) + getGlobalTempView(formattedTable).map { view => + SubqueryAlias(formattedTable, formattedDB, view) + } + } else { + None + } + } + /** * Return whether a table with the specified name is a temporary view. * diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index d62148b2bbe45..135c180ef4000 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -71,7 +71,7 @@ class CatalogManager( * This happens when the source implementation extends the v2 TableProvider API and is not listed * in the fallback configuration, spark.sql.sources.write.useV1SourceList */ - private def v2SessionCatalog: CatalogPlugin = { + private[sql] def v2SessionCatalog: CatalogPlugin = { conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).map { customV2SessionCatalog => try { catalogs.getOrElseUpdate(SESSION_CATALOG_NAME, loadV2SessionCatalog()) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala index 26ba93e57fc64..613c0d1797cc6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.connector.catalog import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.TableIdentifier +import 
org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} /** * A trait to encapsulate catalog lookup function and helpful extractors. */ @@ -120,10 +121,22 @@ private[sql] trait LookupCatalog extends Logging { * Extract catalog and the rest name parts from a multi-part identifier. */ object CatalogAndIdentifierParts { - def unapply(nameParts: Seq[String]): Some[(CatalogPlugin, Seq[String])] = { + private val globalTempDB = SQLConf.get.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE) + + def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])] = { assert(nameParts.nonEmpty) try { - Some((catalogManager.catalog(nameParts.head), nameParts.tail)) + // Conceptually global temp views are in a special reserved catalog. However, the v2 catalog + // API does not support views yet, and we have to use v1 commands to deal with global temp + // views. To simplify the implementation, we put global temp views in a special namespace + // in the session catalog. The special namespace has higher priority during name resolution. + // For example, if the name of a custom catalog is the same as `GLOBAL_TEMP_DATABASE`, + // this custom catalog can't be accessed.
+ if (nameParts.head.equalsIgnoreCase(globalTempDB)) { + Some((catalogManager.v2SessionCatalog, nameParts)) + } else { + Some((catalogManager.catalog(nameParts.head), nameParts.tail)) + } } catch { case _: CatalogNotFoundException => Some((currentCatalog, nameParts)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 2a44251e102a0..2958b57bf06ea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableExceptio import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION import org.apache.spark.sql.sources.SimpleScanSource import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructType} @@ -1786,6 +1786,20 @@ class DataSourceV2SQLSuite } } + test("global temp view should not be masked by v2 catalog") { + val globalTempDB = spark.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE) + spark.conf.set(s"spark.sql.catalog.$globalTempDB", classOf[InMemoryTableCatalog].getName) + + try { + sql("create global temp view v as select 1") + sql(s"alter view $globalTempDB.v rename to v2") + checkAnswer(spark.table(s"$globalTempDB.v2"), Row(1)) + sql(s"drop view $globalTempDB.v2") + } finally { + spark.sharedState.globalTempViewManager.clear() + } + } + private def testV1Command(sqlCommand: String, sqlParams: String): Unit = { val e = intercept[AnalysisException] { sql(s"$sqlCommand 
$sqlParams")