[SPARK-15678] Add support to REFRESH data source paths #13566

Closed
sameeragarwal wants to merge 3 commits into apache:master from sameeragarwal:refresh-path-2

Conversation

@sameeragarwal (Member)

What changes were proposed in this pull request?

Spark currently incorrectly continues to use cached data even if the underlying data is overwritten.

Current behavior:

```scala
val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
sqlContext.read.parquet(dir).count() // outputs 1000 <---- We are still using the cached dataset
```

This patch fixes this bug by adding support for `REFRESH path`, which invalidates and refreshes all the cached data (and the associated metadata) for any DataFrame that contains the given data source path.

Expected behavior:

```scala
val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
spark.catalog.refreshResource(dir)
sqlContext.read.parquet(dir).count() // outputs 10 <---- We are not using the cached dataset
```
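Since the patch also adds a SQL command for this (the first test build below reports a new `case class RefreshResource(path: String)`), the same invalidation should be reachable from SQL. A minimal sketch, assuming the path is passed as a quoted string literal:

```scala
// Hypothetical SQL form of the refresh; the exact quoting rules for the
// path literal are an assumption, not confirmed by this PR's discussion.
spark.sql("REFRESH \"/tmp/test\"")
spark.read.parquet("/tmp/test").count() // should no longer hit the stale cache
```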

How was this patch tested?

Unit tests for overwrites and appends in `ParquetQuerySuite` and `CachedTableSuite`.
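A minimal sketch of the kind of test described, using the `refreshByPath` name settled on later in this thread; `withTempPath` and the exact assertions are illustrative assumptions, not the patch's actual test code:

```scala
test("refreshByPath should invalidate cached data for an overwritten path") {
  withTempPath { dir =>
    val path = dir.getCanonicalPath
    spark.range(1000).write.mode("overwrite").parquet(path)
    val df = spark.read.parquet(path).cache()
    assert(df.count() == 1000)

    // Overwrite the underlying files, then refresh the cached entries.
    spark.range(10).write.mode("overwrite").parquet(path)
    spark.catalog.refreshByPath(path)
    assert(spark.read.parquet(path).count() == 10)
  }
}
```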

@SparkQA

SparkQA commented Jun 8, 2016

Test build #60187 has finished for PR 13566 at commit ece34ab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class RefreshResource(path: String)

```scala
(fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory))
}
cachedData.foreach {
case data if data.plan.find {
```
Contributor

Could you move this into a separate function? It was kinda hard to understand that it is part of the case guard.
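A sketch of the kind of extraction being requested; the helper name and the pattern-matched types are assumptions based on the surrounding excerpt, not the exact code that was merged:

```scala
// Hypothetical helper pulled out of the case guard: returns true if the
// plan node is a file-based relation rooted at the qualified path, and
// refreshes that relation's cached file listing as a side effect.
private def lookupAndRefresh(plan: LogicalPlan, fs: FileSystem, qualifiedPath: Path): Boolean =
  plan match {
    case lr: LogicalRelation => lr.relation match {
      case hr: HadoopFsRelation =>
        val invalidate = hr.location.paths
          .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory))
          .contains(qualifiedPath)
        if (invalidate) hr.location.refresh()
        invalidate
      case _ => false
    }
    case _ => false
  }

// The case guard in cachedData.foreach then reduces to a single call:
//   case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined => ...
```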

@hvanhovell (Contributor)

Looks pretty good. Left one comment.

@sameeragarwal (Member Author)

Thanks, I pulled it out in a separate function.

@SparkQA

SparkQA commented Jun 9, 2016

Test build #60212 has finished for PR 13566 at commit 6acd0c0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```scala
 *
 * @since 2.0.0
 */
def refreshResource(path: String): Unit
```
Contributor

Should we call it invalidateCache() to reflect what it actually does?

Also, it's a bit confusing to have this API on Catalog; can we put it on SparkSession?

Contributor

I'm confused by the Catalog/SessionCatalog/ExternalCatalog split here; I thought this was SessionCatalog or ExternalCatalog, so it makes sense for it to be here (together with the other cache-related APIs).

Member Author

I like invalidateCache(), but the reason for choosing refreshResource() was to make it sound similar to refreshTable() above. Let me know if you prefer one over the other.

Contributor

resource does sound a bit weird to me

Member Author

alright, changed this to refreshByPath based on @ericl's suggestion :)
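
For reference, with the rename the user-facing call in the example above would read (usage sketch):

```scala
spark.catalog.refreshByPath("/tmp/test")
```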

@SparkQA

SparkQA commented Jun 11, 2016

Test build #60317 has finished for PR 13566 at commit e79f3f7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies (Contributor)

davies commented Jun 11, 2016

LGTM, merging this into master and 2.0, thanks!

asfgit pushed a commit that referenced this pull request Jun 11, 2016

Author: Sameer Agarwal <sameer@databricks.com>

Closes #13566 from sameeragarwal/refresh-path-2.

(cherry picked from commit 468da03)
Signed-off-by: Davies Liu <davies.liu@gmail.com>
asfgit closed this in 468da03 on Jun 11, 2016