diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
index f6cc0c70b65e4..635f8399d3d7f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
@@ -46,7 +46,6 @@
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -56,9 +55,8 @@
 import java.util.stream.Stream;
 
 import static org.apache.hudi.client.utils.ArchivalUtils.getMinAndMaxInstantsToKeep;
-import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
-import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
 
 /**
@@ -127,140 +125,104 @@ public boolean archiveIfRequired(HoodieEngineContext context, boolean acquireLoc
     }
   }
 
-  private Stream<HoodieInstant> getCleanInstantsToArchive() {
+  private List<HoodieInstant> getCleanAndRollbackInstantsToArchive(HoodieInstant latestCommitInstantToArchive) {
     HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline()
-        .getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION, HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants();
+        .getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION, HoodieTimeline.ROLLBACK_ACTION))
+        .filterCompletedInstants();
+
+    // Since the commit instants to archive are contiguous, we can use the latest commit instant to archive as the
+    // right boundary to collect the clean or rollback instants to archive.
+    //
+    //                                                  latestCommitInstantToArchive
+    //                                                               v
+    // | commit1 clean1 commit2 commit3 clean2 commit4 rollback1 commit5 | commit6 clean3 commit7 ...
+    // | <------------------- instants to archive ---------------------> |
+    //
+    // CommitInstantsToArchive: commit1, commit2, commit3, commit4, commit5
+    // CleanAndRollbackInstantsToArchive: clean1, clean2, rollback1
+
     return cleanAndRollbackTimeline.getInstantsAsStream()
-        .collect(Collectors.groupingBy(HoodieInstant::getAction)).values().stream()
-        .map(hoodieInstants -> {
-          if (hoodieInstants.size() > this.maxInstantsToKeep) {
-            return hoodieInstants.subList(0, hoodieInstants.size() - this.minInstantsToKeep);
-          } else {
-            return Collections.emptyList();
-          }
-        }).flatMap(Collection::stream);
+        .filter(s -> compareTimestamps(s.getTimestamp(), LESSER_THAN, latestCommitInstantToArchive.getTimestamp()))
+        .collect(Collectors.toList());
   }
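Note: the right-boundary rule above can be seen in isolation with a tiny standalone sketch (illustrative only, not part of the patch; the class name and timestamp literals are invented). A clean or rollback instant qualifies for archival exactly when its timestamp sorts strictly before the latest commit instant picked for archival:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical standalone demo of the boundary filter; timestamps are compared
// as zero-padded strings, the same ordering HoodieTimeline.compareTimestamps relies on.
public class BoundaryFilterSketch {
  public static void main(String[] args) {
    String latestCommitInstantToArchive = "00000005";
    List<String> cleanAndRollbackInstants = Arrays.asList("00000002", "00000004", "00000006");
    List<String> toArchive = cleanAndRollbackInstants.stream()
        .filter(ts -> ts.compareTo(latestCommitInstantToArchive) < 0) // strictly LESSER_THAN the boundary
        .collect(Collectors.toList());
    System.out.println(toArchive); // [00000002, 00000004] -- 00000006 stays on the active timeline
  }
}
```

Because completed commits are archived as a contiguous prefix of the timeline, this single comparison is enough to keep the active timeline free of holes.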
 
-  private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
-    // TODO (na) : Add a way to return actions associated with a timeline and then merge/unify
-    // with logic above to avoid Stream.concat
-    HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
+  private List<HoodieInstant> getCommitInstantsToArchive() throws IOException {
+    HoodieTimeline completedCommitsTimeline = table.getCompletedCommitsTimeline();
+
+    if (completedCommitsTimeline.countInstants() <= maxInstantsToKeep) {
+      return Collections.emptyList();
+    }
+
+    // Step 1: Get all candidates of earliestInstantToRetain.
+    List<Option<HoodieInstant>> earliestInstantToRetainCandidates = new ArrayList<>();
 
-    // Get the oldest inflight instant and a completed commit before this inflight instant.
-    Option<HoodieInstant> oldestPendingInstant = table.getActiveTimeline()
+    // 1. The earliest commit to retain is the greatest completed commit that is less than the earliest pending instant.
+    // In some cases, when an inflight instant is the earliest commit, the earliest commit to retain will be equal to
+    // the earliest inflight commit.
+    Option<HoodieInstant> earliestPendingInstant = table.getActiveTimeline()
         .getWriteTimeline()
         .filter(instant -> !instant.isCompleted())
         .firstInstant();
 
-    // Oldest commit to retain is the greatest completed commit, that is less than the oldest pending instant.
-    // In some cases when inflight is the lowest commit then oldest commit to retain will be equal to oldest
-    // inflight commit.
-    Option<HoodieInstant> oldestCommitToRetain;
-    if (oldestPendingInstant.isPresent()) {
-      Option<HoodieInstant> completedCommitBeforeOldestPendingInstant =
-          Option.fromJavaOptional(commitTimeline.getReverseOrderedInstants()
-              .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(),
-                  LESSER_THAN, oldestPendingInstant.get().getTimestamp())).findFirst());
-      // Check if the completed instant is higher than the oldest inflight instant
-      // in that case update the oldestCommitToRetain to oldestInflight commit time.
-      if (!completedCommitBeforeOldestPendingInstant.isPresent()) {
-        oldestCommitToRetain = oldestPendingInstant;
+    Option<HoodieInstant> earliestCommitToRetain;
+    if (earliestPendingInstant.isPresent()) {
+      Option<HoodieInstant> completedCommitBeforeEarliestPendingInstant = Option.fromJavaOptional(completedCommitsTimeline
+          .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), LESSER_THAN, earliestPendingInstant.get().getTimestamp()))
+          .getReverseOrderedInstants().findFirst());
+      // Check if the completed instant is higher than the earliest inflight instant;
+      // in that case update the earliestCommitToRetain to the earliest inflight commit time.
+      if (!completedCommitBeforeEarliestPendingInstant.isPresent()) {
+        earliestCommitToRetain = earliestPendingInstant;
       } else {
-        oldestCommitToRetain = completedCommitBeforeOldestPendingInstant;
+        earliestCommitToRetain = completedCommitBeforeEarliestPendingInstant;
      }
     } else {
-      oldestCommitToRetain = Option.empty();
+      earliestCommitToRetain = Option.empty();
     }
-
-    // NOTE: We cannot have any holes in the commit timeline.
-    // We cannot archive any commits which are made after the first savepoint present,
-    // unless HoodieArchivalConfig#ARCHIVE_BEYOND_SAVEPOINT is enabled.
-    Option<HoodieInstant> firstSavepoint = table.getCompletedSavepointTimeline().firstInstant();
-    Set<String> savepointTimestamps = table.getSavepointTimestamps();
-    if (!commitTimeline.empty() && commitTimeline.countInstants() > maxInstantsToKeep) {
-      // For Merge-On-Read table, inline or async compaction is enabled
-      // We need to make sure that there are enough delta commits in the active timeline
-      // to trigger compaction scheduling, when the trigger strategy of compaction is
-      // NUM_COMMITS or NUM_AND_TIME.
-      Option<HoodieInstant> oldestInstantToRetainForCompaction =
-          (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ
-              && (config.getInlineCompactTriggerStrategy() == CompactionTriggerStrategy.NUM_COMMITS
-              || config.getInlineCompactTriggerStrategy() == CompactionTriggerStrategy.NUM_AND_TIME))
-              ? CompactionUtils.getOldestInstantToRetainForCompaction(
-              table.getActiveTimeline(), config.getInlineCompactDeltaCommitMax())
-              : Option.empty();
-
-      // The clustering commit instant can not be archived unless we ensure that the replaced files have been cleaned,
-      // without the replaced files metadata on the timeline, the fs view would expose duplicates for readers.
-      // Meanwhile, when inline or async clustering is enabled, we need to ensure that there is a commit in the active timeline
-      // to check whether the file slice generated in pending clustering after archive isn't committed.
-      Option<HoodieInstant> oldestInstantToRetainForClustering =
-          ClusteringUtils.getOldestInstantToRetainForClustering(table.getActiveTimeline(), table.getMetaClient());
-
-      // Actually do the commits
-      Stream<HoodieInstant> instantToArchiveStream = commitTimeline.getInstantsAsStream()
-          .filter(s -> {
-            if (config.shouldArchiveBeyondSavepoint()) {
-              // skip savepoint commits and proceed further
-              return !savepointTimestamps.contains(s.getTimestamp());
-            } else {
-              // if no savepoint present, then don't filter
-              // stop at first savepoint commit
-              return !(firstSavepoint.isPresent() && compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
-            }
-          }).filter(s -> {
-            // oldestCommitToRetain is the highest completed commit instant that is less than the oldest inflight instant.
-            // By filtering out any commit >= oldestCommitToRetain, we can ensure there are no gaps in the timeline
-            // when inflight commits are present.
-            return oldestCommitToRetain
-                .map(instant -> compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
-                .orElse(true);
-          }).filter(s ->
-              oldestInstantToRetainForCompaction.map(instantToRetain ->
-                  compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
-                  .orElse(true)
-          ).filter(s ->
-              oldestInstantToRetainForClustering.map(instantToRetain ->
-                  HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
-                  .orElse(true)
-          );
-      return instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep);
-    } else {
-      return Stream.empty();
-    }
-  }
-
-  private Stream<ActiveAction> getInstantsToArchive() throws IOException {
-    if (config.isMetaserverEnabled()) {
-      return Stream.empty();
-    }
-
-    // For archiving and cleaning instants, we need to include intermediate state files if they exist
-    HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
-    Map<Pair<String, String>, List<HoodieInstant>> groupByTsAction = rawActiveTimeline.getInstantsAsStream()
-        .collect(Collectors.groupingBy(i -> Pair.of(i.getTimestamp(),
-            HoodieInstant.getComparableAction(i.getAction()))));
-
-    Stream<HoodieInstant> instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
-
-    // If metadata table is enabled, do not archive instants which are more recent than the last compaction on the
+    earliestInstantToRetainCandidates.add(earliestCommitToRetain);
+
+    // 2. For Merge-On-Read tables, when inline or async compaction is enabled,
+    // we need to make sure that there are enough delta commits in the active timeline
+    // to trigger compaction scheduling, when the trigger strategy of compaction is
+    // NUM_COMMITS or NUM_AND_TIME.
+    Option<HoodieInstant> earliestInstantToRetainForCompaction =
+        (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ
+            && (config.getInlineCompactTriggerStrategy() == CompactionTriggerStrategy.NUM_COMMITS
+            || config.getInlineCompactTriggerStrategy() == CompactionTriggerStrategy.NUM_AND_TIME))
+            ? CompactionUtils.getEarliestInstantToRetainForCompaction(
+            table.getActiveTimeline(), config.getInlineCompactDeltaCommitMax())
+            : Option.empty();
+    earliestInstantToRetainCandidates.add(earliestInstantToRetainForCompaction);
+
+    // 3. The clustering commit instant cannot be archived unless we ensure that the replaced files have been cleaned;
+    // without the replaced-files metadata on the timeline, the fs view would expose duplicates for readers.
+    // Meanwhile, when inline or async clustering is enabled, we need to ensure that there is a commit in the active timeline
+    // to check whether the file slice generated in pending clustering after archive isn't committed.
+    Option<HoodieInstant> earliestInstantToRetainForClustering =
+        ClusteringUtils.getEarliestInstantToRetainForClustering(table.getActiveTimeline(), table.getMetaClient());
+    earliestInstantToRetainCandidates.add(earliestInstantToRetainForClustering);
+
+    // 4. If metadata table is enabled, do not archive instants which are more recent than the last compaction on the
     // metadata table.
     if (table.getMetaClient().getTableConfig().isMetadataTableAvailable()) {
       try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(table.getContext(), config.getMetadataConfig(), config.getBasePath())) {
         Option<String> latestCompactionTime = tableMetadata.getLatestCompactionTime();
         if (!latestCompactionTime.isPresent()) {
           LOG.info("Not archiving as there is no compaction yet on the metadata table");
-          instants = Stream.empty();
+          return Collections.emptyList();
         } else {
           LOG.info("Limiting archiving of instants to latest compaction on metadata table at " + latestCompactionTime.get());
-          instants = instants.filter(instant -> compareTimestamps(instant.getTimestamp(), LESSER_THAN,
-              latestCompactionTime.get()));
+          earliestInstantToRetainCandidates.add(Option.of(new HoodieInstant(
+              HoodieInstant.State.COMPLETED, COMPACTION_ACTION, latestCompactionTime.get())));
         }
       } catch (Exception e) {
         throw new HoodieException("Error limiting instant archival based on metadata table", e);
       }
     }
 
+    // 5. If this is a metadata table, do not archive the commits that live in the dataset's
+    // active timeline. This is required by the metadata table;
+    // see HoodieTableMetadataUtil#processRollbackMetadata for details.
     if (table.isMetadataTable()) {
       HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
           .setBasePath(HoodieTableMetadata.getDatasetBasePath(config.getBasePath()))
@@ -281,16 +243,62 @@ private Stream<ActiveAction> getInstantsToArchive() throws IOException {
       // CLEAN or ROLLBACK instant can block the archive of metadata table timeline and causes
       // the active timeline of metadata table to be extremely long, leading to performance issues
       // for loading the timeline.
-      if (qualifiedEarliestInstant.isPresent()) {
-        instants = instants.filter(instant ->
-            compareTimestamps(
-                instant.getTimestamp(),
-                HoodieTimeline.LESSER_THAN,
-                qualifiedEarliestInstant.get().getTimestamp()));
-      }
+      earliestInstantToRetainCandidates.add(qualifiedEarliestInstant);
     }
 
-    return instants.map(hoodieInstant -> {
+    // Choose the instant in earliestInstantToRetainCandidates with the smallest
+    // timestamp as earliestInstantToRetain.
+    java.util.Optional<HoodieInstant> earliestInstantToRetain = earliestInstantToRetainCandidates
+        .stream()
+        .filter(Option::isPresent)
+        .map(Option::get)
+        .min(HoodieInstant.COMPARATOR);
+
+    // Step 2: We cannot archive any commits which are made after the first savepoint present,
+    // unless HoodieArchivalConfig#ARCHIVE_BEYOND_SAVEPOINT is enabled.
+    Option<HoodieInstant> firstSavepoint = table.getCompletedSavepointTimeline().firstInstant();
+    Set<String> savepointTimestamps = table.getSavepointTimestamps();
+
+    Stream<HoodieInstant> instantToArchiveStream = completedCommitsTimeline.getInstantsAsStream()
+        .filter(s -> {
+          if (config.shouldArchiveBeyondSavepoint()) {
+            // skip savepoint commits and proceed further
+            return !savepointTimestamps.contains(s.getTimestamp());
+          } else {
+            // if no savepoint present, then don't filter;
+            // otherwise, stop at the first savepoint commit
+            return !firstSavepoint.isPresent() || compareTimestamps(s.getTimestamp(), LESSER_THAN, firstSavepoint.get().getTimestamp());
+          }
+        }).filter(s -> earliestInstantToRetain
+            .map(instant -> compareTimestamps(s.getTimestamp(), LESSER_THAN, instant.getTimestamp()))
+            .orElse(true));
+    return instantToArchiveStream.limit(completedCommitsTimeline.countInstants() - minInstantsToKeep)
+        .collect(Collectors.toList());
+  }
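The list-of-candidates pattern above is the heart of the refactor: each retention rule contributes an optional lower bound, and the archiver keeps every instant at or above the smallest bound that is actually present. A standalone sketch of the same idea on plain strings (illustrative only; all names and values are invented):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical demo of the candidate-minimum pattern: collect optional lower
// bounds from independent rules, then retain from the smallest present bound.
public class EarliestCandidateSketch {
  public static void main(String[] args) {
    List<Optional<String>> candidates = new ArrayList<>();
    candidates.add(Optional.of("00000007")); // e.g. earliest commit to retain
    candidates.add(Optional.empty());        // e.g. no compaction constraint on this table
    candidates.add(Optional.of("00000005")); // e.g. clustering constraint

    Optional<String> earliestToRetain = candidates.stream()
        .filter(Optional::isPresent)
        .map(Optional::get)
        .min(Comparator.naturalOrder());

    System.out.println(earliestToRetain.orElse("none")); // 00000005
  }
}
```

Adding a new retention rule then reduces to appending one more candidate, instead of chaining another stream filter as the old code did.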
+
+  private Stream<ActiveAction> getInstantsToArchive() throws IOException {
+    if (config.isMetaserverEnabled()) {
+      return Stream.empty();
+    }
+
+    // First get commit instants to archive.
+    List<HoodieInstant> instantsToArchive = getCommitInstantsToArchive();
+    if (!instantsToArchive.isEmpty()) {
+      HoodieInstant latestCommitInstantToArchive = instantsToArchive.get(instantsToArchive.size() - 1);
+      // Then get clean and rollback instants to archive.
+      List<HoodieInstant> cleanAndRollbackInstantsToArchive =
+          getCleanAndRollbackInstantsToArchive(latestCommitInstantToArchive);
+      instantsToArchive.addAll(cleanAndRollbackInstantsToArchive);
+      instantsToArchive.sort(HoodieInstant.COMPARATOR);
+    }
+
+    // For archival, we need to include all states of an instant.
+    HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
+    Map<Pair<String, String>, List<HoodieInstant>> groupByTsAction = rawActiveTimeline.getInstantsAsStream()
+        .collect(Collectors.groupingBy(i -> Pair.of(i.getTimestamp(),
+            HoodieInstant.getComparableAction(i.getAction()))));
+
+    return instantsToArchive.stream().map(hoodieInstant -> {
       List<HoodieInstant> instantsToStream = groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(),
           HoodieInstant.getComparableAction(hoodieInstant.getAction())));
       return ActiveAction.fromInstants(instantsToStream);
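getInstantsToArchive() groups the raw timeline by (timestamp, action) so that the requested, inflight, and completed state files of one instant are archived together. A minimal sketch of that grouping (illustrative only; a string array stands in for HoodieInstant, and SimpleEntry for Pair):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical demo: each element is {timestamp, action, state}; grouping by
// (timestamp, action) collects every state file of the same logical instant.
public class GroupStatesSketch {
  public static void main(String[] args) {
    List<String[]> rawInstants = Arrays.asList(
        new String[]{"00000001", "commit", "requested"},
        new String[]{"00000001", "commit", "inflight"},
        new String[]{"00000001", "commit", "completed"},
        new String[]{"00000002", "clean", "completed"});
    Map<SimpleEntry<String, String>, List<String[]>> byTsAction = rawInstants.stream()
        .collect(Collectors.groupingBy(i -> new SimpleEntry<>(i[0], i[1])));
    System.out.println(byTsAction.get(new SimpleEntry<>("00000001", "commit")).size()); // 3
  }
}
```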
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index d89c876bdfcd1..39d6a9646b5ce 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -506,7 +506,7 @@ public Pair<Boolean, List<CleanFileInfo>> getDeletePaths(String partitionPath, O
   }
 
   /**
-   * Returns earliest commit to retain based on cleaning policy.
+   * Returns the earliest commit to retain based on cleaning policy.
    */
   public Option<HoodieInstant> getEarliestCommitToRetain() {
     return CleanerUtils.getEarliestCommitToRetain(
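A side note on the test changes that follow: they replace ad-hoc concatenation such as `"0000000" + i` with `String.format("%08d", i)`. Instant timestamps are compared lexicographically, so mixed-width values break ordering as soon as a counter reaches two digits. A quick standalone illustration (invented values, not part of the patch):

```java
// Hypothetical demo of why the tests switch to fixed-width, zero-padded timestamps.
public class PaddedTimestampSketch {
  public static void main(String[] args) {
    String padded9 = String.format("%08d", 9);   // 00000009
    String padded10 = String.format("%08d", 10); // 00000010
    System.out.println(padded9.compareTo(padded10) < 0); // true -- matches numeric order
    System.out.println("0000000" + 10); // 000000010 -- nine digits, sorts before "00000002"
  }
}
```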
Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 20); - } else if (i <= 7 || i == 9) { - testTable.doClean("0000000" + i, cleanStats); + testTable.doWriteOperation(String.format("%08d", i), WriteOperationType.UPSERT, Arrays.asList("p1", "p2"), Arrays.asList("p1", "p2"), 20); + } else if (i <= 3) { + testTable.doClean(String.format("%08d", i), cleanStats); } else { - testTable.doWriteOperation("000000" + String.format("%02d", i), WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + testTable.doWriteOperation(String.format("%08d", i), WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); } // trigger archival Pair, List> commitsList = archiveAndGetCommitsList(writeConfig); @@ -1005,37 +1001,34 @@ public void testArchiveTableWithCleanCommits(boolean enableMetadata) throws Exce assertEquals(originalCommits, commitsAfterArchival); } else if (i == 7) { if (!enableMetadata) { - // 1,2,3,4,5,6,7 : after archival -> 1,4,5,6,7 (bcoz, 2,3,4,5,6,7 are clean instants and are eligible for archival) - List expectedActiveInstants = new ArrayList<>(); - expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000001"))); - expectedActiveInstants.addAll( - getActiveCommitInstants(Arrays.asList("00000004", "00000005", "00000006", "00000007"), HoodieTimeline.CLEAN_ACTION)); - verifyArchival(getAllArchivedCommitInstants( - Arrays.asList("00000002", "00000003"), HoodieTimeline.CLEAN_ACTION), expectedActiveInstants, commitsAfterArchival); + // do archive: + // clean: 2,3: after archival -> null + // write: 1,4,5,6,7: after archival -> 6, 7 + List expectedActiveInstants = new ArrayList<>(getActiveCommitInstants(Arrays.asList("00000006", "00000007"))); + + List expectedArchiveInstants = new ArrayList<>(); + expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000004", "00000005"))); + expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000002", "00000003"), HoodieTimeline.CLEAN_ACTION)); + + verifyArchival(expectedArchiveInstants, expectedActiveInstants, commitsAfterArchival); } else { - // with metadata enabled, archival in data table is fenced based on compaction in metadata table. Clean commits in data table will not trigger compaction in - // metadata table. - List expectedActiveInstants = new ArrayList<>(); - expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000001"))); - expectedActiveInstants.addAll(getActiveCommitInstants( - Arrays.asList("00000002", "00000003", "00000004", "00000005", "00000006", "00000007"), HoodieTimeline.CLEAN_ACTION)); - verifyArchival(getAllArchivedCommitInstants(Collections.emptyList(), HoodieTimeline.CLEAN_ACTION), expectedActiveInstants, commitsAfterArchival); + // with metadata enabled, archival in data table is fenced based on compaction in metadata table. + assertEquals(originalCommits, commitsAfterArchival); } } else { if (!enableMetadata) { assertEquals(originalCommits, commitsAfterArchival); } else { - if (i == 8) { - // when i == 7 compaction in metadata table will be triggered - // and after wards archival in datatable will kick in when i == 8. 
-            // 1,2,3,4,5,6,7,8 : after archival -> 1,4,5,6,7,8 (bcoz, 2,3,4,5 and 6 are clean commits and are eligible for archival)
-            List<HoodieInstant> expectedActiveInstants = new ArrayList<>();
-            expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000001", "00000008")));
-            expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000004", "00000005", "00000006", "00000007"), HoodieTimeline.CLEAN_ACTION));
-            verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000002", "00000003"), HoodieTimeline.CLEAN_ACTION), expectedActiveInstants, commitsAfterArchival);
-          } else {
-            assertEquals(originalCommits, commitsAfterArchival);
-          }
+          // when i == 8, compaction in the metadata table will be triggered, which then allows archival:
+          // clean: 2,3: after archival -> null
+          // write: 1,4,5,6,7,8: after archival -> 7, 8
+          List<HoodieInstant> expectedActiveInstants = new ArrayList<>(getActiveCommitInstants(Arrays.asList("00000007", "00000008")));
+
+          List<HoodieInstant> expectedArchiveInstants = new ArrayList<>();
+          expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000004", "00000005", "00000006")));
+          expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000002", "00000003"), HoodieTimeline.CLEAN_ACTION));
+
+          verifyArchival(expectedArchiveInstants, expectedActiveInstants, commitsAfterArchival);
         }
       }
     }
@@ -1043,40 +1036,43 @@ public void testArchiveTableWithCleanCommits(boolean enableMetadata) throws Exce
 
   @Test
   public void testArchiveRollbacksAndCleanTestTable() throws Exception {
-    int minArchiveCommits = 4;
-    int maxArchiveCommits = 9;
+    int minArchiveCommits = 2;
+    int maxArchiveCommits = 4;
     HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, minArchiveCommits, maxArchiveCommits, 2);
 
-    // trigger 1 commit to add lot of files so that future cleans can clean them up
-    testTable.doWriteOperation("00000001", WriteOperationType.UPSERT, Arrays.asList("p1", "p2"), Arrays.asList("p1", "p2"), 20);
+    // trigger 1 commit to add a lot of files so that future cleans can clean them up
+    testTable.doWriteOperation(String.format("%08d", 1), WriteOperationType.UPSERT, Arrays.asList("p1", "p2"), Arrays.asList("p1", "p2"), 20);
     Map<String, Integer> partitionToFileDeleteCount = new HashMap<>();
     partitionToFileDeleteCount.put("p1", 1);
     partitionToFileDeleteCount.put("p2", 1);
-    // we are triggering 10 clean commits. (1 is commit, 2 -> 11 is clean)
-    for (int i = 2; i <= (maxArchiveCommits + 2); i++) {
("000000") : ("0000000")) + i, partitionToFileDeleteCount); + + for (int i = 2; i < 5; i++) { + testTable.doClean(String.format("%08d", i), partitionToFileDeleteCount); } - // we are triggering 7 commits and 7 rollbacks for the same - for (int i = 12; i <= (2 * maxArchiveCommits); i += 2) { - testTable.doWriteOperation("000000" + i, WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); - testTable.doRollback("000000" + i, "000000" + (i + 1)); + for (int i = 5; i <= 11; i += 2) { + testTable.doWriteOperation(String.format("%08d", i), WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + testTable.doRollback(String.format("%08d", i), String.format("%08d", i + 1)); } - // trigger archival + // trigger archival: + // clean: 2,3: after archival -> null + // write: 1,5,7,9,11: after archival -> 9,11 + // rollback: 6,8,10,12: after archival -> 8,10,12 Pair, List> commitsList = archiveAndGetCommitsList(writeConfig); - List originalCommits = commitsList.getKey(); List commitsAfterArchival = commitsList.getValue(); - // out of 10 clean commits, 6 will be archived. 2 to 7. 8 to 11 will be active. - // wrt regular commits, there aren't 9 commits yet and so all of them will be active. List expectedActiveInstants = new ArrayList<>(); - expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008", "00000009", "00000010", "00000011"), HoodieTimeline.CLEAN_ACTION)); - expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000001", "00000012", "00000014", "00000016", "00000018"))); - expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000013", "00000015", "00000017", "00000019"), HoodieTimeline.ROLLBACK_ACTION)); - verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000002", "00000003", "00000004", "00000005", "00000006", "00000007"), - HoodieTimeline.CLEAN_ACTION), expectedActiveInstants, commitsAfterArchival); + expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008", "00000010", "00000012"), HoodieTimeline.ROLLBACK_ACTION)); + expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000009", "00000011"))); + + List expectedArchiveInstants = new ArrayList<>(); + expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000005", "00000007"))); + expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000002", "00000003", "00000004"), HoodieTimeline.CLEAN_ACTION)); + expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Collections.singletonList("00000006"), HoodieTimeline.ROLLBACK_ACTION)); + + verifyArchival(expectedArchiveInstants, expectedActiveInstants, commitsAfterArchival); } @ParameterizedTest @@ -1098,13 +1094,13 @@ public void testArchiveCompletedRollbackAndClean(boolean isEmpty, boolean enable int startInstant = 1; List expectedArchivedInstants = new ArrayList<>(); for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant++) { - createCleanMetadata(startInstant + "", false, false, isEmpty || i % 2 == 0); - expectedArchivedInstants.add(new HoodieInstant(State.COMPLETED, HoodieTimeline.CLEAN_ACTION, startInstant + "")); + createCleanMetadata(String.format("%02d", startInstant), false, false, isEmpty || i % 2 == 0); + expectedArchivedInstants.add(new HoodieInstant(State.COMPLETED, HoodieTimeline.CLEAN_ACTION, String.format("%02d", startInstant))); } for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant += 2) { createCommitAndRollbackFile(startInstant + 1 + "", startInstant + 
"", false, isEmpty || i % 2 == 0); - expectedArchivedInstants.add(new HoodieInstant(State.COMPLETED, HoodieTimeline.ROLLBACK_ACTION, startInstant + "")); + expectedArchivedInstants.add(new HoodieInstant(State.COMPLETED, HoodieTimeline.ROLLBACK_ACTION, String.format("%02d", startInstant))); } if (enableMetadataTable) { @@ -1121,8 +1117,11 @@ public void testArchiveCompletedRollbackAndClean(boolean isEmpty, boolean enable Stream currentInstants = metaClient.getActiveTimeline().reload().getInstantsAsStream(); Map> actionInstantMap = currentInstants.collect(Collectors.groupingBy(HoodieInstant::getAction)); - assertTrue(actionInstantMap.containsKey("clean"), "Clean Action key must be preset"); - assertEquals(minInstantsToKeep, actionInstantMap.get("clean").size(), "Should have min instant"); + // The commit order is: clean, clean, clean, ..., commit, rollback, commit, rollback ... + // So after archive, actionInstantMap will contain commit and rollback, + // the number will be equal to minInstantsToKeep + assertTrue(actionInstantMap.containsKey("commit"), "Commit Action key must be preset"); + assertEquals(minInstantsToKeep, actionInstantMap.get("commit").size(), "Should have min instant"); assertTrue(actionInstantMap.containsKey("rollback"), "Rollback Action key must be preset"); assertEquals(minInstantsToKeep, actionInstantMap.get("rollback").size(), "Should have min instant"); @@ -1136,43 +1135,6 @@ public void testArchiveCompletedRollbackAndClean(boolean isEmpty, boolean enable expectedArchivedInstants.forEach(entry -> assertTrue(metaClient.getArchivedTimeline().containsInstant(entry))); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testArchiveInflightClean(boolean enableMetadataTable) throws Exception { - init(); - HoodieWriteConfig cfg = - HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) - .withParallelism(2, 2).forTable("test-trip-table") - .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build()) - .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build()) - .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() - .withRemoteServerPort(timelineServicePort).build()) - .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build()) - .build(); - metaClient = HoodieTableMetaClient.reload(metaClient); - - createCleanMetadata("10", false); - createCleanMetadata("11", false); - HoodieInstant notArchivedInstant1 = createCleanMetadata("12", false); - HoodieInstant notArchivedInstant2 = createCleanMetadata("13", false); - HoodieInstant notArchivedInstant3 = createCleanMetadata("14", true); - - if (enableMetadataTable) { - // Simulate a compaction commit in metadata table timeline - // so the archival in data table can happen - createCompactionCommitInMetadataTable(hadoopConf, wrapperFs, basePath, "14"); - } - - HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); - HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table); - - archiver.archiveIfRequired(context); - - List notArchivedInstants = metaClient.getActiveTimeline().reload().getInstants(); - assertEquals(3, notArchivedInstants.size(), "Not archived instants should be 3"); - assertEquals(notArchivedInstants, Arrays.asList(notArchivedInstant1, notArchivedInstant2, notArchivedInstant3), ""); - } - @Test public void testArchiveTableWithMetadataTableCompaction() throws Exception { HoodieWriteConfig writeConfig = 
     HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 4, 5, 7);
@@ -1637,14 +1599,14 @@ private Pair<List<HoodieInstant>, List<HoodieInstant>> archiveAndGetCommitsList(
   }
 
   private void verifyArchival(List<HoodieInstant> expectedArchivedInstants, List<HoodieInstant> expectedActiveInstants, List<HoodieInstant> commitsAfterArchival) {
-    Collections.sort(expectedActiveInstants, Comparator.comparing(HoodieInstant::getTimestamp));
-    Collections.sort(commitsAfterArchival, Comparator.comparing(HoodieInstant::getTimestamp));
+    expectedActiveInstants.sort(Comparator.comparing(HoodieInstant::getTimestamp));
+    commitsAfterArchival.sort(Comparator.comparing(HoodieInstant::getTimestamp));
     assertEquals(expectedActiveInstants, commitsAfterArchival);
     expectedArchivedInstants.forEach(entry -> assertFalse(commitsAfterArchival.contains(entry)));
     HoodieArchivedTimeline archivedTimeline = new HoodieArchivedTimeline(metaClient);
     List<HoodieInstant> actualArchivedInstants = archivedTimeline.getInstants();
-    Collections.sort(actualArchivedInstants, Comparator.comparing(HoodieInstant::getTimestamp));
-    Collections.sort(expectedArchivedInstants, Comparator.comparing(HoodieInstant::getTimestamp));
+    actualArchivedInstants.sort(Comparator.comparing(HoodieInstant::getTimestamp));
+    expectedArchivedInstants.sort(Comparator.comparing(HoodieInstant::getTimestamp));
     assertEquals(actualArchivedInstants, expectedArchivedInstants);
 
     HoodieTimeline timeline = metaClient.getActiveTimeline();
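For reference, the merge of commit instants with clean/rollback instants shown earlier in getInstantsToArchive() boils down to an addAll plus a comparator sort. Here is the shape of it on plain strings (illustrative only; HoodieInstant.COMPARATOR orders instants primarily by timestamp):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical demo: commit instants and clean/rollback instants are collected
// separately, then merged into one timestamp-ordered list before archiving.
public class MergeInstantsSketch {
  public static void main(String[] args) {
    List<String> instantsToArchive = new ArrayList<>(Arrays.asList("00000001", "00000003", "00000005"));
    List<String> cleanAndRollback = Arrays.asList("00000002", "00000004");
    instantsToArchive.addAll(cleanAndRollback);
    instantsToArchive.sort(Comparator.naturalOrder());
    System.out.println(instantsToArchive); // [00000001, 00000002, 00000003, 00000004, 00000005]
  }
}
```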
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index e50431c7398b9..894db11a2d11d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -236,14 +236,14 @@ public static boolean isPendingClusteringInstant(HoodieTableMetaClient metaClien
   }
 
   /**
-   * Returns the oldest instant to retain.
-   * Make sure the clustering instant won't be archived before cleaned, and the oldest inflight clustering instant has a previous commit.
+   * Returns the earliest instant to retain.
+   * Make sure the clustering instant won't be archived before it is cleaned, and the earliest inflight clustering instant has a previous commit.
    *
    * @param activeTimeline The active timeline
    * @param metaClient     The meta client
-   * @return the oldest instant to retain for clustering
+   * @return the earliest instant to retain for clustering
    */
-  public static Option<HoodieInstant> getOldestInstantToRetainForClustering(
+  public static Option<HoodieInstant> getEarliestInstantToRetainForClustering(
       HoodieActiveTimeline activeTimeline, HoodieTableMetaClient metaClient) throws IOException {
     Option<HoodieInstant> oldestInstantToRetain = Option.empty();
     HoodieTimeline replaceTimeline = activeTimeline.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.REPLACE_COMMIT_ACTION));
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
index 0f41f1314e1f7..6b74dd869a1ec 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
@@ -339,7 +339,7 @@ public static Option<Pair<HoodieTimeline, HoodieInstant>> getDeltaCommitsSinceLa
   }
 
   /**
-   * Gets the oldest instant to retain for MOR compaction.
+   * Gets the earliest instant to retain for MOR compaction.
    * If there is no completed compaction,
    * num delta commits >= "hoodie.compact.inline.max.delta.commits"
    * If there is a completed compaction,
@@ -348,9 +348,9 @@ public static Option<Pair<HoodieTimeline, HoodieInstant>> getDeltaCommitsSinceLa
    * @param activeTimeline  Active timeline of a table.
    * @param maxDeltaCommits Maximum number of delta commits that trigger the compaction plan,
    *                        i.e., "hoodie.compact.inline.max.delta.commits".
-   * @return the oldest instant to keep for MOR compaction.
+   * @return the earliest instant to keep for MOR compaction.
    */
-  public static Option<HoodieInstant> getOldestInstantToRetainForCompaction(
+  public static Option<HoodieInstant> getEarliestInstantToRetainForCompaction(
       HoodieActiveTimeline activeTimeline, int maxDeltaCommits) {
     Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfoOption =
         CompactionUtils.getDeltaCommitsSinceLatestCompaction(activeTimeline);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
index 4e76d25f41fce..9028fe63fd481 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
@@ -146,7 +146,7 @@ public void testGetOldestInstantToRetainForClustering() throws IOException {
     HoodieInstant inflightInstant3 = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant3, Option.empty());
     HoodieInstant completedInstant3 = metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant3, Option.empty());
     metaClient.reloadActiveTimeline();
-    Option<HoodieInstant> actual = ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient);
+    Option<HoodieInstant> actual = ClusteringUtils.getEarliestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient);
     assertTrue(actual.isPresent());
     assertEquals(clusterTime1, actual.get().getTimestamp(), "no clean in timeline, retain first replace commit");
 
@@ -168,7 +168,7 @@ public void testGetOldestInstantToRetainForClustering() throws IOException {
     metaClient.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant4,
         TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
     metaClient.reloadActiveTimeline();
-    actual = ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient);
+    actual = ClusteringUtils.getEarliestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient);
     assertEquals(clusterTime3, actual.get().getTimestamp(), "retain the first replace commit after the earliestInstantToRetain ");
   }
 
@@ -206,7 +206,7 @@ public void testGetOldestInstantToRetainForClusteringKeepFileVersion() throws IO
     metaClient.getActiveTimeline().transitionReplaceInflightToComplete(inflightInstant3, Option.empty());
     metaClient.reloadActiveTimeline();
 
-    Option<HoodieInstant> actual = ClusteringUtils.getOldestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient);
+    Option<HoodieInstant> actual = ClusteringUtils.getEarliestInstantToRetainForClustering(metaClient.getActiveTimeline(), metaClient);
     assertEquals(clusterTime2, actual.get().getTimestamp(), "retain the first replace commit after the last complete clean ");
   }
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
index b7855bec76738..7347e2ab696fb 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
@@ -285,7 +285,7 @@ public void testGetDeltaCommitsSinceLatestCompactionWithEmptyDeltaCommits() {
   @ValueSource(booleans = {true, false})
   public void testGetOldestInstantToKeepForCompaction(boolean hasCompletedCompaction) {
     HoodieActiveTimeline timeline = prepareTimeline(hasCompletedCompaction);
-    Option<HoodieInstant> actual = CompactionUtils.getOldestInstantToRetainForCompaction(timeline, 20);
+    Option<HoodieInstant> actual = CompactionUtils.getEarliestInstantToRetainForCompaction(timeline, 20);
 
     if (hasCompletedCompaction) {
       assertEquals(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "06"), actual.get());
@@ -293,17 +293,17 @@ public void testGetOldestInstantToKeepForCompacti
       assertEquals(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "01"), actual.get());
     }
 
-    actual = CompactionUtils.getOldestInstantToRetainForCompaction(timeline, 3);
+    actual = CompactionUtils.getEarliestInstantToRetainForCompaction(timeline, 3);
     assertEquals(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "07"), actual.get());
 
-    actual = CompactionUtils.getOldestInstantToRetainForCompaction(timeline, 2);
+    actual = CompactionUtils.getEarliestInstantToRetainForCompaction(timeline, 2);
     assertEquals(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "08"), actual.get());
   }
 
   @Test
   public void testGetOldestInstantToKeepForCompactionWithEmptyDeltaCommits() {
     HoodieActiveTimeline timeline = new MockHoodieActiveTimeline();
-    assertEquals(Option.empty(), CompactionUtils.getOldestInstantToRetainForCompaction(timeline, 20));
+    assertEquals(Option.empty(), CompactionUtils.getEarliestInstantToRetainForCompaction(timeline, 20));
   }
 
   private HoodieActiveTimeline prepareTimeline(boolean hasCompletedCompaction) {
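Finally, a hedged sketch of what call sites look like after the rename (assumes the hudi-common classes from this patch on the classpath; the combining helper itself is hypothetical and only mirrors the candidate-minimum logic in HoodieTimelineArchiver):

```java
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;

import java.io.IOException;

// Hypothetical helper combining the two renamed retention bounds.
public class RenamedApiSketch {
  static Option<HoodieInstant> earliestForArchival(HoodieActiveTimeline timeline,
                                                   HoodieTableMetaClient metaClient,
                                                   int maxDeltaCommits) throws IOException {
    Option<HoodieInstant> forCompaction =
        CompactionUtils.getEarliestInstantToRetainForCompaction(timeline, maxDeltaCommits);
    Option<HoodieInstant> forClustering =
        ClusteringUtils.getEarliestInstantToRetainForClustering(timeline, metaClient);
    // Retain from the smaller of the two bounds when both are present.
    if (forCompaction.isPresent() && forClustering.isPresent()) {
      return HoodieInstant.COMPARATOR.compare(forCompaction.get(), forClustering.get()) <= 0
          ? forCompaction : forClustering;
    }
    return forCompaction.isPresent() ? forCompaction : forClustering;
  }
}
```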