diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index f4937de943e11..629b8115fcd6d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -28,13 +28,13 @@
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
 import org.apache.hudi.common.fs.StorageSchemes;
 import org.apache.hudi.common.model.HoodieArchivedLogFile;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
@@ -46,6 +46,7 @@
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CollectionUtils;
@@ -514,24 +515,27 @@ private Stream<HoodieInstant> getInstantsToArchive() throws IOException {
           .setBasePath(HoodieTableMetadata.getDatasetBasePath(config.getBasePath()))
           .setConf(metaClient.getHadoopConf())
           .build();
-      Option<HoodieInstant> earliestActiveDatasetCommit = dataMetaClient.getActiveTimeline().firstInstant();
-
-      if (config.shouldArchiveBeyondSavepoint()) {
-        // There are chances that there could be holes in the timeline due to archival and savepoint interplay.
-        // So, the first non-savepoint commit in the data timeline is considered as beginning of the active timeline.
-        Option<HoodieInstant> firstNonSavepointCommit = dataMetaClient.getActiveTimeline().getFirstNonSavepointCommit();
-        if (firstNonSavepointCommit.isPresent()) {
-          String firstNonSavepointCommitTime = firstNonSavepointCommit.get().getTimestamp();
-          instants = instants.filter(instant ->
-              compareTimestamps(instant.getTimestamp(), LESSER_THAN, firstNonSavepointCommitTime));
-        }
-      } else {
-        // Do not archive the commits that live in data set active timeline.
-        // This is required by metadata table, see HoodieTableMetadataUtil#processRollbackMetadata for details.
-        if (earliestActiveDatasetCommit.isPresent()) {
-          instants = instants.filter(instant ->
-              compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, earliestActiveDatasetCommit.get().getTimestamp()));
-        }
-      }
+      Option<HoodieInstant> qualifiedEarliestInstant =
+          TimelineUtils.getEarliestInstantForMetadataArchival(
+              dataMetaClient.getActiveTimeline(), config.shouldArchiveBeyondSavepoint());
+
+      // Do not archive instants at or after the earlier of the earliest commit
+      // (COMMIT, DELTA_COMMIT, and REPLACE_COMMIT only; only non-savepoint commits are
+      // considered when archiving beyond savepoints is enabled) and the earliest
+      // inflight instant (all actions).
+      // This is required by the metadata table; see
+      // HoodieTableMetadataUtil#processRollbackMetadata for details.
+      // Note that we cannot blindly use the earliest instant of all actions, because
+      // CLEAN and ROLLBACK instants are archived separately from commits (see
+      // HoodieTimelineArchiver#getCleanInstantsToArchive). If we did, a very old completed
+      // CLEAN or ROLLBACK instant could block archival of the metadata table timeline and
+      // cause its active timeline to grow extremely long, slowing timeline loads.
+      if (qualifiedEarliestInstant.isPresent()) {
+        instants = instants.filter(instant ->
+            compareTimestamps(
+                instant.getTimestamp(),
+                HoodieTimeline.LESSER_THAN,
+                qualifiedEarliestInstant.get().getTimestamp()));
+      }
     }
 
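As an illustration outside the patch itself, here is a minimal standalone sketch of the boundary rule described in the added comments. The class name and timestamp values are hypothetical, and plain String comparison stands in for HoodieTimeline.compareTimestamps, which behaves lexicographically on Hudi's fixed-width instant timestamps: only instants strictly earlier than the qualified earliest instant remain candidates for archival.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ArchivalBoundarySketch {
  public static void main(String[] args) {
    // Candidate instant timestamps considered for archival (hypothetical values).
    List<String> candidates = Arrays.asList("00000001", "00000002", "00000007", "00000008");
    // The qualified earliest instant: the earlier of the earliest commit and the
    // earliest inflight instant on the data table's active timeline.
    String qualifiedEarliest = "00000007";
    // Fixed-width timestamps make LESSER_THAN behave like lexicographic comparison.
    List<String> archivable = candidates.stream()
        .filter(ts -> ts.compareTo(qualifiedEarliest) < 0)
        .collect(Collectors.toList());
    System.out.println(archivable); // [00000001, 00000002]
  }
}
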
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index 72aba1f163816..4d7271364f4f3 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -1301,8 +1301,15 @@ public void testArchivalAndCompactionInMetadataTable() throws Exception {
         .setLoadActiveTimelineOnLoad(true).build();
 
     for (int i = 1; i <= 17; i++) {
-      testTable.doWriteOperation("000000" + String.format("%02d", i), WriteOperationType.UPSERT,
-          i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+      if (i != 2) {
+        testTable.doWriteOperation("000000" + String.format("%02d", i), WriteOperationType.UPSERT,
+            i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+      } else {
+        // For i == 2, roll back the first commit "00000001", so that the active timeline
+        // of the data table contains one rollback instant.
+        // The completed rollback should not block archival in the metadata table.
+        testTable.doRollback("00000001", "00000002");
+      }
       // archival
       archiveAndGetCommitsList(writeConfig);
 
@@ -1323,10 +1330,9 @@ public void testArchivalAndCompactionInMetadataTable() throws Exception {
       } else if (i == 8) {
         // i == 8
         // The instant "00000000000000" was archived since it's less than
-        // the earliest instant on the dataset active timeline,
-        // the dataset active timeline has instants of range [00000001 ~ 00000008]
-        // because when it does the archiving, no compaction instant on the
-        // metadata active timeline exists yet.
+        // the earliest commit on the dataset active timeline;
+        // the dataset active timeline now has these instants:
+        // 00000002.rollback, 00000007.commit, 00000008.commit
         assertEquals(9, metadataTableInstants.size());
         assertTrue(metadataTableInstants.contains(
             new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000007001")));
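The i == 8 assertion can be checked by hand; the sketch below is illustrative, not patch code. With the data table timeline holding 00000002.rollback, 00000007.commit, and 00000008.commit and nothing inflight, the qualified earliest instant is the earliest commit, 00000007, so the completed rollback does not hold back archival.

public class RollbackNotBlockingSketch {
  public static void main(String[] args) {
    // Data table active timeline at i == 8 (action suffixes dropped):
    // 00000002.rollback (completed), 00000007.commit, 00000008.commit.
    String completedRollback = "00000002"; // ignored: not a commit-type action
    String earliestCommit = "00000007";    // COMMIT/DELTA_COMMIT/REPLACE_COMMIT only
    String boundary = earliestCommit;      // nothing inflight, so the commit wins
    // "00000000000000" sorts before the boundary, so it is archivable, even
    // though the completed rollback is also older than the boundary.
    System.out.println("00000000000000".compareTo(boundary) < 0);  // true
    System.out.println(completedRollback.compareTo(boundary) < 0); // true
  }
}
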
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index 1f9d416b2b545..368047a787769 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -24,6 +24,7 @@
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieIOException;
 
@@ -39,6 +40,11 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.SAVEPOINT_ACTION;
+
 /**
  * TimelineUtils provides a common way to query incremental meta-data changes for a hoodie table.
  *
@@ -87,15 +93,15 @@ public static List<String> getDroppedPartitions(HoodieTimeline timeline) {
   public static List<String> getAffectedPartitions(HoodieTimeline timeline) {
     return timeline.filterCompletedInstants().getInstantsAsStream().flatMap(s -> {
       switch (s.getAction()) {
-        case HoodieTimeline.COMMIT_ACTION:
-        case HoodieTimeline.DELTA_COMMIT_ACTION:
+        case COMMIT_ACTION:
+        case DELTA_COMMIT_ACTION:
           try {
             HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(s).get(), HoodieCommitMetadata.class);
             return commitMetadata.getPartitionToWriteStats().keySet().stream();
           } catch (IOException e) {
             throw new HoodieIOException("Failed to get partitions written at " + s, e);
-          }
-        case HoodieTimeline.REPLACE_COMMIT_ACTION:
+          }
+        case REPLACE_COMMIT_ACTION:
           try {
             HoodieReplaceCommitMetadata commitMetadata = HoodieReplaceCommitMetadata.fromBytes(
                 timeline.getInstantDetails(s).get(), HoodieReplaceCommitMetadata.class);
@@ -148,11 +154,11 @@ public static List<String> getAffectedPartitions(HoodieTimeline timeline) {
    * Get extra metadata for specified key from latest commit/deltacommit/replacecommit(eg. insert_overwrite) instant.
    */
   public static Option<String> getExtraMetadataFromLatest(HoodieTableMetaClient metaClient, String extraMetadataKey) {
-    return metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants()
+    return metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants()
         // exclude clustering commits for returning user stored extra metadata
         .filter(instant -> !isClusteringCommit(metaClient, instant))
         .findFirst().map(instant ->
-            getMetadataValue(metaClient, extraMetadataKey, instant)).orElse(Option.empty());
+            getMetadataValue(metaClient, extraMetadataKey, instant)).orElse(Option.empty());
   }
 
@@ -170,7 +176,7 @@ public static Option<String> getExtraMetadataFromLatestIncludeClustering(HoodieT
    */
   public static Map<String, Option<String>> getAllExtraMetadataForKey(HoodieTableMetaClient metaClient, String extraMetadataKey) {
     return metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants().collect(Collectors.toMap(
-        HoodieInstant::getTimestamp, instant -> getMetadataValue(metaClient, extraMetadataKey, instant)));
+        HoodieInstant::getTimestamp, instant -> getMetadataValue(metaClient, extraMetadataKey, instant)));
   }
 
@@ -184,10 +190,10 @@ private static Option<String> getMetadataValue(HoodieTableMetaClient metaClient,
       throw new HoodieIOException("Unable to parse instant metadata " + instant, e);
     }
   }
-  
+
   public static boolean isClusteringCommit(HoodieTableMetaClient metaClient, HoodieInstant instant) {
     try {
-      if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
+      if (REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
         // replacecommit is used for multiple operations: insert_overwrite/cluster etc.
         // Check operation type to see if this instant is related to clustering.
         HoodieReplaceCommitMetadata replaceMetadata = HoodieReplaceCommitMetadata.fromBytes(
@@ -240,10 +246,53 @@ public static HoodieCommitMetadata getCommitMetadata(
       HoodieInstant instant, HoodieTimeline timeline) throws IOException {
     byte[] data = timeline.getInstantDetails(instant).get();
-    if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+    if (instant.getAction().equals(REPLACE_COMMIT_ACTION)) {
       return HoodieReplaceCommitMetadata.fromBytes(data, HoodieReplaceCommitMetadata.class);
     } else {
       return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
     }
   }
+
+  /**
+   * Gets the qualified earliest instant from the active timeline of the data table
+   * for archival in the metadata table.
+   * <p>
+   * The qualified earliest instant is chosen as the earlier one between the earliest
+   * commit (COMMIT, DELTA_COMMIT, and REPLACE_COMMIT only, considering non-savepoint
+   * commits only if archiving beyond savepoints is enabled) and the earliest inflight
+   * instant (all actions).
+   *
+   * @param dataTableActiveTimeline the active timeline of the data table.
+   * @param shouldArchiveBeyondSavepoint whether to archive beyond savepoint.
+   * @return the instant meeting the requirement.
+   */
+  public static Option<HoodieInstant> getEarliestInstantForMetadataArchival(
+      HoodieActiveTimeline dataTableActiveTimeline, boolean shouldArchiveBeyondSavepoint) {
+    // This is for commits only, not including CLEAN, ROLLBACK, etc.
+    // When archiving beyond savepoints is enabled, there can be holes in the timeline
+    // due to the interplay between archival and savepoints, so the first non-savepoint
+    // commit in the data timeline is considered the beginning of the active timeline.
+    Option<HoodieInstant> earliestCommit = shouldArchiveBeyondSavepoint
+        ? dataTableActiveTimeline.getTimelineOfActions(
+            CollectionUtils.createSet(
+                COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION, SAVEPOINT_ACTION))
+        .getFirstNonSavepointCommit()
+        : dataTableActiveTimeline.getCommitsTimeline().firstInstant();
+    // This is for inflight and requested instants of all actions
+    Option<HoodieInstant> earliestInflight =
+        dataTableActiveTimeline.filterInflightsAndRequested().firstInstant();
+
+    if (earliestCommit.isPresent() && earliestInflight.isPresent()) {
+      if (earliestCommit.get().compareTo(earliestInflight.get()) < 0) {
+        return earliestCommit;
+      }
+      return earliestInflight;
+    } else if (earliestCommit.isPresent()) {
+      return earliestCommit;
+    } else if (earliestInflight.isPresent()) {
+      return earliestInflight;
+    } else {
+      return Option.empty();
+    }
+  }
 }
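The heart of getEarliestInstantForMetadataArchival is choosing the earlier of two optional instants. Below is a standalone sketch of just that selection, with java.util.Optional and plain strings standing in for Option<HoodieInstant>; the helper name `earlier` is made up for illustration.

import java.util.Optional;

public class EarliestOfTwoSketch {
  // Returns the earlier of two optional timestamps; empty only if both are empty.
  static Optional<String> earlier(Optional<String> a, Optional<String> b) {
    if (a.isPresent() && b.isPresent()) {
      return a.get().compareTo(b.get()) < 0 ? a : b;
    }
    return a.isPresent() ? a : b;
  }

  public static void main(String[] args) {
    System.out.println(earlier(Optional.of("003"), Optional.of("010"))); // Optional[003]
    System.out.println(earlier(Optional.empty(), Optional.of("010")));   // Optional[010]
    System.out.println(earlier(Optional.empty(), Optional.empty()));     // Optional.empty
  }
}

The patch keeps the explicit if/else chain rather than collapsing it into a comparator one-liner, which keeps the two named inputs (earliest commit vs. earliest inflight) visible when auditing the archival invariant.
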
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
index 5e91118b26962..0c09d91163e86 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
@@ -56,8 +56,13 @@
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED;
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.SAVEPOINT_ACTION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -136,7 +141,7 @@ public void testGetPartitions() throws IOException {
       activeTimeline.createNewInstant(instant);
       activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, ts, ts, 2, Collections.emptyMap())));
 
-      HoodieInstant cleanInstant = new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, ts);
+      HoodieInstant cleanInstant = new HoodieInstant(true, CLEAN_ACTION, ts);
       activeTimeline.createNewInstant(cleanInstant);
       activeTimeline.saveAsComplete(cleanInstant, getCleanMetadata(olderPartition, ts));
     }
@@ -175,7 +180,7 @@ public void testGetPartitionsUnPartitioned() throws IOException {
       activeTimeline.createNewInstant(instant);
       activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, partitionPath, ts, 2, Collections.emptyMap())));
 
-      HoodieInstant cleanInstant = new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, ts);
+      HoodieInstant cleanInstant = new HoodieInstant(true, CLEAN_ACTION, ts);
       activeTimeline.createNewInstant(cleanInstant);
       activeTimeline.saveAsComplete(cleanInstant, getCleanMetadata(partitionPath, ts));
     }
@@ -339,6 +344,81 @@ public void verifyTimeline(List<HoodieInstant> expectedInstants, HoodieTimeline
     );
   }
 
+  @Test
+  public void testGetEarliestInstantForMetadataArchival() throws IOException {
+    // Empty timeline
+    assertEquals(
+        Option.empty(),
+        TimelineUtils.getEarliestInstantForMetadataArchival(
+            prepareActiveTimeline(new ArrayList<>()), false));
+
+    // An earlier requested CLEAN action before the commits
+    assertEquals(
+        Option.of(new HoodieInstant(REQUESTED, CLEAN_ACTION, "003")),
+        TimelineUtils.getEarliestInstantForMetadataArchival(
+            prepareActiveTimeline(
+                Arrays.asList(
+                    new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "001"),
+                    new HoodieInstant(COMPLETED, CLEAN_ACTION, "002"),
+                    new HoodieInstant(REQUESTED, CLEAN_ACTION, "003"),
+                    new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
+                    new HoodieInstant(COMPLETED, REPLACE_COMMIT_ACTION, "011"))), false));
+
+    // No inflight instants
+    assertEquals(
+        Option.of(new HoodieInstant(COMPLETED, COMMIT_ACTION, "010")),
+        TimelineUtils.getEarliestInstantForMetadataArchival(
+            prepareActiveTimeline(
+                Arrays.asList(
+                    new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "001"),
+                    new HoodieInstant(COMPLETED, CLEAN_ACTION, "002"),
+                    new HoodieInstant(COMPLETED, CLEAN_ACTION, "003"),
+                    new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
+                    new HoodieInstant(COMPLETED, REPLACE_COMMIT_ACTION, "011"))), false));
+
+    // Rollbacks only
+    assertEquals(
+        Option.of(new HoodieInstant(INFLIGHT, ROLLBACK_ACTION, "003")),
+        TimelineUtils.getEarliestInstantForMetadataArchival(
+            prepareActiveTimeline(
+                Arrays.asList(
+                    new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "001"),
+                    new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "002"),
+                    new HoodieInstant(INFLIGHT, ROLLBACK_ACTION, "003"))), false));
+
+    assertEquals(
+        Option.empty(),
+        TimelineUtils.getEarliestInstantForMetadataArchival(
+            prepareActiveTimeline(
+                Arrays.asList(
+                    new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "001"),
+                    new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "002"),
+                    new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "003"))), false));
+
+    // With savepoints
+    HoodieActiveTimeline timeline = prepareActiveTimeline(
+        Arrays.asList(
+            new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "001"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "003"),
+            new HoodieInstant(COMPLETED, SAVEPOINT_ACTION, "003"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "011")));
+    assertEquals(
+        Option.of(new HoodieInstant(COMPLETED, COMMIT_ACTION, "003")),
+        TimelineUtils.getEarliestInstantForMetadataArchival(timeline, false));
+    assertEquals(
+        Option.of(new HoodieInstant(COMPLETED, COMMIT_ACTION, "010")),
+        TimelineUtils.getEarliestInstantForMetadataArchival(timeline, true));
+  }
+
+  private HoodieActiveTimeline prepareActiveTimeline(
+      List<HoodieInstant> activeInstants) throws IOException {
+    HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
+    when(mockMetaClient.scanHoodieInstantsFromFileSystem(any(), eq(true)))
+        .thenReturn(activeInstants);
+    return new HoodieActiveTimeline(mockMetaClient);
+  }
+
   private void verifyExtraMetadataLatestValue(String extraMetadataKey, String expected, boolean includeClustering) {
     final Option<String> extraLatestValue;
     if (includeClustering) {
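One case the new test does not exercise, sketched here only as a possible follow-up rather than part of this patch (it reuses prepareActiveTimeline and the static imports added above, and assumes getCommitsTimeline() includes inflight commits): an inflight commit that precedes every completed commit should itself become the boundary, since inflight instants of all actions are considered.

  @Test
  public void testInflightCommitBoundsArchival() throws IOException {
    // Hypothetical case: the inflight COMMIT at "005" is both the earliest
    // commit-type instant and the earliest inflight instant, so it should be
    // returned as the qualified earliest instant.
    assertEquals(
        Option.of(new HoodieInstant(INFLIGHT, COMMIT_ACTION, "005")),
        TimelineUtils.getEarliestInstantForMetadataArchival(
            prepareActiveTimeline(
                Arrays.asList(
                    new HoodieInstant(INFLIGHT, COMMIT_ACTION, "005"),
                    new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
                    new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"))), false));
  }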