Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -871,24 +871,24 @@ class Analyzer(override val catalogManager: CatalogManager)
object ResolveTempViews extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case u @ UnresolvedRelation(ident, _, isStreaming) =>
lookupTempView(ident, isStreaming).getOrElse(u)
lookupTempView(ident, isStreaming, performCheck = true).getOrElse(u)
case i @ InsertIntoStatement(UnresolvedRelation(ident, _, false), _, _, _, _, _) =>
lookupTempView(ident)
lookupTempView(ident, performCheck = true)
.map(view => i.copy(table = view))
.getOrElse(i)
case c @ CacheTable(UnresolvedRelation(ident, _, false), _, _, _) =>
lookupTempView(ident)
lookupTempView(ident, performCheck = true)
.map(view => c.copy(table = view))
.getOrElse(c)
case c @ UncacheTable(UnresolvedRelation(ident, _, false), _, _) =>
lookupTempView(ident)
lookupTempView(ident, performCheck = true)
.map(view => c.copy(table = view, isTempView = true))
.getOrElse(c)
// TODO (SPARK-27484): handle streaming write commands when we have them.
case write: V2WriteCommand =>
write.table match {
case UnresolvedRelation(ident, _, false) =>
lookupTempView(ident).map(EliminateSubqueryAliases(_)).map {
lookupTempView(ident, performCheck = true).map(EliminateSubqueryAliases(_)).map {
case r: DataSourceV2Relation => write.withNewTable(r)
case _ => throw QueryCompilationErrors.writeIntoTempViewNotAllowedError(ident.quoted)
}.getOrElse(write)
Expand Down Expand Up @@ -921,7 +921,9 @@ class Analyzer(override val catalogManager: CatalogManager)
}

def lookupTempView(
identifier: Seq[String], isStreaming: Boolean = false): Option[LogicalPlan] = {
identifier: Seq[String],
isStreaming: Boolean = false,
performCheck: Boolean = false): Option[LogicalPlan] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when shall we skip the check?

Copy link
Contributor Author

@linhongliu-db linhongliu-db Feb 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For UnresolvedView, UnresolvedTable, UnresolvedTableOrView, we use lookupTempView to check whether the temp view exists, but it's ok to skip the checkAnalysis. For example:

  1. In DropView, the UnresolvedView will change to ResolvedView even if the referred table has been dropped.
  2. When resolving UnresolvedTable, if it is actually a view, the analyzer should throw expectTableNotViewError rather than "table or view not found".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@imback82 This is something we should improve. For SELECT ... FROM view and DROP VIEW view, the way we look up the view should be different.

For SELECT ... FROM view, we must fully resolve the view, as we need to execute it.

For DROP VIEW view, we only need to get the view metadata entry. The same applies to ALTER VIEW ....

I think we should change how we resolve UnresolvedTableOrView and UnresolvedView, to only get the view metadata.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I will think about this.

// Permanent View can't refer to temp views, no need to lookup at all.
if (isResolvingView && !referredTempViewNames.contains(identifier)) return None

Expand All @@ -934,7 +936,7 @@ class Analyzer(override val catalogManager: CatalogManager)
if (isStreaming && tmpView.nonEmpty && !tmpView.get.isStreaming) {
throw QueryCompilationErrors.readNonStreamingTempViewError(identifier.quoted)
}
tmpView.map(ResolveRelations.resolveViews)
tmpView.map(ResolveRelations.resolveViews(_, performCheck))
}
}

Expand Down Expand Up @@ -1098,7 +1100,7 @@ class Analyzer(override val catalogManager: CatalogManager)
// look at `AnalysisContext.catalogAndNamespace` when resolving relations with single-part name.
// If `AnalysisContext.catalogAndNamespace` is non-empty, analyzer will expand single-part names
// with it, instead of current catalog and namespace.
def resolveViews(plan: LogicalPlan): LogicalPlan = plan match {
def resolveViews(plan: LogicalPlan, performCheck: Boolean = false): LogicalPlan = plan match {
// The view's child should be a logical plan parsed from the `desc.viewText`, the variable
// `viewText` should be defined, or else we throw an error on the generation of the View
// operator.
Expand All @@ -1115,9 +1117,18 @@ class Analyzer(override val catalogManager: CatalogManager)
executeSameContext(child)
}
}
// Fail the analysis eagerly because outside AnalysisContext, the unresolved operators
// inside a view may be resolved incorrectly.
// But for commands like `DropViewCommand`, resolving view is unnecessary even though
// there is unresolved node. So use the `performCheck` flag to skip the analysis check
// for these commands.
// TODO(SPARK-34504): avoid unnecessary view resolving and remove the `performCheck` flag
if (performCheck) {
checkAnalysis(newChild)
}
view.copy(child = newChild)
case p @ SubqueryAlias(_, view: View) =>
p.copy(child = resolveViews(view))
p.copy(child = resolveViews(view, performCheck))
case _ => plan
}

