diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java index bb814f817d099..a61a5c9008293 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java @@ -537,15 +537,14 @@ private Stream getInstantsToArchive() { private boolean deleteArchivedInstants(List archivedInstants, HoodieEngineContext context) throws IOException { LOG.info("Deleting instants " + archivedInstants); - List pendingInstantFiles = new ArrayList<>(); - List completedInstantFiles = new ArrayList<>(); + List pendingInstants = new ArrayList<>(); + List completedInstants = new ArrayList<>(); for (HoodieInstant instant : archivedInstants) { - String filePath = new Path(metaClient.getMetaPath(), instant.getFileName()).toString(); if (instant.isCompleted()) { - completedInstantFiles.add(filePath); + completedInstants.add(instant); } else { - pendingInstantFiles.add(filePath); + pendingInstants.add(instant); } } @@ -556,27 +555,30 @@ private boolean deleteArchivedInstants(List archivedInstants, Hoo // other monitors on the timeline(such as the compaction or clustering services) would // mistakenly recognize the pending file as a pending operation, // then all kinds of weird bugs occur. - boolean success = deleteArchivedInstantFiles(context, true, pendingInstantFiles); - success &= deleteArchivedInstantFiles(context, success, completedInstantFiles); + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + if (!pendingInstants.isEmpty()) { + context.foreach( + pendingInstants, + instant -> activeTimeline.deleteInstantFileIfExists(instant), + Math.min(pendingInstants.size(), config.getArchiveDeleteParallelism()) + ); + } + if (!completedInstants.isEmpty()) { + context.foreach( + completedInstants, + instant -> activeTimeline.deleteInstantFileIfExists(instant), + Math.min(completedInstants.size(), config.getArchiveDeleteParallelism()) + ); + } // Remove older meta-data from auxiliary path too Option latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION) || (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)))).max(Comparator.comparing(HoodieInstant::getTimestamp))); LOG.info("Latest Committed Instant=" + latestCommitted); if (latestCommitted.isPresent()) { - success &= deleteAllInstantsOlderOrEqualsInAuxMetaFolder(latestCommitted.get()); + return deleteAllInstantsOlderOrEqualsInAuxMetaFolder(latestCommitted.get()); } - return success; - } - - private boolean deleteArchivedInstantFiles(HoodieEngineContext context, boolean success, List files) { - Map resultDeleteInstantFiles = deleteFilesParallelize(metaClient, files, context, false); - - for (Map.Entry result : resultDeleteInstantFiles.entrySet()) { - LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue()); - success &= result.getValue(); - } - return success; + return true; } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index be351ab8e839e..0ef46031ec1b0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -266,22 +266,26 @@ public void deleteCompactionRequested(HoodieInstant instant) { deleteInstantFile(instant); } + /** + * Note: This method should only be used in the case that delete requested/inflight instant or empty clean instant, + * and completed commit instant in an archive operation. + */ public void deleteInstantFileIfExists(HoodieInstant instant) { LOG.info("Deleting instant " + instant); - Path inFlightCommitFilePath = getInstantFileNamePath(instant.getFileName()); + Path commitFilePath = getInstantFileNamePath(instant.getFileName()); try { - if (metaClient.getFs().exists(inFlightCommitFilePath)) { - boolean result = metaClient.getFs().delete(inFlightCommitFilePath, false); + if (metaClient.getFs().exists(commitFilePath)) { + boolean result = metaClient.getFs().delete(commitFilePath, false); if (result) { LOG.info("Removed instant " + instant); } else { throw new HoodieIOException("Could not delete instant " + instant); } } else { - LOG.warn("The commit " + inFlightCommitFilePath + " to remove does not exist"); + LOG.warn("The commit " + commitFilePath + " to remove does not exist"); } } catch (IOException e) { - throw new HoodieIOException("Could not remove inflight commit " + inFlightCommitFilePath, e); + throw new HoodieIOException("Could not remove commit " + commitFilePath, e); } }