@@ -110,6 +110,7 @@ statement
tableIdentifier partitionSpec? describeColName? #describeTable
| (DESC | DESCRIBE) DATABASE EXTENDED? identifier #describeDatabase
| REFRESH TABLE tableIdentifier #refreshTable
| REFRESH .*? #refreshResource
| CACHE LAZY? TABLE identifier (AS? query)? #cacheTable
| UNCACHE TABLE identifier #uncacheTable
| CLEAR CACHE #clearCache
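For context, a couple of statements the new #refreshResource alternative accepts; this is a sketch only, and the paths and the `spark` session name are illustrative. Because the refreshTable alternative is listed first, REFRESH TABLE t still resolves to #refreshTable, while the catch-all picks up a bare path:

spark.sql("REFRESH /tmp/events")                            // local path
spark.sql("REFRESH hdfs://namenode:8020/warehouse/events")  // fully qualified URI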
@@ -226,4 +226,11 @@ abstract class Catalog {
*/
def refreshTable(tableName: String): Unit

/**
* Invalidate and refresh all the cached data (and the associated metadata) for any dataframe that
* contains the given data source path.
*
* @since 2.0.0
*/
def refreshByPath(path: String): Unit
}
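A minimal usage sketch of the new refreshByPath API; the path and the `spark` session name are illustrative, and the flow mirrors the ParquetQuerySuite test further down in this diff:

spark.range(100).write.mode("overwrite").parquet("/tmp/events")
val df = spark.read.parquet("/tmp/events").cache()
df.count()                                  // materializes the cache from the current files
// ... another job rewrites the files under /tmp/events ...
spark.catalog.refreshByPath("/tmp/events")  // invalidates and re-caches the stale entry
df.count()                                  // now reflects the rewritten data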
@@ -19,10 +19,14 @@ package org.apache.spark.sql.execution

import java.util.concurrent.locks.ReentrantReadWriteLock

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK

@@ -157,4 +161,49 @@ private[sql] class CacheManager extends Logging {
case _ =>
}
}

/**
* Invalidates the cache of any data that contains `resourcePath` in one or more
* `HadoopFsRelation` node(s) as part of its logical plan.
*/
private[sql] def invalidateCachedPath(
sparkSession: SparkSession, resourcePath: String): Unit = writeLock {
val (fs, qualifiedPath) = {
val path = new Path(resourcePath)
val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
(fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory))
}

cachedData.foreach {
case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined =>
val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan))
if (dataIndex >= 0) {
data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true)
cachedData.remove(dataIndex)
Contributor: Once we refresh the LogicalRelation and unpersist the cachedRepresentation, do we still need to remove it and add it back?

Member (Author): Unfortunately, I didn't find a way around it. The old plan has an incorrect catalog, and removing and adding the new one seems like the only option.

}
sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan))
case _ => // Do Nothing
}
}
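The path comparison above only works because both sides are fully qualified first. A standalone sketch (not part of the patch; path illustrative) of what the qualification step produces for a bare local path:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val raw = new Path("/tmp/events")
val fs = raw.getFileSystem(new Configuration())
// On a local filesystem this yields something like file:/tmp/events, the same
// form the relation's file catalog stores, so the equality check lines up.
val qualified = raw.makeQualified(fs.getUri, fs.getWorkingDirectory)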

/**
* Traverses a given `plan` and searches for the occurrences of `qualifiedPath` in the
* [[org.apache.spark.sql.execution.datasources.FileCatalog]] of any [[HadoopFsRelation]] nodes
* in the plan. If found, we refresh the metadata and return true. Otherwise, this method returns
* false.
*/
private def lookupAndRefresh(plan: LogicalPlan, fs: FileSystem, qualifiedPath: Path): Boolean = {
plan match {
case lr: LogicalRelation => lr.relation match {
Contributor: The LogicalRelation could be part of a cached logical plan; should we also invalidate that?

Contributor: Never mind, we are using plan.find in invalidateCachedPath.

case hr: HadoopFsRelation =>
val invalidate = hr.location.paths
.map(_.makeQualified(fs.getUri, fs.getWorkingDirectory))
.contains(qualifiedPath)
if (invalidate) hr.location.refresh()
invalidate
case _ => false
}
case _ => false
}
}
}
@@ -25,7 +25,6 @@ import org.antlr.v4.runtime.tree.TerminalNode

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
@@ -209,6 +208,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
RefreshTable(visitTableIdentifier(ctx.tableIdentifier))
}

/**
* Create a [[RefreshResource]] logical plan.
*/
override def visitRefreshResource(ctx: RefreshResourceContext): LogicalPlan = withOrigin(ctx) {
val resourcePath = remainder(ctx.REFRESH.getSymbol).trim
RefreshResource(resourcePath)
}
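Because the grammar rule swallows everything after the keyword, remainder(...).trim hands back the raw path text. A quick sketch of what the visitor produces (accessing the parser through sessionState as the internal code in this diff does; the path is illustrative):

val plan = spark.sessionState.sqlParser.parsePlan("REFRESH /tmp/events")
// plan is RefreshResource("/tmp/events"); REFRESH TABLE t still yields RefreshTable.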

/**
* Create a [[CacheTableCommand]] logical plan.
*/
@@ -101,6 +101,15 @@ case class RefreshTable(tableIdent: TableIdentifier)
}
}

case class RefreshResource(path: String)
extends RunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
sparkSession.catalog.refreshByPath(path)
Seq.empty[Row]
}
}
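Putting the pieces together, a sketch of the end-to-end flow (path illustrative): the statement below parses to RefreshResource via the new grammar rule, run() delegates to Catalog.refreshByPath, and that in turn reaches CacheManager.invalidateCachedPath.

spark.sql("REFRESH /tmp/events")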

/**
* Builds a map in which keys are case insensitive
*/
@@ -373,6 +373,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
}
}

/**
* Refresh the cache entry and the associated metadata for all dataframes (if any) that contain
* the given data source path.
*
* @group cachemgmt
* @since 2.0.0
*/
override def refreshByPath(resourcePath: String): Unit = {
sparkSession.sharedState.cacheManager.invalidateCachedPath(sparkSession, resourcePath)
}
}


@@ -68,6 +68,34 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
TableIdentifier("tmp"), ignoreIfNotExists = true)
}

test("SPARK-15678: not use cache on overwrite") {
withTempDir { dir =>
val path = dir.toString
spark.range(1000).write.mode("overwrite").parquet(path)
val df = spark.read.parquet(path).cache()
assert(df.count() == 1000)
spark.range(10).write.mode("overwrite").parquet(path)
assert(df.count() == 1000)
spark.catalog.refreshByPath(path)
assert(df.count() == 10)
assert(spark.read.parquet(path).count() == 10)
}
}

test("SPARK-15678: not use cache on append") {
withTempDir { dir =>
val path = dir.toString
spark.range(1000).write.mode("append").parquet(path)
val df = spark.read.parquet(path).cache()
assert(df.count() == 1000)
spark.range(10).write.mode("append").parquet(path)
assert(df.count() == 1000)
spark.catalog.refreshByPath(path)
assert(df.count() == 1010)
assert(spark.read.parquet(path).count() == 1010)
}
}

test("self-join") {
// 4 rows, cells of column 1 of row 2 and row 4 are null
val data = (1 to 4).map { i =>
@@ -206,6 +206,51 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton {
Utils.deleteRecursively(tempPath)
}

test("SPARK-15678: REFRESH PATH") {
val tempPath: File = Utils.createTempDir()
tempPath.delete()
table("src").write.mode(SaveMode.Overwrite).parquet(tempPath.toString)
sql("DROP TABLE IF EXISTS refreshTable")
sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet")
checkAnswer(
table("refreshTable"),
table("src").collect())
// Cache the table.
sql("CACHE TABLE refreshTable")
assertCached(table("refreshTable"))
// Append new data.
table("src").write.mode(SaveMode.Append).parquet(tempPath.toString)
// We are still using the old data.
assertCached(table("refreshTable"))
checkAnswer(
table("refreshTable"),
table("src").collect())
// Refresh the table.
sql(s"REFRESH ${tempPath.toString}")
// We are using the new data.
assertCached(table("refreshTable"))
checkAnswer(
table("refreshTable"),
table("src").union(table("src")).collect())

// Drop the table and create it again.
sql("DROP TABLE refreshTable")
sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet")
// It is not cached.
assert(!isCached("refreshTable"), "refreshTable should not be cached.")
// Refresh the table. The REFRESH command should not make an uncached
// table cached.
sql(s"REFRESH ${tempPath.toString}")
checkAnswer(
table("refreshTable"),
table("src").union(table("src")).collect())
// It is not cached.
assert(!isCached("refreshTable"), "refreshTable should not be cached.")

sql("DROP TABLE refreshTable")
Utils.deleteRecursively(tempPath)
}

test("SPARK-11246 cache parquet table") {
sql("CREATE TABLE cachedTable STORED AS PARQUET AS SELECT 1")
