diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 154587eeff7a8..0f985d16d7fbf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.SubExprUtils._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.objects._ import org.apache.spark.sql.catalyst.optimizer.OptimizeUpdateFields +import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ @@ -43,7 +44,7 @@ import org.apache.spark.sql.catalyst.trees.CurrentOrigin.withOrigin import org.apache.spark.sql.catalyst.trees.TreePattern._ import org.apache.spark.sql.catalyst.util.{toPrettySQL, CharVarcharUtils, StringUtils} import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ -import org.apache.spark.sql.connector.catalog.{View => _, _} +import org.apache.spark.sql.connector.catalog.{View => V2View, _} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import org.apache.spark.sql.connector.catalog.TableChange.{After, ColumnPosition} import org.apache.spark.sql.connector.catalog.functions.{AggregateFunction => V2AggregateFunction, ScalarFunction, UnboundFunction} @@ -239,6 +240,11 @@ class Analyzer(override val catalogManager: CatalogManager) errorOnExceed = true, maxIterationsSetting = SQLConf.ANALYZER_MAX_ITERATIONS.key) + /** + * Override to provide additional rules for the "Substitution" batch. + */ + val extendedSubstitutionRules: Seq[Rule[LogicalPlan]] = Nil + /** * Override to provide additional rules for the "Resolution" batch. */ @@ -263,11 +269,12 @@ class Analyzer(override val catalogManager: CatalogManager) // However, when manipulating deeply nested schema, `UpdateFields` expression tree could be // very complex and make analysis impossible. Thus we need to optimize `UpdateFields` early // at the beginning of analysis. - OptimizeUpdateFields, - CTESubstitution, - WindowsSubstitution, - EliminateUnions, - SubstituteUnresolvedOrdinals), + OptimizeUpdateFields +: + CTESubstitution +: + WindowsSubstitution +: + EliminateUnions +: + SubstituteUnresolvedOrdinals +: + extendedSubstitutionRules : _*), Batch("Disable Hints", Once, new ResolveHints.DisableHints), Batch("Hints", fixedPoint, @@ -451,6 +458,74 @@ class Analyzer(override val catalogManager: CatalogManager) } } + /** + * Substitute persisted views in parsed plans with parsed view sql text. 
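+ * An [[UnresolvedRelation]] that names a persisted view in a v2 catalog is replaced by a
+ * [[SubqueryAlias]] over a [[View]] operator whose child is parsed from the stored view SQL
+ * text; temporary views and SQL-on-file paths are left untouched.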
+ */ + case class ViewSubstitution(sqlParser: ParserInterface) extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { + case u @ UnresolvedRelation(nameParts, _, _) if v1SessionCatalog.isTempView(nameParts) => + u + case u @ UnresolvedRelation( + parts @ NonSessionCatalogAndIdentifier(catalog, ident), _, _) if !isSQLOnFile(parts) => + CatalogV2Util.loadView(catalog, ident) + .map(createViewRelation(parts.quoted, _)) + .getOrElse(u) + } + + private def isSQLOnFile(parts: Seq[String]): Boolean = parts match { + case Seq(_, path) if path.contains("/") => true + case _ => false + } + + private def createViewRelation(name: String, view: V2View): LogicalPlan = { + if (!catalogManager.isCatalogRegistered(view.currentCatalog)) { + throw new AnalysisException( + s"Invalid current catalog '${view.currentCatalog}' in view '$name'") + } + + val child = parseViewText(name, view.query) + val desc = V2ViewDescription(name, view) + val qualifiedChild = desc.viewCatalogAndNamespace match { + case Seq() => + // Views from Spark 2.2 or prior do not store catalog or namespace, + // however its sql text should already be fully qualified. + child + case catalogAndNamespace => + // Substitute CTEs within the view before qualifying table identifiers + qualifyTableIdentifiers(CTESubstitution.apply(child), catalogAndNamespace) + } + + // The relation is a view, so we wrap the relation by: + // 1. Add a [[View]] operator over the relation to keep track of the view desc; + // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view. + SubqueryAlias(name, View(desc, false, qualifiedChild)) + } + + private def parseViewText(name: String, viewText: String): LogicalPlan = { + try { + sqlParser.parsePlan(viewText) + } catch { + case _: ParseException => + throw QueryCompilationErrors.invalidViewText(viewText, name) + } + } + + /** + * Qualify table identifiers with default catalog and namespace if necessary. + */ + private def qualifyTableIdentifiers( + child: LogicalPlan, + catalogAndNamespace: Seq[String]): LogicalPlan = + child transform { + case u @ UnresolvedRelation(Seq(table), _, _) => + u.copy(multipartIdentifier = catalogAndNamespace :+ table) + case u @ UnresolvedRelation(parts, _, _) + if !catalogManager.isCatalogRegistered(parts.head) => + u.copy(multipartIdentifier = catalogAndNamespace.head +: parts) + } + } + /** * Substitute child plan with WindowSpecDefinitions. */ @@ -1067,7 +1142,7 @@ class Analyzer(override val catalogManager: CatalogManager) // The view's child should be a logical plan parsed from the `desc.viewText`, the variable // `viewText` should be defined, or else we throw an error on the generation of the View // operator. - case view @ View(desc, isTempView, child) if !child.resolved => + case view @ View(CatalogTableViewDescription(desc), isTempView, child) if !child.resolved => // Resolve all the UnresolvedRelations and Views in the child. 
val newChild = AnalysisContext.withAnalysisContext(desc) { val nestedViewDepth = AnalysisContext.get.nestedViewDepth @@ -1206,23 +1281,32 @@ class Analyzer(override val catalogManager: CatalogManager) }.orElse { expandIdentifier(identifier) match { case CatalogAndIdentifier(catalog, ident) => - if (viewOnly && !CatalogV2Util.isSessionCatalog(catalog)) { - throw QueryCompilationErrors.catalogOperationNotSupported(catalog, "views") - } - CatalogV2Util.loadTable(catalog, ident).map { - case v1Table: V1Table if CatalogV2Util.isSessionCatalog(catalog) && - v1Table.v1Table.tableType == CatalogTableType.VIEW => - val v1Ident = v1Table.catalogTable.identifier - val v2Ident = Identifier.of(v1Ident.database.toArray, v1Ident.identifier) - ResolvedPersistentView(catalog, v2Ident, v1Table.catalogTable.schema) - case table => - ResolvedTable.create(catalog.asTableCatalog, ident, table) - } + lookupView(catalog, ident) + .orElse(lookupTable(catalog, ident)) case _ => None } } } + private def lookupView(catalog: CatalogPlugin, ident: Identifier): Option[LogicalPlan] = + CatalogV2Util.loadView(catalog, ident).map { + case view if CatalogV2Util.isSessionCatalog(catalog) => + ResolvedPersistentView(catalog, ident, view.schema) + case view => + ResolvedV2View(catalog.asViewCatalog, ident, view) + } + + private def lookupTable(catalog: CatalogPlugin, ident: Identifier): Option[LogicalPlan] = + CatalogV2Util.loadTable(catalog, ident).map { + case v1Table: V1Table if CatalogV2Util.isSessionCatalog(catalog) && + v1Table.v1Table.tableType == CatalogTableType.VIEW => + val v1Ident = v1Table.catalogTable.identifier + val v2Ident = Identifier.of(v1Ident.database.toArray, v1Ident.identifier) + analysis.ResolvedPersistentView(catalog, v2Ident, v1Table.catalogTable.schema) + case table => + ResolvedTable.create(catalog.asTableCatalog, ident, table) + } + private def createRelation( catalog: CatalogPlugin, ident: Identifier, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 276bf714a343c..e0e790ff3129f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -607,6 +607,44 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB case alter: AlterTableCommand => checkAlterTableCommand(alter) + case c: CreateView => + if (c.originalText.isEmpty) { + throw new AnalysisException( + "'originalText' must be provided to create permanent view") + } + + if (c.allowExisting && c.replace) { + throw new AnalysisException( + "CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.") + } + + // If the view output doesn't have the same number of columns neither with the child + // output, nor with the query column names, throw an AnalysisException. + // If the view's child output can't up cast to the view output, + // throw an AnalysisException, too. + case v @ View(desc, _, child) if child.resolved && v.output != child.output => + val queryColumnNames = desc.viewQueryColumnNames + val queryOutput = if (queryColumnNames.nonEmpty) { + if (v.output.length != queryColumnNames.length) { + // If the view output doesn't have the same number of columns with the query column + // names, throw an AnalysisException. 
+ throw new AnalysisException( + s"The view output ${v.output.mkString("[", ",", "]")} doesn't have the same" + + "number of columns with the query column names " + + s"${queryColumnNames.mkString("[", ",", "]")}") + } + val resolver = SQLConf.get.resolver + queryColumnNames.map { colName => + child.output.find { attr => + resolver(attr.name, colName) + }.getOrElse(throw new AnalysisException( + s"Attribute with name '$colName' is not found in " + + s"'${child.output.map(_.name).mkString("(", ",", ")")}'")) + } + } else { + child.output + } + case _ => // Falls back to the following checks } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index 221f1a0f3135c..d73e3e6bc54e3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -19,7 +19,8 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, LookupCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, LookupCatalog, ViewChange} +import org.apache.spark.sql.errors.QueryCompilationErrors /** * Resolves the catalog of the name parts for table/view/function/namespace. @@ -27,6 +28,8 @@ import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, Looku class ResolveCatalogs(val catalogManager: CatalogManager) extends Rule[LogicalPlan] with LookupCatalog { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case UnresolvedIdentifier(nameParts, allowTemp) => if (allowTemp && catalogManager.v1SessionCatalog.isTempView(nameParts)) { @@ -50,5 +53,52 @@ class ResolveCatalogs(val catalogManager: CatalogManager) ResolvedNamespace(currentCatalog, Seq.empty[String]) case UnresolvedNamespace(CatalogAndNamespace(catalog, ns)) => ResolvedNamespace(catalog, ns) + + case DescribeRelation(ResolvedV2View(_, ident, view), _, isExtended, _) => + DescribeV2View(V2ViewDescription(ident.quoted, view), isExtended) + + case ShowCreateTable(ResolvedV2View(_, ident, view), _, _) => + ShowCreateV2View(V2ViewDescription(ident.quoted, view)) + + case ShowTableProperties(ResolvedV2View(_, ident, view), propertyKeys, _) => + ShowV2ViewProperties(V2ViewDescription(ident.quoted, view), propertyKeys) + + case SetViewProperties(ResolvedV2View(catalog, ident, _), props) => + val changes = props.map { + case (property, value) => ViewChange.setProperty(property, value) + }.toSeq + AlterV2View(catalog, ident, changes) + + case UnsetViewProperties(ResolvedV2View(catalog, ident, _), propertyKeys, ifExists) => + if (!ifExists) { + val view = catalog.loadView(ident) + propertyKeys.filterNot(view.properties.containsKey).foreach { property => + QueryCompilationErrors.cannotUnsetNonExistentViewProperty(ident, property) + } + } + val changes = propertyKeys.map(ViewChange.removeProperty) + AlterV2View(catalog, ident, changes) + + case RenameTable(ResolvedV2View(oldCatalog, oldIdent, _), + NonSessionCatalogAndIdentifier(newCatalog, newIdent), true) => + if (oldCatalog.name != newCatalog.name) { + QueryCompilationErrors.cannotMoveViewBetweenCatalogs( + oldCatalog.name, newCatalog.name) + 
} + RenameV2View(oldCatalog, oldIdent, newIdent) + + case RefreshTable(ResolvedV2View(catalog, ident, _)) => + RefreshView(catalog, ident) + + case DropView(ResolvedV2View(catalog, ident, _), ifExists) => + DropV2View(catalog, ident, ifExists) + } + + object NonSessionCatalogAndTable { + def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])] = nameParts match { + case NonSessionCatalogAndIdentifier(catalog, ident) => + Some(catalog -> ident.asMultipartIdentifier) + case _ => None + } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/V2ViewDescription.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/V2ViewDescription.scala new file mode 100644 index 0000000000000..8d5f51da128d1 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/V2ViewDescription.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.catalyst.plans.logical.ViewDescription +import org.apache.spark.sql.connector.catalog.{View, ViewCatalog} +import org.apache.spark.sql.types.StructType + +/** + * View description backed by a View in V2 catalog. 
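+ * Adapts a connector [[View]] to the analyzer-side [[ViewDescription]] interface, exposing the
+ * view SQL text, schema, and reserved properties such as comment, owner and engine version.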
+ * + * @param view a view in V2 catalog + */ +case class V2ViewDescription( + override val identifier: String, + view: View) extends ViewDescription { + + override val schema: StructType = view.schema + + override val viewText: Option[String] = Option(view.query) + + override val viewCatalogAndNamespace: Seq[String] = + view.currentCatalog +: view.currentNamespace.toSeq + + override val viewQueryColumnNames: Seq[String] = view.schema.fieldNames + + val query: String = view.query + + val comment: Option[String] = Option(view.properties.get(ViewCatalog.PROP_COMMENT)) + + val owner: Option[String] = Option(view.properties.get(ViewCatalog.PROP_OWNER)) + + val createEngineVersion: Option[String] = + Option(view.properties.get(ViewCatalog.PROP_CREATE_ENGINE_VERSION)) + + val properties: Map[String, String] = + view.properties.asScala.toMap -- ViewCatalog.RESERVED_PROPERTIES.asScala +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala index e6be5c2395551..b6c15563379a2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, LeafExpression, Une import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} import org.apache.spark.sql.catalyst.trees.TreePattern.{TreePattern, UNRESOLVED_FUNC} import org.apache.spark.sql.catalyst.util.CharVarcharUtils -import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, Table, TableCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, Table, TableCatalog, View => V2View, ViewCatalog} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition import org.apache.spark.sql.connector.catalog.functions.UnboundFunction @@ -218,6 +218,13 @@ case class ResolvedTempView(identifier: Identifier, viewSchema: StructType) override def output: Seq[Attribute] = Nil } +case class ResolvedV2View( + catalog: ViewCatalog, + identifier: Identifier, + view: V2View) extends LeafNodeWithoutStats { + override def output: Seq[Attribute] = Nil +} + /** * A plan containing resolved persistent function. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 343fa3517c650..44986c138976d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -713,6 +713,34 @@ case class InsertIntoDir( } /** + * A trait for view description used by [[View]] container. + */ +trait ViewDescription { + val identifier: String + val schema: StructType + val viewText: Option[String] + val viewCatalogAndNamespace: Seq[String] + val viewQueryColumnNames: Seq[String] + val properties: Map[String, String] +} + +/** + * View description backed by a [[CatalogTable]]. 
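+ * Every field delegates to the wrapped [[CatalogTable]] metadata, so existing v1 view handling
+ * continues to work with the generalized [[View]] operator.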
+ * + * @param metadata a CatalogTable + */ +case class CatalogTableViewDescription(metadata: CatalogTable) extends ViewDescription { + override val identifier: String = metadata.identifier.quotedString + override val schema: StructType = metadata.schema + override val viewText: Option[String] = metadata.viewText + override val viewCatalogAndNamespace: Seq[String] = metadata.viewCatalogAndNamespace + override val viewQueryColumnNames: Seq[String] = metadata.viewQueryColumnNames + override val properties: Map[String, String] = metadata.properties +} + +/** + * A container for holding the view description, and the output of the view. The + * child should be a logical plan parsed from the view text. * A container for holding the view description(CatalogTable) and info whether the view is temporary * or not. If it's a SQL (temp) view, the child should be a logical plan parsed from the * `CatalogTable.viewText`. Otherwise, the view is a temporary one created from a dataframe and the @@ -721,14 +749,13 @@ case class InsertIntoDir( * * This operator will be removed at the end of analysis stage. * - * @param desc A view description(CatalogTable) that provides necessary information to resolve the - * view. + * @param desc A view description that provides necessary information to resolve the view. * @param isTempView A flag to indicate whether the view is temporary or not. * @param child The logical plan of a view operator. If the view description is available, it should - * be a logical plan parsed from the `CatalogTable.viewText`. + * be a logical plan parsed from the view text. */ case class View( - desc: CatalogTable, + desc: ViewDescription, isTempView: Boolean, child: LogicalPlan) extends UnaryNode { require(!isTempViewStoringAnalyzedPlan || child.resolved) @@ -770,6 +797,9 @@ case class View( } object View { + def apply(desc: CatalogTable, isTempView: Boolean, child: LogicalPlan): View = + View(CatalogTableViewDescription(desc), isTempView, child) + def effectiveSQLConf(configs: Map[String, String], isTempView: Boolean): SQLConf = { val activeConf = SQLConf.get // For temporary view, we always use captured sql configs diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 9508b2fb99336..dd91747e40d5f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, EliminateSubqueryAliases, FieldName, NamedRelation, PartitionSpec, ResolvedIdentifier, UnresolvedException} +import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, EliminateSubqueryAliases, FieldName, NamedRelation, PartitionSpec, ResolvedIdentifier, UnresolvedException, V2ViewDescription} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.FunctionResource import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, MetadataAttribute, NamedExpression, Unevaluable, V2ExpressionUtils} @@ -1363,3 +1363,76 @@ case class TableSpec( comment: Option[String], serde: Option[SerdeInfo], external: Boolean) + +/** + * Create or replace a view in a v2 catalog. 
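+ * Carries the view SQL text, schema, column aliases, column comments and properties needed to
+ * create (or replace) the view through a [[ViewCatalog]].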
+ */ +case class CreateV2View( + catalog: ViewCatalog, + ident: Identifier, + sql: String, + comment: Option[String], + viewSchema: StructType, + queryColumnNames: Array[String], + columnAliases: Array[String], + columnComments: Array[String], + properties: Map[String, String], + allowExisting: Boolean, + replace: Boolean) extends LeafCommand + +/** + * Drop a view in a v2 catalog. + */ +case class DropV2View( + catalog: ViewCatalog, + ident: Identifier, + ifExists: Boolean) extends LeafCommand + +/** + * Alter a view in a v2 catalog. + */ +case class AlterV2View( + catalog: ViewCatalog, + ident: Identifier, + changes: Seq[ViewChange]) extends LeafCommand + +/** + * The logical plan of the ALTER VIEW RENAME command for v2 views. + */ +case class RenameV2View( + catalog: ViewCatalog, + oldIdent: Identifier, + newIdent: Identifier) extends LeafCommand + +/** + * Describe a view in a v2 catalog. + */ +case class DescribeV2View(desc: V2ViewDescription, isExtended: Boolean) extends LeafCommand { + override def output: Seq[Attribute] = DescribeCommandSchema.describeTableAttributes() +} + +/** + * Show create statement for a view in a v2 catalog. + */ +case class ShowCreateV2View(desc: V2ViewDescription) extends LeafCommand { + override def output: Seq[Attribute] = Seq( + AttributeReference("create_statement", StringType, nullable = false)()) +} + +/** + * Show view properties. + */ +case class ShowV2ViewProperties( + desc: V2ViewDescription, + propertyKey: Option[String]) extends LeafCommand { + override val output: Seq[Attribute] = Seq( + AttributeReference("key", StringType, nullable = false)(), + AttributeReference("value", StringType, nullable = false)()) +} + +/** + * Refresh a view + */ +case class RefreshView( + catalog: ViewCatalog, + ident: Identifier) extends LeafCommand diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala index d9f15d84d8932..e6126829ae18b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala @@ -86,6 +86,13 @@ private[sql] object CatalogV2Implicits { throw QueryCompilationErrors.missingCatalogAbilityError(plugin, "tables") } + def asViewCatalog: ViewCatalog = plugin match { + case viewCatalog: ViewCatalog => + viewCatalog + case _ => + throw QueryCompilationErrors.missingCatalogAbilityError(plugin, "views") + } + def asNamespaceCatalog: SupportsNamespaces = plugin match { case namespaceCatalog: SupportsNamespaces => namespaceCatalog diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index 72c557c8d7726..d95ca000c6758 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -22,7 +22,7 @@ import java.util.Collections import scala.collection.JavaConverters._ -import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec} +import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, 
NoSuchNamespaceException, NoSuchTableException, NoSuchViewException, TimeTravelSpec} import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec} import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._ import org.apache.spark.sql.connector.catalog.TableChange._ @@ -367,6 +367,16 @@ private[sql] object CatalogV2Util { loadTable(catalog, ident).map(DataSourceV2Relation.create(_, Some(catalog), Some(ident))) } + def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = catalog match { + case viewCatalog: ViewCatalog => + try { + Option(viewCatalog.loadView(ident)) + } catch { + case _: NoSuchViewException => None + } + case _ => None + } + def isSessionCatalog(catalog: CatalogPlugin): Boolean = { catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 1a8c42b599e80..df499466a26dd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -358,14 +358,14 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { origin = t.origin) } - def insertIntoViewNotAllowedError(identifier: TableIdentifier, t: TreeNode[_]): Throwable = { + def insertIntoViewNotAllowedError(identifier: String, t: TreeNode[_]): Throwable = { new AnalysisException( errorClass = "_LEGACY_ERROR_TEMP_1010", - messageParameters = Map("identifier" -> identifier.toString), + messageParameters = Map("identifier" -> identifier), origin = t.origin) } - def writeIntoViewNotAllowedError(identifier: TableIdentifier, t: TreeNode[_]): Throwable = { + def writeIntoViewNotAllowedError(identifier: String, t: TreeNode[_]): Throwable = { new AnalysisException( errorClass = "_LEGACY_ERROR_TEMP_1011", messageParameters = Map("identifier" -> identifier.toString), @@ -2669,12 +2669,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { } def recursiveViewDetectedError( - viewIdent: TableIdentifier, - newPath: Seq[TableIdentifier]): Throwable = { + viewIdent: String, + newPath: Seq[String]): Throwable = { new AnalysisException( errorClass = "_LEGACY_ERROR_TEMP_1281", messageParameters = Map( - "viewIdent" -> viewIdent.toString, + "viewIdent" -> viewIdent, "newPath" -> newPath.mkString(" -> "))) } @@ -3426,4 +3426,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { errorClass = "NULLABLE_ROW_ID_ATTRIBUTES", messageParameters = Map("nullableRowIdAttrs" -> nullableRowIdAttrs.mkString(", "))) } + + def cannotUnsetNonExistentViewProperty(ident: Identifier, property: String): Throwable = + throw new AnalysisException( + s"Attempted to unset non-existent property '$property' in view $ident") + + def cannotMoveViewBetweenCatalogs(oldCatalog: String, newCatalog: String): Throwable = + throw new AnalysisException( + s"Cannot move view between catalogs: from=$oldCatalog and to=$newCatalog") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/CreateViewAnalysis.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/CreateViewAnalysis.scala new file mode 100644 index 0000000000000..6b9c5b80215af --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/CreateViewAnalysis.scala @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + 
* contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.FunctionIdentifier +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.plans.logical.{AlterViewAs, CreateV2View, CreateView, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, LookupCatalog, ViewCatalog} +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.execution.{CommandExecutionMode, QueryExecution} +import org.apache.spark.sql.execution.command.ViewHelper +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.MetadataBuilder +import org.apache.spark.sql.util.SchemaUtils + +/** + * Resolve views in CREATE VIEW and ALTER VIEW AS plans and convert them to logical plans. + */ +case class CreateViewAnalysis( + override val catalogManager: CatalogManager, + executePlan: (LogicalPlan, CommandExecutionMode.Value) => QueryExecution) + extends Rule[LogicalPlan] with LookupCatalog { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + private lazy val isTempView = + (nameParts: Seq[String]) => catalogManager.v1SessionCatalog.isTempView(nameParts) + private lazy val isTemporaryFunction = catalogManager.v1SessionCatalog.isTemporaryFunction _ + + def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + + case CreateView(ResolvedIdentifier(catalog, nameParts), userSpecifiedColumns, comment, + properties, originalText, child, allowExisting, replace) => + convertCreateView( + catalog = catalog.asViewCatalog, + ident = nameParts, + userSpecifiedColumns = userSpecifiedColumns, + comment = comment, + properties = properties, + originalText = originalText, + child = child, + allowExisting = allowExisting, + replace = replace) + + case AlterViewAs(ResolvedV2View(catalog, ident, _), originalText, query) => + convertCreateView( + catalog = catalog, + ident = ident, + userSpecifiedColumns = Seq.empty, + comment = None, + properties = Map.empty, + originalText = Option(originalText), + child = query, + allowExisting = false, + replace = true) + } + + /** + * Convert [[CreateView]] or [[AlterViewAs]] to logical plan [[CreateV2View]]. 
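+ * The child query is analyzed eagerly so that column counts, duplicate column names,
+ * references to temporary objects and cyclic view references are validated before the
+ * command is planned.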
+ */ + private def convertCreateView( + catalog: ViewCatalog, + ident: Identifier, + userSpecifiedColumns: Seq[(String, Option[String])], + comment: Option[String], + properties: Map[String, String], + originalText: Option[String], + child: LogicalPlan, + allowExisting: Boolean, + replace: Boolean): LogicalPlan = { + val qe = executePlan(child, CommandExecutionMode.SKIP) + qe.assertAnalyzed() + val analyzedPlan = qe.analyzed + + if (userSpecifiedColumns.nonEmpty && + userSpecifiedColumns.length != analyzedPlan.output.length) { + throw new AnalysisException(s"The number of columns produced by the SELECT clause " + + s"(num: `${analyzedPlan.output.length}`) does not match the number of column names " + + s"specified by CREATE VIEW (num: `${userSpecifiedColumns.length}`).") + } + + verifyTemporaryObjectsNotExists(ident, child) + + val queryOutput = analyzedPlan.schema.fieldNames + // Generate the query column names, + // throw an AnalysisException if there exists duplicate column names. + SchemaUtils.checkColumnNameDuplication(queryOutput, SQLConf.get.resolver) + + userSpecifiedColumns.map(_._1).zip(queryOutput).foreach { case (n1, n2) => + if (n1 != n2) { + throw new AnalysisException(s"Renaming columns is not supported: $n1 != $n2") + } + } + + if (replace) { + // Detect cyclic view reference on CREATE OR REPLACE VIEW or ALTER VIEW AS. + val parts = (catalog.name +: ident.asMultipartIdentifier).quoted + ViewHelper.checkCyclicViewReference(analyzedPlan, Seq(parts), parts) + } + + val sql = originalText.getOrElse { + throw QueryCompilationErrors.createPersistedViewFromDatasetAPINotAllowedError() + } + + val viewSchema = aliasPlan(analyzedPlan, userSpecifiedColumns).schema + val columnAliases = userSpecifiedColumns.map(_._1).toArray + val columnComments = userSpecifiedColumns.map(_._2.getOrElse(null)).toArray + + CreateV2View( + catalog = catalog, + ident = ident, + sql = sql, + comment = comment, + viewSchema = viewSchema, + queryOutput, + columnAliases = columnAliases, + columnComments = columnComments, + properties = properties, + allowExisting = allowExisting, + replace = replace) + } + + /** + * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns, + * else return the analyzed plan directly. + */ + private def aliasPlan( + analyzedPlan: LogicalPlan, + userSpecifiedColumns: Seq[(String, Option[String])]): LogicalPlan = { + if (userSpecifiedColumns.isEmpty) { + analyzedPlan + } else { + val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map { + case (attr, (colName, None)) => Alias(attr, colName)() + case (attr, (colName, Some(colComment))) => + val meta = new MetadataBuilder().putString("comment", colComment).build() + Alias(attr, colName)(explicitMetadata = Some(meta)) + } + executePlan(Project(projectList, analyzedPlan), CommandExecutionMode.SKIP).analyzed + } + } + + /** + * Permanent views are not allowed to reference temp objects, including temp function and views + */ + private def verifyTemporaryObjectsNotExists( + name: Identifier, + child: LogicalPlan): Unit = { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + // This func traverses the unresolved plan `child`. Below are the reasons: + // 1) Analyzer replaces unresolved temporary views by a SubqueryAlias with the corresponding + // logical plan. After replacement, it is impossible to detect whether the SubqueryAlias is + // added/generated from a temporary view. + // 2) The temp functions are represented by multiple classes. 
Most are inaccessible from this + // package (e.g., HiveGenericUDF). + child.collect { + // Disallow creating permanent views based on temporary views. + case UnresolvedRelation(nameParts, _, _) if isTempView(nameParts) => + throw new AnalysisException(s"Not allowed to create a permanent view $name by " + + s"referencing a temporary view ${nameParts.quoted}. " + + "Please create a temp view instead by CREATE TEMP VIEW") + case other if !other.resolved => other.expressions.flatMap(_.collect { + // Disallow creating permanent views based on temporary UDFs. + case UnresolvedFunction(Seq(funcName), _, _, _, _) + if isTemporaryFunction(FunctionIdentifier(funcName)) => + throw new AnalysisException(s"Not allowed to create a permanent view $name by " + + s"referencing a temporary function $funcName") + }) + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index d41611439f0ba..59e564dc35692 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -26,7 +26,7 @@ import org.apache.spark.internal.config.ConfigEntry import org.apache.spark.sql.{Dataset, SparkSession} import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression} import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint -import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint, SubqueryAlias, View} +import org.apache.spark.sql.catalyst.plans.logical.{CatalogTableViewDescription, IgnoreCachedData, LogicalPlan, ResolvedHint, SubqueryAlias, View} import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.columnar.InMemoryRelation @@ -185,7 +185,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { isSameName(ident.qualifier :+ ident.name) && isSameName(catalog.name() +: v2Ident.namespace() :+ v2Ident.name()) - case SubqueryAlias(ident, View(catalogTable, _, _)) => + case SubqueryAlias(ident, View(CatalogTableViewDescription(catalogTable), _, _)) => val v1Ident = catalogTable.identifier isSameName(ident.qualifier :+ ident.name) && isSameName(v1Ident.catalog.toSeq ++ v1Ident.database :+ v1Ident.table) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 3ad98fa0d0c1d..8cc3d6cd991ed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.{SQLConfHelper, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, GlobalTempView, LocalTempView, ViewType} import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, TemporaryViewRelation} import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression} -import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, LogicalPlan, Project, View} +import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, CatalogTableViewDescription, LogicalPlan, Project, View} import org.apache.spark.sql.catalyst.util.CharVarcharUtils import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper import org.apache.spark.sql.errors.QueryCompilationErrors @@ -500,8 +500,8 @@ object ViewHelper extends SQLConfHelper with Logging { */ def checkCyclicViewReference( plan: LogicalPlan, - path: Seq[TableIdentifier], - viewIdent: TableIdentifier): Unit = { + path: Seq[String], + viewIdent: String): Unit = { plan match { case v: View => val ident = v.desc.identifier @@ -542,6 +542,12 @@ object ViewHelper extends SQLConfHelper with Logging { } } + def checkCyclicViewReference( + plan: LogicalPlan, + path: Seq[TableIdentifier], + viewIdent: TableIdentifier): Unit = + checkCyclicViewReference(plan, path.map(_.quotedString), viewIdent.quotedString) + /** * Permanent views are not allowed to reference temp objects, including temp function and views */ @@ -570,8 +576,8 @@ object ViewHelper extends SQLConfHelper with Logging { private def collectTemporaryViews(child: LogicalPlan): Seq[Seq[String]] = { def collectTempViews(child: LogicalPlan): Seq[Seq[String]] = { child.flatMap { - case view: View if view.isTempView => - val ident = view.desc.identifier + case View(CatalogTableViewDescription(desc), true, _) => + val ident = desc.identifier Seq(ident.database.toSeq :+ ident.table) case plan => plan.expressions.flatMap(_.flatMap { case e: SubqueryExpression => collectTempViews(e.plan) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterViewExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterViewExec.scala new file mode 100644 index 0000000000000..3f9e2e79c1e9f --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterViewExec.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.{Identifier, ViewCatalog, ViewChange} + +/** + * Physical plan node for altering a view. 
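+ * Applies the given [[ViewChange]]s through the catalog, rethrowing invalid or unsupported
+ * changes as a [[SparkException]].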
+ */ +case class AlterViewExec( + catalog: ViewCatalog, + ident: Identifier, + changes: Seq[ViewChange]) extends LeafV2CommandExec { + + override protected def run(): Seq[InternalRow] = { + try { + catalog.alterView(ident, changes: _*) + } catch { + case e: IllegalArgumentException => + throw new SparkException(s"Invalid view change: ${e.getMessage}", e) + case e: UnsupportedOperationException => + throw new SparkException(s"Unsupported view change: ${e.getMessage}", e) + } + + Seq.empty + } + + override def output: Seq[Attribute] = Seq.empty +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateViewExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateViewExec.scala new file mode 100644 index 0000000000000..d1e6a0d50b32f --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateViewExec.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.{Identifier, ViewCatalog} +import org.apache.spark.sql.types.StructType + +/** + * Physical plan node for creating a view. 
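+ * For CREATE OR REPLACE VIEW an existing view is dropped before the new one is created; for
+ * CREATE VIEW IF NOT EXISTS a [[ViewAlreadyExistsException]] from the catalog is ignored.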
+ */ +case class CreateViewExec( + catalog: ViewCatalog, + ident: Identifier, + sql: String, + currentCatalog: String, + currentNamespace: Array[String], + comment: Option[String], + viewSchema: StructType, + queryColumnNames: Array[String], + columnAliases: Array[String], + columnComments: Array[String], + properties: Map[String, String], + allowExisting: Boolean, + replace: Boolean) extends LeafV2CommandExec { + + override protected def run(): Seq[InternalRow] = { + val engineVersion = "Spark " + org.apache.spark.SPARK_VERSION + val createEngineVersion = if (replace) None else Some(engineVersion) + val newProperties = properties ++ + comment.map(ViewCatalog.PROP_COMMENT -> _) ++ + createEngineVersion.map(ViewCatalog.PROP_CREATE_ENGINE_VERSION -> _) + + (ViewCatalog.PROP_ENGINE_VERSION -> engineVersion) + + if (replace) { + // CREATE OR REPLACE VIEW + if (catalog.viewExists(ident)) { + catalog.dropView(ident) + } + catalog.createView( + ident, + sql, + currentCatalog, + currentNamespace, + viewSchema, + queryColumnNames, + columnAliases, + columnComments, + newProperties.asJava) + } else { + try { + // CREATE VIEW [IF NOT EXISTS] + catalog.createView( + ident, + sql, + currentCatalog, + currentNamespace, + viewSchema, + queryColumnNames, + columnAliases, + columnComments, + newProperties.asJava) + } catch { + case _: ViewAlreadyExistsException if allowExisting => // Ignore + } + } + + Seq.empty + } + + override def output: Seq[Attribute] = Seq.empty +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 29f0da1158ff5..972698d655e57 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.{toPrettySQL, ResolveDefaultColumns, V2ExpressionBuilder} -import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog, TruncatableTable} +import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog, TruncatableTable} import org.apache.spark.sql.connector.catalog.index.SupportsIndex import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue} import org.apache.spark.sql.connector.expressions.filter.{And => V2And, Not => V2Not, Or => V2Or, Predicate} @@ -54,6 +54,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat import DataSourceV2Implicits._ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + val catalogManager: CatalogManager = session.sessionState.catalogManager + private def withProjectAndFilter( project: Seq[NamedExpression], filters: Seq[Expression], @@ -493,6 +495,43 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat val table = a.table.asInstanceOf[ResolvedTable] AlterTableExec(table.catalog, table.identifier, a.changes) :: Nil + case c: CreateV2View 
=> + CreateViewExec( + c.catalog, + c.ident, + c.sql, + catalogManager.currentCatalog.name, + catalogManager.currentNamespace, + c.comment, + c.schema, + c.queryColumnNames, + c.columnAliases, + c.columnComments, + c.properties, + c.allowExisting, + c.replace) :: Nil + + case AlterV2View(catalog, ident, changes) => + AlterViewExec(catalog, ident, changes) :: Nil + + case DropV2View(catalog, ident, ifExists) => + DropViewExec(catalog, ident, ifExists) :: Nil + + case RenameV2View(catalog, oldIdent, newIdent) => + RenameViewExec(catalog, oldIdent, newIdent) :: Nil + + case d @ DescribeV2View(desc, isExtended) => + DescribeViewExec(d.output, desc, isExtended) :: Nil + + case show @ ShowCreateV2View(view) => + ShowCreateViewExec(show.output, view) :: Nil + + case show @ ShowV2ViewProperties(view, propertyKey) => + ShowViewPropertiesExec(show.output, view, propertyKey) :: Nil + + case RefreshView(catalog, ident) => + RefreshViewExec(catalog, ident) :: Nil + case CreateIndex(ResolvedTable(_, _, table, _), indexName, indexType, ifNotExists, columns, properties) => table match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeViewExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeViewExec.scala new file mode 100644 index 0000000000000..c827d85b0674f --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeViewExec.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.V2ViewDescription +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.LeafExecNode + +case class DescribeViewExec( + output: Seq[Attribute], + desc: V2ViewDescription, + isExtended: Boolean) extends V2CommandExec with LeafExecNode { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override protected def run(): Seq[InternalRow] = + if (isExtended) { + (describeSchema :+ emptyRow) ++ describeExtended + } else { + describeSchema + } + + private def describeSchema: Seq[InternalRow] = + desc.schema.map { column => + toCatalystRow( + column.name, + column.dataType.simpleString, + column.getComment().getOrElse("")) + } + + private def emptyRow: InternalRow = toCatalystRow("", "", "") + + private def describeExtended: Seq[InternalRow] = { + val outputColumns = desc.viewQueryColumnNames.mkString("[", ", ", "]") + val tableProperties = desc.properties + .map(p => p._1 + "=" + p._2) + .mkString("[", ", ", "]") + + toCatalystRow("# Detailed View Information", "", "") :: + toCatalystRow("Owner", desc.owner.getOrElse(""), "") :: + toCatalystRow("Comment", desc.comment.getOrElse(""), "") :: + toCatalystRow("View Text", desc.query, "") :: + toCatalystRow("View Catalog and Namespace", desc.viewCatalogAndNamespace.quoted, "") :: + toCatalystRow("View Query Output Columns", outputColumns, "") :: + toCatalystRow("Table Properties", tableProperties, "") :: + toCatalystRow("Created By", desc.createEngineVersion.getOrElse(""), "") :: + Nil + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropViewExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropViewExec.scala new file mode 100644 index 0000000000000..aeccae56372e4 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropViewExec.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.NoSuchViewException +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.{Identifier, ViewCatalog} + +/** + * Physical plan node for dropping a view. 
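+ * Drops the view if it exists; a missing view raises [[NoSuchViewException]] unless
+ * IF EXISTS was specified.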
+ */ +case class DropViewExec( + catalog: ViewCatalog, + ident: Identifier, + ifExists: Boolean) extends LeafV2CommandExec { + + override protected def run(): Seq[InternalRow] = { + val exists = try { + catalog.viewExists(ident) + } catch { + case _: NoSuchViewException => + false + case _: Throwable => + // if the existence check failed, try to run the delete + true + } + + if (exists) { + catalog.dropView(ident) + } else if (!ifExists) { + throw new NoSuchViewException(ident) + } + + Seq.empty + } + + override def output: Seq[Attribute] = Seq.empty +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshViewExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshViewExec.scala new file mode 100644 index 0000000000000..676efc5c8b928 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshViewExec.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.{Identifier, ViewCatalog} + +case class RefreshViewExec( + catalog: ViewCatalog, + ident: Identifier) extends LeafV2CommandExec { + override protected def run(): Seq[InternalRow] = { + // REFRESH VIEW is no op for now + Seq.empty + } + + override def output: Seq[Attribute] = Seq.empty +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameViewExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameViewExec.scala new file mode 100644 index 0000000000000..8426e9f060e25 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameViewExec.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.{Identifier, ViewCatalog} + +/** + * Physical plan node for renaming a view. + */ +case class RenameViewExec( + catalog: ViewCatalog, + oldIdent: Identifier, + newIdent: Identifier) extends LeafV2CommandExec { + + override def output: Seq[Attribute] = Seq.empty + + override protected def run(): Seq[InternalRow] = { + catalog.renameView(oldIdent, newIdent) + + Seq.empty + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateViewExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateViewExec.scala new file mode 100644 index 0000000000000..0c0b09fb7e0fa --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateViewExec.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.V2ViewDescription +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.LeafExecNode + +/** + * Physical plan node for showing view create statement. + */ +case class ShowCreateViewExec(output: Seq[Attribute], desc: V2ViewDescription) + extends V2CommandExec with LeafExecNode { + + override protected def run(): Seq[InternalRow] = { + val schema = desc.schema.map(_.name).mkString("(", ", ", ")") + val create = s"CREATE VIEW ${desc.identifier} $schema AS\n${desc.query}\n" + Seq(toCatalystRow(create)) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowViewPropertiesExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowViewPropertiesExec.scala new file mode 100644 index 0000000000000..67df0adc3a45a --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowViewPropertiesExec.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.V2ViewDescription +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.LeafExecNode + +/** + * Physical plan node for showing view properties. + */ +case class ShowViewPropertiesExec( + output: Seq[Attribute], + desc: V2ViewDescription, + propertyKey: Option[String]) extends V2CommandExec with LeafExecNode { + + override protected def run(): Seq[InternalRow] = { + propertyKey match { + case Some(p) => + val propValue = desc.properties.getOrElse(p, + s"View ${desc.identifier} does not have property: $p") + Seq(toCatalystRow(p, propValue)) + case None => + desc.properties.map { + case (k, v) => toCatalystRow(k, v) + }.toSeq + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index f81b12796ce97..576895003fb88 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.internal import org.apache.spark.annotation.Unstable import org.apache.spark.sql.{ExperimentalMethods, SparkSession, UDFRegistration, _} -import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, FunctionRegistry, ReplaceCharWithVarchar, ResolveSessionCatalog, TableFunctionRegistry} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, CreateViewAnalysis, EvalSubqueriesForTimeTravel, FunctionRegistry, ReplaceCharWithVarchar, ResolveSessionCatalog, TableFunctionRegistry} import org.apache.spark.sql.catalyst.catalog.{FunctionExpressionBuilder, SessionCatalog} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.optimizer.Optimizer @@ -181,9 +181,14 @@ abstract class BaseSessionStateBuilder( * Note: this depends on the `conf` and `catalog` fields. 
*/ protected def analyzer: Analyzer = new Analyzer(catalogManager) { + + override val extendedSubstitutionRules: Seq[Rule[LogicalPlan]] = + Seq(ViewSubstitution(sqlParser)) + override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = new FindDataSourceTable(session) +: new ResolveSQLOnFile(session) +: + CreateViewAnalysis(catalogManager, createQueryExecution) +: new FallBackFileSourceV2(session) +: ResolveEncodersInScalaAgg +: new ResolveSessionCatalog(catalogManager) +: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 79b0084da23ad..8a8bd86c4f9a7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -26,8 +26,8 @@ import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdenti import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, LogicalPlan, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, TableSpec, View} -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, FunctionCatalog, Identifier, SupportsNamespaces, Table => V2Table, TableCatalog, V1Table} +import org.apache.spark.sql.catalyst.plans.logical.{CatalogTableViewDescription, CreateTable, LocalRelation, LogicalPlan, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, TableSpec, View} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, FunctionCatalog, Identifier, SupportsNamespaces, TableCatalog, V1Table, Table => V2Table} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, MultipartIdentifierHelper, TransformHelper} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.command.ShowTablesCommand @@ -805,8 +805,8 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { // Temporary and global temporary views are not supposed to be put into the relation cache // since they are tracked separately. V1 and V2 plans are cache invalidated accordingly. 
def invalidateCache(plan: LogicalPlan): Unit = plan match { - case v: View => - if (!v.isTempView) sessionCatalog.invalidateCachedTable(v.desc.identifier) + case v @ View(CatalogTableViewDescription(desc), _, _) => + if (!v.isTempView) sessionCatalog.invalidateCachedTable(desc.identifier) case r: LogicalRelation => sessionCatalog.invalidateCachedTable(r.catalogTable.get.identifier) case h: HiveTableRelation => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index a4b7f762dbaa5..2c2799ec16f5c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -1946,7 +1946,7 @@ class DataSourceV2SQLSuiteV1Filter } } - test("AlterTable: renaming views are not supported") { + test("ALTER VIEW RENAME: not a view catalog") { val e = intercept[AnalysisException] { sql(s"ALTER VIEW testcat.ns.tbl RENAME TO ns.view") } @@ -2369,7 +2369,7 @@ class DataSourceV2SQLSuiteV1Filter } } - test("View commands are not supported in v2 catalogs") { + ignore("View commands are not supported in v2 catalogs") { def validateViewCommand(sqlStatement: String): Unit = { val e = intercept[AnalysisException](sql(sqlStatement)) checkError( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index c4e0057ae952d..afa8c775c29ec 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.exec.{UDAF, UDF} import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF} import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, ReplaceCharWithVarchar, ResolveSessionCatalog} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, CreateViewAnalysis, EvalSubqueriesForTimeTravel, ReplaceCharWithVarchar, ResolveSessionCatalog} import org.apache.spark.sql.catalyst.catalog.{ExternalCatalogWithListener, InvalidUDFClassException} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -83,10 +83,15 @@ class HiveSessionStateBuilder( * A logical query plan `Analyzer` with rules specific to Hive. */ override protected def analyzer: Analyzer = new Analyzer(catalogManager) { + + override val extendedSubstitutionRules: Seq[Rule[LogicalPlan]] = + Seq(ViewSubstitution(sqlParser)) + override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = new ResolveHiveSerdeTable(session) +: new FindDataSourceTable(session) +: new ResolveSQLOnFile(session) +: + CreateViewAnalysis(catalogManager, createQueryExecution) +: new FallBackFileSourceV2(session) +: ResolveEncodersInScalaAgg +: new ResolveSessionCatalog(catalogManager) +:
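Both session state builders above plug view analysis in through the new extendedSubstitutionRules hook on Analyzer. A minimal sketch of that extension point in isolation; MyNoOpSubstitution and MyAnalyzer are hypothetical names used only for illustration:

import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogManager

// Hypothetical rule: participates in the Substitution batch and leaves the plan unchanged.
object MyNoOpSubstitution extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// The hook is a plain val, so an Analyzer subclass (or an anonymous Analyzer, as in the
// session state builders above) only needs to override it to contribute extra rules
// to the Substitution batch.
class MyAnalyzer(catalogManager: CatalogManager) extends Analyzer(catalogManager) {
  override val extendedSubstitutionRules: Seq[Rule[LogicalPlan]] = Seq(MyNoOpSubstitution)
}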