diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 4f34ca29ea658..cfc41c1f08d63 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -2306,6 +2306,11 @@ "CREATE TEMPORARY FUNCTION with IF NOT EXISTS is not allowed." ] }, + "DESCRIBE_PARTITION_FOR_VIEW" : { + "message" : [ + "DESCRIBE does not support partition for view." + ] + }, "EMPTY_PARTITION_VALUE" : { "message" : [ "Partition key must set value." @@ -2479,6 +2484,12 @@ ], "sqlState" : "42K09" }, + "INVALID_VIEW_CURRENT_CATALOG" : { + "message" : [ + "Invalid current catalog in view " + ], + "sqlState" : "XX000" + }, "INVALID_VIEW_TEXT" : { "message" : [ "The view cannot be displayed due to invalid view text: . This may be caused by an unauthorized modification of the view or an incorrect query syntax. Please check your query syntax and verify that the view has not been tampered with." @@ -2801,6 +2812,12 @@ ], "sqlState" : "0A000" }, + "NOT_SUPPORTED_COMMAND_FOR_V2_VIEW" : { + "message" : [ + " is not supported for v2 views." + ], + "sqlState" : "0A000" + }, "NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT" : { "message" : [ " is not supported, if you want to enable it, please set \"spark.sql.catalogImplementation\" to \"hive\"." @@ -4420,11 +4437,6 @@ " is not a temp view of streaming logical plan, please use batch API such as `DataFrameReader.table` to read it." ] }, - "_LEGACY_ERROR_TEMP_1011" : { - "message" : [ - "Writing into a view is not allowed. View: ." - ] - }, "_LEGACY_ERROR_TEMP_1012" : { "message" : [ "Cannot write into v1 table: ." diff --git a/docs/sql-error-conditions-invalid-sql-syntax-error-class.md b/docs/sql-error-conditions-invalid-sql-syntax-error-class.md index 93bd5c24c9d3f..ad734a20fa9b8 100644 --- a/docs/sql-error-conditions-invalid-sql-syntax-error-class.md +++ b/docs/sql-error-conditions-invalid-sql-syntax-error-class.md @@ -41,6 +41,10 @@ CREATE TEMPORARY FUNCTION with specifying a database(``) is not allowe CREATE TEMPORARY FUNCTION with IF NOT EXISTS is not allowed. +## DESCRIBE_PARTITION_FOR_VIEW + +DESCRIBE does not support partition for view. + ## EMPTY_PARTITION_VALUE Partition key `` must set value. diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index 248839666ef2a..431e885de9da0 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -1369,6 +1369,12 @@ Invalid usage of `` in ``. Variable type must be string type but got ``. +### INVALID_VIEW_CURRENT_CATALOG + +[SQLSTATE: XX000](sql-error-conditions-sqlstates.html#class-XX-internal-error) + +Invalid current catalog `` in view `` + ### INVALID_VIEW_TEXT [SQLSTATE: XX000](sql-error-conditions-sqlstates.html#class-XX-internal-error) @@ -1610,6 +1616,12 @@ ALTER TABLE ALTER/CHANGE COLUMN is not supported for changing ``'s column `` is not supported for v2 tables. +### NOT_SUPPORTED_COMMAND_FOR_V2_VIEW + +[SQLSTATE: 0A000](sql-error-conditions-sqlstates.html#class-0A-feature-not-supported) + +`` is not supported for v2 views. 
+ ### NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT [SQLSTATE: 0A000](sql-error-conditions-sqlstates.html#class-0A-feature-not-supported) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index a57fd7a31d303..ffb301a8ed4f9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -236,6 +236,11 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor errorOnExceed = true, maxIterationsSetting = SQLConf.ANALYZER_MAX_ITERATIONS.key) + /** + * Override to provide additional rules for the "Substitution" batch. + */ + val extendedSubstitutionRules: Seq[Rule[LogicalPlan]] = Nil + /** * Override to provide additional rules for the "Resolution" batch. */ @@ -256,16 +261,17 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor override def batches: Seq[Batch] = Seq( Batch("Substitution", fixedPoint, - new SubstituteExecuteImmediate(catalogManager), + new SubstituteExecuteImmediate(catalogManager) +: // This rule optimizes `UpdateFields` expression chains so looks more like optimization rule. // However, when manipulating deeply nested schema, `UpdateFields` expression tree could be // very complex and make analysis impossible. Thus we need to optimize `UpdateFields` early // at the beginning of analysis. - OptimizeUpdateFields, - CTESubstitution, - WindowsSubstitution, - EliminateUnions, - SubstituteUnresolvedOrdinals), + OptimizeUpdateFields +: + CTESubstitution +: + WindowsSubstitution +: + EliminateUnions +: + SubstituteUnresolvedOrdinals +: + extendedSubstitutionRules : _*), Batch("Disable Hints", Once, new ResolveHints.DisableHints), Batch("Hints", fixedPoint, @@ -1054,7 +1060,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor // The view's child should be a logical plan parsed from the `desc.viewText`, the variable // `viewText` should be defined, or else we throw an error on the generation of the View // operator. - case view @ View(desc, isTempView, child) if !child.resolved => + case view @ View(V1ViewDescription(desc), isTempView, child) if !child.resolved => // Resolve all the UnresolvedRelations and Views in the child. 
val newChild = AnalysisContext.withAnalysisContext(desc) { val nestedViewDepth = AnalysisContext.get.nestedViewDepth @@ -1106,8 +1112,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor write.table match { case u: UnresolvedRelation if !u.isStreaming => resolveRelation(u).map(unwrapRelationPlan).map { - case v: View => throw QueryCompilationErrors.writeIntoViewNotAllowedError( - v.desc.identifier, write) + case v: View => throw QueryCompilationErrors.expectTableNotViewError( + v.desc.identifier.nameParts, "V2WRITE", false, u) case r: DataSourceV2Relation => write.withNewTable(r) case u: UnresolvedCatalogRelation => throw QueryCompilationErrors.writeIntoV1TableNotAllowedError( @@ -1140,7 +1146,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor }.getOrElse(u) case u @ UnresolvedView(identifier, cmd, allowTemp, suggestAlternative) => - lookupTableOrView(identifier, viewOnly = true).map { + lookupTableOrView(identifier).map { case _: ResolvedTempView if !allowTemp => throw QueryCompilationErrors.expectPermanentViewNotTempViewError( identifier, cmd, u) @@ -1187,32 +1193,38 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor * Resolves relations to `ResolvedTable` or `Resolved[Temp/Persistent]View`. This is * for resolving DDL and misc commands. */ - private def lookupTableOrView( - identifier: Seq[String], - viewOnly: Boolean = false): Option[LogicalPlan] = { + private def lookupTableOrView(identifier: Seq[String]): Option[LogicalPlan] = { lookupTempView(identifier).map { tempView => ResolvedTempView(identifier.asIdentifier, tempView.tableMeta) }.orElse { expandIdentifier(identifier) match { - case CatalogAndIdentifier(catalog, ident) => - if (viewOnly && !CatalogV2Util.isSessionCatalog(catalog)) { - throw QueryCompilationErrors.catalogOperationNotSupported(catalog, "views") - } - CatalogV2Util.loadTable(catalog, ident).map { - case v1Table: V1Table if CatalogV2Util.isSessionCatalog(catalog) && - v1Table.v1Table.tableType == CatalogTableType.VIEW => - val v1Ident = v1Table.catalogTable.identifier - val v2Ident = Identifier.of(v1Ident.database.toArray, v1Ident.identifier) - ResolvedPersistentView( - catalog, v2Ident, v1Table.catalogTable) - case table => - ResolvedTable.create(catalog.asTableCatalog, ident, table) - } - case _ => None + case SessionCatalogAndIdentifier(catalog, ident) => + lookupTable(catalog, ident) + case NonSessionCatalogAndIdentifier(catalog, ident) => + lookupView(catalog, ident) + .orElse(lookupTable(catalog, ident)) + case _ => + None } } } + private def lookupView(catalog: CatalogPlugin, ident: Identifier): Option[LogicalPlan] = + CatalogV2Util.loadView(catalog, ident) + .map(V2ViewDescription(ident, _)) + .map(ResolvedPersistentView(catalog.asViewCatalog, ident, _)) + + private def lookupTable(catalog: CatalogPlugin, ident: Identifier): Option[LogicalPlan] = + CatalogV2Util.loadTable(catalog, ident).map { + case v1Table: V1Table if CatalogV2Util.isSessionCatalog(catalog) && + v1Table.v1Table.tableType == CatalogTableType.VIEW => + val v1Ident = v1Table.catalogTable.identifier + val v2Ident = Identifier.of(v1Ident.database.toArray, v1Ident.identifier) + ResolvedPersistentView(catalog, v2Ident, V1ViewDescription(v1Table.catalogTable)) + case table => + ResolvedTable.create(catalog.asTableCatalog, ident, table) + } + private def createRelation( catalog: CatalogPlugin, ident: Identifier, diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewDescription.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewDescription.scala new file mode 100644 index 0000000000000..2c789b82f1dbe --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewDescription.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import scala.jdk.CollectionConverters._ + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.connector.catalog.{Identifier, View, ViewCatalog} +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.ArrayImplicits.SparkArrayOps + +/** + * A trait for view description. + */ +trait ViewDescription { + + val ident: Identifier + + // For backwards compatibility, we need to keep the `identifier` as a `TableIdentifier`. + val identifier: TableIdentifier + + val viewText: Option[String] + + val viewCatalogAndNamespace: Seq[String] + + val viewQueryColumnNames: Seq[String] + + val viewSQLConfigs: Map[String, String] + + val schema: StructType + + val properties: Map[String, String] + + def query: String = viewText.getOrElse("") + + def comment: Option[String] = properties.get(ViewCatalog.PROP_COMMENT) + + def owner: Option[String] = properties.get(ViewCatalog.PROP_OWNER) + + def createEngineVersion: Option[String] = properties.get(ViewCatalog.PROP_CREATE_ENGINE_VERSION) +} + +/** + * View description backed by a [[CatalogTable]]. + * + * @param metadata a CatalogTable + */ +case class V1ViewDescription(metadata: CatalogTable) extends ViewDescription { + + override val ident: Identifier = metadata.identifier.nameParts.asIdentifier + + override val identifier: TableIdentifier = metadata.identifier + + override val viewText: Option[String] = metadata.viewText + + override val viewCatalogAndNamespace: Seq[String] = metadata.viewCatalogAndNamespace + + override val viewQueryColumnNames: Seq[String] = metadata.viewQueryColumnNames + + override val viewSQLConfigs: Map[String, String] = metadata.viewSQLConfigs + + override val schema: StructType = metadata.schema + + override val properties: Map[String, String] = metadata.properties +} + +/** + * View description backed by a V2 [[View]]. 
+ * + * @param view a view in V2 catalog + */ +case class V2ViewDescription( + override val ident: Identifier, + view: View) extends ViewDescription { + + override val identifier: TableIdentifier = ident.asTableIdentifier + + override val viewText: Option[String] = Option(view.query) + + override val viewCatalogAndNamespace: Seq[String] = + view.currentCatalog +: view.currentNamespace.toSeq + + override val viewQueryColumnNames: Seq[String] = view.schema.fieldNames.toImmutableArraySeq + + override val viewSQLConfigs: Map[String, String] = Map.empty + + override val schema: StructType = view.schema + + override val properties: Map[String, String] = + view.properties.asScala.toMap -- ViewCatalog.RESERVED_PROPERTIES.asScala +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewSubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewSubstitution.scala new file mode 100644 index 0000000000000..7dfd402a69b6d --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewSubstitution.scala @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.catalog.{View => V2View, _} +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ +import org.apache.spark.sql.errors.QueryCompilationErrors + +/** + * Substitute persisted views in parsed plans with parsed view sql text. 
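+ *
+ * For illustration (catalog and view names below are hypothetical): if `cat` is a
+ * registered catalog implementing [[ViewCatalog]] and `cat.db.v` was created as
+ * {{{
+ *   CREATE VIEW cat.db.v AS SELECT * FROM cat.db.t
+ * }}}
+ * then an unresolved reference to `cat.db.v` is replaced by a [[SubqueryAlias]] over a
+ * [[View]] node whose child is the parsed view query, with table identifiers qualified
+ * against the catalog and namespace that were current when the view was created.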
+ */ +case class ViewSubstitution( + catalogManager: CatalogManager, + sqlParser: ParserInterface) extends Rule[LogicalPlan] with LookupCatalog { + + def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { + case u @ UnresolvedRelation(nameParts, _, _) if isTempView(nameParts) => + u + case u @ UnresolvedRelation(parts @ NonSessionCatalogAndIdentifier(catalog, ident), _, _) + if !isSQLOnFile(parts) => + CatalogV2Util.loadView(catalog, ident) + .map(createViewRelation(ident, _)) + .getOrElse(u) + } + + private def isTempView(parts: Seq[String]): Boolean = + catalogManager.v1SessionCatalog.isTempView(parts) + + private def isSQLOnFile(parts: Seq[String]): Boolean = parts match { + case Seq(_, path) if path.contains("/") => true + case _ => false + } + + private def createViewRelation(ident: Identifier, view: V2View): LogicalPlan = { + if (!catalogManager.isCatalogRegistered(view.currentCatalog)) { + throw QueryCompilationErrors.invalidViewCurrentCatalog( + view.currentCatalog, ident.asMultipartIdentifier) + } + + val child = try { + sqlParser.parsePlan(view.query) + } catch { + case _: ParseException => + throw QueryCompilationErrors.invalidViewText(view.query, ident.quoted) + } + + val desc = view + val catalogAndNamespace = Option(view.currentCatalog) + .map(_ +: view.currentNamespace.toSeq) + val qualifiedChild = catalogAndNamespace match { + case None => + // Views from Spark 2.2 or prior do not store catalog or namespace, + // however its sql text should already be fully qualified. + child + case Some(catalogAndNamespace) => + // Substitute CTEs within the view before qualifying table identifiers + qualifyTableIdentifiers(CTESubstitution.apply(child), catalogAndNamespace) + } + + // The relation is a view, so we wrap the relation by: + // 1. Add a [[View]] operator over the relation to keep track of the view desc; + // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view. + SubqueryAlias(ident.quoted, View(V2ViewDescription(ident, desc), false, qualifiedChild)) + } + + /** + * Qualify table identifiers with default catalog and namespace if necessary. 
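+   *
+   * A worked example (names are illustrative): with `catalogAndNamespace = Seq("cat", "db")`,
+   * a single-part relation name `t` is rewritten to `cat.db.t`, and a multi-part name whose
+   * head is not a registered catalog, e.g. `ns2.t`, is rewritten to `cat.ns2.t`.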
+ */ + private def qualifyTableIdentifiers( + child: LogicalPlan, + catalogAndNamespace: Seq[String]): LogicalPlan = + child transform { + case u @ UnresolvedRelation(Seq(table), _, _) => + u.copy(multipartIdentifier = catalogAndNamespace :+ table) + case u @ UnresolvedRelation(parts, _, _) + if !catalogManager.isCatalogRegistered(parts.head) => + u.copy(multipartIdentifier = catalogAndNamespace.head +: parts) + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala index ecdf40e87a894..a69f02470e55e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} import org.apache.spark.sql.catalyst.trees.TreePattern.{TreePattern, UNRESOLVED_FUNC} import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes import org.apache.spark.sql.catalyst.util.CharVarcharUtils -import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, Table, TableCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, Table, TableCatalog, ViewCatalog} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition import org.apache.spark.sql.connector.catalog.functions.UnboundFunction @@ -196,12 +196,19 @@ case class ResolvedFieldPosition(position: ColumnPosition) extends FieldPosition /** * A plan containing resolved persistent views. */ -// TODO: create a generic representation for views, after we add view support to v2 catalog. For now -// we only hold the view schema. case class ResolvedPersistentView( catalog: CatalogPlugin, identifier: Identifier, - metadata: CatalogTable) extends LeafNodeWithoutStats { + metadata: ViewDescription) extends LeafNodeWithoutStats { + override def output: Seq[Attribute] = Nil +} + +/** + * A plan containing resolved identifier with view catalog determined. 
+ */ +case class ResolvedViewIdentifier( + catalog: ViewCatalog, + identifier: Identifier) extends LeafNodeWithoutStats { override def output: Seq[Attribute] = Nil } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 497f485b67fe2..66688aba4d7ca 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.{AliasIdentifier, SQLConfHelper} -import org.apache.spark.sql.catalyst.analysis.{AnsiTypeCoercion, MultiInstanceRelation, Resolver, TypeCoercion, TypeCoercionBase, UnresolvedUnaryNode} +import org.apache.spark.sql.catalyst.analysis.{AnsiTypeCoercion, MultiInstanceRelation, Resolver, TypeCoercion, TypeCoercionBase, UnresolvedUnaryNode, V1ViewDescription, ViewDescription} import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable} import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN import org.apache.spark.sql.catalyst.expressions._ @@ -711,22 +711,23 @@ case class InsertIntoDir( } /** - * A container for holding the view description(CatalogTable) and info whether the view is temporary + * A container for holding the view description, and the output of the view. The + * child should be a logical plan parsed from the view text. + * A container for holding the view description and info whether the view is temporary * or not. If it's a SQL (temp) view, the child should be a logical plan parsed from the - * `CatalogTable.viewText`. Otherwise, the view is a temporary one created from a dataframe and the + * `viewText`. Otherwise, the view is a temporary one created from a dataframe and the * view description should contain a `VIEW_CREATED_FROM_DATAFRAME` property; in this case, the child * must be already resolved. * * This operator will be removed at the end of analysis stage. * - * @param desc A view description(CatalogTable) that provides necessary information to resolve the - * view. + * @param desc A view description that provides necessary information to resolve the view. * @param isTempView A flag to indicate whether the view is temporary or not. * @param child The logical plan of a view operator. If the view description is available, it should - * be a logical plan parsed from the `CatalogTable.viewText`. + * be a logical plan parsed from the view text. 
*/ case class View( - desc: CatalogTable, + desc: ViewDescription, isTempView: Boolean, child: LogicalPlan) extends UnaryNode { require(!isTempViewStoringAnalyzedPlan || child.resolved) @@ -736,7 +737,11 @@ case class View( override def metadataOutput: Seq[Attribute] = Nil override def simpleString(maxFields: Int): String = { - s"View (${desc.identifier}, ${output.mkString("[", ",", "]")})" + val identifierString = desc match { + case _: V1ViewDescription => desc.identifier.toString + case _ => desc.ident.toString + } + s"View ($identifierString, ${output.mkString("[", ",", "]")})" } override def doCanonicalize(): LogicalPlan = child match { @@ -768,6 +773,9 @@ case class View( } object View { + def apply(desc: CatalogTable, isTempView: Boolean, child: LogicalPlan): View = + View(V1ViewDescription(desc), isTempView, child) + def effectiveSQLConf(configs: Map[String, String], isTempView: Boolean): SQLConf = { val activeConf = SQLConf.get // For temporary view, we always use captured sql configs diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala index 2b712241633be..ba1f772257d2b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala @@ -108,6 +108,13 @@ private[sql] object CatalogV2Implicits { throw QueryCompilationErrors.missingCatalogAbilityError(plugin, "tables") } + def asViewCatalog: ViewCatalog = plugin match { + case viewCatalog: ViewCatalog => + viewCatalog + case _ => + throw QueryCompilationErrors.missingCatalogAbilityError(plugin, "views") + } + def asNamespaceCatalog: SupportsNamespaces = plugin match { case namespaceCatalog: SupportsNamespaces => namespaceCatalog diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index 06887b0b95038..8f70586f78935 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -24,7 +24,7 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.CurrentUserContext -import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec} +import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, NoSuchViewException, TimeTravelSpec} import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec} import org.apache.spark.sql.catalyst.util.GeneratedColumn @@ -341,6 +341,20 @@ private[sql] object CatalogV2Util { case _: NoSuchNamespaceException => None } + def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = + try { + catalog match { + case viewCatalog: ViewCatalog => + Option(viewCatalog.loadView(ident)) + case _ => + None + } + } catch { + case _: NoSuchViewException => None + case _: NoSuchDatabaseException => None + case _: NoSuchNamespaceException => None + } + def getTable( catalog: 
CatalogPlugin, ident: Identifier, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index e8235fd104668..b315997b36ebe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.{SPARK_DOC_ROOT, SparkException, SparkThrowable, SparkUnsupportedOperationException} import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{ExtendedAnalysisException, FunctionIdentifier, InternalRow, QualifiedTableName, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, FunctionAlreadyExistsException, NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, NoSuchTableException, ResolvedTable, Star, TableAlreadyExistsException, UnresolvedRegex} +import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, FunctionAlreadyExistsException, NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, NoSuchTableException, NoSuchViewException, ResolvedTable, Star, TableAlreadyExistsException, UnresolvedRegex} import org.apache.spark.sql.catalyst.catalog.{CatalogTable, InvalidUDFClassException} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, CreateMap, CreateStruct, Expression, GroupingID, NamedExpression, SpecifiedWindowFrame, WindowFrame, WindowFunction, WindowSpecDefinition} @@ -470,8 +470,10 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat def writeIntoViewNotAllowedError(identifier: TableIdentifier, t: TreeNode[_]): Throwable = { new AnalysisException( - errorClass = "_LEGACY_ERROR_TEMP_1011", - messageParameters = Map("identifier" -> identifier.toString), + errorClass = "EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE", + messageParameters = Map( + "viewName" -> toSQLId(identifier.nameParts), + "operation" -> "V2WRITE"), origin = t.origin) } @@ -1382,6 +1384,10 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat new NoSuchTableException(nameParts) } + def noSuchViewError(ident: Identifier): NoSuchViewException = { + new NoSuchViewException(ident) + } + def noSuchNamespaceError(namespace: Array[String]): Throwable = { new NoSuchNamespaceException(namespace) } @@ -2913,11 +2919,23 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat def cannotCreateViewTooManyColumnsError( viewIdent: TableIdentifier, expected: Seq[String], + query: LogicalPlan): Throwable = + cannotCreateViewTooManyColumnsError(viewIdent.nameParts, expected, query) + + def cannotCreateViewTooManyColumnsError( + viewIdent: Identifier, + expected: Seq[String], + query: LogicalPlan): Throwable = + cannotCreateViewTooManyColumnsError(viewIdent.asMultipartIdentifier, expected, query) + + def cannotCreateViewTooManyColumnsError( + viewIdent: Seq[String], + expected: Seq[String], query: LogicalPlan): Throwable = { new AnalysisException( errorClass = "CREATE_VIEW_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS", messageParameters = Map( - "viewName" -> toSQLId(viewIdent.nameParts), + "viewName" -> 
toSQLId(viewIdent), "viewColumns" -> expected.map(c => toSQLId(c)).mkString(", "), "dataColumns" -> query.output.map(c => toSQLId(c.name)).mkString(", "))) } @@ -2925,11 +2943,23 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat def cannotCreateViewNotEnoughColumnsError( viewIdent: TableIdentifier, expected: Seq[String], + query: LogicalPlan): Throwable = + cannotCreateViewNotEnoughColumnsError(viewIdent.nameParts, expected, query) + + def cannotCreateViewNotEnoughColumnsError( + viewIdent: Identifier, + expected: Seq[String], + query: LogicalPlan): Throwable = + cannotCreateViewNotEnoughColumnsError(viewIdent.asMultipartIdentifier, expected, query) + + def cannotCreateViewNotEnoughColumnsError( + viewIdent: Seq[String], + expected: Seq[String], query: LogicalPlan): Throwable = { new AnalysisException( errorClass = "CREATE_VIEW_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", messageParameters = Map( - "viewName" -> toSQLId(viewIdent.nameParts), + "viewName" -> toSQLId(viewIdent), "viewColumns" -> expected.map(c => toSQLId(c)).mkString(", "), "dataColumns" -> query.output.map(c => toSQLId(c.name)).mkString(", "))) } @@ -2968,56 +2998,108 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat def recursiveViewDetectedError( viewIdent: TableIdentifier, - newPath: Seq[TableIdentifier]): Throwable = { + newPath: Seq[TableIdentifier]): Throwable = + recursiveViewDetectedError(viewIdent.nameParts, newPath.map(_.nameParts)) + + def recursiveViewDetectedError( + viewIdent: Identifier, + newPath: Seq[Identifier]): Throwable = + recursiveViewDetectedError( + viewIdent.asMultipartIdentifier, + newPath.map(_.asMultipartIdentifier)) + + def recursiveViewDetectedError( + viewIdent: Seq[String], + newPath: Seq[Seq[String]]): Throwable = new AnalysisException( errorClass = "RECURSIVE_VIEW", messageParameters = Map( - "viewIdent" -> toSQLId(viewIdent.nameParts), - "newPath" -> newPath.map(p => toSQLId(p.nameParts)).mkString(" -> "))) - } + "viewIdent" -> toSQLId(viewIdent), + "newPath" -> newPath.map(toSQLId(_)).mkString(" -> "))) def notAllowedToCreatePermanentViewWithoutAssigningAliasForExpressionError( name: TableIdentifier, + attr: Attribute): Throwable = + notAllowedToCreatePermanentViewWithoutAssigningAliasForExpressionError(name.nameParts, attr) + + def notAllowedToCreatePermanentViewWithoutAssigningAliasForExpressionError( + name: Identifier, + attr: Attribute): Throwable = + notAllowedToCreatePermanentViewWithoutAssigningAliasForExpressionError( + name.asMultipartIdentifier, attr) + + def notAllowedToCreatePermanentViewWithoutAssigningAliasForExpressionError( + name: Seq[String], attr: Attribute): Throwable = { new AnalysisException( errorClass = "CREATE_PERMANENT_VIEW_WITHOUT_ALIAS", messageParameters = Map( - "name" -> toSQLId(name.nameParts), + "name" -> toSQLId(name), "attr" -> toSQLExpr(attr))) } def notAllowedToCreatePermanentViewByReferencingTempViewError( name: TableIdentifier, - nameParts: String): Throwable = { + nameParts: String): Throwable = + notAllowedToCreatePermanentViewByReferencingTempViewError(name.nameParts, nameParts) + + def notAllowedToCreatePermanentViewByReferencingTempViewError( + name: Identifier, + nameParts: String): Throwable = + notAllowedToCreatePermanentViewByReferencingTempViewError(name.asMultipartIdentifier, nameParts) + + def notAllowedToCreatePermanentViewByReferencingTempViewError( + objName: Seq[String], + tempObjName: String): Throwable = { new AnalysisException( errorClass = 
"INVALID_TEMP_OBJ_REFERENCE", messageParameters = Map( "obj" -> "VIEW", - "objName" -> toSQLId(name.nameParts), + "objName" -> toSQLId(objName), "tempObj" -> "VIEW", - "tempObjName" -> toSQLId(nameParts))) + "tempObjName" -> toSQLId(tempObjName))) } def notAllowedToCreatePermanentViewByReferencingTempFuncError( name: TableIdentifier, + funcName: String): Throwable = + notAllowedToCreatePermanentViewByReferencingTempFuncError(name.nameParts, funcName) + + def notAllowedToCreatePermanentViewByReferencingTempFuncError( + name: Identifier, + funcName: String): Throwable = + notAllowedToCreatePermanentViewByReferencingTempFuncError(name.asMultipartIdentifier, funcName) + + def notAllowedToCreatePermanentViewByReferencingTempFuncError( + objName: Seq[String], funcName: String): Throwable = { new AnalysisException( errorClass = "INVALID_TEMP_OBJ_REFERENCE", messageParameters = Map( "obj" -> "VIEW", - "objName" -> toSQLId(name.nameParts), + "objName" -> toSQLId(objName), "tempObj" -> "FUNCTION", "tempObjName" -> toSQLId(funcName))) } def notAllowedToCreatePermanentViewByReferencingTempVarError( name: TableIdentifier, + varName: String): Throwable = + notAllowedToCreatePermanentViewByReferencingTempVarError(name.nameParts, varName) + + def notAllowedToCreatePermanentViewByReferencingTempVarError( + name: Identifier, + varName: String): Throwable = + notAllowedToCreatePermanentViewByReferencingTempVarError(name.asMultipartIdentifier, varName) + + def notAllowedToCreatePermanentViewByReferencingTempVarError( + objName: Seq[String], varName: String): Throwable = { new AnalysisException( errorClass = "INVALID_TEMP_OBJ_REFERENCE", messageParameters = Map( "obj" -> "VIEW", - "objName" -> toSQLId(name.nameParts), + "objName" -> toSQLId(objName), "tempObj" -> "VARIABLE", "tempObjName" -> toSQLId(varName))) } @@ -3396,6 +3478,14 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat "viewName" -> toSQLId(viewName))) } + def invalidViewCurrentCatalog(currentCatalog: String, viewName: Seq[String]): Throwable = { + throw new AnalysisException( + errorClass = "INVALID_VIEW_CURRENT_CATALOG", + messageParameters = Map( + "currentCatalog" -> currentCatalog, + "viewName" -> toSQLId(viewName))) + } + def invalidTimeTravelSpecError(): Throwable = { new AnalysisException( errorClass = "INVALID_TIME_TRAVEL_SPEC", @@ -3927,4 +4017,19 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat "dsSchema" -> toSQLType(dsSchema), "expectedSchema" -> toSQLType(expectedSchema))) } + + def describeDoesNotSupportPartitionForViewError(): Throwable = { + new AnalysisException( + errorClass = "INVALID_SQL_SYNTAX.DESCRIBE_PARTITION_FOR_VIEW", + messageParameters = Map.empty) + } + + private def notSupportedForV2ViewsError(cmd: String): Throwable = { + new AnalysisException( + errorClass = "NOT_SUPPORTED_COMMAND_FOR_V2_VIEW", + messageParameters = Map("cmd" -> toSQLStmt(cmd))) + } + + def showCreateTableAsSerdeNotSupportedForV2ViewsError(): Throwable = + notSupportedForV2ViewsError("SHOW CREATE TABLE AS SERDE") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index c00a433180357..c97325fb4b0e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -220,12 +220,8 @@ class 
ResolveSessionCatalog(val catalogManager: CatalogManager) case DropView(ResolvedV1Identifier(ident), ifExists) => DropTableCommand(ident, ifExists, isView = true, purge = false) - case DropView(r @ ResolvedIdentifier(catalog, ident), _) => - if (catalog == FakeSystemCatalog) { - DropTempViewCommand(ident) - } else { - throw QueryCompilationErrors.catalogOperationNotSupported(catalog, "views") - } + case DropView(ResolvedIdentifier(FakeSystemCatalog, ident), _) => + DropTempViewCommand(ident) case c @ CreateNamespace(DatabaseNameInSessionCatalog(name), _, _) if conf.useV1Command => val comment = c.properties.get(SupportsNamespaces.PROP_COMMENT) @@ -391,8 +387,18 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) replace = replace, viewType = PersistedView) - case CreateView(ResolvedIdentifier(catalog, _), _, _, _, _, _, _, _) => - throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "views") + case CreateView(ResolvedIdentifier(catalog, ident), userSpecifiedColumns, comment, + properties, originalText, child, allowExisting, replace) if isSessionCatalog(catalog) => + CreateViewCommand( + name = ident.asTableIdentifier, + userSpecifiedColumns = userSpecifiedColumns, + comment = comment, + properties = properties, + originalText = originalText, + plan = child, + allowExisting = allowExisting, + replace = replace, + viewType = PersistedView) case ShowViews(ns: ResolvedNamespace, pattern, output) => ns match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterViewExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterViewExec.scala new file mode 100644 index 0000000000000..3f9e2e79c1e9f --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterViewExec.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.{Identifier, ViewCatalog, ViewChange} + +/** + * Physical plan node for altering a view. 
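+ *
+ * For illustration (identifiers are hypothetical), this node is planned for statements
+ * such as
+ * {{{
+ *   ALTER VIEW cat.db.v SET TBLPROPERTIES ('team' = 'data-eng');
+ *   ALTER VIEW cat.db.v UNSET TBLPROPERTIES ('team');
+ * }}}
+ * where the identifier resolves to a catalog implementing [[ViewCatalog]], and each
+ * property change is applied as a [[ViewChange]].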
+ */ +case class AlterViewExec( + catalog: ViewCatalog, + ident: Identifier, + changes: Seq[ViewChange]) extends LeafV2CommandExec { + + override protected def run(): Seq[InternalRow] = { + try { + catalog.alterView(ident, changes: _*) + } catch { + case e: IllegalArgumentException => + throw new SparkException(s"Invalid view change: ${e.getMessage}", e) + case e: UnsupportedOperationException => + throw new SparkException(s"Unsupported view change: ${e.getMessage}", e) + } + + Seq.empty + } + + override def output: Seq[Attribute] = Seq.empty +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateViewExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateViewExec.scala new file mode 100644 index 0000000000000..60aa2cd863a2b --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateViewExec.scala @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import scala.jdk.CollectionConverters._ + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression, VariableReference} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View} +import org.apache.spark.sql.connector.catalog.{Identifier, ViewCatalog, ViewInfo} +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.execution.CommandExecutionMode +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.MetadataBuilder +import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.util.ArrayImplicits.SparkArrayOps + +/** + * Physical plan node for creating a view. 
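+ *
+ * A usage sketch (identifiers are hypothetical): this node is planned for statements such as
+ * {{{
+ *   CREATE OR REPLACE VIEW cat.db.v (id COMMENT 'primary key', name) AS
+ *   SELECT id, name FROM cat.db.t;
+ * }}}
+ * and also backs ALTER VIEW ... AS, which is planned as a replace of the existing view.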
+ */ +case class CreateViewExec( + catalog: ViewCatalog, + ident: Identifier, + originalText: String, + query: LogicalPlan, + userSpecifiedColumns: Seq[(String, Option[String])], + comment: Option[String], + properties: Map[String, String], + allowExisting: Boolean, + replace: Boolean) extends LeafV2CommandExec { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override protected def run(): Seq[InternalRow] = { + val qe = session.sessionState.executePlan(query, CommandExecutionMode.SKIP) + qe.assertAnalyzed() + val analyzedPlan = qe.analyzed + + if (userSpecifiedColumns.nonEmpty) { + if (userSpecifiedColumns.length > analyzedPlan.output.length) { + throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError( + ident, userSpecifiedColumns.map(_._1), analyzedPlan) + } else if (userSpecifiedColumns.length < analyzedPlan.output.length) { + throw QueryCompilationErrors.cannotCreateViewTooManyColumnsError( + ident, userSpecifiedColumns.map(_._1), analyzedPlan) + } + } + + // When creating a permanent view, not allowed to reference temporary objects. + // This should be called after `qe.assertAnalyzed()` (i.e., `child` can be resolved) + verifyTemporaryObjectsNotExists(ident, analyzedPlan) + verifyAutoGeneratedAliasesNotExists(analyzedPlan, ident) + + val queryOutput = analyzedPlan.schema.fieldNames + // Generate the query column names, + // throw an AnalysisException if there exists duplicate column names. + SchemaUtils.checkColumnNameDuplication(queryOutput.toImmutableArraySeq, SQLConf.get.resolver) + + if (replace) { + // Detect cyclic view reference on CREATE OR REPLACE VIEW or ALTER VIEW AS. + checkCyclicViewReference(analyzedPlan, Seq(ident), ident) + } + + val viewSchema = aliasPlan(analyzedPlan, userSpecifiedColumns).schema + val columnAliases = userSpecifiedColumns.map(_._1).toArray + val columnComments = userSpecifiedColumns.map(_._2.getOrElse(null)).toArray + + val engineVersion = "Spark " + org.apache.spark.SPARK_VERSION + val createEngineVersion = if (replace) None else Some(engineVersion) + val newProperties = properties ++ + comment.map(ViewCatalog.PROP_COMMENT -> _) ++ + createEngineVersion.map(ViewCatalog.PROP_CREATE_ENGINE_VERSION -> _) + + (ViewCatalog.PROP_ENGINE_VERSION -> engineVersion) + + val catalogManager = session.sessionState.catalogManager + val currentCatalog = catalogManager.currentCatalog.name + val currentNamespace = catalogManager.currentNamespace + + val viewInfo = new ViewInfo( + ident, + originalText, + currentCatalog, + currentNamespace, + viewSchema, + queryOutput, + columnAliases, + columnComments, + newProperties.asJava + ) + + if (replace) { + // CREATE OR REPLACE VIEW + catalog.replaceView(viewInfo, true) + } else { + try { + // CREATE VIEW [IF NOT EXISTS] + catalog.createView(viewInfo) + } catch { + case _: ViewAlreadyExistsException if allowExisting => // Ignore + } + } + + Seq.empty + } + + override def output: Seq[Attribute] = Seq.empty + + /** + * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns, + * else return the analyzed plan directly. 
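+   *
+   * For example (illustrative): for `CREATE VIEW v (a, b COMMENT 'b column') AS SELECT 1, 2`,
+   * the analyzed query is wrapped in a Project that aliases the two output attributes to
+   * `a` and `b`, recording the column comment as attribute metadata.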
+ */ + private def aliasPlan( + analyzedPlan: LogicalPlan, + userSpecifiedColumns: Seq[(String, Option[String])]): LogicalPlan = { + if (userSpecifiedColumns.isEmpty) { + analyzedPlan + } else { + val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map { + case (attr, (colName, None)) => Alias(attr, colName)() + case (attr, (colName, Some(colComment))) => + val meta = new MetadataBuilder().putString("comment", colComment).build() + Alias(attr, colName)(explicitMetadata = Some(meta)) + } + val projectedPlan = Project(projectList, analyzedPlan) + session.sessionState.executePlan(projectedPlan, CommandExecutionMode.SKIP).analyzed + } + } + + private def checkCyclicViewReference( + plan: LogicalPlan, + path: Seq[Identifier], + viewIdent: Identifier): Unit = { + plan match { + case v: View => + val ident = v.desc.ident + val newPath = path :+ ident + // If the table identifier equals to the `viewIdent`, current view node is the same with + // the altered view. We detect a view reference cycle, should throw an AnalysisException. + if (ident == viewIdent) { + throw QueryCompilationErrors.recursiveViewDetectedError(viewIdent, newPath) + } else { + v.children.foreach { child => + checkCyclicViewReference(child, newPath, viewIdent) + } + } + case _ => + plan.children.foreach(child => checkCyclicViewReference(child, path, viewIdent)) + } + + // Detect cyclic references from subqueries. + plan.expressions.foreach { expr => + expr match { + case s: SubqueryExpression => + checkCyclicViewReference(s.plan, path, viewIdent) + case _ => // Do nothing. + } + } + } + + private def verifyAutoGeneratedAliasesNotExists(child: LogicalPlan, name: Identifier): Unit = + if (!conf.allowAutoGeneratedAliasForView) { + child.output.foreach { attr => + if (attr.metadata.contains("__autoGeneratedAlias")) { + throw QueryCompilationErrors + .notAllowedToCreatePermanentViewWithoutAssigningAliasForExpressionError(name, attr) + } + } + } + + /** + * Permanent views are not allowed to reference temp objects, including temp function and views + */ + private def verifyTemporaryObjectsNotExists( + name: Identifier, + child: LogicalPlan): Unit = { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + val tempViews = collectTemporaryViews(child) + tempViews.foreach { nameParts => + throw QueryCompilationErrors.notAllowedToCreatePermanentViewByReferencingTempViewError( + name, nameParts.quoted) + } + val tempVars = collectTemporaryVariables(child) + tempVars.foreach { nameParts => + throw QueryCompilationErrors.notAllowedToCreatePermanentViewByReferencingTempVarError( + name, nameParts.quoted) + } + } + + + /** + * Collect all temporary views and return the identifiers separately. + */ + private def collectTemporaryViews(child: LogicalPlan): Seq[Seq[String]] = { + def collectTempViews(child: LogicalPlan): Seq[Seq[String]] = { + child.flatMap { + case view: View if view.isTempView => Seq(view.desc.ident.asMultipartIdentifier) + case plan => plan.expressions.flatMap(_.flatMap { + case e: SubqueryExpression => collectTempViews(e.plan) + case _ => Seq.empty + }) + }.distinct + } + collectTempViews(child) + } + + /** + * Collect all temporary SQL variables and return the identifiers separately. 
+ */ + private def collectTemporaryVariables(child: LogicalPlan): Seq[Seq[String]] = { + def collectTempVars(child: LogicalPlan): Seq[Seq[String]] = { + child.flatMap { plan => + plan.expressions.flatMap(_.flatMap { + case e: SubqueryExpression => collectTempVars(e.plan) + case r: VariableReference => Seq(r.originalNameParts) + case _ => Seq.empty + }) + }.distinct + } + collectTempVars(child) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index fe3140c8030ac..77ce7acda39df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -24,7 +24,7 @@ import org.apache.commons.lang3.StringUtils import org.apache.spark.SparkException import org.apache.spark.internal.Logging import org.apache.spark.sql.{SparkSession, Strategy} -import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable} +import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedPartitionSpec, ResolvedPersistentView, ResolvedTable, ResolvedViewIdentifier} import org.apache.spark.sql.catalyst.catalog.CatalogUtils import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning, Expression, NamedExpression, Not, Or, PredicateHelper, SubqueryExpression} @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.{toPrettySQL, GeneratedColumn, ResolveDefaultColumns, V2ExpressionBuilder} -import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog, TruncatableTable} +import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog, TruncatableTable, ViewChange} import org.apache.spark.sql.connector.catalog.CatalogV2Util.structTypeToV2Columns import org.apache.spark.sql.connector.catalog.index.SupportsIndex import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue} @@ -514,6 +514,68 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat val table = a.table.asInstanceOf[ResolvedTable] AlterTableExec(table.catalog, table.identifier, a.changes) :: Nil + case CreateView(ResolvedViewIdentifier(catalog, ident), userSpecifiedColumns, comment, + properties, Some(originalText), query, allowExisting, replace) => + CreateViewExec( + catalog = catalog, + ident = ident, + originalText = originalText, + query = query, + userSpecifiedColumns = userSpecifiedColumns, + comment = comment, + properties = properties, + allowExisting = allowExisting, + replace = replace) :: Nil + + case AlterViewAs(ResolvedViewIdentifier(catalog, ident), originalText, query) => + CreateViewExec( + catalog = catalog, + ident = ident, + originalText = originalText, + query = query, + userSpecifiedColumns = Seq.empty, + comment = None, + properties = Map.empty, + allowExisting = false, + replace 
= true) :: Nil + + case DropView(ResolvedViewIdentifier(catalog, ident), ifExists) => + DropViewExec(catalog, ident, ifExists) :: Nil + + case RenameTable(ResolvedViewIdentifier(catalog, oldIdent), newIdent, true) => + RenameViewExec(catalog, oldIdent, newIdent.asIdentifier) :: Nil + + case SetViewProperties(ResolvedViewIdentifier(catalog, ident), props) => + val changes = props.map { + case (property, value) => ViewChange.setProperty(property, value) + }.toSeq + AlterViewExec(catalog, ident, changes) :: Nil + + case UnsetViewProperties(ResolvedViewIdentifier(catalog, ident), propertyKeys, ifExists) => + if (!ifExists) { + val view = catalog.loadView(ident) + propertyKeys.filterNot(view.properties.containsKey).foreach { property => + QueryCompilationErrors.insufficientTablePropertyError(property) + } + } + val changes = propertyKeys.map(ViewChange.removeProperty) + AlterViewExec(catalog, ident, changes) :: Nil + + case DescribeRelation(ResolvedPersistentView(_, _, desc), partitionSpec, isExtended, output) => + if (partitionSpec.nonEmpty) { + throw QueryCompilationErrors.describeDoesNotSupportPartitionForViewError() + } + DescribeViewExec(desc, output, isExtended) :: Nil + + case ShowCreateTable(ResolvedPersistentView(_, _, desc), asSerde, output) => + if (asSerde) { + throw QueryCompilationErrors.showCreateTableAsSerdeNotSupportedForV2ViewsError() + } + ShowCreateViewExec(desc, output) :: Nil + + case ShowTableProperties(ResolvedPersistentView(_, _, desc), propertyKey, output) => + ShowViewPropertiesExec(desc, propertyKey, output) :: Nil + case CreateIndex(ResolvedTable(_, _, table, _), indexName, indexType, ifNotExists, columns, properties) => table match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeViewExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeViewExec.scala new file mode 100644 index 0000000000000..0078e32f6c6bb --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeViewExec.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.ViewDescription +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.LeafExecNode + +case class DescribeViewExec( + desc: ViewDescription, + output: Seq[Attribute], + isExtended: Boolean) extends V2CommandExec with LeafExecNode { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override protected def run(): Seq[InternalRow] = + if (isExtended) { + (describeSchema :+ emptyRow) ++ describeExtended + } else { + describeSchema + } + + private def describeSchema: Seq[InternalRow] = + desc.schema.map { column => + toCatalystRow( + column.name, + column.dataType.simpleString, + column.getComment().getOrElse("")) + } + + private def emptyRow: InternalRow = toCatalystRow("", "", "") + + private def describeExtended: Seq[InternalRow] = { + val outputColumns = desc.viewQueryColumnNames.mkString("[", ", ", "]") + val tableProperties = desc.properties + .map(p => p._1 + "=" + p._2) + .mkString("[", ", ", "]") + + toCatalystRow("# Detailed View Information", "", "") :: + toCatalystRow("Owner", desc.owner.getOrElse(""), "") :: + toCatalystRow("Comment", desc.comment.getOrElse(""), "") :: + toCatalystRow("View Text", desc.viewText.getOrElse(""), "") :: + toCatalystRow("View Catalog and Namespace", desc.viewCatalogAndNamespace.quoted, "") :: + toCatalystRow("View Query Output Columns", outputColumns, "") :: + toCatalystRow("Table Properties", tableProperties, "") :: + toCatalystRow("Created By", desc.createEngineVersion.getOrElse(""), "") :: + Nil + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropViewExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropViewExec.scala new file mode 100644 index 0000000000000..204fa750ebf74 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropViewExec.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.{Identifier, ViewCatalog} +import org.apache.spark.sql.errors.QueryCompilationErrors + +/** + * Physical plan node for dropping a view. 
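+ *
+ * For illustration (identifiers are hypothetical): `DROP VIEW IF EXISTS cat.db.v` drops the
+ * view via [[ViewCatalog#dropView]]; when the view does not exist, IF EXISTS makes the
+ * command a no-op, otherwise a no-such-view error is raised.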
+ */
+case class DropViewExec(
+    catalog: ViewCatalog,
+    ident: Identifier,
+    ifExists: Boolean) extends LeafV2CommandExec {
+
+  override def run(): Seq[InternalRow] = {
+    if (catalog.viewExists(ident)) {
+      catalog.dropView(ident)
+    } else if (!ifExists) {
+      throw QueryCompilationErrors.noSuchViewError(ident)
+    }
+
+    Seq.empty
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameViewExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameViewExec.scala
new file mode 100644
index 0000000000000..8426e9f060e25
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameViewExec.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.{Identifier, ViewCatalog}
+
+/**
+ * Physical plan node for renaming a view.
+ */
+case class RenameViewExec(
+    catalog: ViewCatalog,
+    oldIdent: Identifier,
+    newIdent: Identifier) extends LeafV2CommandExec {
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  override protected def run(): Seq[InternalRow] = {
+    catalog.renameView(oldIdent, newIdent)
+
+    Seq.empty
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateViewExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateViewExec.scala
new file mode 100644
index 0000000000000..61d766cc5b65f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateViewExec.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ViewDescription
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.LeafExecNode
+
+/**
+ * Physical plan node for showing the create statement of a view.
+ */
+case class ShowCreateViewExec(
+    desc: ViewDescription,
+    output: Seq[Attribute]) extends V2CommandExec with LeafExecNode {
+
+  override protected def run(): Seq[InternalRow] = {
+    val schema = desc.schema.map(_.name).mkString("(", ", ", ")")
+    val create = s"CREATE VIEW ${desc.ident} $schema AS\n${desc.query}\n"
+    Seq(toCatalystRow(create))
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowViewPropertiesExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowViewPropertiesExec.scala
new file mode 100644
index 0000000000000..8142e58539149
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowViewPropertiesExec.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ViewDescription
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.LeafExecNode
+
+/**
+ * Physical plan node for showing view properties.
+ */
+case class ShowViewPropertiesExec(
+    desc: ViewDescription,
+    propertyKey: Option[String],
+    output: Seq[Attribute]) extends V2CommandExec with LeafExecNode {
+
+  override protected def run(): Seq[InternalRow] = {
+    propertyKey match {
+      case Some(p) =>
+        val propValue = desc.properties.getOrElse(p,
+          s"View ${desc.ident} does not have property: $p")
+        Seq(toCatalystRow(propValue))
+      case None =>
+        desc.properties.map {
+          case (k, v) => toCatalystRow(k, v)
+        }.toSeq
+    }
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 00c72294ca071..26c30e7c027d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.internal
 import org.apache.spark.annotation.Unstable
 import org.apache.spark.sql.{ExperimentalMethods, SparkSession, UDFRegistration, _}
 import org.apache.spark.sql.artifact.ArtifactManager
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, FunctionRegistry, ReplaceCharWithVarchar, ResolveSessionCatalog, TableFunctionRegistry}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, FunctionRegistry, ReplaceCharWithVarchar, ResolveSessionCatalog, TableFunctionRegistry, ViewSubstitution}
 import org.apache.spark.sql.catalyst.catalog.{FunctionExpressionBuilder, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
@@ -197,6 +197,10 @@ abstract class BaseSessionStateBuilder(
    * Note: this depends on the `conf` and `catalog` fields.
    */
   protected def analyzer: Analyzer = new Analyzer(catalogManager) {
+
+    override val extendedSubstitutionRules: Seq[Rule[LogicalPlan]] =
+      Seq(ViewSubstitution(this.catalogManager, sqlParser))
+
     override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
       new FindDataSourceTable(session) +:
         new ResolveSQLOnFile(session) +:
@@ -226,6 +230,16 @@ abstract class BaseSessionStateBuilder(
         customCheckRules
   }
 
+  /**
+   * Custom substitution rules to add to the Analyzer. Prefer overriding this instead of creating
+   * your own Analyzer.
+   *
+   * Note that this may NOT depend on the `analyzer` function.
+   */
+  protected def customSubstitutionRules: Seq[Rule[LogicalPlan]] = {
+    extensions.buildResolutionRules(session)
+  }
+
   /**
   * Custom resolution rules to add to the Analyzer. Prefer overriding this instead of creating
   * your own Analyzer.
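
The hunk above exposes a new "Substitution" extension point on the Analyzer (`extendedSubstitutionRules`) and uses it to register `ViewSubstitution`; the Hive session-state builder later in this patch does the same. The sketch below is not part of the patch and only illustrates that wiring: `NoopSubstitution` and `SubstitutionRuleExample` are hypothetical names, and a real rule would rewrite relations that reference v2 views the way `ViewSubstitution` is expected to.

```scala
import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogManager

// Hypothetical stand-in for a substitution rule such as ViewSubstitution.
// It leaves the plan untouched; a real rule would substitute relations that
// refer to v2 views before the resolution batch runs.
object NoopSubstitution extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

object SubstitutionRuleExample {
  // Registers the rule through the new extendedSubstitutionRules hook, the
  // same way the session-state builders in this patch register ViewSubstitution.
  def buildAnalyzer(catalogManager: CatalogManager): Analyzer =
    new Analyzer(catalogManager) {
      override val extendedSubstitutionRules: Seq[Rule[LogicalPlan]] =
        Seq(NoopSubstitution)
    }
}
```

Note that the analyzer override in this hunk registers `ViewSubstitution` directly rather than going through the new `customSubstitutionRules` method, which as written still returns `extensions.buildResolutionRules(session)`.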
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
index 44d47abc93faa..9bb192373071c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
@@ -174,10 +174,13 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
 
   test("Append: fail if it writes to a view") {
     spark.sql("CREATE VIEW v AS SELECT 1")
-    val exc = intercept[AnalysisException] {
-      spark.table("source").writeTo("v").append()
-    }
-    assert(exc.getMessage.contains("Writing into a view is not allowed"))
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.table("source").writeTo("v").append()
+      },
+      errorClass = "EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE",
+      parameters = Map("viewName" -> "`spark_catalog`.`default`.`v`", "operation" -> "V2WRITE")
+    )
   }
 
   test("Append: fail if it writes to a v1 table") {
@@ -279,10 +282,13 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
 
   test("Overwrite: fail if it writes to a view") {
     spark.sql("CREATE VIEW v AS SELECT 1")
-    val exc = intercept[AnalysisException] {
-      spark.table("source").writeTo("v").overwrite(lit(true))
-    }
-    assert(exc.getMessage.contains("Writing into a view is not allowed"))
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.table("source").writeTo("v").overwrite(lit(true))
+      },
+      errorClass = "EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE",
+      parameters = Map("viewName" -> "`spark_catalog`.`default`.`v`", "operation" -> "V2WRITE")
+    )
   }
 
   test("Overwrite: fail if it writes to a v1 table") {
@@ -384,10 +390,13 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
 
   test("OverwritePartitions: fail if it writes to a view") {
     spark.sql("CREATE VIEW v AS SELECT 1")
-    val exc = intercept[AnalysisException] {
-      spark.table("source").writeTo("v").overwritePartitions()
-    }
-    assert(exc.getMessage.contains("Writing into a view is not allowed"))
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.table("source").writeTo("v").overwritePartitions()
+      },
+      errorClass = "EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE",
+      parameters = Map("viewName" -> "`spark_catalog`.`default`.`v`", "operation" -> "V2WRITE")
+    )
   }
 
   test("OverwritePartitions: fail if it writes to a v1 table") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 589283a29b852..3a467fecd4cf1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2290,14 +2290,6 @@ class DataSourceV2SQLSuiteV1Filter
     }
   }
 
-  test("AlterTable: renaming views are not supported") {
-    val e = intercept[AnalysisException] {
-      sql(s"ALTER VIEW testcat.ns.tbl RENAME TO ns.view")
-    }
-    checkErrorTableNotFound(e, "`testcat`.`ns`.`tbl`",
-      ExpectedContext("testcat.ns.tbl", 11, 10 + "testcat.ns.tbl".length))
-  }
-
   test("ANALYZE TABLE") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
@@ -2384,16 +2376,6 @@ class DataSourceV2SQLSuiteV1Filter
     }
   }
 
-  test("CREATE VIEW") {
-    val v = "testcat.ns1.ns2.v"
-    checkError(
-      exception = intercept[AnalysisException] {
-        sql(s"CREATE VIEW $v AS SELECT 1")
-      },
-      errorClass = "_LEGACY_ERROR_TEMP_1184",
-      parameters = Map("plugin" -> "testcat", "ability" -> "views"))
-  }
-
   test("global temp view should not be masked by v2 catalog") {
     val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
     registerCatalog(globalTempDB, classOf[InMemoryTableCatalog])
@@ -2732,21 +2714,6 @@ class DataSourceV2SQLSuiteV1Filter
     }
   }
 
-  test("View commands are not supported in v2 catalogs") {
-    def validateViewCommand(sqlStatement: String): Unit = {
-      val e = intercept[AnalysisException](sql(sqlStatement))
-      checkError(
-        e,
-        errorClass = "UNSUPPORTED_FEATURE.CATALOG_OPERATION",
-        parameters = Map("catalogName" -> "`testcat`", "operation" -> "views"))
-    }
-
-    validateViewCommand("DROP VIEW testcat.v")
-    validateViewCommand("ALTER VIEW testcat.v SET TBLPROPERTIES ('key' = 'val')")
-    validateViewCommand("ALTER VIEW testcat.v UNSET TBLPROPERTIES ('key')")
-    validateViewCommand("ALTER VIEW testcat.v AS SELECT 1")
-  }
-
   test("SPARK-33924: INSERT INTO .. PARTITION preserves the partition location") {
     val t = "testpart.ns1.ns2.tbl"
     withTable(t) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index db6c7175c526f..67b03c4cda274 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -753,16 +753,6 @@ class PlanResolutionSuite extends AnalysisTest {
       DropTempViewCommand(tempViewIdent))
   }
 
-  test("drop view in v2 catalog") {
-    val e = intercept[AnalysisException] {
-      parseAndResolve("DROP VIEW testcat.db.view", checkAnalysis = true)
-    }
-    checkError(
-      e,
-      errorClass = "UNSUPPORTED_FEATURE.CATALOG_OPERATION",
-      parameters = Map("catalogName" -> "`testcat`", "operation" -> "views"))
-  }
-
   // ALTER VIEW view_name SET TBLPROPERTIES ('comment' = new_comment);
   // ALTER VIEW view_name UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key');
   test("alter view: alter view properties") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index 32100d060b092..6d2312d5fce50 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
 import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, ReplaceCharWithVarchar, ResolveSessionCatalog}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, ReplaceCharWithVarchar, ResolveSessionCatalog, ViewSubstitution}
 import org.apache.spark.sql.catalyst.catalog.{ExternalCatalogWithListener, InvalidUDFClassException}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -84,6 +84,10 @@ class HiveSessionStateBuilder(
    * A logical query plan `Analyzer` with rules specific to Hive.
   */
  override protected def analyzer: Analyzer = new Analyzer(catalogManager) {
+
+    override val extendedSubstitutionRules: Seq[Rule[LogicalPlan]] =
+      Seq(ViewSubstitution(this.catalogManager, sqlParser))
+
     override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
       new ResolveHiveSerdeTable(session) +:
         new FindDataSourceTable(session) +: