diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
index 21a4fb58932b8..43a85dd5ec126 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
@@ -158,8 +158,12 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() {
     // TODO (na) : Add a way to return actions associated with a timeline and then merge/unify
     // with logic above to avoid Stream.concats
     HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
-    Option<HoodieInstant> oldestPendingCompactionInstant =
-        table.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
+
+    Option<HoodieInstant> oldestPendingCompactionAndReplaceInstant = table.getActiveTimeline()
+        .getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION))
+        .filter(s -> !s.isCompleted())
+        .firstInstant();
+
     Option<HoodieInstant> oldestInflightCommitInstant =
         table.getActiveTimeline()
             .getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
@@ -176,7 +180,7 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() {
           return !(firstSavepoint.isPresent() && HoodieTimeline.compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
         }).filter(s -> {
          // Ensure commits >= oldest pending compaction commit is retained
-          return oldestPendingCompactionInstant
+          return oldestPendingCompactionAndReplaceInstant
              .map(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
              .orElse(true);
        });
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
index aeef69e53dfd5..21120318e5049 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
@@ -243,6 +243,25 @@ public void testArchiveCommitSavepointNoHole() throws Exception {
         "Archived commits should always be safe");
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testPendingClusteringWillBlockArchival(boolean enableMetadata) throws Exception {
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, 2, 5, 2);
+    HoodieTestDataGenerator.createPendingReplaceFile(basePath, "00000000", wrapperFs.getConf());
+    for (int i = 1; i < 8; i++) {
+      testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, Arrays.asList("p1", "p2"), Arrays.asList("p1", "p2"), 2);
+      // archival
+      Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig);
+      List<HoodieInstant> originalCommits = commitsList.getKey();
+      List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
+      assertEquals(originalCommits, commitsAfterArchival);
+    }
+
+    HoodieTimeline timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
+    assertEquals(7, timeline.countInstants(),
+        "Since we have a pending clustering instant at 00000000, we should never archive any commit after 00000000");
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testArchiveRollbacksTestTable(boolean enableMetadata) throws Exception {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index e988c9df618cc..5bf629ea9417e 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -388,6 +388,17 @@ public static void createReplaceFile(String basePath, String instantTime, Config
         .forEach(f -> createMetadataFile(f, basePath, configuration, commitMetadata));
   }
 
+  public static void createPendingReplaceFile(String basePath, String instantTime, Configuration configuration, HoodieCommitMetadata commitMetadata) {
+    Arrays.asList(HoodieTimeline.makeInflightReplaceFileName(instantTime),
+        HoodieTimeline.makeRequestedReplaceFileName(instantTime))
+        .forEach(f -> createMetadataFile(f, basePath, configuration, commitMetadata));
+  }
+
+  public static void createPendingReplaceFile(String basePath, String instantTime, Configuration configuration) {
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    createPendingReplaceFile(basePath, instantTime, configuration, commitMetadata);
+  }
+
   public static void createEmptyCleanRequestedFile(String basePath, String instantTime, Configuration configuration) throws IOException {
     Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"