Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,12 @@ import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.hudi.{AvroConversionUtils, SparkAdapterSupport}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HoodieCatalogTable}
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types.{DataType, NullType, StringType, StructField, StructType}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SparkSession}

import java.net.URI
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package org.apache.spark.sql.hudi.command

import org.apache.hudi.HoodieSparkSqlWriter
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
Expand All @@ -29,11 +27,11 @@ import org.apache.spark.sql.hudi.ProvidesHoodieConfig
import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}

case class AlterHoodieTableDropPartitionCommand(
tableIdentifier: TableIdentifier,
specs: Seq[TablePartitionSpec],
ifExists : Boolean,
purge : Boolean,
retainData : Boolean)
tableIdentifier: TableIdentifier,
partitionSpecs: Seq[TablePartitionSpec],
ifExists : Boolean,
purge : Boolean,
retainData : Boolean)
extends HoodieLeafRunnableCommand with ProvidesHoodieConfig {

override def run(sparkSession: SparkSession): Seq[Row] = {
Expand All @@ -49,14 +47,16 @@ case class AlterHoodieTableDropPartitionCommand(
DDLUtils.verifyAlterTableType(
sparkSession.sessionState.catalog, hoodieCatalogTable.table, isView = false)

val normalizedSpecs: Seq[Map[String, String]] = specs.map { spec =>
val normalizedSpecs: Seq[Map[String, String]] = partitionSpecs.map { spec =>
normalizePartitionSpec(
spec,
hoodieCatalogTable.partitionFields,
hoodieCatalogTable.tableName,
sparkSession.sessionState.conf.resolver)
}

// Drop the partitions with lazy cleaning (https://github.com/apache/hudi/pull/4489):
// the partition files are deleted by enabling the cleaner and setting retention policies.
val partitionsToDrop = getPartitionPathToDrop(hoodieCatalogTable, normalizedSpecs)
val parameters = buildHoodieDropPartitionsConfig(sparkSession, hoodieCatalogTable, partitionsToDrop)
HoodieSparkSqlWriter.write(
Expand All @@ -65,17 +65,6 @@ case class AlterHoodieTableDropPartitionCommand(
parameters,
sparkSession.emptyDataFrame)


// Recursively delete partition directories
if (purge) {
val engineContext = new HoodieSparkEngineContext(sparkSession.sparkContext)
val basePath = hoodieCatalogTable.tableLocation
val fullPartitionPath = FSUtils.getPartitionPath(basePath, partitionsToDrop)
logInfo("Clean partition up " + fullPartitionPath)
val fs = FSUtils.getFs(basePath, sparkSession.sparkContext.hadoopConfiguration)
FSUtils.deleteDir(engineContext, fs, fullPartitionPath, sparkSession.sparkContext.defaultParallelism)
}

sparkSession.catalog.refreshTable(tableIdentifier.unquotedString)
logInfo(s"Finish execute alter table drop partition command for $fullTableName")
Seq.empty[Row]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,115 +18,84 @@
package org.apache.spark.sql.hudi.command

import org.apache.hadoop.fs.Path
import org.apache.hudi.HoodieSparkSqlWriter
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType, HoodieCatalogTable}
import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, HoodieCatalogTable}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{getPartitionPathToDrop, normalizePartitionSpec}
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}

import scala.util.control.NonFatal
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}

/**
* Command for truncate hudi table.
*/
case class TruncateHoodieTableCommand(
tableIdentifier: TableIdentifier,
partitionSpec: Option[TablePartitionSpec])
extends HoodieLeafRunnableCommand {
extends HoodieLeafRunnableCommand with ProvidesHoodieConfig {

override def run(spark: SparkSession): Seq[Row] = {
override def run(sparkSession: SparkSession): Seq[Row] = {
val fullTableName = s"${tableIdentifier.database}.${tableIdentifier.table}"
logInfo(s"start execute truncate table command for $fullTableName")

val hoodieCatalogTable = HoodieCatalogTable(spark, tableIdentifier)
val properties = hoodieCatalogTable.tableConfig.getProps

try {
// Delete all data in the table directory
val catalog = spark.sessionState.catalog
val table = catalog.getTableMetadata(tableIdentifier)
val tableIdentWithDB = table.identifier.quotedString

if (table.tableType == CatalogTableType.VIEW) {
throw new AnalysisException(
s"Operation not allowed: TRUNCATE TABLE on views: $tableIdentWithDB")
}

if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
throw new AnalysisException(
s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
s"for tables that are not partitioned: $tableIdentWithDB")
}

val basePath = hoodieCatalogTable.tableLocation
val partCols = table.partitionColumnNames
val locations = if (partitionSpec.isEmpty || partCols.isEmpty) {
Seq(basePath)
} else {
val normalizedSpec: Seq[Map[String, String]] = Seq(partitionSpec.map { spec =>
normalizePartitionSpec(
spec,
partCols,
table.identifier.quotedString,
spark.sessionState.conf.resolver)
}.get)
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)

val fullPartitionPath = FSUtils.getPartitionPath(basePath, getPartitionPathToDrop(hoodieCatalogTable, normalizedSpec))
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableIdentifier)
val tableId = table.identifier.quotedString

Seq(fullPartitionPath)
}

val hadoopConf = spark.sessionState.newHadoopConf()
locations.foreach { location =>
val path = new Path(location.toString)
try {
val fs = path.getFileSystem(hadoopConf)
fs.delete(path, true)
fs.mkdirs(path)
} catch {
case NonFatal(e) =>
throw new AnalysisException(
s"Failed to truncate table $tableIdentWithDB when removing data of the path: $path " +
s"because of ${e.toString}")
}
}

// Also try to drop the contents of the table from the columnar cache
try {
spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier), cascade = true)
} catch {
case NonFatal(_) =>
}
if (table.tableType == CatalogTableType.VIEW) {
throw new AnalysisException(
s"Operation not allowed: TRUNCATE TABLE on views: $tableId")
}

if (table.stats.nonEmpty) {
// empty table after truncation
val newStats = CatalogStatistics(sizeInBytes = 0, rowCount = Some(0))
catalog.alterTableStats(tableIdentifier, Some(newStats))
}
Seq.empty[Row]
} catch {
// TruncateTableCommand deletes the related directories first and then refreshes the table.
// The refresh fails because the hudi meta directory (.hoodie) was already deleted in the first step.
// So ignore this failure here, and refresh the table later.
case NonFatal(e) =>
throw new AnalysisException(s"Exception when attempting to truncate table ${tableIdentifier.quotedString}: " + e)
if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
throw new AnalysisException(
s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
s"for tables that are not partitioned: $tableId")
}

