
Commit 4934da5

sunchao authored and cloud-fan committed
[SPARK-33305][SQL] DSv2: DROP TABLE command should also invalidate cache
### What changes were proposed in this pull request?

This changes `DropTableExec` to also invalidate caches referencing the table to be dropped, in a cascading manner.

### Why are the changes needed?

In DSv1, the `DROP TABLE` command also invalidates caches, as described in [SPARK-19765](https://issues.apache.org/jira/browse/SPARK-19765). In DSv2, however, the same command only drops the table and does not touch the caches built on top of it, which can lead to correctness issues.

### Does this PR introduce _any_ user-facing change?

Yes. The DSv2 `DROP TABLE` command now also invalidates caches.

### How was this patch tested?

Added a new unit test.

Closes apache#30211 from sunchao/SPARK-33305.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
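To make the correctness issue concrete, here is a minimal repro sketch of the stale-cache behavior this fixes. The catalog name `testcat`, the namespace, and the values are illustrative assumptions, not taken from the patch:

```scala
// Hypothetical repro (assumes `testcat` is a registered DSv2 catalog).
spark.sql("CREATE TABLE testcat.ns.t AS SELECT 1 AS id")
spark.sql("CACHE TABLE v AS SELECT id FROM testcat.ns.t")

spark.sql("DROP TABLE testcat.ns.t")
spark.sql("CREATE TABLE testcat.ns.t AS SELECT 2 AS id")

// Without cascading invalidation, the cached plan for `v` survives the DROP
// and this query can keep serving the stale value 1 instead of 2.
spark.sql("SELECT id FROM v").show()
```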
Parent: 27bb40b

File tree: 3 files changed, +23 −2 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 1 addition & 1 deletion
@@ -229,7 +229,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       throw new AnalysisException("Describing columns is not supported for v2 tables.")
 
     case DropTable(r: ResolvedTable, ifExists, purge) =>
-      DropTableExec(r.catalog, r.identifier, ifExists, purge) :: Nil
+      DropTableExec(session, r.catalog, r.table, r.identifier, ifExists, purge) :: Nil
 
     case _: NoopDropTable =>
       LocalTableScanExec(Nil, Nil) :: Nil
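The strategy change is mechanical: the planner now threads the `SparkSession` and the resolved `Table` into `DropTableExec`, because cache invalidation needs the session's shared `CacheManager` and the table to reconstruct the relation being uncached. One way to see which physical node the planner picks is to inspect the executed plan; the snippet below is a sketch, assuming a registered DSv2 catalog named `testcat` (V2 commands execute eagerly, so the DROP has already run by the time the plan is printed):

```scala
// Sketch: inspect the physical node chosen for a DSv2 DROP TABLE.
val df = spark.sql("DROP TABLE IF EXISTS testcat.ns.t")
// Expect DropTableExec(...) when the table existed; with IF EXISTS on a
// missing table, the NoopDropTable branch yields LocalTableScanExec instead.
println(df.queryExecution.executedPlan)
```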

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala

Lines changed: 6 additions & 1 deletion
@@ -17,22 +17,27 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 
 /**
  * Physical plan node for dropping a table.
  */
 case class DropTableExec(
+    session: SparkSession,
     catalog: TableCatalog,
+    table: Table,
     ident: Identifier,
     ifExists: Boolean,
     purge: Boolean) extends V2CommandExec {
 
   override def run(): Seq[InternalRow] = {
     if (catalog.tableExists(ident)) {
+      val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
+      session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
       catalog.dropTable(ident, purge)
     } else if (!ifExists) {
       throw new NoSuchTableException(ident)
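`uncacheQuery` is called with `cascade = true`, so the cache manager evicts not only an entry for the relation itself but every cached plan that refers to it; that is why dropping the table also clears a cached view built on top of it. Conceptually, the matching works like the sketch below (an illustration of the idea, not the actual `CacheManager` code):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Illustration only: under cascade = true, a cached entry is invalidated when
// its plan tree contains any node that produces the same result as the
// relation being dropped.
def shouldInvalidate(cachedPlan: LogicalPlan, droppedRelation: LogicalPlan): Boolean =
  cachedPlan.find(_.sameResult(droppedRelation)).isDefined
```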

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

Lines changed: 16 additions & 0 deletions
@@ -784,6 +784,22 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("SPARK-33305: DROP TABLE should also invalidate cache") {
+    val t = "testcat.ns.t"
+    val view = "view"
+    withTable(t) {
+      withTempView(view) {
+        sql(s"CREATE TABLE $t USING foo AS SELECT id, data FROM source")
+        sql(s"CACHE TABLE $view AS SELECT id FROM $t")
+        checkAnswer(sql(s"SELECT * FROM $t"), spark.table("source"))
+        checkAnswer(sql(s"SELECT * FROM $view"), spark.table("source").select("id"))
+
+        sql(s"DROP TABLE $t")
+        assert(spark.sharedState.cacheManager.lookupCachedData(spark.table(view)).isEmpty)
+      }
+    }
+  }
+
   test("Relation: basic") {
     val t1 = "testcat.ns1.ns2.tbl"
     withTable(t1) {
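The test leans on the suite's `withTable` and `withTempView` helpers, which guarantee cleanup even if an assertion throws. Their shape is roughly the following simplified sketch (the real versions live in Spark's `SQLTestUtils` and handle a few more details):

```scala
// Simplified sketch of the cleanup helpers used in the test above.
def withTable(tableNames: String*)(body: => Unit): Unit = {
  try body
  finally tableNames.foreach(t => spark.sql(s"DROP TABLE IF EXISTS $t"))
}

def withTempView(viewNames: String*)(body: => Unit): Unit = {
  try body
  finally viewNames.foreach(v => spark.catalog.dropTempView(v))
}
```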
