Spark: Support dropping views #9421
Conversation
Force-pushed from 706bb5c to 17641a6.
```scala
      plan: LogicalPlan,
      catalogAndNamespace: Seq[String]): LogicalPlan = plan transformExpressions {
```
formatting was off here
```scala
      errorOnExceed = true,
      maxIterationsSetting = SQLConf.ANALYZER_MAX_ITERATIONS.key)

  override protected def batches: Seq[Batch] =
    Seq(Batch("pre-substitution", fixedPoint, V2ViewSubstitution))
```
Do we want to call this pre-substitution still? I originally used that because I thought we wanted substitution rules in it. But it turns out that we don't need this for view substitution, only for command hijacking. Maybe a "Hijack Commands" batch?
And it just occurred to me that we may not need an executor at all if we don't need to run to a fixed point. Can we just apply a command hijacking rule by itself instead?
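If the rule only needs a single pass, one way this could look is a `Once` batch instead of the fixed-point executor. A minimal sketch, assuming the command-hijacking rule is factored out under the illustrative name `HijackViewCommands` (neither the name nor this exact wiring is from the PR):

```scala
// Sketch only: a Once batch applies the rule a single time, avoiding the
// fixed-point iteration machinery when convergence is not needed.
override protected def batches: Seq[Batch] =
  Seq(Batch("Hijack Commands", Once, HijackViewCommands))
```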
Resolved review threads:
- `...extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala` (outdated, resolved)
- `spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java` (outdated, resolved)
- `spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java` (resolved)
- `....5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala` (outdated, resolved)
rdblue left a comment:
Overall, this looks about ready. The only major things are:
- Do we need to convert all `DropView` commands to an Iceberg plan and convert back if the catalog isn't a v2 catalog?
- Do we still need a rule batch, or can we just apply the rule to convert `DropView` and other commands once? I thought we needed a batch for multiple view substitutions, which is no longer the case.
Force-pushed from 73424e3 to 77efc8d.
```scala
  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    case DropView(UnresolvedIdentifier(nameParts, allowTemp), ifExists)
        if isViewCatalog(catalogManager.currentCatalog) && !isTempView(nameParts) =>
```
I don't think this test is correct. The pattern using `UnresolvedIdentifier` will match any DROP VIEW plan and convert it to an Iceberg plan. The `isViewCatalog` check needs to test the catalog responsible for the view, not the current catalog. The current-catalog check makes the v1 drop view test work when `USE spark_catalog` was called, but it would have the wrong behavior if an Iceberg catalog were the current catalog.
I think the logic should use `CatalogAndIdentifier` from `LookupCatalog`, like Spark does in `ResolveCatalogs`:
```scala
case d @ DropView(UnresolvedIdentifier(nameParts, allowTemp), ifExists)
    if !isTempView(allowTemp, nameParts) =>
  nameParts match {
    case CatalogAndIdentifier(catalog, ident) if isViewCatalog(catalog) =>
      DropIcebergView(ResolvedIdentifier(catalog, ident), ifExists)
    case _ =>
      d
  }
```

That way we know that the command is only replaced if a v2 `ViewCatalog` is responsible for it.
You can also make this easier for the next time by creating a custom pattern with `unapply`:

```scala
object ResolvedView {
  def unapply(unresolved: UnresolvedIdentifier): Option[ResolvedIdentifier] = unresolved match {
    case UnresolvedIdentifier(nameParts, true) if isTempView(nameParts) =>
      None
    case UnresolvedIdentifier(CatalogAndIdentifier(catalog, ident), _) if isViewCatalog(catalog) =>
      Some(ResolvedIdentifier(catalog, ident))
    case _ =>
      None
  }
}
```

Then the rule is much simpler:
```scala
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
  case DropView(ResolvedView(resolved), ifExists) =>
    DropIcebergView(resolved, ifExists)
}
```

I've opened nastra#138 with these changes and test updates to catch this case by changing back to the test catalog.
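The extractor technique itself can be shown outside Spark with a toy, self-contained model. None of the names below are Spark's: `Ident` and `Resolved` stand in for `UnresolvedIdentifier` and `ResolvedIdentifier`, and the catalog registry is a plain `Set`.

```scala
// Toy illustration of the unapply-based extractor pattern: the extractor
// bundles the temp-view check and the catalog check behind one pattern,
// so the calling match stays as small as the simplified rule.
final case class Ident(parts: Seq[String], allowTemp: Boolean)
final case class Resolved(catalog: String, name: String)

object ViewIdent {
  private val viewCatalogs = Set("iceberg") // stand-in for isViewCatalog
  private def isTempView(parts: Seq[String]) = parts.headOption.contains("temp")

  def unapply(ident: Ident): Option[Resolved] = ident match {
    case Ident(parts, true) if isTempView(parts) => None
    case Ident(Seq(catalog, name), _) if viewCatalogs(catalog) =>
      Some(Resolved(catalog, name))
    case _ => None
  }
}

object ExtractorDemo {
  def main(args: Array[String]): Unit = {
    Ident(Seq("iceberg", "v1"), allowTemp = false) match {
      case ViewIdent(resolved) => println(s"hijack: $resolved")
      case other               => println(s"leave untouched: $other")
    }
  }
}
```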
thanks for spotting and fixing this 💯
- `...rk-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/HijackViewCommands.scala` (outdated, resolved)
```scala
    case OrderAwareCoalesce(numPartitions, coalescer, child) =>
      OrderAwareCoalesceExec(numPartitions, coalescer, planLater(child)) :: Nil

    case DropIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), ifExists) =>
```
I'm not sure what happens if this isn't a ViewCatalog, but the new rewrite rule should ensure that it always is.
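If defensive handling were wanted anyway, one option is a fall-through case that fails loudly instead of silently not matching. A sketch only: it assumes `DropV2ViewExec` (from the file listed above) takes the view catalog, identifier, and `ifExists` flag, which may not match its actual signature.

```scala
// Sketch: handle the "not a ViewCatalog" case explicitly. Given the rewrite
// rule, this branch should be unreachable, but failing loudly documents that.
case DropIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), ifExists) =>
  DropV2ViewExec(viewCatalog, ident, ifExists) :: Nil

case DropIcebergView(ResolvedIdentifier(catalog, ident), _) =>
  throw new UnsupportedOperationException(
    s"Cannot drop view $ident: catalog ${catalog.name} is not a ViewCatalog")
```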
I think once this includes the changes from nastra#138, I'm +1.
Force-pushed from 307015a to a420f0b.
Thanks for reviewing @rdblue. I'll go ahead and merge this, since everything should be addressed.
This PR introduces DROP support (https://spark.apache.org/docs/latest/sql-ref-syntax-ddl-drop-view.html) for Iceberg views and requires a `pre-substitution` batch in `IcebergSparkSqlExtensionsParser`, because `ResolveSessionCatalog` exits early in https://github.com/apache/spark/blob/branch-3.5/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala#L224-L229 for V2 commands.
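A rough sketch of that wiring, under the assumption that the extensions parser wraps a delegate parser and runs a substitution executor over the parsed plan before Spark's analyzer sees it (names other than `V2ViewSubstitution` and the batch label are illustrative, not from the PR):

```scala
// Sketch only: run the pre-substitution batch on the freshly parsed plan,
// so ResolveSessionCatalog never gets the chance to claim the V2 command.
object PreSubstitution extends RuleExecutor[LogicalPlan] {
  override protected def batches: Seq[Batch] =
    Seq(Batch("pre-substitution", FixedPoint(100), V2ViewSubstitution))
}

// Assumed parser wrapper: the delegate parses, then the batch rewrites.
override def parsePlan(sqlText: String): LogicalPlan =
  PreSubstitution.execute(delegate.parsePlan(sqlText))
```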