diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index f111bb70ef007..c53554d8e04d2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -506,13 +506,7 @@ private Stream<HoodieInstant> getInstantsToArchive() {
       List<HoodieInstant> instantsToStream = groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(),
           HoodieInstant.getComparableAction(hoodieInstant.getAction())));
       if (instantsToStream != null) {
-        // sorts the instants in natural order to make sure the metadata files be removed
-        // in HoodieInstant.State sequence: requested -> inflight -> completed,
-        // this is important because when a COMPLETED metadata file is removed first,
-        // other monitors on the timeline(such as the compaction or clustering services) would
-        // mistakenly recognize the pending file as a pending operation,
-        // then all kinds of weird bugs occur.
-        return instantsToStream.stream().sorted();
+        return instantsToStream.stream();
       } else {
         // if a concurrent writer archived the instant
         return Stream.empty();
@@ -522,19 +516,29 @@ private Stream<HoodieInstant> getInstantsToArchive() {
   private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, HoodieEngineContext context) throws IOException {
     LOG.info("Deleting instants " + archivedInstants);
 
-    boolean success = true;
-    List<String> instantFiles = archivedInstants.stream().map(archivedInstant ->
-        new Path(metaClient.getMetaPath(), archivedInstant.getFileName())
-    ).map(Path::toString).collect(Collectors.toList());
 
-    context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants: " + config.getTableName());
-    Map<String, Boolean> resultDeleteInstantFiles = deleteFilesParallelize(metaClient, instantFiles, context, false);
+    List<String> pendingInstantFiles = new ArrayList<>();
+    List<String> completedInstantFiles = new ArrayList<>();
 
-    for (Map.Entry<String, Boolean> result : resultDeleteInstantFiles.entrySet()) {
-      LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue());
-      success &= result.getValue();
+    for (HoodieInstant instant : archivedInstants) {
+      String filePath = new Path(metaClient.getMetaPath(), instant.getFileName()).toString();
+      if (instant.isCompleted()) {
+        completedInstantFiles.add(filePath);
+      } else {
+        pendingInstantFiles.add(filePath);
+      }
     }
 
+    context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants: " + config.getTableName());
+    // Delete the metadata files
+    // in HoodieInstant.State sequence: requested -> inflight -> completed,
+    // this is important because when a COMPLETED metadata file is removed first,
+    // other monitors on the timeline(such as the compaction or clustering services) would
+    // mistakenly recognize the pending file as a pending operation,
+    // then all kinds of weird bugs occur.
+    boolean success = deleteArchivedInstantFiles(context, true, pendingInstantFiles);
+    success &= deleteArchivedInstantFiles(context, success, completedInstantFiles);
+
     // Remove older meta-data from auxiliary path too
     Option<HoodieInstant> latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION)
         || (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)))).max(Comparator.comparing(HoodieInstant::getTimestamp)));
@@ -545,6 +549,16 @@ private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, Hoo
     return success;
   }
 
+  private boolean deleteArchivedInstantFiles(HoodieEngineContext context, boolean success, List<String> files) {
+    Map<String, Boolean> resultDeleteInstantFiles = deleteFilesParallelize(metaClient, files, context, false);
+
+    for (Map.Entry<String, Boolean> result : resultDeleteInstantFiles.entrySet()) {
+      LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue());
+      success &= result.getValue();
+    }
+    return success;
+  }
+
   /**
    * Remove older instants from auxiliary meta folder.
    *
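For illustration, a minimal standalone sketch of the ordering this patch enforces, using hypothetical stand-ins (Instant, FileDeleter, deleteArchivedInstants below) rather than the real HoodieInstant, deleteFilesParallelize, or HoodieTimelineArchiver: pending (requested/inflight) metadata files are deleted before COMPLETED ones, and the two results are combined without short-circuiting so completed files are still attempted even if a pending delete failed.

import java.util.ArrayList;
import java.util.List;

// Sketch only: simplified types standing in for HoodieInstant and the parallel file deletion.
public class ArchiveDeletionOrderSketch {

  // Stand-in for a timeline instant's metadata file and state.
  static final class Instant {
    final String fileName;
    final boolean completed;

    Instant(String fileName, boolean completed) {
      this.fileName = fileName;
      this.completed = completed;
    }
  }

  // Stand-in for the parallelized delete; returns true only if every file was removed.
  interface FileDeleter {
    boolean deleteAll(List<String> files);
  }

  static boolean deleteArchivedInstants(List<Instant> archivedInstants, FileDeleter deleter) {
    List<String> pendingFiles = new ArrayList<>();
    List<String> completedFiles = new ArrayList<>();
    for (Instant instant : archivedInstants) {
      (instant.completed ? completedFiles : pendingFiles).add(instant.fileName);
    }
    // Order matters: requested/inflight files first, COMPLETED files last, so a concurrent
    // timeline observer never sees a pending file whose completed file is already gone.
    boolean success = deleter.deleteAll(pendingFiles);
    // Non-short-circuiting '&': completed files are still deleted even if a pending delete failed,
    // mirroring the patch's success-flag handling.
    return success & deleter.deleteAll(completedFiles);
  }

  public static void main(String[] args) {
    List<Instant> instants = List.of(
        new Instant("001.commit.requested", false),
        new Instant("001.inflight", false),
        new Instant("001.commit", true));
    boolean ok = deleteArchivedInstants(instants, files -> {
      files.forEach(f -> System.out.println("deleting " + f));
      return true;
    });
    System.out.println("success = " + ok);
  }
}

Running the sketch prints the requested and inflight files before the completed one, which is the invariant the reordered deletion in deleteArchivedInstants is meant to preserve.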