Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ statement
| CLEAR CACHE #clearCache
| LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
tableIdentifier partitionSpec? #loadData
| TRUNCATE TABLE tableIdentifier partitionSpec? #truncateTable
| TRUNCATE TABLE multipartIdentifier partitionSpec? #truncateTable
| MSCK REPAIR TABLE multipartIdentifier #repairTable
| op=(ADD | LIST) identifier .*? #manageResource
| SET ROLE .*? #failNativeCommand
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2728,4 +2728,18 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
RepairTableStatement(visitMultipartIdentifier(ctx.multipartIdentifier()))
}

/**
* Create a [[TruncateTableStatement]] command.
*
* For example:
* {{{
* TRUNCATE TABLE multi_part_name [PARTITION (partcol1=val1, partcol2=val2 ...)]
* }}}
*/
override def visitTruncateTable(ctx: TruncateTableContext): LogicalPlan = withOrigin(ctx) {
TruncateTableStatement(
visitMultipartIdentifier(ctx.multipartIdentifier),
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -316,3 +316,10 @@ case class AnalyzeColumnStatement(
* A REPAIR TABLE statement, as parsed from SQL
*/
case class RepairTableStatement(tableName: Seq[String]) extends ParsedStatement

/**
* A TRUNCATE TABLE statement, as parsed from SQL
*/
case class TruncateTableStatement(
tableName: Seq[String],
partitionSpec: Option[TablePartitionSpec]) extends ParsedStatement
Original file line number Diff line number Diff line change
Expand Up @@ -961,6 +961,16 @@ class DDLParserSuite extends AnalysisTest {
RepairTableStatement(Seq("a", "b", "c")))
}

test("TRUNCATE table") {
comparePlans(
parsePlan("TRUNCATE TABLE a.b.c"),
TruncateTableStatement(Seq("a", "b", "c"), None))

comparePlans(
parsePlan("TRUNCATE TABLE a.b.c PARTITION(ds='2017-06-10')"),
TruncateTableStatement(Seq("a", "b", "c"), Some(Map("ds" -> "2017-06-10"))))
}

private case class TableSpec(
name: Seq[String],
schema: Option[StructType],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, TableChange, V1Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableRecoverPartitionsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand}
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableRecoverPartitionsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand, TruncateTableCommand}
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -282,6 +282,12 @@ class ResolveSessionCatalog(
AlterTableRecoverPartitionsCommand(
v1TableName.asTableIdentifier,
"MSCK REPAIR TABLE")

case TruncateTableStatement(tableName, partitionSpec) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we add a truncateTable method to TableCatalog? cc @rdblue

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. This is not a catalog operation, it is a table operation. We already have 2 ways to truncate a table: if the table implements SupportsTruncate or SupportsOverwrite. A truncate command should build a write and commit without any commit messages.

We can also add a short-cut trait on Table to avoid the builder.

val v1TableName = parseV1Table(tableName, "TRUNCATE TABLE")
TruncateTableCommand(
v1TableName.asTableIdentifier,
partitionSpec)
}

private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,20 +346,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
)
}

/**
* Create a [[TruncateTableCommand]] command.
*
* For example:
* {{{
* TRUNCATE TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]
* }}}
*/
override def visitTruncateTable(ctx: TruncateTableContext): LogicalPlan = withOrigin(ctx) {
TruncateTableCommand(
visitTableIdentifier(ctx.tableIdentifier),
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
}

/**
* Create a [[CreateDatabaseCommand]] command.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,28 @@ class DataSourceV2SQLSuite
}
}

test("TRUNCATE TABLE") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(
s"""
|CREATE TABLE $t (id bigint, data string)
|USING foo
|PARTITIONED BY (id)
""".stripMargin)

val e1 = intercept[AnalysisException] {
sql(s"TRUNCATE TABLE $t")
}
assert(e1.message.contains("TRUNCATE TABLE is only supported with v1 tables"))

val e2 = intercept[AnalysisException] {
sql(s"TRUNCATE TABLE $t PARTITION(id='1')")
}
assert(e2.message.contains("TRUNCATE TABLE is only supported with v1 tables"))
}
}

private def assertAnalysisError(sqlStatement: String, expectedError: String): Unit = {
val errMsg = intercept[AnalysisException] {
sql(sqlStatement)
Expand Down