diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 0bef6998b177d..10c8ac58840f2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1107,12 +1107,16 @@ class Analyzer(override val catalogManager: CatalogManager)
 
       case c @ CacheTable(u @ UnresolvedRelation(_, _, false), _, _, _) =>
         lookupRelation(u.multipartIdentifier, u.options, false)
-          .map(relation => c.copy(table = EliminateSubqueryAliases(relation)))
+          .map(resolveViews)
+          .map(EliminateSubqueryAliases(_))
+          .map(relation => c.copy(table = relation))
           .getOrElse(c)
 
       case c @ UncacheTable(u @ UnresolvedRelation(_, _, false), _, _) =>
         lookupRelation(u.multipartIdentifier, u.options, false)
-          .map(relation => c.copy(table = EliminateSubqueryAliases(relation)))
+          .map(resolveViews)
+          .map(EliminateSubqueryAliases(_))
+          .map(relation => c.copy(table = relation))
           .getOrElse(c)
 
       // TODO (SPARK-27484): handle streaming write commands when we have them.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index f163d85914bc9..b3671945e5891 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -88,12 +88,34 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       query: Dataset[_],
       tableName: Option[String] = None,
       storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = {
-    val planToCache = query.logicalPlan
+    cacheQuery(query.sparkSession, query.logicalPlan, tableName, storageLevel)
+  }
+
+  /**
+   * Caches the data produced by the given [[LogicalPlan]].
+   * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
+   * recomputing the in-memory columnar representation of the underlying table is expensive.
+   */
+  def cacheQuery(
+      spark: SparkSession,
+      planToCache: LogicalPlan,
+      tableName: Option[String]): Unit = {
+    cacheQuery(spark, planToCache, tableName, MEMORY_AND_DISK)
+  }
+
+  /**
+   * Caches the data produced by the given [[LogicalPlan]].
+   */
+  def cacheQuery(
+      spark: SparkSession,
+      planToCache: LogicalPlan,
+      tableName: Option[String],
+      storageLevel: StorageLevel): Unit = {
     if (lookupCachedData(planToCache).nonEmpty) {
       logWarning("Asked to cache already cached data.")
     } else {
       val sessionWithConfigsOff = SparkSession.getOrCloneSessionWithConfigsOff(
-        query.sparkSession, forceDisableConfigs)
+        spark, forceDisableConfigs)
       val inMemoryRelation = sessionWithConfigsOff.withActive {
         val qe = sessionWithConfigsOff.sessionState.executePlan(planToCache)
         InMemoryRelation(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
index 2d8e5b5e286b8..4a7152232e8fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import java.util.Locale
 
-import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -29,10 +29,13 @@ import org.apache.spark.storage.StorageLevel
 
 trait BaseCacheTableExec extends V2CommandExec {
   def relationName: String
-  def dataFrameToCache: DataFrame
+  def planToCache: LogicalPlan
+  def dataFrameForCachedPlan: DataFrame
   def isLazy: Boolean
   def options: Map[String, String]
 
+  protected val sparkSession: SparkSession = sqlContext.sparkSession
+
   override def run(): Seq[InternalRow] = {
     val storageLevelKey = "storagelevel"
     val storageLevelValue =
@@ -42,20 +45,22 @@ trait BaseCacheTableExec extends V2CommandExec {
       logWarning(s"Invalid options: ${withoutStorageLevel.mkString(", ")}")
     }
 
-    val sparkSession = sqlContext.sparkSession
-    val df = dataFrameToCache
     if (storageLevelValue.nonEmpty) {
       sparkSession.sharedState.cacheManager.cacheQuery(
-        df,
+        sparkSession,
+        planToCache,
         Some(relationName),
         StorageLevel.fromString(storageLevelValue.get))
     } else {
-      sparkSession.sharedState.cacheManager.cacheQuery(df, Some(relationName))
+      sparkSession.sharedState.cacheManager.cacheQuery(
+        sparkSession,
+        planToCache,
+        Some(relationName))
     }
 
     if (!isLazy) {
-      // Performs eager caching
-      df.count()
+      // Performs eager caching.
+      dataFrameForCachedPlan.count()
     }
 
     Seq.empty
@@ -69,9 +74,13 @@ case class CacheTableExec(
     multipartIdentifier: Seq[String],
     override val isLazy: Boolean,
     override val options: Map[String, String]) extends BaseCacheTableExec {
-  override def relationName: String = multipartIdentifier.quoted
+  override lazy val relationName: String = multipartIdentifier.quoted
+
+  override lazy val planToCache: LogicalPlan = relation
 
-  override def dataFrameToCache: DataFrame = Dataset.ofRows(sqlContext.sparkSession, relation)
+  override lazy val dataFrameForCachedPlan: DataFrame = {
+    Dataset.ofRows(sparkSession, planToCache)
+  }
 }
 
 case class CacheTableAsSelectExec(
@@ -79,11 +88,14 @@ case class CacheTableAsSelectExec(
     query: LogicalPlan,
     override val isLazy: Boolean,
     override val options: Map[String, String]) extends BaseCacheTableExec {
-  override def relationName: String = tempViewName
+  override lazy val relationName: String = tempViewName
 
-  override def dataFrameToCache: DataFrame = {
-    val sparkSession = sqlContext.sparkSession
+  override lazy val planToCache: LogicalPlan = {
     Dataset.ofRows(sparkSession, query).createTempView(tempViewName)
+    dataFrameForCachedPlan.logicalPlan
+  }
+
+  override lazy val dataFrameForCachedPlan: DataFrame = {
     sparkSession.table(tempViewName)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 120fa5288dda9..61fc4f15ee3f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{AnalysisException, Dataset, SparkSession, Strategy}
+import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
 import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -66,8 +66,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       val cacheLevel = cache.get.cachedRepresentation.cacheBuilder.storageLevel
 
       // recache with the same name and cache level.
-      val ds = Dataset.ofRows(session, v2Relation)
-      session.sharedState.cacheManager.cacheQuery(ds, cacheName, cacheLevel)
+      session.sharedState.cacheManager.cacheQuery(session, v2Relation, cacheName, cacheLevel)
     }
   }
 
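For context, a minimal sketch (not part of the patch) of how the new LogicalPlan-based CacheManager.cacheQuery overloads introduced above could be invoked directly; the local SparkSession setup and the temp view name "t" are illustrative assumptions, and only the cacheQuery signature itself comes from this change:

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object CacheQuerySketch {
  def main(args: Array[String]): Unit = {
    // Illustrative session and relation; any resolvable table or view would do.
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    spark.range(10).createOrReplaceTempView("t")

    // With this change the caching code paths hand the cache manager an analyzed
    // LogicalPlan plus the session, instead of wrapping the plan in a Dataset first.
    val plan = spark.table("t").queryExecution.analyzed

    // Four-argument overload with an explicit storage level, as CacheTableExec does
    // when OPTIONS carries a 'storageLevel'; the three-argument overload defaults
    // to MEMORY_AND_DISK.
    spark.sharedState.cacheManager.cacheQuery(
      spark, plan, Some("t"), StorageLevel.DISK_ONLY)

    spark.stop()
  }
}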