val basePath = hoodieCatalogTable.tableLocation
val properties = hoodieCatalogTable.tableConfig.getProps
val hadoopConf = sparkSession.sessionState.newHadoopConf()

// If no partition is specified, truncate deletes all the data under the table path,
// including hoodie.properties. In this case we must re-initialize the table.
if (partitionSpec.isEmpty) {
val hadoopConf = spark.sessionState.newHadoopConf()
val targetPath = new Path(basePath)
val engineContext = new HoodieSparkEngineContext(sparkSession.sparkContext)
val fs = FSUtils.getFs(basePath, sparkSession.sparkContext.hadoopConfiguration)
FSUtils.deleteDir(engineContext, fs, targetPath, sparkSession.sparkContext.defaultParallelism)

// ReInit hoodie.properties
HoodieTableMetaClient.withPropertyBuilder()
.fromProperties(properties)
.initTable(hadoopConf, hoodieCatalogTable.tableLocation)
} else {
val normalizedSpecs: Seq[Map[String, String]] = Seq(partitionSpec.map { spec =>
normalizePartitionSpec(
spec,
hoodieCatalogTable.partitionFields,
hoodieCatalogTable.tableName,
sparkSession.sessionState.conf.resolver)
}.get)

// Drop the partitions with lazy cleaning.
val partitionsToDrop = getPartitionPathToDrop(hoodieCatalogTable, normalizedSpecs)
val parameters = buildHoodieDropPartitionsConfig(sparkSession, hoodieCatalogTable, partitionsToDrop)
HoodieSparkSqlWriter.write(
sparkSession.sqlContext,
SaveMode.Append,
parameters,
sparkSession.emptyDataFrame)
}

