@@ -44,7 +44,6 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.table.HoodieTable;

import org.apache.hadoop.fs.Path;
@@ -213,15 +212,7 @@ private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata
*/
private List<String> getPartitionPathsForFullCleaning() {
// Go to brute force mode of scanning all partitions
try {
// Because the partition of BaseTableMetadata has been deleted,
// all partition information can only be obtained from FileSystemBackedTableMetadata.
FileSystemBackedTableMetadata fsBackedTableMetadata = new FileSystemBackedTableMetadata(context,
context.getHadoopConf(), config.getBasePath(), config.shouldAssumeDatePartitioning());
return fsBackedTableMetadata.getAllPartitionPaths();
} catch (IOException e) {
return Collections.emptyList();
}
return FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), config.getBasePath());
}

/**
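With this change, full-partition scanning for cleaning goes through FSUtils.getAllPartitionPaths, which honors the table's HoodieMetadataConfig (metadata-table listing when enabled, a file-system scan otherwise), instead of always instantiating FileSystemBackedTableMetadata and swallowing IOException into an empty partition list. A minimal sketch of the call, assuming a HoodieEngineContext and the write config's metadata config are already in hand (the wrapper name below is illustrative, not part of the PR):

import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.engine.HoodieEngineContext
import org.apache.hudi.common.fs.FSUtils

// Illustrative wrapper: with hoodie.metadata.enable=true the partition list is read from
// the metadata table; otherwise FSUtils falls back to scanning the file system under basePath.
def listAllPartitions(context: HoodieEngineContext,
                      metadataConfig: HoodieMetadataConfig,
                      basePath: String): java.util.List[String] = {
  FSUtils.getAllPartitionPaths(context, metadataConfig, basePath)
}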
@@ -35,9 +35,7 @@
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
@@ -319,10 +317,7 @@ public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCo
// Add record bearing added partitions list
List<String> partitionsAdded = getPartitionsAdded(commitMetadata);

// Add record bearing deleted partitions list
List<String> partitionsDeleted = getPartitionsDeleted(commitMetadata);

records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsAdded, partitionsDeleted));
records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsAdded));

// Update files listing records for each individual partition
List<HoodieRecord<HoodieMetadataPayload>> updatedPartitionFilesRecords =
@@ -380,21 +375,6 @@ private static List<String> getPartitionsAdded(HoodieCommitMetadata commitMetada
.collect(Collectors.toList());
}

private static List<String> getPartitionsDeleted(HoodieCommitMetadata commitMetadata) {
if (commitMetadata instanceof HoodieReplaceCommitMetadata
&& WriteOperationType.DELETE_PARTITION.equals(commitMetadata.getOperationType())) {
Map<String, List<String>> partitionToReplaceFileIds =
((HoodieReplaceCommitMetadata) commitMetadata).getPartitionToReplaceFileIds();

return partitionToReplaceFileIds.keySet().stream()
// We need to make sure we properly handle case of non-partitioned tables
.map(HoodieTableMetadataUtil::getPartitionIdentifier)
.collect(Collectors.toList());
}

return Collections.emptyList();
}

/**
* Convert commit action metadata to bloom filter records.
*
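Note that with getPartitionsDeleted removed, a DELETE_PARTITION replacecommit no longer contributes a deleted-partitions list to the metadata table's partition list record; createPartitionListRecord is now called with only the added partitions. Dropped partitions are instead picked up later, when the cleaner physically deletes their files — the behaviour the test changes below assert on via the clean metadata.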
@@ -18,12 +18,14 @@
package org.apache.spark.sql.hudi

import org.apache.hadoop.fs.Path
import org.apache.hudi.avro.model.HoodieCleanMetadata
import org.apache.hudi.{HoodieSparkRecordMerger, HoodieSparkUtils}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.model.HoodieAvroRecordMerger
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.ExceptionUtil.getRootCause
import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
@@ -231,6 +233,17 @@ object HoodieSparkSqlTestBase {
metaClient.getActiveTimeline.getLastCommitMetadataWithValidData.get.getRight
}

def getLastCleanMetadata(spark: SparkSession, tablePath: String) = {
val metaClient = HoodieTableMetaClient.builder()
.setConf(spark.sparkContext.hadoopConfiguration)
.setBasePath(tablePath)
.build()

val cleanInstant = metaClient.reloadActiveTimeline().getCleanerTimeline.filterCompletedInstants().lastInstant().get()
TimelineMetadataUtils.deserializeHoodieCleanMetadata(metaClient
.getActiveTimeline.getInstantDetails(cleanInstant).get)
}

private def checkMessageContains(e: Throwable, text: String): Boolean =
e.getMessage.trim.contains(text.trim)

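The new getLastCleanMetadata helper reloads the active timeline, picks the last completed clean instant, and deserializes its HoodieCleanMetadata; if no clean has completed yet, the unguarded .get() calls will throw. A short sketch of how a test can use it to confirm the cleaner actually removed files, assuming a SparkSession `spark` and a table base path `tablePath` from the test fixture (names are illustrative):

import scala.collection.JavaConverters._

// Count the files the last clean reported as successfully deleted across all partitions.
val cleanMetadata = getLastCleanMetadata(spark, tablePath)
val deletedFiles = cleanMetadata.getPartitionMetadata.values().asScala
  .map(_.getSuccessDeleteFiles.size())
  .sum
assert(deletedFiles > 0, "expected the cleaner to have physically deleted some files")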
@@ -18,13 +18,17 @@
package org.apache.spark.sql.hudi

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.avro.model.{HoodieCleanMetadata, HoodieCleanPartitionMetadata}
import org.apache.hudi.{HoodieCLIUtils, HoodieSparkUtils}
import org.apache.hudi.common.model.HoodieCommitMetadata
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
import org.apache.hudi.common.util.{PartitionPathEncodeUtils, StringUtils, Option => HOption}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.{HoodieCLIUtils, HoodieSparkUtils}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.{getLastCleanMetadata, getLastCommitMetadata}
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Assertions.assertTrue

@@ -114,16 +118,42 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
|location '$tablePath'
|""".stripMargin)

df.write.format("hudi")
.option(HoodieWriteConfig.TBL_NAME.key, tableName)
.option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
.option(RECORDKEY_FIELD.key, "id")
.option(PRECOMBINE_FIELD.key, "ts")
.option(PARTITIONPATH_FIELD.key, "dt")
.option(URL_ENCODE_PARTITIONING.key(), urlencode)
.option(KEYGENERATOR_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName)
.option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
.option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
.mode(SaveMode.Append)
.save(tablePath)

// drop 2021-10-01 partition
spark.sql(s"alter table $tableName drop partition (dt='2021/10/01')")

// trigger clean so that partition deletion kicks in.
spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)")
.collect()

val cleanMetadata: HoodieCleanMetadata = getLastCleanMetadata(spark, tablePath)
val cleanPartitionMeta = new java.util.ArrayList(cleanMetadata.getPartitionMetadata.values()).toArray()
var totalDeletedFiles = 0
cleanPartitionMeta.foreach(entry => {
  totalDeletedFiles += entry.asInstanceOf[HoodieCleanPartitionMetadata].getSuccessDeleteFiles.size()
})
assertTrue(totalDeletedFiles > 0)

val partitionPath = if (urlencode) {
PartitionPathEncodeUtils.escapePathName("2021/10/01")
} else {
"2021/10/01"
}
checkAnswer(s"select dt from $tableName")(Seq(s"2021/10/02"))
assertResult(true)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath"))
assertResult(false)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath"))

// show partitions
if (urlencode) {
@@ -221,11 +251,20 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
"Found duplicate keys 'dt'")
}

// insert data
spark.sql(s"""insert into $tableName values (3, "z5", "v1", "2021-10-01"), (4, "l5", "v1", "2021-10-02")""")

// drop 2021-10-01 partition
spark.sql(s"alter table $tableName drop partition (dt='2021-10-01')")

checkAnswer(s"select id, name, ts, dt from $tableName")(Seq(2, "l4", "v1", "2021-10-02"))
// trigger clean so that partition deletion kicks in.
spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)")
.collect()

checkAnswer(s"select id, name, ts, dt from $tableName")(
Seq(2, "l4", "v1", "2021-10-02"),
Seq(4, "l5", "v1", "2021-10-02")
)

// show partitions
checkAnswer(s"show partitions $tableName")(Seq("dt=2021-10-02"))
@@ -264,9 +303,36 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
checkExceptionContain(s"alter table $tableName drop partition (year='2021', month='10')")(
"All partition columns need to be specified for Hoodie's partition"
)

df.write.format("hudi")
.option(HoodieWriteConfig.TBL_NAME.key, tableName)
.option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
.option(RECORDKEY_FIELD.key, "id")
.option(PRECOMBINE_FIELD.key, "ts")
.option(PARTITIONPATH_FIELD.key, "year,month,day")
.option(HIVE_STYLE_PARTITIONING.key, hiveStyle)
.option(KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName)
.option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
.option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
.mode(SaveMode.Append)
.save(tablePath)

// drop 2021-10-01 partition
spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01')")

// trigger clean so that partition deletion kicks in.
spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)")
.collect()

val cleanMetadata: HoodieCleanMetadata = getLastCleanMetadata(spark, tablePath)
val cleanPartitionMeta = new java.util.ArrayList(cleanMetadata.getPartitionMetadata.values()).toArray()
var totalDeletedFiles = 0
cleanPartitionMeta.foreach(entry => {
  totalDeletedFiles += entry.asInstanceOf[HoodieCleanPartitionMetadata].getSuccessDeleteFiles.size()
})
assertTrue(totalDeletedFiles > 0)

checkAnswer(s"select id, name, ts, year, month, day from $tableName")(
Seq(2, "l4", "v1", "2021", "10", "02")
)
@@ -314,9 +380,37 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
| )
|""".stripMargin)

df.write.format("hudi")
.option(HoodieWriteConfig.TBL_NAME.key, tableName)
.option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
.option(RECORDKEY_FIELD.key, "id")
.option(PRECOMBINE_FIELD.key, "ts")
.option(PARTITIONPATH_FIELD.key, "year,month,day")
.option(HIVE_STYLE_PARTITIONING.key, hiveStyle)
.option(KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName)
.option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
.option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
.mode(SaveMode.Append)
.save(tablePath)

// drop 2021-10-01 partition
spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01')")

spark.sql(s"""insert into $tableName values (2, "l4", "v1", "2021", "10", "02")""")

// trigger clean so that partition deletion kicks in.
spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)")
.collect()

val cleanMetadata: HoodieCleanMetadata = getLastCleanMetadata(spark, tablePath)
val cleanPartitionMeta = new java.util.ArrayList(cleanMetadata.getPartitionMetadata.values()).toArray()
var totalDeletedFiles = 0
cleanPartitionMeta.foreach(entry => {
  totalDeletedFiles += entry.asInstanceOf[HoodieCleanPartitionMetadata].getSuccessDeleteFiles.size()
})
assertTrue(totalDeletedFiles > 0)

// insert data
spark.sql(s"""insert into $tableName values (2, "l4", "v1", "2021", "10", "02")""")
