
[SPARK-15678][SQL] Not use cache on appends and overwrites #13419

Closed
sameeragarwal wants to merge 2 commits into apache:master from sameeragarwal:drop-cache-on-write


Conversation

@sameeragarwal (Member) commented May 31, 2016

What changes were proposed in this pull request?

Spark currently incorrectly continues to use cached data even if the underlying data is overwritten.

Current behavior:

val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
sqlContext.read.parquet(dir).count() // outputs 1000 <---- We are still using the cached dataset

Expected behavior:

val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
df.count() // outputs 1000
sqlContext.read.parquet(dir).count() // outputs 10 <---- We are not using the cached dataset

This patch fixes the bug by modifying the ListingFileCatalog logic, which previously compared only the directory names (rather than the individual files) when comparing two plans. Note that, in theory, this could cause a slight regression for a very large number of files, but I didn't observe any regression in micro-benchmarks with thousands of files.
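
For illustration, here is a minimal sketch of the idea behind the fix (not the actual Spark source; the class name and fields are simplified stand-ins): relation identity is derived from every leaf file's path, size, and modification time rather than from the root directory alone, so an overwrite that rewrites the files produces a plan that no longer matches the cached one.

import org.apache.hadoop.fs.FileStatus

// Simplified stand-in for a file catalog; not the real ListingFileCatalog API.
class SketchFileCatalog(val rootPaths: Seq[String], val leafFiles: Seq[FileStatus]) {

  // Identity is derived from each leaf file, not just the directory name.
  private def fileKeys: Seq[(String, Long, Long)] =
    leafFiles.map(f => (f.getPath.toString, f.getLen, f.getModificationTime))

  override def equals(other: Any): Boolean = other match {
    case that: SketchFileCatalog => this.fileKeys == that.fileKeys
    case _ => false
  }

  override def hashCode(): Int = fileKeys.hashCode()
}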

How was this patch tested?

Unit tests for overwrites and appends in ParquetQuerySuite.
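
Such a test might look roughly like the following sketch. It is illustrative only: spark, test, and withTempPath are assumed to come from the suite's usual test fixtures (e.g., SQLTestUtils), and the assertions simply mirror the expected behavior shown above.

test("overwrite should invalidate reads of the new data (sketch)") {
  withTempPath { dir =>
    val path = dir.getCanonicalPath
    spark.range(1000).write.mode("overwrite").parquet(path)
    val df = spark.read.parquet(path).cache()
    assert(df.count() === 1000)

    spark.range(10).write.mode("overwrite").parquet(path)
    // A fresh read of the overwritten path should see the new files,
    // not the stale cached plan.
    assert(spark.read.parquet(path).count() === 10)
  }
}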

@sameeragarwal (Member Author)

@yhuai @mengxr what are your thoughts on this approach?

spark.range(1000).write.mode("overwrite").parquet(path)
val df = sqlContext.read.parquet(path).cache()
assert(df.count() == 1000)
sqlContext.range(10).write.mode("overwrite").parquet(path)
Member (review comment on the snippet above):

sqlContext -> spark

@SparkQA commented May 31, 2016

Test build #59668 has finished for PR 13419 at commit ee631d2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member)

Hi, @sameeragarwal.
Is there any reason to use SQLContext instead of SparkSession in this PR?

@sameeragarwal (Member Author)

@dongjoon-hyun no reason; old habits. I'll fix this. Thanks! :)

@mengxr (Contributor) commented May 31, 2016

I would prefer refreshing the dataset every time it is reloaded, while keeping existing cached datasets unchanged.

val df1 = sqlContext.read.parquet(dir).cache()
df1.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
val df2 = sqlContext.read.parquet(dir)
df2.count() // outputs 10
df1.count() // still outputs 1000 because it was cached

Neither approach is perfectly safe, so I don't have a strong preference either way.

sameeragarwal changed the title from "[SPARK-15678][SQL] Drop cache on appends and overwrites" to "[SPARK-15678][SQL] Not use cache on appends and overwrites" on Jun 1, 2016
@sameeragarwal (Member Author)

@mengxr it seems like overwriting generates new files, so we can achieve the same semantics without introducing an additional timestamp. The current solution should respect the contract for old DataFrames while making sure that new ones don't use the cached value. Let me know what you think.

@sameeragarwal (Member Author)

Also cc'ing @davies

@SparkQA commented Jun 1, 2016

Test build #59706 has finished for PR 13419 at commit a21013a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tejasapatil (Contributor)

I guess the caching is done across multiple nodes. If the underlying data for a dataset is physically updated and some of the nodes where the data was cached go down, would the existing cached dataset be invalidated and refreshed? If not, old DataFrames could return inconsistent or incomplete data.

@sameeragarwal (Member Author)

@tejasapatil if the nodes where the data was cached go down, the CacheManager should still consider that data as cached. In that case, the next time the data is accessed, the underlying RDD will be recomputed and cached again.

@sameeragarwal (Member Author)

I ended up creating a small design doc describing the problem and presenting 2 possible solutions at https://docs.google.com/document/d/1h5SzfC5UsvIrRpeLNDKSMKrKJvohkkccFlXo-GBAwQQ/edit?ts=574f717f#. Based on this, we decided in favor of option 2 (#13566) as it is a less intrusive change to the default behavior. I'm going to close this PR for now, but we may revisit this approach (i.e., option 1) for 2.1.
