File: HoodieAnalysis.scala

@@ -410,9 +410,9 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan]
         if isHoodieTable(tableName, sparkSession) =>
         DropHoodieTableCommand(tableName, ifExists, isView, purge)
       // Rewrite the AlterTableDropPartitionCommand to AlterHoodieTableDropPartitionCommand
-      case AlterTableDropPartitionCommand(tableName, specs, _, _, _)
+      case AlterTableDropPartitionCommand(tableName, specs, ifExists, purge, retainData)
         if isHoodieTable(tableName, sparkSession) =>
-        AlterHoodieTableDropPartitionCommand(tableName, specs)
+        AlterHoodieTableDropPartitionCommand(tableName, specs, ifExists, purge, retainData)
       // Rewrite the AlterTableRenameCommand to AlterHoodieTableRenameCommand
       // Rewrite the AlterTableAddColumnsCommand to AlterHoodieTableAddColumnsCommand
       case AlterTableAddColumnsCommand(tableId, colsToAdd)
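Illustration (not part of this PR's diff): with all five fields of AlterTableDropPartitionCommand now bound instead of wildcarded, the IF EXISTS, PURGE, and retainData arguments of the SQL statement reach the Hudi command. A statement this rule now rewrites, using a placeholder table name:

    // Rewritten by HoodiePostAnalysisRule into
    // AlterHoodieTableDropPartitionCommand(..., purge = true, ...)
    spark.sql("alter table hudi_tbl drop partition (dt='2021-10-01') purge")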
File: AlterHoodieTableDropPartitionCommand.scala

@@ -18,6 +18,8 @@
 package org.apache.spark.sql.hudi.command

 import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.util.PartitionPathEncodeUtils
 import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
 import org.apache.hudi.hive.MultiPartKeysValueExtractor
@@ -33,7 +35,10 @@ import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}

 case class AlterHoodieTableDropPartitionCommand(
     tableIdentifier: TableIdentifier,
-    specs: Seq[TablePartitionSpec])
+    specs: Seq[TablePartitionSpec],
+    ifExists: Boolean,
+    purge: Boolean,
+    retainData: Boolean)
 extends RunnableCommand {

   override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -49,41 +54,34 @@ extends RunnableCommand {
       sparkSession.sessionState.conf.resolver)
     }

-    val parameters = buildHoodieConfig(sparkSession, hoodieCatalogTable, normalizedSpecs)
+    val partitionsToDrop = getPartitionPathToDrop(hoodieCatalogTable, normalizedSpecs)
+    val parameters = buildHoodieConfig(sparkSession, hoodieCatalogTable, partitionsToDrop)
     HoodieSparkSqlWriter.write(
       sparkSession.sqlContext,
       SaveMode.Append,
       parameters,
       sparkSession.emptyDataFrame)

+    // Recursively delete the dropped partitions' directories
+    if (purge) {
+      val engineContext = new HoodieSparkEngineContext(sparkSession.sparkContext)
+      val basePath = hoodieCatalogTable.tableLocation
+      val fullPartitionPath = FSUtils.getPartitionPath(basePath, partitionsToDrop)
+      logInfo("Cleaning up partition " + fullPartitionPath)
+      val fs = FSUtils.getFs(basePath, sparkSession.sparkContext.hadoopConfiguration)
+      FSUtils.deleteDir(engineContext, fs, fullPartitionPath, sparkSession.sparkContext.defaultParallelism)
+    }
+
     sparkSession.catalog.refreshTable(tableIdentifier.unquotedString)
     Seq.empty[Row]
   }

   private def buildHoodieConfig(
       sparkSession: SparkSession,
       hoodieCatalogTable: HoodieCatalogTable,
-      normalizedSpecs: Seq[Map[String, String]]): Map[String, String] = {
-    val table = hoodieCatalogTable.table
-    val allPartitionPaths = hoodieCatalogTable.getAllPartitionPaths
-    val enableHiveStylePartitioning = isHiveStyledPartitioning(allPartitionPaths, table)
-    val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table)
+      partitionsToDrop: String): Map[String, String] = {
     val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
-    val partitionsToDrop = normalizedSpecs.map { spec =>
-      hoodieCatalogTable.partitionFields.map { partitionColumn =>
-        val encodedPartitionValue = if (enableEncodeUrl) {
-          PartitionPathEncodeUtils.escapePathName(spec(partitionColumn))
-        } else {
-          spec(partitionColumn)
-        }
-        if (enableHiveStylePartitioning) {
-          partitionColumn + "=" + encodedPartitionValue
-        } else {
-          encodedPartitionValue
-        }
-      }.mkString("/")
-    }.mkString(",")

     val enableHive = isEnableHive(sparkSession)
     withSparkConf(sparkSession, Map.empty) {
       Map(
@@ -137,4 +135,27 @@ extends RunnableCommand {
     normalizedPartSpec.toMap
   }

+  def getPartitionPathToDrop(
+      hoodieCatalogTable: HoodieCatalogTable,
+      normalizedSpecs: Seq[Map[String, String]]): String = {
+    val table = hoodieCatalogTable.table
+    val allPartitionPaths = hoodieCatalogTable.getAllPartitionPaths
+    val enableHiveStylePartitioning = isHiveStyledPartitioning(allPartitionPaths, table)
+    val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table)
+    val partitionsToDrop = normalizedSpecs.map { spec =>
+      hoodieCatalogTable.partitionFields.map { partitionColumn =>
+        val encodedPartitionValue = if (enableEncodeUrl) {
+          PartitionPathEncodeUtils.escapePathName(spec(partitionColumn))
+        } else {
+          spec(partitionColumn)
+        }
+        if (enableHiveStylePartitioning) {
+          partitionColumn + "=" + encodedPartitionValue
+        } else {
+          encodedPartitionValue
+        }
+      }.mkString("/")
+    }.mkString(",")
+    partitionsToDrop
+  }
 }
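Illustration (not part of this PR's diff): a minimal, self-contained sketch of the path-building rule in getPartitionPathToDrop, with made-up inputs standing in for hoodieCatalogTable.partitionFields and the normalized specs; PartitionPathEncodeUtils is the same hudi-common helper the method uses:

import org.apache.hudi.common.util.PartitionPathEncodeUtils

object PartitionPathToDropDemo extends App {
  // stand-ins for the catalog table's partition columns and one normalized spec
  val partitionFields = Seq("year", "month", "day")
  val normalizedSpecs = Seq(Map("year" -> "2021", "month" -> "10", "day" -> "01"))
  val enableHiveStylePartitioning = true
  val enableEncodeUrl = false

  // same rule as above: optionally URL-encode each value, optionally prefix
  // "column=", join the levels with "/", and join multiple specs with ","
  val partitionsToDrop = normalizedSpecs.map { spec =>
    partitionFields.map { col =>
      val v = if (enableEncodeUrl) PartitionPathEncodeUtils.escapePathName(spec(col)) else spec(col)
      if (enableHiveStylePartitioning) s"$col=$v" else v
    }.mkString("/")
  }.mkString(",")

  println(partitionsToDrop) // year=2021/month=10/day=01
}

Note the trailing mkString(","): dropping several partitions in one statement yields a comma-separated string, which buildHoodieConfig receives as a single value.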
File: DropHoodieTableCommand.scala

@@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi.command

 import org.apache.hadoop.fs.Path
 import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -77,10 +78,9 @@ case class DropHoodieTableCommand(
     if (purge) {
       logInfo("Clean up " + basePath)
       val targetPath = new Path(basePath)
+      val engineContext = new HoodieSparkEngineContext(sparkSession.sparkContext)
       val fs = FSUtils.getFs(basePath, sparkSession.sparkContext.hadoopConfiguration)
-      if (fs.exists(targetPath)) {
-        fs.delete(targetPath, true)
-      }
+      FSUtils.deleteDir(engineContext, fs, targetPath, sparkSession.sparkContext.defaultParallelism)
     }
   }

File: TestAlterTableDropPartition.scala

@@ -18,12 +18,11 @@
 package org.apache.spark.sql.hudi

 import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.util.PartitionPathEncodeUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
 import org.apache.spark.sql.SaveMode

-import scala.util.control.NonFatal
-
 class TestAlterTableDropPartition extends TestHoodieSqlBase {

   test("Drop non-partitioned table") {
@@ -88,7 +87,62 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
       // drop 2021-10-01 partition
       spark.sql(s"alter table $tableName drop partition (dt='2021/10/01')")

-      checkAnswer(s"select dt from $tableName") (Seq(s"2021/10/02"))
+      val partitionPath = if (urlencode) {
+        PartitionPathEncodeUtils.escapePathName("2021/10/01")
+      } else {
+        "2021/10/01"
+      }
+      checkAnswer(s"select dt from $tableName")(Seq(s"2021/10/02"))
+      assertResult(true)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath"))
     }
   }
 }

+  Seq(false, true).foreach { urlencode =>
+    test(s"Purge drop single-partition table's partitions, urlencode: $urlencode") {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+
+        import spark.implicits._
+        val df = Seq((1, "z3", "v1", "2021/10/01"), (2, "l4", "v1", "2021/10/02"))
+          .toDF("id", "name", "ts", "dt")
+
+        df.write.format("hudi")
+          .option(HoodieWriteConfig.TBL_NAME.key, tableName)
+          .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
+          .option(RECORDKEY_FIELD.key, "id")
+          .option(PRECOMBINE_FIELD.key, "ts")
+          .option(PARTITIONPATH_FIELD.key, "dt")
+          .option(URL_ENCODE_PARTITIONING.key(), urlencode)
+          .option(KEYGENERATOR_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName)
+          .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
+          .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
+          .mode(SaveMode.Overwrite)
+          .save(tablePath)
+
+        // register meta to spark catalog by creating table
+        spark.sql(
+          s"""
+             |create table $tableName using hudi
+             |tblproperties (
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts'
+             |)
+             |partitioned by (dt)
+             |location '$tablePath'
+             |""".stripMargin)
+
+        // drop 2021-10-01 partition with purge
+        spark.sql(s"alter table $tableName drop partition (dt='2021/10/01') purge")
+
+        val partitionPath = if (urlencode) {
+          PartitionPathEncodeUtils.escapePathName("2021/10/01")
+        } else {
+          "2021/10/01"
+        }
+        checkAnswer(s"select dt from $tableName")(Seq(s"2021/10/02"))
+        assertResult(false)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath"))
+      }
+    }
+  }
@@ -172,4 +226,51 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
       }
     }
   }

+  Seq(false, true).foreach { hiveStyle =>
+    test(s"Purge drop multi-level partitioned table's partitions, isHiveStylePartitioning: $hiveStyle") {

[Review comment from leesf (Contributor), Dec 24, 2021]
Would you please also add a test for dropping a partition with purge on a non-partitioned table? (A hedged sketch of such a test appears at the end of this page.)

+      withTempDir { tmp =>
+        val tableName = generateTableName
+        val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+
+        import spark.implicits._
+        val df = Seq((1, "z3", "v1", "2021", "10", "01"), (2, "l4", "v1", "2021", "10", "02"))
+          .toDF("id", "name", "ts", "year", "month", "day")
+
+        df.write.format("hudi")
+          .option(HoodieWriteConfig.TBL_NAME.key, tableName)
+          .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
+          .option(RECORDKEY_FIELD.key, "id")
+          .option(PRECOMBINE_FIELD.key, "ts")
+          .option(PARTITIONPATH_FIELD.key, "year,month,day")
+          .option(HIVE_STYLE_PARTITIONING.key, hiveStyle)
+          .option(KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName)
+          .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
+          .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
+          .mode(SaveMode.Overwrite)
+          .save(tablePath)
+
+        // register meta to spark catalog by creating table
+        spark.sql(
+          s"""
+             |create table $tableName using hudi
+             |tblproperties (
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts'
+             |)
+             |partitioned by (year, month, day)
+             |location '$tablePath'
+             |""".stripMargin)
+
+        // drop 2021-10-01 partition with purge
+        spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01') purge")
+
+        checkAnswer(s"select id, name, ts, year, month, day from $tableName")(
+          Seq(2, "l4", "v1", "2021", "10", "02")
+        )
+        assertResult(false)(existsPath(
+          s"${tmp.getCanonicalPath}/$tableName/year='2021'/month='10'/day='01'"))

[Review comment from a Contributor]
Why is the asserted path here not ${tmp.getCanonicalPath}/$tableName/year=2021/month=10/day=01?
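Context for the question above (an inference from the getPartitionPathToDrop logic shown earlier, not a confirmed fix): hive-style paths are built from the raw column values, so the physical directory would be year=2021/month=10/day=01; the single quotes belong only to the SQL literal, and with hiveStyle = false the path would be 2021/10/01 instead. Under that reading, the assertion would look like:

    // hypothetical correction: raw partition values, no SQL quoting in the path
    assertResult(false)(existsPath(
      s"${tmp.getCanonicalPath}/$tableName/year=2021/month=10/day=01"))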

+      }
+    }
+  }
 }
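Responding to leesf's review comment above, here is a minimal sketch of the requested purge test for a non-partitioned table, modeled on this suite's existing "Drop non-partitioned table" test. The checkException helper is assumed to come from TestHoodieSqlBase, and the exact error message is an assumption, not taken from this PR:

  test("Purge drop partition for a non-partitioned table") {
    withTempDir { tmp =>
      val tableName = generateTableName
      // create a non-partitioned hudi table
      spark.sql(
        s"""
           |create table $tableName (id int, name string, ts long) using hudi
           |tblproperties (primaryKey = 'id', preCombineField = 'ts')
           |location '${tmp.getCanonicalPath}/$tableName'
           |""".stripMargin)
      spark.sql(s"insert into $tableName values (1, 'z3', 1000)")

      // dropping a partition (with or without purge) should fail, since the
      // table has no partition columns; the expected message is an assumption
      checkException(s"alter table $tableName drop partition (dt='2021-10-01') purge")(
        s"$tableName is a non-partitioned table that is not allowed to drop partition")
    }
  }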