
Commit 0fd9f57

imback82 authored and cloud-fan committed
[SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables
### What changes were proposed in this pull request?

This PR proposes to support `CACHE/UNCACHE TABLE` commands for v2 tables. In addition, it migrates `CACHE/UNCACHE TABLE` to use `UnresolvedTableOrView` to resolve the table identifier, which allows consistent resolution rules (temp view first, etc.) to be applied to both v1 and v2 commands. More information about the consistent resolution rule proposal can be found in [JIRA](https://issues.apache.org/jira/browse/SPARK-29900) or the [proposal doc](https://docs.google.com/document/d/1hvLjGA8y_W_hhilpngXVub1Ebv8RsMap986nENCFnrg/edit?usp=sharing).

### Why are the changes needed?

To support `CACHE/UNCACHE TABLE` commands for v2 tables. Note that `CACHE/UNCACHE TABLE` for v1 tables/views goes through `SparkSession.table` to resolve the identifier, which resolves temp views first, so moving to the new framework does not change that behavior.

### Does this PR introduce _any_ user-facing change?

Yes. The user can now run `CACHE/UNCACHE TABLE` commands on v2 tables.

### How was this patch tested?

Added/updated existing tests.

Closes #30403 from imback82/cache_table.

Authored-by: Terry Kim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 2da7259 commit 0fd9f57
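
As a quick, hedged illustration of what this change enables (the catalog name `testcat` mirrors the test suite below and must be configured separately; it is not registered by this commit):

    // Assumes a v2 catalog is registered under the name `testcat`.
    spark.sql("CREATE TABLE testcat.ns1.ns2.tbl (id bigint, data string) USING foo")

    // Before this commit, these failed with
    // "CACHE TABLE is only supported with temp views or v1 tables".
    spark.sql("CACHE TABLE testcat.ns1.ns2.tbl")
    spark.sql("UNCACHE TABLE testcat.ns1.ns2.tbl")

    // IF EXISTS suppresses the "Table or view not found" error.
    spark.sql("UNCACHE TABLE IF EXISTS testcat.ns1.ns2.tbl")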

File tree

13 files changed (+152, -136 lines)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 0 additions & 31 deletions

@@ -3590,37 +3590,6 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
       ctx.SERDE != null)
   }
 
-  /**
-   * Create a [[CacheTableStatement]].
-   *
-   * For example:
-   * {{{
-   *   CACHE [LAZY] TABLE multi_part_name
-   *     [OPTIONS tablePropertyList] [[AS] query]
-   * }}}
-   */
-  override def visitCacheTable(ctx: CacheTableContext): LogicalPlan = withOrigin(ctx) {
-    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-
-    val query = Option(ctx.query).map(plan)
-    val tableName = visitMultipartIdentifier(ctx.multipartIdentifier)
-    if (query.isDefined && tableName.length > 1) {
-      val catalogAndNamespace = tableName.init
-      throw new ParseException("It is not allowed to add catalog/namespace " +
-        s"prefix ${catalogAndNamespace.quoted} to " +
-        "the table name in CACHE TABLE AS SELECT", ctx)
-    }
-    val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
-    CacheTableStatement(tableName, query, ctx.LAZY != null, options)
-  }
-
-  /**
-   * Create an [[UncacheTableStatement]] logical plan.
-   */
-  override def visitUncacheTable(ctx: UncacheTableContext): LogicalPlan = withOrigin(ctx) {
-    UncacheTableStatement(visitMultipartIdentifier(ctx.multipartIdentifier), ctx.EXISTS != null)
-  }
-
   /**
    * Create a [[TruncateTable]] command.
    *

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala

Lines changed: 0 additions & 16 deletions

@@ -412,22 +412,6 @@ case class UseStatement(isNamespaceSet: Boolean, nameParts: Seq[String]) extends
  */
 case class RepairTableStatement(tableName: Seq[String]) extends ParsedStatement
 
-/**
- * A CACHE TABLE statement, as parsed from SQL
- */
-case class CacheTableStatement(
-    tableName: Seq[String],
-    plan: Option[LogicalPlan],
-    isLazy: Boolean,
-    options: Map[String, String]) extends ParsedStatement
-
-/**
- * An UNCACHE TABLE statement, as parsed from SQL
- */
-case class UncacheTableStatement(
-    tableName: Seq[String],
-    ifExists: Boolean) extends ParsedStatement
-
 /**
  * A TRUNCATE TABLE statement, as parsed from SQL
  */

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala

Lines changed: 0 additions & 27 deletions

@@ -1984,33 +1984,6 @@ class DDLParserSuite extends AnalysisTest {
       asSerde = true))
   }
 
-  test("CACHE TABLE") {
-    comparePlans(
-      parsePlan("CACHE TABLE a.b.c"),
-      CacheTableStatement(Seq("a", "b", "c"), None, false, Map.empty))
-
-    comparePlans(
-      parsePlan("CACHE LAZY TABLE a.b.c"),
-      CacheTableStatement(Seq("a", "b", "c"), None, true, Map.empty))
-
-    comparePlans(
-      parsePlan("CACHE LAZY TABLE a.b.c OPTIONS('storageLevel' 'DISK_ONLY')"),
-      CacheTableStatement(Seq("a", "b", "c"), None, true, Map("storageLevel" -> "DISK_ONLY")))
-
-    intercept("CACHE TABLE a.b.c AS SELECT * FROM testData",
-      "It is not allowed to add catalog/namespace prefix a.b")
-  }
-
-  test("UNCACHE TABLE") {
-    comparePlans(
-      parsePlan("UNCACHE TABLE a.b.c"),
-      UncacheTableStatement(Seq("a", "b", "c"), ifExists = false))
-
-    comparePlans(
-      parsePlan("UNCACHE TABLE IF EXISTS a.b.c"),
-      UncacheTableStatement(Seq("a", "b", "c"), ifExists = true))
-  }
-
   test("TRUNCATE table") {
     comparePlans(
       parsePlan("TRUNCATE TABLE a.b.c"),

sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala

Lines changed: 1 addition & 18 deletions

@@ -446,20 +446,6 @@ class ResolveSessionCatalog(
       ShowCreateTableCommand(ident.asTableIdentifier)
     }
 
-    case CacheTableStatement(tbl, plan, isLazy, options) =>
-      val name = if (plan.isDefined) {
-        // CACHE TABLE ... AS SELECT creates a temp view with the input query.
-        // Temp view doesn't belong to any catalog and we shouldn't resolve catalog in the name.
-        tbl
-      } else {
-        parseTempViewOrV1Table(tbl, "CACHE TABLE")
-      }
-      CacheTableCommand(name.asTableIdentifier, plan, isLazy, options)
-
-    case UncacheTableStatement(tbl, ifExists) =>
-      val name = parseTempViewOrV1Table(tbl, "UNCACHE TABLE")
-      UncacheTableCommand(name.asTableIdentifier, ifExists)
-
     case TruncateTable(ResolvedV1TableIdentifier(ident), partitionSpec) =>
       TruncateTableCommand(
         ident.asTableIdentifier,
@@ -561,12 +547,9 @@ class ResolveSessionCatalog(
         "SHOW VIEWS, only SessionCatalog supports this command.")
     }
 
-    case ShowTableProperties(ResolvedV1TableIdentifier(ident), propertyKey) =>
+    case ShowTableProperties(ResolvedV1TableOrViewIdentifier(ident), propertyKey) =>
       ShowTablePropertiesCommand(ident.asTableIdentifier, propertyKey)
 
-    case ShowTableProperties(r: ResolvedView, propertyKey) =>
-      ShowTablePropertiesCommand(r.identifier.asTableIdentifier, propertyKey)
-
     case DescribeFunction(ResolvedFunc(identifier), extended) =>
       DescribeFunctionCommand(identifier.asFunctionIdentifier, extended)

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 34 additions & 0 deletions

@@ -192,6 +192,40 @@ class SparkSqlAstBuilder extends AstBuilder {
     unquotedPath
   }
 
+  /**
+   * Create a [[CacheTableCommand]].
+   *
+   * For example:
+   * {{{
+   *   CACHE [LAZY] TABLE multi_part_name
+   *     [OPTIONS tablePropertyList] [[AS] query]
+   * }}}
+   */
+  override def visitCacheTable(ctx: CacheTableContext): LogicalPlan = withOrigin(ctx) {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+    val query = Option(ctx.query).map(plan)
+    val tableName = visitMultipartIdentifier(ctx.multipartIdentifier)
+    if (query.isDefined && tableName.length > 1) {
+      val catalogAndNamespace = tableName.init
+      throw new ParseException("It is not allowed to add catalog/namespace " +
+        s"prefix ${catalogAndNamespace.quoted} to " +
+        "the table name in CACHE TABLE AS SELECT", ctx)
+    }
+    val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
+    CacheTableCommand(tableName, query, ctx.LAZY != null, options)
+  }
+
+  /**
+   * Create an [[UncacheTableCommand]] logical plan.
+   */
+  override def visitUncacheTable(ctx: UncacheTableContext): LogicalPlan = withOrigin(ctx) {
+    UncacheTableCommand(
+      visitMultipartIdentifier(ctx.multipartIdentifier),
+      ctx.EXISTS != null)
+  }
+
   /**
    * Create a [[ClearCacheCommand]] logical plan.
    */
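
With the visitors moved into SparkSqlAstBuilder, parsing now yields the runnable commands directly, carrying the raw multipart identifier. A rough sketch of the resulting plan shapes, inferred from the visitor code above rather than from captured output:

    parsePlan("CACHE TABLE a.b.c")
    // => CacheTableCommand(Seq("a", "b", "c"), None, isLazy = false, Map.empty)

    parsePlan("CACHE LAZY TABLE a.b.c OPTIONS('storageLevel' 'DISK_ONLY')")
    // => CacheTableCommand(Seq("a", "b", "c"), None, isLazy = true,
    //      Map("storageLevel" -> "DISK_ONLY"))

    parsePlan("UNCACHE TABLE IF EXISTS a.b.c")
    // => UncacheTableCommand(Seq("a", "b", "c"), ifExists = true)

    // Still rejected at parse time, as before the move:
    parsePlan("CACHE TABLE a.b.c AS SELECT * FROM testData")
    // => ParseException: "It is not allowed to add catalog/namespace prefix a.b ..."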

sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala

Lines changed: 28 additions & 15 deletions

@@ -19,26 +19,27 @@ package org.apache.spark.sql.execution.command
 
 import java.util.Locale
 
-import org.apache.spark.sql.{Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
 import org.apache.spark.storage.StorageLevel
 
 case class CacheTableCommand(
-    tableIdent: TableIdentifier,
+    multipartIdentifier: Seq[String],
     plan: Option[LogicalPlan],
     isLazy: Boolean,
    options: Map[String, String]) extends RunnableCommand {
-  require(plan.isEmpty || tableIdent.database.isEmpty,
-    "Database name is not allowed in CACHE TABLE AS SELECT")
+  require(plan.isEmpty || multipartIdentifier.length == 1,
+    "Namespace name is not allowed in CACHE TABLE AS SELECT")
 
   override def innerChildren: Seq[QueryPlan[_]] = plan.toSeq
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    val tableName = multipartIdentifier.quoted
     plan.foreach { logicalPlan =>
-      Dataset.ofRows(sparkSession, logicalPlan).createTempView(tableIdent.quotedString)
+      Dataset.ofRows(sparkSession, logicalPlan).createTempView(tableName)
     }
 
     val storageLevelKey = "storagelevel"
@@ -49,34 +50,46 @@ case class CacheTableCommand(
       logWarning(s"Invalid options: ${withoutStorageLevel.mkString(", ")}")
     }
 
+    val table = sparkSession.table(tableName)
     if (storageLevelValue.nonEmpty) {
-      sparkSession.catalog.cacheTable(
-        tableIdent.quotedString, StorageLevel.fromString(storageLevelValue.get))
+      sparkSession.sharedState.cacheManager.cacheQuery(
+        table,
+        Some(tableName),
+        StorageLevel.fromString(storageLevelValue.get))
     } else {
-      sparkSession.catalog.cacheTable(tableIdent.quotedString)
+      sparkSession.sharedState.cacheManager.cacheQuery(table, Some(tableName))
     }
 
     if (!isLazy) {
       // Performs eager caching
-      sparkSession.table(tableIdent).count()
+      table.count()
     }
 
     Seq.empty[Row]
   }
 }
 
-
 case class UncacheTableCommand(
-    tableIdent: TableIdentifier,
+    multipartIdentifier: Seq[String],
     ifExists: Boolean) extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val tableId = tableIdent.quotedString
-    if (!ifExists || sparkSession.catalog.tableExists(tableId)) {
-      sparkSession.catalog.uncacheTable(tableId)
+    val tableName = multipartIdentifier.quoted
+    table(sparkSession, tableName).foreach { table =>
+      val cascade = !sparkSession.sessionState.catalog.isTempView(multipartIdentifier)
+      sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade)
     }
     Seq.empty[Row]
  }
+
+  private def table(sparkSession: SparkSession, name: String): Option[DataFrame] = {
+    try {
+      Some(sparkSession.table(name))
+    } catch {
+      case ex: AnalysisException if ifExists && ex.getMessage.contains("Table or view not found") =>
+        None
+    }
+  }
 }
 
 /**
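
Two behavioral details of the rewritten commands are worth calling out, with a hedged usage sketch (the table names are hypothetical):

    // IF EXISTS now works by attempting resolution and swallowing only the
    // "Table or view not found" AnalysisException:
    spark.sql("UNCACHE TABLE IF EXISTS some_ns.missing_tbl")  // silent no-op
    spark.sql("UNCACHE TABLE some_ns.missing_tbl")            // AnalysisException

    // Uncaching a persistent table cascades to dependent cached plans; only a
    // temp view is uncached non-cascading (see the isTempView check above).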

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 11 additions & 0 deletions

@@ -25,6 +25,7 @@ import org.apache.spark.executor.DataReadMethod._
 import org.apache.spark.executor.DataReadMethod.DataReadMethod
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join, JoinStrategyHint, SHUFFLE_HASH}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants
@@ -140,6 +141,16 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
     }
   }
 
+  test("cache table as select - existing temp view") {
+    withTempView("tempView") {
+      sql("CREATE TEMPORARY VIEW tempView as SELECT 1")
+      val e = intercept[TempTableAlreadyExistsException] {
+        sql("CACHE TABLE tempView AS SELECT 1")
+      }
+      assert(e.getMessage.contains("Temporary view 'tempView' already exists"))
+    }
+  }
+
   test("uncaching temp table") {
     withTempView("tempTable1", "tempTable2") {
       testData.select("key").createOrReplaceTempView("tempTable1")
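
The new test pins down an edge case: `CACHE TABLE ... AS SELECT` registers a temp view under the given name before caching it, so it collides with an existing temp view of the same name. A hedged sketch of the successful path (the view name is hypothetical):

    spark.sql("CACHE TABLE freshView AS SELECT 1")  // creates and caches temp view `freshView`
    spark.table("freshView").show()                 // answered from the cached plan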

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

Lines changed: 23 additions & 17 deletions

@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
+import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode, V2_SESSION_CATALOG_IMPLEMENTATION}
 import org.apache.spark.sql.internal.connector.SimpleTableProvider
@@ -2018,28 +2019,29 @@ class DataSourceV2SQLSuite
     }
   }
 
-  test("CACHE TABLE") {
+  test("CACHE/UNCACHE TABLE") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
-      spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+      def isCached(table: String): Boolean = {
+        spark.table(table).queryExecution.withCachedData.isInstanceOf[InMemoryRelation]
+      }
 
-      testV1CommandSupportingTempView("CACHE TABLE", t)
+      spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+      sql(s"CACHE TABLE $t")
+      assert(isCached(t))
 
-      val e = intercept[AnalysisException] {
-        sql(s"CACHE LAZY TABLE $t")
-      }
-      assert(e.message.contains("CACHE TABLE is only supported with temp views or v1 tables"))
+      sql(s"UNCACHE TABLE $t")
+      assert(!isCached(t))
     }
-  }
 
-  test("UNCACHE TABLE") {
-    val t = "testcat.ns1.ns2.tbl"
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
-
-      testV1CommandSupportingTempView("UNCACHE TABLE", t)
-      testV1CommandSupportingTempView("UNCACHE TABLE", s"IF EXISTS $t")
+    // Test a scenario where a table does not exist.
+    val e = intercept[AnalysisException] {
+      sql(s"UNCACHE TABLE $t")
     }
+    assert(e.message.contains("Table or view not found: testcat.ns1.ns2.tbl"))
+
+    // If "IF EXISTS" is set, UNCACHE TABLE will not throw an exception.
+    sql(s"UNCACHE TABLE IF EXISTS $t")
  }
 
   test("SHOW COLUMNS") {
@@ -2555,11 +2557,15 @@ class DataSourceV2SQLSuite
     }
   }
 
-  private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
+  private def testNotSupportedV2Command(
+      sqlCommand: String,
+      sqlParams: String,
+      sqlCommandInMessage: Option[String] = None): Unit = {
     val e = intercept[AnalysisException] {
       sql(s"$sqlCommand $sqlParams")
     }
-    assert(e.message.contains(s"$sqlCommand is not supported for v2 tables"))
+    val cmdStr = sqlCommandInMessage.getOrElse(sqlCommand)
+    assert(e.message.contains(s"$cmdStr is not supported for v2 tables"))
   }
 
   private def testV1Command(sqlCommand: String, sqlParams: String): Unit = {
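
The v2 suite now asserts cache state directly instead of expecting a failure. A minimal standalone sketch of the same check (the helper name and explicit SparkSession parameter are illustrative, not part of the commit):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.columnar.InMemoryRelation

    def isCached(spark: SparkSession, table: String): Boolean =
      spark.table(table).queryExecution.withCachedData.isInstanceOf[InMemoryRelation]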
