Spark: Add support for Iceberg views #9332
Conversation
Force-pushed from 0cb895d to d5ae7ca.
```scala
/**
 * Resolve views in CREATE VIEW and ALTER VIEW AS plans and convert them to logical plans.
 */
case class CreateViewAnalysis(spark: SparkSession)
```
the logic is almost identical to the CreateViewAnalysis in apache/spark#44197, with a few minor tweaks around using the correct case class
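For orientation, a minimal skeleton of such an extension-injected analysis rule (the `CreateViewAnalysisSkeleton` name and the elided match arms are illustrative, not the actual extension code):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Illustrative skeleton only: the real CreateViewAnalysis mirrors the rule in
// apache/spark#44197 and rewrites the parsed CREATE VIEW / ALTER VIEW AS plans
// into plans that target a v2 ViewCatalog instead of the session catalog.
case class CreateViewAnalysisSkeleton(spark: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    // The actual rule matches the view-related plans here and swaps in the
    // extension's case classes; all other plans pass through unchanged.
    case other => other
  }
}
```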
```scala
protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager

override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
  case u @ UnresolvedRelation(nameParts, _, _) if catalogManager.v1SessionCatalog.isTempView(nameParts) =>
```
these first 2 match cases are from apache/spark#44197 Analyzer#ViewSubstitution
```scala
  .map(createViewRelation(parts.quoted, _))
  .getOrElse(u)

case ShowCreateTable(ResolvedV2View(_, ident, view), _, _) =>
```
all of the other cases are handled in ResolveCatalogs in apache/spark#44197, but we can't override that behavior, so we handle those cases here
```scala
// look at `AnalysisContext.catalogAndNamespace` when resolving relations with single-part name.
// If `AnalysisContext.catalogAndNamespace` is non-empty, analyzer will expand single-part names
// with it, instead of current catalog and namespace.
def resolveViews(plan: LogicalPlan): LogicalPlan = plan match {
```
in apache/spark#44197, resolving views happens in the Analyzer
```scala
if (catalog.viewExists(ident)) {
  catalog.dropView(ident)
}
// FIXME: replaceView API doesn't exist in Spark 3.5
```
currently the ViewCatalog in Spark doesn't have a replaceView(..) API. That feature was added with apache/spark#43677, but using it requires a new Spark release + version bump
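A sketch of the resulting workaround, assuming only the `viewExists`/`dropView` methods of the v2 `ViewCatalog`; the `create` thunk stands in for the actual `createView(..)` call:

```scala
import org.apache.spark.sql.connector.catalog.{Identifier, ViewCatalog}

// Emulating CREATE OR REPLACE VIEW while ViewCatalog has no replaceView(..):
// drop the existing view, then create the new definition. Note this is not
// atomic: a concurrent reader can observe the view as missing in between.
def createOrReplace(catalog: ViewCatalog, ident: Identifier)(create: => Unit): Unit = {
  if (catalog.viewExists(ident)) {
    catalog.dropView(ident)
  }
  create // the actual catalog.createView(..) call with the view's definition
}
```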
Force-pushed from d5ae7ca to 23cc43f.
Force-pushed from 23cc43f to c53e4e4.
```scala
if (!isResolvingView || isReferredTempViewName(nameParts)) return nameParts

if (nameParts.length == 1) {
  AnalysisContext.get.catalogAndNamespace :+ nameParts.head
```
Is AnalysisContext maintained by this code?
I doubt we want to use AnalysisContext. If I remember correctly from the original Netflix rules, we avoided context by rewriting unresolved references in the table to use fully-qualified identifiers. Can we do that instead?
this code was copied 1:1 from Spark's Analyzer here so that we can plug in the lookup of views.
I should be able to take a look this week.
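A sketch of the fully-qualified-identifier approach suggested above, assuming the view's default catalog and namespace are available from its metadata (`qualifyReferences` and `viewCatalogAndNamespace` are stand-in names):

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Instead of threading state through AnalysisContext, expand each single-part
// identifier in the parsed view body to a fully-qualified one using the
// catalog and namespace that were stored with the view when it was created.
def qualifyReferences(
    viewBody: LogicalPlan,
    viewCatalogAndNamespace: Seq[String]): LogicalPlan = {
  viewBody resolveOperators {
    case u @ UnresolvedRelation(nameParts, _, _) if nameParts.length == 1 =>
      u.copy(multipartIdentifier = viewCatalogAndNamespace :+ nameParts.head)
  }
}
```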
```scala
case _ =>
  false

private def isReferredTempViewName(nameParts: Seq[String]): Boolean = {
```
I don't think this is needed for Iceberg views because Iceberg views cannot reference anything other than tables or other Iceberg views. Here's the Javadoc for referredTempViewNames:
All the temp view names referred by the current view we are resolving. It's used to make sure the relation resolution is consistent between view creation and view resolution. For example, if `t` was a permanent table when the current view was created, it should still be a permanent table when resolving the current view, even if a temp view `t` has been created.
Because we know that all references must be tables, the set of names that can be resolved as temporary views should always be empty.
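In code, the simplification that reasoning allows would be essentially this (a sketch, not the merged implementation):

```scala
// Iceberg views can only reference tables or other Iceberg views, so while
// resolving a view body no name can legitimately refer to a temp view and
// the referred-temp-view check can collapse to a constant.
def isReferredTempViewName(nameParts: Seq[String]): Boolean = false
```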
```scala
private def lookupTempView(identifier: Seq[String]): Option[TemporaryViewRelation] = {
  // We are resolving a view and this name is not a temp view when that view was created. We
  // return None earlier here.
  if (isResolvingView && !isReferredTempViewName(identifier)) return None
```
Because an Iceberg view must not reference a temporary view, this check isn't needed and I don't think we will need isResolvingView either since that references the AnalysisContext.
I think that the only time this should resolve temporary views is when the original query (as parsed) references a temporary view. Any time there is a single-part identifier in a view, it should be resolved using the view's default catalog and namespace.
In addition, we need to ensure that there is no conflicting temporary view definition when a view is created. We'll need to check that any single-part identifier in the view SQL does not represent a temporary view at creation time.
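A sketch of that creation-time check, assuming an `isTempView` lookup that delegates to the session catalog (the function name and error wording are illustrative):

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// At CREATE VIEW time, reject the statement if any single-part identifier in
// the parsed view body currently resolves to a temporary view; otherwise the
// stored view could silently capture or shadow the temp view's definition.
def verifyNoTempViewReferences(
    parsedViewBody: LogicalPlan,
    isTempView: Seq[String] => Boolean): Unit = {
  parsedViewBody foreach {
    case UnresolvedRelation(nameParts, _, _) if isTempView(nameParts) =>
      throw new AnalysisException(
        s"Cannot create view referencing temporary view: ${nameParts.mkString(".")}")
    case _ => // other plan nodes are fine
  }
}
```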
```scala
    v1Table.v1Table.tableType == CatalogTableType.VIEW =>
  val v1Ident = v1Table.catalogTable.identifier
  val v2Ident = Identifier.of(v1Ident.database.toArray, v1Ident.identifier)
  analysis.ResolvedPersistentView(catalog, v2Ident, v1Table.catalogTable.schema)
```
I don't think that we need this. The rules here only need to detect and resolve Iceberg views. We don't want to resolve v1 views because that logic may change and we don't want to alter the behavior of existing views.
```scala
    identifier: Seq[String],
    viewOnly: Boolean = false): Option[LogicalPlan] = {
  lookupTempView(identifier).map { tempView =>
    ResolvedTempView(identifier.asIdentifier, tempView.tableMeta.schema)
```
We don't want to resolve a temp view. We only need to skip it and let the resolution logic in Spark handle resolution.
```scala
private def lookupView(catalog: CatalogPlugin, ident: Identifier): Option[LogicalPlan] =
  loadView(catalog, ident).map {
    case view if CatalogV2Util.isSessionCatalog(catalog) =>
```
Why is there a different path for the session catalog? I think it should only matter that the view was a v2 view.
```scala
import org.apache.spark.sql.connector.catalog.View
import org.apache.spark.sql.connector.catalog.ViewCatalog

case class ResolvedV2View(
```
Is this resolved? The view hasn't been parsed yet.
```scala
throw new AnalysisException(
  s"Cannot move view between catalogs: from=$oldCatalog and to=$newCatalog")

def lookupView(catalog: CatalogPlugin, ident: Identifier): Option[LogicalPlan] =
```
Is this a copy of lookupView from the extensions?
```scala
// look at `AnalysisContext.catalogAndNamespace` when resolving relations with single-part name.
// If `AnalysisContext.catalogAndNamespace` is non-empty, analyzer will expand single-part names
// with it, instead of current catalog and namespace.
def resolveViews(plan: LogicalPlan): LogicalPlan = plan match {
```
Is there anywhere we ensure the view isn't stale, and fail if it is? For example, if the output schema of the view does not match the computed schema of the query. This can happen if the table's schema evolves but the view is not updated to reflect that.
I'll make sure to handle this in #9340 and also add a test where the schema of the table has evolved.
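For reference, a sketch of such a staleness check, assuming the view's schema was captured at creation time (`checkNotStale` and the error message are illustrative):

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.StructType

// After re-analyzing the view's stored SQL, compare the computed schema of the
// query against the schema recorded when the view was created; fail if they
// diverged, e.g. because an underlying table's schema evolved.
def checkNotStale(viewName: String, storedSchema: StructType, analyzedQuery: LogicalPlan): Unit = {
  val computed = analyzedQuery.schema
  if (computed != storedSchema) {
    throw new AnalysisException(
      s"View $viewName is stale: stored schema $storedSchema does not match " +
        s"computed query schema $computed")
  }
}
```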
We discussed internally splitting this PR into smaller pieces. I've created #9340 to split out creation + reading of views. Additional PRs will follow.
Closing this as all of the functionality has been merged individually |
This is currently a WIP and still requires tests and a few improvements.
Most code is from apache/spark#44197, but a few workarounds were required to make this work without actually requiring changes in the Spark repo.
This PR can be considered an umbrella PR. I've created separate smaller PRs for easier reviewing: