diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 29341aecc1842..38259c234c262 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -871,24 +871,24 @@ class Analyzer(override val catalogManager: CatalogManager)
   object ResolveTempViews extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
       case u @ UnresolvedRelation(ident, _, isStreaming) =>
-        lookupTempView(ident, isStreaming).getOrElse(u)
+        lookupTempView(ident, isStreaming, performCheck = true).getOrElse(u)
       case i @ InsertIntoStatement(UnresolvedRelation(ident, _, false), _, _, _, _, _) =>
-        lookupTempView(ident)
+        lookupTempView(ident, performCheck = true)
           .map(view => i.copy(table = view))
           .getOrElse(i)
       case c @ CacheTable(UnresolvedRelation(ident, _, false), _, _, _) =>
-        lookupTempView(ident)
+        lookupTempView(ident, performCheck = true)
           .map(view => c.copy(table = view))
           .getOrElse(c)
       case c @ UncacheTable(UnresolvedRelation(ident, _, false), _, _) =>
-        lookupTempView(ident)
+        lookupTempView(ident, performCheck = true)
           .map(view => c.copy(table = view, isTempView = true))
           .getOrElse(c)
       // TODO (SPARK-27484): handle streaming write commands when we have them.
       case write: V2WriteCommand => write.table match {
         case UnresolvedRelation(ident, _, false) =>
-          lookupTempView(ident).map(EliminateSubqueryAliases(_)).map {
+          lookupTempView(ident, performCheck = true).map(EliminateSubqueryAliases(_)).map {
             case r: DataSourceV2Relation => write.withNewTable(r)
             case _ => throw QueryCompilationErrors.writeIntoTempViewNotAllowedError(ident.quoted)
           }.getOrElse(write)
@@ -921,7 +921,9 @@ class Analyzer(override val catalogManager: CatalogManager)
     }
 
     def lookupTempView(
-        identifier: Seq[String], isStreaming: Boolean = false): Option[LogicalPlan] = {
+        identifier: Seq[String],
+        isStreaming: Boolean = false,
+        performCheck: Boolean = false): Option[LogicalPlan] = {
       // Permanent View can't refer to temp views, no need to lookup at all.
       if (isResolvingView && !referredTempViewNames.contains(identifier)) return None
 
@@ -934,7 +936,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       if (isStreaming && tmpView.nonEmpty && !tmpView.get.isStreaming) {
        throw QueryCompilationErrors.readNonStreamingTempViewError(identifier.quoted)
       }
-      tmpView.map(ResolveRelations.resolveViews)
+      tmpView.map(ResolveRelations.resolveViews(_, performCheck))
     }
   }
 
@@ -1098,7 +1100,7 @@ class Analyzer(override val catalogManager: CatalogManager)
    // look at `AnalysisContext.catalogAndNamespace` when resolving relations with single-part name.
    // If `AnalysisContext.catalogAndNamespace` is non-empty, analyzer will expand single-part names
    // with it, instead of current catalog and namespace.
-    def resolveViews(plan: LogicalPlan): LogicalPlan = plan match {
+    def resolveViews(plan: LogicalPlan, performCheck: Boolean = false): LogicalPlan = plan match {
      // The view's child should be a logical plan parsed from the `desc.viewText`, the variable
      // `viewText` should be defined, or else we throw an error on the generation of the View
      // operator.
@@ -1115,9 +1117,18 @@ class Analyzer(override val catalogManager: CatalogManager)
             executeSameContext(child)
           }
         }
+        // Fail the analysis eagerly because outside AnalysisContext, the unresolved operators
+        // inside a view may be resolved incorrectly.
+        // But for commands like `DropViewCommand`, resolving the view is unnecessary even if it
+        // contains unresolved nodes. So use the `performCheck` flag to skip the analysis check
+        // for these commands.
+        // TODO(SPARK-34504): avoid unnecessary view resolving and remove the `performCheck` flag
+        if (performCheck) {
+          checkAnalysis(newChild)
+        }
         view.copy(child = newChild)
       case p @ SubqueryAlias(_, view: View) =>
