diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
index fcdbacea51e43..fa01ba37e9d7a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
@@ -29,13 +29,12 @@ import org.apache.hudi.common.util.PartitionPathEncodeUtils
 import org.apache.hudi.{AvroConversionUtils, SparkAdapterSupport}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedRelation}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HoodieCatalogTable}
+import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
-import org.apache.spark.sql.types.{DataType, NullType, StringType, StructField, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SparkSession}
 
 import java.net.URI
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
index c7afbfe11f998..628f383b6903c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
@@ -18,8 +18,6 @@ package org.apache.spark.sql.hudi.command
 
 import org.apache.hudi.HoodieSparkSqlWriter
-import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.fs.FSUtils
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
@@ -29,11 +27,11 @@ import org.apache.spark.sql.hudi.ProvidesHoodieConfig
 import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
 
 case class AlterHoodieTableDropPartitionCommand(
-  tableIdentifier: TableIdentifier,
-  specs: Seq[TablePartitionSpec],
-  ifExists : Boolean,
-  purge : Boolean,
-  retainData : Boolean)
+    tableIdentifier: TableIdentifier,
+    partitionSpecs: Seq[TablePartitionSpec],
+    ifExists : Boolean,
+    purge : Boolean,
+    retainData : Boolean)
   extends HoodieLeafRunnableCommand with ProvidesHoodieConfig {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -49,7 +47,7 @@ case class AlterHoodieTableDropPartitionCommand(
     DDLUtils.verifyAlterTableType(
       sparkSession.sessionState.catalog, hoodieCatalogTable.table, isView = false)
 
-    val normalizedSpecs: Seq[Map[String, String]] = specs.map { spec =>
+    val normalizedSpecs: Seq[Map[String, String]] = partitionSpecs.map { spec =>
       normalizePartitionSpec(
         spec,
        hoodieCatalogTable.partitionFields,
@@ -57,6 +55,8 @@ case class AlterHoodieTableDropPartitionCommand(
         sparkSession.sessionState.conf.resolver)
     }
 
+    // Drop partitions lazily (https://github.com/apache/hudi/pull/4489):
+    // the cleaner deletes the partition files, following the configured retention policy.
     val partitionsToDrop = getPartitionPathToDrop(hoodieCatalogTable, normalizedSpecs)
     val parameters = buildHoodieDropPartitionsConfig(sparkSession, hoodieCatalogTable, partitionsToDrop)
     HoodieSparkSqlWriter.write(
@@ -65,17 +65,6 @@ case class AlterHoodieTableDropPartitionCommand(
       parameters,
       sparkSession.emptyDataFrame)
 
-
-    // Recursively delete partition directories
-    if (purge) {
-      val engineContext = new HoodieSparkEngineContext(sparkSession.sparkContext)
-      val basePath = hoodieCatalogTable.tableLocation
-      val fullPartitionPath = FSUtils.getPartitionPath(basePath, partitionsToDrop)
-      logInfo("Clean partition up " + fullPartitionPath)
-      val fs = FSUtils.getFs(basePath, sparkSession.sparkContext.hadoopConfiguration)
-      FSUtils.deleteDir(engineContext, fs, fullPartitionPath, sparkSession.sparkContext.defaultParallelism)
-    }
-
     sparkSession.catalog.refreshTable(tableIdentifier.unquotedString)
     logInfo(s"Finish execute alter table drop partition command for $fullTableName")
     Seq.empty[Row]
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala
index 04936978ed1de..f5349ee5feed4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala
@@ -18,15 +18,16 @@ package org.apache.spark.sql.hudi.command
 
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.HoodieSparkSqlWriter
+import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType, HoodieCatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, HoodieCatalogTable}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{getPartitionPathToDrop, normalizePartitionSpec}
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-
-import scala.util.control.NonFatal
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig
+import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
 
 /**
  * Command for truncate hudi table.
@@ -34,99 +35,67 @@ import scala.util.control.NonFatal
 case class TruncateHoodieTableCommand(
   tableIdentifier: TableIdentifier,
   partitionSpec: Option[TablePartitionSpec])
-  extends HoodieLeafRunnableCommand {
+  extends HoodieLeafRunnableCommand with ProvidesHoodieConfig {
 
-  override def run(spark: SparkSession): Seq[Row] = {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
     val fullTableName = s"${tableIdentifier.database}.${tableIdentifier.table}"
     logInfo(s"start execute truncate table command for $fullTableName")
-    val hoodieCatalogTable = HoodieCatalogTable(spark, tableIdentifier)
-    val properties = hoodieCatalogTable.tableConfig.getProps
-
-    try {
-      // Delete all data in the table directory
-      val catalog = spark.sessionState.catalog
-      val table = catalog.getTableMetadata(tableIdentifier)
-      val tableIdentWithDB = table.identifier.quotedString
-
-      if (table.tableType == CatalogTableType.VIEW) {
-        throw new AnalysisException(
-          s"Operation not allowed: TRUNCATE TABLE on views: $tableIdentWithDB")
-      }
-
-      if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
-        throw new AnalysisException(
-          s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
-            s"for tables that are not partitioned: $tableIdentWithDB")
-      }
-
-      val basePath = hoodieCatalogTable.tableLocation
-      val partCols = table.partitionColumnNames
-      val locations = if (partitionSpec.isEmpty || partCols.isEmpty) {
-        Seq(basePath)
-      } else {
-        val normalizedSpec: Seq[Map[String, String]] = Seq(partitionSpec.map { spec =>
-          normalizePartitionSpec(
-            spec,
-            partCols,
-            table.identifier.quotedString,
-            spark.sessionState.conf.resolver)
-        }.get)
+    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
 
-        val fullPartitionPath = FSUtils.getPartitionPath(basePath, getPartitionPathToDrop(hoodieCatalogTable, normalizedSpec))
+    val catalog = sparkSession.sessionState.catalog
+    val table = catalog.getTableMetadata(tableIdentifier)
+    val tableId = table.identifier.quotedString
 
-        Seq(fullPartitionPath)
-      }
-
-      val hadoopConf = spark.sessionState.newHadoopConf()
-      locations.foreach { location =>
-        val path = new Path(location.toString)
-        try {
-          val fs = path.getFileSystem(hadoopConf)
-          fs.delete(path, true)
-          fs.mkdirs(path)
-        } catch {
-          case NonFatal(e) =>
-            throw new AnalysisException(
-              s"Failed to truncate table $tableIdentWithDB when removing data of the path: $path " +
-                s"because of ${e.toString}")
-        }
-      }
-
-      // Also try to drop the contents of the table from the columnar cache
-      try {
-        spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier), cascade = true)
-      } catch {
-        case NonFatal(_) =>
-      }
+    if (table.tableType == CatalogTableType.VIEW) {
+      throw new AnalysisException(
+        s"Operation not allowed: TRUNCATE TABLE on views: $tableId")
+    }
 
-      if (table.stats.nonEmpty) {
-        // empty table after truncation
-        val newStats = CatalogStatistics(sizeInBytes = 0, rowCount = Some(0))
-        catalog.alterTableStats(tableIdentifier, Some(newStats))
-      }
-      Seq.empty[Row]
-    } catch {
-      // TruncateTableCommand will delete the related directories first, and then refresh the table.
-      // It will fail when refresh table, because the hudi meta directory(.hoodie) has been deleted at the first step.
-      // So here ignore this failure, and refresh table later.
-      case NonFatal(e) =>
-        throw new AnalysisException(s"Exception when attempting to truncate table ${tableIdentifier.quotedString}: " + e)
+    if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
+      throw new AnalysisException(
+        s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
+          s"for tables that are not partitioned: $tableId")
     }
 
+    val basePath = hoodieCatalogTable.tableLocation
+    val properties = hoodieCatalogTable.tableConfig.getProps
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+
     // If we have not specified the partition, truncate will delete all the data in the table path
-    // include the hoodie.properties. In this case we should reInit the table.
     if (partitionSpec.isEmpty) {
-      val hadoopConf = spark.sessionState.newHadoopConf()
+      val targetPath = new Path(basePath)
+      val engineContext = new HoodieSparkEngineContext(sparkSession.sparkContext)
+      val fs = FSUtils.getFs(basePath, sparkSession.sparkContext.hadoopConfiguration)
+      FSUtils.deleteDir(engineContext, fs, targetPath, sparkSession.sparkContext.defaultParallelism)
+
+      // ReInit hoodie.properties
       HoodieTableMetaClient.withPropertyBuilder()
         .fromProperties(properties)
         .initTable(hadoopConf, hoodieCatalogTable.tableLocation)
+    } else {
+      val normalizedSpecs: Seq[Map[String, String]] = Seq(partitionSpec.map { spec =>
+        normalizePartitionSpec(
+          spec,
+          hoodieCatalogTable.partitionFields,
+          hoodieCatalogTable.tableName,
+          sparkSession.sessionState.conf.resolver)
+      }.get)
+
+      // Drop the partitions lazily: the cleaner deletes the partition files later.
+      val partitionsToDrop = getPartitionPathToDrop(hoodieCatalogTable, normalizedSpecs)
+      val parameters = buildHoodieDropPartitionsConfig(sparkSession, hoodieCatalogTable, partitionsToDrop)
+      HoodieSparkSqlWriter.write(
+        sparkSession.sqlContext,
+        SaveMode.Append,
+        parameters,
+        sparkSession.emptyDataFrame)
     }
 
     // After deleting the data, refresh the table to make sure we don't keep around a stale
     // file relation in the metastore cache and cached table data in the cache manager.
-    spark.catalog.refreshTable(hoodieCatalogTable.table.identifier.quotedString)
+    sparkSession.catalog.refreshTable(table.identifier.quotedString)
+    logInfo(s"Finish execute truncate table command for $fullTableName")
     Seq.empty[Row]
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index b140b6767e1e4..d3d191734eacb 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -539,7 +539,7 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic
       // Rewrite TruncateTableCommand to TruncateHoodieTableCommand
       case TruncateTableCommand(tableName, partitionSpec)
         if sparkAdapter.isHoodieTable(tableName, sparkSession) =>
-        new TruncateHoodieTableCommand(tableName, partitionSpec)
+        TruncateHoodieTableCommand(tableName, partitionSpec)
       case _ => plan
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
index fdff6928a215f..ecbbadeeb9a28 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
@@ -52,7 +52,7 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
     checkAnswer(s"show partitions $tableName")(Seq.empty: _*)
   }
 
-  test("Purge drop non-partitioned table") {
+  test("Lazy Clean drop non-partitioned table") {
     val tableName = generateTableName
     // create table
     spark.sql(
@@ -66,13 +66,14 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
         | using hudi
         | tblproperties (
         |  primaryKey = 'id',
-        |  preCombineField = 'ts'
+        |  preCombineField = 'ts',
+        |  hoodie.cleaner.commits.retained = '1'
         | )
         |""".stripMargin)
 
    // insert data
    spark.sql(s"""insert into $tableName values (1, "z3", "v1", "2021-10-01"), (2, "l4", "v1", "2021-10-02")""")
 
-    checkExceptionContain(s"alter table $tableName drop partition (dt='2021-10-01') purge")(
+    checkExceptionContain(s"alter table $tableName drop partition (dt='2021-10-01')")(
      s"$tableName is a non-partitioned table that is not allowed to drop partition")
 
    // show partitions
@@ -131,14 +132,13 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
   }
 
   Seq(false, true).foreach { urlencode =>
-    test(s"Purge drop single-partition table' partitions, urlencode: $urlencode") {
+    test(s"Lazy Clean drop single-partition table's partitions, urlencode: $urlencode") {
       withTempDir { tmp =>
         val tableName = generateTableName
         val tablePath = s"${tmp.getCanonicalPath}/$tableName"
 
         import spark.implicits._
-        val df = Seq((1, "z3", "v1", "2021/10/01"), (2, "l4", "v1", "2021/10/02"))
-          .toDF("id", "name", "ts", "dt")
+        val df = Seq((1, "z3", "v1", "2021/10/01")).toDF("id", "name", "ts", "dt")
 
         df.write.format("hudi")
           .option(HoodieWriteConfig.TBL_NAME.key, tableName)
@@ -158,17 +158,24 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
           s"""
              |create table $tableName using hudi
              |location '$tablePath'
+             | tblproperties (
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts',
+             |  hoodie.cleaner.commits.retained = '1'
+             | )
              |""".stripMargin)
 
         // drop 2021-10-01 partition
-        spark.sql(s"alter table $tableName drop partition (dt='2021/10/01') purge")
+        spark.sql(s"alter table $tableName drop partition (dt='2021/10/01')")
+
+        spark.sql(s"""insert into $tableName values (2, "l4", "v1", "2021/10/02")""")
 
         val partitionPath = if (urlencode) {
           PartitionPathEncodeUtils.escapePathName("2021/10/01")
         } else {
           "2021/10/01"
         }
 
-        checkAnswer(s"select dt from $tableName")(Seq(s"2021/10/02"))
+        checkAnswer(s"select dt from $tableName")(Seq("2021/10/02"))
         assertResult(false)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath"))
 
         // show partitions
@@ -267,14 +274,13 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
   }
 
   Seq(false, true).foreach { hiveStyle =>
-    test(s"Purge drop multi-level partitioned table's partitions, isHiveStylePartitioning: $hiveStyle") {
+    test(s"Lazy Clean drop multi-level partitioned table's partitions, isHiveStylePartitioning: $hiveStyle") {
       withTempDir { tmp =>
         val tableName = generateTableName
         val tablePath = s"${tmp.getCanonicalPath}/$tableName"
 
         import spark.implicits._
-        val df = Seq((1, "z3", "v1", "2021", "10", "01"), (2, "l4", "v1", "2021", "10", "02"))
-          .toDF("id", "name", "ts", "year", "month", "day")
+        val df = Seq((1, "z3", "v1", "2021", "10", "01")).toDF("id", "name", "ts", "year", "month", "day")
 
         df.write.format("hudi")
           .option(HoodieWriteConfig.TBL_NAME.key, tableName)
@@ -294,14 +300,23 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
           s"""
              |create table $tableName using hudi
              |location '$tablePath'
+             | tblproperties (
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts',
+             |  hoodie.cleaner.commits.retained = '1'
+             | )
             |""".stripMargin)
 
         // drop 2021-10-01 partition
-        spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01') purge")
+        spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01')")
+
+        // insert data
+        spark.sql(s"""insert into $tableName values (2, "l4", "v1", "2021", "10", "02")""")
 
         checkAnswer(s"select id, name, ts, year, month, day from $tableName")(
           Seq(2, "l4", "v1", "2021", "10", "02")
         )
+
         assertResult(false)(existsPath(
           s"${tmp.getCanonicalPath}/$tableName/year=2021/month=10/day=01"))
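
For reviewers trying this out, the end-to-end behavior the updated tests encode is: the `purge` clause is gone from `ALTER TABLE ... DROP PARTITION`, and physical deletion is deferred to the cleaner. Below is a minimal sketch of that flow; it assumes a `SparkSession` in scope as `spark` with the Hudi SQL extensions enabled, and `demo_tbl` is an illustrative name, not part of this patch.

```scala
// hoodie.cleaner.commits.retained = '1' keeps a single commit, so the cleaner
// removes a dropped partition's files as soon as a later commit lands.
spark.sql(
  """
    |create table demo_tbl (id int, name string, ts string, dt string)
    |using hudi
    |partitioned by (dt)
    |tblproperties (
    |  primaryKey = 'id',
    |  preCombineField = 'ts',
    |  hoodie.cleaner.commits.retained = '1'
    |)
    |""".stripMargin)

spark.sql("""insert into demo_tbl values (1, "z3", "v1", "2021-10-01")""")

// No `purge` clause anymore: the command only marks the partition as deleted
// in the timeline; the files stay on storage until the cleaner runs.
spark.sql("alter table demo_tbl drop partition (dt='2021-10-01')")

// The next commit triggers cleaning under the retention policy above, after
// which the partition directory is physically removed.
spark.sql("""insert into demo_tbl values (2, "l4", "v1", "2021-10-02")""")
```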
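`TRUNCATE TABLE` now splits along the same line: with no partition spec it deletes the whole table directory in parallel via `FSUtils.deleteDir` and re-initializes `hoodie.properties`, while `TRUNCATE TABLE ... PARTITION` goes through the same lazy drop-partitions write path as above. A sketch, continuing the hypothetical `demo_tbl` example:

```scala
// Truncate a single partition: routed through HoodieSparkSqlWriter with the
// drop-partitions config, so file removal is again deferred to the cleaner.
spark.sql("truncate table demo_tbl partition (dt='2021-10-02')")

// Truncate the whole table: the base path is deleted in parallel and
// hoodie.properties is re-created from the retained table config.
spark.sql("truncate table demo_tbl")
```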