From 3e532c8ab4e7e6825bd1903171a09a2338849917 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Tue, 10 Nov 2020 22:52:51 -0800
Subject: [PATCH 01/20] initial commit

---
 .../sql/catalyst/parser/AstBuilder.scala      |  4 ++--
 .../catalyst/plans/logical/statements.scala   |  9 --------
 .../catalyst/plans/logical/v2Commands.scala   | 12 +++++++++++
 .../sql/catalyst/parser/DDLParserSuite.scala  | 10 ++++++---
 .../analysis/ResolveSessionCatalog.scala      | 21 +++++++------------
 .../datasources/v2/DataSourceV2Strategy.scala |  3 +++
 .../sql/connector/DataSourceV2SQLSuite.scala  | 17 +++++++--------
 7 files changed, 40 insertions(+), 36 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 893afc8984e9..414733a0e080 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3312,7 +3312,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
   }
 
   /**
-   * Create a [[CacheTableStatement]].
+   * Create a [[CacheTable]].
    *
    * For example:
    * {{{
@@ -3332,7 +3332,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
         "the table name in CACHE TABLE AS SELECT", ctx)
     }
     val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
-    CacheTableStatement(tableName, query, ctx.LAZY != null, options)
+    CacheTable(UnresolvedTableOrView(tableName), query, ctx.LAZY != null, options)
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index 2fc56891cd15..79c0c7f05a6e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -347,15 +347,6 @@ case class UseStatement(isNamespaceSet: Boolean, nameParts: Seq[String]) extends
  */
 case class RepairTableStatement(tableName: Seq[String]) extends ParsedStatement
 
-/**
- * A CACHE TABLE statement, as parsed from SQL
- */
-case class CacheTableStatement(
-    tableName: Seq[String],
-    plan: Option[LogicalPlan],
-    isLazy: Boolean,
-    options: Map[String, String]) extends ParsedStatement
-
 /**
  * An UNCACHE TABLE statement, as parsed from SQL
  */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index c1fc0b69354c..d9b9264519cd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -629,3 +629,15 @@
 case class ShowCreateTable(child: LogicalPlan, asSerde: Boolean = false) extends Command {
   override def children: Seq[LogicalPlan] = child :: Nil
 }
+
+/**
+ * The logical plan of the CACHE TABLE command.
+ */
+case class CacheTable(
+    child: LogicalPlan,
+    plan: Option[LogicalPlan],
+    isLazy: Boolean,
+    options: Map[String, String]) extends Command {
+  override def children: Seq[LogicalPlan] = child :: Nil
+}
+
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index be1ac56c4a4a..ff48b42d335a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -1595,15 +1595,19 @@ class DDLParserSuite extends AnalysisTest {
   test("CACHE TABLE") {
     comparePlans(
       parsePlan("CACHE TABLE a.b.c"),
-      CacheTableStatement(Seq("a", "b", "c"), None, false, Map.empty))
+      CacheTable(UnresolvedTableOrView(Seq("a", "b", "c")), None, false, Map.empty))
 
     comparePlans(
       parsePlan("CACHE LAZY TABLE a.b.c"),
-      CacheTableStatement(Seq("a", "b", "c"), None, true, Map.empty))
+      CacheTable(UnresolvedTableOrView(Seq("a", "b", "c")), None, true, Map.empty))
 
     comparePlans(
       parsePlan("CACHE LAZY TABLE a.b.c OPTIONS('storageLevel' 'DISK_ONLY')"),
-      CacheTableStatement(Seq("a", "b", "c"), None, true, Map("storageLevel" -> "DISK_ONLY")))
+      CacheTable(
+        UnresolvedTableOrView(Seq("a", "b", "c")),
+        None,
+        true,
+        Map("storageLevel" -> "DISK_ONLY")))
 
     intercept("CACHE TABLE a.b.c AS SELECT * FROM testData",
       "It is not allowed to add catalog/namespace prefix a.b")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index ff25272aebb5..8a74d7e0848d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -441,15 +441,13 @@ class ResolveSessionCatalog(
         ShowCreateTableCommand(ident.asTableIdentifier)
       }
 
-    case CacheTableStatement(tbl, plan, isLazy, options) =>
-      val name = if (plan.isDefined) {
-        // CACHE TABLE ... AS SELECT creates a temp view with the input query.
-        // Temp view doesn't belong to any catalog and we shouldn't resolve catalog in the name.
-        tbl
-      } else {
-        parseTempViewOrV1Table(tbl, "CACHE TABLE")
-      }
-      CacheTableCommand(name.asTableIdentifier, plan, isLazy, options)
+    // CACHE TABLE ... AS SELECT creates a temp view with the input query.
+    // Thus, use the identifier in UnresolvedTableOrView directly.
+    case CacheTable(u: UnresolvedTableOrView, plan, isLazy, options) if plan.isDefined =>
+      CacheTableCommand(u.multipartIdentifier.asTableIdentifier, plan, isLazy, options)
+
+    case CacheTable(ResolvedV1TableOrViewIdentifier(ident), plan, isLazy, options) =>
+      CacheTableCommand(ident.asTableIdentifier, plan, isLazy, options)
 
     case UncacheTableStatement(tbl, ifExists) =>
       val name = parseTempViewOrV1Table(tbl, "UNCACHE TABLE")
       UncacheTableCommand(name.asTableIdentifier, ifExists)
 
@@ -570,12 +568,9 @@ class ResolveSessionCatalog(
         "SHOW VIEWS, only SessionCatalog supports this command.")
     }
 
-    case ShowTableProperties(ResolvedV1TableIdentifier(ident), propertyKey) =>
+    case ShowTableProperties(ResolvedV1TableOrViewIdentifier(ident), propertyKey) =>
       ShowTablePropertiesCommand(ident.asTableIdentifier, propertyKey)
 
-    case ShowTableProperties(r: ResolvedView, propertyKey) =>
-      ShowTablePropertiesCommand(r.identifier.asTableIdentifier, propertyKey)
-
     case DescribeFunction(ResolvedFunc(identifier), extended) =>
       DescribeFunctionCommand(identifier.asFunctionIdentifier, extended)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 48fa88ed550b..391f98d26cc2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -289,6 +289,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case ShowCreateTable(_: ResolvedTable, _) =>
       throw new AnalysisException("SHOW CREATE TABLE is not supported for v2 tables.")
 
+    case CacheTable(_: ResolvedTable, _, _, _) =>
+      throw new AnalysisException("CACHE TABLE is not supported for v2 tables.")
+
     case _ => Nil
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 68de55f03ba8..303cd492e4a7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1963,13 +1963,8 @@ class DataSourceV2SQLSuite
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
       spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
-
-      testV1CommandSupportingTempView("CACHE TABLE", t)
-
-      val e = intercept[AnalysisException] {
-        sql(s"CACHE LAZY TABLE $t")
-      }
-      assert(e.message.contains("CACHE TABLE is only supported with temp views or v1 tables"))
+      testNotSupportedV2Command("CACHE TABLE", t)
+      testNotSupportedV2Command("CACHE LAZY TABLE", t, sqlCommandInMessage = Some("CACHE TABLE"))
     }
   }
 
@@ -2486,11 +2481,15 @@ class DataSourceV2SQLSuite
     }
   }
 
-  private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
+  private def testNotSupportedV2Command(
+      sqlCommand: String,
+      sqlParams: String,
+      sqlCommandInMessage: Option[String] = None): Unit = {
     val e = intercept[AnalysisException] {
       sql(s"$sqlCommand $sqlParams")
     }
-    assert(e.message.contains(s"$sqlCommand is not supported for v2 tables"))
+    val cmdStr = sqlCommandInMessage.getOrElse(sqlCommand)
+    assert(e.message.contains(s"$cmdStr is not supported for v2 tables"))
   }
 
   private def testV1Command(sqlCommand: String, sqlParams: String): Unit = {
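The net effect of this first patch, sketched against the DDLParserSuite expectations above: CACHE TABLE now parses to the v2 CacheTable command with an unresolved child, so the analyzer (rather than string parsing in ResolveSessionCatalog) decides whether the name is a table or a view. The snippet below is an illustration only, using the standalone catalyst parser CatalystSqlParser; the suite itself goes through its own parsePlan helper.

    import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

    // The parser leaves the relation unresolved; resolution happens later in
    // the analyzer against whichever catalog owns a.b.c.
    val plan = CatalystSqlParser.parsePlan(
      "CACHE LAZY TABLE a.b.c OPTIONS('storageLevel' 'DISK_ONLY')")
    // plan is equivalent to:
    //   CacheTable(UnresolvedTableOrView(Seq("a", "b", "c")), None, true,
    //     Map("storageLevel" -> "DISK_ONLY"))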
From f4ee301ace4e2d54415b3f3b587619765af9aab9 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Wed, 11 Nov 2020 21:30:01 -0800
Subject: [PATCH 02/20] Uncache

---
 .../org/apache/spark/sql/catalyst/parser/AstBuilder.scala  | 6 ++++--
 .../spark/sql/catalyst/plans/logical/statements.scala      | 7 -------
 .../spark/sql/catalyst/plans/logical/v2Commands.scala      | 6 ++++++
 .../apache/spark/sql/catalyst/parser/DDLParserSuite.scala  | 4 ++--
 .../sql/catalyst/analysis/ResolveSessionCatalog.scala      | 5 ++---
 .../execution/datasources/v2/DataSourceV2Strategy.scala    | 3 +++
 6 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 414733a0e080..5af5e4c15677 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3336,10 +3336,12 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
   }
 
   /**
-   * Create an [[UncacheTableStatement]] logical plan.
+   * Create an [[UncacheTable]] logical plan.
    */
   override def visitUncacheTable(ctx: UncacheTableContext): LogicalPlan = withOrigin(ctx) {
-    UncacheTableStatement(visitMultipartIdentifier(ctx.multipartIdentifier), ctx.EXISTS != null)
+    UncacheTable(
+      UnresolvedTableOrView(visitMultipartIdentifier(ctx.multipartIdentifier)),
+      ctx.EXISTS != null)
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index 79c0c7f05a6e..94f9e754113e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -347,13 +347,6 @@ case class UseStatement(isNamespaceSet: Boolean, nameParts: Seq[String]) extends
  */
 case class RepairTableStatement(tableName: Seq[String]) extends ParsedStatement
 
-/**
- * An UNCACHE TABLE statement, as parsed from SQL
- */
-case class UncacheTableStatement(
-    tableName: Seq[String],
-    ifExists: Boolean) extends ParsedStatement
-
 /**
  * A TRUNCATE TABLE statement, as parsed from SQL
  */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index d9b9264519cd..2a0117cb477c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -641,3 +641,9 @@ case class CacheTable(
   override def children: Seq[LogicalPlan] = child :: Nil
 }
 
+/**
+ * The logical plan of the UNCACHE TABLE command.
+ */
+case class UncacheTable(child: LogicalPlan, ifExists: Boolean) extends Command {
+  override def children: Seq[LogicalPlan] = child :: Nil
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index ff48b42d335a..b17c7a7edd43 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -1616,11 +1616,11 @@ class DDLParserSuite extends AnalysisTest {
   test("UNCACHE TABLE") {
     comparePlans(
       parsePlan("UNCACHE TABLE a.b.c"),
-      UncacheTableStatement(Seq("a", "b", "c"), ifExists = false))
+      UncacheTable(UnresolvedTableOrView(Seq("a", "b", "c")), ifExists = false))
 
     comparePlans(
       parsePlan("UNCACHE TABLE IF EXISTS a.b.c"),
-      UncacheTableStatement(Seq("a", "b", "c"), ifExists = true))
+      UncacheTable(UnresolvedTableOrView(Seq("a", "b", "c")), ifExists = true))
   }
 
   test("TRUNCATE table") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 8a74d7e0848d..1239c8a1d82c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -449,9 +449,8 @@ class ResolveSessionCatalog(
     case CacheTable(ResolvedV1TableOrViewIdentifier(ident), plan, isLazy, options) =>
       CacheTableCommand(ident.asTableIdentifier, plan, isLazy, options)
 
-    case UncacheTableStatement(tbl, ifExists) =>
-      val name = parseTempViewOrV1Table(tbl, "UNCACHE TABLE")
-      UncacheTableCommand(name.asTableIdentifier, ifExists)
+    case UncacheTable(ResolvedV1TableOrViewIdentifier(ident), ifExists) =>
+      UncacheTableCommand(ident.asTableIdentifier, ifExists)
 
     case TruncateTableStatement(tbl, partitionSpec) =>
       val v1TableName = parseV1Table(tbl, "TRUNCATE TABLE")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 391f98d26cc2..f19b29bda484 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -292,6 +292,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case CacheTable(_: ResolvedTable, _, _, _) =>
       throw new AnalysisException("CACHE TABLE is not supported for v2 tables.")
 
+    case UncacheTable(_: ResolvedTable, _) =>
+      throw new AnalysisException("UNCACHE TABLE is not supported for v2 tables.")
+
     case _ => Nil
   }
 }
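UNCACHE TABLE now follows the same pattern; a minimal sketch of the parsed shapes, mirroring the DDLParserSuite cases above (same illustrative CatalystSqlParser caveat as before):

    import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

    // Both forms produce an UncacheTable command over an unresolved child;
    // only the ifExists flag differs.
    CatalystSqlParser.parsePlan("UNCACHE TABLE a.b.c")
    // => UncacheTable(UnresolvedTableOrView(Seq("a", "b", "c")), ifExists = false)
    CatalystSqlParser.parsePlan("UNCACHE TABLE IF EXISTS a.b.c")
    // => UncacheTable(UnresolvedTableOrView(Seq("a", "b", "c")), ifExists = true)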
From a0687b3fc8ba5bd9ac19c3a0374cf855e8493f78 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Fri, 13 Nov 2020 18:51:06 -0800
Subject: [PATCH 03/20] ResolveCommandsWithIfExists to support uncache table

---
 .../spark/sql/catalyst/analysis/Analyzer.scala        |  2 +-
 ...able.scala => ResolveCommandsWithIfExists.scala}   | 13 ++++++++-----
 .../sql/catalyst/plans/logical/v2Commands.scala       |  4 ++--
 .../datasources/v2/DataSourceV2Strategy.scala         |  2 +-
 4 files changed, 12 insertions(+), 9 deletions(-)
 rename sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/{ResolveNoopDropTable.scala => ResolveCommandsWithIfExists.scala} (68%)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 5834f9bad4a1..a098f10c5bf8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -259,7 +259,7 @@ class Analyzer(
       TypeCoercion.typeCoercionRules ++
       extendedResolutionRules : _*),
     Batch("Post-Hoc Resolution", Once,
-      Seq(ResolveNoopDropTable) ++
+      Seq(ResolveCommandsWithIfExists) ++
       postHocResolutionRules: _*),
     Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
     Batch("Remove Unresolved Hints", Once,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveNoopDropTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala
similarity index 68%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveNoopDropTable.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala
index f9da9174f85e..af08efb871b2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveNoopDropTable.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala
@@ -17,17 +17,20 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.catalyst.plans.logical.{DropTable, LogicalPlan, NoopDropTable}
+import org.apache.spark.sql.catalyst.plans.logical.{DropTable, LogicalPlan, NoopCommand, UncacheTable}
 import org.apache.spark.sql.catalyst.rules.Rule
 
 /**
- * A rule for handling [[DropTable]] logical plan when the table or temp view is not resolved.
- * If "ifExists" flag is set to true, the plan is resolved to [[NoopDropTable]],
+ * A rule for handling commands when the table or temp view is not resolved.
+ * These commands support a flag, "ifExists", so that they do not fail when a relation is not
+ * resolved. If the "ifExists" flag is set to true, the plan is resolved to [[NoopCommand]],
  * which is a no-op command.
 */
-object ResolveNoopDropTable extends Rule[LogicalPlan] {
+object ResolveCommandsWithIfExists extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
     case DropTable(u: UnresolvedTableOrView, ifExists, _) if ifExists =>
-      NoopDropTable(u.multipartIdentifier)
+      NoopCommand(u.multipartIdentifier)
+    case UncacheTable(u: UnresolvedTableOrView, ifExists) if ifExists =>
+      NoopCommand(u.multipartIdentifier)
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 2a0117cb477c..bedd4f820250 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -416,9 +416,9 @@ case class DropTable(
 }
 
 /**
- * The logical plan for handling non-existing table for DROP TABLE command.
+ * The logical plan for no-op command handling non-existing table.
 */
-case class NoopDropTable(multipartIdentifier: Seq[String]) extends Command
+case class NoopCommand(multipartIdentifier: Seq[String]) extends Command
 
 /**
  * The logical plan of the ALTER TABLE command.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index f19b29bda484..783faae50723 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -231,7 +231,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case DropTable(r: ResolvedTable, ifExists, purge) =>
       DropTableExec(session, r.catalog, r.table, r.identifier, ifExists, purge) :: Nil
 
-    case _: NoopDropTable =>
+    case _: NoopCommand =>
       LocalTableScanExec(Nil, Nil) :: Nil
 
     case AlterTable(catalog, ident, _, changes) =>

From f36bc598f7cbfc9e5cad8a9c25d164eae0ba3214 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Tue, 17 Nov 2020 22:47:54 -0800
Subject: [PATCH 04/20] Fix tests

---
 .../org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala | 4 ++--
 .../scala/org/apache/spark/sql/hive/CachedTableSuite.scala    | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 303cd492e4a7..f2d6587edd67 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1973,8 +1973,8 @@ class DataSourceV2SQLSuite
     withTable(t) {
       sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
 
-      testV1CommandSupportingTempView("UNCACHE TABLE", t)
-      testV1CommandSupportingTempView("UNCACHE TABLE", s"IF EXISTS $t")
+      testNotSupportedV2Command("UNCACHE TABLE", t)
+      testNotSupportedV2Command("UNCACHE TABLE", s"IF EXISTS $t")
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index fc793534641d..8e6ebfcf1096 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -113,7 +113,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
     e = intercept[AnalysisException] {
       sql("UNCACHE TABLE nonexistentTable")
     }.getMessage
-    assert(e.contains(s"$expectedErrorMsg default.nonexistentTable"))
+    assert(e.contains(s"$expectedErrorMsg nonexistentTable"))
 
     sql("UNCACHE TABLE IF EXISTS nonexistentTable")
   }
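A hedged sketch of what the generalized rule buys: with ifExists set, a relation that never resolves no longer fails analysis but collapses to a no-op. The hand-built plan below is illustrative only (in Spark these nodes come from the parser), using the node shapes as they stand after patch 03:

    import org.apache.spark.sql.catalyst.analysis.{ResolveCommandsWithIfExists, UnresolvedTableOrView}
    import org.apache.spark.sql.catalyst.plans.logical.UncacheTable

    // The child stayed unresolved, but ifExists = true, so the rule rewrites
    // the command to NoopCommand instead of letting CheckAnalysis reject it.
    val unresolved = UncacheTable(UnresolvedTableOrView(Seq("nonexistent")), ifExists = true)
    val resolved = ResolveCommandsWithIfExists(unresolved)
    // resolved == NoopCommand(Seq("nonexistent")),
    // which DataSourceV2Strategy plans as an empty LocalTableScanExec.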
From 4b2fba0a41d4fa52156daa3aac4af98cd4ba0057 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Thu, 19 Nov 2020 23:51:00 -0800
Subject: [PATCH 05/20] introduce CacheTableAsSelect

---
 .../spark/sql/catalyst/parser/AstBuilder.scala      |  9 +++++++--
 .../sql/catalyst/plans/logical/v2Commands.scala     | 10 +++++++++-
 .../spark/sql/catalyst/parser/DDLParserSuite.scala  | 13 ++++++++++---
 .../catalyst/analysis/ResolveSessionCatalog.scala   |  9 ++++-----
 4 files changed, 30 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 140b643574c2..aef1517b3c3d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3323,7 +3323,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
   }
 
   /**
-   * Create a [[CacheTable]].
+   * Create a [[CacheTable]] pr [[CacheTableAsSelect]].
    *
    * For example:
    * {{{
@@ -3343,7 +3343,12 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
         "the table name in CACHE TABLE AS SELECT", ctx)
     }
     val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
-    CacheTable(UnresolvedTableOrView(tableName), query, ctx.LAZY != null, options)
+    val isLazy = ctx.LAZY != null
+    if (query.isDefined) {
+      CacheTableAsSelect(tableName.head, query.get, isLazy, options)
+    } else {
+      CacheTable(UnresolvedTableOrView(tableName), isLazy, options)
+    }
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 6f9e2f2c975b..fde811f94056 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -676,12 +676,20 @@ case class ShowCreateTable(child: LogicalPlan, asSerde: Boolean = false) extends
  */
 case class CacheTable(
     child: LogicalPlan,
-    plan: Option[LogicalPlan],
     isLazy: Boolean,
     options: Map[String, String]) extends Command {
   override def children: Seq[LogicalPlan] = child :: Nil
 }
 
+/**
+ * The logical plan of the CACHE TABLE ... AS SELECT command.
+ */
+case class CacheTableAsSelect(
+    tempViewName: String,
+    plan: LogicalPlan,
+    isLazy: Boolean,
+    options: Map[String, String]) extends Command
+
 /**
  * The logical plan of the UNCACHE TABLE command.
 */
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 4685d877597f..77bdc8754d27 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -1594,17 +1594,24 @@ class DDLParserSuite extends AnalysisTest {
   test("CACHE TABLE") {
     comparePlans(
       parsePlan("CACHE TABLE a.b.c"),
-      CacheTable(UnresolvedTableOrView(Seq("a", "b", "c")), None, false, Map.empty))
+      CacheTable(UnresolvedTableOrView(Seq("a", "b", "c")), false, Map.empty))
+
+    comparePlans(
+      parsePlan("CACHE TABLE t AS SELECT * FROM testData"),
+      CacheTableAsSelect(
+        "t",
+        Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("testData"))),
+        false,
+        Map.empty))
 
     comparePlans(
       parsePlan("CACHE LAZY TABLE a.b.c"),
-      CacheTable(UnresolvedTableOrView(Seq("a", "b", "c")), None, true, Map.empty))
+      CacheTable(UnresolvedTableOrView(Seq("a", "b", "c")), true, Map.empty))
 
     comparePlans(
       parsePlan("CACHE LAZY TABLE a.b.c OPTIONS('storageLevel' 'DISK_ONLY')"),
       CacheTable(
         UnresolvedTableOrView(Seq("a", "b", "c")),
-        None,
         true,
         Map("storageLevel" -> "DISK_ONLY")))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 7e4d917b3b5d..1aff94ffba35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -443,12 +443,11 @@ class ResolveSessionCatalog(
       }
 
     // CACHE TABLE ... AS SELECT creates a temp view with the input query.
-    // Thus, use the identifier in UnresolvedTableOrView directly.
-    case CacheTable(u: UnresolvedTableOrView, plan, isLazy, options) if plan.isDefined =>
-      CacheTableCommand(u.multipartIdentifier.asTableIdentifier, plan, isLazy, options)
+    case CacheTableAsSelect(tempViewName, plan, isLazy, options) =>
+      CacheTableCommand(TableIdentifier(tempViewName), Some(plan), isLazy, options)
 
-    case CacheTable(ResolvedV1TableOrViewIdentifier(ident), plan, isLazy, options) =>
-      CacheTableCommand(ident.asTableIdentifier, plan, isLazy, options)
+    case CacheTable(ResolvedV1TableOrViewIdentifier(ident), isLazy, options) =>
+      CacheTableCommand(ident.asTableIdentifier, None, isLazy, options)
 
     case UncacheTable(ResolvedV1TableOrViewIdentifier(ident), ifExists) =>
       UncacheTableCommand(ident.asTableIdentifier, ifExists)
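With the parser now branching on the presence of a query, the two shapes look roughly like this (same illustrative CatalystSqlParser caveat as above; the expected plans are taken from the DDLParserSuite cases):

    import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

    // Plain CACHE TABLE: multi-part names are allowed and resolution is
    // deferred to the analyzer.
    CatalystSqlParser.parsePlan("CACHE TABLE a.b.c")
    // => CacheTable(UnresolvedTableOrView(Seq("a", "b", "c")), false, Map.empty)

    // CACHE TABLE ... AS SELECT: the name becomes a single-part temp view, so
    // any catalog/namespace prefix is rejected at parse time.
    CatalystSqlParser.parsePlan("CACHE TABLE t AS SELECT * FROM testData")
    // => CacheTableAsSelect("t",
    //      Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("testData"))),
    //      false, Map.empty)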
From f232eba628ef006e90a1c27e426e966df499f5d7 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Fri, 20 Nov 2020 19:42:43 -0800
Subject: [PATCH 06/20] Address PR comments

---
 .../ResolveCommandsWithIfExists.scala         |  4 +-
 .../catalyst/plans/logical/v2Commands.scala   |  2 +-
 .../spark/sql/execution/CacheTableUtils.scala | 36 +++++++++
 .../spark/sql/execution/command/cache.scala   | 17 +---
 .../datasources/v2/CacheTableExec.scala       | 79 +++++++++++++++++++
 .../datasources/v2/DataSourceV2Strategy.scala |  8 +-
 .../sql/connector/DataSourceV2SQLSuite.scala  | 32 +++++---
 7 files changed, 148 insertions(+), 30 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/CacheTableUtils.scala
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala
index af08efb871b2..097fb292fc7c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala
@@ -29,8 +29,8 @@ object ResolveCommandsWithIfExists extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
     case DropTable(u: UnresolvedTableOrView, ifExists, _) if ifExists =>
-      NoopCommand(u.multipartIdentifier)
+      NoopCommand("DROP TABLE", u.multipartIdentifier)
     case UncacheTable(u: UnresolvedTableOrView, ifExists) if ifExists =>
-      NoopCommand(u.multipartIdentifier)
+      NoopCommand("UNCACHE TABLE", u.multipartIdentifier)
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index fde811f94056..9d29f1a16762 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -419,7 +419,7 @@ case class DropTable(
 /**
  * The logical plan for no-op command handling non-existing table.
 */
-case class NoopCommand(multipartIdentifier: Seq[String]) extends Command
+case class NoopCommand(command: String, multipartIdentifier: Seq[String]) extends Command
 
 /**
  * The logical plan of the ALTER TABLE command.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheTableUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheTableUtils.scala
new file mode 100644
index 000000000000..917fd78c5f0c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheTableUtils.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.Locale
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+
+object CacheTableUtils extends Logging {
+  def getStorageLevel(options: Map[String, String]): Option[String] = {
+    val storageLevelKey = "storagelevel"
+    val storageLevelValue =
+      CaseInsensitiveMap(options).get(storageLevelKey).map(_.toUpperCase(Locale.ROOT))
+    val withoutStorageLevel = options.filterKeys(_.toLowerCase(Locale.ROOT) != storageLevelKey)
+    if (withoutStorageLevel.nonEmpty) {
+      logWarning(s"Invalid options: ${withoutStorageLevel.mkString(", ")}")
+    }
+    storageLevelValue
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index f99dc8d9f1a8..0a117bfffc99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -17,13 +17,11 @@
 
 package org.apache.spark.sql.execution.command
 
-import java.util.Locale
-
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan}
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.execution.CacheTableUtils
 import org.apache.spark.storage.StorageLevel
 
 case class CacheTableCommand(
@@ -41,17 +39,10 @@ case class CacheTableCommand(
       Dataset.ofRows(sparkSession, logicalPlan).createTempView(tableIdent.quotedString)
     }
 
-    val storageLevelKey = "storagelevel"
-    val storageLevelValue =
-      CaseInsensitiveMap(options).get(storageLevelKey).map(_.toUpperCase(Locale.ROOT))
-    val withoutStorageLevel = options.filterKeys(_.toLowerCase(Locale.ROOT) != storageLevelKey)
-    if (withoutStorageLevel.nonEmpty) {
-      logWarning(s"Invalid options: ${withoutStorageLevel.mkString(", ")}")
-    }
-
-    if (storageLevelValue.nonEmpty) {
+    val optStorageLevel = CacheTableUtils.getStorageLevel(options)
+    if (optStorageLevel.nonEmpty) {
       sparkSession.catalog.cacheTable(
-        tableIdent.quotedString, StorageLevel.fromString(storageLevelValue.get))
+        tableIdent.quotedString, StorageLevel.fromString(optStorageLevel.get))
     } else {
       sparkSession.catalog.cacheTable(tableIdent.quotedString)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
new file mode 100644
index 000000000000..87eecbc90a74
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
+import org.apache.spark.sql.execution.CacheTableUtils
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Physical plan node for caching a table.
+ */
+case class CacheTableExec(
+    session: SparkSession,
+    catalog: TableCatalog,
+    table: Table,
+    ident: Identifier,
+    isLazy: Boolean,
+    options: Map[String, String]) extends V2CommandExec {
+  override def run(): Seq[InternalRow] = {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
+
+    val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
+    val df = Dataset.ofRows(session, v2Relation)
+    val tableName = Some(ident.quoted)
+    val optStorageLevel = CacheTableUtils.getStorageLevel(options)
+    if (optStorageLevel.nonEmpty) {
+      session.sharedState.cacheManager.cacheQuery(
+        df, tableName, StorageLevel.fromString(optStorageLevel.get))
+    } else {
+      session.sharedState.cacheManager.cacheQuery(df, tableName)
+    }
+
+    if (!isLazy) {
+      // Performs eager caching.
+      df.count()
+    }
+
+    Seq.empty
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
+
+/**
+ * Physical plan node for uncaching a table.
+ */
+case class UncacheTableExec(
+    session: SparkSession,
+    catalog: TableCatalog,
+    table: Table,
+    ident: Identifier) extends V2CommandExec {
+  override def run(): Seq[InternalRow] = {
+    val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
+    val df = Dataset.ofRows(session, v2Relation)
+    // Cascade should be true unless a temporary view is uncached.
+    session.sharedState.cacheManager.uncacheQuery(df, cascade = true)
+    Seq.empty
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index f2ccd7a025dc..974d8b4d42de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -299,11 +299,11 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case ShowCreateTable(_: ResolvedTable, _) =>
       throw new AnalysisException("SHOW CREATE TABLE is not supported for v2 tables.")
 
-    case CacheTable(_: ResolvedTable, _, _, _) =>
-      throw new AnalysisException("CACHE TABLE is not supported for v2 tables.")
+    case CacheTable(r: ResolvedTable, isLazy, options) =>
+      CacheTableExec(session, r.catalog, r.table, r.identifier, isLazy, options) :: Nil
 
-    case UncacheTable(_: ResolvedTable, _) =>
-      throw new AnalysisException("UNCACHE TABLE is not supported for v2 tables.")
+    case UncacheTable(r: ResolvedTable, _) =>
+      UncacheTableExec(session, r.catalog, r.table, r.identifier) :: Nil
 
     case _ => Nil
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 0af7060c5dfc..e9e3b83e0480 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -27,9 +27,11 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchNamespaceException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.NoopCommand
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
+import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode, V2_SESSION_CATALOG_IMPLEMENTATION}
 import org.apache.spark.sql.internal.connector.SimpleTableProvider
@@ -710,6 +712,7 @@ class DataSourceV2SQLSuite
 
   test("DropTable: basic") {
     val tableName = "testcat.ns1.ns2.tbl"
+    sql(s"EXPLAIN EXTENDED DROP TABLE IF EXISTS $tableName").show(false)
     val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
     sql(s"CREATE TABLE $tableName USING foo AS SELECT id, data FROM source")
     assert(catalog("testcat").asTableCatalog.tableExists(ident) === true)
@@ -1940,23 +1943,32 @@ class DataSourceV2SQLSuite
     }
   }
 
-  test("CACHE TABLE") {
+  test("CACHE/UNCACHE TABLE") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
+      def isCached(table: String): Boolean = {
+        spark.table(table).queryExecution.withCachedData.isInstanceOf[InMemoryRelation]
+      }
+
       spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
-      testNotSupportedV2Command("CACHE TABLE", t)
-      testNotSupportedV2Command("CACHE LAZY TABLE", t, sqlCommandInMessage = Some("CACHE TABLE"))
+      sql(s"CACHE TABLE $t")
+      assert(isCached(t))
+
+      sql(s"UNCACHE TABLE $t")
+      assert(!isCached(t))
     }
-  }
 
-  test("UNCACHE TABLE") {
-    val t = "testcat.ns1.ns2.tbl"
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+    // Test a scenario where a table does not exist.
+    val e = intercept[AnalysisException] {
+      sql(s"UNCACHE TABLE $t")
+    }
+    assert(e.message.contains("Table or view not found: testcat.ns1.ns2.tbl"))
 
-      testNotSupportedV2Command("UNCACHE TABLE", t)
-      testNotSupportedV2Command("UNCACHE TABLE", s"IF EXISTS $t")
+    // If "IF EXISTS" is set, UNCACHE TABLE will be a no-op.
+    val noop = sql(s"UNCACHE TABLE IF EXISTS $t").queryExecution.optimizedPlan.collect {
+      case n @ NoopCommand("UNCACHE TABLE", Seq("testcat", "ns1", "ns2", "tbl")) => n
+    }
+    assert(noop.length == 1)
   }
 
   test("SHOW COLUMNS") {

From 8c0140cdd778c7e28d8b537725b9c5fc322f5433 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Fri, 20 Nov 2020 23:00:02 -0800
Subject: [PATCH 07/20] Rename to command name

---
 .../apache/spark/sql/catalyst/plans/logical/v2Commands.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 9d29f1a16762..9245d8b5de7a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -419,7 +419,7 @@ case class DropTable(
 /**
  * The logical plan for no-op command handling non-existing table.
 */
-case class NoopCommand(command: String, multipartIdentifier: Seq[String]) extends Command
+case class NoopCommand(commandName: String, multipartIdentifier: Seq[String]) extends Command
 
 /**
  * The logical plan of the ALTER TABLE command.
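At this point CACHE and UNCACHE work end to end on v2 catalogs. A condensed sketch of the flow the rewritten DataSourceV2SQLSuite test exercises (`testcat` and the `foo` provider are the suite's fixtures, not generally available names):

    spark.sql("CREATE TABLE testcat.ns1.ns2.tbl (id bigint, data string) USING foo")
    spark.sql("CACHE TABLE testcat.ns1.ns2.tbl")    // planned as CacheTableExec
    spark.sql("UNCACHE TABLE testcat.ns1.ns2.tbl")  // planned as UncacheTableExec

    // After the table is dropped, IF EXISTS degrades to NoopCommand instead of
    // failing with "Table or view not found".
    spark.sql("UNCACHE TABLE IF EXISTS testcat.ns1.ns2.tbl")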
From 0bdfcee94ce484114161930ab78ec1197251cb51 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Sun, 22 Nov 2020 12:25:54 -0800
Subject: [PATCH 08/20] Address PR comments

---
 .../scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 2 +-
 .../spark/sql/execution/datasources/v2/CacheTableExec.scala     | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index aef1517b3c3d..c9f08487bacb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3323,7 +3323,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
   }
 
   /**
-   * Create a [[CacheTable]] pr [[CacheTableAsSelect]].
+   * Create a [[CacheTable]] or [[CacheTableAsSelect]].
    *
    * For example:
    * {{{
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
index 87eecbc90a74..ee9878e594ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
@@ -70,7 +70,6 @@ case class UncacheTableExec(
   override def run(): Seq[InternalRow] = {
     val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
     val df = Dataset.ofRows(session, v2Relation)
-    // Cascade should be true unless a temporary view is uncached.
     session.sharedState.cacheManager.uncacheQuery(df, cascade = true)
     Seq.empty
   }

From 7ee6eb0a4fcd637ebf36f0250cc5aac8108c26bc Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Mon, 23 Nov 2020 10:03:37 -0800
Subject: [PATCH 09/20] Address PR comments

---
 .../spark/sql/catalyst/parser/AstBuilder.scala        | 12 ++++++------
 .../execution/datasources/v2/CacheTableExec.scala     |  9 ++++-----
 .../datasources/v2/DataSourceV2Strategy.scala         |  6 +++---
 .../sql/execution/datasources/v2/DropTableExec.scala  |  3 +--
 4 files changed, 14 insertions(+), 16 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index ae31703bc877..d7fd9185ec7f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3336,15 +3336,15 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
     val query = Option(ctx.query).map(plan)
     val tableName = visitMultipartIdentifier(ctx.multipartIdentifier)
-    if (query.isDefined && tableName.length > 1) {
-      val catalogAndNamespace = tableName.init
-      throw new ParseException("It is not allowed to add catalog/namespace " +
-        s"prefix ${catalogAndNamespace.quoted} to " +
-        "the table name in CACHE TABLE AS SELECT", ctx)
-    }
     val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val isLazy = ctx.LAZY != null
     if (query.isDefined) {
+      if (tableName.length > 1) {
+        val catalogAndNamespace = tableName.init
+        throw new ParseException("It is not allowed to add catalog/namespace " +
+          s"prefix ${catalogAndNamespace.quoted} to " +
+          "the table name in CACHE TABLE AS SELECT", ctx)
+      }
       CacheTableAsSelect(tableName.head, query.get, isLazy, options)
     } else {
       CacheTable(UnresolvedTableOrView(tableName), isLazy, options)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
index ee9878e594ee..badc167b11cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
@@ -28,7 +28,6 @@ import org.apache.spark.storage.StorageLevel
  * Physical plan node for caching a table.
 */
 case class CacheTableExec(
-    session: SparkSession,
     catalog: TableCatalog,
     table: Table,
     ident: Identifier,
@@ -37,6 +36,7 @@ case class CacheTableExec(
   override def run(): Seq[InternalRow] = {
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
 
+    val session = sqlContext.sparkSession
     val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
     val df = Dataset.ofRows(session, v2Relation)
     val tableName = Some(ident.quoted)
@@ -63,14 +63,13 @@ case class CacheTableExec(
  * Physical plan node for uncaching a table.
 */
 case class UncacheTableExec(
-    session: SparkSession,
     catalog: TableCatalog,
     table: Table,
     ident: Identifier) extends V2CommandExec {
   override def run(): Seq[InternalRow] = {
+    val session = sqlContext.sparkSession
     val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
-    val df = Dataset.ofRows(session, v2Relation)
-    session.sharedState.cacheManager.uncacheQuery(df, cascade = true)
+    session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
     Seq.empty
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index f59a943933bd..b814ea776128 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -232,7 +232,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       throw new AnalysisException("Describing columns is not supported for v2 tables.")
 
     case DropTable(r: ResolvedTable, ifExists, purge) =>
-      DropTableExec(session, r.catalog, r.table, r.identifier, ifExists, purge) :: Nil
+      DropTableExec(r.catalog, r.table, r.identifier, ifExists, purge) :: Nil
 
     case _: NoopCommand =>
       LocalTableScanExec(Nil, Nil) :: Nil
@@ -303,10 +303,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       throw new AnalysisException("SHOW CREATE TABLE is not supported for v2 tables.")
 
     case CacheTable(r: ResolvedTable, isLazy, options) =>
-      CacheTableExec(session, r.catalog, r.table, r.identifier, isLazy, options) :: Nil
+      CacheTableExec(r.catalog, r.table, r.identifier, isLazy, options) :: Nil
 
     case UncacheTable(r: ResolvedTable, _) =>
-      UncacheTableExec(session, r.catalog, r.table, r.identifier) :: Nil
+      UncacheTableExec(r.catalog, r.table, r.identifier) :: Nil
 
     case _ => Nil
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
index 068475fc56f4..9f6a2189354c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -27,7 +26,6 @@ import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 * Physical plan node for dropping a table.
 */
 case class DropTableExec(
-    session: SparkSession,
     catalog: TableCatalog,
     table: Table,
     ident: Identifier,
@@ -36,6 +34,7 @@ case class DropTableExec(
   override def run(): Seq[InternalRow] = {
     if (catalog.tableExists(ident)) {
+      val session = sqlContext.sparkSession
       val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
       session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
       catalog.dropTable(ident, purge)

From a5923ab0e5c525a998f3e0f630fb3de0bf460071 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Tue, 24 Nov 2020 00:03:40 -0800
Subject: [PATCH 10/20] Address PR comments

---
 .../scala/org/apache/spark/sql/CachedTableSuite.scala     | 11 +++++++++++
 .../spark/sql/connector/DataSourceV2SQLSuite.scala        |  1 -
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 6313370476c9..ef3f4daa6dc6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -25,6 +25,7 @@ import org.apache.spark.executor.DataReadMethod._
 import org.apache.spark.executor.DataReadMethod.DataReadMethod
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join, JoinStrategyHint, SHUFFLE_HASH}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants
@@ -140,6 +141,16 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
     }
   }
 
+  test("cache table as select - existing temp view") {
+    withTempView("tempView") {
+      sql("CREATE TEMPORARY VIEW tempView as SELECT 1")
+      val e = intercept[TempTableAlreadyExistsException] {
+        sql("CACHE TABLE tempView AS SELECT 1")
+      }
+      assert(e.getMessage.contains("Temporary view 'tempView' already exists"))
+    }
+  }
+
   test("uncaching temp table") {
     withTempView("tempTable1", "tempTable2") {
       testData.select("key").createOrReplaceTempView("tempTable1")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 2c56ff30fc32..468f1dc7d148 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -712,7 +712,6 @@ class DataSourceV2SQLSuite
 
   test("DropTable: basic") {
     val tableName = "testcat.ns1.ns2.tbl"
-    sql(s"EXPLAIN EXTENDED DROP TABLE IF EXISTS $tableName").show(false)
     val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
     sql(s"CREATE TABLE $tableName USING foo AS SELECT id, data FROM source")
     assert(catalog("testcat").asTableCatalog.tableExists(ident) === true)
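The new CachedTableSuite test above pins down a corner case worth spelling out: CACHE TABLE ... AS SELECT creates a temp view under the hood, so a name collision surfaces as an analysis error rather than silently replacing the existing view. Condensed from the test (illustrative, not part of the patch):

    spark.sql("CREATE TEMPORARY VIEW tempView AS SELECT 1")
    spark.sql("CACHE TABLE tempView AS SELECT 1")
    // => TempTableAlreadyExistsException: Temporary view 'tempView' already exists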
From c0e4f3ee4a1760e97ee999aa542fbe7cae72db50 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Tue, 24 Nov 2020 21:20:16 -0800
Subject: [PATCH 11/20] Address PR comments

---
 .../ResolveCommandsWithIfExists.scala         |  4 +-
 .../sql/catalyst/parser/AstBuilder.scala      | 18 ++---
 .../catalyst/plans/logical/v2Commands.scala   | 19 +----
 .../sql/catalyst/parser/DDLParserSuite.scala  | 17 ++--
 .../analysis/ResolveSessionCatalog.scala      | 12 +--
 .../spark/sql/execution/CacheTableUtils.scala | 36 ---------
 .../spark/sql/execution/command/cache.scala   | 47 +++++++----
 .../datasources/v2/CacheTableExec.scala       | 77 -------------------
 .../datasources/v2/DataSourceV2Strategy.scala |  6 --
 .../spark/sql/internal/CatalogImpl.scala      |  7 +-
 .../sql/connector/DataSourceV2SQLSuite.scala  |  8 +-
 11 files changed, 62 insertions(+), 189 deletions(-)
 delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/CacheTableUtils.scala
 delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala
index 097fb292fc7c..ad076a19f836 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.catalyst.plans.logical.{DropTable, LogicalPlan, NoopCommand, UncacheTable}
+import org.apache.spark.sql.catalyst.plans.logical.{DropTable, LogicalPlan, NoopCommand}
 import org.apache.spark.sql.catalyst.rules.Rule
 
 /**
@@ -30,7 +30,5 @@ object ResolveCommandsWithIfExists extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
     case DropTable(u: UnresolvedTableOrView, ifExists, _) if ifExists =>
       NoopCommand("DROP TABLE", u.multipartIdentifier)
-    case UncacheTable(u: UnresolvedTableOrView, ifExists) if ifExists =>
-      NoopCommand("UNCACHE TABLE", u.multipartIdentifier)
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 81f647bc3bb0..9484d5545247 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3340,17 +3340,13 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
     val tableName = visitMultipartIdentifier(ctx.multipartIdentifier)
     val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val isLazy = ctx.LAZY != null
-    if (query.isDefined) {
-      if (tableName.length > 1) {
-        val catalogAndNamespace = tableName.init
-        throw new ParseException("It is not allowed to add catalog/namespace " +
-          s"prefix ${catalogAndNamespace.quoted} to " +
-          "the table name in CACHE TABLE AS SELECT", ctx)
-      }
-      CacheTableAsSelect(tableName.head, query.get, isLazy, options)
-    } else {
-      CacheTable(UnresolvedTableOrView(tableName), isLazy, options)
+    if (query.isDefined && tableName.length > 1) {
+      val catalogAndNamespace = tableName.init
+      throw new ParseException("It is not allowed to add catalog/namespace " +
+        s"prefix ${catalogAndNamespace.quoted} to " +
+        "the table name in CACHE TABLE AS SELECT", ctx)
     }
+    CacheTable(tableName, query, isLazy, options)
   }
 
   /**
@@ -3358,7 +3354,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
   */
   override def visitUncacheTable(ctx: UncacheTableContext): LogicalPlan = withOrigin(ctx) {
     UncacheTable(
-      UnresolvedTableOrView(visitMultipartIdentifier(ctx.multipartIdentifier)),
+      visitMultipartIdentifier(ctx.multipartIdentifier),
       ctx.EXISTS != null)
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 83a1b5dd6afc..e623d80cfdac 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -684,25 +684,12 @@ case class TruncateTable(
  * The logical plan of the CACHE TABLE command.
 */
 case class CacheTable(
-    child: LogicalPlan,
-    isLazy: Boolean,
-    options: Map[String, String]) extends Command {
-  override def children: Seq[LogicalPlan] = child :: Nil
-}
-
-/**
- * The logical plan of the CACHE TABLE ... AS SELECT command.
- */
-case class CacheTableAsSelect(
-    tempViewName: String,
-    plan: LogicalPlan,
+    tableName: Seq[String],
+    plan: Option[LogicalPlan],
     isLazy: Boolean,
     options: Map[String, String]) extends Command
 
 /**
  * The logical plan of the UNCACHE TABLE command.
 */
-case class UncacheTable(child: LogicalPlan, ifExists: Boolean) extends Command {
-  override def children: Seq[LogicalPlan] = child :: Nil
-}
-
+case class UncacheTable(tableName: Seq[String], ifExists: Boolean) extends Command
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 0b278088d661..d9ccf7fab484 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -1594,24 +1594,25 @@ class DDLParserSuite extends AnalysisTest {
   test("CACHE TABLE") {
     comparePlans(
       parsePlan("CACHE TABLE a.b.c"),
-      CacheTable(UnresolvedTableOrView(Seq("a", "b", "c")), false, Map.empty))
+      CacheTable(Seq("a", "b", "c"), None, false, Map.empty))
 
     comparePlans(
       parsePlan("CACHE TABLE t AS SELECT * FROM testData"),
-      CacheTableAsSelect(
-        "t",
-        Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("testData"))),
+      CacheTable(
+        Seq("t"),
+        Some(Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("testData")))),
         false,
         Map.empty))
 
     comparePlans(
       parsePlan("CACHE LAZY TABLE a.b.c"),
-      CacheTable(UnresolvedTableOrView(Seq("a", "b", "c")), true, Map.empty))
+      CacheTable(Seq("a", "b", "c"), None, true, Map.empty))
 
     comparePlans(
       parsePlan("CACHE LAZY TABLE a.b.c OPTIONS('storageLevel' 'DISK_ONLY')"),
       CacheTable(
-        UnresolvedTableOrView(Seq("a", "b", "c")),
+        Seq("a", "b", "c"),
+        None,
         true,
         Map("storageLevel" -> "DISK_ONLY")))
 
@@ -1622,11 +1623,11 @@ class DDLParserSuite extends AnalysisTest {
   test("UNCACHE TABLE") {
     comparePlans(
       parsePlan("UNCACHE TABLE a.b.c"),
-      UncacheTable(UnresolvedTableOrView(Seq("a", "b", "c")), ifExists = false))
+      UncacheTable(Seq("a", "b", "c"), ifExists = false))
 
     comparePlans(
       parsePlan("UNCACHE TABLE IF EXISTS a.b.c"),
-      UncacheTable(UnresolvedTableOrView(Seq("a", "b", "c")), ifExists = true))
+      UncacheTable(Seq("a", "b", "c"), ifExists = true))
   }
 
   test("TRUNCATE table") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 879a227dd0d8..cfc4d0db2871 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -442,15 +442,11 @@ class ResolveSessionCatalog(
         ShowCreateTableCommand(ident.asTableIdentifier)
       }
 
-    // CACHE TABLE ... AS SELECT creates a temp view with the input query.
-    case CacheTableAsSelect(tempViewName, plan, isLazy, options) =>
-      CacheTableCommand(TableIdentifier(tempViewName), Some(plan), isLazy, options)
+    case CacheTable(multipartIdent, plan, isLazy, options) =>
+      CacheTableCommand(multipartIdent, plan, isLazy, options)
 
-    case CacheTable(ResolvedV1TableOrViewIdentifier(ident), isLazy, options) =>
-      CacheTableCommand(ident.asTableIdentifier, None, isLazy, options)
+    case UncacheTable(multipartIdent, ifExists) =>
+      UncacheTableCommand(multipartIdent, ifExists)
 
-    case UncacheTable(ResolvedV1TableOrViewIdentifier(ident), ifExists) =>
-      UncacheTableCommand(ident.asTableIdentifier, ifExists)
-
     case TruncateTable(ResolvedV1TableIdentifier(ident), partitionSpec) =>
       TruncateTableCommand(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheTableUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheTableUtils.scala
deleted file mode 100644
index 917fd78c5f0c..000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheTableUtils.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import java.util.Locale
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-
-object CacheTableUtils extends Logging {
-  def getStorageLevel(options: Map[String, String]): Option[String] = {
-    val storageLevelKey = "storagelevel"
-    val storageLevelValue =
-      CaseInsensitiveMap(options).get(storageLevelKey).map(_.toUpperCase(Locale.ROOT))
-    val withoutStorageLevel = options.filterKeys(_.toLowerCase(Locale.ROOT) != storageLevelKey)
-    if (withoutStorageLevel.nonEmpty) {
-      logWarning(s"Invalid options: ${withoutStorageLevel.mkString(", ")}")
-    }
-    storageLevelValue
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index 0a117bfffc99..04bfda5f0d02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -17,57 +17,72 @@
 
 package org.apache.spark.sql.execution.command
 
+import java.util.Locale
+
+import scala.util.Try
+
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan}
-import org.apache.spark.sql.execution.CacheTableUtils
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
 import org.apache.spark.storage.StorageLevel
 
 case class CacheTableCommand(
-    tableIdent: TableIdentifier,
+    multipartIdentifier: Seq[String],
     plan: Option[LogicalPlan],
     isLazy: Boolean,
     options: Map[String, String]) extends RunnableCommand {
-  require(plan.isEmpty || tableIdent.database.isEmpty,
-    "Database name is not allowed in CACHE TABLE AS SELECT")
+  require(plan.isEmpty || multipartIdentifier.length == 1,
+    "Namespace name is not allowed in CACHE TABLE AS SELECT")
 
   override def innerChildren: Seq[QueryPlan[_]] = plan.toSeq
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    val tableName = multipartIdentifier.quoted
     plan.foreach { logicalPlan =>
-      Dataset.ofRows(sparkSession, logicalPlan).createTempView(tableIdent.quotedString)
+      Dataset.ofRows(sparkSession, logicalPlan).createTempView(tableName)
+    }
+
+    val storageLevelKey = "storagelevel"
+    val storageLevelValue =
+      CaseInsensitiveMap(options).get(storageLevelKey).map(_.toUpperCase(Locale.ROOT))
+    val withoutStorageLevel = options.filterKeys(_.toLowerCase(Locale.ROOT) != storageLevelKey)
+    if (withoutStorageLevel.nonEmpty) {
+      logWarning(s"Invalid options: ${withoutStorageLevel.mkString(", ")}")
     }
 
-    val optStorageLevel = CacheTableUtils.getStorageLevel(options)
-    if (optStorageLevel.nonEmpty) {
+    if (storageLevelValue.nonEmpty) {
       sparkSession.catalog.cacheTable(
-        tableIdent.quotedString, StorageLevel.fromString(optStorageLevel.get))
+        tableName, StorageLevel.fromString(storageLevelValue.get))
     } else {
-      sparkSession.catalog.cacheTable(tableIdent.quotedString)
+      sparkSession.catalog.cacheTable(tableName)
     }
 
     if (!isLazy) {
       // Performs eager caching
-      sparkSession.table(tableIdent).count()
+      sparkSession.table(tableName).count()
     }
 
     Seq.empty[Row]
   }
 }
 
 case class UncacheTableCommand(
-    tableIdent: TableIdentifier,
+    multipartIdentifier: Seq[String],
     ifExists: Boolean) extends RunnableCommand {
 
   override def run(sparkSession: SparkSession):
Seq[Row] = { - val tableId = tableIdent.quotedString - if (!ifExists || sparkSession.catalog.tableExists(tableId)) { - sparkSession.catalog.uncacheTable(tableId) + val tableName = multipartIdentifier.quoted + if (!ifExists || tableExists(sparkSession, tableName)) { + sparkSession.catalog.uncacheTable(tableName) } Seq.empty[Row] } + + private def tableExists(sparkSession: SparkSession, name: String): Boolean = { + Try(sparkSession.table(name)).isSuccess + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala deleted file mode 100644 index badc167b11cf..000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources.v2 - -import org.apache.spark.sql.Dataset -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} -import org.apache.spark.sql.execution.CacheTableUtils -import org.apache.spark.storage.StorageLevel - -/** - * Physical plan node for caching a table. - */ -case class CacheTableExec( - catalog: TableCatalog, - table: Table, - ident: Identifier, - isLazy: Boolean, - options: Map[String, String]) extends V2CommandExec { - override def run(): Seq[InternalRow] = { - import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper - - val session = sqlContext.sparkSession - val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident)) - val df = Dataset.ofRows(session, v2Relation) - val tableName = Some(ident.quoted) - val optStorageLevel = CacheTableUtils.getStorageLevel(options) - if (optStorageLevel.nonEmpty) { - session.sharedState.cacheManager.cacheQuery( - df, tableName, StorageLevel.fromString(optStorageLevel.get)) - } else { - session.sharedState.cacheManager.cacheQuery(df, tableName) - } - - if (!isLazy) { - // Performs eager caching. - df.count() - } - - Seq.empty - } - - override def output: Seq[Attribute] = Seq.empty -} - -/** - * Physical plan node for uncaching a table. 
- */ -case class UncacheTableExec( - catalog: TableCatalog, - table: Table, - ident: Identifier) extends V2CommandExec { - override def run(): Seq[InternalRow] = { - val session = sqlContext.sparkSession - val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident)) - session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true) - Seq.empty - } - - override def output: Seq[Attribute] = Seq.empty -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 55f22a31194a..fbeba4ec9817 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -305,12 +305,6 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat case TruncateTable(_: ResolvedTable, _) => throw new AnalysisException("TRUNCATE TABLE is not supported for v2 tables.") - case CacheTable(r: ResolvedTable, isLazy, options) => - CacheTableExec(r.catalog, r.table, r.identifier, isLazy, options) :: Nil - - case UncacheTable(r: ResolvedTable, _) => - UncacheTableExec(r.catalog, r.table, r.identifier) :: Nil - case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 3e216415c281..c5f7aa6f592c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -471,8 +471,11 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { * @since 2.0.0 */ override def uncacheTable(tableName: String): Unit = { - val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) - val cascade = !sessionCatalog.isTemporaryTable(tableIdent) + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper + val multipartIdentifier = + sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) + val cascade = (multipartIdentifier.length <= 2) && + !sessionCatalog.isTemporaryTable(multipartIdentifier.asTableIdentifier) sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName), cascade) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index daee65bf7d9e..36dff21930de 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -27,7 +27,6 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchNamespaceException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.catalyst.plans.logical.NoopCommand import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership @@ -2041,11 +2040,8 @@ class DataSourceV2SQLSuite } 
assert(e.message.contains("Table or view not found: testcat.ns1.ns2.tbl")) - // If "IF EXISTS" is set, UNCACHE TABLE will be a no-op. - val noop = sql(s"UNCACHE TABLE IF EXISTS $t").queryExecution.optimizedPlan.collect { - case n @ NoopCommand("UNCACHE TABLE", Seq("testcat", "ns1", "ns2", "tbl")) => n - } - assert(noop.length == 1) + // If "IF EXISTS" is set, UNCACHE TABLE will not throw an exception. + sql(s"UNCACHE TABLE IF EXISTS $t") } test("SHOW COLUMNS") { From 47dc974148e80449f2a770206cb43e8b2cbc9ffa Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Tue, 24 Nov 2020 21:23:44 -0800 Subject: [PATCH 12/20] revert to minimize the churn --- .../org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 9484d5545247..f09d227b42e9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3325,7 +3325,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg } /** - * Create a [[CacheTable]] or [[CacheTableAsSelect]]. + * Create a [[CacheTable]]. * * For example: * {{{ @@ -3338,15 +3338,14 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg val query = Option(ctx.query).map(plan) val tableName = visitMultipartIdentifier(ctx.multipartIdentifier) - val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty) - val isLazy = ctx.LAZY != null if (query.isDefined && tableName.length > 1) { val catalogAndNamespace = tableName.init throw new ParseException("It is not allowed to add catalog/namespace " + s"prefix ${catalogAndNamespace.quoted} to " + "the table name in CACHE TABLE AS SELECT", ctx) } - CacheTable(tableName, query, isLazy, options) + val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty) + CacheTable(tableName, query, ctx.LAZY != null, options) } /** From b33d807c0fa771e910a2d8e93ccbccaf5d7db1eb Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Wed, 25 Nov 2020 10:04:03 -0800 Subject: [PATCH 13/20] Address comments --- .../sql/catalyst/analysis/Analyzer.scala | 2 +- ...xists.scala => ResolveNoopDropTable.scala} | 11 +++-- .../sql/catalyst/parser/AstBuilder.scala | 33 -------------- .../catalyst/plans/logical/v2Commands.scala | 18 +------- .../sql/catalyst/parser/DDLParserSuite.scala | 39 ---------------- .../analysis/ResolveSessionCatalog.scala | 6 --- .../spark/sql/execution/SparkSqlParser.scala | 34 ++++++++++++++ .../datasources/v2/DataSourceV2Strategy.scala | 2 +- .../sql/execution/SparkSqlParserSuite.scala | 45 ++++++++++++++++++- 9 files changed, 87 insertions(+), 103 deletions(-) rename sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/{ResolveCommandsWithIfExists.scala => ResolveNoopDropTable.scala} (73%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 5fe0d3071e66..837686420375 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -257,7 +257,7 @@ class Analyzer(override val catalogManager: CatalogManager) 
       TypeCoercion.typeCoercionRules ++
       extendedResolutionRules : _*),
     Batch("Post-Hoc Resolution", Once,
-      Seq(ResolveCommandsWithIfExists) ++
+      Seq(ResolveNoopDropTable) ++
       postHocResolutionRules: _*),
     Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
     Batch("Remove Unresolved Hints", Once,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveNoopDropTable.scala
similarity index 73%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveNoopDropTable.scala
index ad076a19f836..f9da9174f85e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCommandsWithIfExists.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveNoopDropTable.scala
@@ -17,18 +17,17 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.catalyst.plans.logical.{DropTable, LogicalPlan, NoopCommand}
+import org.apache.spark.sql.catalyst.plans.logical.{DropTable, LogicalPlan, NoopDropTable}
 import org.apache.spark.sql.catalyst.rules.Rule
 
 /**
- * A rule for handling commands when the table or temp view is not resolved.
- * These commands support a flag, "ifExists", so that they do not fail when a relation is not
- * resolved. If the "ifExists" flag is set to true. the plan is resolved to [[NoopCommand]],
+ * A rule for handling the [[DropTable]] logical plan when the table or temp view is not resolved.
+ * If the "ifExists" flag is set to true, the plan is resolved to [[NoopDropTable]],
 * which is a no-op command.
 */
-object ResolveCommandsWithIfExists extends Rule[LogicalPlan] {
+object ResolveNoopDropTable extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
     case DropTable(u: UnresolvedTableOrView, ifExists, _) if ifExists =>
-      NoopCommand("DROP TABLE", u.multipartIdentifier)
+      NoopDropTable(u.multipartIdentifier)
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 10c8ff5556ca..3a5e027979b3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3575,39 +3575,6 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
     ctx.SERDE != null)
   }
 
-  /**
-   * Create a [[CacheTable]].
-   *
-   * For example:
-   * {{{
-   *   CACHE [LAZY] TABLE multi_part_name
-   *     [OPTIONS tablePropertyList] [[AS] query]
-   * }}}
-   */
-  override def visitCacheTable(ctx: CacheTableContext): LogicalPlan = withOrigin(ctx) {
-    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-
-    val query = Option(ctx.query).map(plan)
-    val tableName = visitMultipartIdentifier(ctx.multipartIdentifier)
-    if (query.isDefined && tableName.length > 1) {
-      val catalogAndNamespace = tableName.init
-      throw new ParseException("It is not allowed to add catalog/namespace " +
-        s"prefix ${catalogAndNamespace.quoted} to " +
-        "the table name in CACHE TABLE AS SELECT", ctx)
-    }
-    val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
-    CacheTable(tableName, query, ctx.LAZY != null, options)
-  }
-
-  /**
-   * Create an [[UncacheTable]] logical plan.
-   */
-  override def visitUncacheTable(ctx: UncacheTableContext): LogicalPlan = withOrigin(ctx) {
-    UncacheTable(
-      visitMultipartIdentifier(ctx.multipartIdentifier),
-      ctx.EXISTS != null)
-  }
-
   /**
    * Create a [[TruncateTable]] command.
    *
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 13eef3696595..6dbedfefceb9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -417,9 +417,9 @@ case class DropTable(
 }
 
 /**
- * The logical plan for no-op command handling non-existing table.
+ * The logical plan for handling a non-existing table in the DROP TABLE command.
 */
-case class NoopCommand(commandName: String, multipartIdentifier: Seq[String]) extends Command
+case class NoopDropTable(multipartIdentifier: Seq[String]) extends Command
 
 /**
  * The logical plan of the ALTER TABLE command.
@@ -688,17 +688,3 @@ case class TruncateTable(
     partitionSpec: Option[TablePartitionSpec]) extends Command {
   override def children: Seq[LogicalPlan] = child :: Nil
 }
-
-/**
- * The logical plan of the CACHE TABLE command.
- */
-case class CacheTable(
-    tableName: Seq[String],
-    plan: Option[LogicalPlan],
-    isLazy: Boolean,
-    options: Map[String, String]) extends Command
-
-/**
- * The logical plan of the UNCACHE TABLE command.
- */
-case class UncacheTable(tableName: Seq[String], ifExists: Boolean) extends Command
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index c8a2a58daffd..6b957a917123 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -1907,45 +1907,6 @@ class DDLParserSuite extends AnalysisTest {
         asSerde = true))
   }
 
-  test("CACHE TABLE") {
-    comparePlans(
-      parsePlan("CACHE TABLE a.b.c"),
-      CacheTable(Seq("a", "b", "c"), None, false, Map.empty))
-
-    comparePlans(
-      parsePlan("CACHE TABLE t AS SELECT * FROM testData"),
-      CacheTable(
-        Seq("t"),
-        Some(Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("testData")))),
-        false,
-        Map.empty))
-
-    comparePlans(
-      parsePlan("CACHE LAZY TABLE a.b.c"),
-      CacheTable(Seq("a", "b", "c"), None, true, Map.empty))
-
-    comparePlans(
-      parsePlan("CACHE LAZY TABLE a.b.c OPTIONS('storageLevel' 'DISK_ONLY')"),
-      CacheTable(
-        Seq("a", "b", "c"),
-        None,
-        true,
-        Map("storageLevel" -> "DISK_ONLY")))
-
-    intercept("CACHE TABLE a.b.c AS SELECT * FROM testData",
-      "It is not allowed to add catalog/namespace prefix a.b")
-  }
-
-  test("UNCACHE TABLE") {
-    comparePlans(
-      parsePlan("UNCACHE TABLE a.b.c"),
-      UncacheTable(Seq("a", "b", "c"), ifExists = false))
-
-    comparePlans(
-      parsePlan("UNCACHE TABLE IF EXISTS a.b.c"),
-      UncacheTable(Seq("a", "b", "c"), ifExists = true))
-  }
-
   test("TRUNCATE table") {
     comparePlans(
       parsePlan("TRUNCATE TABLE a.b.c"),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 62604115b1a4..582f11a2be8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -446,12 +446,6 @@ class ResolveSessionCatalog(
         ShowCreateTableCommand(ident.asTableIdentifier)
       }
 
-    case CacheTable(multipartIdent, plan, isLazy, options) =>
-      CacheTableCommand(multipartIdent, plan, isLazy, options)
-
-    case UncacheTable(multipartIdent, ifExists) =>
-      UncacheTableCommand(multipartIdent, ifExists)
-
     case TruncateTable(ResolvedV1TableIdentifier(ident), partitionSpec) =>
       TruncateTableCommand(
         ident.asTableIdentifier,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index a92f0775f1c0..d1c838c2e344 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -192,6 +192,39 @@ class SparkSqlAstBuilder extends AstBuilder {
     unquotedPath
   }
 
+  /**
+   * Create a [[CacheTableCommand]].
+   *
+   * For example:
+   * {{{
+   *   CACHE [LAZY] TABLE multi_part_name
+   *     [OPTIONS tablePropertyList] [[AS] query]
+   * }}}
+   */
+  override def visitCacheTable(ctx: CacheTableContext): LogicalPlan = withOrigin(ctx) {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+    val query = Option(ctx.query).map(plan)
+    val tableName = visitMultipartIdentifier(ctx.multipartIdentifier)
+    if (query.isDefined && tableName.length > 1) {
+      val catalogAndNamespace = tableName.init
+      throw new ParseException("It is not allowed to add catalog/namespace " +
+        s"prefix ${catalogAndNamespace.quoted} to " +
+        "the table name in CACHE TABLE AS SELECT", ctx)
+    }
+    val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
+    CacheTableCommand(tableName, query, ctx.LAZY != null, options)
+  }
+
+  /**
+   * Create an [[UncacheTableCommand]] logical plan.
+   */
+  override def visitUncacheTable(ctx: UncacheTableContext): LogicalPlan = withOrigin(ctx) {
+    UncacheTableCommand(
+      visitMultipartIdentifier(ctx.multipartIdentifier),
+      ctx.EXISTS != null)
+  }
+
   /**
    * Create a [[ClearCacheCommand]] logical plan.
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 9838b6631a0c..3dfc221a02fd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -234,7 +234,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat case DropTable(r: ResolvedTable, ifExists, purge) => DropTableExec(r.catalog, r.table, r.identifier, ifExists, purge) :: Nil - case _: NoopCommand => + case _: NoopDropTable => LocalTableScanExec(Nil, Nil) :: Nil case AlterTable(catalog, ident, _, changes) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala index 61c16baedb7c..1a826c00c81f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala @@ -337,5 +337,48 @@ class SparkSqlParserSuite extends AnalysisTest { |FROM v """.stripMargin, "LINES TERMINATED BY only supports newline '\\n' right now") - } + } + + test("CACHE TABLE") { + assertEqual( + "CACHE TABLE a.b.c", + CacheTableCommand(Seq("a", "b", "c"), None, false, Map.empty)) + + assertEqual( + "CACHE TABLE t AS SELECT * FROM testData", + CacheTableCommand( + Seq("t"), + Some(Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("testData")))), + false, + Map.empty)) + + assertEqual( + "CACHE LAZY TABLE a.b.c", + CacheTableCommand(Seq("a", "b", "c"), None, true, Map.empty)) + + assertEqual( + "CACHE LAZY TABLE a.b.c OPTIONS('storageLevel' 'DISK_ONLY')", + CacheTableCommand( + Seq("a", "b", "c"), + None, + true, + Map("storageLevel" -> "DISK_ONLY"))) + + intercept("CACHE TABLE a.b.c AS SELECT * FROM testData", + "It is not allowed to add catalog/namespace prefix a.b") + } + + test("UNCACHE TABLE") { + assertEqual( + "UNCACHE TABLE a.b.c", + UncacheTableCommand(Seq("a", "b", "c"), ifExists = false)) + + assertEqual( + "UNCACHE TABLE IF EXISTS a.b.c", + UncacheTableCommand(Seq("a", "b", "c"), ifExists = true)) + } + + test("CLEAR CACHE") { + assertEqual("CLEAR CACHE", ClearCacheCommand) + } } From 4c2d5e23868eaa58e39e2910fb438d30b49f2098 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Wed, 25 Nov 2020 10:06:39 -0800 Subject: [PATCH 14/20] revert --- .../org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 6dbedfefceb9..ebf41f6a6e30 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -688,3 +688,4 @@ case class TruncateTable( partitionSpec: Option[TablePartitionSpec]) extends Command { override def children: Seq[LogicalPlan] = child :: Nil } + From 3c4a0cf4394823800e50d5dbeb0ebef2a1c09e49 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Wed, 25 Nov 2020 10:14:19 -0800 Subject: [PATCH 15/20] Fix compilation --- .../test/scala/org/apache/spark/sql/hive/test/TestHive.scala | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala index f7c13ea047da..a25c61c96f3d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -596,7 +596,7 @@ private[hive] class TestHiveQueryExecution( override lazy val analyzed: LogicalPlan = sparkSession.withActive { val describedTables = logical match { - case CacheTableCommand(tbl, _, _, _) => tbl :: Nil + case CacheTableCommand(tbl, _, _, _) => tbl.asTableIdentifier :: Nil case _ => Nil } From 7f5a0b25cc5edf9a93e127060f0b6d64d5a105d4 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Wed, 25 Nov 2020 10:27:34 -0800 Subject: [PATCH 16/20] fix tests --- .../org/apache/spark/sql/hive/CachedTableSuite.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 8e6ebfcf1096..81c3f271b18d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -364,14 +364,14 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto // Cache the table 'cachedTable' in temp db with qualified table name, // and then check whether the table is cached with expected name sql(s"CACHE TABLE $db.cachedTable OPTIONS('storageLevel' 'MEMORY_ONLY')") - assertCached(sql(s"SELECT * FROM $db.cachedTable"), s"`$db`.`cachedTable`", MEMORY_ONLY) + assertCached(sql(s"SELECT * FROM $db.cachedTable"), s"$db.cachedTable", MEMORY_ONLY) assert(spark.catalog.isCached(s"$db.cachedTable"), s"Table '$db.cachedTable' should be cached.") // Refresh the table 'cachedTable' in temp db with qualified table name, and then check // whether the table is still cached with the same name and storage level. sql(s"REFRESH TABLE $db.cachedTable") - assertCached(sql(s"select * from $db.cachedTable"), s"`$db`.`cachedTable`", MEMORY_ONLY) + assertCached(sql(s"select * from $db.cachedTable"), s"$db.cachedTable", MEMORY_ONLY) assert(spark.catalog.isCached(s"$db.cachedTable"), s"Table '$db.cachedTable' should be cached after refreshing with its qualified name.") @@ -382,7 +382,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto // 'cachedTable', instead of '$db.cachedTable' activateDatabase(db) { sql("REFRESH TABLE cachedTable") - assertCached(sql("SELECT * FROM cachedTable"), s"`$db`.`cachedTable`", MEMORY_ONLY) + assertCached(sql("SELECT * FROM cachedTable"), s"$db.cachedTable", MEMORY_ONLY) assert(spark.catalog.isCached("cachedTable"), s"Table '$db.cachedTable' should be cached after refreshing with its " + "unqualified name.") @@ -403,13 +403,13 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto // Cache the table 'cachedTable' in default db without qualified table name , and then // check whether the table is cached with expected name. 
sql("CACHE TABLE cachedTable OPTIONS('storageLevel' 'DISK_ONLY')") - assertCached(sql("SELECT * FROM cachedTable"), "`default`.`cachedTable`", DISK_ONLY) + assertCached(sql("SELECT * FROM cachedTable"), "cachedTable", DISK_ONLY) assert(spark.catalog.isCached("cachedTable"), "Table 'cachedTable' should be cached.") // Refresh the table 'cachedTable' in default db with unqualified table name, and then // check whether the table is still cached with the same name. sql("REFRESH TABLE cachedTable") - assertCached(sql("SELECT * FROM cachedTable"), "`default`.`cachedTable`", DISK_ONLY) + assertCached(sql("SELECT * FROM cachedTable"), "cachedTable", DISK_ONLY) assert(spark.catalog.isCached("cachedTable"), "Table 'cachedTable' should be cached after refreshing with its unqualified name.") @@ -421,7 +421,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto activateDatabase(db) { sql("REFRESH TABLE default.cachedTable") assertCached( - sql("SELECT * FROM default.cachedTable"), "`default`.`cachedTable`", DISK_ONLY) + sql("SELECT * FROM default.cachedTable"), "cachedTable", DISK_ONLY) assert(spark.catalog.isCached("default.cachedTable"), "Table 'cachedTable' should be cached after refreshing with its qualified name.") } From d0f49eff7db0e5775e6ed769fb23e6a1f7cf203a Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Wed, 25 Nov 2020 15:40:57 -0800 Subject: [PATCH 17/20] Fix tests --- .../spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index 7cc60bb50508..5bf789247808 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -305,7 +305,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { val plan = statement.executeQuery("explain select * from test_table") plan.next() plan.next() - assert(plan.getString(1).contains("Scan In-memory table `test_table`")) + assert(plan.getString(1).contains("Scan In-memory table test_table")) val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC") val buf1 = new collection.mutable.ArrayBuffer[Int]() @@ -391,7 +391,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { val plan = statement.executeQuery("explain select key from test_map ORDER BY key DESC") plan.next() plan.next() - assert(plan.getString(1).contains("Scan In-memory table `test_table`")) + assert(plan.getString(1).contains("Scan In-memory table test_table")) val rs = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC") val buf = new collection.mutable.ArrayBuffer[Int]() From ed1a6dba3fa40f32e59c6b1922a46057dd274a00 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Wed, 25 Nov 2020 18:35:54 -0800 Subject: [PATCH 18/20] fix test --- .../org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 4872906dbfec..b4f921efcac8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -705,7 +705,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils sql("CREATE TEMPORARY VIEW inMemoryTable AS SELECT 1 AS c1") sql("CACHE TABLE inMemoryTable") testSparkPlanMetrics(spark.table("inMemoryTable"), 1, - Map(1L -> (("Scan In-memory table `inMemoryTable`", Map.empty))) + Map(1L -> (("Scan In-memory table inMemoryTable", Map.empty))) ) sql("CREATE TEMPORARY VIEW ```a``b``` AS SELECT 2 AS c1") From 4e0e82fba445eb24f712382841a9271e9a5ee9e6 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Wed, 25 Nov 2020 20:16:07 -0800 Subject: [PATCH 19/20] Address comments --- .../execution/datasources/v2/DataSourceV2Strategy.scala | 2 +- .../spark/sql/execution/datasources/v2/DropTableExec.scala | 3 ++- .../scala/org/apache/spark/sql/internal/CatalogImpl.scala | 7 ++----- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 3dfc221a02fd..eb0d7010041b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -232,7 +232,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat throw new AnalysisException("Describing columns is not supported for v2 tables.") case DropTable(r: ResolvedTable, ifExists, purge) => - DropTableExec(r.catalog, r.table, r.identifier, ifExists, purge) :: Nil + DropTableExec(session, r.catalog, r.table, r.identifier, ifExists, purge) :: Nil case _: NoopDropTable => LocalTableScanExec(Nil, Nil) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala index 9f6a2189354c..068475fc56f4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources.v2 +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.expressions.Attribute @@ -26,6 +27,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} * Physical plan node for dropping a table. 
*/ case class DropTableExec( + session: SparkSession, catalog: TableCatalog, table: Table, ident: Identifier, @@ -34,7 +36,6 @@ case class DropTableExec( override def run(): Seq[InternalRow] = { if (catalog.tableExists(ident)) { - val session = sqlContext.sparkSession val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident)) session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true) catalog.dropTable(ident, purge) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index c5f7aa6f592c..fe77e7f3b0c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -471,11 +471,8 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { * @since 2.0.0 */ override def uncacheTable(tableName: String): Unit = { - import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper - val multipartIdentifier = - sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) - val cascade = (multipartIdentifier.length <= 2) && - !sessionCatalog.isTemporaryTable(multipartIdentifier.asTableIdentifier) + val multipartIdent = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) + val cascade = !sessionCatalog.isTempView(multipartIdent) sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName), cascade) } From 7e788cea5c2dd03f71ee30b0106e04d7f036f30f Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Fri, 27 Nov 2020 13:30:04 -0800 Subject: [PATCH 20/20] Address PR comments --- .../spark/sql/execution/command/cache.scala | 29 ++++++++++++------- .../spark/sql/internal/CatalogImpl.scala | 4 +-- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala index 04bfda5f0d02..3f0945d1e817 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala @@ -19,9 +19,7 @@ package org.apache.spark.sql.execution.command import java.util.Locale -import scala.util.Try - -import org.apache.spark.sql.{Dataset, Row, SparkSession} +import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap @@ -52,16 +50,19 @@ case class CacheTableCommand( logWarning(s"Invalid options: ${withoutStorageLevel.mkString(", ")}") } + val table = sparkSession.table(tableName) if (storageLevelValue.nonEmpty) { - sparkSession.catalog.cacheTable( - tableName, StorageLevel.fromString(storageLevelValue.get)) + sparkSession.sharedState.cacheManager.cacheQuery( + table, + Some(tableName), + StorageLevel.fromString(storageLevelValue.get)) } else { - sparkSession.catalog.cacheTable(tableName) + sparkSession.sharedState.cacheManager.cacheQuery(table, Some(tableName)) } if (!isLazy) { // Performs eager caching - sparkSession.table(tableName).count() + table.count() } Seq.empty[Row] @@ -74,14 +75,20 @@ case class UncacheTableCommand( override def run(sparkSession: SparkSession): Seq[Row] = { val tableName = multipartIdentifier.quoted - if (!ifExists || 
tableExists(sparkSession, tableName)) { - sparkSession.catalog.uncacheTable(tableName) + table(sparkSession, tableName).foreach { table => + val cascade = !sparkSession.sessionState.catalog.isTempView(multipartIdentifier) + sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade) } Seq.empty[Row] } - private def tableExists(sparkSession: SparkSession, name: String): Boolean = { - Try(sparkSession.table(name)).isSuccess + private def table(sparkSession: SparkSession, name: String): Option[DataFrame] = { + try { + Some(sparkSession.table(name)) + } catch { + case ex: AnalysisException if ifExists && ex.getMessage.contains("Table or view not found") => + None + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index fe77e7f3b0c0..3e216415c281 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -471,8 +471,8 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { * @since 2.0.0 */ override def uncacheTable(tableName: String): Unit = { - val multipartIdent = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) - val cascade = !sessionCatalog.isTempView(multipartIdent) + val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) + val cascade = !sessionCatalog.isTemporaryTable(tableIdent) sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName), cascade) }
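
Note: the storage-level option handling that PATCH 20/20 inlines into CacheTableCommand.run
is worth reading in isolation. The following is a minimal, dependency-free sketch of that
logic, not Spark code as such; the object name StorageLevelOption is invented for the
example, and println stands in for Spark's logWarning:

    import java.util.Locale

    object StorageLevelOption {
      // Mirrors the inlined logic: the "storageLevel" key is matched
      // case-insensitively, its value is upper-cased so that it matches the
      // names StorageLevel.fromString expects (e.g. "DISK_ONLY"), and any
      // leftover option keys only produce a warning, never an error.
      def getStorageLevel(options: Map[String, String]): Option[String] = {
        val key = "storagelevel"
        val level = options.collectFirst {
          case (k, v) if k.toLowerCase(Locale.ROOT) == key => v.toUpperCase(Locale.ROOT)
        }
        val invalid = options.filter { case (k, _) => k.toLowerCase(Locale.ROOT) != key }
        if (invalid.nonEmpty) {
          println(s"Invalid options: ${invalid.mkString(", ")}") // logWarning in Spark
        }
        level
      }
    }

For example, StorageLevelOption.getStorageLevel(Map("STORAGELEVEL" -> "disk_only")) returns
Some("DISK_ONLY"), which CacheTableCommand then passes to StorageLevel.fromString.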
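
A second note on the semantics the series settles on: UNCACHE TABLE IF EXISTS no longer
resolves to a no-op plan; it swallows only the "Table or view not found" case at execution
time, and uncaching cascades unless the target is a temporary view. A hypothetical
spark-shell session against a build with these patches applied would behave roughly as
follows (the view and table names are illustrative):

    spark.range(10).createOrReplaceTempView("v")
    spark.sql("CACHE TABLE v OPTIONS ('storageLevel' 'DISK_ONLY')")
    assert(spark.catalog.isCached("v"))

    spark.sql("UNCACHE TABLE v")                 // temp view: uncached without cascading
    spark.sql("UNCACHE TABLE IF EXISTS no_such") // succeeds as a no-op
    // spark.sql("UNCACHE TABLE no_such")        // would fail: Table or view not found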