[SPARK-40425][SQL] DROP TABLE does not need to do table lookup #37879
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogManager, LookupCatalog}
import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, LookupCatalog}

/**
 * Resolves the catalog of the name parts for table/view/function/namespace.

@@ -28,8 +28,14 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
  extends Rule[LogicalPlan] with LookupCatalog {

  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    case UnresolvedIdentifier(CatalogAndIdentifier(catalog, identifier)) =>
      ResolvedIdentifier(catalog, identifier)
    case UnresolvedIdentifier(nameParts, allowTemp) =>
      if (allowTemp && catalogManager.v1SessionCatalog.isTempView(nameParts)) {
        val ident = Identifier.of(nameParts.dropRight(1).toArray, nameParts.last)
|
Contributor: nit:

Contributor (Author): good idea!
        ResolvedIdentifier(FakeSystemCatalog, ident)
      } else {
        val CatalogAndIdentifier(catalog, identifier) = nameParts
        ResolvedIdentifier(catalog, identifier)
      }
    case s @ ShowTables(UnresolvedNamespace(Seq()), _, _) =>
      s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
    case s @ ShowTableExtended(UnresolvedNamespace(Seq()), _, _, _) =>
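For context on the new temp-view branch above, here is a small self-contained sketch (not part of the diff; it only assumes spark-catalyst on the classpath and a hypothetical name) of how the multi-part name is split into a v2 Identifier before being wrapped in ResolvedIdentifier(FakeSystemCatalog, ...):

// Standalone illustration of the nameParts -> Identifier split used in the temp-view branch.
import org.apache.spark.sql.connector.catalog.Identifier

val nameParts = Seq("db", "tmp_view")  // hypothetical temp view name
val ident = Identifier.of(nameParts.dropRight(1).toArray, nameParts.last)
// ident.namespace() is Array("db") and ident.name() is "tmp_view";
// the rule then returns ResolvedIdentifier(FakeSystemCatalog, ident).
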
@@ -28,6 +28,7 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.types.{DataType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
 * Holds the name of a namespace that has yet to be looked up in a catalog. It will be resolved to

@@ -135,7 +136,8 @@ case class UnresolvedFunc(
 * Holds the name of a table/view/function identifier that we need to determine the catalog. It will
 * be resolved to [[ResolvedIdentifier]] during analysis.
 */
case class UnresolvedIdentifier(nameParts: Seq[String]) extends LeafNode {
case class UnresolvedIdentifier(nameParts: Seq[String], allowTemp: Boolean = false)
  extends LeafNode {
  override lazy val resolved: Boolean = false
  override def output: Seq[Attribute] = Nil
}

@@ -244,3 +246,9 @@ case class ResolvedIdentifier(
  identifier: Identifier) extends LeafNodeWithoutStats {
  override def output: Seq[Attribute] = Nil
}

// A fake v2 catalog to hold temp views.
object FakeSystemCatalog extends CatalogPlugin {
  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {}
  override def name(): String = "SYSTEM"
|
Member: FAKE_SYSTEM?

Contributor (Author): The name doesn't matter; we won't show it or look it up for now. Later, though, I think it's a good idea to officially add a system catalog to host temp views, temp functions, and built-in functions.
}
@@ -685,15 +685,15 @@ class DDLParserSuite extends AnalysisTest {
    val cmd = "DROP VIEW"
    val hint = Some("Please use DROP TABLE instead.")
    parseCompare(s"DROP VIEW testcat.db.view",
      DropView(UnresolvedView(Seq("testcat", "db", "view"), cmd, true, hint), ifExists = false))
|
Contributor: why does

Contributor (Author): it's still used by commands like
      DropView(UnresolvedIdentifier(Seq("testcat", "db", "view"), true), ifExists = false))
    parseCompare(s"DROP VIEW db.view",
      DropView(UnresolvedView(Seq("db", "view"), cmd, true, hint), ifExists = false))
      DropView(UnresolvedIdentifier(Seq("db", "view"), true), ifExists = false))
    parseCompare(s"DROP VIEW IF EXISTS db.view",
      DropView(UnresolvedView(Seq("db", "view"), cmd, true, hint), ifExists = true))
      DropView(UnresolvedIdentifier(Seq("db", "view"), true), ifExists = true))
    parseCompare(s"DROP VIEW view",
      DropView(UnresolvedView(Seq("view"), cmd, true, hint), ifExists = false))
      DropView(UnresolvedIdentifier(Seq("view"), true), ifExists = false))
    parseCompare(s"DROP VIEW IF EXISTS view",
      DropView(UnresolvedView(Seq("view"), cmd, true, hint), ifExists = true))
      DropView(UnresolvedIdentifier(Seq("view"), true), ifExists = true))
  }

  private def testCreateOrReplaceDdl(

@@ -216,19 +216,23 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
      c
    }

    case DropTable(ResolvedV1TableIdentifier(ident), ifExists, purge) =>
    case DropTable(ResolvedV1Identifier(ident), ifExists, purge) =>
|
Contributor: I am afraid this breaks the session catalog delegation. Previously, we checked the table was

Contributor: @cloud-fan @viirya @dongjoon-hyun, could you double check if I missed anything?

Member: I checked the difference between
If

Member: I saw for many commands, there is a

Contributor: I think

Contributor (Author): Yea we should. Can you create a PR? Thanks!

Member: Sounds good to me to switch to V2 DROP.

Contributor: I'll have time, probably, on Monday. I'll do that then unless someone gets there earlier.

Member: Hi @aokolnychyi Any update for this? If you don't mind I can finish it this weekend. 😄

If you create a managed table in Spark 3.5.1, you cannot delete the path when dropping the table. I think this is a bug. It can be deleted correctly in Spark 3.3.
      DropTableCommand(ident, ifExists, isView = false, purge = purge)

    case DropTable(_: ResolvedPersistentView, ifExists, purge) =>
      throw QueryCompilationErrors.cannotDropViewWithDropTableError

    // v1 DROP TABLE supports temp view.
    case DropTable(ResolvedTempView(ident, _), ifExists, purge) =>
      DropTableCommand(ident.asTableIdentifier, ifExists, isView = false, purge = purge)
    case DropTable(ResolvedIdentifier(FakeSystemCatalog, ident), _, _) =>
      DropTempViewCommand(ident)

    case DropView(ResolvedViewIdentifier(ident), ifExists) =>
    case DropView(ResolvedV1Identifier(ident), ifExists) =>
      DropTableCommand(ident, ifExists, isView = true, purge = false)

    case DropView(r @ ResolvedIdentifier(catalog, ident), _) =>
      if (catalog == FakeSystemCatalog) {
        DropTempViewCommand(ident)
      } else {
        throw QueryCompilationErrors.catalogOperationNotSupported(catalog, "views")
      }
|
Contributor: nit:

case DropView(ResolvedIdentifier(FakeSystemCatalog, ident), _) =>
  DropTempViewCommand(ident)
case DropView(ResolvedIdentifier(catalog, _), _) =>
  throw ...

    case c @ CreateNamespace(DatabaseNameInSessionCatalog(name), _, _) if conf.useV1Command =>
      val comment = c.properties.get(SupportsNamespaces.PROP_COMMENT)
      val location = c.properties.get(SupportsNamespaces.PROP_LOCATION)
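To summarize the dispatch the cases above implement, here is a usage-level sketch (not part of the diff; "testcat", "v_tmp", and "default.t1" are made-up names and the session is assumed to be a plain spark-shell): a temp-view identifier goes to DropTempViewCommand, a session-catalog identifier goes to the v1 DropTableCommand, and DROP VIEW against a v2 catalog that does not support views is rejected.

// Hypothetical SQL-level illustration of the dispatch above.
spark.range(1).createOrReplaceTempView("v_tmp")
spark.sql("DROP VIEW v_tmp")                   // FakeSystemCatalog -> DropTempViewCommand
spark.sql("DROP TABLE IF EXISTS default.t1")   // session catalog -> v1 DropTableCommand
// spark.sql("DROP VIEW testcat.ns.v")         // would throw catalogOperationNotSupported
//                                             // if "testcat" does not support views
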
@@ -26,7 +26,7 @@ import org.apache.spark.internal.config.ConfigEntry

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint}
import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint, SubqueryAlias, View}
import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.columnar.InMemoryRelation

@@ -159,11 +159,51 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
      plan: LogicalPlan,
      cascade: Boolean,
      blocking: Boolean = false): Unit = {
    uncacheQuery(spark, _.sameResult(plan), cascade, blocking)
  }

  def uncacheTableOrView(spark: SparkSession, name: Seq[String], cascade: Boolean): Unit = {
    uncacheQuery(
      spark,
      isMatchedTableOrView(_, name, spark.sessionState.conf),
      cascade,
      blocking = false)
  }

  private def isMatchedTableOrView(plan: LogicalPlan, name: Seq[String], conf: SQLConf): Boolean = {
    def isSameName(nameInCache: Seq[String]): Boolean = {
      nameInCache.length == name.length && nameInCache.zip(name).forall(conf.resolver.tupled)
    }

    plan match {
      case SubqueryAlias(ident, LogicalRelation(_, _, Some(catalogTable), _)) =>
        val v1Ident = catalogTable.identifier
        isSameName(ident.qualifier :+ ident.name) &&
          isSameName(v1Ident.catalog.toSeq ++ v1Ident.database :+ v1Ident.table)

      case SubqueryAlias(ident, DataSourceV2Relation(_, _, Some(catalog), Some(v2Ident), _)) =>
        isSameName(ident.qualifier :+ ident.name) &&
          isSameName(catalog.name() +: v2Ident.namespace() :+ v2Ident.name())
|
Comment on lines +184 to +186

Member: Does SubqueryAlias have the same name as the underlying relation?

Contributor (Author): yes, see
      case SubqueryAlias(ident, View(catalogTable, _, _)) =>
        val v1Ident = catalogTable.identifier
        isSameName(ident.qualifier :+ ident.name) &&
          isSameName(v1Ident.catalog.toSeq ++ v1Ident.database :+ v1Ident.table)

      case _ => false
    }
  }

  def uncacheQuery(
      spark: SparkSession,
      isMatchedPlan: LogicalPlan => Boolean,
      cascade: Boolean,
      blocking: Boolean): Unit = {
    val shouldRemove: LogicalPlan => Boolean =
      if (cascade) {
        _.exists(_.sameResult(plan))
|
Member: nvm
        _.exists(isMatchedPlan)
      } else {
        _.sameResult(plan)
        isMatchedPlan
      }
    val plansToUncache = cachedData.filter(cd => shouldRemove(cd.plan))
    this.synchronized {

@@ -187,7 +227,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
        // will keep it as it is. It means the physical plan has been re-compiled already in the
        // other thread.
        val cacheAlreadyLoaded = cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
        cd.plan.exists(_.sameResult(plan)) && !cacheAlreadyLoaded
        cd.plan.exists(isMatchedPlan) && !cacheAlreadyLoaded
      })
    }
  }
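To make the name matching in isMatchedTableOrView concrete, here is a minimal, self-contained sketch in plain Scala (not from the PR): the qualifier recorded in the cached SubqueryAlias is compared part by part against the name being uncached using the session resolver, which is case-insensitive unless spark.sql.caseSensitive is enabled. The resolver here is modeled with equalsIgnoreCase and the identifiers are made up.

// Standalone sketch of the comparison; the real code uses SQLConf.resolver.
def isSameName(nameInCache: Seq[String], name: Seq[String],
    resolver: (String, String) => Boolean): Boolean = {
  nameInCache.length == name.length && nameInCache.zip(name).forall(resolver.tupled)
}

val caseInsensitive: (String, String) => Boolean = _.equalsIgnoreCase(_)
assert(isSameName(Seq("spark_catalog", "default", "T1"),
  Seq("spark_catalog", "default", "t1"), caseInsensitive))
// A partially qualified name does not match a fully qualified one.
assert(!isSameName(Seq("default", "t1"),
  Seq("spark_catalog", "default", "t1"), caseInsensitive))
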
Just a nit: if we use an ID'ed TODO with a JIRA id, some contributor can pick up the item more easily.

I didn't create a JIRA for this TODO because @MaxGekk will fix it shortly (we talked offline) :)