// After deleting the data, refresh the table to make sure we don't keep around a stale
// file relation in the metastore cache and cached table data in the cache manager.
spark.catalog.refreshTable(hoodieCatalogTable.table.identifier.quotedString)
sparkSession.catalog.refreshTable(table.identifier.quotedString)
logInfo(s"Finish execute truncate table command for $fullTableName")
Seq.empty[Row]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic
// Rewrite TruncateTableCommand to TruncateHoodieTableCommand
case TruncateTableCommand(tableName, partitionSpec)
if sparkAdapter.isHoodieTable(tableName, sparkSession) =>
new TruncateHoodieTableCommand(tableName, partitionSpec)
TruncateHoodieTableCommand(tableName, partitionSpec)
case _ => plan
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
checkAnswer(s"show partitions $tableName")(Seq.empty: _*)
}

test("Purge drop non-partitioned table") {
test("Lazy Clean drop non-partitioned table") {
val tableName = generateTableName
// create table
spark.sql(
Expand All @@ -66,13 +66,14 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
| using hudi
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
| preCombineField = 'ts',
| hoodie.cleaner.commits.retained= '1'
| )
|""".stripMargin)
// insert data
spark.sql(s"""insert into $tableName values (1, "z3", "v1", "2021-10-01"), (2, "l4", "v1", "2021-10-02")""")

checkExceptionContain(s"alter table $tableName drop partition (dt='2021-10-01') purge")(
checkExceptionContain(s"alter table $tableName drop partition (dt='2021-10-01')")(
s"$tableName is a non-partitioned table that is not allowed to drop partition")

// show partitions
Expand Down Expand Up @@ -131,14 +132,13 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
}

Seq(false, true).foreach { urlencode =>
test(s"Purge drop single-partition table' partitions, urlencode: $urlencode") {
test(s"Lazy Clean drop single-partition table' partitions, urlencode: $urlencode") {
withTempDir { tmp =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"

import spark.implicits._
val df = Seq((1, "z3", "v1", "2021/10/01"), (2, "l4", "v1", "2021/10/02"))
.toDF("id", "name", "ts", "dt")
val df = Seq((1, "z3", "v1", "2021/10/01")).toDF("id", "name", "ts", "dt")

df.write.format("hudi")
.option(HoodieWriteConfig.TBL_NAME.key, tableName)
Expand All @@ -158,17 +158,24 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
s"""
|create table $tableName using hudi
|location '$tablePath'
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts',
| hoodie.cleaner.commits.retained= '1'
| )
|""".stripMargin)

// drop 2021-10-01 partition
spark.sql(s"alter table $tableName drop partition (dt='2021/10/01') purge")
spark.sql(s"alter table $tableName drop partition (dt='2021/10/01')")

spark.sql(s"""insert into $tableName values (2, "l4", "v1", "2021/10/02")""")

val partitionPath = if (urlencode) {
PartitionPathEncodeUtils.escapePathName("2021/10/01")
} else {
"2021/10/01"
}
checkAnswer(s"select dt from $tableName")(Seq(s"2021/10/02"))
checkAnswer(s"select dt from $tableName")(Seq("2021/10/02"))
assertResult(false)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath"))

// show partitions
Expand Down Expand Up @@ -267,14 +274,13 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
}

Seq(false, true).foreach { hiveStyle =>
test(s"Purge drop multi-level partitioned table's partitions, isHiveStylePartitioning: $hiveStyle") {
test(s"Lazy Clean drop multi-level partitioned table's partitions, isHiveStylePartitioning: $hiveStyle") {
withTempDir { tmp =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"

import spark.implicits._
val df = Seq((1, "z3", "v1", "2021", "10", "01"), (2, "l4", "v1", "2021", "10", "02"))
.toDF("id", "name", "ts", "year", "month", "day")
val df = Seq((1, "z3", "v1", "2021", "10", "01")).toDF("id", "name", "ts", "year", "month", "day")

df.write.format("hudi")
.option(HoodieWriteConfig.TBL_NAME.key, tableName)
Expand All @@ -294,14 +300,23 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
s"""
|create table $tableName using hudi
|location '$tablePath'
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts',
| hoodie.cleaner.commits.retained= '1'
| )
|""".stripMargin)

// drop 2021-10-01 partition
spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01') purge")
spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01')")

// insert data
spark.sql(s"""insert into $tableName values (2, "l4", "v1", "2021", "10", "02")""")

checkAnswer(s"select id, name, ts, year, month, day from $tableName")(
Seq(2, "l4", "v1", "2021", "10", "02")
)

assertResult(false)(existsPath(
s"${tmp.getCanonicalPath}/$tableName/year=2021/month=10/day=01"))

Expand Down