Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,10 @@ class Analyzer(override val catalogManager: CatalogManager)
lookupTempView(ident)
.map(view => c.copy(table = view))
.getOrElse(c)
case c @ UncacheTable(UnresolvedRelation(ident, _, false), _, _) =>
lookupTempView(ident)
.map(view => c.copy(table = view, isTempView = true))
.getOrElse(c)
// TODO (SPARK-27484): handle streaming write commands when we have them.
case write: V2WriteCommand =>
write.table match {
Expand Down Expand Up @@ -1005,6 +1009,11 @@ class Analyzer(override val catalogManager: CatalogManager)
.map(v2Relation => c.copy(table = v2Relation))
.getOrElse(c)

case c @ UncacheTable(u @ UnresolvedRelation(_, _, false), _, _) =>
lookupV2Relation(u.multipartIdentifier, u.options, false)
.map(v2Relation => c.copy(table = v2Relation))
.getOrElse(c)

// TODO (SPARK-27484): handle streaming write commands when we have them.
case write: V2WriteCommand =>
write.table match {
Expand Down Expand Up @@ -1098,7 +1107,12 @@ class Analyzer(override val catalogManager: CatalogManager)

case c @ CacheTable(u @ UnresolvedRelation(_, _, false), _, _, _) =>
lookupRelation(u.multipartIdentifier, u.options, false)
.map(v2Relation => c.copy(table = v2Relation))
.map(relation => c.copy(table = EliminateSubqueryAliases(relation)))
.getOrElse(c)

case c @ UncacheTable(u @ UnresolvedRelation(_, _, false), _, _) =>
lookupRelation(u.multipartIdentifier, u.options, false)
.map(relation => c.copy(table = EliminateSubqueryAliases(relation)))
.getOrElse(c)

// TODO (SPARK-27484): handle streaming write commands when we have them.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}")

case CacheTable(u: UnresolvedRelation, _, _, _) =>
failAnalysis(s"Table or view not found for `CACHE TABLE`: ${u.multipartIdentifier.quoted}")
failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")

case UncacheTable(u: UnresolvedRelation, _, _) =>
failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")

// TODO (SPARK-27484): handle streaming write commands when we have them.
case write: V2WriteCommand if write.table.isInstanceOf[UnresolvedRelation] =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.plans.logical.{DropTable, DropView, LogicalPlan, NoopCommand}
import org.apache.spark.sql.catalyst.plans.logical.{DropTable, DropView, LogicalPlan, NoopCommand, UncacheTable}
import org.apache.spark.sql.catalyst.rules.Rule

/**
Expand All @@ -31,5 +31,7 @@ object ResolveCommandsWithIfExists extends Rule[LogicalPlan] {
NoopCommand("DROP TABLE", u.multipartIdentifier)
case DropView(u: UnresolvedView, ifExists) if ifExists =>
NoopCommand("DROP VIEW", u.multipartIdentifier)
case UncacheTable(u: UnresolvedRelation, ifExists, _) if ifExists =>
NoopCommand("UNCACHE TABLE", u.multipartIdentifier)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3632,6 +3632,15 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
}
}

/**
* Create an [[UncacheTable]] logical plan.
*/
override def visitUncacheTable(ctx: UncacheTableContext): LogicalPlan = withOrigin(ctx) {
UncacheTable(
UnresolvedRelation(visitMultipartIdentifier(ctx.multipartIdentifier)),
ctx.EXISTS != null)
}

/**
* Create a [[TruncateTable]] command.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -794,3 +794,11 @@ case class CacheTableAsSelect(
plan: LogicalPlan,
isLazy: Boolean,
options: Map[String, String]) extends Command

/**
* The logical plan of the UNCACHE TABLE command.
*/
case class UncacheTable(
table: LogicalPlan,
ifExists: Boolean,
isTempView: Boolean = false) extends Command
Original file line number Diff line number Diff line change
Expand Up @@ -2032,6 +2032,16 @@ class DDLParserSuite extends AnalysisTest {
"It is not allowed to add catalog/namespace prefix a.b")
}

test("UNCACHE TABLE") {
comparePlans(
parsePlan("UNCACHE TABLE a.b.c"),
UncacheTable(UnresolvedRelation(Seq("a", "b", "c")), ifExists = false))

comparePlans(
parsePlan("UNCACHE TABLE IF EXISTS a.b.c"),
UncacheTable(UnresolvedRelation(Seq("a", "b", "c")), ifExists = true))
}

test("TRUNCATE table") {
comparePlans(
parsePlan("TRUNCATE TABLE a.b.c"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,15 +192,6 @@ class SparkSqlAstBuilder extends AstBuilder {
unquotedPath
}

/**
* Create an [[UncacheTableCommand]] logical plan.
*/
override def visitUncacheTable(ctx: UncacheTableContext): LogicalPlan = withOrigin(ctx) {
UncacheTableCommand(
visitMultipartIdentifier(ctx.multipartIdentifier),
ctx.EXISTS != null)
}

/**
* Create a [[ClearCacheCommand]] logical plan.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,8 @@

package org.apache.spark.sql.execution.command

import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.IgnoreCachedData
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper

case class UncacheTableCommand(
multipartIdentifier: Seq[String],
ifExists: Boolean) extends RunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
val tableName = multipartIdentifier.quoted
table(sparkSession, tableName).foreach { table =>
val cascade = !sparkSession.sessionState.catalog.isTempView(multipartIdentifier)
sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade)
}
Seq.empty[Row]
}

private def table(sparkSession: SparkSession, name: String): Option[DataFrame] = {
try {
Some(sparkSession.table(name))
} catch {
case ex: AnalysisException if ifExists && ex.getMessage.contains("Table or view not found") =>
None
}
}
}

/**
* Clear all cached data from the in-memory cache.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.ScanOperation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.logical.{CacheTable, InsertIntoDir, InsertIntoStatement, LogicalPlan, Project, UncacheTable}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.connector.catalog.SupportsRead
Expand Down Expand Up @@ -283,6 +283,20 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false), _, _, _, _, _) =>
i.copy(table = DDLUtils.readHiveTable(tableMeta))

case c @ CacheTable(UnresolvedCatalogRelation(tableMeta, options, false), _, _, _)
if DDLUtils.isDatasourceTable(tableMeta) =>
c.copy(table = readDataSourceTable(tableMeta, options))

case c @ CacheTable(UnresolvedCatalogRelation(tableMeta, _, false), _, _, _) =>
c.copy(table = DDLUtils.readHiveTable(tableMeta))

case u @ UncacheTable(UnresolvedCatalogRelation(tableMeta, options, false), _, _)
if DDLUtils.isDatasourceTable(tableMeta) =>
u.copy(table = readDataSourceTable(tableMeta, options))

case u @ UncacheTable(UnresolvedCatalogRelation(tableMeta, _, false), _, _) =>
u.copy(table = DDLUtils.readHiveTable(tableMeta))

case UnresolvedCatalogRelation(tableMeta, options, false)
if DDLUtils.isDatasourceTable(tableMeta) =>
readDataSourceTable(tableMeta, options)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,15 @@ case class CacheTableAsSelectExec(
sparkSession.table(tempViewName)
}
}

case class UncacheTableExec(
relation: LogicalPlan,
cascade: Boolean) extends V2CommandExec {
override def run(): Seq[InternalRow] = {
val sparkSession = sqlContext.sparkSession
sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession, relation, cascade)
Seq.empty
}

override def output: Seq[Attribute] = Seq.empty
}
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case r: CacheTableAsSelect =>
CacheTableAsSelectExec(r.tempViewName, r.plan, r.isLazy, r.options) :: Nil

case r: UncacheTable =>
UncacheTableExec(r.table, cascade = !r.isTempView) :: Nil

case _ => Nil
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -339,16 +339,6 @@ class SparkSqlParserSuite extends AnalysisTest {
"LINES TERMINATED BY only supports newline '\\n' right now")
}

test("UNCACHE TABLE") {
assertEqual(
"UNCACHE TABLE a.b.c",
UncacheTableCommand(Seq("a", "b", "c"), ifExists = false))

assertEqual(
"UNCACHE TABLE IF EXISTS a.b.c",
UncacheTableCommand(Seq("a", "b", "c"), ifExists = true))
}

test("CLEAR CACHE") {
assertEqual("CLEAR CACHE", ClearCacheCommand)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
import org.apache.spark.sql.catalyst.plans.logical.{CacheTable, InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics, UncacheTable}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoNullTypeInSchema
import org.apache.spark.sql.execution._
Expand Down Expand Up @@ -231,6 +231,16 @@ case class RelationConversions(
assertNoNullTypeInSchema(query.schema)
OptimizedCreateHiveTableAsSelectCommand(
tableDesc, query, query.output.map(_.name), mode)

// Cache table
case c @ CacheTable(relation: HiveTableRelation, _, _, _)
if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
c.copy(table = metastoreCatalog.convert(relation))

// Uncache table
case u @ UncacheTable(relation: HiveTableRelation, _, _)
if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
u.copy(table = metastoreCatalog.convert(relation))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
e = intercept[AnalysisException] {
sql("UNCACHE TABLE nonexistentTable")
}.getMessage
assert(e.contains(s"$expectedErrorMsg nonexistentTable"))
assert(e.contains("Table or view not found: nonexistentTable"))
sql("UNCACHE TABLE IF EXISTS nonexistentTable")
}

Expand Down