@@ -434,10 +434,10 @@ protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata me
     // Delete the marker directory for the instant.
     WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
         .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+    autoCleanOnCommit();
Contributor: can we add a unit test?

Contributor Author: Sure thing. Added (testCleanerDeleteReplacedDataWithArchive below).

     // We cannot have unbounded commit files. Archive commits if we have to archive
     HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
     archiveLog.archiveIfRequired(context);
-    autoCleanOnCommit();
     if (operationType != null && operationType != WriteOperationType.CLUSTER && operationType != WriteOperationType.COMPACT) {
       syncTableMetadata();
     }
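
For reviewers wanting the rationale behind moving autoCleanOnCommit() ahead of archival: the cleaner discovers replaced file groups from replacecommit metadata on the active timeline, so archiving a replacecommit before cleaning would strand its replaced files on storage forever. Below is a minimal, self-contained sketch of that hazard; all names (CleanBeforeArchiveSketch, ReplaceCommit, etc.) are illustrative stand-ins, not Hudi internals.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative sketch only, not Hudi code.
public class CleanBeforeArchiveSketch {

  // A completed replacecommit: its metadata records which file groups it replaced.
  record ReplaceCommit(String instant, List<String> replacedFiles) {}

  static final Deque<ReplaceCommit> activeTimeline = new ArrayDeque<>();
  static final List<String> deletedFiles = new ArrayList<>();

  // The cleaner can only find replaced files via instants still on the active timeline.
  static void clean() {
    activeTimeline.forEach(rc -> deletedFiles.addAll(rc.replacedFiles()));
  }

  // The archiver drops the oldest instants once the timeline grows past maxToKeep.
  static void archiveIfRequired(int maxToKeep) {
    while (activeTimeline.size() > maxToKeep) {
      activeTimeline.pollFirst();
    }
  }

  public static void main(String[] args) {
    activeTimeline.add(new ReplaceCommit("001", List.of("fileA")));
    activeTimeline.add(new ReplaceCommit("002", List.of("fileB")));

    // Old order (archive, then clean): "001" leaves the timeline before the
    // cleaner sees it, so fileA is never deleted. New order, as in this diff:
    clean();              // sees both replacecommits: deletes fileA and fileB
    archiveIfRequired(1); // archiving afterwards is harmless
    System.out.println(deletedFiles); // prints [fileA, fileB]
  }
}
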
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.utilities.functional;
 
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
@@ -26,6 +28,7 @@
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -100,6 +103,7 @@
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -972,6 +976,86 @@ public void testInlineClustering() throws Exception {
});
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean) throws Exception {
String tableBasePath = dfsBasePath + "/cleanerDeleteReplacedDataWithArchive" + asyncClean;

int totalRecords = 3000;

// Step 1 : Prepare and insert data without triggering archival or cleaning.
// Make sure that at least 6 commits, including 2 replacecommits, are completed.
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
cfg.continuousMode = true;
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", ""));
cfg.configs.add(String.format("%s=%s", HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
  TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
  return true;
});

TestHelpers.assertAtLeastNCommits(6, tableBasePath, dfs);
TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);

// Step 2 : Get the first replacecommit and extract the corresponding replaced file IDs.
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
HoodieTimeline replacedTimeline = meta.reloadActiveTimeline().getCompletedReplaceTimeline();
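// nthFromLastInstant(1) returns the second-to-last completed replacecommit, i.e. the older of the two asserted above.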
Option<HoodieInstant> firstReplaceHoodieInstant = replacedTimeline.nthFromLastInstant(1);
assertTrue(firstReplaceHoodieInstant.isPresent());

Option<byte[]> firstReplaceHoodieInstantDetails = replacedTimeline.getInstantDetails(firstReplaceHoodieInstant.get());
HoodieReplaceCommitMetadata firstReplaceMetadata = HoodieReplaceCommitMetadata.fromBytes(firstReplaceHoodieInstantDetails.get(), HoodieReplaceCommitMetadata.class);
Map<String, List<String>> partitionToReplaceFileIds = firstReplaceMetadata.getPartitionToReplaceFileIds();
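// Grab a partition and its replaced file IDs from the metadata (the loop below keeps the last entry).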
String partitionName = null;
List<String> replacedFileIDs = null;
for (Map.Entry<String, List<String>> entry : partitionToReplaceFileIds.entrySet()) {
  partitionName = entry.getKey();
  replacedFileIDs = entry.getValue();
}

assertNotNull(partitionName);
assertNotNull(replacedFileIDs);

// Step 3 : Based on the replacedFileIDs, get the corresponding complete file paths.
ArrayList<String> replacedFilePaths = new ArrayList<>();
Path partitionPath = new Path(meta.getBasePath(), partitionName);
RemoteIterator<LocatedFileStatus> hoodieFiles = meta.getFs().listFiles(partitionPath, true);
while (hoodieFiles.hasNext()) {
  LocatedFileStatus f = hoodieFiles.next();
  String file = f.getPath().toUri().toString();
  for (String replacedFileID : replacedFileIDs) {
    if (file.contains(replacedFileID)) {
      replacedFilePaths.add(file);
    }
  }
}

assertFalse(replacedFilePaths.isEmpty());

// Step 4 : Insert 1 record and trigger sync/async cleaner and archive.
List<String> configs = getAsyncServicesConfigs(1, "true", "true", "2", "", "");
configs.add(String.format("%s=%s", HoodieCompactionConfig.CLEANER_POLICY.key(), "KEEP_LATEST_COMMITS"));
configs.add(String.format("%s=%s", HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), "1"));
configs.add(String.format("%s=%s", HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), "2"));
configs.add(String.format("%s=%s", HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key(), "3"));
configs.add(String.format("%s=%s", HoodieCompactionConfig.ASYNC_CLEAN, asyncClean));
cfg.configs = configs;
cfg.continuousMode = false;
ds = new HoodieDeltaStreamer(cfg, jsc);
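// This single batch sync runs postCommit, which triggers both the cleaner (sync or async, per asyncClean) and archival.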
ds.sync();

// Step 5 : Make sure that firstReplaceHoodieInstant is archived.
long count = meta.reloadActiveTimeline().getCompletedReplaceTimeline().getInstants().filter(instant -> firstReplaceHoodieInstant.get().equals(instant)).count();
assertEquals(0, count);

// Step 6 : All the replaced files in firstReplaceHoodieInstant should be deleted through sync/async cleaner.
for (String replacedFilePath : replacedFilePaths) {
  assertFalse(meta.getFs().exists(new Path(replacedFilePath)));
}
}

private List<String> getAsyncServicesConfigs(int totalRecords, String autoClean, String inlineCluster,
String inlineClusterMaxCommit, String asyncCluster, String asyncClusterMaxCommit) {
List<String> configs = new ArrayList<>();