diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index a46f2e3168c6b..0bef6998b177d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -875,6 +875,10 @@ class Analyzer(override val catalogManager: CatalogManager)
         lookupTempView(ident)
           .map(view => c.copy(table = view))
           .getOrElse(c)
+      case c @ UncacheTable(UnresolvedRelation(ident, _, false), _, _) =>
+        lookupTempView(ident)
+          .map(view => c.copy(table = view, isTempView = true))
+          .getOrElse(c)
       // TODO (SPARK-27484): handle streaming write commands when we have them.
       case write: V2WriteCommand =>
         write.table match {
@@ -1005,6 +1009,11 @@ class Analyzer(override val catalogManager: CatalogManager)
           .map(v2Relation => c.copy(table = v2Relation))
           .getOrElse(c)
+      case c @ UncacheTable(u @ UnresolvedRelation(_, _, false), _, _) =>
+        lookupV2Relation(u.multipartIdentifier, u.options, false)
+          .map(v2Relation => c.copy(table = v2Relation))
+          .getOrElse(c)
+
       // TODO (SPARK-27484): handle streaming write commands when we have them.
       case write: V2WriteCommand =>
         write.table match {
@@ -1098,7 +1107,12 @@ class Analyzer(override val catalogManager: CatalogManager)
       case c @ CacheTable(u @ UnresolvedRelation(_, _, false), _, _, _) =>
         lookupRelation(u.multipartIdentifier, u.options, false)
-          .map(v2Relation => c.copy(table = v2Relation))
+          .map(relation => c.copy(table = EliminateSubqueryAliases(relation)))
+          .getOrElse(c)
+
+      case c @ UncacheTable(u @ UnresolvedRelation(_, _, false), _, _) =>
+        lookupRelation(u.multipartIdentifier, u.options, false)
+          .map(relation => c.copy(table = EliminateSubqueryAliases(relation)))
           .getOrElse(c)
 
       // TODO (SPARK-27484): handle streaming write commands when we have them.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index c8e137e9c18ac..30467685d75a9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -125,7 +125,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
         failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}")
 
       case CacheTable(u: UnresolvedRelation, _, _, _) =>
-        failAnalysis(s"Table or view not found for `CACHE TABLE`: ${u.multipartIdentifier.quoted}")
+        failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")
+
+      case UncacheTable(u: UnresolvedRelation, _, _) =>
+        failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")
 
       // TODO (SPARK-27484): handle streaming write commands when we have them.
       case write: V2WriteCommand if write.table.isInstanceOf[UnresolvedRelation] =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala
index 196a07a7f9904..60f86b31a4bdf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.catalyst.plans.logical.{DropTable, DropView, LogicalPlan, NoopCommand}
+import org.apache.spark.sql.catalyst.plans.logical.{DropTable, DropView, LogicalPlan, NoopCommand, UncacheTable}
 import org.apache.spark.sql.catalyst.rules.Rule
 
 /**
@@ -31,5 +31,7 @@ object ResolveCommandsWithIfExists extends Rule[LogicalPlan] {
       NoopCommand("DROP TABLE", u.multipartIdentifier)
     case DropView(u: UnresolvedView, ifExists) if ifExists =>
       NoopCommand("DROP VIEW", u.multipartIdentifier)
+    case UncacheTable(u: UnresolvedRelation, ifExists, _) if ifExists =>
+      NoopCommand("UNCACHE TABLE", u.multipartIdentifier)
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 0284d5d01ba96..426dff343818b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3632,6 +3632,15 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
     }
   }
 
+  /**
+   * Create an [[UncacheTable]] logical plan.
+   */
+  override def visitUncacheTable(ctx: UncacheTableContext): LogicalPlan = withOrigin(ctx) {
+    UncacheTable(
+      UnresolvedRelation(visitMultipartIdentifier(ctx.multipartIdentifier)),
+      ctx.EXISTS != null)
+  }
+
   /**
    * Create a [[TruncateTable]] command.
    *
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 2091d92eb67c9..d13ad977910d9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -794,3 +794,11 @@ case class CacheTableAsSelect(
     plan: LogicalPlan,
     isLazy: Boolean,
     options: Map[String, String]) extends Command
+
+/**
+ * The logical plan of the UNCACHE TABLE command.
+ */
+case class UncacheTable(
+    table: LogicalPlan,
+    ifExists: Boolean,
+    isTempView: Boolean = false) extends Command
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 2b3fc6f71a5c0..9bea6517156ae 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -2032,6 +2032,16 @@ class DDLParserSuite extends AnalysisTest {
       "It is not allowed to add catalog/namespace prefix a.b")
   }
 
+  test("UNCACHE TABLE") {
+    comparePlans(
+      parsePlan("UNCACHE TABLE a.b.c"),
+      UncacheTable(UnresolvedRelation(Seq("a", "b", "c")), ifExists = false))
+
+    comparePlans(
+      parsePlan("UNCACHE TABLE IF EXISTS a.b.c"),
+      UncacheTable(UnresolvedRelation(Seq("a", "b", "c")), ifExists = true))
+  }
+
   test("TRUNCATE table") {
     comparePlans(
       parsePlan("TRUNCATE TABLE a.b.c"),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 3ca3461dfbd47..722ca6f992064 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -192,15 +192,6 @@ class SparkSqlAstBuilder extends AstBuilder {
     unquotedPath
   }
 
-  /**
-   * Create an [[UncacheTableCommand]] logical plan.
-   */
-  override def visitUncacheTable(ctx: UncacheTableContext): LogicalPlan = withOrigin(ctx) {
-    UncacheTableCommand(
-      visitMultipartIdentifier(ctx.multipartIdentifier),
-      ctx.EXISTS != null)
-  }
-
   /**
    * Create a [[ClearCacheCommand]] logical plan.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index 3f85a1b0f99d6..2f72af7f4b512 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -17,32 +17,8 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.IgnoreCachedData
-import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
-
-case class UncacheTableCommand(
-    multipartIdentifier: Seq[String],
-    ifExists: Boolean) extends RunnableCommand {
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val tableName = multipartIdentifier.quoted
-    table(sparkSession, tableName).foreach { table =>
-      val cascade = !sparkSession.sessionState.catalog.isTempView(multipartIdentifier)
-      sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade)
-    }
-    Seq.empty[Row]
-  }
-
-  private def table(sparkSession: SparkSession, name: String): Option[DataFrame] = {
-    try {
-      Some(sparkSession.table(name))
-    } catch {
-      case ex: AnalysisException if ifExists && ex.getMessage.contains("Table or view not found") =>
-        None
-    }
-  }
-}
 
 /**
  * Clear all cached data from the in-memory cache.
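
With the catalyst-side changes above, UNCACHE TABLE is parsed into the new UncacheTable logical plan, the IF EXISTS variant is resolved to a NoopCommand when the relation cannot be found, and an unresolved relation without IF EXISTS now fails with the unified "Table or view not found" message. A rough Scala sketch of the resulting behavior; the session and table names below are illustrative only, not part of this patch:

  // Assumes an active SparkSession bound to `spark`.
  spark.range(10).createOrReplaceTempView("t")
  spark.sql("CACHE TABLE t")
  spark.sql("UNCACHE TABLE t")                      // resolved as a temp view, so isTempView = true
  spark.sql("UNCACHE TABLE IF EXISTS no_such_tbl")  // becomes NoopCommand("UNCACHE TABLE", ...): no error
  // spark.sql("UNCACHE TABLE no_such_tbl")         // would fail: "Table or view not found: no_such_tbl"
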
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index e4f001d61a767..a097017222b57 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.ScanOperation
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{CacheTable, InsertIntoDir, InsertIntoStatement, LogicalPlan, Project, UncacheTable}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.connector.catalog.SupportsRead
@@ -283,6 +283,20 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
     case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false), _, _, _, _, _) =>
       i.copy(table = DDLUtils.readHiveTable(tableMeta))
 
+    case c @ CacheTable(UnresolvedCatalogRelation(tableMeta, options, false), _, _, _)
+        if DDLUtils.isDatasourceTable(tableMeta) =>
+      c.copy(table = readDataSourceTable(tableMeta, options))
+
+    case c @ CacheTable(UnresolvedCatalogRelation(tableMeta, _, false), _, _, _) =>
+      c.copy(table = DDLUtils.readHiveTable(tableMeta))
+
+    case u @ UncacheTable(UnresolvedCatalogRelation(tableMeta, options, false), _, _)
+        if DDLUtils.isDatasourceTable(tableMeta) =>
+      u.copy(table = readDataSourceTable(tableMeta, options))
+
+    case u @ UncacheTable(UnresolvedCatalogRelation(tableMeta, _, false), _, _) =>
+      u.copy(table = DDLUtils.readHiveTable(tableMeta))
+
     case UnresolvedCatalogRelation(tableMeta, options, false)
         if DDLUtils.isDatasourceTable(tableMeta) =>
       readDataSourceTable(tableMeta, options)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
index 85107dfc9b2ef..2d8e5b5e286b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
@@ -87,3 +87,15 @@ case class CacheTableAsSelectExec(
     sparkSession.table(tempViewName)
   }
 }
+
+case class UncacheTableExec(
+    relation: LogicalPlan,
+    cascade: Boolean) extends V2CommandExec {
+  override def run(): Seq[InternalRow] = {
+    val sparkSession = sqlContext.sparkSession
+    sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession, relation, cascade)
+    Seq.empty
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 6020e42b21900..120fa5288dda9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -364,6 +364,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case r: CacheTableAsSelect =>
       CacheTableAsSelectExec(r.tempViewName, r.plan, r.isLazy, r.options) :: Nil
 
+    case r: UncacheTable =>
+      UncacheTableExec(r.table, cascade = !r.isTempView) :: Nil
+
     case _ => Nil
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index 009c5b3705d2f..f1788e9c31af8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -339,16 +339,6 @@ class SparkSqlParserSuite extends AnalysisTest {
       "LINES TERMINATED BY only supports newline '\\n' right now")
   }
 
-  test("UNCACHE TABLE") {
-    assertEqual(
-      "UNCACHE TABLE a.b.c",
-      UncacheTableCommand(Seq("a", "b", "c"), ifExists = false))
-
-    assertEqual(
-      "UNCACHE TABLE IF EXISTS a.b.c",
-      UncacheTableCommand(Seq("a", "b", "c"), ifExists = true))
-  }
-
   test("CLEAR CACHE") {
     assertEqual("CLEAR CACHE", ClearCacheCommand)
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index ff7dc58829fa1..e10233d2573c9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
+import org.apache.spark.sql.catalyst.plans.logical.{CacheTable, InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics, UncacheTable}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoNullTypeInSchema
 import org.apache.spark.sql.execution._
@@ -231,6 +231,16 @@ case class RelationConversions(
         assertNoNullTypeInSchema(query.schema)
         OptimizedCreateHiveTableAsSelectCommand(
           tableDesc, query, query.output.map(_.name), mode)
+
+      // Cache table
+      case c @ CacheTable(relation: HiveTableRelation, _, _, _)
+          if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
+        c.copy(table = metastoreCatalog.convert(relation))
+
+      // Uncache table
+      case u @ UncacheTable(relation: HiveTableRelation, _, _)
+          if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
+        u.copy(table = metastoreCatalog.convert(relation))
     }
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 81c3f271b18d4..6cb98e92e36fa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -113,7 +113,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
     e = intercept[AnalysisException] {
       sql("UNCACHE TABLE nonexistentTable")
     }.getMessage
-    assert(e.contains(s"$expectedErrorMsg nonexistentTable"))
+    assert(e.contains("Table or view not found: nonexistentTable"))
     sql("UNCACHE TABLE IF EXISTS nonexistentTable")
   }
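
On the execution side, the new UncacheTableExec node delegates to CacheManager.uncacheQuery with cascade = !isTempView: uncaching a catalog table also invalidates cache entries whose plans depend on it, while uncaching a temp view does not cascade, mirroring the old UncacheTableCommand, which derived the flag from sessionState.catalog.isTempView. A rough illustration of the cascade flag; the table names below are illustrative only, not part of this patch:

  // Assumes an active SparkSession bound to `spark` and a catalog table `sales`.
  spark.sql("CACHE TABLE sales")
  spark.sql("CACHE TABLE sales_2020 AS SELECT * FROM sales WHERE year = 2020")
  spark.sql("UNCACHE TABLE sales")       // catalog table: cascade = true, so the dependent
                                         // `sales_2020` cache entry is dropped as well
  spark.sql("UNCACHE TABLE sales_2020")  // temp view created by CACHE TABLE ... AS SELECT:
                                         // isTempView = true, so cascade = false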