Expand All @@ -1137,14 +1148,14 @@ class Analyzer(override val catalogManager: CatalogManager)

case c @ CacheTable(u @ UnresolvedRelation(_, _, false), _, _, _) =>
lookupRelation(u.multipartIdentifier, u.options, false)
.map(resolveViews)
.map(resolveViews(_, performCheck = true))
.map(EliminateSubqueryAliases(_))
.map(relation => c.copy(table = relation))
.getOrElse(c)

case c @ UncacheTable(u @ UnresolvedRelation(_, _, false), _, _) =>
lookupRelation(u.multipartIdentifier, u.options, false)
.map(resolveViews)
.map(resolveViews(_, performCheck = true))
.map(EliminateSubqueryAliases(_))
.map(relation => c.copy(table = relation))
.getOrElse(c)
Expand All @@ -1170,7 +1181,7 @@ class Analyzer(override val catalogManager: CatalogManager)

case u: UnresolvedRelation =>
lookupRelation(u.multipartIdentifier, u.options, u.isStreaming)
.map(resolveViews).getOrElse(u)
.map(resolveViews(_, performCheck = true)).getOrElse(u)

case u @ UnresolvedTable(identifier, cmd, relationTypeMismatchHint) =>
lookupTableOrView(identifier).map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.analysis

import java.io.File

import scala.collection.JavaConverters._

import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
Expand All @@ -27,8 +29,8 @@ import org.scalatest.matchers.must.Matchers
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType, ExternalCatalog, InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.connector.InMemoryTableCatalog
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, V1Table}
import org.apache.spark.sql.connector.{InMemoryTable, InMemoryTableCatalog}
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table}
import org.apache.spark.sql.types._

class TableLookupCacheSuite extends AnalysisTest with Matchers {
Expand All @@ -46,7 +48,12 @@ class TableLookupCacheSuite extends AnalysisTest with Matchers {
ignoreIfExists = false)
val v2Catalog = new InMemoryTableCatalog {
override def loadTable(ident: Identifier): Table = {
V1Table(externalCatalog.getTable("default", ident.name))
val catalogTable = externalCatalog.getTable("default", ident.name)
new InMemoryTable(
catalogTable.identifier.table,
catalogTable.schema,
Array.empty,
Map.empty[String, String].asJava)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed V1Table to a V2 table (InMemoryTable) here: when looking up a V1 table, the catalog first returns an UnresolvedCatalogRelation, which needs the FindDataSourceTable rule to resolve it. But FindDataSourceTable lives in sql/core while this test is in sql/catalyst, so we use a V2 table instead to avoid depending on FindDataSourceTable.

}
override def name: String = CatalogManager.SESSION_CATALOG_NAME
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,26 @@ abstract class SQLViewTestSuite extends QueryTest with SQLTestUtils {
checkViewOutput(viewName, Seq(Row(2)))
}
}

// Regression test for SPARK-34490: querying a view whose underlying table has been dropped
// must fail with an AnalysisException, even when a temp view with the same name as the
// dropped table exists in the session.
test("SPARK-34490 - query should fail if the view refers a dropped table") {
withTable("t") {
// Back the view under test with a real parquet table `t` holding the values 2, 3, 1.
Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t")
val viewName = createView("testView", "SELECT * FROM t")
withView(viewName) {
// Deliberately issue a raw CREATE TEMP VIEW statement here instead of calling
// `createView`, so that `t` is always a temp view in this case.
sql("CREATE TEMP VIEW t AS SELECT 1 AS c1")
withTempView("t") {
// The expected rows are the table's contents (2, 3, 1), not the temp view's single row 1.
checkViewOutput(viewName, Seq(Row(2), Row(3), Row(1)))
// Manually drop table `t` to see if the query will fail
sql("DROP TABLE IF EXISTS default.t")
// The temp view `t` still exists at this point; the query is nevertheless expected to
// fail with "Table or view not found" rather than succeed.
val e = intercept[AnalysisException] {
sql(s"SELECT * FROM $viewName").collect()
}.getMessage
assert(e.contains("Table or view not found: t"))
}
}
}
}
}

class LocalTempViewTestSuite extends SQLViewTestSuite with SharedSparkSession {
Expand Down