diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index dfb2fc8fc1c22..9650ddaebfd93 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -434,10 +434,10 @@ protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata me
     // Delete the marker directory for the instant.
     WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+    autoCleanOnCommit();
     // We cannot have unbounded commit files. Archive commits if we have to archive
     HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
     archiveLog.archiveIfRequired(context);
-    autoCleanOnCommit();
     if (operationType != null && operationType != WriteOperationType.CLUSTER && operationType != WriteOperationType.COMPACT) {
       syncTableMetadata();
     }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index aab02da238dbf..ad52ce3b15f4b 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.utilities.functional;
 
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
@@ -26,6 +28,7 @@
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -100,6 +103,7 @@
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -972,6 +976,86 @@ public void testInlineClustering() throws Exception {
     });
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean) throws Exception {
+    String tableBasePath = dfsBasePath + "/cleanerDeleteReplacedDataWithArchive" + asyncClean;
+
+    int totalRecords = 3000;
+
+    // Step 1 : Prepare and insert data without archival and cleaner.
+    // Make sure that there are 6 commits including 2 replacecommits completed.
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
+    cfg.continuousMode = true;
+    cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
+    cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", ""));
+    cfg.configs.add(String.format("%s=%s", HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
+    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+    deltaStreamerTestRunner(ds, cfg, (r) -> {
+      TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+      return true;
+    });
+
+    TestHelpers.assertAtLeastNCommits(6, tableBasePath, dfs);
+    TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+
+    // Step 2 : Get the first replacecommit and extract the corresponding replaced file IDs.
+    HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
+    HoodieTimeline replacedTimeline = meta.reloadActiveTimeline().getCompletedReplaceTimeline();
+    Option<HoodieInstant> firstReplaceHoodieInstant = replacedTimeline.nthFromLastInstant(1);
+    assertTrue(firstReplaceHoodieInstant.isPresent());
+
+    Option<byte[]> firstReplaceHoodieInstantDetails = replacedTimeline.getInstantDetails(firstReplaceHoodieInstant.get());
+    HoodieReplaceCommitMetadata firstReplaceMetadata = HoodieReplaceCommitMetadata.fromBytes(firstReplaceHoodieInstantDetails.get(), HoodieReplaceCommitMetadata.class);
+    Map<String, List<String>> partitionToReplaceFileIds = firstReplaceMetadata.getPartitionToReplaceFileIds();
+    String partitionName = null;
+    List<String> replacedFileIDs = null;
+    for (Map.Entry entry : partitionToReplaceFileIds.entrySet()) {
+      partitionName = String.valueOf(entry.getKey());
+      replacedFileIDs = (List) entry.getValue();
+    }
+
+    assertNotNull(partitionName);
+    assertNotNull(replacedFileIDs);
+
+    // Step 3 : Based on replacedFileIDs, get the corresponding complete file paths.
+    ArrayList<String> replacedFilePaths = new ArrayList<>();
+    Path partitionPath = new Path(meta.getBasePath(), partitionName);
+    RemoteIterator<LocatedFileStatus> hoodieFiles = meta.getFs().listFiles(partitionPath, true);
+    while (hoodieFiles.hasNext()) {
+      LocatedFileStatus f = hoodieFiles.next();
+      String file = f.getPath().toUri().toString();
+      for (Object replacedFileID : replacedFileIDs) {
+        if (file.contains(String.valueOf(replacedFileID))) {
+          replacedFilePaths.add(file);
+        }
+      }
+    }
+
+    assertFalse(replacedFilePaths.isEmpty());
+
+    // Step 4 : Insert 1 record and trigger sync/async cleaner and archive.
+    List<String> configs = getAsyncServicesConfigs(1, "true", "true", "2", "", "");
+    configs.add(String.format("%s=%s", HoodieCompactionConfig.CLEANER_POLICY.key(), "KEEP_LATEST_COMMITS"));
+    configs.add(String.format("%s=%s", HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), "1"));
+    configs.add(String.format("%s=%s", HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), "2"));
+    configs.add(String.format("%s=%s", HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key(), "3"));
+    configs.add(String.format("%s=%s", HoodieCompactionConfig.ASYNC_CLEAN.key(), asyncClean));
+    cfg.configs = configs;
+    cfg.continuousMode = false;
+    ds = new HoodieDeltaStreamer(cfg, jsc);
+    ds.sync();
+
+    // Step 5 : Make sure that firstReplaceHoodieInstant is archived.
+    long count = meta.reloadActiveTimeline().getCompletedReplaceTimeline().getInstants().filter(instant -> firstReplaceHoodieInstant.get().equals(instant)).count();
+    assertEquals(0, count);
+
+    // Step 6 : All the replaced files in firstReplaceHoodieInstant should be deleted through sync/async cleaner.
+    for (String replacedFilePath : replacedFilePaths) {
+      assertFalse(meta.getFs().exists(new Path(replacedFilePath)));
+    }
+  }
+
   private List<String> getAsyncServicesConfigs(int totalRecords, String autoClean, String inlineCluster, String inlineClusterMaxCommit, String asyncCluster, String asyncClusterMaxCommit) {
     List<String> configs = new ArrayList<>();
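
Context for the one-line reorder in postCommit: archival prunes completed instants out of the active timeline, while the cleaner discovers which file groups were replaced by reading replacecommit metadata from that same timeline. Running autoCleanOnCommit() before archiveIfRequired() therefore ensures the cleaner can still resolve the replaced files before the replacecommit is archived away; otherwise the lookup simply returns nothing and the replaced data files are never deleted, which is what the test above exercises. The standalone sketch below is illustrative only, not part of the patch (the class name and the hadoopConf/basePath parameters are placeholders); it uses the same metadata APIs as the test to perform exactly the lookup that becomes impossible once the instant leaves the active timeline.

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class ReplacedFileIdInspector {

  /**
   * Resolves the file IDs replaced by the earliest completed replacecommit,
   * keyed by partition. Returns an empty map if no replacecommit is visible.
   */
  public static Map<String, List<String>> replacedFileIdsOfFirstReplaceCommit(
      Configuration hadoopConf, String basePath) throws IOException {
    HoodieTableMetaClient meta = HoodieTableMetaClient.builder()
        .setConf(hadoopConf)
        .setBasePath(basePath)
        .build();
    // Only instants still in the *active* timeline are visible here; a
    // replacecommit that has already been archived silently drops out of
    // this view, taking the replaced-file-ID mapping with it.
    HoodieTimeline replaceTimeline = meta.getActiveTimeline().getCompletedReplaceTimeline();
    Option<HoodieInstant> first = replaceTimeline.firstInstant();
    if (!first.isPresent()) {
      return Collections.emptyMap();
    }
    // Deserialize the commit metadata and return partition -> replaced file IDs.
    HoodieReplaceCommitMetadata metadata = HoodieReplaceCommitMetadata.fromBytes(
        replaceTimeline.getInstantDetails(first.get()).get(), HoodieReplaceCommitMetadata.class);
    return metadata.getPartitionToReplaceFileIds();
  }
}

With the old clean-after-archive order, this map could come back empty for the oldest replacecommit even though its replaced data files still sat on storage; cleaning before archiving closes that window.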