-
Notifications
You must be signed in to change notification settings - Fork 3.4k
Spark: Support creating views via SQL #9423
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
92eba8f
5153b4b
6559f72
ffe9fba
7b86876
71a00a4
f348f8b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,62 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.catalyst.analysis | ||
|
|
||
| import org.apache.spark.sql.AnalysisException | ||
| import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan | ||
| import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView | ||
| import org.apache.spark.sql.connector.catalog.Identifier | ||
| import org.apache.spark.sql.connector.catalog.ViewCatalog | ||
| import org.apache.spark.sql.internal.SQLConf | ||
| import org.apache.spark.sql.util.SchemaUtils | ||
|
|
||
| /** | ||
| * Plan check for view creation. For every CreateIcebergView whose target is a | ||
| * ResolvedIdentifier in a ViewCatalog, the number of column aliases is compared with the | ||
| * number of query output columns and the query column names are checked for duplicates. | ||
| */ | ||
| object CheckViews extends (LogicalPlan => Unit) { | ||
|
|
||
| override def apply(plan: LogicalPlan): Unit = { | ||
| plan.foreach { | ||
| case create @ CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, _, _, _, _, _, _, _, _, _) => | ||
| verifyColumnCount(ident, create.columnAliases, create.query) | ||
| SchemaUtils.checkColumnNameDuplication(create.queryColumnNames, SQLConf.get.resolver) | ||
|
|
||
| case _ => // OK | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Throws an AnalysisException when a non-empty alias list has a different length than the | ||
| * query output. Nothing is verified when the alias list is empty. | ||
| */ | ||
| private def verifyColumnCount(ident: Identifier, columns: Seq[String], query: LogicalPlan): Unit = { | ||
| if (columns.nonEmpty && columns.length != query.output.length) { | ||
| // more aliases than data columns -> NOT_ENOUGH_DATA_COLUMNS, fewer -> TOO_MANY_DATA_COLUMNS | ||
| val mismatchClass = | ||
| if (columns.length > query.output.length) { | ||
| "CREATE_VIEW_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS" | ||
| } else { | ||
| "CREATE_VIEW_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS" | ||
| } | ||
|
|
||
| throw new AnalysisException( | ||
| errorClass = mismatchClass, | ||
| messageParameters = Map( | ||
| "viewName" -> ident.toString, | ||
| "viewColumns" -> columns.mkString(", "), | ||
| "dataColumns" -> query.output.map(c => c.name).mkString(", "))) | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException | |
| import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan | ||
| import org.apache.spark.sql.catalyst.plans.logical.Project | ||
| import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias | ||
| import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView | ||
| import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View | ||
| import org.apache.spark.sql.catalyst.rules.Rule | ||
| import org.apache.spark.sql.catalyst.trees.CurrentOrigin | ||
|
|
@@ -38,6 +39,7 @@ import org.apache.spark.sql.connector.catalog.LookupCatalog | |
| import org.apache.spark.sql.connector.catalog.View | ||
| import org.apache.spark.sql.connector.catalog.ViewCatalog | ||
| import org.apache.spark.sql.errors.QueryCompilationErrors | ||
| import org.apache.spark.sql.types.MetadataBuilder | ||
|
|
||
| case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog { | ||
|
|
||
|
|
@@ -59,6 +61,33 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look | |
| loadView(catalog, ident) | ||
| .map(_ => ResolvedV2View(catalog.asViewCatalog, ident)) | ||
| .getOrElse(u) | ||
|
|
||
| case c@CreateIcebergView(ResolvedIdentifier(_, ident), _, query, columnAliases, columnComments, _, _, _, _, _, _) | ||
| if query.resolved && !c.rewritten => | ||
| val rewritten = rewriteIdentifiers(query, ident.asMultipartIdentifier) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @nastra, this can't be done here. It needs to be done before any of Spark's rules run. Otherwise, a temporary view may already have been substituted into the plan and could allow the checks to pass even though the view is invalid. This rule is only responsible for applying the column aliases and comments. Rewriting the identifiers should be done in |
||
| val aliasedPlan = aliasPlan(rewritten, columnAliases, columnComments) | ||
| c.copy(query = aliasedPlan, queryColumnNames = query.schema.fieldNames, rewritten = true) | ||
| } | ||
|
|
||
| /** | ||
| * Wraps the plan in a Project that renames each output column, by ordinal, to the matching | ||
| * alias and attaches the column comment (if any) as "comment" metadata. The plan is returned | ||
| * unchanged when no aliases are given or their count differs from the plan's output. | ||
| */ | ||
| private def aliasPlan( | ||
|
nastra marked this conversation as resolved.
Outdated
|
||
| analyzedPlan: LogicalPlan, | ||
|
nastra marked this conversation as resolved.
Outdated
|
||
| columnAliases: Seq[String], | ||
| columnComments: Seq[Option[String]]): LogicalPlan = { | ||
| if (columnAliases.isEmpty || columnAliases.length != analyzedPlan.output.length) { | ||
| analyzedPlan | ||
|
nastra marked this conversation as resolved.
Outdated
|
||
| } else { | ||
| val projectList = analyzedPlan.output.indices.map { pos => | ||
| val column = GetColumnByOrdinal(pos, analyzedPlan.schema.fields(pos).dataType) | ||
|
nastra marked this conversation as resolved.
Outdated
|
||
|
|
||
| val aliasName = columnAliases(pos) | ||
| columnComments(pos) match { | ||
| case Some(commentText) => | ||
| val meta = new MetadataBuilder().putString("comment", commentText).build() | ||
| Alias(column, aliasName)(explicitMetadata = Some(meta)) | ||
| case None => | ||
| Alias(column, aliasName)() | ||
| } | ||
| } | ||
| Project(projectList, analyzedPlan) | ||
| } | ||
| } | ||
|
|
||
| def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = catalog match { | ||
|
|
@@ -151,7 +180,7 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look | |
| } | ||
|
|
||
|
|
||
| implicit class ViewHelper(plugin: CatalogPlugin) { | ||
| implicit class IcebergViewHelper(plugin: CatalogPlugin) { | ||
|
nastra marked this conversation as resolved.
|
||
| def asViewCatalog: ViewCatalog = plugin match { | ||
| case viewCatalog: ViewCatalog => | ||
| viewCatalog | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,13 +19,19 @@ | |
|
|
||
| package org.apache.spark.sql.catalyst.analysis | ||
|
|
||
| import org.apache.spark.sql.AnalysisException | ||
| import org.apache.spark.sql.SparkSession | ||
| import org.apache.spark.sql.catalyst.expressions.SubqueryExpression | ||
| import org.apache.spark.sql.catalyst.plans.logical.CreateView | ||
| import org.apache.spark.sql.catalyst.plans.logical.DropView | ||
| import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan | ||
| import org.apache.spark.sql.catalyst.plans.logical.View | ||
| import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView | ||
| import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView | ||
| import org.apache.spark.sql.catalyst.rules.Rule | ||
| import org.apache.spark.sql.connector.catalog.CatalogManager | ||
| import org.apache.spark.sql.connector.catalog.CatalogPlugin | ||
| import org.apache.spark.sql.connector.catalog.Identifier | ||
| import org.apache.spark.sql.connector.catalog.LookupCatalog | ||
| import org.apache.spark.sql.connector.catalog.ViewCatalog | ||
|
|
||
|
|
@@ -40,6 +46,19 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi | |
| // Rewrites Spark's DropView/CreateView commands that target a resolved view identifier into | ||
| // DropIcebergView/CreateIcebergView. CreateView is only matched when it carries query text. | ||
| override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { | ||
| case DropView(ResolvedView(resolved), ifExists) => | ||
| DropIcebergView(resolved, ifExists) | ||
|
|
||
| case CreateView(ResolvedView(resolved), userSpecifiedColumns, comment, properties, | ||
| Some(queryText), query, allowExisting, replace) => | ||
| // Substitute CTEs before the temporary object check: if a CTE and a temporary view share | ||
| // a name the CTE must be used, so the reference must not be reported as a temp view. | ||
| val substituted = CTESubstitution.apply(query) | ||
| verifyTemporaryObjectsDontExist(resolved.identifier, substituted) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the query needs to have CTE substitution run before running the temporary object check. Otherwise, there could be conflicts between CTE substitution and temporary names. If there is a conflict, the CTE should be used.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you're absolutely right. I've done that and also added a test |
||
| CreateIcebergView(child = resolved, | ||
| queryText = queryText, | ||
| query = substituted, | ||
| columnAliases = userSpecifiedColumns.map(_._1), | ||
| columnComments = userSpecifiedColumns.map(_._2), | ||
| comment = comment, | ||
| properties = properties, | ||
| allowExisting = allowExisting, | ||
| replace = replace) | ||
| } | ||
|
|
||
| private def isTempView(nameParts: Seq[String]): Boolean = { | ||
|
|
@@ -62,4 +81,45 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi | |
| None | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Permanent views are not allowed to reference temp objects: fails with | ||
| * INVALID_TEMP_OBJ_REFERENCE for the first temporary view collected from the plan | ||
| */ | ||
| private def verifyTemporaryObjectsDontExist( | ||
| name: Identifier, | ||
| child: LogicalPlan): Unit = { | ||
| // brings `.quoted` for multipart identifiers into scope | ||
| import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ | ||
|
|
||
| // TODO: check for temp function names | ||
| collectTemporaryViews(child).headOption.foreach { tempViewName => | ||
| throw new AnalysisException( | ||
| errorClass = "INVALID_TEMP_OBJ_REFERENCE", | ||
| messageParameters = Map( | ||
| "obj" -> "VIEW", | ||
| "objName" -> name.name(), | ||
| "tempObj" -> "VIEW", | ||
| "tempObjName" -> tempViewName.quoted)) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Collect the distinct identifiers of all temporary views referenced by the plan, | ||
| * including those referenced from the plans of subquery expressions | ||
| */ | ||
| private def collectTemporaryViews(child: LogicalPlan): Seq[Seq[String]] = { | ||
| val referenced = child.flatMap { | ||
| case relation: UnresolvedRelation if isTempView(relation.multipartIdentifier) => | ||
| Seq(relation.multipartIdentifier) | ||
| case tempView: View if tempView.isTempView => | ||
| Seq(tempView.desc.identifier.nameParts) | ||
| case node => | ||
| // any other node: recurse into the plans of its subquery expressions | ||
| node.expressions.flatMap { expression => | ||
| expression.flatMap { | ||
| case subquery: SubqueryExpression => collectTemporaryViews(subquery.plan) | ||
| case _ => Seq.empty | ||
| } | ||
| } | ||
| } | ||
|
|
||
| referenced.distinct | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.catalyst.plans.logical.views | ||
|
|
||
| import org.apache.spark.sql.catalyst.plans.logical.BinaryCommand | ||
| import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan | ||
|
|
||
| /** | ||
| * Logical command for creating a view. It is a binary node: `child` is exposed as the left | ||
| * child and `query` as the right child, so both are replaced through withNewChildrenInternal. | ||
| * | ||
| * @param child the view identifier plan (left child) | ||
| * @param queryText the SQL text of the view query | ||
| * @param query the plan of the view query (right child) | ||
| * @param columnAliases user-specified column names for the view | ||
| * @param columnComments optional comment per user-specified column | ||
| * @param queryColumnNames column names of the query; empty by default | ||
| * @param comment optional view comment | ||
| * @param properties view properties | ||
| * @param allowExisting presumably the IF NOT EXISTS flag of CREATE VIEW - confirm against the parser | ||
| * @param replace presumably the OR REPLACE flag of CREATE VIEW - confirm against the parser | ||
| * @param rewritten false by default; flag carried on the node, set by analysis rules | ||
| */ | ||
| case class CreateIcebergView( | ||
|
rdblue marked this conversation as resolved.
|
||
| child: LogicalPlan, | ||
| queryText: String, | ||
| query: LogicalPlan, | ||
| columnAliases: Seq[String], | ||
| columnComments: Seq[Option[String]], | ||
| queryColumnNames: Seq[String] = Seq.empty, | ||
| comment: Option[String], | ||
| properties: Map[String, String], | ||
| allowExisting: Boolean, | ||
| replace: Boolean, | ||
| rewritten: Boolean = false) extends BinaryCommand { | ||
| override def left: LogicalPlan = child | ||
|
|
||
| override def right: LogicalPlan = query | ||
|
|
||
| // only the two plan children are swapped; all other fields are carried over by copy | ||
| override protected def withNewChildrenInternal( | ||
| newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan = | ||
| copy(child = newLeft, query = newRight) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,98 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.execution.datasources.v2 | ||
|
|
||
| import org.apache.spark.sql.catalyst.InternalRow | ||
| import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException | ||
| import org.apache.spark.sql.catalyst.expressions.Attribute | ||
| import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan | ||
| import org.apache.spark.sql.connector.catalog.Identifier | ||
| import org.apache.spark.sql.connector.catalog.ViewCatalog | ||
| import org.apache.spark.sql.types.StructType | ||
| import scala.collection.JavaConverters._ | ||
|
|
||
|
|
||
| /** | ||
| * Physical command that creates a view in the given ViewCatalog. With `replace` an existing | ||
| * view is dropped before it is created again; otherwise a ViewAlreadyExistsException is | ||
| * swallowed only when `allowExisting` is set. Produces no output rows. | ||
| */ | ||
| case class CreateV2ViewExec( | ||
|
nastra marked this conversation as resolved.
|
||
| catalog: ViewCatalog, | ||
| ident: Identifier, | ||
| queryText: String, | ||
| viewSchema: StructType, | ||
| columnAliases: Seq[String], | ||
| columnComments: Seq[Option[String]], | ||
| queryColumnNames: Seq[String], | ||
| comment: Option[String], | ||
| properties: Map[String, String], | ||
| allowExisting: Boolean, | ||
| replace: Boolean) extends LeafV2CommandExec { | ||
|
|
||
| override lazy val output: Seq[Attribute] = Nil | ||
|
|
||
| override protected def run(): Seq[InternalRow] = { | ||
| val catalogManager = session.sessionState.catalogManager | ||
| val sessionCatalogName = catalogManager.currentCatalog.name | ||
| // the current catalog is only passed along when it differs from the view's catalog | ||
| val currentCatalog = if (catalog.name().equals(sessionCatalogName)) null else sessionCatalogName | ||
| val currentNamespace = catalogManager.currentNamespace | ||
|
|
||
| val engineVersion = "Spark " + org.apache.spark.SPARK_VERSION | ||
| val versionProperties = Seq( | ||
| ViewCatalog.PROP_CREATE_ENGINE_VERSION -> engineVersion, | ||
| ViewCatalog.PROP_ENGINE_VERSION -> engineVersion) | ||
| // later entries win: user properties, then the view comment, then the engine versions | ||
| val newProperties = properties ++ comment.map(ViewCatalog.PROP_COMMENT -> _) ++ versionProperties | ||
|
|
||
| def createView(): Unit = { | ||
| catalog.createView( | ||
| ident, | ||
| queryText, | ||
| currentCatalog, | ||
| currentNamespace, | ||
| viewSchema, | ||
| queryColumnNames.toArray, | ||
| columnAliases.toArray, | ||
| columnComments.map(_.orNull).toArray, | ||
| newProperties.asJava) | ||
| } | ||
|
|
||
| if (replace) { | ||
| // CREATE OR REPLACE VIEW | ||
| // FIXME: replaceView API doesn't exist in Spark 3.5 | ||
| if (catalog.viewExists(ident)) { | ||
| catalog.dropView(ident) | ||
| } | ||
| createView() | ||
| } else { | ||
| // CREATE VIEW [IF NOT EXISTS] | ||
| try { | ||
| createView() | ||
| } catch { | ||
| case _: ViewAlreadyExistsException if allowExisting => // Ignore | ||
| } | ||
| } | ||
|
|
||
| Nil | ||
| } | ||
|
|
||
| override def simpleString(maxFields: Int): String = { | ||
| s"CreateV2ViewExec: ${ident}" | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -44,6 +44,7 @@ import org.apache.spark.sql.catalyst.plans.logical.RenameTable | |
| import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField | ||
| import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields | ||
| import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering | ||
| import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView | ||
| import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView | ||
| import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View | ||
| import org.apache.spark.sql.connector.catalog.Identifier | ||
|
|
@@ -107,6 +108,21 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi | |
| case DropIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), ifExists) => | ||
| DropV2ViewExec(viewCatalog, ident, ifExists) :: Nil | ||
|
|
||
| case CreateIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), queryText, query, | ||
| columnAliases, columnComments, queryColumnNames, comment, properties, allowExisting, replace, _) => | ||
| CreateV2ViewExec( | ||
| catalog = viewCatalog, | ||
| ident = ident, | ||
| queryText = queryText, | ||
| columnAliases = columnAliases, | ||
| columnComments = columnComments, | ||
| queryColumnNames = queryColumnNames, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think I'd also be fine with not tracking this and passing an empty array through. That's not correct for Spark, but it would work for us since Iceberg doesn't store these.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Technically, we don't need to track it in this PR, but we'll eventually need it when we show the properties of views and such. I'd probably keep it here, but let me know if you'd like me to remove it and introduce it in an upcoming PR.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It seems reasonable to include it. Let's just leave it as-is since you even have a test for it. |
||
| viewSchema = query.schema, | ||
| comment = comment, | ||
| properties = properties, | ||
| allowExisting = allowExisting, | ||
| replace = replace) :: Nil | ||
|
|
||
| case _ => Nil | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.