diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java index 190a5fe1c6064..53888b0b4c507 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java @@ -515,13 +515,25 @@ private Stream getInstantsToArchive() { private boolean deleteArchivedInstants(List archivedInstants, HoodieEngineContext context) throws IOException { LOG.info("Deleting instants " + archivedInstants); boolean success = true; - List instantFiles = archivedInstants.stream().map(archivedInstant -> - new Path(metaClient.getMetaPath(), archivedInstant.getFileName()) - ).map(Path::toString).collect(Collectors.toList()); - + List nonCommitInstants = new ArrayList<>(); + List commitInstant = new ArrayList<>(); + for (HoodieInstant hoodieInstant : archivedInstants) { + if (hoodieInstant.isCompleted()) { + commitInstant.add(hoodieInstant); + } else { + nonCommitInstants.add(hoodieInstant); + } + } + List nonCommitInstantFiles = mapInstantToFile(nonCommitInstants); + List commitInstantFiles = mapInstantToFile(commitInstant); context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants"); - Map resultDeleteInstantFiles = deleteFilesParallelize(metaClient, instantFiles, context, false); - + // Suppose an application crashes while archiving, it leaves some instant files that should be deleted. + // If the commit file is deleted but the in-flight file is left, + // when the application starts, it will scan and get the last pending instant and throw an exception. + // So we need to delete non-commit instant files first. + Map resultDeleteInstantFiles = new HashMap<>(); + resultDeleteInstantFiles.putAll(deleteFilesParallelize(metaClient, nonCommitInstantFiles, context, false)); + resultDeleteInstantFiles.putAll(deleteFilesParallelize(metaClient, commitInstantFiles, context, false)); for (Map.Entry result : resultDeleteInstantFiles.entrySet()) { LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue()); success &= result.getValue(); @@ -537,6 +549,12 @@ private boolean deleteArchivedInstants(List archivedInstants, Hoo return success; } + private List mapInstantToFile(List instants) { + return instants.stream().map(instant -> + new Path(metaClient.getMetaPath(), instant.getFileName()) + ).map(Path::toString).collect(Collectors.toList()); + } + /** * Remove older instants from auxiliary meta folder. *