getFilesToCleanKeepingLatestCommits(String partition
*
* This policy is the default.
*/
- private List<CleanFileInfo> getFilesToCleanKeepingLatestCommits(String partitionPath, int commitsRetained, HoodieCleaningPolicy policy) {
+ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath, int commitsRetained, HoodieCleaningPolicy policy) {
LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. ");
List<CleanFileInfo> deletePaths = new ArrayList<>();
@@ -285,6 +289,7 @@ private List<CleanFileInfo> getFilesToCleanKeepingLatestCommits(String partition
.collect(Collectors.toList());
// determine if we have enough commits, to start cleaning.
+ boolean toDeletePartition = false;
if (commitTimeline.countInstants() > commitsRetained) {
Option<HoodieInstant> earliestCommitToRetainOption = getEarliestCommitToRetain();
HoodieInstant earliestCommitToRetain = earliestCommitToRetainOption.get();
@@ -350,8 +355,12 @@ private List<CleanFileInfo> getFilesToCleanKeepingLatestCommits(String partition
}
}
}
+ // if there are no valid file groups for the partition, mark it to be deleted
+ if (fileGroups.isEmpty()) {
+ toDeletePartition = true;
+ }
}
- return deletePaths;
+ return Pair.of(toDeletePartition, deletePaths);
}
/**
@@ -362,10 +371,10 @@ private List<CleanFileInfo> getFilesToCleanKeepingLatestCommits(String partition
* @param partitionPath partition path to check
* @return list of files to clean
*/
- private List<CleanFileInfo> getFilesToCleanKeepingLatestHours(String partitionPath) {
+ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestHours(String partitionPath) {
return getFilesToCleanKeepingLatestCommits(partitionPath, 0, HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS);
}
-
+
private List<CleanFileInfo> getReplacedFilesEligibleToClean(List<String> savepointedFiles, String partitionPath, Option<HoodieInstant> earliestCommitToRetain) {
final Stream<HoodieFileGroup> replacedGroups;
if (earliestCommitToRetain.isPresent()) {
@@ -416,9 +425,9 @@ private List<CleanFileInfo> getCleanFileInfoForSlice(FileSlice nextSlice) {
/**
* Returns files to be cleaned for the given partitionPath based on cleaning policy.
*/
- public List<CleanFileInfo> getDeletePaths(String partitionPath) {
+ public Pair<Boolean, List<CleanFileInfo>> getDeletePaths(String partitionPath) {
HoodieCleaningPolicy policy = config.getCleanerPolicy();
- List<CleanFileInfo> deletePaths;
+ Pair<Boolean, List<CleanFileInfo>> deletePaths;
if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath);
} else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
@@ -428,8 +437,10 @@ public List<CleanFileInfo> getDeletePaths(String partitionPath) {
} else {
throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
}
- LOG.info(deletePaths.size() + " patterns used to delete in partition path:" + partitionPath);
-
+ LOG.info(deletePaths.getValue().size() + " patterns used to delete in partition path:" + partitionPath);
+ if (deletePaths.getKey()) {
+ LOG.info("Partition " + partitionPath + " to be deleted");
+ }
return deletePaths;
}
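
Editor's note: the planner methods above now return Pair<Boolean, List<CleanFileInfo>> instead of a bare list, with the Boolean key flagging a partition that has no surviving file groups. Below is a minimal sketch, not part of the patch, of how a caller might unpack that contract; the helper class and names are illustrative, only Pair (org.apache.hudi.common.util.collection) comes from the patch.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.hudi.common.util.collection.Pair;

final class CleanPlanConsumerSketch {

  // Unpacks the (toDeletePartition, deletePaths) pair produced by the planner: if the key is true,
  // the partition has no valid file group left and is queued for deletion as a whole.
  static <T> List<T> collect(Pair<Boolean, List<T>> planned, String partitionPath, List<String> partitionsToDelete) {
    if (Boolean.TRUE.equals(planned.getKey())) {
      partitionsToDelete.add(partitionPath);
    }
    return planned.getValue();
  }

  public static void main(String[] args) {
    List<String> partitionsToDelete = new ArrayList<>();
    List<String> noFilesLeft = new ArrayList<>();
    Pair<Boolean, List<String>> planned = Pair.of(true, noFilesLeft);
    List<String> deletePaths = collect(planned, "2021/10/01", partitionsToDelete);
    System.out.println(partitionsToDelete + " -> " + deletePaths.size() + " delete patterns");
  }
}
```
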
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
index 415c12a6407c6..9861506909980 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
@@ -259,7 +259,7 @@ private void createReplace(String instantTime, WriteOperationType writeOperation
private void createCleanMetadata(String instantTime) throws IOException {
HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(),
- CleanPlanV2MigrationHandler.VERSION, new HashMap<>());
+ CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>());
HoodieCleanStat cleanStats = new HoodieCleanStat(
HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
HoodieTestUtils.DEFAULT_PARTITION_PATHS[new Random().nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)],
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
index b31eb7b96d948..149aef03e238a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
@@ -18,24 +18,32 @@
package org.apache.hudi.table.action.commit;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaPairRDD;
+import org.apache.hudi.exception.HoodieDeletePartitionException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
public class SparkDeletePartitionCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends SparkInsertOverwriteCommitActionExecutor<T> {
@@ -50,16 +58,35 @@ public SparkDeletePartitionCommitActionExecutor(HoodieEngineContext context,
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
- HoodieTimer timer = new HoodieTimer().startTimer();
- context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions.");
- Map<String, List<String>> partitionToReplaceFileIds = HoodieJavaPairRDD.getJavaPairRDD(context.parallelize(partitions).distinct()
- .mapToPair(partitionPath -> Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
- HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
- result.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
- result.setIndexUpdateDuration(Duration.ofMillis(timer.endTimer()));
- result.setWriteStatuses(context.emptyHoodieData());
- this.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())), instantTime);
- this.commitOnAutoCommit(result);
- return result;
+ try {
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions.");
+ Map<String, List<String>> partitionToReplaceFileIds =
+ HoodieJavaPairRDD.getJavaPairRDD(context.parallelize(partitions).distinct()
+ .mapToPair(partitionPath -> Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
+ HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
+ result.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
+ result.setIndexUpdateDuration(Duration.ofMillis(timer.endTimer()));
+ result.setWriteStatuses(context.emptyHoodieData());
+
+ // create the requested replacecommit if it has not been written yet
+ HoodieInstant dropPartitionsInstant = new HoodieInstant(REQUESTED, REPLACE_COMMIT_ACTION, instantTime);
+ if (!table.getMetaClient().getFs().exists(new Path(table.getMetaClient().getMetaPath(),
+ dropPartitionsInstant.getFileName()))) {
+ HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
+ .setOperationType(WriteOperationType.DELETE_PARTITION.name())
+ .setExtraMetadata(extraMetadata.orElse(Collections.emptyMap()))
+ .build();
+ table.getMetaClient().getActiveTimeline().saveToPendingReplaceCommit(dropPartitionsInstant,
+ TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
+ }
+
+ this.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())),
+ instantTime);
+ this.commitOnAutoCommit(result);
+ return result;
+ } catch (Exception e) {
+ throw new HoodieDeletePartitionException("Failed to drop partitions for commit time " + instantTime, e);
+ }
}
}
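
Editor's note: a hedged usage sketch, not part of the patch, of the client-side delete-partition flow this executor backs (the same calls the tests below make). It assumes a SparkRDDWriteClient has already been built for the table; the physical partition directories are only removed later, once the cleaner picks the partitions up.

```java
import java.util.List;

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;

final class DeletePartitionUsageSketch {

  // Issues a replacecommit marking every file group in the given partitions as replaced.
  static void dropPartitions(SparkRDDWriteClient<?> client, List<String> partitions) {
    String instantTime = HoodieActiveTimeline.createNewInstantTime();
    client.startCommitWithTime(instantTime, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
    client.deletePartitions(partitions, instantTime);
  }
}
```
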
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index d7d0d2631bf38..dbfcac23271b2 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -18,7 +18,17 @@
package org.apache.hudi.client.functional;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.util.Time;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -32,6 +42,7 @@
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -89,16 +100,6 @@
import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.util.Time;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroSchemaConverter;
@@ -1727,6 +1728,61 @@ public void testRollbackOfPartiallyFailedCommitWithNewPartitions() throws Except
}
}
+ @Test
+ public void testDeletePartitions() throws Exception {
+ init(HoodieTableType.COPY_ON_WRITE);
+
+ int maxCommits = 1;
+ HoodieWriteConfig cfg = getConfigBuilder(TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER)
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build())
+ .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
+ .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
+ .build();
+
+ try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+ String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+ client.startCommitWithTime(newCommitTime);
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
+ List<HoodieRecord> upsertRecords = new ArrayList<>();
+ for (HoodieRecord entry : records) {
+ if (entry.getPartitionPath().equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
+ || entry.getPartitionPath().equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)) {
+ upsertRecords.add(entry);
+ }
+ }
+ List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(upsertRecords, 1), newCommitTime).collect();
+ assertNoWriteErrors(writeStatuses);
+ validateMetadata(client);
+
+ // delete partitions
+ newCommitTime = HoodieActiveTimeline.createNewInstantTime(5000);
+ client.startCommitWithTime(newCommitTime);
+ client.deletePartitions(singletonList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH), newCommitTime);
+ validateMetadata(client);
+
+ // add 1 more commit
+ newCommitTime = HoodieActiveTimeline.createNewInstantTime(5000);
+ client.startCommitWithTime(newCommitTime);
+ records = dataGen.generateInserts(newCommitTime, 10);
+ upsertRecords = new ArrayList<>();
+ for (HoodieRecord entry : records) {
+ if (entry.getPartitionPath().equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)) {
+ upsertRecords.add(entry);
+ }
+ }
+ writeStatuses = client.upsert(jsc.parallelize(upsertRecords, 1), newCommitTime).collect();
+ assertNoWriteErrors(writeStatuses);
+
+ // trigger clean, which will actually trigger deletion of the partition
+ newCommitTime = HoodieActiveTimeline.createNewInstantTime(5000);
+ HoodieCleanMetadata cleanMetadata = client.clean(newCommitTime);
+ validateMetadata(client);
+ assertEquals(1, metadata(client).getAllPartitionPaths().size());
+ }
+ }
+
/**
* Test various error scenarios.
*/
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
index acd7e835eedc4..b4d6aefa71fe6 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
@@ -79,7 +79,7 @@ public void testDeletePartitionAndArchive(boolean metadataEnabled) throws IOExce
client.startCommitWithTime(instantTime4, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH), instantTime4);
- // 2nd write batch; 4 commits for the 3rd partition; the 3rd commit to trigger archiving the replace commit
+ // 2nd write batch; 4 commits for the 4th partition; the 4th commit to trigger archiving the replace commit
for (int i = 5; i < 9; i++) {
String instantTime = HoodieActiveTimeline.createNewInstantTime(i * 1000);
client.startCommitWithTime(instantTime);
@@ -97,7 +97,7 @@ public void testDeletePartitionAndArchive(boolean metadataEnabled) throws IOExce
// verify records
final HoodieTimeline timeline2 = metaClient.getCommitTimeline().filterCompletedInstants();
assertEquals(5, countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline2, Option.empty()),
- "should only have the 4 records from the 3rd partition.");
+ "should only have the 5 records from the 3rd partition.");
}
}
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index 71e4b4b4e6e3f..ab3d504fa4b14 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -687,7 +687,7 @@ public HoodieInstant createCleanMetadata(String instantTime, boolean inflightOnl
public HoodieInstant createCleanMetadata(String instantTime, boolean inflightOnly, boolean isEmpty) throws IOException {
HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(),
- CleanPlanV2MigrationHandler.VERSION, new HashMap<>());
+ CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>());
if (inflightOnly) {
HoodieTestTable.of(metaClient).addInflightClean(instantTime, cleanerPlan);
} else {
diff --git a/hudi-common/src/main/avro/HoodieCleanPartitionMetadata.avsc b/hudi-common/src/main/avro/HoodieCleanPartitionMetadata.avsc
index 877b7259188f7..3cb096d48bd7a 100644
--- a/hudi-common/src/main/avro/HoodieCleanPartitionMetadata.avsc
+++ b/hudi-common/src/main/avro/HoodieCleanPartitionMetadata.avsc
@@ -24,6 +24,7 @@
{"name": "policy", "type": "string"},
{"name": "deletePathPatterns", "type": {"type": "array", "items": "string"}},
{"name": "successDeleteFiles", "type": {"type": "array", "items": "string"}},
- {"name": "failedDeleteFiles", "type": {"type": "array", "items": "string"}}
+ {"name": "failedDeleteFiles", "type": {"type": "array", "items": "string"}},
+ {"name": "isPartitionDeleted", "type":["null", "boolean"], "default": null }
]
}
diff --git a/hudi-common/src/main/avro/HoodieCleanerPlan.avsc b/hudi-common/src/main/avro/HoodieCleanerPlan.avsc
index c4481c2cd804c..e4c8638c86e6f 100644
--- a/hudi-common/src/main/avro/HoodieCleanerPlan.avsc
+++ b/hudi-common/src/main/avro/HoodieCleanerPlan.avsc
@@ -92,6 +92,14 @@
}
}}],
"default" : null
+ },
+ {
+ "name": "partitionsToBeDeleted",
+ "doc": "partitions to be deleted",
+ "type":["null",
+ { "type":"array", "items":"string"}
+ ],
+ "default": null
}
]
}
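
Editor's note: an illustrative sketch, not part of the patch, of building a HoodieCleanerPlan that carries the new partitionsToBeDeleted field, mirroring the six-argument Avro constructor used throughout the patch; the empty placeholders stand in for the real earliest-instant, policy, and per-partition file maps, and the getter follows standard Avro codegen naming.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;

final class CleanerPlanSketch {

  // Builds a plan that, besides per-partition file deletions, records whole partitions to drop.
  static HoodieCleanerPlan planWithPartitionsToDelete(List<String> partitionsToBeDeleted) {
    return new HoodieCleanerPlan(
        new HoodieActionInstant("", "", ""),     // earliest instant to retain (placeholder)
        "",                                      // cleaning policy name (placeholder)
        new HashMap<>(),                         // V1 files-to-delete map
        CleanPlanV2MigrationHandler.VERSION,     // plan version
        new HashMap<>(),                         // V2 file paths to delete, per partition
        new ArrayList<>(partitionsToBeDeleted)); // new: partitions to be deleted entirely
  }

  public static void main(String[] args) {
    HoodieCleanerPlan plan = planWithPartitionsToDelete(Collections.singletonList("2021/10/01"));
    System.out.println(plan.getPartitionsToBeDeleted());
  }
}
```
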
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/HoodieCleanStat.java b/hudi-common/src/main/java/org/apache/hudi/common/HoodieCleanStat.java
index e9de502f78bbf..fa5d80419434b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/HoodieCleanStat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/HoodieCleanStat.java
@@ -47,19 +47,22 @@ public class HoodieCleanStat implements Serializable {
private final List<String> failedDeleteBootstrapBaseFiles;
// Earliest commit that was retained in this clean
private final String earliestCommitToRetain;
+ // set to true if partition is deleted
+ private final boolean isPartitionDeleted;
public HoodieCleanStat(HoodieCleaningPolicy policy, String partitionPath, List<String> deletePathPatterns,
List<String> successDeleteFiles, List<String> failedDeleteFiles, String earliestCommitToRetain) {
this(policy, partitionPath, deletePathPatterns, successDeleteFiles, failedDeleteFiles, earliestCommitToRetain,
CollectionUtils.createImmutableList(), CollectionUtils.createImmutableList(),
- CollectionUtils.createImmutableList());
+ CollectionUtils.createImmutableList(), false);
}
public HoodieCleanStat(HoodieCleaningPolicy policy, String partitionPath, List<String> deletePathPatterns,
List<String> successDeleteFiles, List<String> failedDeleteFiles,
String earliestCommitToRetain, List<String> deleteBootstrapBasePathPatterns,
List<String> successDeleteBootstrapBaseFiles,
- List<String> failedDeleteBootstrapBaseFiles) {
+ List<String> failedDeleteBootstrapBaseFiles,
+ boolean isPartitionDeleted) {
this.policy = policy;
this.partitionPath = partitionPath;
this.deletePathPatterns = deletePathPatterns;
@@ -69,6 +72,7 @@ public HoodieCleanStat(HoodieCleaningPolicy policy, String partitionPath, List<String> deletePathPatterns,
private List<String> deleteBootstrapBasePathPatterns;
private List<String> successDeleteBootstrapBaseFiles;
private List<String> failedDeleteBootstrapBaseFiles;
+ private boolean isPartitionDeleted;
public Builder withPolicy(HoodieCleaningPolicy policy) {
this.policy = policy;
@@ -172,10 +181,15 @@ public Builder withEarliestCommitRetained(Option<HoodieInstant> earliestCommitTo
return this;
}
+ public Builder isPartitionDeleted(boolean isPartitionDeleted) {
+ this.isPartitionDeleted = isPartitionDeleted;
+ return this;
+ }
+
public HoodieCleanStat build() {
return new HoodieCleanStat(policy, partitionPath, deletePathPatterns, successDeleteFiles, failedDeleteFiles,
earliestCommitToRetain, deleteBootstrapBasePathPatterns, successDeleteBootstrapBaseFiles,
- failedDeleteBootstrapBaseFiles);
+ failedDeleteBootstrapBaseFiles, isPartitionDeleted);
}
}
@@ -190,7 +204,8 @@ public String toString() {
+ ", earliestCommitToRetain='" + earliestCommitToRetain
+ ", deleteBootstrapBasePathPatterns=" + deleteBootstrapBasePathPatterns
+ ", successDeleteBootstrapBaseFiles=" + successDeleteBootstrapBaseFiles
- + ", failedDeleteBootstrapBaseFiles=" + failedDeleteBootstrapBaseFiles + '\''
+ + ", failedDeleteBootstrapBaseFiles=" + failedDeleteBootstrapBaseFiles
+ + ", isPartitionDeleted=" + isPartitionDeleted + '\''
+ '}';
}
}
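
Editor's note: a short sketch, not in the patch, of constructing a HoodieCleanStat with the new isPartitionDeleted flag via the extended ten-argument constructor shown above; the partition path and lists are placeholders.

```java
import java.util.Collections;
import java.util.List;

import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.model.HoodieCleaningPolicy;

final class CleanStatSketch {

  // Records that every file in "2021/10/01" was cleaned and the partition itself was deleted.
  static HoodieCleanStat partitionDeletedStat(List<String> deletedFiles) {
    return new HoodieCleanStat(
        HoodieCleaningPolicy.KEEP_LATEST_COMMITS,
        "2021/10/01",
        deletedFiles,                 // delete path patterns
        deletedFiles,                 // successfully deleted files
        Collections.emptyList(),      // failed deletes
        "",                           // earliest commit retained
        Collections.emptyList(),      // bootstrap base path patterns
        Collections.emptyList(),      // successful bootstrap deletes
        Collections.emptyList(),      // failed bootstrap deletes
        true);                        // new flag: the whole partition was deleted
  }
}
```
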
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV1MigrationHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV1MigrationHandler.java
index 0010aa21fb1c1..66fdfeb62c207 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV1MigrationHandler.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV1MigrationHandler.java
@@ -18,6 +18,7 @@
package org.apache.hudi.common.table.timeline.versioning.clean;
+import java.util.ArrayList;
import java.util.HashMap;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -61,6 +62,6 @@ public HoodieCleanerPlan downgradeFrom(HoodieCleanerPlan plan) {
.collect(Collectors.toList()));
}).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
return new HoodieCleanerPlan(plan.getEarliestInstantToRetain(), plan.getPolicy(), filesPerPartition, VERSION,
- new HashMap<>());
+ new HashMap<>(), new ArrayList<>());
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV2MigrationHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV2MigrationHandler.java
index e141e9a15499f..fd82109bd4529 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV2MigrationHandler.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV2MigrationHandler.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.fs.Path;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -53,7 +54,7 @@ public HoodieCleanerPlan upgradeFrom(HoodieCleanerPlan plan) {
new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), e.getKey()), v).toString(), false))
.collect(Collectors.toList()))).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
return new HoodieCleanerPlan(plan.getEarliestInstantToRetain(), plan.getPolicy(), new HashMap<>(), VERSION,
- filePathsPerPartition);
+ filePathsPerPartition, new ArrayList<>());
}
@Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
index a3a1305667f6a..df4e9ac402c6d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
@@ -64,13 +64,13 @@ public static HoodieCleanMetadata convertCleanMetadata(String startCleanTime,
for (HoodieCleanStat stat : cleanStats) {
HoodieCleanPartitionMetadata metadata =
new HoodieCleanPartitionMetadata(stat.getPartitionPath(), stat.getPolicy().name(),
- stat.getDeletePathPatterns(), stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles());
+ stat.getDeletePathPatterns(), stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles(), stat.isPartitionDeleted());
partitionMetadataMap.put(stat.getPartitionPath(), metadata);
if ((null != stat.getDeleteBootstrapBasePathPatterns())
&& (!stat.getDeleteBootstrapBasePathPatterns().isEmpty())) {
HoodieCleanPartitionMetadata bootstrapMetadata = new HoodieCleanPartitionMetadata(stat.getPartitionPath(),
stat.getPolicy().name(), stat.getDeleteBootstrapBasePathPatterns(), stat.getSuccessDeleteBootstrapBaseFiles(),
- stat.getFailedDeleteBootstrapBaseFiles());
+ stat.getFailedDeleteBootstrapBaseFiles(), stat.isPartitionDeleted());
partitionBootstrapMetadataMap.put(stat.getPartitionPath(), bootstrapMetadata);
}
totalDeleted += stat.getSuccessDeleteFiles().size();
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 8680a8c83662f..f88986bd79731 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -18,6 +18,13 @@
package org.apache.hudi.metadata;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieMetadataBloomFilter;
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.avro.model.HoodieMetadataFileInfo;
@@ -35,14 +42,6 @@
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.io.storage.HoodieHFileReader;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -222,8 +221,17 @@ protected HoodieMetadataPayload(String key, int type,
* @param partitions The list of partitions
*/
public static HoodieRecord<HoodieMetadataPayload> createPartitionListRecord(List<String> partitions) {
+ return createPartitionListRecord(partitions, false);
+ }
+
+ /**
+ * Create and return a {@code HoodieMetadataPayload} to save list of partitions.
+ *
+ * @param partitions The list of partitions
+ * @param isDeleted true if the given partitions are being marked as deleted
+ */
+ public static HoodieRecord<HoodieMetadataPayload> createPartitionListRecord(List<String> partitions, boolean isDeleted) {
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
- partitions.forEach(partition -> fileInfo.put(getPartition(partition), new HoodieMetadataFileInfo(0L, false)));
+ partitions.forEach(partition -> fileInfo.put(getPartition(partition), new HoodieMetadataFileInfo(0L, isDeleted)));
HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.getPartitionPath());
HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 8f543996b9e31..9761e4d13aaaf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -18,6 +18,11 @@
package org.apache.hudi.metadata;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -53,17 +58,10 @@
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import javax.annotation.Nonnull;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -161,10 +159,10 @@ public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCo
String instantTime) {
List<HoodieRecord> records = new ArrayList<>(commitMetadata.getPartitionToWriteStats().size());
- // Add record bearing partitions list
- ArrayList<String> partitionsList = new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet());
+ // Add record bearing added partitions list
+ ArrayList<String> partitionsAdded = new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet());
- records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsList));
+ records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsAdded));
// Update files listing records for each individual partition
List<HoodieRecord<HoodieMetadataPayload>> updatedPartitionFilesRecords =
@@ -317,6 +315,7 @@ public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCl
String instantTime) {
List<HoodieRecord> records = new LinkedList<>();
int[] fileDeleteCount = {0};
+ List<String> deletedPartitions = new ArrayList<>();
cleanMetadata.getPartitionMetadata().forEach((partitionName, partitionMetadata) -> {
final String partition = getPartition(partitionName);
// Files deleted from a partition
@@ -326,8 +325,16 @@ public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCl
records.add(record);
fileDeleteCount[0] += deletedFiles.size();
+ // isPartitionDeleted is a nullable Boolean in the Avro schema; treat null (older clean metadata) as false
+ boolean isPartitionDeleted = Boolean.TRUE.equals(partitionMetadata.getIsPartitionDeleted());
+ if (isPartitionDeleted) {
+ deletedPartitions.add(partitionName);
+ }
});
+ if (!deletedPartitions.isEmpty()) {
+ // if there are partitions to be deleted, add them to delete list
+ records.add(HoodieMetadataPayload.createPartitionListRecord(deletedPartitions, true));
+ }
LOG.info("Updating at " + instantTime + " from Clean. #partitions_updated=" + records.size()
+ ", #files_deleted=" + fileDeleteCount[0]);
return records;
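
Editor's note: a minimal sketch, not part of the patch, of reading back which partitions a clean action fully removed from its HoodieCleanMetadata; the nullable Avro flag is handled defensively because clean metadata written before this change leaves it unset.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.hudi.avro.model.HoodieCleanMetadata;

final class DeletedPartitionsSketch {

  // Collects the partitions a clean deleted entirely; a null flag (older metadata) means "not deleted".
  static List<String> deletedPartitions(HoodieCleanMetadata cleanMetadata) {
    List<String> deleted = new ArrayList<>();
    cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
      if (Boolean.TRUE.equals(partitionMetadata.getIsPartitionDeleted())) {
        deleted.add(partition);
      }
    });
    return deleted;
  }
}
```
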
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 5f9aab84d0e6d..da140bfe3c61a 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -284,7 +284,7 @@ public HoodieTestTable addClean(String instantTime, HoodieCleanerPlan cleanerPla
public HoodieTestTable addClean(String instantTime) throws IOException {
HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant(EMPTY_STRING, EMPTY_STRING, EMPTY_STRING), EMPTY_STRING, new HashMap<>(),
- CleanPlanV2MigrationHandler.VERSION, new HashMap<>());
+ CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>());
HoodieCleanStat cleanStats = new HoodieCleanStat(
HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
HoodieTestUtils.DEFAULT_PARTITION_PATHS[RANDOM.nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)],
@@ -298,7 +298,7 @@ public HoodieTestTable addClean(String instantTime) throws IOException {
public Pair<HoodieCleanMetadata, HoodieCleanerPlan> getHoodieCleanMetadata(String commitTime, HoodieTestTableState testTableState) {
HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant(commitTime, CLEAN_ACTION, EMPTY_STRING), EMPTY_STRING, new HashMap<>(),
- CleanPlanV2MigrationHandler.VERSION, new HashMap<>());
+ CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>());
List<HoodieCleanStat> cleanStats = new ArrayList<>();
for (Map.Entry<String, List<String>> entry : testTableState.getPartitionToFileIdMapForCleaner(commitTime).entrySet()) {
cleanStats.add(new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index fc83cebc945d4..210e041c6430b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -45,8 +45,11 @@ import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.sync.common.util.SyncUtilHelpers
import org.apache.hudi.table.BulkInsertPartitioner
import org.apache.log4j.LogManager
+import org.apache.spark.SPARK_VERSION
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.types.StructType
import org.apache.spark.sql._
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.types.StructType
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
index 3e7adec7d59bb..ed70644edc8bf 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
@@ -47,6 +47,9 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
checkExceptionContain(s"alter table $tableName drop partition (dt='2021-10-01')")(
s"$tableName is a non-partitioned table that is not allowed to drop partition")
+
+ // show partitions
+ checkAnswer(s"show partitions $tableName")(Seq.empty: _*)
}
test("Purge drop non-partitioned table") {
@@ -71,6 +74,9 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
checkExceptionContain(s"alter table $tableName drop partition (dt='2021-10-01') purge")(
s"$tableName is a non-partitioned table that is not allowed to drop partition")
+
+ // show partitions
+ checkAnswer(s"show partitions $tableName")(Seq.empty: _*)
}
Seq(false, true).foreach { urlencode =>
@@ -113,6 +119,13 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
}
checkAnswer(s"select dt from $tableName")(Seq(s"2021/10/02"))
assertResult(true)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath"))
+
+ // show partitions
+ if (urlencode) {
+ checkAnswer(s"show partitions $tableName")(Seq(PartitionPathEncodeUtils.escapePathName("2021/10/02")))
+ } else {
+ checkAnswer(s"show partitions $tableName")(Seq("2021/10/02"))
+ }
}
}
}
@@ -157,6 +170,13 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
}
checkAnswer(s"select dt from $tableName")(Seq(s"2021/10/02"))
assertResult(false)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath"))
+
+ // show partitions
+ if (urlencode) {
+ checkAnswer(s"show partitions $tableName")(Seq(PartitionPathEncodeUtils.escapePathName("2021/10/02")))
+ } else {
+ checkAnswer(s"show partitions $tableName")(Seq("2021/10/02"))
+ }
}
}
}
@@ -189,7 +209,10 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
// drop 2021-10-01 partition
spark.sql(s"alter table $tableName drop partition (dt='2021-10-01')")
- checkAnswer(s"select id, name, ts, dt from $tableName") (Seq(2, "l4", "v1", "2021-10-02"))
+ checkAnswer(s"select id, name, ts, dt from $tableName")(Seq(2, "l4", "v1", "2021-10-02"))
+
+ // show partitions
+ checkAnswer(s"show partitions $tableName")(Seq("dt=2021-10-02"))
}
Seq(false, true).foreach { hiveStyle =>
@@ -232,6 +255,13 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
checkAnswer(s"select id, name, ts, year, month, day from $tableName")(
Seq(2, "l4", "v1", "2021", "10", "02")
)
+
+ // show partitions
+ if (hiveStyle) {
+ checkAnswer(s"show partitions $tableName")(Seq("year=2021/month=10/day=02"))
+ } else {
+ checkAnswer(s"show partitions $tableName")(Seq("2021/10/02"))
+ }
}
}
}
@@ -274,6 +304,13 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase {
)
assertResult(false)(existsPath(
s"${tmp.getCanonicalPath}/$tableName/year=2021/month=10/day=01"))
+
+ // show partitions
+ if (hiveStyle) {
+ checkAnswer(s"show partitions $tableName")(Seq("year=2021/month=10/day=02"))
+ } else {
+ checkAnswer(s"show partitions $tableName")(Seq("2021/10/02"))
+ }
}
}
}