-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables #30403
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
3e532c8
f4ee301
a0687b3
f36bc59
b3fe647
4b2fba0
f232eba
8c0140c
9085189
0bdfcee
fc8a913
7ee6eb0
a5923ab
f22159c
c0e4f3e
47dc974
5e7227b
20b2474
b33d807
4c2d5e2
3c4a0cf
7f5a0b2
d0f49ef
ed1a6db
4e0e82f
911927d
7e788ce
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,26 +19,27 @@ package org.apache.spark.sql.execution.command | |
|
|
||
| import java.util.Locale | ||
|
|
||
| import org.apache.spark.sql.{Dataset, Row, SparkSession} | ||
| import org.apache.spark.sql.catalyst.TableIdentifier | ||
| import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, Row, SparkSession} | ||
| import org.apache.spark.sql.catalyst.plans.QueryPlan | ||
| import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan} | ||
| import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap | ||
| import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper | ||
| import org.apache.spark.storage.StorageLevel | ||
|
|
||
| case class CacheTableCommand( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The next thing we can do is to refactor it using the v2 framework (not adding a v2 version). The benefits are: 1. moving the logical plan to catalyst. 2. resolve the table in the analyzer. e.g.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. OK, will do.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. One issue I am encountering by moving to the v2 framework (for v2 tables) is the following. When
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ah, one solution is to follow |
||
| tableIdent: TableIdentifier, | ||
| multipartIdentifier: Seq[String], | ||
| plan: Option[LogicalPlan], | ||
| isLazy: Boolean, | ||
| options: Map[String, String]) extends RunnableCommand { | ||
| require(plan.isEmpty || tableIdent.database.isEmpty, | ||
| "Database name is not allowed in CACHE TABLE AS SELECT") | ||
| require(plan.isEmpty || multipartIdentifier.length == 1, | ||
| "Namespace name is not allowed in CACHE TABLE AS SELECT") | ||
|
|
||
| override def innerChildren: Seq[QueryPlan[_]] = plan.toSeq | ||
|
|
||
| override def run(sparkSession: SparkSession): Seq[Row] = { | ||
| val tableName = multipartIdentifier.quoted | ||
| plan.foreach { logicalPlan => | ||
| Dataset.ofRows(sparkSession, logicalPlan).createTempView(tableIdent.quotedString) | ||
| Dataset.ofRows(sparkSession, logicalPlan).createTempView(tableName) | ||
| } | ||
|
|
||
| val storageLevelKey = "storagelevel" | ||
|
|
@@ -49,34 +50,46 @@ case class CacheTableCommand( | |
| logWarning(s"Invalid options: ${withoutStorageLevel.mkString(", ")}") | ||
| } | ||
|
|
||
| val table = sparkSession.table(tableName) | ||
| if (storageLevelValue.nonEmpty) { | ||
| sparkSession.catalog.cacheTable( | ||
| tableIdent.quotedString, StorageLevel.fromString(storageLevelValue.get)) | ||
| sparkSession.sharedState.cacheManager.cacheQuery( | ||
| table, | ||
| Some(tableName), | ||
| StorageLevel.fromString(storageLevelValue.get)) | ||
| } else { | ||
| sparkSession.catalog.cacheTable(tableIdent.quotedString) | ||
| sparkSession.sharedState.cacheManager.cacheQuery(table, Some(tableName)) | ||
| } | ||
|
|
||
| if (!isLazy) { | ||
| // Performs eager caching | ||
| sparkSession.table(tableIdent).count() | ||
| table.count() | ||
| } | ||
|
|
||
| Seq.empty[Row] | ||
| } | ||
| } | ||
|
|
||
|
|
||
| case class UncacheTableCommand( | ||
| tableIdent: TableIdentifier, | ||
| multipartIdentifier: Seq[String], | ||
| ifExists: Boolean) extends RunnableCommand { | ||
|
|
||
| override def run(sparkSession: SparkSession): Seq[Row] = { | ||
| val tableId = tableIdent.quotedString | ||
| if (!ifExists || sparkSession.catalog.tableExists(tableId)) { | ||
cloud-fan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| sparkSession.catalog.uncacheTable(tableId) | ||
| val tableName = multipartIdentifier.quoted | ||
| table(sparkSession, tableName).foreach { table => | ||
| val cascade = !sparkSession.sessionState.catalog.isTempView(multipartIdentifier) | ||
| sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade) | ||
| } | ||
| Seq.empty[Row] | ||
| } | ||
|
|
||
| private def table(sparkSession: SparkSession, name: String): Option[DataFrame] = { | ||
| try { | ||
| Some(sparkSession.table(name)) | ||
| } catch { | ||
| case ex: AnalysisException if ifExists && ex.getMessage.contains("Table or view not found") => | ||
| None | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException | |
| import org.apache.spark.sql.connector.catalog._ | ||
| import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME | ||
| import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership | ||
| import org.apache.spark.sql.execution.columnar.InMemoryRelation | ||
| import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} | ||
| import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode, V2_SESSION_CATALOG_IMPLEMENTATION} | ||
| import org.apache.spark.sql.internal.connector.SimpleTableProvider | ||
|
|
@@ -2018,28 +2019,29 @@ class DataSourceV2SQLSuite | |
| } | ||
| } | ||
|
|
||
| test("CACHE TABLE") { | ||
| test("CACHE/UNCACHE TABLE") { | ||
imback82 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| val t = "testcat.ns1.ns2.tbl" | ||
| withTable(t) { | ||
| spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo") | ||
| def isCached(table: String): Boolean = { | ||
| spark.table(table).queryExecution.withCachedData.isInstanceOf[InMemoryRelation] | ||
| } | ||
|
|
||
| testV1CommandSupportingTempView("CACHE TABLE", t) | ||
| spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo") | ||
| sql(s"CACHE TABLE $t") | ||
| assert(isCached(t)) | ||
|
|
||
| val e = intercept[AnalysisException] { | ||
| sql(s"CACHE LAZY TABLE $t") | ||
| } | ||
| assert(e.message.contains("CACHE TABLE is only supported with temp views or v1 tables")) | ||
| sql(s"UNCACHE TABLE $t") | ||
| assert(!isCached(t)) | ||
| } | ||
| } | ||
|
|
||
| test("UNCACHE TABLE") { | ||
| val t = "testcat.ns1.ns2.tbl" | ||
| withTable(t) { | ||
| sql(s"CREATE TABLE $t (id bigint, data string) USING foo") | ||
|
|
||
| testV1CommandSupportingTempView("UNCACHE TABLE", t) | ||
| testV1CommandSupportingTempView("UNCACHE TABLE", s"IF EXISTS $t") | ||
| // Test a scenario where a table does not exist. | ||
| val e = intercept[AnalysisException] { | ||
| sql(s"UNCACHE TABLE $t") | ||
| } | ||
| assert(e.message.contains("Table or view not found: testcat.ns1.ns2.tbl")) | ||
|
|
||
| // If "IF EXISTS" is set, UNCACHE TABLE will not throw an exception. | ||
| sql(s"UNCACHE TABLE IF EXISTS $t") | ||
| } | ||
|
|
||
| test("SHOW COLUMNS") { | ||
|
|
@@ -2555,11 +2557,15 @@ class DataSourceV2SQLSuite | |
| } | ||
| } | ||
|
|
||
| private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = { | ||
| private def testNotSupportedV2Command( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Unnecessary change. This is minor and let's fix it in your next PR.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. OK, will fix. |
||
| sqlCommand: String, | ||
| sqlParams: String, | ||
| sqlCommandInMessage: Option[String] = None): Unit = { | ||
| val e = intercept[AnalysisException] { | ||
| sql(s"$sqlCommand $sqlParams") | ||
| } | ||
| assert(e.message.contains(s"$sqlCommand is not supported for v2 tables")) | ||
| val cmdStr = sqlCommandInMessage.getOrElse(sqlCommand) | ||
| assert(e.message.contains(s"$cmdStr is not supported for v2 tables")) | ||
| } | ||
|
|
||
| private def testV1Command(sqlCommand: String, sqlParams: String): Unit = { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.