[SPARK-24596][SQL] Non-cascading Cache Invalidation #21594
Changes from 10 commits
CacheManager

@@ -105,24 +105,58 @@ class CacheManager extends Logging {
   }
 
   /**
-   * Un-cache all the cache entries that refer to the given plan.
+   * Un-cache the given plan or all the cache entries that refer to the given plan.
+   * @param query    The [[Dataset]] to be un-cached.
+   * @param cascade  If true, un-cache all the cache entries that refer to the given
+   *                 [[Dataset]]; otherwise un-cache the given [[Dataset]] only.
    * @param blocking Whether to block until all blocks are deleted.
    */
-  def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock {
-    uncacheQuery(query.sparkSession, query.logicalPlan, blocking)
+  def uncacheQuery(query: Dataset[_],
+      cascade: Boolean, blocking: Boolean = true): Unit = writeLock {
+    uncacheQuery(query.sparkSession, query.logicalPlan, cascade, blocking)
   }
 
   /**
-   * Un-cache all the cache entries that refer to the given plan.
+   * Un-cache the given plan or all the cache entries that refer to the given plan.
+   * @param spark    The Spark session.
+   * @param plan     The plan to be un-cached.
+   * @param cascade  If true, un-cache all the cache entries that refer to the given
+   *                 plan; otherwise un-cache the given plan only.
    * @param blocking Whether to block until all blocks are deleted.
    */
-  def uncacheQuery(spark: SparkSession, plan: LogicalPlan, blocking: Boolean): Unit = writeLock {
+  def uncacheQuery(spark: SparkSession, plan: LogicalPlan,
+      cascade: Boolean, blocking: Boolean): Unit = writeLock {
+    val shouldRemove: LogicalPlan => Boolean =
+      if (cascade) {
+        _.find(_.sameResult(plan)).isDefined
+      } else {
+        _.sameResult(plan)
+      }
     val it = cachedData.iterator()
     while (it.hasNext) {
       val cd = it.next()
-      if (cd.plan.find(_.sameResult(plan)).isDefined) {
+      if (shouldRemove(cd.plan)) {
         cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
         it.remove()
       }
     }
+    // Re-compile dependent cached queries after removing the cached query.
+    if (!cascade) {
+      val it = cachedData.iterator()
+      val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
+      while (it.hasNext) {
+        val cd = it.next()
+        if (cd.plan.find(_.sameResult(plan)).isDefined) {
+          it.remove()
+          val plan = spark.sessionState.executePlan(AnalysisBarrier(cd.plan)).executedPlan
+          val newCache = InMemoryRelation(
+            cacheBuilder = cd.cachedRepresentation.cacheBuilder.withCachedPlan(plan),
+            logicalPlan = cd.plan)
+          needToRecache += cd.copy(cachedRepresentation = newCache)
+        }
+      }
+      needToRecache.foreach(cachedData.add)
+    }
   }
 
   /**
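The heart of the change is the `shouldRemove` predicate: with `cascade = true`, an entry is evicted if the un-cached plan appears anywhere in its plan tree; with `cascade = false`, only an exact `sameResult` match is evicted, and dependent entries are re-compiled in a second pass instead of being dropped. Below is a minimal, self-contained sketch of that predicate selection using a toy `Plan` tree in place of Spark's `LogicalPlan`; the `Plan` type and all names here are illustrative only, not Spark API:

```scala
object CascadeSketch extends App {
  // Toy stand-in for LogicalPlan, mirroring LogicalPlan.find (tree search)
  // and LogicalPlan.sameResult (plan equivalence).
  case class Plan(name: String, children: Seq[Plan] = Nil) {
    def find(p: Plan => Boolean): Option[Plan] =
      if (p(this)) Some(this) else children.flatMap(_.find(p)).headOption
    def sameResult(other: Plan): Boolean = this == other
  }

  // Same shape as the patch: cascading matches the target anywhere in the
  // tree; non-cascading matches only the exact plan.
  def shouldRemove(target: Plan, cascade: Boolean): Plan => Boolean =
    if (cascade) {
      _.find(_.sameResult(target)).isDefined
    } else {
      _.sameResult(target)
    }

  val t1 = Plan("t1")
  val dependent = Plan("filter", Seq(t1)) // a cached query built on top of t1

  assert(shouldRemove(t1, cascade = true)(dependent))   // cascading: evicted
  assert(!shouldRemove(t1, cascade = false)(dependent)) // non-cascading: kept, re-compiled
  assert(shouldRemove(t1, cascade = false)(t1))         // the target itself always goes
  println("predicate sketch OK")
}
```

Note that the re-compile pass runs only after the removal loop, presumably so that the fresh physical plans compiled via `AnalysisBarrier(cd.plan)` can no longer pick up the just-removed cache entry.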
DropTableCommand

@@ -189,8 +189,9 @@ case class DropTableCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
+    val isTempTable = catalog.isTemporaryTable(tableName)
 
-    if (!catalog.isTemporaryTable(tableName) && catalog.tableExists(tableName)) {
+    if (!isTempTable && catalog.tableExists(tableName)) {
       // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
       // issue an exception.
       catalog.getTableMetadata(tableName).tableType match {
@@ -204,9 +205,10 @@ case class DropTableCommand(
       }
     }
 
-    if (catalog.isTemporaryTable(tableName) || catalog.tableExists(tableName)) {
+    if (isTempTable || catalog.tableExists(tableName)) {
       try {
-        sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName))
+        sparkSession.sharedState.cacheManager.uncacheQuery(
+          sparkSession.table(tableName), !isTempTable)
       } catch {
         case NonFatal(e) => log.warn(e.toString, e)
       }
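`DropTableCommand` now derives the cascade flag from whether the dropped relation is temporary: dropping a persistent table or view invalidates every dependent cache entry (their data would be stale or dangling), while dropping a temporary view is non-cascading. A sketch of the user-visible difference under this patch, assuming a `spark` session as in `spark-shell` (all table and view names are made up):

```scala
// Persistent table: DROP must cascade, since dependent caches would
// otherwise serve data for a table that no longer exists.
spark.sql("CREATE TABLE src (id INT) USING parquet")
spark.sql("CACHE TABLE cached_src AS SELECT * FROM src")
spark.sql("DROP TABLE src")
// cached_src's cache entry is evicted (cascade = true).

// Temporary view: DROP is non-cascading; caches built on top of it keep
// their materialized data and are re-compiled instead of evicted.
spark.range(10).createOrReplaceTempView("tmp")
spark.sql("CACHE TABLE cached_tmp AS SELECT * FROM tmp WHERE id > 5")
spark.sql("DROP VIEW tmp")
spark.sql("SELECT * FROM cached_tmp").show() // still answered from the cache
```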
TruncateTableCommand

@@ -493,7 +493,7 @@ case class TruncateTableCommand(
     spark.sessionState.refreshTable(tableName.unquotedString)
     // Also try to drop the contents of the table from the columnar cache
     try {
-      spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier))
+      spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier), true)
     } catch {
       case NonFatal(e) =>
         log.warn(s"Exception when attempting to uncache table $tableIdentWithDB", e)
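`TruncateTableCommand` stays cascading (`cascade = true`), which fits the design: truncation changes the table's contents, so a dependent cache would hold stale rows, and keeping it materialized, as the non-cascading path does, would return wrong answers. A sketch of the rationale, again assuming a `spark` session and illustrative names:

```scala
spark.sql("CREATE TABLE t (id INT) USING parquet")
spark.sql("INSERT INTO t VALUES (1), (2), (3)")
spark.sql("CACHE TABLE agg AS SELECT count(*) AS c FROM t")
spark.sql("TRUNCATE TABLE t")
// agg's cache entry is invalidated (cascade = true); querying agg now
// re-computes over the emptied table rather than returning the stale 3.
spark.sql("SELECT * FROM agg").show()
```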
CatalogImpl

@@ -364,7 +364,8 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    */
   override def dropTempView(viewName: String): Boolean = {
     sparkSession.sessionState.catalog.getTempView(viewName).exists { viewDef =>
-      sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession, viewDef, blocking = true)
+      sparkSession.sharedState.cacheManager.uncacheQuery(
+        sparkSession, viewDef, cascade = false, blocking = true)
       sessionCatalog.dropTempView(viewName)
     }
   }
@@ -379,7 +380,8 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    */
   override def dropGlobalTempView(viewName: String): Boolean = {
     sparkSession.sessionState.catalog.getGlobalTempView(viewName).exists { viewDef =>
-      sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession, viewDef, blocking = true)
+      sparkSession.sharedState.cacheManager.uncacheQuery(
+        sparkSession, viewDef, cascade = false, blocking = true)
       sessionCatalog.dropGlobalTempView(viewName)
     }
   }
@@ -438,7 +440,9 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * @since 2.0.0
    */
   override def uncacheTable(tableName: String): Unit = {
-    sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName))
+    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+    sparkSession.sharedState.cacheManager.uncacheQuery(
+      sparkSession.table(tableName), !sessionCatalog.isTemporaryTable(tableIdent))
   }
 
   /**
@@ -490,7 +494,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     // cached version and make the new version cached lazily.
     if (isCached(table)) {
       // Uncache the logicalPlan.
-      sparkSession.sharedState.cacheManager.uncacheQuery(table, blocking = true)
+      sparkSession.sharedState.cacheManager.uncacheQuery(table, true, blocking = true)
       // Cache it again.
       sparkSession.sharedState.cacheManager.cacheQuery(table, Some(tableIdent.table))
     }
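In `CatalogImpl` the mode is picked per call site: `dropTempView` and `dropGlobalTempView` pass `cascade = false`, the `refreshTable` re-cache path passes `true`, and `uncacheTable` decides by resolving the name against the session catalog. A sketch of how a caller might observe `uncacheTable` under this patch (again assuming a `spark` session; the view names are illustrative):

```scala
spark.range(100).createOrReplaceTempView("base")
spark.sql("CACHE TABLE base")
spark.sql("CACHE TABLE derived AS SELECT id * 2 AS id FROM base")

// "base" resolves to a temporary view, so this becomes
// uncacheQuery(..., cascade = false): only base's own entry is evicted,
// while derived keeps its materialized data and is re-compiled.
spark.catalog.uncacheTable("base")
spark.sql("SELECT * FROM derived").show() // still served from the cache
```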
Review comment: also update the comments at line 2966 and line 2979 to explain the new behavior.