Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,9 @@ statement
| (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName #describeFunction
| (DESC | DESCRIBE) database EXTENDED? db=errorCapturingIdentifier #describeDatabase
| (DESC | DESCRIBE) TABLE? option=(EXTENDED | FORMATTED)?
multipartIdentifier partitionSpec? describeColName? #describeTable
multipartIdentifier partitionSpec? describeColName? #describeTable
| (DESC | DESCRIBE) QUERY? query #describeQuery
| REFRESH TABLE tableIdentifier #refreshTable
| REFRESH TABLE multipartIdentifier #refreshTable
| REFRESH (STRING | .*?) #refreshResource
| CACHE LAZY? TABLE tableIdentifier
(OPTIONS options=tablePropertyList)? (AS? query)? #cacheTable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2728,4 +2728,16 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
RepairTableStatement(visitMultipartIdentifier(ctx.multipartIdentifier()))
}

/**
* Create a [[RefreshTableStatement]].
*
* For example:
* {{{
* REFRESH TABLE multi_part_name
* }}}
*/
override def visitRefreshTable(ctx: RefreshTableContext): LogicalPlan = withOrigin(ctx) {
RefreshTableStatement(visitMultipartIdentifier(ctx.multipartIdentifier()))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -316,3 +316,8 @@ case class AnalyzeColumnStatement(
* A REPAIR TABLE statement, as parsed from SQL
*/
case class RepairTableStatement(tableName: Seq[String]) extends ParsedStatement

/**
* A REFRESH TABLE statement, as parsed from SQL
*/
case class RefreshTableStatement(tableName: Seq[String]) extends ParsedStatement
Original file line number Diff line number Diff line change
Expand Up @@ -961,6 +961,12 @@ class DDLParserSuite extends AnalysisTest {
RepairTableStatement(Seq("a", "b", "c")))
}

test("REFRESH TABLE table") {
comparePlans(
parsePlan("REFRESH TABLE a.b.c"),
RefreshTableStatement(Seq("a", "b", "c")))
}

private case class TableSpec(
name: Seq[String],
schema: Option[StructType],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, TableChange, V1Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableRecoverPartitionsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand}
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, RefreshTable}
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBuilder, StructField, StructType}
Expand Down Expand Up @@ -282,6 +282,10 @@ class ResolveSessionCatalog(
AlterTableRecoverPartitionsCommand(
v1TableName.asTableIdentifier,
"MSCK REPAIR TABLE")

case RefreshTableStatement(tableName) =>
val v1TableName = parseV1Table(tableName, "REFRESH TABLE")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have TableCatalog.invalidateTable to implement a v2 REFRESH TABLE command.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Followed by TableCatalog.loadTable? OrTableCatalog.loadTable is called lazily (you don't call it inside RefreshTableExec - in this case, same behavior as UNCACHE TABLE)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not followed by loadTable. It should invalidate and load the table lazily.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I adde v2 command in this PR.

RefreshTable(v1TableName.asTableIdentifier)
}

private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
ShowCreateTableCommand(table)
}

/**
* Create a [[RefreshTable]] logical plan.
*/
override def visitRefreshTable(ctx: RefreshTableContext): LogicalPlan = withOrigin(ctx) {
RefreshTable(visitTableIdentifier(ctx.tableIdentifier))
}

/**
* Create a [[RefreshResource]] logical plan.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1185,29 +1185,32 @@ class DataSourceV2SQLSuite
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")

val e = intercept[AnalysisException] {
sql(s"ANALYZE TABLE $t COMPUTE STATISTICS")
}
assert(e.message.contains("ANALYZE TABLE is only supported with v1 tables"))

val e2 = intercept[AnalysisException] {
sql(s"ANALYZE TABLE $t COMPUTE STATISTICS FOR ALL COLUMNS")
}
assert(e2.message.contains("ANALYZE TABLE is only supported with v1 tables"))
testV1Command("ANALYZE TABLE", s"$t COMPUTE STATISTICS")
testV1Command("ANALYZE TABLE", s"$t COMPUTE STATISTICS FOR ALL COLUMNS")
}
}

test("MSCK REPAIR TABLE") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
testV1Command("MSCK REPAIR TABLE", t)
}
}

val e = intercept[AnalysisException] {
sql(s"MSCK REPAIR TABLE $t")
}
assert(e.message.contains("MSCK REPAIR TABLE is only supported with v1 tables"))
test("REFRESH TABLE") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
testV1Command("REFRESH TABLE", t)
}
}

private def testV1Command(sqlCommand: String, sqlParams: String): Unit = {
val e = intercept[AnalysisException] {
sql(s"$sqlCommand $sqlParams")
}
assert(e.message.contains(s"$sqlCommand is only supported with v1 tables"))
}

private def assertAnalysisError(sqlStatement: String, expectedError: String): Unit = {
Expand Down