diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala index 3cfda994ebb9..8d98835cd4e9 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala @@ -20,10 +20,13 @@ package org.apache.iceberg.spark.extensions import org.apache.spark.sql.SparkSessionExtensions +import org.apache.spark.sql.catalyst.analysis.CreateViewAnalysis import org.apache.spark.sql.catalyst.analysis.ProcedureArgumentCoercion import org.apache.spark.sql.catalyst.analysis.ResolveProcedures +import org.apache.spark.sql.catalyst.analysis.ResolveViews import org.apache.spark.sql.catalyst.optimizer.ReplaceStaticInvoke import org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser +import org.apache.spark.sql.catalyst.plans.logical.views.EliminateIcebergView import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy class IcebergSparkSessionExtensions extends (SparkSessionExtensions => Unit) { @@ -34,10 +37,13 @@ class IcebergSparkSessionExtensions extends (SparkSessionExtensions => Unit) { // analyzer extensions extensions.injectResolutionRule { spark => ResolveProcedures(spark) } + extensions.injectResolutionRule { spark => ResolveViews(spark) } extensions.injectResolutionRule { _ => ProcedureArgumentCoercion } + extensions.injectResolutionRule { spark => CreateViewAnalysis(spark) } // optimizer extensions extensions.injectOptimizerRule { _ => ReplaceStaticInvoke } + extensions.injectOptimizerRule(_ => EliminateIcebergView) // planner extensions extensions.injectPlannerStrategy { spark => ExtendedDataSourceV2Strategy(spark) } diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CreateViewAnalysis.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CreateViewAnalysis.scala new file mode 100644 index 000000000000..a7f589dea94f --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CreateViewAnalysis.scala @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.FunctionIdentifier +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.expressions.SubqueryExpression +import org.apache.spark.sql.catalyst.plans.logical.AlterViewAs +import org.apache.spark.sql.catalyst.plans.logical.CreateView +import org.apache.spark.sql.catalyst.plans.logical.IcebergView +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView +import org.apache.spark.sql.catalyst.plans.logical.views.CreateV2View +import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.catalog.CatalogManager +import org.apache.spark.sql.connector.catalog.CatalogPlugin +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.LookupCatalog +import org.apache.spark.sql.connector.catalog.ViewCatalog +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.errors.QueryCompilationErrors.toSQLId +import org.apache.spark.sql.execution.CommandExecutionMode +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.MetadataBuilder +import org.apache.spark.sql.util.SchemaUtils + +/** + * Resolve views in CREATE VIEW and ALTER VIEW AS plans and convert them to logical plans. + */ +case class CreateViewAnalysis(spark: SparkSession) + extends Rule[LogicalPlan] with LookupCatalog { + + protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + private lazy val isTempView = + (nameParts: Seq[String]) => catalogManager.v1SessionCatalog.isTempView(nameParts) + private lazy val isTemporaryFunction = catalogManager.v1SessionCatalog.isTemporaryFunction _ + + def asViewCatalog(plugin: CatalogPlugin): ViewCatalog = plugin match { + case viewCatalog: ViewCatalog => + viewCatalog + case _ => + throw QueryCompilationErrors.missingCatalogAbilityError(plugin, "views") + } + + def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case CreateIcebergView(ResolvedIdentifier(catalog, nameParts), userSpecifiedColumns, comment, + properties, originalText, child, allowExisting, replace) => + convertCreateView( + catalog = asViewCatalog(catalog), + viewInfo = ViewInfo( + ident = nameParts, + userSpecifiedColumns = userSpecifiedColumns, + comment = comment, + properties = properties, + originalText = originalText), + child = child, + allowExisting = allowExisting, + replace = replace) + + case AlterViewAs(ResolvedV2View(catalog, ident, _), originalText, query) => + convertCreateView( + catalog = catalog, + viewInfo = ViewInfo( + ident = ident, + userSpecifiedColumns = Seq.empty, + comment = None, + properties = Map.empty, + originalText = Option(originalText)), + child = query, + allowExisting = false, + replace = true) + } + + private case class ViewInfo(ident: Identifier, + userSpecifiedColumns: Seq[(String, Option[String])], + comment: Option[String], + properties: Map[String, String], + originalText: Option[String]) + + /** + * Convert [[CreateView]] or [[AlterViewAs]] to logical plan [[CreateV2View]]. 
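+   * Analyzes the view query eagerly, validates any user-specified column list against the
+   * query output, rejects references to temporary views and functions, and checks for cyclic
+   * view references before emitting the command.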
+   */
+  private def convertCreateView(
+      catalog: ViewCatalog,
+      viewInfo: ViewInfo,
+      child: LogicalPlan,
+      allowExisting: Boolean,
+      replace: Boolean): LogicalPlan = {
+
+    val qe = new QueryExecution(spark, child, mode = CommandExecutionMode.SKIP)
+    qe.assertAnalyzed()
+    val analyzedPlan = qe.analyzed
+
+    if (viewInfo.userSpecifiedColumns.nonEmpty &&
+      viewInfo.userSpecifiedColumns.length != analyzedPlan.output.length) {
+      throw new AnalysisException(s"The number of columns produced by the SELECT clause " +
+        s"(num: `${analyzedPlan.output.length}`) does not match the number of column names " +
+        s"specified by CREATE VIEW (num: `${viewInfo.userSpecifiedColumns.length}`).")
+    }
+
+    verifyTemporaryObjectsNotExists(viewInfo.ident, child)
+
+    val queryOutput = analyzedPlan.schema.fieldNames
+    // Check the generated query column names and
+    // throw an AnalysisException if duplicate column names exist.
+    SchemaUtils.checkColumnNameDuplication(queryOutput, SQLConf.get.resolver)
+
+    viewInfo.userSpecifiedColumns.map(_._1).zip(queryOutput).foreach { case (n1, n2) =>
+      if (n1 != n2) {
+        throw new AnalysisException(s"Renaming columns is not supported: $n1 != $n2")
+      }
+    }
+
+    if (replace) {
+      // Detect cyclic view reference on CREATE OR REPLACE VIEW or ALTER VIEW AS.
+      val parts = (catalog.name +: viewInfo.ident.asMultipartIdentifier).quoted
+      checkCyclicViewReference(analyzedPlan, Seq(parts), parts)
+    }
+
+    val sql = viewInfo.originalText.getOrElse {
+      throw QueryCompilationErrors.createPersistedViewFromDatasetAPINotAllowedError()
+    }
+
+    val viewSchema = aliasPlan(analyzedPlan, viewInfo.userSpecifiedColumns).schema
+    val columnAliases = viewInfo.userSpecifiedColumns.map(_._1).toArray
+    val columnComments = viewInfo.userSpecifiedColumns.map(_._2.orNull).toArray
+
+    CreateV2View(
+      view = viewInfo.ident.asMultipartIdentifier,
+      sql = sql,
+      comment = viewInfo.comment,
+      viewSchema = viewSchema,
+      queryColumnNames = queryOutput,
+      columnAliases = columnAliases,
+      columnComments = columnComments,
+      properties = viewInfo.properties,
+      allowExisting = allowExisting,
+      replace = replace)
+  }
+
+  /**
+   * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user-specified columns;
+   * otherwise return the analyzed plan directly.
+   */
+  private def aliasPlan(
+      analyzedPlan: LogicalPlan,
+      userSpecifiedColumns: Seq[(String, Option[String])]): LogicalPlan = {
+    if (userSpecifiedColumns.isEmpty) {
+      analyzedPlan
+    } else {
+      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
+        case (attr, (colName, None)) => Alias(attr, colName)()
+        case (attr, (colName, Some(colComment))) =>
+          val meta = new MetadataBuilder().putString("comment", colComment).build()
+          Alias(attr, colName)(explicitMetadata = Some(meta))
+      }
+      new QueryExecution(spark, Project(projectList, analyzedPlan),
+        mode = CommandExecutionMode.SKIP).analyzed
+    }
+  }
+
+  /**
+   * Permanent views are not allowed to reference temporary objects, including temporary
+   * functions and views.
+   */
+  private def verifyTemporaryObjectsNotExists(
+      name: Identifier,
+      child: LogicalPlan): Unit = {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+    // This function traverses the unresolved plan `child` for the following reasons:
+    // 1) The analyzer replaces unresolved temporary views with a SubqueryAlias over the
+    //    corresponding logical plan. After replacement, it is impossible to detect whether
+    //    the SubqueryAlias was added/generated from a temporary view.
+    // 2) The temp functions are represented by multiple classes.
+    //    Most are inaccessible from this package (e.g., HiveGenericUDF).
+    child.collect {
+      // Disallow creating permanent views based on temporary views.
+      case UnresolvedRelation(nameParts, _, _) if isTempView(nameParts) =>
+        throw new AnalysisException(s"Not allowed to create a permanent view $name by " +
+          s"referencing a temporary view ${nameParts.quoted}. " +
+          "Please create a temp view instead by CREATE TEMP VIEW")
+      case other if !other.resolved => other.expressions.flatMap(_.collect {
+        // Disallow creating permanent views based on temporary UDFs.
+        case UnresolvedFunction(Seq(funcName), _, _, _, _)
+          if isTemporaryFunction(FunctionIdentifier(funcName)) =>
+          throw new AnalysisException(s"Not allowed to create a permanent view $name by " +
+            s"referencing a temporary function $funcName")
+      })
+    }
+  }
+
+  /**
+   * Recursively search the logical plan to detect cyclic view references and throw an
+   * AnalysisException if a cycle is detected.
+   *
+   * A cyclic view reference is a cycle of reference dependencies. For example, if the following
+   * statements are executed:
+   * CREATE VIEW testView AS SELECT id FROM tbl
+   * CREATE VIEW testView2 AS SELECT id FROM testView
+   * ALTER VIEW testView AS SELECT * FROM testView2
+   * The view `testView` references `testView2`, and `testView2` also references `testView`,
+   * therefore a reference cycle (testView -> testView2 -> testView) exists.
+   *
+   * @param plan the logical plan we detect cyclic view references from.
+   * @param path the path between the altered view and the current node.
+   * @param viewIdent the table identifier of the altered view; two views are compared by their
+   *                  `desc.identifier`.
+   */
+  private def checkCyclicViewReference(
+      plan: LogicalPlan,
+      path: Seq[String],
+      viewIdent: String): Unit = {
+    plan match {
+      case v: IcebergView =>
+        val ident = v.desc.identifier
+        val newPath = path :+ ident
+        // If the view identifier equals `viewIdent`, the current view node is the same as the
+        // altered view, so a view reference cycle has been detected and an AnalysisException
+        // should be thrown.
+        if (ident == viewIdent) {
+          throw recursiveViewDetectedError(viewIdent, newPath)
+        } else {
+          v.children.foreach { child =>
+            checkCyclicViewReference(child, newPath, viewIdent)
+          }
+        }
+      case _ =>
+        plan.children.foreach(child => checkCyclicViewReference(child, path, viewIdent))
+    }
+
+    // Detect cyclic references from subqueries.
+    plan.expressions.foreach { expr =>
+      expr match {
+        case s: SubqueryExpression =>
+          checkCyclicViewReference(s.plan, path, viewIdent)
+        case _ => // Do nothing.
+      }
+    }
+  }
+
+  private def recursiveViewDetectedError(
+      viewIdent: String,
+      newPath: Seq[String]): Throwable = {
+    new AnalysisException(
+      errorClass = "RECURSIVE_VIEW",
+      messageParameters = Map(
+        "viewIdent" -> toSQLId(viewIdent),
+        "newPath" -> newPath.map(p => toSQLId(p)).mkString(" -> ")))
+  }
+}
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
new file mode 100644
index 000000000000..0f239bdd6a0f
--- /dev/null
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.checkAnalysis +import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.execute +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.catalyst.plans.logical.CatalogTableViewDescription +import org.apache.spark.sql.catalyst.plans.logical.DescribeRelation +import org.apache.spark.sql.catalyst.plans.logical.IcebergView +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.RenameTable +import org.apache.spark.sql.catalyst.plans.logical.SetViewProperties +import org.apache.spark.sql.catalyst.plans.logical.ShowCreateTable +import org.apache.spark.sql.catalyst.plans.logical.ShowTableProperties +import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias +import org.apache.spark.sql.catalyst.plans.logical.UnsetViewProperties +import org.apache.spark.sql.catalyst.plans.logical.views.AlterV2View +import org.apache.spark.sql.catalyst.plans.logical.views.DescribeV2View +import org.apache.spark.sql.catalyst.plans.logical.views.RenameV2View +import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View +import org.apache.spark.sql.catalyst.plans.logical.views.ShowCreateV2View +import org.apache.spark.sql.catalyst.plans.logical.views.ShowV2ViewProperties +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.catalog.{View => V2View} +import org.apache.spark.sql.connector.catalog.CatalogManager +import org.apache.spark.sql.connector.catalog.CatalogPlugin +import org.apache.spark.sql.connector.catalog.CatalogV2Util +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.LookupCatalog +import org.apache.spark.sql.connector.catalog.View +import org.apache.spark.sql.connector.catalog.ViewCatalog +import org.apache.spark.sql.connector.catalog.ViewChange +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.internal.SQLConf + +case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager + + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case u@UnresolvedRelation(nameParts, _, _) if catalogManager.v1SessionCatalog.isTempView(nameParts) => + u + + case u@UnresolvedRelation( + parts@NonSessionCatalogAndIdentifier(catalog, ident), _, _) if !isSQLOnFile(parts) => + loadView(catalog, ident) + .map(createViewRelation(parts.quoted, _)) + .getOrElse(u) + + case ShowCreateTable(ResolvedV2View(_, ident, view), _, _) => + ShowCreateV2View(V2ViewDescription(ident.quoted, view)) + + case 
DescribeRelation(ResolvedV2View(_, ident, view), _, isExtended, _) =>
+      DescribeV2View(V2ViewDescription(ident.quoted, view), isExtended)
+
+    case ShowTableProperties(ResolvedV2View(_, ident, view), propertyKeys, _) =>
+      ShowV2ViewProperties(V2ViewDescription(ident.quoted, view), propertyKeys)
+
+    case SetViewProperties(ResolvedV2View(catalog, ident, _), props) =>
+      val changes = props.map {
+        case (property, value) => ViewChange.setProperty(property, value)
+      }.toSeq
+      AlterV2View(catalog, ident, changes)
+
+    case UnsetViewProperties(ResolvedV2View(catalog, ident, _), propertyKeys, ifExists) =>
+      if (!ifExists) {
+        val view = catalog.loadView(ident)
+        propertyKeys.filterNot(view.properties.containsKey).foreach { property =>
+          cannotUnsetNonExistentViewProperty(ident, property)
+        }
+      }
+      val changes = propertyKeys.map(ViewChange.removeProperty)
+      AlterV2View(catalog, ident, changes)
+
+    case RenameTable(ResolvedV2View(oldCatalog, oldIdent, _),
+        NonSessionCatalogAndIdentifier(newCatalog, newIdent), true) =>
+      if (oldCatalog.name != newCatalog.name) {
+        cannotMoveViewBetweenCatalogs(oldCatalog.name, newCatalog.name)
+      }
+      RenameV2View(oldCatalog, oldIdent, newIdent)
+
+    case p @ SubqueryAlias(_, view: IcebergView) =>
+      p.copy(child = resolveViews(view))
+  }
+
+  def cannotUnsetNonExistentViewProperty(ident: Identifier, property: String): Throwable =
+    throw new AnalysisException(
+      s"Attempted to unset non-existent property '$property' in view $ident")
+
+  def cannotMoveViewBetweenCatalogs(oldCatalog: String, newCatalog: String): Throwable =
+    throw new AnalysisException(
+      s"Cannot move view between catalogs: from=$oldCatalog and to=$newCatalog")
+
+  def lookupView(catalog: CatalogPlugin, ident: Identifier): Option[LogicalPlan] =
+    loadView(catalog, ident).map {
+      case view if CatalogV2Util.isSessionCatalog(catalog) =>
+        ResolvedPersistentView(catalog, ident, view.schema)
+      case view =>
+        ResolvedV2View(CatalogHelper(catalog).asViewCatalog, ident, view)
+    }
+
+  // The current catalog and namespace may differ from when the view was created, so the view
+  // logical plan must be resolved here with the catalog and namespace stored in the view
+  // metadata. This is done by keeping the catalog and namespace in `AnalysisContext`; the
+  // analyzer will look at `AnalysisContext.catalogAndNamespace` when resolving relations with
+  // single-part names. If `AnalysisContext.catalogAndNamespace` is non-empty, the analyzer
+  // expands single-part names with it instead of the current catalog and namespace.
+  def resolveViews(plan: LogicalPlan): LogicalPlan = plan match {
+    // The view's child should be a logical plan parsed from `desc.viewText`; the variable
+    // `viewText` should be defined, or else an error is thrown when the View operator is
+    // generated.
+    case view @ IcebergView(CatalogTableViewDescription(desc), isTempView, child) if !child.resolved =>
+      // Resolve all the UnresolvedRelations and Views in the child.
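+      // The catalog and namespace stored in the view metadata are pushed into AnalysisContext
+      // so that relations inside the view resolve against them rather than the session's
+      // current catalog and namespace.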
+      val newChild = AnalysisContext.withAnalysisContext(desc) {
+        val nestedViewDepth = AnalysisContext.get.nestedViewDepth
+        val maxNestedViewDepth = AnalysisContext.get.maxNestedViewDepth
+        if (nestedViewDepth > maxNestedViewDepth) {
+          throw QueryCompilationErrors.viewDepthExceedsMaxResolutionDepthError(
+            desc.identifier, maxNestedViewDepth, view)
+        }
+        SQLConf.withExistingConf(IcebergView.effectiveSQLConf(desc.viewSQLConfigs, isTempView)) {
+          execute(child)
+        }
+      }
+      // Fail the analysis eagerly because, outside the AnalysisContext, the unresolved
+      // operators inside a view may be resolved incorrectly.
+      checkAnalysis(newChild)
+      view.copy(child = newChild)
+    case p @ SubqueryAlias(_, view: IcebergView) =>
+      p.copy(child = resolveViews(view))
+    case _ => plan
+  }
+
+  private def isSQLOnFile(parts: Seq[String]): Boolean = parts match {
+    case Seq(_, path) if path.contains("/") => true
+    case _ => false
+  }
+
+  private def createViewRelation(name: String, view: V2View): LogicalPlan = {
+    if (!SparkSession.active.sessionState.catalogManager.isCatalogRegistered(view.currentCatalog)) {
+      throw new AnalysisException(
+        s"Invalid current catalog '${view.currentCatalog}' in view '$name'")
+    }
+
+    val child = parseViewText(name, view.query)
+    val desc = V2ViewDescription(name, view)
+    val qualifiedChild = desc.viewCatalogAndNamespace match {
+      case Seq() =>
+        // Views from Spark 2.2 or prior do not store the catalog or namespace;
+        // however, their SQL text should already be fully qualified.
+        child
+      case catalogAndNamespace =>
+        // Substitute CTEs within the view before qualifying table identifiers.
+        qualifyTableIdentifiers(CTESubstitution.apply(child), catalogAndNamespace)
+    }
+
+    // The relation is a view, so we wrap it by:
+    // 1. Adding an [[IcebergView]] operator over the relation to keep track of the view desc;
+    // 2. Wrapping the logical plan in a [[SubqueryAlias]] that tracks the name of the view.
+    SubqueryAlias(name, IcebergView(desc, isTempView = false, qualifiedChild))
+  }
+
+  private def parseViewText(name: String, viewText: String): LogicalPlan = {
+    try {
+      SparkSession.active.sessionState.sqlParser.parsePlan(viewText)
+    } catch {
+      case _: ParseException =>
+        throw QueryCompilationErrors.invalidViewText(viewText, name)
+    }
+  }
+
+  def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = catalog match {
+    case viewCatalog: ViewCatalog =>
+      try {
+        Option(viewCatalog.loadView(ident))
+      } catch {
+        case _: NoSuchViewException => None
+      }
+    case _ => None
+  }
+
+  /**
+   * Qualify table identifiers with the default catalog and namespace if necessary.
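+   * Single-part names are expanded with the view's stored catalog and namespace; multi-part
+   * names whose first part is not a registered catalog are prefixed with the stored catalog
+   * name.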
+ */ + private def qualifyTableIdentifiers( + child: LogicalPlan, + catalogAndNamespace: Seq[String]): LogicalPlan = + child transform { + case u@UnresolvedRelation(Seq(table), _, _) => + u.copy(multipartIdentifier = catalogAndNamespace :+ table) + case u@UnresolvedRelation(parts, _, _) + if !SparkSession.active.sessionState.catalogManager.isCatalogRegistered(parts.head) => + u.copy(multipartIdentifier = catalogAndNamespace.head +: parts) + } + + + implicit class CatalogHelper(plugin: CatalogPlugin) { + def asViewCatalog: ViewCatalog = plugin match { + case viewCatalog: ViewCatalog => + viewCatalog + case _ => + throw QueryCompilationErrors.missingCatalogAbilityError(plugin, "views") + } + } +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/V2ViewDescription.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/V2ViewDescription.scala new file mode 100644 index 000000000000..89124dd5c5ec --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/V2ViewDescription.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.plans.logical.ViewDescription +import org.apache.spark.sql.connector.catalog.View +import org.apache.spark.sql.connector.catalog.ViewCatalog +import org.apache.spark.sql.types.StructType +import scala.collection.JavaConverters._ + +/** + * View description backed by a View in V2 catalog. 
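+ * Reserved view properties such as the comment, owner, and engine version used at creation
+ * are surfaced as dedicated fields and excluded from the generic `properties` map.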
+ * + * @param view a view in V2 catalog + */ +case class V2ViewDescription(override val identifier: String, view: View) + extends ViewDescription { + + override val schema: StructType = view.schema + + override val viewText: Option[String] = Option(view.query) + + override val viewCatalogAndNamespace: Seq[String] = + view.currentCatalog +: view.currentNamespace.toSeq + + override val viewQueryColumnNames: Seq[String] = view.schema.fieldNames + + val query: String = view.query + + val comment: Option[String] = Option(view.properties.get(ViewCatalog.PROP_COMMENT)) + + val owner: Option[String] = Option(view.properties.get(ViewCatalog.PROP_OWNER)) + + val createEngineVersion: Option[String] = + Option(view.properties.get(ViewCatalog.PROP_CREATE_ENGINE_VERSION)) + + val properties: Map[String, String] = + view.properties.asScala.toMap -- ViewCatalog.RESERVED_PROPERTIES.asScala +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala index be516674f278..7cb645de22c0 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala @@ -29,33 +29,58 @@ import org.apache.iceberg.common.DynConstructors import org.apache.iceberg.spark.ExtendedParser import org.apache.iceberg.spark.ExtendedParser.RawOrderField import org.apache.iceberg.spark.Spark3Util -import org.apache.iceberg.spark.source.SparkTable import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.analysis +import org.apache.spark.sql.catalyst.analysis.AnalysisContext +import org.apache.spark.sql.catalyst.analysis.NoSuchViewException +import org.apache.spark.sql.catalyst.analysis.ResolvedPersistentView +import org.apache.spark.sql.catalyst.analysis.ResolvedTable +import org.apache.spark.sql.catalyst.analysis.ResolvedTempView +import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolver +import org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedNamespace +import org.apache.spark.sql.catalyst.analysis.UnresolvedTableOrView +import org.apache.spark.sql.catalyst.analysis.UnresolvedView +import org.apache.spark.sql.catalyst.catalog.CatalogTableType +import org.apache.spark.sql.catalyst.catalog.TemporaryViewRelation import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParser.NonReservedContext import org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParser.QuotedIdentifierContext +import org.apache.spark.sql.catalyst.plans.logical.CreateView +import org.apache.spark.sql.catalyst.plans.logical.DropView import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.ShowViews 
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView +import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView +import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View +import org.apache.spark.sql.catalyst.plans.logical.views.ShowIcebergViews +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.catalyst.trees.Origin -import org.apache.spark.sql.connector.catalog.Table -import org.apache.spark.sql.connector.catalog.TableCatalog +import org.apache.spark.sql.connector.catalog.{View => V2View} +import org.apache.spark.sql.connector.catalog.CatalogPlugin +import org.apache.spark.sql.connector.catalog.CatalogV2Util +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.V1Table +import org.apache.spark.sql.connector.catalog.ViewCatalog +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.VariableSubstitution import org.apache.spark.sql.types.DataType import org.apache.spark.sql.types.StructType import scala.jdk.CollectionConverters._ -import scala.util.Try class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserInterface with ExtendedParser { import IcebergSparkSqlExtensionsParser._ private lazy val substitutor = substitutorCtor.newInstance(SQLConf.get) + private lazy val maxIterations = SQLConf.get.analyzerMaxIterations private lazy val astBuilder = new IcebergSqlExtensionsAstBuilder(delegate) /** @@ -122,37 +147,132 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI if (isIcebergCommand(sqlTextAfterSubstitution)) { parse(sqlTextAfterSubstitution) { parser => astBuilder.visit(parser.singleStatement()) }.asInstanceOf[LogicalPlan] } else { - delegate.parsePlan(sqlText) + ViewSubstitutionExecutor.execute(delegate.parsePlan(sqlText)) } } - object UnresolvedIcebergTable { + private object ViewSubstitutionExecutor extends RuleExecutor[LogicalPlan] { + private val fixedPoint = FixedPoint( + maxIterations, + errorOnExceed = true, + maxIterationsSetting = SQLConf.ANALYZER_MAX_ITERATIONS.key) - def unapply(plan: LogicalPlan): Option[LogicalPlan] = { - EliminateSubqueryAliases(plan) match { - case UnresolvedRelation(multipartIdentifier, _, _) if isIcebergTable(multipartIdentifier) => - Some(plan) - case _ => + override protected def batches: Seq[Batch] = Seq(Batch("pre-substitution", fixedPoint, V2ViewSubstitution)) + } + + private object V2ViewSubstitution extends Rule[LogicalPlan] { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + // the reason for handling these cases here is because ResolveSessionCatalog exits early for v2 commands + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { + case u@UnresolvedView(identifier, _, _, _) => + lookupTableOrView(identifier, viewOnly = true).getOrElse(u) + + case u@UnresolvedTableOrView(identifier, _, _) => + lookupTableOrView(identifier).getOrElse(u) + + case CreateView(UnresolvedIdentifier(nameParts, allowTemp), userSpecifiedColumns, + comment, properties, originalText, query, allowExisting, replace) => + CreateIcebergView(UnresolvedIdentifier(nameParts, allowTemp), userSpecifiedColumns, + comment, properties, originalText, query, allowExisting, replace) + + case ShowViews(UnresolvedNamespace(multipartIdentifier), pattern, output) => + 
ShowIcebergViews(UnresolvedNamespace(multipartIdentifier), pattern, output) + + case DropView(UnresolvedIdentifier(nameParts, allowTemp), ifExists) => + DropIcebergView(UnresolvedIdentifier(nameParts, allowTemp), ifExists) + } + + private def expandIdentifier(nameParts: Seq[String]): Seq[String] = { + if (!isResolvingView || isReferredTempViewName(nameParts)) return nameParts + + if (nameParts.length == 1) { + AnalysisContext.get.catalogAndNamespace :+ nameParts.head + } else if (SparkSession.active.sessionState.catalogManager.isCatalogRegistered(nameParts.head)) { + nameParts + } else { + AnalysisContext.get.catalogAndNamespace.head +: nameParts + } + } + + /** + * Resolves relations to `ResolvedTable` or `Resolved[Temp/Persistent]View`. This is + * for resolving DDL and misc commands. Code is copied from Spark's Analyzer, but performs + * a view lookup before performing a table lookup. + */ + private def lookupTableOrView( + identifier: Seq[String], + viewOnly: Boolean = false): Option[LogicalPlan] = { + lookupTempView(identifier).map { tempView => + ResolvedTempView(identifier.asIdentifier, tempView.tableMeta.schema) + }.orElse { + val multipartIdent = expandIdentifier(identifier) + val catalogAndIdentifier = Spark3Util.catalogAndIdentifier(SparkSession.active, multipartIdent.asJava) + if (null != catalogAndIdentifier) { + lookupView(SparkSession.active.sessionState.catalogManager.currentCatalog, + catalogAndIdentifier.identifier()) + .orElse(lookupTable(SparkSession.active.sessionState.catalogManager.currentCatalog, + catalogAndIdentifier.identifier())) + } else { None + } } } - private def isIcebergTable(multipartIdent: Seq[String]): Boolean = { - val catalogAndIdentifier = Spark3Util.catalogAndIdentifier(SparkSession.active, multipartIdent.asJava) - catalogAndIdentifier.catalog match { - case tableCatalog: TableCatalog => - Try(tableCatalog.loadTable(catalogAndIdentifier.identifier)) - .map(isIcebergTable) - .getOrElse(false) + private def isResolvingView: Boolean = AnalysisContext.get.catalogAndNamespace.nonEmpty - case _ => - false + private def isReferredTempViewName(nameParts: Seq[String]): Boolean = { + AnalysisContext.get.referredTempViewNames.exists { n => + (n.length == nameParts.length) && n.zip(nameParts).forall { + case (a, b) => resolver(a, b) + } } } - private def isIcebergTable(table: Table): Boolean = table match { - case _: SparkTable => true - case _ => false + private def lookupTempView(identifier: Seq[String]): Option[TemporaryViewRelation] = { + // We are resolving a view and this name is not a temp view when that view was created. We + // return None earlier here. 
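+    // Temp views that were visible when the view was created are tracked in
+    // `AnalysisContext.referredTempViewNames` and remain resolvable here.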
+ if (isResolvingView && !isReferredTempViewName(identifier)) return None + SparkSession.active.sessionState.catalogManager.v1SessionCatalog.getRawLocalOrGlobalTempView(identifier) + } + + + private def lookupView(catalog: CatalogPlugin, ident: Identifier): Option[LogicalPlan] = + loadView(catalog, ident).map { + case view if CatalogV2Util.isSessionCatalog(catalog) => + ResolvedPersistentView(catalog, ident, view.schema) + case view => + ResolvedV2View(ViewHelper(catalog).asViewCatalog, ident, view) + } + + private def lookupTable(catalog: CatalogPlugin, ident: Identifier): Option[LogicalPlan] = + CatalogV2Util.loadTable(catalog, ident).map { + case v1Table: V1Table if CatalogV2Util.isSessionCatalog(catalog) && + v1Table.v1Table.tableType == CatalogTableType.VIEW => + val v1Ident = v1Table.catalogTable.identifier + val v2Ident = Identifier.of(v1Ident.database.toArray, v1Ident.identifier) + analysis.ResolvedPersistentView(catalog, v2Ident, v1Table.catalogTable.schema) + case table => + ResolvedTable.create(catalog.asTableCatalog, ident, table) + } + + def loadView(catalog: CatalogPlugin, ident: Identifier): Option[V2View] = catalog match { + case viewCatalog: ViewCatalog => + try { + Option(viewCatalog.loadView(ident)) + } catch { + case _: NoSuchViewException => None + } + case _ => None + } + } + + implicit class ViewHelper(plugin: CatalogPlugin) { + def asViewCatalog: ViewCatalog = plugin match { + case viewCatalog: ViewCatalog => + viewCatalog + case _ => + throw QueryCompilationErrors.missingCatalogAbilityError(plugin, "views") } } diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/logicalOperators.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/logicalOperators.scala new file mode 100644 index 000000000000..66addf993621 --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/logicalOperators.scala @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.expressions.Cast +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +/** + * A trait for view description used by [[View]] container. 
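+ * Implementations expose the metadata needed to resolve a view: its identifier, schema,
+ * SQL text, the catalog and namespace captured at creation time, the query column names,
+ * and the view properties.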
+ */
+trait ViewDescription {
+  val identifier: String
+  val schema: StructType
+  val viewText: Option[String]
+  val viewCatalogAndNamespace: Seq[String]
+  val viewQueryColumnNames: Seq[String]
+  val properties: Map[String, String]
+}
+
+/**
+ * View description backed by a [[CatalogTable]].
+ *
+ * @param metadata a CatalogTable
+ */
+case class CatalogTableViewDescription(metadata: CatalogTable) extends ViewDescription {
+  override val identifier: String = metadata.identifier.quotedString
+  override val schema: StructType = metadata.schema
+  override val viewText: Option[String] = metadata.viewText
+  override val viewCatalogAndNamespace: Seq[String] = metadata.viewCatalogAndNamespace
+  override val viewQueryColumnNames: Seq[String] = metadata.viewQueryColumnNames
+  override val properties: Map[String, String] = metadata.properties
+}
+
+/**
+ * A container holding the view description and a flag indicating whether the view is temporary.
+ * If it is a SQL (temp) view, the child should be a logical plan parsed from
+ * `CatalogTable.viewText`. Otherwise, the view is a temporary one created from a DataFrame and
+ * the view description should contain a `VIEW_CREATED_FROM_DATAFRAME` property; in this case,
+ * the child must already be resolved.
+ *
+ * This operator will be removed at the end of the analysis stage.
+ *
+ * @param desc A view description that provides the information necessary to resolve the view.
+ * @param isTempView A flag to indicate whether the view is temporary or not.
+ * @param child The logical plan of a view operator. If the view description is available, it
+ *              should be a logical plan parsed from the view text.
+ */
+case class IcebergView(
+    desc: ViewDescription,
+    isTempView: Boolean,
+    child: LogicalPlan) extends UnaryNode {
+  require(!isTempViewStoringAnalyzedPlan || child.resolved)
+
+  override def output: Seq[Attribute] = child.output
+
+  override def metadataOutput: Seq[Attribute] = Nil
+
+  override def simpleString(maxFields: Int): String = {
+    s"View (${desc.identifier}, ${output.mkString("[", ",", "]")})"
+  }
+
+  override def doCanonicalize(): LogicalPlan = child match {
+    case p: Project if p.resolved && canRemoveProject(p) => p.child.canonicalized
+    case _ => child.canonicalized
+  }
+
+  def isTempViewStoringAnalyzedPlan: Boolean =
+    isTempView && desc.properties.contains(VIEW_STORING_ANALYZED_PLAN)
+
+  // When resolving a SQL view, we use an extra Project to add casts and aliases to make sure
+  // the view output schema doesn't change even if the table referenced by the view changes
+  // after the view is created. This extra Project should be removed during canonicalization if
+  // it does nothing. See `SessionCatalog.fromCatalogTable` for more details.
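+  // For example, a Project that merely re-casts an attribute to its existing data type under
+  // its existing name is a no-op and is dropped when the plan is canonicalized.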
+  private def canRemoveProject(p: Project): Boolean = {
+    p.output.length == p.child.output.length && p.projectList.zip(p.child.output).forall {
+      case (Alias(cast: Cast, name), childAttr) =>
+        cast.child match {
+          case a: AttributeReference =>
+            a.dataType == cast.dataType && a.name == name && childAttr.semanticEquals(a)
+          case _ => false
+        }
+      case _ => false
+    }
+  }
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): IcebergView =
+    copy(child = newChild)
+}
+
+object IcebergView {
+  def apply(desc: CatalogTable, isTempView: Boolean, child: LogicalPlan): IcebergView =
+    IcebergView(CatalogTableViewDescription(desc), isTempView, child)
+
+  def effectiveSQLConf(configs: Map[String, String], isTempView: Boolean): SQLConf = {
+    val activeConf = SQLConf.get
+    // For temporary views, we always use the captured SQL configs.
+    if (activeConf.useCurrentSQLConfigsForView && !isTempView) return activeConf
+
+    val sqlConf = new SQLConf()
+    // We retain the configs below from the current session because they are not captured by the
+    // view as optimization configs, but they are still needed during view resolution.
+    // TODO: remove this `retainedConfigs` after `RelationConversions` is moved to the
+    // optimization phase.
+    val retainedConfigs = activeConf.getAllConfs.filterKeys(key =>
+      Seq(
+        "spark.sql.hive.convertMetastoreParquet",
+        "spark.sql.hive.convertMetastoreOrc",
+        "spark.sql.hive.convertInsertingPartitionedTable",
+        "spark.sql.hive.convertMetastoreCtas"
+      ).contains(key) || key.startsWith("spark.sql.catalog."))
+    for ((k, v) <- configs ++ retainedConfigs) {
+      sqlConf.settings.put(k, v)
+    }
+    sqlConf
+  }
+}
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/AlterV2View.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/AlterV2View.scala
new file mode 100644
index 000000000000..0f1d735299d1
--- /dev/null
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/AlterV2View.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.spark.sql.catalyst.plans.logical.views + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LeafCommand +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.ViewCatalog +import org.apache.spark.sql.connector.catalog.ViewChange + +case class AlterV2View( + catalog: ViewCatalog, + view: Identifier, + changes: Seq[ViewChange]) extends LeafCommand { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override lazy val output: Seq[Attribute] = Nil + + override def simpleString(maxFields: Int): String = { + s"AlterV2View: ${view.quoted}" + } +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala new file mode 100644 index 000000000000..4645d468e9ce --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical.views + +import org.apache.spark.sql.catalyst.plans.logical.BinaryCommand +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +case class CreateIcebergView( + child: LogicalPlan, + userSpecifiedColumns: Seq[(String, Option[String])], + comment: Option[String], + properties: Map[String, String], + originalText: Option[String], + query: LogicalPlan, + allowExisting: Boolean, + replace: Boolean) extends BinaryCommand { + override def left: LogicalPlan = child + + override def right: LogicalPlan = query + + override protected def withNewChildrenInternal( + newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan = + copy(child = newLeft, query = newRight) +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateV2View.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateV2View.scala new file mode 100644 index 000000000000..d4c10cfaf916 --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateV2View.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical.views + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LeafCommand +import org.apache.spark.sql.types.StructType + +case class CreateV2View( + view: Seq[String], + sql: String, + comment: Option[String], + viewSchema: StructType, + queryColumnNames: Array[String], + columnAliases: Array[String], + columnComments: Array[String], + properties: Map[String, String], + allowExisting: Boolean, + replace: Boolean) extends LeafCommand { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override lazy val output: Seq[Attribute] = Nil + + override def simpleString(maxFields: Int): String = { + s"CreateV2View: ${view.quoted}" + } +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/DescribeV2View.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/DescribeV2View.scala new file mode 100644 index 000000000000..b9ec53748f3d --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/DescribeV2View.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.catalyst.plans.logical.views + +import org.apache.spark.sql.catalyst.analysis.V2ViewDescription +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema +import org.apache.spark.sql.catalyst.plans.logical.LeafCommand + +case class DescribeV2View(desc: V2ViewDescription, isExtended: Boolean) extends LeafCommand { + + override def output: Seq[Attribute] = DescribeCommandSchema.describeTableAttributes() + + override def simpleString(maxFields: Int): String = { + s"DescribeV2View: ${desc.identifier}" + } +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/DropIcebergView.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/DropIcebergView.scala new file mode 100644 index 000000000000..4e663944356a --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/DropIcebergView.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical.views + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.UnaryCommand + +case class DropIcebergView( + child: LogicalPlan, + ifExists: Boolean) extends UnaryCommand { + override protected def withNewChildInternal(newChild: LogicalPlan): DropIcebergView = + copy(child = newChild) +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/EliminateIcebergView.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/EliminateIcebergView.scala new file mode 100644 index 000000000000..d8db1de5c980 --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/EliminateIcebergView.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical.views + +import org.apache.spark.sql.catalyst.analysis.CastSupport +import org.apache.spark.sql.catalyst.plans.logical.IcebergView +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule + +/** + * This rule removes [[IcebergView]] operators from the plan. The operator is respected till the end of + * analysis stage because we want to see which part of an analyzed logical plan is generated from a + * view. + */ +object EliminateIcebergView extends Rule[LogicalPlan] with CastSupport { + override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { + case IcebergView(_, _, child) => child + } +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/RenameV2View.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/RenameV2View.scala new file mode 100644 index 000000000000..7af2f4beab79 --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/RenameV2View.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical.views + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LeafCommand +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.ViewCatalog + +case class RenameV2View(catalog: ViewCatalog, + oldView: Identifier, + newView: Identifier) extends LeafCommand { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override lazy val output: Seq[Attribute] = Nil + + override def simpleString(maxFields: Int): String = { + s"RenameV2View ${oldView.quoted} to ${newView.quoted}" + } +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/ResolvedV2View.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/ResolvedV2View.scala new file mode 100644 index 000000000000..444da27fad88 --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/ResolvedV2View.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical.views + +import org.apache.spark.sql.catalyst.analysis.LeafNodeWithoutStats +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.View +import org.apache.spark.sql.connector.catalog.ViewCatalog + +case class ResolvedV2View( + catalog: ViewCatalog, + identifier: Identifier, + view: View) extends LeafNodeWithoutStats { + override def output: Seq[Attribute] = Nil +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/ShowCreateV2View.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/ShowCreateV2View.scala new file mode 100644 index 000000000000..69ad82c9c2d0 --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/ShowCreateV2View.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.catalyst.plans.logical.views + +import org.apache.spark.sql.catalyst.analysis.V2ViewDescription +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.logical.LeafCommand +import org.apache.spark.sql.types.StringType + +case class ShowCreateV2View(desc: V2ViewDescription) extends LeafCommand { + + override def output: Seq[Attribute] = Seq( + AttributeReference("create_statement", StringType, nullable = false)()) + + override def simpleString(maxFields: Int): String = { + s"ShowCreateV2View" + } +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/ShowIcebergViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/ShowIcebergViews.scala new file mode 100644 index 000000000000..e92f81352a6e --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/ShowIcebergViews.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical.views + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.ShowViews +import org.apache.spark.sql.catalyst.plans.logical.UnaryCommand + +case class ShowIcebergViews( + namespace: LogicalPlan, + pattern: Option[String], + override val output: Seq[Attribute] = ShowViews.getOutputAttrs) extends UnaryCommand { + + override def child: LogicalPlan = namespace + + override protected def withNewChildInternal(newChild: LogicalPlan): ShowIcebergViews = + copy(namespace = newChild) +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/ShowV2ViewProperties.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/ShowV2ViewProperties.scala new file mode 100644 index 000000000000..e165d25da05a --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/ShowV2ViewProperties.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical.views + +import org.apache.spark.sql.catalyst.analysis.V2ViewDescription +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.logical.LeafCommand +import org.apache.spark.sql.types.StringType + +case class ShowV2ViewProperties(desc: V2ViewDescription, + propertyKey: Option[String]) extends LeafCommand { + + override val output: Seq[Attribute] = Seq( + AttributeReference("key", StringType, nullable = false)(), + AttributeReference("value", StringType, nullable = false)()) + + override def simpleString(maxFields: Int): String = { + s"ShowV2ViewProperties" + } +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterV2ViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterV2ViewExec.scala new file mode 100644 index 000000000000..c14b94aca595 --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterV2ViewExec.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
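ShowV2ViewProperties fixes the output schema to non-nullable key/value string columns, and the optional propertyKey selects a single property instead of the full map. Assuming the extensions parser routes SHOW TBLPROPERTIES over a view identifier to this plan (catalog.db.v1 is a placeholder name), usage looks like:

  spark.sql("SHOW TBLPROPERTIES catalog.db.v1").show(truncate = false)               // all properties
  spark.sql("SHOW TBLPROPERTIES catalog.db.v1 ('comment')").show(truncate = false)   // a single key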
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.ViewCatalog +import org.apache.spark.sql.connector.catalog.ViewChange + + +case class AlterV2ViewExec( + catalog: ViewCatalog, + ident: Identifier, + changes: Seq[ViewChange]) extends LeafV2CommandExec { + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { + catalog.alterView(ident, changes: _*) + + Nil + } + + + override def simpleString(maxFields: Int): String = { + s"AlterV2View: ${ident}" + } +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala new file mode 100644 index 000000000000..0efa0b195241 --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
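AlterV2ViewExec hands the whole batch of ViewChange objects to the catalog in a single alterView call, so a multi-clause ALTER VIEW is applied in one commit. A sketch of building the changes by hand, assuming ViewChange exposes setProperty/removeProperty factories matching the SetProperty/RemoveProperty cases that SparkCatalog.alterView consumes further down (viewCatalog and ident are assumed to be resolved beforehand):

  val changes: Seq[ViewChange] = Seq(
    ViewChange.setProperty("comment", "view-level comment"),   // assumed factory method
    ViewChange.removeProperty("obsolete-key"))                 // assumed factory method
  viewCatalog.alterView(ident, changes: _*)                    // one call, one commit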
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.ViewCatalog +import org.apache.spark.sql.types.StructType +import scala.collection.JavaConverters._ + + +case class CreateV2ViewExec( + catalog: ViewCatalog, + ident: Identifier, + sql: String, + currentCatalog: String, + currentNamespace: Array[String], + comment: Option[String], + viewSchema: StructType, + queryColumnNames: Array[String], + columnAliases: Array[String], + columnComments: Array[String], + properties: Map[String, String], + allowExisting: Boolean, + replace: Boolean) extends LeafV2CommandExec { + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { + val engineVersion = "Spark " + org.apache.spark.SPARK_VERSION + val createEngineVersion = Some(engineVersion) + val newProperties = properties ++ + comment.map(ViewCatalog.PROP_COMMENT -> _) ++ + createEngineVersion.map(ViewCatalog.PROP_CREATE_ENGINE_VERSION -> _) + + (ViewCatalog.PROP_ENGINE_VERSION -> engineVersion) + + if (replace) { + // CREATE OR REPLACE VIEW + if (catalog.viewExists(ident)) { + catalog.dropView(ident) + } + // FIXME: replaceView API doesn't exist in Spark 3.5 + catalog.createView( + ident, + sql, + currentCatalog, + currentNamespace, + viewSchema, + queryColumnNames, + columnAliases, + columnComments, + newProperties.asJava) + } else { + try { + // CREATE VIEW [IF NOT EXISTS] + catalog.createView( + ident, + sql, + currentCatalog, + currentNamespace, + viewSchema, + queryColumnNames, + columnAliases, + columnComments, + newProperties.asJava) + } catch { + case _: ViewAlreadyExistsException if allowExisting => // Ignore + } + } + + Nil + } + + override def simpleString(maxFields: Int): String = { + s"CreateV2ViewExec: ${ident}" + } +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeV2ViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeV2ViewExec.scala new file mode 100644 index 000000000000..90a3242fc4e3 --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeV2ViewExec.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
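Note that the replace path above is drop-then-create rather than an atomic swap (see the FIXME), so a concurrent reader can briefly observe the view as missing. The three SQL forms this exec node serves, with placeholder identifiers:

  spark.sql("CREATE VIEW catalog.db.v1 AS SELECT id FROM catalog.db.t1")                // plain create
  spark.sql("CREATE VIEW IF NOT EXISTS catalog.db.v1 AS SELECT id FROM catalog.db.t1")  // allowExisting = true
  spark.sql("CREATE OR REPLACE VIEW catalog.db.v1 AS SELECT id FROM catalog.db.t1")     // replace = true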
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.V2ViewDescription +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.LeafExecNode + + +case class DescribeV2ViewExec(output: Seq[Attribute], + desc: V2ViewDescription, + isExtended: Boolean) extends V2CommandExec with LeafExecNode { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override protected def run(): Seq[InternalRow] = + if (isExtended) { + (describeSchema :+ emptyRow) ++ describeExtended + } else { + describeSchema + } + + private def describeSchema: Seq[InternalRow] = + desc.schema.map { column => + toCatalystRow( + column.name, + column.dataType.simpleString, + column.getComment().getOrElse("")) + } + + private def emptyRow: InternalRow = toCatalystRow("", "", "") + + private def describeExtended: Seq[InternalRow] = { + val outputColumns = desc.viewQueryColumnNames.mkString("[", ", ", "]") + val tableProperties = desc.properties + .map(p => p._1 + "=" + p._2) + .mkString("[", ", ", "]") + + toCatalystRow("# Detailed View Information", "", "") :: + toCatalystRow("Owner", desc.owner.getOrElse(""), "") :: + toCatalystRow("Comment", desc.comment.getOrElse(""), "") :: + toCatalystRow("View Text", desc.query, "") :: + toCatalystRow("View Catalog and Namespace", desc.viewCatalogAndNamespace.quoted, "") :: + toCatalystRow("View Query Output Columns", outputColumns, "") :: + toCatalystRow("Table Properties", tableProperties, "") :: + toCatalystRow("Created By", desc.createEngineVersion.getOrElse(""), "") :: + Nil + } + + override def simpleString(maxFields: Int): String = { + s"DescribeV2ViewExec" + } +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala new file mode 100644 index 000000000000..04b463947736 --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
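DescribeV2ViewExec always emits the schema rows; with EXTENDED it appends an empty separator row and the detail block built in describeExtended (owner, comment, view text, default catalog and namespace, query output columns, properties, and the creating engine). With a placeholder identifier:

  spark.sql("DESCRIBE EXTENDED catalog.db.v1").show(truncate = false)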
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.NoSuchViewException +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.ViewCatalog + + +case class DropV2ViewExec( + catalog: ViewCatalog, + ident: Identifier, + ifExists: Boolean) extends LeafV2CommandExec { + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { + if (catalog.viewExists(ident)) { + catalog.dropView(ident) + } else if (!ifExists) { + throw new NoSuchViewException(ident) + } + + Nil + } + + override def simpleString(maxFields: Int): String = { + s"DropV2View: ${ident}" + } +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala index 6302d8307a65..3834308e19aa 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala @@ -25,6 +25,9 @@ import org.apache.iceberg.spark.SparkSessionCatalog import org.apache.spark.sql.SparkSession import org.apache.spark.sql.Strategy import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier +import org.apache.spark.sql.catalyst.analysis.ResolvedNamespace +import org.apache.spark.sql.catalyst.analysis.V2ViewDescription import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.catalyst.expressions.PredicateHelper @@ -41,13 +44,31 @@ import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalesce import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering +import org.apache.spark.sql.catalyst.plans.logical.ShowCreateTable +import org.apache.spark.sql.catalyst.plans.logical.ShowViews +import org.apache.spark.sql.catalyst.plans.logical.views.AlterV2View +import org.apache.spark.sql.catalyst.plans.logical.views.CreateV2View +import org.apache.spark.sql.catalyst.plans.logical.views.DescribeV2View +import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView +import org.apache.spark.sql.catalyst.plans.logical.views.RenameV2View +import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View +import org.apache.spark.sql.catalyst.plans.logical.views.ShowCreateV2View +import org.apache.spark.sql.catalyst.plans.logical.views.ShowIcebergViews +import org.apache.spark.sql.catalyst.plans.logical.views.ShowV2ViewProperties +import org.apache.spark.sql.connector.catalog.CatalogManager +import org.apache.spark.sql.connector.catalog.CatalogPlugin import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.catalog.TableCatalog +import org.apache.spark.sql.connector.catalog.ViewCatalog +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.OrderAwareCoalesceExec import org.apache.spark.sql.execution.SparkPlan import 
scala.jdk.CollectionConverters._ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy with PredicateHelper { + val catalogManager: CatalogManager = spark.sessionState.catalogManager + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case c @ Call(procedure, args) => @@ -90,9 +111,50 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi case OrderAwareCoalesce(numPartitions, coalescer, child) => OrderAwareCoalesceExec(numPartitions, coalescer, planLater(child)) :: Nil + case CreateV2View( + IcebergViewCatalogAndIdentifier(catalog, ident), sql, comment, viewSchema, queryColumnNames, + columnAliases, columnComments, properties, allowExisting, replace) => + CreateV2ViewExec(catalog, ident, sql, catalogManager.currentCatalog.name, + catalogManager.currentNamespace, comment, viewSchema, queryColumnNames, + columnAliases, columnComments, properties, allowExisting, replace) :: Nil + + case d@DescribeV2View(desc, isExtended) => + DescribeV2ViewExec(d.output, desc, isExtended) :: Nil + + case show@ShowCreateV2View(view) => + ShowCreateV2ViewExec(show.output, view) :: Nil + + case show@ShowCreateTable(ResolvedV2View(_, ident, view), _, _) => + ShowCreateV2ViewExec(show.output, V2ViewDescription(ident.quoted, view)) :: Nil + + case show@ShowV2ViewProperties(view, propertyKey) => + ShowV2ViewPropertiesExec(show.output, view, propertyKey) :: Nil + + case ShowIcebergViews(ResolvedNamespace(catalog, ns), pattern, output) => + ShowV2ViewsExec(output, asViewCatalog(catalog), ns, pattern) :: Nil + + case ShowViews(ResolvedNamespace(catalog, ns), pattern, output) => + ShowV2ViewsExec(output, asViewCatalog(catalog), ns, pattern) :: Nil + + case DropIcebergView(ResolvedIdentifier(catalog, ident), ifExists) => + DropV2ViewExec(asViewCatalog(catalog), ident, ifExists) :: Nil + + case RenameV2View(catalog, oldIdent, newIdent) => + RenameV2ViewExec(catalog, oldIdent, newIdent) :: Nil + + case AlterV2View(catalog, ident, changes) => + AlterV2ViewExec(catalog, ident, changes) :: Nil + case _ => Nil } + private def asViewCatalog(plugin: CatalogPlugin): ViewCatalog = plugin match { + case viewCatalog: ViewCatalog => + viewCatalog + case _ => + throw QueryCompilationErrors.missingCatalogAbilityError(plugin, "views") + } + private def buildInternalRow(exprs: Seq[Expression]): InternalRow = { val values = new Array[Any](exprs.size) for (index <- exprs.indices) { @@ -114,4 +176,18 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi } } } + + private object IcebergViewCatalogAndIdentifier { + def unapply(identifier: Seq[String]): Option[(ViewCatalog, Identifier)] = { + val catalogAndIdentifier = Spark3Util.catalogAndIdentifier(spark, identifier.asJava) + catalogAndIdentifier.catalog match { + case icebergCatalog: SparkCatalog => + Some((icebergCatalog, catalogAndIdentifier.identifier)) + case icebergCatalog: SparkSessionCatalog[_] => + Some((icebergCatalog, catalogAndIdentifier.identifier)) + case _ => + None + } + } + } } diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameV2ViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameV2ViewExec.scala new file mode 100644 index 000000000000..6594f0b6b924 --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameV2ViewExec.scala @@ -0,0 
+1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.ViewCatalog + + +case class RenameV2ViewExec( + catalog: ViewCatalog, + oldIdent: Identifier, + newIdent: Identifier) extends LeafV2CommandExec { + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { + catalog.renameView(oldIdent, newIdent) + + Seq.empty + } + + + override def simpleString(maxFields: Int): String = { + s"RenameV2View ${oldIdent} to ${newIdent}" + } +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateV2ViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateV2ViewExec.scala new file mode 100644 index 000000000000..146e0e2b2ee4 --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateV2ViewExec.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
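RenameV2ViewExec delegates directly to ViewCatalog.renameView, so NoSuchViewException and ViewAlreadyExistsException surface from the catalog implementation (see SparkCatalog.renameView below). Assuming the rename statement resolves through the rules above, with placeholder identifiers:

  spark.sql("ALTER VIEW catalog.db.v1 RENAME TO catalog.db.v2")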
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.V2ViewDescription +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.LeafExecNode + + +case class ShowCreateV2ViewExec( + output: Seq[Attribute], + desc: V2ViewDescription) extends V2CommandExec with LeafExecNode { + override protected def run(): Seq[InternalRow] = { + val schema = desc.schema.map(_.name).mkString("(", ", ", ")") + val create = s"CREATE VIEW ${desc.identifier} $schema AS\n${desc.query}\n" + Seq(toCatalystRow(create)) + } + + override def simpleString(maxFields: Int): String = { + s"ShowCreateV2ViewExec" + } +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowV2ViewPropertiesExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowV2ViewPropertiesExec.scala new file mode 100644 index 000000000000..013495a0b8fb --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowV2ViewPropertiesExec.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.V2ViewDescription +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.LeafExecNode + + +case class ShowV2ViewPropertiesExec( + output: Seq[Attribute], + desc: V2ViewDescription, + propertyKey: Option[String]) extends V2CommandExec with LeafExecNode { + + override protected def run(): Seq[InternalRow] = { + propertyKey match { + case Some(p) => + val propValue = desc.properties.getOrElse(p, + s"View ${desc.identifier} does not have property: $p") + Seq(toCatalystRow(p, propValue)) + case None => + desc.properties.map { + case (k, v) => toCatalystRow(k, v) + }.toSeq + } + } + + override def simpleString(maxFields: Int): String = { + s"ShowV2ViewPropertiesExec" + } +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowV2ViewsExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowV2ViewsExec.scala new file mode 100644 index 000000000000..1ac8e7dd0839 --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowV2ViewsExec.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
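ShowCreateV2ViewExec reconstructs a minimal CREATE VIEW statement from the view description: the identifier, the parenthesized column list, and the stored SQL text; column comments and view properties are not replayed into the statement. Because the strategy also matches ShowCreateTable over a ResolvedV2View, the standard syntax works against a view (placeholder identifier):

  spark.sql("SHOW CREATE TABLE catalog.db.v1").show(truncate = false)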
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.util.StringUtils +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.ViewCatalog +import org.apache.spark.sql.execution.LeafExecNode +import scala.collection.mutable.ArrayBuffer + + +case class ShowV2ViewsExec( + output: Seq[Attribute], + catalog: ViewCatalog, + namespace: Seq[String], + pattern: Option[String]) extends V2CommandExec with LeafExecNode { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override protected def run(): Seq[InternalRow] = { + val rows = new ArrayBuffer[InternalRow]() + + val views = catalog.listViews(namespace: _*) + views.foreach { view => + if (pattern.map(StringUtils.filterPattern(Seq(view.name()), _).nonEmpty).getOrElse(true)) { + rows += toCatalystRow(view.namespace().quoted, view.name(), isTempView(view)) + } + } + + rows.toSeq + } + + private def isTempView(ident: Identifier): Boolean = { + catalog match { + case s: V2SessionCatalog => s.isTempView(ident) + case _ => false + } + } + + override def simpleString(maxFields: Int): String = { + s"ShowV2ViewsExec" + } +} diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java new file mode 100644 index 000000000000..31b455047e90 --- /dev/null +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
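ShowV2ViewsExec backs both ShowIcebergViews and Spark's own ShowViews once the namespace resolves to a view catalog (see the two strategy cases above), with optional LIKE-style filtering applied via StringUtils.filterPattern:

  spark.sql("SHOW VIEWS IN catalog.db").show()
  spark.sql("SHOW VIEWS IN catalog.db LIKE 'v*'").show()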
+ */ +package org.apache.iceberg.spark.extensions; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.source.SimpleRecord; +import org.apache.spark.sql.AnalysisException; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runners.Parameterized; + +public class TestViews extends SparkExtensionsTestBase { + private final String tableName = "table"; + private final String viewName = "view"; + + @Before + public void before() { + spark.conf().set("spark.sql.defaultCatalog", SparkCatalogConfig.SPARK_WITH_VIEWS.catalogName()); + sql("CREATE NAMESPACE IF NOT EXISTS default"); + sql("CREATE TABLE %s (id INT, data STRING)", tableName); + } + + @After + public void removeTable() { + sql("DROP VIEW IF EXISTS %s", viewName); + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + public static Object[][] parameters() { + return new Object[][] { + { + SparkCatalogConfig.SPARK_WITH_VIEWS.catalogName(), + SparkCatalogConfig.SPARK_WITH_VIEWS.implementation(), + SparkCatalogConfig.SPARK_WITH_VIEWS.properties() + } + }; + } + + public TestViews(String catalog, String implementation, Map<String, String> properties) { + super(catalog, implementation, properties); + } + + @Test + public void createView() throws NoSuchTableException { + insertRows(10); + + sql("CREATE VIEW %s AS SELECT id FROM %s", viewName, tableName); + + List<Object[]> objects = sql("SELECT * FROM %s", viewName); + List<Object[]> expected = + IntStream.rangeClosed(1, 10).mapToObj(this::row).collect(Collectors.toList()); + + assertThat(objects).hasSize(10).containsExactlyInAnyOrderElementsOf(expected); + } + + @Test + public void createMultipleViews() throws NoSuchTableException { + insertRows(6); + String secondView = "secondView"; + + sql("CREATE VIEW %s AS SELECT id FROM %s WHERE id <= 3", viewName, tableName); + sql("CREATE VIEW %s AS SELECT id FROM %s WHERE id > 3", secondView, tableName); + + List<Object[]> first = sql("SELECT * FROM %s", viewName); + assertThat(first).hasSize(3).containsExactlyInAnyOrder(row(1), row(2), row(3)); + + List<Object[]> second = sql("SELECT * FROM %s", secondView); + assertThat(second).hasSize(3).containsExactlyInAnyOrder(row(4), row(5), row(6)); + } + + @Test + public void createAlreadyExistingView() { + sql("CREATE VIEW %s AS SELECT id FROM %s", viewName, tableName); + + assertThatThrownBy(() -> sql("CREATE VIEW %s AS SELECT id FROM %s", viewName, tableName)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("Cannot create view default.view because it already exists"); + + // using IF NOT EXISTS should work + sql("CREATE VIEW IF NOT EXISTS %s AS SELECT id FROM %s", viewName, tableName); + } + + @Test + public void createViewUsingNonExistingTable() { + assertThatThrownBy(() -> sql("CREATE VIEW %s AS SELECT id FROM non_existing", viewName)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("non_existing"); + } + + private void insertRows(int numRows) throws NoSuchTableException { + List<SimpleRecord> records = Lists.newArrayListWithCapacity(numRows); + for (int i = 1; i <= numRows; i++) { + int val = 'a'; + val += i; + records.add(new SimpleRecord(i, Character.toString((char) val))); + } + + Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class); + df.writeTo(tableName).append(); + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java index 38f15a42958c..7693a806c7d6 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java @@ -25,6 +25,7 @@ import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.StagingTableCatalog; import org.apache.spark.sql.connector.catalog.SupportsNamespaces; +import org.apache.spark.sql.connector.catalog.ViewCatalog; import org.apache.spark.sql.connector.iceberg.catalog.Procedure; import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog; @@ -33,7 +34,8 @@ abstract class BaseCatalog ProcedureCatalog, SupportsNamespaces, HasIcebergCatalog, - SupportsFunctions { + SupportsFunctions, + ViewCatalog { @Override public Procedure loadProcedure(Identifier ident) throws NoSuchProcedureException { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index eef0f0703bc3..5b40875ffa74 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -46,6 +46,7 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.catalog.ViewCatalog; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.hadoop.HadoopCatalog; @@ -61,15 +62,19 @@ import org.apache.iceberg.spark.actions.SparkActions; import org.apache.iceberg.spark.source.SparkChangelogTable; import org.apache.iceberg.spark.source.SparkTable; +import org.apache.iceberg.spark.source.SparkView; import org.apache.iceberg.spark.source.StagedSparkTable; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SnapshotUtil; +import org.apache.iceberg.view.UpdateViewProperties; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.analysis.NoSuchViewException; import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.NamespaceChange; import org.apache.spark.sql.connector.catalog.StagedTable; @@ -79,6 +84,8 @@ import org.apache.spark.sql.connector.catalog.TableChange.ColumnChange; import org.apache.spark.sql.connector.catalog.TableChange.RemoveProperty; import
org.apache.spark.sql.connector.catalog.TableChange.SetProperty; +import org.apache.spark.sql.connector.catalog.View; +import org.apache.spark.sql.connector.catalog.ViewChange; import org.apache.spark.sql.connector.expressions.Transform; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; @@ -124,6 +131,7 @@ public class SparkCatalog extends BaseCatalog { private Catalog icebergCatalog = null; private boolean cacheEnabled = CatalogProperties.CACHE_ENABLED_DEFAULT; private SupportsNamespaces asNamespaceCatalog = null; + private ViewCatalog asViewCatalog = null; private String[] defaultNamespace = null; private HadoopTables tables; @@ -570,6 +578,10 @@ public final void initialize(String name, CaseInsensitiveStringMap options) { } } + + if (catalog instanceof ViewCatalog) { + this.asViewCatalog = (ViewCatalog) catalog; + } + EnvironmentContext.put(EnvironmentContext.ENGINE_NAME, "spark"); EnvironmentContext.put( EnvironmentContext.ENGINE_VERSION, sparkSession.sparkContext().version()); @@ -810,4 +822,125 @@ private Catalog.TableBuilder newBuilder(Identifier ident, Schema schema) { public Catalog icebergCatalog() { return icebergCatalog; } + + @Override + public Identifier[] listViews(String... namespace) { + if (null != asViewCatalog) { + return asViewCatalog.listViews(Namespace.of(namespace)).stream() + .map(ident -> Identifier.of(ident.namespace().levels(), ident.name())) + .toArray(Identifier[]::new); + } + + return new Identifier[0]; + } + + @Override + public View loadView(Identifier ident) throws NoSuchViewException { + if (null != asViewCatalog) { + try { + org.apache.iceberg.view.View view = asViewCatalog.loadView(buildIdentifier(ident)); + return new SparkView(view); + } catch (org.apache.iceberg.exceptions.NoSuchViewException e) { + throw new NoSuchViewException(ident); + } + } + + throw new NoSuchViewException(ident); + } + + @Override + public View createView( + Identifier ident, + String sql, + String currentCatalog, + String[] currentNamespace, + StructType schema, + String[] queryColumnNames, + String[] columnAliases, + String[] columnComments, + Map<String, String> properties) + throws ViewAlreadyExistsException, NoSuchNamespaceException { + if (null != asViewCatalog) { + Schema icebergSchema = SparkSchemaUtil.convert(schema); + + try { + org.apache.iceberg.view.View view = + asViewCatalog + .buildView(buildIdentifier(ident)) + .withDefaultCatalog(currentCatalog) + .withDefaultNamespace(Namespace.of(currentNamespace)) + .withQuery("spark", sql) + .withSchema(icebergSchema) + .withLocation(properties.get("location")) + .withProperties(Spark3Util.rebuildCreateProperties(properties)) + .create(); + return new SparkView(view); + } catch (org.apache.iceberg.exceptions.NoSuchNamespaceException e) { + throw new NoSuchNamespaceException(currentNamespace); + } catch (AlreadyExistsException e) { + throw new ViewAlreadyExistsException(ident); + } + } + + throw new UnsupportedOperationException( + "Creating view is not supported by catalog: " + catalogName); + } + + @Override + public View alterView(Identifier ident, ViewChange... changes) + throws NoSuchViewException, IllegalArgumentException { + if (null != asViewCatalog) { + try { + org.apache.iceberg.view.View view = asViewCatalog.loadView(buildIdentifier(ident)); + UpdateViewProperties updateViewProperties = view.updateProperties(); + + for (ViewChange change : changes) { + if (change instanceof ViewChange.SetProperty) { + ViewChange.SetProperty property = (ViewChange.SetProperty) change; + updateViewProperties.set(property.property(), property.value()); + } else if (change instanceof ViewChange.RemoveProperty) { + ViewChange.RemoveProperty remove = (ViewChange.RemoveProperty) change; + updateViewProperties.remove(remove.property()); + } else { + // fail loudly instead of silently ignoring an unrecognized change + throw new IllegalArgumentException("Unsupported view change: " + change); + } + } + + updateViewProperties.commit(); + + return new SparkView(view); + } catch (org.apache.iceberg.exceptions.NoSuchViewException e) { + throw new NoSuchViewException(ident); + } + } + + throw new UnsupportedOperationException( + "Altering view is not supported by catalog: " + catalogName); + } + + @Override + public boolean dropView(Identifier ident) { + if (null != asViewCatalog) { + return asViewCatalog.dropView(buildIdentifier(ident)); + } + + return false; + } + + @Override + public void renameView(Identifier fromIdentifier, Identifier toIdentifier) + throws NoSuchViewException, ViewAlreadyExistsException { + if (null != asViewCatalog) { + try { + asViewCatalog.renameView(buildIdentifier(fromIdentifier), buildIdentifier(toIdentifier)); + } catch (org.apache.iceberg.exceptions.NoSuchViewException e) { + throw new NoSuchViewException(fromIdentifier); + } catch (org.apache.iceberg.exceptions.AlreadyExistsException e) { + throw new ViewAlreadyExistsException(toIdentifier); + } + } else { + throw new UnsupportedOperationException( + "Renaming view is not supported by catalog: " + catalogName); + } + } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java index 33384e3eff08..87fc57e7145b 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java @@ -31,8 +31,10 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.analysis.NoSuchViewException; import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException; import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException; import org.apache.spark.sql.connector.catalog.CatalogExtension; import org.apache.spark.sql.connector.catalog.CatalogPlugin; import org.apache.spark.sql.connector.catalog.FunctionCatalog; @@ -44,6 +46,9 @@ import org.apache.spark.sql.connector.catalog.Table; import org.apache.spark.sql.connector.catalog.TableCatalog; import org.apache.spark.sql.connector.catalog.TableChange; +import org.apache.spark.sql.connector.catalog.View; +import org.apache.spark.sql.connector.catalog.ViewCatalog; +import org.apache.spark.sql.connector.catalog.ViewChange; import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; import org.apache.spark.sql.connector.expressions.Transform; import org.apache.spark.sql.types.StructType; @@ -55,12 +60,14 @@ * @param <T> CatalogPlugin class to avoid casting to TableCatalog, FunctionCatalog and *
SupportsNamespaces. */ -public class SparkSessionCatalog<T extends TableCatalog & FunctionCatalog & SupportsNamespaces> +public class SparkSessionCatalog< + T extends TableCatalog & FunctionCatalog & SupportsNamespaces & ViewCatalog> extends BaseCatalog implements CatalogExtension { private static final String[] DEFAULT_NAMESPACE = new String[] {"default"}; private String catalogName = null; private TableCatalog icebergCatalog = null; + private ViewCatalog icebergViewCatalog = null; private StagingTableCatalog asStagingCatalog = null; private T sessionCatalog = null; private boolean createParquetAsIceberg = false; @@ -317,6 +324,10 @@ public final void initialize(String name, CaseInsensitiveStringMap options) { this.asStagingCatalog = (StagingTableCatalog) icebergCatalog; } + + if (icebergCatalog instanceof ViewCatalog) { + this.icebergViewCatalog = (ViewCatalog) icebergCatalog; + } + this.createParquetAsIceberg = options.getBoolean("parquet-enabled", createParquetAsIceberg); this.createAvroAsIceberg = options.getBoolean("avro-enabled", createAvroAsIceberg); this.createOrcAsIceberg = options.getBoolean("orc-enabled", createOrcAsIceberg); @@ -395,4 +406,109 @@ public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionExcep return getSessionCatalog().loadFunction(ident); } } + + private boolean isViewCatalog() { + return getSessionCatalog() instanceof ViewCatalog; + } + + @Override + public Identifier[] listViews(String... namespace) throws NoSuchNamespaceException { + if (isViewCatalog()) { + // delegate to the session catalog because all views share the same namespace + return getSessionCatalog().listViews(namespace); + } + + return new Identifier[0]; + } + + @Override + public View loadView(Identifier ident) throws NoSuchViewException { + if (null != icebergViewCatalog && icebergViewCatalog.viewExists(ident)) { + return icebergViewCatalog.loadView(ident); + } else if (isViewCatalog() && getSessionCatalog().viewExists(ident)) { + return getSessionCatalog().loadView(ident); + } + + throw new NoSuchViewException(ident); + } + + @Override + public View createView( + Identifier ident, + String sql, + String currentCatalog, + String[] currentNamespace, + StructType schema, + String[] queryColumnNames, + String[] columnAliases, + String[] columnComments, + Map<String, String> properties) + throws ViewAlreadyExistsException, NoSuchNamespaceException { + if (null != icebergViewCatalog) { + return icebergViewCatalog.createView( + ident, + sql, + currentCatalog, + currentNamespace, + schema, + queryColumnNames, + columnAliases, + columnComments, + properties); + } else if (isViewCatalog()) { + return getSessionCatalog() + .createView( + ident, + sql, + currentCatalog, + currentNamespace, + schema, + queryColumnNames, + columnAliases, + columnComments, + properties); + } + + throw new UnsupportedOperationException( + "Creating view is not supported by catalog: " + catalogName); + } + + @Override + public View alterView(Identifier ident, ViewChange... changes) + throws NoSuchViewException, IllegalArgumentException { + if (null != icebergViewCatalog && icebergViewCatalog.viewExists(ident)) { + return icebergViewCatalog.alterView(ident, changes); + } else if (isViewCatalog()) { + return getSessionCatalog().alterView(ident, changes); + } + + throw new UnsupportedOperationException( + "Altering view is not supported by catalog: " + catalogName); + } + + @Override + public boolean dropView(Identifier ident) { + if (null != icebergViewCatalog && icebergViewCatalog.viewExists(ident)) { + return icebergViewCatalog.dropView(ident); + } else if (isViewCatalog()) { + return getSessionCatalog().dropView(ident); + } + + return false; + } + + @Override + public void renameView(Identifier oldIdent, Identifier newIdent) + throws NoSuchViewException, ViewAlreadyExistsException { + if (null != icebergViewCatalog && icebergViewCatalog.viewExists(oldIdent)) { + icebergViewCatalog.renameView(oldIdent, newIdent); + return; + } else if (isViewCatalog()) { + getSessionCatalog().renameView(oldIdent, newIdent); + return; + } + + throw new UnsupportedOperationException( + "Renaming view is not supported by catalog: " + catalogName); + } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java new file mode 100644 index 000000000000..df19e3355064 --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
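For context, the delegation above only becomes reachable when SparkSessionCatalog replaces the built-in session catalog; a typical wiring (the type value and catalog names are illustrative):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hive")
    .getOrCreate()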
+ */ +package org.apache.iceberg.spark.source; + +import static org.apache.iceberg.TableProperties.FORMAT_VERSION; + +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.view.BaseView; +import org.apache.iceberg.view.View; +import org.apache.iceberg.view.ViewOperations; +import org.apache.spark.sql.types.StructType; + +public class SparkView implements org.apache.spark.sql.connector.catalog.View { + + private static final Set<String> RESERVED_PROPERTIES = + ImmutableSet.of("provider", "location", FORMAT_VERSION); + + private final View icebergView; + + public SparkView(View icebergView) { + this.icebergView = icebergView; + } + + public View view() { + return icebergView; + } + + @Override + public String name() { + return icebergView.toString(); + } + + @Override + public String query() { + return Optional.ofNullable(icebergView.sqlFor("spark")) + .orElseThrow(() -> new IllegalStateException("No SQL query found for view " + name())) + .sql(); + } + + @Override + public String currentCatalog() { + return icebergView.currentVersion().defaultCatalog(); + } + + @Override + public String[] currentNamespace() { + return icebergView.currentVersion().defaultNamespace().levels(); + } + + @Override + public StructType schema() { + return SparkSchemaUtil.convert(icebergView.schema()); + } + + @Override + public String[] queryColumnNames() { + return new String[0]; + } + + @Override + public String[] columnAliases() { + return new String[0]; + } + + @Override + public String[] columnComments() { + return new String[0]; + } + + @Override + public Map<String, String> properties() { + ImmutableMap.Builder<String, String> propsBuilder = ImmutableMap.builder(); + + propsBuilder.put("provider", "iceberg"); + propsBuilder.put("location", icebergView.location()); + + if (icebergView instanceof BaseView) { + ViewOperations ops = ((BaseView) icebergView).operations(); + propsBuilder.put(FORMAT_VERSION, String.valueOf(ops.current().formatVersion())); + } + + icebergView.properties().entrySet().stream() + .filter(entry -> !RESERVED_PROPERTIES.contains(entry.getKey())) + .forEach(propsBuilder::put); + + return propsBuilder.build(); + } + + @Override + public String toString() { + return icebergView.toString(); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } else if (other == null || getClass() != other.getClass()) { + return false; + } + + // use only name in order to correctly invalidate Spark cache + SparkView that = (SparkView) other; + return icebergView.name().equals(that.icebergView.name()); + } + + @Override + public int hashCode() { + // use only name in order to correctly invalidate Spark cache + return icebergView.name().hashCode(); + } +} diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java index fc18ed3bb174..abfd7da0c7bd 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java @@ -19,6 +19,8 @@ package org.apache.iceberg.spark; import java.util.Map; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.inmemory.InMemoryCatalog; import
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; public enum SparkCatalogConfig { @@ -41,7 +43,17 @@ public enum SparkCatalogConfig { "parquet-enabled", "true", "cache-enabled", "false" // Spark will delete tables using v1, leaving the cache out of sync - )); + )), + SPARK_WITH_VIEWS( + "spark_with_views", + SparkCatalog.class.getName(), + ImmutableMap.of( + CatalogProperties.CATALOG_IMPL, + InMemoryCatalog.class.getName(), + "default-namespace", + "default", + "cache-enabled", + "false")); private final String catalogName; private final String implementation; diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java index 658159894543..a28119499b4b 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java @@ -87,7 +87,7 @@ public SparkTestBaseWithCatalog( config.forEach( (key, value) -> spark.conf().set("spark.sql.catalog." + catalogName + "." + key, value)); - if (config.get("type").equalsIgnoreCase("hadoop")) { + if (null != config.get("type") && config.get("type").equalsIgnoreCase("hadoop")) { spark.conf().set("spark.sql.catalog." + catalogName + ".warehouse", "file:" + warehouse); } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java index 0c6cad7f369c..02a74e5803be 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java @@ -31,8 +31,10 @@ import org.apache.spark.sql.connector.catalog.SupportsNamespaces; import org.apache.spark.sql.connector.catalog.Table; import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.catalog.ViewCatalog; -public class TestSparkCatalog<T extends TableCatalog & FunctionCatalog & SupportsNamespaces> +public class TestSparkCatalog< + T extends TableCatalog & FunctionCatalog & SupportsNamespaces & ViewCatalog> extends SparkSessionCatalog<T> { private static final Map<Identifier, Table> tableMap = Maps.newHashMap();
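For reference, SparkTestBaseWithCatalog expands the SPARK_WITH_VIEWS entry into per-catalog session properties, so the test catalog above is roughly equivalent to this configuration:

  spark.conf.set("spark.sql.catalog.spark_with_views", "org.apache.iceberg.spark.SparkCatalog")
  spark.conf.set("spark.sql.catalog.spark_with_views.catalog-impl", "org.apache.iceberg.inmemory.InMemoryCatalog")
  spark.conf.set("spark.sql.catalog.spark_with_views.default-namespace", "default")
  spark.conf.set("spark.sql.catalog.spark_with_views.cache-enabled", "false")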