diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 999a9b0a4aec6..87996d66f6bb0 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -538,6 +538,11 @@
         "AES-<mode> with the padding <padding> by the <functionName> function."
       ]
     },
+    "CATALOG_OPERATION" : {
+      "message" : [
+        "Catalog <catalogName> does not support <operation>."
+      ]
+    },
     "DESC_TABLE_COLUMN_PARTITION" : {
       "message" : [
         "DESC TABLE COLUMN for a specific partition."
diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index ade20dbff83ff..67410c48d0b07 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -299,10 +299,15 @@ abstract class SparkFunSuite
       parameters: Map[String, String] = Map.empty,
       matchPVals: Boolean = false,
       queryContext: Array[QueryContext] = Array.empty): Unit = {
-    assert(exception.getErrorClass === errorClass)
+    val mainErrorClass :: tail = errorClass.split("\\.").toList
+    assert(tail.isEmpty || tail.length == 1)
+    // TODO: remove the `errorSubClass` parameter.
+    assert(tail.isEmpty || errorSubClass.isEmpty)
+    assert(exception.getErrorClass === mainErrorClass)
     if (exception.getErrorSubClass != null) {
-      assert(errorSubClass.isDefined)
-      assert(exception.getErrorSubClass === errorSubClass.get)
+      val subClass = errorSubClass.orElse(tail.headOption)
+      assert(subClass.isDefined)
+      assert(exception.getErrorSubClass === subClass.get)
     }
     sqlState.foreach(state => assert(exception.getSqlState === state))
     val expectedParameters = (exception.getParameterNames zip exception.getMessageParameters).toMap
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 6fc9d756c998d..c1fcac3521c69 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1090,7 +1090,7 @@ class Analyzer(override val catalogManager: CatalogManager)
         }.getOrElse(u)
 
       case u @ UnresolvedView(identifier, cmd, allowTemp, relationTypeMismatchHint) =>
-        lookupTableOrView(identifier).map {
+        lookupTableOrView(identifier, viewOnly = true).map {
           case _: ResolvedTempView if !allowTemp =>
             throw QueryCompilationErrors.expectViewNotTempViewError(identifier, cmd, u)
           case t: ResolvedTable =>
@@ -1136,12 +1136,17 @@ class Analyzer(override val catalogManager: CatalogManager)
     * Resolves relations to `ResolvedTable` or `Resolved[Temp/Persistent]View`. This is
     * for resolving DDL and misc commands.
     */
-    private def lookupTableOrView(identifier: Seq[String]): Option[LogicalPlan] = {
+    private def lookupTableOrView(
+        identifier: Seq[String],
+        viewOnly: Boolean = false): Option[LogicalPlan] = {
       lookupTempView(identifier).map { tempView =>
         ResolvedTempView(identifier.asIdentifier, tempView.tableMeta.schema)
       }.orElse {
         expandIdentifier(identifier) match {
           case CatalogAndIdentifier(catalog, ident) =>
+            if (viewOnly && !CatalogV2Util.isSessionCatalog(catalog)) {
+              throw QueryCompilationErrors.catalogOperationNotSupported(catalog, "views")
+            }
             CatalogV2Util.loadTable(catalog, ident).map {
               case v1Table: V1Table if CatalogV2Util.isSessionCatalog(catalog) &&
                   v1Table.v1Table.tableType == CatalogTableType.VIEW =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 68ed8991553e3..587ede9a81bf2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -131,12 +131,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
       case u: UnresolvedTable =>
         u.failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}")
 
-      case u @ UnresolvedView(NonSessionCatalogAndIdentifier(catalog, ident), cmd, _, _) =>
-        u.failAnalysis(
-          s"Cannot specify catalog `${catalog.name}` for view ${ident.quoted} " +
-            "because view support in v2 catalog has not been implemented yet. " +
-            s"$cmd expects a view.")
-
       case u: UnresolvedView =>
         u.failAnalysis(s"View not found: ${u.multipartIdentifier.quoted}")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
index 805f3080c8472..343eca252fb28 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
@@ -54,6 +54,10 @@ case class NoSuchTableException(
   def this(tableIdent: Identifier) = {
     this(s"Table ${tableIdent.quoted} not found")
   }
+
+  def this(nameParts: Seq[String]) = {
+    this(s"Table ${nameParts.quoted} not found")
+  }
 }
 
 case class NoSuchPartitionException(
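Not part of the patch — a short sketch of how the updated `checkError` helper is meant to be called. The main error class and its sub-class now travel in a single dot-separated string, which the helper splits apart (the `testcat` catalog and the DROP VIEW statement are borrowed from the test changes further down in this patch):

```scala
// Inside a suite extending SparkFunSuite, with a v2 catalog registered
// as `testcat` (illustrative setup, mirroring DataSourceV2SQLSuite below).
val e = intercept[AnalysisException] {
  sql("DROP VIEW testcat.v")
}
// "UNSUPPORTED_FEATURE.CATALOG_OPERATION" is split on '.' into the main
// class and the sub-class, so no separate errorSubClass argument is needed.
checkError(
  e,
  errorClass = "UNSUPPORTED_FEATURE.CATALOG_OPERATION",
  parameters = Map("catalogName" -> "`testcat`", "operation" -> "views"))
```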
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 9893384b709f5..221f1a0f3135c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.catalog.{CatalogManager, LookupCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, LookupCatalog}
 
 /**
  * Resolves the catalog of the name parts for table/view/function/namespace.
@@ -28,8 +28,14 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
   extends Rule[LogicalPlan] with LookupCatalog {
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case UnresolvedIdentifier(CatalogAndIdentifier(catalog, identifier)) =>
-      ResolvedIdentifier(catalog, identifier)
+    case UnresolvedIdentifier(nameParts, allowTemp) =>
+      if (allowTemp && catalogManager.v1SessionCatalog.isTempView(nameParts)) {
+        val ident = Identifier.of(nameParts.dropRight(1).toArray, nameParts.last)
+        ResolvedIdentifier(FakeSystemCatalog, ident)
+      } else {
+        val CatalogAndIdentifier(catalog, identifier) = nameParts
+        ResolvedIdentifier(catalog, identifier)
+      }
     case s @ ShowTables(UnresolvedNamespace(Seq()), _, _) =>
       s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
     case s @ ShowTableExtended(UnresolvedNamespace(Seq()), _, _, _) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala
index a7370254826b4..7a2bd1ccc154f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.catalyst.plans.logical.{DropFunction, DropTable, DropView, LogicalPlan, NoopCommand, UncacheTable}
+import org.apache.spark.sql.catalyst.plans.logical.{DropFunction, LogicalPlan, NoopCommand, UncacheTable}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
 
@@ -29,10 +29,6 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
 object ResolveCommandsWithIfExists extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
     _.containsPattern(COMMAND)) {
-    case DropTable(u: UnresolvedTableOrView, ifExists, _) if ifExists =>
-      NoopCommand("DROP TABLE", u.multipartIdentifier)
-    case DropView(u: UnresolvedView, ifExists) if ifExists =>
-      NoopCommand("DROP VIEW", u.multipartIdentifier)
     case UncacheTable(u: UnresolvedRelation, ifExists, _) if ifExists =>
       NoopCommand("UNCACHE TABLE", u.multipartIdentifier)
     case DropFunction(u: UnresolvedFunc, ifExists) if ifExists =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
index c3fc5533a8e02..321eecf42b09a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
  * Holds the name of a namespace that has yet to be looked up in a catalog. It will be resolved to
@@ -135,7 +136,8 @@ case class UnresolvedFunc(
 * Holds the name of a table/view/function identifier that we need to determine the catalog. It will
 * be resolved to [[ResolvedIdentifier]] during analysis.
 */
-case class UnresolvedIdentifier(nameParts: Seq[String]) extends LeafNode {
+case class UnresolvedIdentifier(nameParts: Seq[String], allowTemp: Boolean = false)
+  extends LeafNode {
   override lazy val resolved: Boolean = false
   override def output: Seq[Attribute] = Nil
 }
@@ -244,3 +246,9 @@ case class ResolvedIdentifier(
     identifier: Identifier) extends LeafNodeWithoutStats {
   override def output: Seq[Attribute] = Nil
 }
+
+// A fake v2 catalog to hold temp views.
+object FakeSystemCatalog extends CatalogPlugin {
+  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {}
+  override def name(): String = "SYSTEM"
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 76de49d86dc42..d2ea8df415108 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3673,7 +3673,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
   override def visitDropTable(ctx: DropTableContext): LogicalPlan = withOrigin(ctx) {
     // DROP TABLE works with either a table or a temporary view.
     DropTable(
-      createUnresolvedTableOrView(ctx.multipartIdentifier(), "DROP TABLE"),
+      UnresolvedIdentifier(visitMultipartIdentifier(ctx.multipartIdentifier()), allowTemp = true),
       ctx.EXISTS != null,
       ctx.PURGE != null)
   }
@@ -3683,11 +3683,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
    */
   override def visitDropView(ctx: DropViewContext): AnyRef = withOrigin(ctx) {
     DropView(
-      createUnresolvedView(
-        ctx.multipartIdentifier(),
-        commandName = "DROP VIEW",
-        allowTemp = true,
-        relationTypeMismatchHint = Some("Please use DROP TABLE instead.")),
+      UnresolvedIdentifier(visitMultipartIdentifier(ctx.multipartIdentifier()), allowTemp = true),
       ctx.EXISTS != null)
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index c1d8f0a4a8a51..ef1d0dd94fc91 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -604,6 +604,15 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
         "operation" -> operation))
   }
 
+  def catalogOperationNotSupported(catalog: CatalogPlugin, operation: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNSUPPORTED_FEATURE",
+      errorSubClass = "CATALOG_OPERATION",
+      messageParameters = Map(
+        "catalogName" -> toSQLId(Seq(catalog.name())),
+        "operation" -> operation))
+  }
+
   def alterColumnWithV1TableCannotSpecifyNotNullError(): Throwable = {
     new AnalysisException("ALTER COLUMN with v1 tables cannot specify NOT NULL.")
   }
@@ -958,6 +967,10 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
     new NoSuchTableException(ident)
   }
 
+  def noSuchTableError(nameParts: Seq[String]): Throwable = {
+    new NoSuchTableException(nameParts)
+  }
+
   def noSuchNamespaceError(namespace: Array[String]): Throwable = {
     new NoSuchNamespaceException(namespace)
   }
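Not part of the patch — a rough walk-through of the resolution flow that the pieces above add up to, assuming a session with a temp view and a v2 catalog named `testcat` (all names are illustrative):

```scala
spark.sql("CREATE TEMPORARY VIEW v AS SELECT 1 AS id")

// Parsed as DropTable(UnresolvedIdentifier(Seq("v"), allowTemp = true), ...).
// ResolveCatalogs consults the temp-view registry first and produces
// ResolvedIdentifier(FakeSystemCatalog, Identifier.of(Array.empty, "v")),
// which ResolveSessionCatalog later rewrites to DropTempViewCommand.
spark.sql("DROP TABLE v")

// A name that is not a temp view falls through to the normal
// CatalogAndIdentifier resolution, here against the v2 catalog `testcat`.
spark.sql("DROP TABLE testcat.ns.tbl")
```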
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisExceptionPositionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisExceptionPositionSuite.scala
index e50a58f8ce5fe..785d5ae05cfff 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisExceptionPositionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisExceptionPositionSuite.scala
@@ -34,14 +34,12 @@ class AnalysisExceptionPositionSuite extends AnalysisTest {
   }
 
   test("SPARK-33918: UnresolvedView should retain sql text position") {
-    verifyViewPosition("DROP VIEW unknown", "unknown")
     verifyViewPosition("ALTER VIEW unknown SET TBLPROPERTIES ('k'='v')", "unknown")
     verifyViewPosition("ALTER VIEW unknown UNSET TBLPROPERTIES ('k')", "unknown")
     verifyViewPosition("ALTER VIEW unknown AS SELECT 1", "unknown")
   }
 
   test("SPARK-34057: UnresolvedTableOrView should retain sql text position") {
-    verifyTableOrViewPosition("DROP TABLE unknown", "unknown")
     verifyTableOrViewPosition("DESCRIBE TABLE unknown", "unknown")
     verifyTableOrPermanentViewPosition("ANALYZE TABLE unknown COMPUTE STATISTICS", "unknown")
     verifyTableOrViewPosition("ANALYZE TABLE unknown COMPUTE STATISTICS FOR COLUMNS col", "unknown")
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 25bacc3631efb..ba2cc4ff15e57 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -685,15 +685,15 @@ class DDLParserSuite extends AnalysisTest {
     val cmd = "DROP VIEW"
     val hint = Some("Please use DROP TABLE instead.")
     parseCompare(s"DROP VIEW testcat.db.view",
-      DropView(UnresolvedView(Seq("testcat", "db", "view"), cmd, true, hint), ifExists = false))
+      DropView(UnresolvedIdentifier(Seq("testcat", "db", "view"), true), ifExists = false))
     parseCompare(s"DROP VIEW db.view",
-      DropView(UnresolvedView(Seq("db", "view"), cmd, true, hint), ifExists = false))
+      DropView(UnresolvedIdentifier(Seq("db", "view"), true), ifExists = false))
     parseCompare(s"DROP VIEW IF EXISTS db.view",
-      DropView(UnresolvedView(Seq("db", "view"), cmd, true, hint), ifExists = true))
+      DropView(UnresolvedIdentifier(Seq("db", "view"), true), ifExists = true))
     parseCompare(s"DROP VIEW view",
-      DropView(UnresolvedView(Seq("view"), cmd, true, hint), ifExists = false))
+      DropView(UnresolvedIdentifier(Seq("view"), true), ifExists = false))
     parseCompare(s"DROP VIEW IF EXISTS view",
-      DropView(UnresolvedView(Seq("view"), cmd, true, hint), ifExists = true))
+      DropView(UnresolvedIdentifier(Seq("view"), true), ifExists = true))
   }
 
   private def testCreateOrReplaceDdl(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index cca0a56174da1..56236f0d2ad03 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -216,19 +216,23 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         c
       }
 
-    case DropTable(ResolvedV1TableIdentifier(ident), ifExists, purge) =>
+    case DropTable(ResolvedV1Identifier(ident), ifExists, purge) =>
       DropTableCommand(ident, ifExists, isView = false, purge = purge)
 
-    case DropTable(_: ResolvedPersistentView, ifExists, purge) =>
-      throw QueryCompilationErrors.cannotDropViewWithDropTableError
-
-    // v1 DROP TABLE supports temp view.
-    case DropTable(ResolvedTempView(ident, _), ifExists, purge) =>
-      DropTableCommand(ident.asTableIdentifier, ifExists, isView = false, purge = purge)
+    case DropTable(ResolvedIdentifier(FakeSystemCatalog, ident), _, _) =>
+      DropTempViewCommand(ident)
 
-    case DropView(ResolvedViewIdentifier(ident), ifExists) =>
+    case DropView(ResolvedV1Identifier(ident), ifExists) =>
       DropTableCommand(ident, ifExists, isView = true, purge = false)
 
+    case DropView(r @ ResolvedIdentifier(catalog, ident), _) =>
+      if (catalog == FakeSystemCatalog) {
+        DropTempViewCommand(ident)
+      } else {
+        throw QueryCompilationErrors.catalogOperationNotSupported(catalog, "views")
+      }
+
     case c @ CreateNamespace(DatabaseNameInSessionCatalog(name), _, _) if conf.useV1Command =>
       val comment = c.properties.get(SupportsNamespaces.PROP_COMMENT)
       val location = c.properties.get(SupportsNamespaces.PROP_LOCATION)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 527f78ef10e34..e9bbbc717d1e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -26,7 +26,7 @@ import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
 import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
-import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint}
+import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint, SubqueryAlias, View}
 import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
@@ -159,11 +159,51 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       plan: LogicalPlan,
       cascade: Boolean,
       blocking: Boolean = false): Unit = {
+    uncacheQuery(spark, _.sameResult(plan), cascade, blocking)
+  }
+
+  def uncacheTableOrView(spark: SparkSession, name: Seq[String], cascade: Boolean): Unit = {
+    uncacheQuery(
+      spark,
+      isMatchedTableOrView(_, name, spark.sessionState.conf),
+      cascade,
+      blocking = false)
+  }
+
+  private def isMatchedTableOrView(plan: LogicalPlan, name: Seq[String], conf: SQLConf): Boolean = {
+    def isSameName(nameInCache: Seq[String]): Boolean = {
+      nameInCache.length == name.length && nameInCache.zip(name).forall(conf.resolver.tupled)
+    }
+
+    plan match {
+      case SubqueryAlias(ident, LogicalRelation(_, _, Some(catalogTable), _)) =>
+        val v1Ident = catalogTable.identifier
+        isSameName(ident.qualifier :+ ident.name) &&
+          isSameName(v1Ident.catalog.toSeq ++ v1Ident.database :+ v1Ident.table)
+
+      case SubqueryAlias(ident, DataSourceV2Relation(_, _, Some(catalog), Some(v2Ident), _)) =>
+        isSameName(ident.qualifier :+ ident.name) &&
+          isSameName(catalog.name() +: v2Ident.namespace() :+ v2Ident.name())
+
+      case SubqueryAlias(ident, View(catalogTable, _, _)) =>
+        val v1Ident = catalogTable.identifier
+        isSameName(ident.qualifier :+ ident.name) &&
+          isSameName(v1Ident.catalog.toSeq ++ v1Ident.database :+ v1Ident.table)
+
+      case _ => false
+    }
+  }
+
+  def uncacheQuery(
+      spark: SparkSession,
+      isMatchedPlan: LogicalPlan => Boolean,
+      cascade: Boolean,
+      blocking: Boolean): Unit = {
     val shouldRemove: LogicalPlan => Boolean = if (cascade) {
-      _.exists(_.sameResult(plan))
+      _.exists(isMatchedPlan)
     } else {
-      _.sameResult(plan)
+      isMatchedPlan
     }
     val plansToUncache = cachedData.filter(cd => shouldRemove(cd.plan))
     this.synchronized {
@@ -187,7 +227,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       // will keep it as it is. It means the physical plan has been re-compiled already in the
       // other thread.
       val cacheAlreadyLoaded = cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
-      cd.plan.exists(_.sameResult(plan)) && !cacheAlreadyLoaded
+      cd.plan.exists(isMatchedPlan) && !cacheAlreadyLoaded
     })
   }
}
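Not part of the patch — a sketch of how the new name-based entry point can be driven, assuming a cached v1 table `spark_catalog.default.t` (the three-part name is illustrative):

```scala
// Drops cache entries whose relation name matches the given multi-part name,
// instead of building a plan and relying on sameResult. isMatchedTableOrView
// compares the name against the SubqueryAlias recorded at caching time,
// using the session's resolver for case sensitivity.
spark.sharedState.cacheManager.uncacheTableOrView(
  spark,
  Seq("spark_catalog", "default", "t"),
  cascade = true)
```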
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 078358b6c7dbd..1f71a104707a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, TableCatalog}
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -203,7 +203,8 @@ case class DescribeDatabaseCommand(
 }
 
 /**
- * Drops a table/view from the metastore and removes it if it is cached.
+ * Drops a table/view from the metastore and removes it if it is cached. This command does not drop
+ * temp views, which should be handled by [[DropTempViewCommand]].
  *
  * The syntax of this command is:
 * {{{
@@ -219,9 +220,8 @@ case class DropTableCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    val isTempView = catalog.isTempView(tableName)
 
-    if (!isTempView && catalog.tableExists(tableName)) {
+    if (catalog.tableExists(tableName)) {
       // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
       // issue an exception.
       catalog.getTableMetadata(tableName).tableType match {
@@ -231,14 +231,10 @@ case class DropTableCommand(
           throw QueryCompilationErrors.cannotDropViewWithDropTableError()
         case _ =>
       }
-    }
 
-    if (isTempView || catalog.tableExists(tableName)) {
       try {
-        val hasViewText = isTempView &&
-          catalog.getTempViewOrPermanentTableMetadata(tableName).viewText.isDefined
         sparkSession.sharedState.cacheManager.uncacheQuery(
-          sparkSession.table(tableName), cascade = !isTempView || hasViewText)
+          sparkSession.table(tableName), cascade = true)
       } catch {
         case NonFatal(e) => log.warn(e.toString, e)
       }
@@ -247,7 +243,28 @@ case class DropTableCommand(
     } else if (ifExists) {
       // no-op
     } else {
-      throw QueryCompilationErrors.tableOrViewNotFoundError(tableName.identifier)
+      throw QueryCompilationErrors.noSuchTableError(
+        tableName.catalog.toSeq ++ tableName.database :+ tableName.table)
+    }
+    Seq.empty[Row]
+  }
+}
+
+case class DropTempViewCommand(ident: Identifier) extends LeafRunnableCommand {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    assert(ident.namespace().isEmpty || ident.namespace().length == 1)
+    val nameParts = ident.namespace() :+ ident.name()
+    val catalog = sparkSession.sessionState.catalog
+    catalog.getRawLocalOrGlobalTempView(nameParts).foreach { view =>
+      val hasViewText = view.tableMeta.viewText.isDefined
+      sparkSession.sharedState.cacheManager.uncacheTableOrView(
+        sparkSession, nameParts, cascade = hasViewText)
+      view.refresh()
+      if (ident.namespace().isEmpty) {
+        catalog.dropTempView(ident.name())
+      } else {
+        catalog.dropGlobalTempView(ident.name())
+      }
     }
     Seq.empty[Row]
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 35a5f41fb1768..39ad51ffbe73c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -326,8 +326,11 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
           "DESC TABLE COLUMN", toPrettySQL(nested))
       }
 
-    case DropTable(r: ResolvedTable, ifExists, purge) =>
-      DropTableExec(r.catalog, r.identifier, ifExists, purge, invalidateTableCache(r)) :: Nil
+    case DropTable(r: ResolvedIdentifier, ifExists, purge) =>
+      val invalidateFunc = () => session.sharedState.cacheManager.uncacheTableOrView(
+        session, r.catalog.name() +: r.identifier.namespace() :+ r.identifier.name(),
+        cascade = true)
+      DropTableExec(r.catalog.asTableCatalog, r.identifier, ifExists, purge, invalidateFunc) :: Nil
 
     case _: NoopCommand =>
       LocalTableScanExec(Nil, Nil) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
index 1e0627fb6dfdd..2125b58813f85 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
@@ -37,7 +37,8 @@ case class DropTableExec(
       invalidateCache()
       if (purge) catalog.purgeTable(ident) else catalog.dropTable(ident)
     } else if (!ifExists) {
-      throw QueryCompilationErrors.noSuchTableError(ident)
+      throw QueryCompilationErrors.noSuchTableError(
+        catalog.name() +: ident.namespace() :+ ident.name())
     }
 
     Seq.empty
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 11f4fe0649be4..7a97efe088c63 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2089,33 +2089,18 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
   }
 
   test("View commands are not supported in v2 catalogs") {
-    def validateViewCommand(
-        sql: String,
-        catalogName: String,
-        viewName: String,
-        cmdName: String): Unit = {
-      assertAnalysisError(
-        sql,
-        s"Cannot specify catalog `$catalogName` for view $viewName because view support " +
-          s"in v2 catalog has not been implemented yet. $cmdName expects a view.")
-    }
-
-    validateViewCommand("DROP VIEW testcat.v", "testcat", "v", "DROP VIEW")
-    validateViewCommand(
-      "ALTER VIEW testcat.v SET TBLPROPERTIES ('key' = 'val')",
-      "testcat",
-      "v",
-      "ALTER VIEW ... SET TBLPROPERTIES")
-    validateViewCommand(
-      "ALTER VIEW testcat.v UNSET TBLPROPERTIES ('key')",
-      "testcat",
-      "v",
-      "ALTER VIEW ... UNSET TBLPROPERTIES")
-    validateViewCommand(
-      "ALTER VIEW testcat.v AS SELECT 1",
-      "testcat",
-      "v",
-      "ALTER VIEW ... AS")
+    def validateViewCommand(sqlStatement: String): Unit = {
+      val e = intercept[AnalysisException](sql(sqlStatement))
+      checkError(
+        e,
+        errorClass = "UNSUPPORTED_FEATURE.CATALOG_OPERATION",
+        parameters = Map("catalogName" -> "`testcat`", "operation" -> "views"))
+    }
+
+    validateViewCommand("DROP VIEW testcat.v")
+    validateViewCommand("ALTER VIEW testcat.v SET TBLPROPERTIES ('key' = 'val')")
+    validateViewCommand("ALTER VIEW testcat.v UNSET TBLPROPERTIES ('key')")
+    validateViewCommand("ALTER VIEW testcat.v AS SELECT 1")
   }
 
   test("SPARK-33924: INSERT INTO .. PARTITION preserves the partition location") {
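Not part of the patch — a sketch of the error reported by the new v2 path when the table is missing, matching the `NoSuchTableException(nameParts)` constructor added earlier (identifiers are illustrative):

```scala
val err = intercept[AnalysisException] {
  sql("DROP TABLE testcat.ns.tbl")
}
// The message now carries the fully qualified, catalog-prefixed name.
assert(err.getMessage.contains("Table testcat.ns.tbl not found"))
```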
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index c7fa365abbdeb..14af2b8241125 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -962,7 +962,7 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
       sql("DROP VIEW dbx.tab1")
     }
     assert(e.getMessage.contains(
-      "dbx.tab1 is a table. 'DROP VIEW' expects a view. Please use DROP TABLE instead."))
+      "Cannot drop a table with DROP VIEW. Please use DROP TABLE instead"))
   }
 
   protected def testSetProperties(isDatasourceTable: Boolean): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DropTableParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DropTableParserSuite.scala
index 60c7cd8dd6f8b..7e81ad66436af 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DropTableParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DropTableParserSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedTableOrView}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedIdentifier}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
 import org.apache.spark.sql.catalyst.plans.logical.{DropTable, LogicalPlan}
 import org.apache.spark.sql.test.SharedSparkSession
@@ -29,31 +29,26 @@ class DropTableParserSuite extends AnalysisTest with SharedSparkSession {
 
   test("drop table") {
     parseCompare("DROP TABLE testcat.ns1.ns2.tbl",
-      DropTable(
-        UnresolvedTableOrView(Seq("testcat", "ns1", "ns2", "tbl"), "DROP TABLE", true),
+      DropTable(UnresolvedIdentifier(Seq("testcat", "ns1", "ns2", "tbl"), true),
         ifExists = false,
         purge = false))
     parseCompare(s"DROP TABLE db.tab",
       DropTable(
-        UnresolvedTableOrView(Seq("db", "tab"), "DROP TABLE", true),
+        UnresolvedIdentifier(Seq("db", "tab"), true),
         ifExists = false,
         purge = false))
     parseCompare(s"DROP TABLE IF EXISTS db.tab",
       DropTable(
-        UnresolvedTableOrView(Seq("db", "tab"), "DROP TABLE", true),
+        UnresolvedIdentifier(Seq("db", "tab"), true),
         ifExists = true,
         purge = false))
     parseCompare(s"DROP TABLE tab",
-      DropTable(
-        UnresolvedTableOrView(Seq("tab"), "DROP TABLE", true), ifExists = false, purge = false))
+      DropTable(UnresolvedIdentifier(Seq("tab"), true), ifExists = false, purge = false))
     parseCompare(s"DROP TABLE IF EXISTS tab",
-      DropTable(
-        UnresolvedTableOrView(Seq("tab"), "DROP TABLE", true), ifExists = true, purge = false))
+      DropTable(UnresolvedIdentifier(Seq("tab"), true), ifExists = true, purge = false))
     parseCompare(s"DROP TABLE tab PURGE",
-      DropTable(
-        UnresolvedTableOrView(Seq("tab"), "DROP TABLE", true), ifExists = false, purge = true))
+      DropTable(UnresolvedIdentifier(Seq("tab"), true), ifExists = false, purge = true))
     parseCompare(s"DROP TABLE IF EXISTS tab PURGE",
-      DropTable(
-        UnresolvedTableOrView(Seq("tab"), "DROP TABLE", true), ifExists = true, purge = true))
+      DropTable(UnresolvedIdentifier(Seq("tab"), true), ifExists = true, purge = true))
   }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DropTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DropTableSuiteBase.scala
index 3c9b39af8ef22..c26022addf0c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DropTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DropTableSuiteBase.scala
@@ -57,7 +57,7 @@ trait DropTableSuiteBase extends QueryTest with DDLCommandTestUtils {
       val errMsg = intercept[AnalysisException] {
         sql(s"DROP TABLE $catalog.ns.tbl")
       }.getMessage
-      assert(errMsg.contains("Table or view not found"))
+      assert(errMsg.contains(s"Table $catalog.ns.tbl not found"))
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 7e8816553499d..e678b866bfe4f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -713,13 +713,13 @@ class PlanResolutionSuite extends AnalysisTest {
     val tableIdent2 = Identifier.of(Array.empty, "tab")
 
     parseResolveCompare(s"DROP TABLE $tableName1",
-      DropTable(ResolvedTable.create(testCat, tableIdent1, table), ifExists = false, purge = false))
+      DropTable(ResolvedIdentifier(testCat, tableIdent1), ifExists = false, purge = false))
     parseResolveCompare(s"DROP TABLE IF EXISTS $tableName1",
-      DropTable(ResolvedTable.create(testCat, tableIdent1, table), ifExists = true, purge = false))
+      DropTable(ResolvedIdentifier(testCat, tableIdent1), ifExists = true, purge = false))
     parseResolveCompare(s"DROP TABLE $tableName2",
-      DropTable(ResolvedTable.create(testCat, tableIdent2, table), ifExists = false, purge = false))
+      DropTable(ResolvedIdentifier(testCat, tableIdent2), ifExists = false, purge = false))
     parseResolveCompare(s"DROP TABLE IF EXISTS $tableName2",
-      DropTable(ResolvedTable.create(testCat, tableIdent2, table), ifExists = true, purge = false))
+      DropTable(ResolvedIdentifier(testCat, tableIdent2), ifExists = true, purge = false))
   }
 
   test("drop view") {
@@ -728,7 +728,7 @@ class PlanResolutionSuite extends AnalysisTest {
     val viewName2 = "view"
     val viewIdent2 = TableIdentifier("view", Option("default"), Some(SESSION_CATALOG_NAME))
     val tempViewName = "v"
-    val tempViewIdent = TableIdentifier("v")
+    val tempViewIdent = Identifier.of(Array.empty, "v")
 
     parseResolveCompare(s"DROP VIEW $viewName1",
       DropTableCommand(viewIdent1, ifExists = false, isView = true, purge = false))
@@ -739,16 +739,19 @@ class PlanResolutionSuite extends AnalysisTest {
     parseResolveCompare(s"DROP VIEW IF EXISTS $viewName2",
       DropTableCommand(viewIdent2, ifExists = true, isView = true, purge = false))
     parseResolveCompare(s"DROP VIEW $tempViewName",
-      DropTableCommand(tempViewIdent, ifExists = false, isView = true, purge = false))
+      DropTempViewCommand(tempViewIdent))
     parseResolveCompare(s"DROP VIEW IF EXISTS $tempViewName",
-      DropTableCommand(tempViewIdent, ifExists = true, isView = true, purge = false))
+      DropTempViewCommand(tempViewIdent))
   }
 
   test("drop view in v2 catalog") {
-    intercept[AnalysisException] {
+    val e = intercept[AnalysisException] {
       parseAndResolve("DROP VIEW testcat.db.view", checkAnalysis = true)
-    }.getMessage.toLowerCase(Locale.ROOT).contains(
-      "view support in catalog has not been implemented")
+    }
+    checkError(
+      e,
+      errorClass = "UNSUPPORTED_FEATURE.CATALOG_OPERATION",
+      parameters = Map("catalogName" -> "`testcat`", "operation" -> "views"))
   }
 
   // ALTER VIEW view_name SET TBLPROPERTIES ('comment' = new_comment);
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
index 7aa8adc07edd3..7371b6cf0bc5f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
@@ -82,9 +82,9 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
     checkAnswer(sql("SHOW TABLES IN h2.test"), Seq(Row("test", "people", false)))
     Seq(
       "h2.test.not_existing_table" ->
-        "Table or view not found: h2.test.not_existing_table",
+        "Table h2.test.not_existing_table not found",
       "h2.bad_test.not_existing_table" ->
-        "Table or view not found: h2.bad_test.not_existing_table"
+        "Table h2.bad_test.not_existing_table not found"
     ).foreach { case (table, expectedMsg) =>
       val msg = intercept[AnalysisException] {
         sql(s"DROP TABLE $table")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index f1bb8d30eed99..ffb6993ccf1d1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1047,7 +1047,7 @@ class HiveDDLSuite
       sql("CREATE TABLE tab1(c1 int)")
       assertAnalysisError(
         "DROP VIEW tab1",
-        "tab1 is a table. 'DROP VIEW' expects a view. Please use DROP TABLE instead.")
+        "Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/DropTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/DropTableSuite.scala
index 0ca6184c9469e..8c6d718f18abb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/DropTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/DropTableSuite.scala
@@ -26,7 +26,7 @@ class DropTableSuite extends v1.DropTableSuiteBase with CommandSuiteBase {
   test("hive client calls") {
     withNamespaceAndTable("ns", "tbl") { t =>
       sql(s"CREATE TABLE $t (id int) $defaultUsing")
-      checkHiveClientCalls(expected = 15) {
+      checkHiveClientCalls(expected = 11) {
        sql(s"DROP TABLE $t")
      }
    }
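Not part of the patch — a sketch of the cache semantics that `DropTempViewCommand` preserves from the old `DropTableCommand` code path, assuming a base table `t` exists. A temp view defined by SQL text cascades invalidation to dependent caches; a DataFrame-backed temp view (no `viewText`) only uncaches itself:

```scala
spark.sql("CREATE TEMPORARY VIEW v1 AS SELECT * FROM t")
spark.sql("CACHE TABLE v1")
// Resolves to DropTempViewCommand(v1), which calls
// uncacheTableOrView(spark, Seq("v1"), cascade = hasViewText) and then
// drops the view from the local temp-view registry.
spark.sql("DROP VIEW v1")
```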