@@ -198,7 +198,6 @@ class Analyzer(
       ResolveTableValuedFunctions ::
       new ResolveCatalogs(catalogManager) ::
       ResolveInsertInto ::
-      ResolveTables ::
       ResolveRelations ::
       ResolveReferences ::
       ResolveCreateNamedStruct ::
@@ -666,12 +665,26 @@ class Analyzer(
   }

   /**
-   * Resolve table relations with concrete relations from v2 catalog.
+   * Resolve relations to temp views. This is not an actual rule, and is only called by
+   * [[ResolveTables]].
    */
+  object ResolveTempViews extends Rule[LogicalPlan] {
Contributor: I like that this is refactored into a separate rule. Can we move it to an earlier batch? If metastore views can't contain temp views, then there's no reason to do this in the same batch as table and view resolution from catalogs.

Author (cloud-fan): As discussed in the sync, we decided to keep it in the current batch for safety, in case some user-supplied analyzer rules need Spark to resolve unresolved temp views.

+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+      case u @ UnresolvedRelation(Seq(part1)) =>
+        v1SessionCatalog.lookupTempView(part1).getOrElse(u)
+      case u @ UnresolvedRelation(Seq(part1, part2)) =>
Contributor: This needs to check whether part1 is a known catalog. If it is a catalog, then it isn't a temp view reference, because catalog resolution happens first.

Not needing to remember that is the purpose of the extractors. I think it would be better to continue using the extractor:

      case u @ UnresolvedRelation(AsTemporaryTableIdentifier(ident)) =>
        ident.database match {
          case Some(db) =>
            v1SessionCatalog.lookupGlobalTempView(db, ident.table).getOrElse(u)
          case None =>
            v1SessionCatalog.lookupTempView(ident.table).getOrElse(u)
        }

Author (cloud-fan): As discussed in the sync, we decided to treat the global temp view name prefix global_temp as a special catalog, so that it won't be masked by a user-supplied catalog.

Contributor: If this is intended to only match global_temp, then it should match Seq(GLOBAL_TEMP_NAMESPACE, name) instead of matching part1.

Author (cloud-fan): Unfortunately, global_temp is not a constant; it's a static SQL conf that users can set before starting a Spark application (not at runtime).

Contributor: Yeah, that's why I went ahead with the merge. I think the code is currently correct.

Still, it would be nice to use the runtime setting here in the matcher instead.

+        v1SessionCatalog.lookupGlobalTempView(part1, part2).getOrElse(u)
+    }
+  }
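The matching logic of the new rule can be sketched outside Spark as a plain function: one-part names try local temp views, two-part names only match when the first part equals the global-temp database name, and everything else falls through unresolved (mirroring the `.getOrElse(u)` calls). This is a minimal, self-contained sketch; the object name, the maps, and the hard-coded `global_temp` are hypothetical stand-ins, not Spark API.

```scala
// Sketch of ResolveTempViews' name matching: single-part names hit local temp
// views; two-part names hit global temp views only when the first part is the
// global-temp database name (case-insensitively).
object TempViewResolutionSketch {
  val globalTempDB = "global_temp"               // in Spark: StaticSQLConf.GLOBAL_TEMP_DATABASE
  val tempViews = Map("v1" -> "SELECT 1")        // hypothetical local temp views
  val globalTempViews = Map("gv" -> "SELECT 2")  // hypothetical global temp views

  def resolve(nameParts: Seq[String]): Option[String] = nameParts match {
    case Seq(name) =>
      tempViews.get(name.toLowerCase)            // stand-in for formatTableName
    case Seq(db, name) if db.equalsIgnoreCase(globalTempDB) =>
      globalTempViews.get(name.toLowerCase)
    case _ =>
      None                                       // leave the name for later rules
  }
}
```

Names with three or more parts, or a two-part name whose first part is not the global-temp database, are left for table resolution, which is exactly why the rule can safely run before ResolveTables.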

+  /**
+   * Resolve table relations with concrete relations from v2 catalog. This is not an actual rule,
+   * and is only called by [[ResolveRelations]].
+   *
+   * [[ResolveRelations]] still resolves v1 tables.
+   */
   object ResolveTables extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+    def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
       case u: UnresolvedRelation =>
         lookupV2Relation(u.multipartIdentifier)
           .getOrElse(u)

@@ -733,10 +746,6 @@ class Analyzer(
     // Note this is compatible with the views defined by older versions of Spark(before 2.2), which
     // have empty defaultDatabase and all the relations in viewText have database part defined.
     def resolveRelation(plan: LogicalPlan): LogicalPlan = plan match {
-      case u @ UnresolvedRelation(AsTemporaryViewIdentifier(ident))
Contributor: Wouldn't it be simpler to just call ResolveTables.apply(plan) match { here than to embed all the logic within lookupTableFromCatalog?

-          if v1SessionCatalog.isTemporaryTable(ident) =>
-        resolveRelation(lookupTableFromCatalog(ident, u, AnalysisContext.get.defaultDatabase))
-
       case u @ UnresolvedRelation(AsTableIdentifier(ident)) if !isRunningDirectlyOnFiles(ident) =>
         val defaultDatabase = AnalysisContext.get.defaultDatabase
         val foundRelation = lookupTableFromCatalog(ident, u, defaultDatabase)

@@ -767,7 +776,7 @@ class Analyzer(
       case _ => plan
     }

-    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+    def apply(plan: LogicalPlan): LogicalPlan = ResolveTables(plan).resolveOperatorsUp {
       case i @ InsertIntoStatement(u @ UnresolvedRelation(AsTableIdentifier(ident)), _, child, _, _)
           if child.resolved =>
         EliminateSubqueryAliases(lookupTableFromCatalog(ident, u)) match {
@@ -2839,7 +2848,6 @@ class Analyzer(
   private def lookupV2RelationAndCatalog(
       identifier: Seq[String]): Option[(DataSourceV2Relation, CatalogPlugin, Identifier)] =
     identifier match {
-      case AsTemporaryViewIdentifier(ti) if v1SessionCatalog.isTemporaryTable(ti) => None
       case CatalogObjectIdentifier(catalog, ident) if !CatalogV2Util.isSessionCatalog(catalog) =>
         CatalogV2Util.loadTable(catalog, ident) match {
           case Some(table) => Some((DataSourceV2Relation.create(table), catalog, ident))
@@ -767,6 +767,25 @@ class SessionCatalog(
     }
   }

+  def lookupTempView(table: String): Option[SubqueryAlias] = {
+    val formattedTable = formatTableName(table)
+    getTempView(formattedTable).map { view =>
+      SubqueryAlias(formattedTable, view)
+    }
+  }
+
+  def lookupGlobalTempView(db: String, table: String): Option[SubqueryAlias] = {
Contributor: I think this is safe and I do prefer these methods to a combined resolveRelation, but I'm curious why you decided not to use the existing method?

Author (cloud-fan, Nov 14, 2019): The idea here is to clearly separate the resolution of temp views from that of v1/v2 tables, so I'd like to avoid using resolveRelation, which mixes things together.

These two methods mostly copy-paste code from resolveRelation. We can update resolveRelation to only resolve v1 tables, but I'd like to do that later, as there are many tests calling resolveRelation and we would need to update them as well.

+    val formattedDB = formatDatabaseName(db)
+    if (formattedDB == globalTempViewManager.database) {
+      val formattedTable = formatTableName(table)
+      getGlobalTempView(formattedTable).map { view =>
+        SubqueryAlias(formattedTable, formattedDB, view)
+      }
+    } else {
+      None
+    }
+  }

   /**
    * Return whether a table with the specified name is a temporary view.
    *

@@ -71,7 +71,7 @@ class CatalogManager(
    * This happens when the source implementation extends the v2 TableProvider API and is not listed
    * in the fallback configuration, spark.sql.sources.write.useV1SourceList
    */
-  private def v2SessionCatalog: CatalogPlugin = {
+  private[sql] def v2SessionCatalog: CatalogPlugin = {
     conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).map { customV2SessionCatalog =>
       try {
         catalogs.getOrElseUpdate(SESSION_CATALOG_NAME, loadV2SessionCatalog())
@@ -19,6 +19,7 @@ package org.apache.spark.sql.connector.catalog

 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}

/**
 * A trait to encapsulate catalog lookup function and helpful extractors.

@@ -120,10 +121,22 @@ private[sql] trait LookupCatalog extends Logging {
    * Extract catalog and the rest name parts from a multi-part identifier.
    */
   object CatalogAndIdentifierParts {
-    def unapply(nameParts: Seq[String]): Some[(CatalogPlugin, Seq[String])] = {
+    private val globalTempDB = SQLConf.get.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+
+    def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])] = {
       assert(nameParts.nonEmpty)
       try {
-        Some((catalogManager.catalog(nameParts.head), nameParts.tail))
+        // Conceptually global temp views are in a special reserved catalog. However, the v2 catalog
+        // API does not support view yet, and we have to use v1 commands to deal with global temp
+        // views. To simplify the implementation, we put global temp views in a special namespace
+        // in the session catalog. The special namespace has higher priority during name resolution.
+        // For example, if the name of a custom catalog is the same with `GLOBAL_TEMP_DATABASE`,
+        // this custom catalog can't be accessed.
+        if (nameParts.head.equalsIgnoreCase(globalTempDB)) {
+          Some((catalogManager.v2SessionCatalog, nameParts))
+        } else {
+          Some((catalogManager.catalog(nameParts.head), nameParts.tail))
+        }
       } catch {
         case _: CatalogNotFoundException =>
           Some((currentCatalog, nameParts))
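The precedence this extractor establishes (reserved global-temp namespace first, then registered catalogs, then the current catalog as a fallback) can be sketched independently of Spark's classes. The object name, the catalog registry, and the catalog names below are hypothetical; in Spark the fallback branch is the CatalogNotFoundException handler.

```scala
// Sketch of CatalogAndIdentifierParts' precedence: the global-temp namespace wins
// over any user-registered catalog of the same name; an unknown head falls back
// to the current catalog with the full name passed through.
object CatalogSplitSketch {
  val globalTempDB = "global_temp"          // static conf, fixed at application start
  val catalogs = Set("prod", "testcat")     // hypothetical registered catalogs
  val currentCatalog = "session"

  /** Returns (catalog to use, name parts that catalog should resolve). */
  def split(nameParts: Seq[String]): (String, Seq[String]) = {
    require(nameParts.nonEmpty)
    if (nameParts.head.equalsIgnoreCase(globalTempDB)) {
      // Session catalog keeps the full name, including the global_temp prefix,
      // so v1 commands can still recognize the global temp view.
      ("session", nameParts)
    } else if (catalogs.contains(nameParts.head)) {
      (nameParts.head, nameParts.tail)
    } else {
      (currentCatalog, nameParts)           // catalog-not-found fallback
    }
  }
}
```

Note the asymmetry: a registered catalog consumes its name part, while the global-temp branch deliberately does not, which is why a catalog registered under the global-temp name can never be reached.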
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableExceptio
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.sources.SimpleScanSource
 import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructType}
@@ -1786,6 +1786,20 @@ class DataSourceV2SQLSuite
     }
   }

+  test("global temp view should not be masked by v2 catalog") {
Author (cloud-fan): does the behavior make sense to you? @rdblue @brkyvz

Contributor: This is okay for now, but I think it is a little confusing that the catalog is completely ignored. I think this should result in an error instead, but we can do that in a follow-up.

+    val globalTempDB = spark.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+    spark.conf.set(s"spark.sql.catalog.$globalTempDB", classOf[InMemoryTableCatalog].getName)
+
+    try {
+      sql("create global temp view v as select 1")
+      sql(s"alter view $globalTempDB.v rename to v2")
+      checkAnswer(spark.table(s"$globalTempDB.v2"), Row(1))
+      sql(s"drop view $globalTempDB.v2")
+    } finally {
+      spark.sharedState.globalTempViewManager.clear()
+    }
+  }

   private def testV1Command(sqlCommand: String, sqlParams: String): Unit = {
     val e = intercept[AnalysisException] {
       sql(s"$sqlCommand $sqlParams")