[SPARK-24596][SQL] Non-cascading Cache Invalidation #21594
Conversation
Test build #92105 has finished for PR 21594 at commit

Test build #92107 has finished for PR 21594 at commit

Test build #92108 has finished for PR 21594 at commit
IMHO it is good, but it may confuse users. Could you please add some JavaDocs to explain the difference?
  }

  /**
   * Un-cache all the cache entries that refer to the given plan.
We should update this document.
  cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
} else {
  val plan = spark.sessionState.executePlan(cd.plan).executedPlan
  val newCache = InMemoryRelation(
hmm, if the plan to uncache is iterated after a plan containing it, doesn't this still use its cached plan?
Yes, you are right, although it wouldn't lead to any error, just like all the other compiled DataFrames that refer to this old InMemoryRelation. I'll change this piece of code. But you've brought up another interesting question. Consider a scenario similar to what you've mentioned:

df2 = df1.filter(...)
df2.cache()
df1.cache()
df1.collect()

Here we cache the dependent Dataset first and the Dataset being depended upon next. Optimally, when you call df2.collect(), you would like df2 to use the cached data in df1, but it doesn't work like that now, since df2's execution plan had already been generated before we called df1.cache(). It might be worth revisiting the caches and updating their plans if necessary when cacheQuery() is called.
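The ordering issue described above can be sketched with a toy model (hypothetical names, not Spark internals): a plan "compiled" at a given moment only captures the caches that existed at that moment, so caches added afterwards are invisible to it unless the plan is re-compiled.

```scala
// Toy model of compile-time cache capture (hypothetical, not Spark's code).
// A compiled plan records which of the caches available *at compile time*
// it can read from; caches created later are never picked up.
case class CompiledPlan(name: String, usedCaches: Set[String])

def compile(name: String, refersTo: Set[String], cachesNow: Set[String]): CompiledPlan =
  CompiledPlan(name, refersTo.intersect(cachesNow))

// df2 refers to df1, but df1 was not yet cached when df2's plan was compiled:
val df2Early = compile("df2", refersTo = Set("df1"), cachesNow = Set.empty)
// Re-compiling after df1.cache() would let df2 read df1's cached data:
val df2Late = compile("df2", refersTo = Set("df1"), cachesNow = Set("df1"))
```

In this toy model, `df2Early` ends up with no usable caches while `df2Late` would read from df1's cache, which mirrors why re-visiting cached plans at cacheQuery() time could help.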
@TomaszGaweda @viirya Nice suggestion about the doc. I'll update it.

Test build #92145 has finished for PR 21594 at commit
   */
  def unpersist(blocking: Boolean): this.type = {
-   sparkSession.sharedState.cacheManager.uncacheQuery(this, blocking)
+   sparkSession.sharedState.cacheManager.uncacheQuery(this, false, blocking)
nit: it's clearer to write cascade = false
- def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock {
-   uncacheQuery(query.sparkSession, query.logicalPlan, blocking)
+ def uncacheQuery(query: Dataset[_],
+     cascade: Boolean, blocking: Boolean = true): Unit = writeLock {
nit:

def f(
    param1: X,
    param2: Y) ...

4 space indentation.
   */
- def uncacheQuery(spark: SparkSession, plan: LogicalPlan, blocking: Boolean): Unit = writeLock {
+ def uncacheQuery(spark: SparkSession, plan: LogicalPlan,
+     cascade: Boolean, blocking: Boolean): Unit = writeLock {
ditto
- def uncacheQuery(spark: SparkSession, plan: LogicalPlan, blocking: Boolean): Unit = writeLock {
+ def uncacheQuery(spark: SparkSession, plan: LogicalPlan,
+     cascade: Boolean, blocking: Boolean): Unit = writeLock {
+   val condition: LogicalPlan => Boolean =
condition -> shouldUncache?
Test build #92185 has finished for PR 21594 at commit

retest please
  }

  test("SPARK-24596 Non-cascading Cache Invalidation - uncache temporary view") {
    withView("t1", "t2") {
withTempView
Yes, good catch! A mistake caused by copy-paste.
  }

  test("SPARK-24596 Non-cascading Cache Invalidation - drop temporary view") {
    withView("t1", "t2") {
ditto
val df5 = df.agg(sum('a)).filter($"sum(a)" > 1)
assertCached(df5)
// first time use, load cache
df5.collect()
how do we prove this takes more than 5 seconds?
We just need to prove that the new InMemoryRelation works for building the cache (since the plan has been re-compiled)... maybe we should check the result, though. Also, I deliberately made this DataFrame not depend on the UDF so it can finish quickly.
LGTM except some comments about the tests.

Test build #92194 has finished for PR 21594 at commit
gatorsmile left a comment:
LGTM except a few minor comments.
   */
- def uncacheQuery(spark: SparkSession, plan: LogicalPlan, blocking: Boolean): Unit = writeLock {
+ def uncacheQuery(spark: SparkSession, plan: LogicalPlan,
+     cascade: Boolean, blocking: Boolean): Unit = writeLock {
indent.
- def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock {
-   uncacheQuery(query.sparkSession, query.logicalPlan, blocking)
+ def uncacheQuery(query: Dataset[_],
+     cascade: Boolean, blocking: Boolean = true): Unit = writeLock {
indent
  // Also try to drop the contents of the table from the columnar cache
  try {
-   spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier))
+   spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier), true)
Use a named argument: cascade = true
      needToRecache += cd.copy(cachedRepresentation = newCache)
    }
  }
  needToRecache.foreach(cachedData.add)
create a private function from line 144 and line 158?
It's almost the same logic as "recache", except that it tries to reuse the cached buffer here. It would be nice to integrate these two, but it wouldn't look so clean given the inconvenience of copying a CacheBuilder. I'll try, though.
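The buffer-reuse idea discussed here can be sketched with a toy model (hypothetical types, not Spark's actual CachedData or CacheBuilder): on non-cascading invalidation, a dependent entry receives a freshly compiled plan while keeping its already-materialized buffer, so nothing is recomputed.

```scala
// Toy model (not Spark's real types): an entry pairs a compiled plan
// with its materialized buffer of cached rows.
case class Entry(plan: String, buffer: Vector[Int])

// Swap in a newly compiled plan but keep the existing buffer untouched.
def refresh(entry: Entry, recompile: String => String): Entry =
  entry.copy(plan = recompile(entry.plan))

val refreshed = refresh(Entry("oldPlan", Vector(1, 2, 3)), _ => "newPlan")
```

The buffer travels unchanged through the copy, which is the property that makes the non-cascading path cheap compared with a full recache.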
   */
  def unpersist(blocking: Boolean): this.type = {
-   sparkSession.sharedState.cacheManager.uncacheQuery(this, blocking)
+   sparkSession.sharedState.cacheManager.uncacheQuery(this, cascade = false, blocking)
Also update the comments on line 2966 and line 2979 and explain the new behavior.
override def run(sparkSession: SparkSession): Seq[Row] = {
  val catalog = sparkSession.sessionState.catalog
  val isTempTable = catalog.isTemporaryTable(tableName)
rename it to isTempView
  try {
-   sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName))
+   sparkSession.sharedState.cacheManager.uncacheQuery(
+     sparkSession.table(tableName), !isTempTable)
cascade = !isTempTable
- sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName))
+ val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+ sparkSession.sharedState.cacheManager.uncacheQuery(
+   sparkSession.table(tableName), !sessionCatalog.isTemporaryTable(tableIdent))
val cascade = !sessionCatalog.isTemporaryTable(tableIdent)
...
  if (isCached(table)) {
    // Uncache the logicalPlan.
-   sparkSession.sharedState.cacheManager.uncacheQuery(table, blocking = true)
+   sparkSession.sharedState.cacheManager.uncacheQuery(table, true, blocking = true)
the same here.
document the behavior changes in the

@maryannxue Thanks for fixing the current behavior! This is a very important fix.

Test build #92265 has finished for PR 21594 at commit

retest this please

Test build #92274 has finished for PR 21594 at commit

LGTM

Test build #92286 has finished for PR 21594 at commit

Test build #92287 has finished for PR 21594 at commit

retest this please

Test build #92290 has finished for PR 21594 at commit

Thanks! Merged to master.
What changes were proposed in this pull request?
This PR introduces a non-cascading cache invalidation mode and applies it as follows:

- DataSet.unpersist(): non-cascading mode
- Catalog.uncacheTable(): follow the same convention as dropping tables/views, i.e., use non-cascading mode for temporary views and regular (cascading) mode for the rest

Note that a regular (persistent) view is a database object just like a table, so after dropping a regular view (whether cached or not), any query referring to that view should no longer be valid. Hence, if a cached persistent view is dropped, we need to invalidate all dependent caches so that exceptions will be thrown for any later reference. On the other hand, a temporary view is in fact equivalent to an unnamed DataSet, and dropping a temporary view should have no impact on queries referencing that view. Thus we should do non-cascading uncaching for temporary views, which also guarantees consistent uncaching behavior between temporary views and unnamed DataSets.
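The two invalidation modes can be illustrated with a minimal sketch, with plan dependencies modeled as a plain map (hypothetical code, not Spark's CacheManager):

```scala
// Toy dependency graph: df2 was derived from df1.
val dependsOn: Map[String, Set[String]] =
  Map("df1" -> Set.empty, "df2" -> Set("df1"))

// cascade = true drops the entry and every entry whose plan refers to it;
// cascade = false drops only the entry itself, leaving dependents cached.
def uncache(cached: Set[String], plan: String, cascade: Boolean): Set[String] =
  if (cascade) cached.filterNot(p => p == plan || dependsOn(p).contains(plan))
  else cached - plan

// Dataset.unpersist / dropping a temporary view: df2's cache survives.
val nonCascading = uncache(Set("df1", "df2"), "df1", cascade = false)
// Dropping a table or persistent view: df2's cache is invalidated too.
val cascading = uncache(Set("df1", "df2"), "df1", cascade = true)
```

In this sketch, non-cascading invalidation of df1 leaves df2's cache intact (matching the temporary-view/unnamed-DataSet behavior), while cascading invalidation removes both entries (matching the drop of a persistent view or table).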
How was this patch tested?
New tests in CachedTableSuite and DatasetCacheSuite.