@@ -290,10 +290,10 @@ public void archive(HoodieEngineContext context, List<HoodieInstant> instants) t
     LOG.info("Wrapper schema " + wrapperSchema.toString());
     List<IndexedRecord> records = new ArrayList<>();
     for (HoodieInstant hoodieInstant : instants) {
+      // TODO HUDI-1518 Cleaner now takes care of removing replaced file groups. This call to deleteReplacedFileGroups can be removed.
       boolean deleteSuccess = deleteReplacedFileGroups(context, hoodieInstant);
       if (!deleteSuccess) {
-        // throw error and stop archival if deleting replaced file groups failed.
-        throw new HoodieCommitException("Unable to delete file(s) for " + hoodieInstant.getFileName());
+        LOG.warn("Unable to delete file(s) for " + hoodieInstant.getFileName() + ", replaced files possibly deleted by cleaner");
       }
       try {
         deleteAnyLeftOverMarkerFiles(context, hoodieInstant);
@@ -21,9 +21,9 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanFileInfo;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
-import org.apache.hudi.avro.model.HoodieCleanFileInfo;
 import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
@@ -72,7 +72,7 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
     List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);
 
     if (partitionsToClean.isEmpty()) {
-      LOG.info("Nothing to clean here. It is already clean");
+      LOG.info("Nothing to clean here.");
       return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
     }
     LOG.info("Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy());
@@ -29,6 +29,7 @@
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -111,14 +112,14 @@ public Stream<String> getSavepointedDataFiles(String savepointTime) {
   /**
    * Returns list of partitions where clean operations needs to be performed.
    *
-   * @param newInstantToRetain New instant to be retained after this cleanup operation
+   * @param earliestRetainedInstant New instant to be retained after this cleanup operation
    * @return list of partitions to scan for cleaning
    * @throws IOException when underlying file-system throws this exception
    */
-  public List<String> getPartitionPathsToClean(Option<HoodieInstant> newInstantToRetain) throws IOException {
+  public List<String> getPartitionPathsToClean(Option<HoodieInstant> earliestRetainedInstant) throws IOException {
     switch (config.getCleanerPolicy()) {
       case KEEP_LATEST_COMMITS:
-        return getPartitionPathsForCleanByCommits(newInstantToRetain);
+        return getPartitionPathsForCleanByCommits(earliestRetainedInstant);
       case KEEP_LATEST_FILE_VERSIONS:
         return getPartitionPathsForFullCleaning();
       default:
@@ -168,10 +169,16 @@ private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata
         cleanMetadata.getEarliestCommitToRetain()) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
         HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())).flatMap(instant -> {
           try {
-            HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-                .fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
-                    HoodieCommitMetadata.class);
-            return commitMetadata.getPartitionToWriteStats().keySet().stream();
+            if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
+              HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata.fromBytes(
+                  hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
+              return Stream.concat(replaceCommitMetadata.getPartitionToReplaceFileIds().keySet().stream(), replaceCommitMetadata.getPartitionToWriteStats().keySet().stream());
+            } else {
+              HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+                  .fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
+                      HoodieCommitMetadata.class);
+              return commitMetadata.getPartitionToWriteStats().keySet().stream();
+            }
           } catch (IOException e) {
             throw new HoodieIOException(e.getMessage(), e);
           }
@@ -196,13 +203,17 @@ private List<String> getPartitionPathsForFullCleaning() throws IOException {
   private List<CleanFileInfo> getFilesToCleanKeepingLatestVersions(String partitionPath) {
     LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained()
         + " file versions. ");
-    List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
     List<CleanFileInfo> deletePaths = new ArrayList<>();
     // Collect all the datafiles savepointed by all the savepoints
     List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
         .flatMap(this::getSavepointedDataFiles)
         .collect(Collectors.toList());
 
+    // In this scenario, we will assume that once replaced a file group automatically becomes eligible for cleaning completely
+    // In other words, the file versions only apply to the active file groups.
+    deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty()));
+
+    List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
     for (HoodieFileGroup fileGroup : fileGroups) {
       int keepVersions = config.getCleanerFileVersionsRetained();
       // do not cleanup slice required for pending compaction
@@ -226,18 +237,7 @@ private List<CleanFileInfo> getFilesToCleanKeepingLatestVersions(String partitio
       // Delete the remaining files
       while (fileSliceIterator.hasNext()) {
         FileSlice nextSlice = fileSliceIterator.next();
-        if (nextSlice.getBaseFile().isPresent()) {
-          HoodieBaseFile dataFile = nextSlice.getBaseFile().get();
-          deletePaths.add(new CleanFileInfo(dataFile.getPath(), false));
-          if (dataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
-            deletePaths.add(new CleanFileInfo(dataFile.getBootstrapBaseFile().get().getPath(), true));
-          }
-        }
-        if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
-          // If merge on read, then clean the log files for the commits as well
-          deletePaths.addAll(nextSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
-              .collect(Collectors.toList()));
-        }
+        deletePaths.addAll(getCleanFileInfoForSlice(nextSlice));
       }
     }
     return deletePaths;
@@ -269,7 +269,11 @@ private List<CleanFileInfo> getFilesToCleanKeepingLatestCommits(String partition
 
     // determine if we have enough commits, to start cleaning.
     if (commitTimeline.countInstants() > commitsRetained) {
-      HoodieInstant earliestCommitToRetain = getEarliestCommitToRetain().get();
+      Option<HoodieInstant> earliestCommitToRetainOption = getEarliestCommitToRetain();
+      HoodieInstant earliestCommitToRetain = earliestCommitToRetainOption.get();
+      // all replaced file groups before earliestCommitToRetain are eligible to clean
+      deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, earliestCommitToRetainOption));
+      // add active files
       List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
       for (HoodieFileGroup fileGroup : fileGroups) {
         List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
@@ -322,6 +326,20 @@ private List<CleanFileInfo> getFilesToCleanKeepingLatestCommits(String partition
     }
     return deletePaths;
   }
+
+  private List<CleanFileInfo> getReplacedFilesEligibleToClean(List<String> savepointedFiles, String partitionPath, Option<HoodieInstant> earliestCommitToRetain) {
+    final Stream<HoodieFileGroup> replacedGroups;
+    if (earliestCommitToRetain.isPresent()) {
+      replacedGroups = fileSystemView.getReplacedFileGroupsBefore(earliestCommitToRetain.get().getTimestamp(), partitionPath);
+    } else {
+      replacedGroups = fileSystemView.getAllReplacedFileGroups(partitionPath);
+    }
+    return replacedGroups.flatMap(HoodieFileGroup::getAllFileSlices)
+        // do not delete savepointed files (archival will make sure corresponding replacecommit file is not deleted)
+        .filter(slice -> !slice.getBaseFile().isPresent() || !savepointedFiles.contains(slice.getBaseFile().get().getFileName()))
+        .flatMap(slice -> getCleanFileInfoForSlice(slice).stream())
+        .collect(Collectors.toList());
+  }
 
   /**
    * Gets the latest version < instantTime. This version file could still be used by queries.
@@ -339,6 +357,23 @@ private String getLatestVersionBeforeCommit(List<FileSlice> fileSliceList, Hoodi
     return null;
   }
 
+  private List<CleanFileInfo> getCleanFileInfoForSlice(FileSlice nextSlice) {
+    List<CleanFileInfo> cleanPaths = new ArrayList<>();
+    if (nextSlice.getBaseFile().isPresent()) {
+      HoodieBaseFile dataFile = nextSlice.getBaseFile().get();
+      cleanPaths.add(new CleanFileInfo(dataFile.getPath(), false));
+      if (dataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
+        cleanPaths.add(new CleanFileInfo(dataFile.getBootstrapBaseFile().get().getPath(), true));
+      }
+    }
+    if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
+      // If merge on read, then clean the log files for the commits as well
+      cleanPaths.addAll(nextSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
+          .collect(Collectors.toList()));
+    }
+    return cleanPaths;
+  }
+
   /**
    * Returns files to be cleaned for the given partitionPath based on cleaning policy.
    */
@@ -122,6 +122,7 @@ public static List<ListingBasedRollbackRequest> generateRollbackRequestsUsingFil
     List<ListingBasedRollbackRequest> partitionRollbackRequests = new ArrayList<>();
     switch (instantToRollback.getAction()) {
       case HoodieTimeline.COMMIT_ACTION:
+      case HoodieTimeline.REPLACE_COMMIT_ACTION:

Member Author commented:
this looks like something we should have had from the beginning?

Member replied:
Yes, this was missed because MOR is not well tested with clustering/insert_overwrite. The new test covers this. Btw, there may be other places where we are missing similar handling of REPLACE_COMMIT. I tried doing an audit, but it's very tedious, so I may have missed some places. Let me know if you find any.

         LOG.info("Rolling back commit action.");
         partitionRollbackRequests.add(
             ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath));
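
To illustrate the reply above, a minimal, hypothetical sketch (not part of this PR) of exercising the new REPLACE_COMMIT_ACTION branch: it assumes a test harness like the one in the metadata test further down, with `jsc`, `dataGen`, and a MERGE_ON_READ `SparkRDDWriteClient` named `client` already set up, and it assumes the write client's `rollback(instantTime)` API; variable names are illustrative only.

```java
// Write a replacecommit via insert_overwrite, then roll it back. With this change,
// the listing-based rollback builds the same delete-data-and-log-files request
// for a replacecommit as it does for a regular commit.
String replaceTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(replaceTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
List<HoodieRecord> inserts = dataGen.generateInserts(replaceTime, 10);
HoodieWriteResult replaceResult = client.insertOverwrite(jsc.parallelize(inserts, 1), replaceTime);
assertNoWriteErrors(replaceResult.getWriteStatuses().collect());

// Rolling back the replacecommit now goes through the COMMIT/REPLACE_COMMIT case above.
client.rollback(replaceTime);
```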
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.metadata;
 
+import org.apache.hudi.client.HoodieWriteResult;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -498,6 +499,15 @@ public void testSync(HoodieTableType tableType) throws Exception {
     writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
     assertNoWriteErrors(writeStatuses);
     assertFalse(metadata(client).isInSync());
+
+    // insert overwrite to test replacecommit
+    newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+    client.startCommitWithTime(newCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
+    records = dataGen.generateInserts(newCommitTime, 5);
+    HoodieWriteResult replaceResult = client.insertOverwrite(jsc.parallelize(records, 1), newCommitTime);
+    writeStatuses = replaceResult.getWriteStatuses().collect();
+    assertNoWriteErrors(writeStatuses);
+    assertFalse(metadata(client).isInSync());
   }
 
   // Enable metadata table and ensure it is synced
@@ -800,6 +810,7 @@ private void validateMetadata(SparkRDDWriteClient client) throws IOException {
 
       // FileSystemView should expose the same data
       List<HoodieFileGroup> fileGroups = tableView.getAllFileGroups(partition).collect(Collectors.toList());
+      fileGroups.addAll(tableView.getAllReplacedFileGroups(partition).collect(Collectors.toList()));
 
       fileGroups.forEach(g -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(g));
       fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(b)));