From 13c31f3b8138f805cf6be3a118bdd8f21e529122 Mon Sep 17 00:00:00 2001 From: Yann Date: Mon, 14 Nov 2022 18:08:04 +0800 Subject: [PATCH 1/6] [MINOR] move logic for deleting active instant to HoodieActiveTimeline --- .../hudi/client/HoodieTimelineArchiver.java | 36 ++++++++++++------- .../table/timeline/HoodieActiveTimeline.java | 18 ++++++---- 2 files changed, 36 insertions(+), 18 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java index bb814f817d09..84f3d06ebd33 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java @@ -48,6 +48,7 @@ import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; @@ -537,15 +538,14 @@ private Stream getInstantsToArchive() { private boolean deleteArchivedInstants(List archivedInstants, HoodieEngineContext context) throws IOException { LOG.info("Deleting instants " + archivedInstants); - List pendingInstantFiles = new ArrayList<>(); - List completedInstantFiles = new ArrayList<>(); + List pendingInstants = new ArrayList<>(); + List completedInstants = new ArrayList<>(); for (HoodieInstant instant : archivedInstants) { - String filePath = new Path(metaClient.getMetaPath(), instant.getFileName()).toString(); if (instant.isCompleted()) { - completedInstantFiles.add(filePath); + completedInstants.add(instant); } else { - pendingInstantFiles.add(filePath); + pendingInstants.add(instant); } } @@ -556,8 +556,8 @@ private boolean deleteArchivedInstants(List archivedInstants, Hoo // other monitors on the timeline(such as the compaction or clustering services) would // mistakenly recognize the pending file as a pending operation, // then all kinds of weird bugs occur. - boolean success = deleteArchivedInstantFiles(context, true, pendingInstantFiles); - success &= deleteArchivedInstantFiles(context, success, completedInstantFiles); + boolean success = deleteArchivedInstants(context, metaClient, pendingInstants); + success &= deleteArchivedInstants(context, metaClient, completedInstants); // Remove older meta-data from auxiliary path too Option latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION) @@ -569,12 +569,24 @@ private boolean deleteArchivedInstants(List archivedInstants, Hoo return success; } - private boolean deleteArchivedInstantFiles(HoodieEngineContext context, boolean success, List files) { - Map resultDeleteInstantFiles = deleteFilesParallelize(metaClient, files, context, false); + private boolean deleteArchivedInstants(HoodieEngineContext context, + HoodieTableMetaClient metaClient, + List instants) { + if (instants.isEmpty()) { + return true; + } - for (Map.Entry result : resultDeleteInstantFiles.entrySet()) { - LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue()); - success &= result.getValue(); + Map result = context.mapToPair( + instants, + instant -> ImmutablePair.of( + instant, + metaClient.getActiveTimeline().deleteInstantFileIfExists(instant, false)), + Math.min(instants.size(), config.getArchiveDeleteParallelism())); + + boolean success = true; + for (Map.Entry entry : result.entrySet()) { + LOG.info("Archived and deleted instant " + entry.getKey().toString() + " : " + entry.getValue()); + success &= entry.getValue(); } return success; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index be351ab8e839..8f1049e3d5c0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -267,21 +267,27 @@ public void deleteCompactionRequested(HoodieInstant instant) { } public void deleteInstantFileIfExists(HoodieInstant instant) { + deleteInstantFileIfExists(instant, true); + } + + public boolean deleteInstantFileIfExists(HoodieInstant instant, boolean exceptionIfFailToDelete) { LOG.info("Deleting instant " + instant); - Path inFlightCommitFilePath = getInstantFileNamePath(instant.getFileName()); + Path commitFilePath = getInstantFileNamePath(instant.getFileName()); try { - if (metaClient.getFs().exists(inFlightCommitFilePath)) { - boolean result = metaClient.getFs().delete(inFlightCommitFilePath, false); + if (metaClient.getFs().exists(commitFilePath)) { + boolean result = metaClient.getFs().delete(commitFilePath, false); if (result) { LOG.info("Removed instant " + instant); - } else { + } else if (exceptionIfFailToDelete) { throw new HoodieIOException("Could not delete instant " + instant); } + return result; } else { - LOG.warn("The commit " + inFlightCommitFilePath + " to remove does not exist"); + LOG.warn("The commit " + commitFilePath + " to remove does not exist"); + return true; } } catch (IOException e) { - throw new HoodieIOException("Could not remove inflight commit " + inFlightCommitFilePath, e); + throw new HoodieIOException("Could not remove commit " + commitFilePath, e); } } From d741194e4fc5539d0756f7e97e3faac697570f75 Mon Sep 17 00:00:00 2001 From: Yann Date: Fri, 25 Nov 2022 21:37:45 +0800 Subject: [PATCH 2/6] solve comments --- .../hudi/client/HoodieTimelineArchiver.java | 37 +++++++------------ .../table/timeline/HoodieActiveTimeline.java | 12 +++--- 2 files changed, 18 insertions(+), 31 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java index 84f3d06ebd33..64f473d2103a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java @@ -48,7 +48,6 @@ import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; @@ -556,39 +555,29 @@ private boolean deleteArchivedInstants(List archivedInstants, Hoo // other monitors on the timeline(such as the compaction or clustering services) would // mistakenly recognize the pending file as a pending operation, // then all kinds of weird bugs occur. - boolean success = deleteArchivedInstants(context, metaClient, pendingInstants); - success &= deleteArchivedInstants(context, metaClient, completedInstants); + deleteArchivedInstants(context, metaClient, pendingInstants); + deleteArchivedInstants(context, metaClient, completedInstants); // Remove older meta-data from auxiliary path too Option latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION) || (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)))).max(Comparator.comparing(HoodieInstant::getTimestamp))); LOG.info("Latest Committed Instant=" + latestCommitted); if (latestCommitted.isPresent()) { - success &= deleteAllInstantsOlderOrEqualsInAuxMetaFolder(latestCommitted.get()); + return deleteAllInstantsOlderOrEqualsInAuxMetaFolder(latestCommitted.get()); } - return success; + return true; } - private boolean deleteArchivedInstants(HoodieEngineContext context, - HoodieTableMetaClient metaClient, - List instants) { - if (instants.isEmpty()) { - return true; - } - - Map result = context.mapToPair( - instants, - instant -> ImmutablePair.of( - instant, - metaClient.getActiveTimeline().deleteInstantFileIfExists(instant, false)), - Math.min(instants.size(), config.getArchiveDeleteParallelism())); - - boolean success = true; - for (Map.Entry entry : result.entrySet()) { - LOG.info("Archived and deleted instant " + entry.getKey().toString() + " : " + entry.getValue()); - success &= entry.getValue(); + private void deleteArchivedInstants(HoodieEngineContext context, + HoodieTableMetaClient metaClient, + List instants) { + if (!instants.isEmpty()) { + context.foreach( + instants, + instant -> metaClient.getActiveTimeline().deleteInstantFileIfExists(instant), + Math.min(instants.size(), config.getArchiveDeleteParallelism()) + ); } - return success; } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 8f1049e3d5c0..4507c092a31f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -266,11 +266,11 @@ public void deleteCompactionRequested(HoodieInstant instant) { deleteInstantFile(instant); } + /** + * Note: This should only be used in the case that delete requested/inflight instant + * or empty clean instant. The completed commit instant is not allowed to delete. + */ public void deleteInstantFileIfExists(HoodieInstant instant) { - deleteInstantFileIfExists(instant, true); - } - - public boolean deleteInstantFileIfExists(HoodieInstant instant, boolean exceptionIfFailToDelete) { LOG.info("Deleting instant " + instant); Path commitFilePath = getInstantFileNamePath(instant.getFileName()); try { @@ -278,13 +278,11 @@ public boolean deleteInstantFileIfExists(HoodieInstant instant, boolean exceptio boolean result = metaClient.getFs().delete(commitFilePath, false); if (result) { LOG.info("Removed instant " + instant); - } else if (exceptionIfFailToDelete) { + } else { throw new HoodieIOException("Could not delete instant " + instant); } - return result; } else { LOG.warn("The commit " + commitFilePath + " to remove does not exist"); - return true; } } catch (IOException e) { throw new HoodieIOException("Could not remove commit " + commitFilePath, e); From 4d6c6b84b2a8269cb2b7dd29433cdb30c23d4a82 Mon Sep 17 00:00:00 2001 From: Yann Date: Mon, 28 Nov 2022 10:14:35 +0800 Subject: [PATCH 3/6] update --- .../hudi/client/HoodieTimelineArchiver.java | 28 +++++++++---------- .../table/timeline/HoodieActiveTimeline.java | 4 +-- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java index 64f473d2103a..1e4005e4fefc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java @@ -555,8 +555,20 @@ private boolean deleteArchivedInstants(List archivedInstants, Hoo // other monitors on the timeline(such as the compaction or clustering services) would // mistakenly recognize the pending file as a pending operation, // then all kinds of weird bugs occur. - deleteArchivedInstants(context, metaClient, pendingInstants); - deleteArchivedInstants(context, metaClient, completedInstants); + if (!pendingInstants.isEmpty()) { + context.foreach( + pendingInstants, + instant -> metaClient.getActiveTimeline().deleteInstantFileIfExists(instant), + Math.min(pendingInstants.size(), config.getArchiveDeleteParallelism()) + ); + } + if (!completedInstants.isEmpty()) { + context.foreach( + completedInstants, + instant -> metaClient.getActiveTimeline().deleteInstantFileIfExists(instant), + Math.min(completedInstants.size(), config.getArchiveDeleteParallelism()) + ); + } // Remove older meta-data from auxiliary path too Option latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION) @@ -568,18 +580,6 @@ private boolean deleteArchivedInstants(List archivedInstants, Hoo return true; } - private void deleteArchivedInstants(HoodieEngineContext context, - HoodieTableMetaClient metaClient, - List instants) { - if (!instants.isEmpty()) { - context.foreach( - instants, - instant -> metaClient.getActiveTimeline().deleteInstantFileIfExists(instant), - Math.min(instants.size(), config.getArchiveDeleteParallelism()) - ); - } - } - /** * Remove older instants from auxiliary meta folder. * diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 4507c092a31f..0ef46031ec1b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -267,8 +267,8 @@ public void deleteCompactionRequested(HoodieInstant instant) { } /** - * Note: This should only be used in the case that delete requested/inflight instant - * or empty clean instant. The completed commit instant is not allowed to delete. + * Note: This method should only be used in the case that delete requested/inflight instant or empty clean instant, + * and completed commit instant in an archive operation. */ public void deleteInstantFileIfExists(HoodieInstant instant) { LOG.info("Deleting instant " + instant); From a7d0518c591b55bfb318bf505b62ba8d8737ecfe Mon Sep 17 00:00:00 2001 From: Yann Date: Mon, 28 Nov 2022 18:00:13 +0800 Subject: [PATCH 4/6] update --- .../java/org/apache/hudi/client/HoodieTimelineArchiver.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java index 1e4005e4fefc..a61a5c900829 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java @@ -555,17 +555,18 @@ private boolean deleteArchivedInstants(List archivedInstants, Hoo // other monitors on the timeline(such as the compaction or clustering services) would // mistakenly recognize the pending file as a pending operation, // then all kinds of weird bugs occur. + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); if (!pendingInstants.isEmpty()) { context.foreach( pendingInstants, - instant -> metaClient.getActiveTimeline().deleteInstantFileIfExists(instant), + instant -> activeTimeline.deleteInstantFileIfExists(instant), Math.min(pendingInstants.size(), config.getArchiveDeleteParallelism()) ); } if (!completedInstants.isEmpty()) { context.foreach( completedInstants, - instant -> metaClient.getActiveTimeline().deleteInstantFileIfExists(instant), + instant -> activeTimeline.deleteInstantFileIfExists(instant), Math.min(completedInstants.size(), config.getArchiveDeleteParallelism()) ); } From 4a760cacfc3b8f3182263351e105c5cb461767d3 Mon Sep 17 00:00:00 2001 From: Yann Date: Tue, 29 Nov 2022 22:16:17 +0800 Subject: [PATCH 5/6] test CI --- .../hudi/client/HoodieTimelineArchiver.java | 40 +++++++++---------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java index a61a5c900829..bb814f817d09 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java @@ -537,14 +537,15 @@ private Stream getInstantsToArchive() { private boolean deleteArchivedInstants(List archivedInstants, HoodieEngineContext context) throws IOException { LOG.info("Deleting instants " + archivedInstants); - List pendingInstants = new ArrayList<>(); - List completedInstants = new ArrayList<>(); + List pendingInstantFiles = new ArrayList<>(); + List completedInstantFiles = new ArrayList<>(); for (HoodieInstant instant : archivedInstants) { + String filePath = new Path(metaClient.getMetaPath(), instant.getFileName()).toString(); if (instant.isCompleted()) { - completedInstants.add(instant); + completedInstantFiles.add(filePath); } else { - pendingInstants.add(instant); + pendingInstantFiles.add(filePath); } } @@ -555,30 +556,27 @@ private boolean deleteArchivedInstants(List archivedInstants, Hoo // other monitors on the timeline(such as the compaction or clustering services) would // mistakenly recognize the pending file as a pending operation, // then all kinds of weird bugs occur. - HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - if (!pendingInstants.isEmpty()) { - context.foreach( - pendingInstants, - instant -> activeTimeline.deleteInstantFileIfExists(instant), - Math.min(pendingInstants.size(), config.getArchiveDeleteParallelism()) - ); - } - if (!completedInstants.isEmpty()) { - context.foreach( - completedInstants, - instant -> activeTimeline.deleteInstantFileIfExists(instant), - Math.min(completedInstants.size(), config.getArchiveDeleteParallelism()) - ); - } + boolean success = deleteArchivedInstantFiles(context, true, pendingInstantFiles); + success &= deleteArchivedInstantFiles(context, success, completedInstantFiles); // Remove older meta-data from auxiliary path too Option latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION) || (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)))).max(Comparator.comparing(HoodieInstant::getTimestamp))); LOG.info("Latest Committed Instant=" + latestCommitted); if (latestCommitted.isPresent()) { - return deleteAllInstantsOlderOrEqualsInAuxMetaFolder(latestCommitted.get()); + success &= deleteAllInstantsOlderOrEqualsInAuxMetaFolder(latestCommitted.get()); } - return true; + return success; + } + + private boolean deleteArchivedInstantFiles(HoodieEngineContext context, boolean success, List files) { + Map resultDeleteInstantFiles = deleteFilesParallelize(metaClient, files, context, false); + + for (Map.Entry result : resultDeleteInstantFiles.entrySet()) { + LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue()); + success &= result.getValue(); + } + return success; } /** From 65c6fdd3e381b11879a8e3f69635567a05df1b50 Mon Sep 17 00:00:00 2001 From: Yann Date: Wed, 30 Nov 2022 11:05:04 +0800 Subject: [PATCH 6/6] Revert "test CI" This reverts commit 4a760cacfc3b8f3182263351e105c5cb461767d3. --- .../hudi/client/HoodieTimelineArchiver.java | 40 ++++++++++--------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java index bb814f817d09..a61a5c900829 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java @@ -537,15 +537,14 @@ private Stream getInstantsToArchive() { private boolean deleteArchivedInstants(List archivedInstants, HoodieEngineContext context) throws IOException { LOG.info("Deleting instants " + archivedInstants); - List pendingInstantFiles = new ArrayList<>(); - List completedInstantFiles = new ArrayList<>(); + List pendingInstants = new ArrayList<>(); + List completedInstants = new ArrayList<>(); for (HoodieInstant instant : archivedInstants) { - String filePath = new Path(metaClient.getMetaPath(), instant.getFileName()).toString(); if (instant.isCompleted()) { - completedInstantFiles.add(filePath); + completedInstants.add(instant); } else { - pendingInstantFiles.add(filePath); + pendingInstants.add(instant); } } @@ -556,27 +555,30 @@ private boolean deleteArchivedInstants(List archivedInstants, Hoo // other monitors on the timeline(such as the compaction or clustering services) would // mistakenly recognize the pending file as a pending operation, // then all kinds of weird bugs occur. - boolean success = deleteArchivedInstantFiles(context, true, pendingInstantFiles); - success &= deleteArchivedInstantFiles(context, success, completedInstantFiles); + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + if (!pendingInstants.isEmpty()) { + context.foreach( + pendingInstants, + instant -> activeTimeline.deleteInstantFileIfExists(instant), + Math.min(pendingInstants.size(), config.getArchiveDeleteParallelism()) + ); + } + if (!completedInstants.isEmpty()) { + context.foreach( + completedInstants, + instant -> activeTimeline.deleteInstantFileIfExists(instant), + Math.min(completedInstants.size(), config.getArchiveDeleteParallelism()) + ); + } // Remove older meta-data from auxiliary path too Option latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION) || (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)))).max(Comparator.comparing(HoodieInstant::getTimestamp))); LOG.info("Latest Committed Instant=" + latestCommitted); if (latestCommitted.isPresent()) { - success &= deleteAllInstantsOlderOrEqualsInAuxMetaFolder(latestCommitted.get()); + return deleteAllInstantsOlderOrEqualsInAuxMetaFolder(latestCommitted.get()); } - return success; - } - - private boolean deleteArchivedInstantFiles(HoodieEngineContext context, boolean success, List files) { - Map resultDeleteInstantFiles = deleteFilesParallelize(metaClient, files, context, false); - - for (Map.Entry result : resultDeleteInstantFiles.entrySet()) { - LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue()); - success &= result.getValue(); - } - return success; + return true; } /**