@@ -537,15 +537,14 @@ private Stream<HoodieInstant> getInstantsToArchive() {
private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, HoodieEngineContext context) throws IOException {
LOG.info("Deleting instants " + archivedInstants);

List<String> pendingInstantFiles = new ArrayList<>();
List<String> completedInstantFiles = new ArrayList<>();
List<HoodieInstant> pendingInstants = new ArrayList<>();
List<HoodieInstant> completedInstants = new ArrayList<>();

for (HoodieInstant instant : archivedInstants) {
String filePath = new Path(metaClient.getMetaPath(), instant.getFileName()).toString();
if (instant.isCompleted()) {
completedInstantFiles.add(filePath);
completedInstants.add(instant);
} else {
pendingInstantFiles.add(filePath);
pendingInstants.add(instant);
}
}

@@ -556,27 +555,30 @@ private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, Hoo
// other monitors on the timeline(such as the compaction or clustering services) would
// mistakenly recognize the pending file as a pending operation,
// then all kinds of weird bugs occur.
boolean success = deleteArchivedInstantFiles(context, true, pendingInstantFiles);
success &= deleteArchivedInstantFiles(context, success, completedInstantFiles);
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
if (!pendingInstants.isEmpty()) {
context.foreach(
pendingInstants,
instant -> activeTimeline.deleteInstantFileIfExists(instant),
Math.min(pendingInstants.size(), config.getArchiveDeleteParallelism())
);
}
if (!completedInstants.isEmpty()) {
context.foreach(
completedInstants,
instant -> activeTimeline.deleteInstantFileIfExists(instant),
Math.min(completedInstants.size(), config.getArchiveDeleteParallelism())
);
}

// Remove older meta-data from auxiliary path too
Option<HoodieInstant> latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION)
|| (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)))).max(Comparator.comparing(HoodieInstant::getTimestamp)));
LOG.info("Latest Committed Instant=" + latestCommitted);
if (latestCommitted.isPresent()) {
success &= deleteAllInstantsOlderOrEqualsInAuxMetaFolder(latestCommitted.get());
return deleteAllInstantsOlderOrEqualsInAuxMetaFolder(latestCommitted.get());
}
return success;
}

private boolean deleteArchivedInstantFiles(HoodieEngineContext context, boolean success, List<String> files) {
Map<String, Boolean> resultDeleteInstantFiles = deleteFilesParallelize(metaClient, files, context, false);

for (Map.Entry<String, Boolean> result : resultDeleteInstantFiles.entrySet()) {
LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue());
success &= result.getValue();
}
return success;
return true;
}

/**
@@ -266,22 +266,26 @@ public void deleteCompactionRequested(HoodieInstant instant) {
deleteInstantFile(instant);
}

/**
* Note: This method should only be used in the case that delete requested/inflight instant or empty clean instant,
* and completed commit instant in an archive operation.
*/
public void deleteInstantFileIfExists(HoodieInstant instant) {
LOG.info("Deleting instant " + instant);
Path inFlightCommitFilePath = getInstantFileNamePath(instant.getFileName());
Path commitFilePath = getInstantFileNamePath(instant.getFileName());
Comment on lines -271 to +275
Member

We need to be careful about which commits can be deleted. Based on its usage, this API is designed to delete only requested/inflight commits or empty clean commits. (The one exception is org.apache.hudi.cli.commands.TestRepairsCommand#testShowFailedCommits, where this API is misused to delete completed commits in tests; we should make a separate test helper for that.)

We can't delete completed commit instants, because that breaks the timeline's integrity. So we should name this properly at both the variable level and the API level.

Contributor Author

Then we should add some documentation for this method. inFlightCommitFilePath doesn't represent requested or empty clean instants.
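
One way to make the restriction discussed above explicit in code, rather than relying on naming and Javadoc alone, is a state check before deletion. The following is only a minimal sketch: `PendingDeleteGuardSketch`, `SketchInstant`, `State`, and `checkDeletablePending` are hypothetical names that are not part of Hudi, and the sketch ignores the empty-clean-commit and archive exceptions mentioned in the thread.

```java
public class PendingDeleteGuardSketch {

  // Hypothetical, simplified instant states for illustration only.
  enum State { REQUESTED, INFLIGHT, COMPLETED }

  // Hypothetical stand-in for an instant: a timestamp plus a state.
  static final class SketchInstant {
    final String timestamp;
    final State state;

    SketchInstant(String timestamp, State state) {
      this.timestamp = timestamp;
      this.state = state;
    }
  }

  // Rejects completed instants so that a "pending only" delete API
  // cannot silently remove a completed commit.
  static void checkDeletablePending(SketchInstant instant) {
    if (instant.state == State.COMPLETED) {
      throw new IllegalArgumentException(
          "Refusing to delete completed instant " + instant.timestamp);
    }
  }

  public static void main(String[] args) {
    // A pending instant passes the check.
    checkDeletablePending(new SketchInstant("001", State.INFLIGHT));
    try {
      // A completed instant is rejected.
      checkDeletablePending(new SketchInstant("002", State.COMPLETED));
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Whether such a check belongs in the timeline class or in a separate archive-only entry point is a design decision this sketch does not settle.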

try {
if (metaClient.getFs().exists(inFlightCommitFilePath)) {
boolean result = metaClient.getFs().delete(inFlightCommitFilePath, false);
Contributor

It seems the core change here is renaming a method: deleteInstantFileIfExists -> deleteInstantIfExists? If so, I would suggest keeping it as it is.

Contributor Author

No.
The major change is that, when the archive operation needs to delete active instants, it uses HoodieActiveTimeline's deleteInstantIfExists instead of calling the naked API (fs.delete). After the change, HoodieActiveTimeline becomes the standard route for operating on active instants.

To that end, the original methods deleteInstantFile (used in the current class and its subclasses) and deleteInstantFileIfExists (used outside) are merged into one and renamed to deleteInstantIfExists.

Contributor

"calling the naked api (fs.delete)."

deleteInstantFile(HoodieInstant instant) also takes a HoodieInstant param, so what's the difference here?

Contributor Author

The old way to delete active instants during archiving went through deleteArchivedInstantFiles -> deleteFilesParallelize in HoodieTimelineArchiver, and deleteFilesParallelize calls fs.delete() directly.
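
The routing described in this thread can be illustrated with a small, self-contained sketch. It uses `java.nio.file` in place of Hudi's file-system layer, and `ArchiveDeleteSketch`, `SketchInstant`, and `Timeline` are hypothetical stand-ins rather than Hudi classes. It only aims to show the two ideas under discussion: the archiver never touches the file system itself, and pending instants are deleted before completed ones.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class ArchiveDeleteSketch {

  // Hypothetical stand-in for an instant: a file name plus a completed flag.
  static final class SketchInstant {
    final String fileName;
    final boolean completed;

    SketchInstant(String fileName, boolean completed) {
      this.fileName = fileName;
      this.completed = completed;
    }
  }

  // Hypothetical stand-in for the active timeline: the only class here
  // that is allowed to touch the file system.
  static final class Timeline {
    private final Path metaPath;

    Timeline(Path metaPath) {
      this.metaPath = metaPath;
    }

    // Deletes the instant's file if present; returns true if a file was removed.
    boolean deleteInstantFileIfExists(SketchInstant instant) {
      try {
        return Files.deleteIfExists(metaPath.resolve(instant.fileName));
      } catch (IOException e) {
        throw new UncheckedIOException("Could not remove instant " + instant.fileName, e);
      }
    }
  }

  // Archiver side: split by state, delete pending first, then completed,
  // always through the timeline. Returns the number of files removed.
  static int deleteArchived(Timeline timeline, List<SketchInstant> archived) {
    List<SketchInstant> pending = new ArrayList<>();
    List<SketchInstant> completed = new ArrayList<>();
    for (SketchInstant instant : archived) {
      (instant.completed ? completed : pending).add(instant);
    }
    int deleted = 0;
    for (SketchInstant instant : pending) {
      if (timeline.deleteInstantFileIfExists(instant)) {
        deleted++;
      }
    }
    for (SketchInstant instant : completed) {
      if (timeline.deleteInstantFileIfExists(instant)) {
        deleted++;
      }
    }
    return deleted;
  }

  public static void main(String[] args) throws IOException {
    Path meta = Files.createTempDirectory("timeline-sketch");
    Files.createFile(meta.resolve("001.commit"));
    Files.createFile(meta.resolve("002.commit.requested"));

    List<SketchInstant> archived = new ArrayList<>();
    archived.add(new SketchInstant("001.commit", true));
    archived.add(new SketchInstant("002.commit.requested", false));
    // This file was never created, so the timeline simply skips it.
    archived.add(new SketchInstant("003.commit.inflight", false));

    int deleted = deleteArchived(new Timeline(meta), archived);
    System.out.println("deleted=" + deleted);
  }
}
```

The sketch runs sequentially; the parallel `context.foreach` call and the `config.getArchiveDeleteParallelism()` bound from the diff are left out to keep it small.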

if (metaClient.getFs().exists(commitFilePath)) {
boolean result = metaClient.getFs().delete(commitFilePath, false);
if (result) {
LOG.info("Removed instant " + instant);
} else {
throw new HoodieIOException("Could not delete instant " + instant);
Contributor

The method saves one invocation of fs.exists, so let's keep it.

}
} else {
LOG.warn("The commit " + inFlightCommitFilePath + " to remove does not exist");
LOG.warn("The commit " + commitFilePath + " to remove does not exist");
}
} catch (IOException e) {
throw new HoodieIOException("Could not remove inflight commit " + inFlightCommitFilePath, e);
throw new HoodieIOException("Could not remove commit " + commitFilePath, e);
}
}