-        p.copy(child = resolveViews(view))
+        p.copy(child = resolveViews(view, performCheck))
       case _ => plan
     }
 
@@ -1137,14 +1148,14 @@ class Analyzer(override val catalogManager: CatalogManager)
 
       case c @ CacheTable(u @ UnresolvedRelation(_, _, false), _, _, _) =>
         lookupRelation(u.multipartIdentifier, u.options, false)
-          .map(resolveViews)
+          .map(resolveViews(_, performCheck = true))
           .map(EliminateSubqueryAliases(_))
           .map(relation => c.copy(table = relation))
           .getOrElse(c)
 
       case c @ UncacheTable(u @ UnresolvedRelation(_, _, false), _, _) =>
         lookupRelation(u.multipartIdentifier, u.options, false)
-          .map(resolveViews)
+          .map(resolveViews(_, performCheck = true))
           .map(EliminateSubqueryAliases(_))
           .map(relation => c.copy(table = relation))
           .getOrElse(c)
 
@@ -1170,7 +1181,7 @@ class Analyzer(override val catalogManager: CatalogManager)
 
       case u: UnresolvedRelation =>
         lookupRelation(u.multipartIdentifier, u.options, u.isStreaming)
-          .map(resolveViews).getOrElse(u)
+          .map(resolveViews(_, performCheck = true)).getOrElse(u)
 
       case u @ UnresolvedTable(identifier, cmd, relationTypeMismatchHint) =>
         lookupTableOrView(identifier).map {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala
index 3e9a8b71a8fb6..ec9480514ba2d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.analysis
 
 import java.io.File
 
+import scala.collection.JavaConverters._
+
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
@@ -27,8 +29,8 @@ import org.scalatest.matchers.must.Matchers
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType, ExternalCatalog, InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.connector.InMemoryTableCatalog
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, V1Table}
+import org.apache.spark.sql.connector.{InMemoryTable, InMemoryTableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table}
 import org.apache.spark.sql.types._
 
 class TableLookupCacheSuite extends AnalysisTest with Matchers {
@@ -46,7 +48,12 @@ class TableLookupCacheSuite extends AnalysisTest with Matchers {
       ignoreIfExists = false)
     val v2Catalog = new InMemoryTableCatalog {
       override def loadTable(ident: Identifier): Table = {
-        V1Table(externalCatalog.getTable("default", ident.name))
+        val catalogTable = externalCatalog.getTable("default", ident.name)
+        new InMemoryTable(
+          catalogTable.identifier.table,
+          catalogTable.schema,
+          Array.empty,
+          Map.empty[String, String].asJava)
       }
       override def name: String = CatalogManager.SESSION_CATALOG_NAME
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
index 68e1a682562ac..84a20bb16ad86 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
@@ -258,6 +258,26 @@ abstract class SQLViewTestSuite extends QueryTest with SQLTestUtils {
       checkViewOutput(viewName, Seq(Row(2)))
     }
   }
+
+  test("SPARK-34490 - query should fail if the view refers to a dropped table") {
+    withTable("t") {
+      Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t")
+      val viewName = createView("testView", "SELECT * FROM t")
+      withView(viewName) {
+        // Deliberately create a temp view here instead of using `createView`
+        sql("CREATE TEMP VIEW t AS SELECT 1 AS c1")
+        withTempView("t") {
+          checkViewOutput(viewName, Seq(Row(2), Row(3), Row(1)))
+          // Manually drop table `t` to see if the query will fail
+          sql("DROP TABLE IF EXISTS default.t")
+          val e = intercept[AnalysisException] {
+            sql(s"SELECT * FROM $viewName").collect()
+          }.getMessage
+          assert(e.contains("Table or view not found: t"))
+        }
+      }
+    }
+  }
 }
 
 class LocalTempViewTestSuite extends SQLViewTestSuite with SharedSparkSession {