**Analyzer.scala**

```diff
@@ -1107,12 +1107,16 @@ class Analyzer(override val catalogManager: CatalogManager)
 
       case c @ CacheTable(u @ UnresolvedRelation(_, _, false), _, _, _) =>
         lookupRelation(u.multipartIdentifier, u.options, false)
-          .map(relation => c.copy(table = EliminateSubqueryAliases(relation)))
+          .map(resolveViews)
+          .map(EliminateSubqueryAliases(_))
+          .map(relation => c.copy(table = relation))
           .getOrElse(c)
 
       case c @ UncacheTable(u @ UnresolvedRelation(_, _, false), _, _) =>
         lookupRelation(u.multipartIdentifier, u.options, false)
-          .map(relation => c.copy(table = EliminateSubqueryAliases(relation)))
+          .map(resolveViews)
+          .map(EliminateSubqueryAliases(_))
+          .map(relation => c.copy(table = relation))
           .getOrElse(c)
 
       // TODO (SPARK-27484): handle streaming write commands when we have them.
```
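For readers tracing the rule change: the rewrite now resolves views first and only then strips subquery aliases, expressed as chained `.map` calls over the `Option` returned by `lookupRelation`. Chained `Option.map` calls compose left to right, so the three maps are equivalent to one map over the composed function. A stand-alone sketch of that chaining, with illustrative stand-in functions rather than Spark's:

```scala
object OptionChaining extends App {
  // Stand-ins for resolveViews and EliminateSubqueryAliases, for illustration only.
  val resolve: String => String = _ + ":resolved"
  val stripAliases: String => String = _ + ":no-aliases"

  val relation: Option[String] = Some("v") // a successfully looked-up relation

  // Chained maps apply left to right...
  val chained = relation.map(resolve).map(stripAliases)
  // ...and are equivalent to one map over the composed function.
  val composed = relation.map(resolve.andThen(stripAliases))

  assert(chained == composed)
  println(chained) // Some(v:resolved:no-aliases)
}
```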
**CacheManager.scala**

```diff
@@ -88,12 +88,34 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       query: Dataset[_],
       tableName: Option[String] = None,
       storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = {
-    val planToCache = query.logicalPlan
+    cacheQuery(query.sparkSession, query.logicalPlan, tableName, storageLevel)
+  }
+
+  /**
+   * Caches the data produced by the given [[LogicalPlan]].
+   * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
+   * recomputing the in-memory columnar representation of the underlying table is expensive.
+   */
+  def cacheQuery(
```
> **Member** (inline, on the new `cacheQuery` overload): +1 on this! I think we may replace one more usage in `DataSourceV2Strategy.invalidateCache` as well.
>
> **Contributor (Author):** thanks, updated.

```diff
+      spark: SparkSession,
+      planToCache: LogicalPlan,
+      tableName: Option[String]): Unit = {
+    cacheQuery(spark, planToCache, tableName, MEMORY_AND_DISK)
+  }
+
+  /**
+   * Caches the data produced by the given [[LogicalPlan]].
+   */
+  def cacheQuery(
+      spark: SparkSession,
+      planToCache: LogicalPlan,
+      tableName: Option[String],
+      storageLevel: StorageLevel): Unit = {
```
> **Member** (inline, on the four-argument overload): Perhaps we can just keep a single method, with `tableName` defaulting to `None` and `storageLevel` defaulting to `MEMORY_AND_DISK`?
>
> **Contributor:** The Scala compiler will complain if we do that. See #30815 (comment).
>
> **Member:** Ah, got it. Thanks.
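For context on that compiler complaint: scalac rejects overloads when more than one alternative defines default arguments, failing with an error along the lines of "multiple overloaded alternatives of method cacheQuery define default arguments". That is why the defaults stay on the `Dataset` overload while the plan-based variants spell them out as explicit overloads. A compilable sketch of the pattern, using stand-in types rather than Spark's:

```scala
// Illustrative stand-ins for Spark's SparkSession and LogicalPlan.
final case class Session(id: String)
final case class Plan(sql: String)

object CacheApi {
  // Only one alternative may carry default arguments; if the plan-based
  // overloads below declared defaults too, scalac would reject the object.
  def cacheQuery(
      query: String,
      tableName: Option[String] = None,
      storageLevel: String = "MEMORY_AND_DISK"): Unit =
    println(s"caching dataset '$query' as $tableName at $storageLevel")

  // The plan-based API encodes its default by delegating explicitly instead.
  def cacheQuery(spark: Session, plan: Plan, tableName: Option[String]): Unit =
    cacheQuery(spark, plan, tableName, "MEMORY_AND_DISK")

  def cacheQuery(
      spark: Session,
      plan: Plan,
      tableName: Option[String],
      storageLevel: String): Unit =
    println(s"caching plan '${plan.sql}' in ${spark.id} as $tableName at $storageLevel")
}

object Demo extends App {
  CacheApi.cacheQuery("SELECT 1")                           // defaults apply
  CacheApi.cacheQuery(Session("s"), Plan("SELECT 1"), None) // explicit overload
}
```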

```diff
     if (lookupCachedData(planToCache).nonEmpty) {
       logWarning("Asked to cache already cached data.")
     } else {
       val sessionWithConfigsOff = SparkSession.getOrCloneSessionWithConfigsOff(
-        query.sparkSession, forceDisableConfigs)
+        spark, forceDisableConfigs)
       val inMemoryRelation = sessionWithConfigsOff.withActive {
         val qe = sessionWithConfigsOff.sessionState.executePlan(planToCache)
         InMemoryRelation(
```
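To make the new entry point concrete, here is a hedged usage sketch. `sharedState`, `cacheManager`, and `queryExecution.analyzed` are internal, version-dependent surfaces, so treat this purely as an illustration of the new signature, not a supported API:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object CacheByPlanExample extends App {
  val spark = SparkSession.builder().master("local[*]").appName("cache-by-plan").getOrCreate()
  spark.range(10).createOrReplaceTempView("t") // illustrative data

  // Cache a resolved logical plan directly; no Dataset needs to be built first.
  val plan = spark.table("t").queryExecution.analyzed
  spark.sharedState.cacheManager.cacheQuery(spark, plan, Some("t"), StorageLevel.MEMORY_AND_DISK)

  spark.stop()
}
```

Compared with the old `Dataset`-based overload, callers such as `CacheTableExec` can now pass the plan they already hold.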
**CacheTableExec.scala**

```diff
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import java.util.Locale
 
-import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -29,10 +29,13 @@ import org.apache.spark.storage.StorageLevel
 
 trait BaseCacheTableExec extends V2CommandExec {
   def relationName: String
-  def dataFrameToCache: DataFrame
+  def planToCache: LogicalPlan
+  def dataFrameForCachedPlan: DataFrame
   def isLazy: Boolean
   def options: Map[String, String]
 
+  protected val sparkSession: SparkSession = sqlContext.sparkSession
+
   override def run(): Seq[InternalRow] = {
     val storageLevelKey = "storagelevel"
     val storageLevelValue =
@@ -42,20 +45,22 @@ trait BaseCacheTableExec extends V2CommandExec {
       logWarning(s"Invalid options: ${withoutStorageLevel.mkString(", ")}")
     }
 
-    val sparkSession = sqlContext.sparkSession
-    val df = dataFrameToCache
     if (storageLevelValue.nonEmpty) {
       sparkSession.sharedState.cacheManager.cacheQuery(
-        df,
+        sparkSession,
+        planToCache,
         Some(relationName),
         StorageLevel.fromString(storageLevelValue.get))
     } else {
-      sparkSession.sharedState.cacheManager.cacheQuery(df, Some(relationName))
+      sparkSession.sharedState.cacheManager.cacheQuery(
+        sparkSession,
+        planToCache,
+        Some(relationName))
     }
 
     if (!isLazy) {
-      // Performs eager caching
-      df.count()
+      // Performs eager caching.
+      dataFrameForCachedPlan.count()
     }
 
     Seq.empty
@@ -69,21 +74,28 @@ case class CacheTableExec(
     multipartIdentifier: Seq[String],
     override val isLazy: Boolean,
     override val options: Map[String, String]) extends BaseCacheTableExec {
-  override def relationName: String = multipartIdentifier.quoted
-
-  override def dataFrameToCache: DataFrame = Dataset.ofRows(sqlContext.sparkSession, relation)
+  override lazy val relationName: String = multipartIdentifier.quoted
+
+  override lazy val planToCache: LogicalPlan = relation
+
+  override lazy val dataFrameForCachedPlan: DataFrame = {
+    Dataset.ofRows(sparkSession, planToCache)
+  }
 }
 
 case class CacheTableAsSelectExec(
     tempViewName: String,
     query: LogicalPlan,
     override val isLazy: Boolean,
     override val options: Map[String, String]) extends BaseCacheTableExec {
-  override def relationName: String = tempViewName
-
-  override def dataFrameToCache: DataFrame = {
-    val sparkSession = sqlContext.sparkSession
-    Dataset.ofRows(sparkSession, query).createTempView(tempViewName)
-    sparkSession.table(tempViewName)
-  }
+  override lazy val relationName: String = tempViewName
+
+  override lazy val planToCache: LogicalPlan = {
+    Dataset.ofRows(sparkSession, query).createTempView(tempViewName)
+    dataFrameForCachedPlan.logicalPlan
+  }
+
+  override lazy val dataFrameForCachedPlan: DataFrame = {
+    sparkSession.table(tempViewName)
+  }
 }
```
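One subtlety in `CacheTableAsSelectExec` worth spelling out: `planToCache` registers the temp view as a side effect and then resolves the plan through `dataFrameForCachedPlan`; because both are `lazy val`s, the view is created exactly once, on first access. A stand-alone sketch of that evaluation order (names and types are illustrative, not Spark's):

```scala
object LazyValOrdering extends App {
  // Stand-in "catalog" for the session's temp views.
  val tempViews = scala.collection.mutable.Map.empty[String, String]

  // Mirrors planToCache: first access registers the view, then reads it back.
  lazy val planToCache: String = {
    tempViews("v") = "SELECT 1"   // side effect: create the temp view
    dataFrameForCachedPlan        // then resolve through the view
  }

  // Mirrors dataFrameForCachedPlan: a lookup against the registered view.
  lazy val dataFrameForCachedPlan: String = tempViews("v")

  println(planToCache) // SELECT 1 -- the view is registered exactly once
  println(planToCache) // cached lazy val; no second registration
}
```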
**DataSourceV2Strategy.scala**

```diff
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{AnalysisException, Dataset, SparkSession, Strategy}
+import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
 import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -66,8 +66,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {
       val cacheLevel = cache.get.cachedRepresentation.cacheBuilder.storageLevel
 
       // recache with the same name and cache level.
-      val ds = Dataset.ofRows(session, v2Relation)
-      session.sharedState.cacheManager.cacheQuery(ds, cacheName, cacheLevel)
+      session.sharedState.cacheManager.cacheQuery(session, v2Relation, cacheName, cacheLevel)
     }
   }
 
```
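As the first review thread noted, this removes the last place that had to build a throwaway `Dataset` just to recache a plan. The surrounding `invalidateCache` plumbing is elided by the diff; the sketch below only illustrates the visible keep-the-name-and-level refresh pattern, with stand-in types rather than Spark's:

```scala
import scala.collection.mutable

// Stand-in for a cached-plan registry entry, for illustration only.
final case class CacheEntry(name: Option[String], storageLevel: String)

object RecacheDemo extends App {
  val cached = mutable.Map.empty[String, CacheEntry]
  cached("old-plan") = CacheEntry(Some("t"), "MEMORY_ONLY")

  // Refresh pattern: capture the old entry's name and storage level,
  // drop the stale entry, and re-cache the new plan under the same settings.
  def refresh(oldPlan: String, newPlan: String): Unit =
    cached.remove(oldPlan).foreach { old =>
      cached(newPlan) = CacheEntry(old.name, old.storageLevel)
    }

  refresh("old-plan", "new-plan")
  println(cached) // Map(new-plan -> CacheEntry(Some(t),MEMORY_ONLY))
}
```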