diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index c53554d8e04d2..2992f4abd4c9e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -64,6 +64,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -76,12 +77,14 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
 
 /**
  * Archiver to bound the growth of files under .hoodie meta path.
@@ -409,9 +412,11 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() {
         .getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
         .filterInflights().firstInstant();
 
-    // We cannot have any holes in the commit timeline. We cannot archive any commits which are
-    // made after the first savepoint present.
+    // NOTE: We cannot have any holes in the commit timeline.
+    // We cannot archive any commits which are made after the first savepoint present,
+    // unless HoodieArchivalConfig#ARCHIVE_BEYOND_SAVEPOINT is enabled.
     Option<HoodieInstant> firstSavepoint = table.getCompletedSavepointTimeline().firstInstant();
+    Set<String> savepointTimestamps = table.getSavepointTimestamps();
     if (!commitTimeline.empty() && commitTimeline.countInstants() > maxInstantsToKeep) {
       // For Merge-On-Read table, inline or async compaction is enabled
       // We need to make sure that there are enough delta commits in the active timeline
@@ -428,28 +433,33 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() {
       // Actually do the commits
       Stream<HoodieInstant> instantToArchiveStream = commitTimeline.getInstants()
           .filter(s -> {
-            // if no savepoint present, then don't filter
-            return !(firstSavepoint.isPresent() && HoodieTimeline.compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
+            if (config.shouldArchiveBeyondSavepoint()) {
+              // skip savepoint commits and proceed further
+              return !savepointTimestamps.contains(s.getTimestamp());
+            } else {
+              // if no savepoint present, then don't filter;
+              // otherwise, stop at the first savepointed commit
+              return !(firstSavepoint.isPresent() && compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
+            }
           }).filter(s -> {
             // Ensure commits >= oldest pending compaction commit is retained
             return oldestPendingCompactionAndReplaceInstant
-                .map(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
+                .map(instant -> compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
                 .orElse(true);
           }).filter(s -> {
             // We need this to ensure that when multiple writers are performing conflict resolution, eligible instants don't
             // get archived, i.e, instants after the oldestInflight are retained on the timeline
             if (config.getFailedWritesCleanPolicy() == HoodieFailedWritesCleaningPolicy.LAZY) {
               return oldestInflightCommitInstant.map(instant ->
-                  HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
+                  compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
                   .orElse(true);
             }
             return true;
           }).filter(s ->
               oldestInstantToRetainForCompaction.map(instantToRetain ->
-                  HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
+                  compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
               .orElse(true)
           );
-
       return instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep);
     } else {
       return Stream.empty();
@@ -479,7 +489,7 @@ private Stream<HoodieInstant> getInstantsToArchive() {
         instants = Stream.empty();
       } else {
         LOG.info("Limiting archiving of instants to latest compaction on metadata table at " + latestCompactionTime.get());
-        instants = instants.filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN,
+        instants = instants.filter(instant -> compareTimestamps(instant.getTimestamp(), LESSER_THAN,
             latestCompactionTime.get()));
       }
     } catch (Exception e) {
@@ -487,18 +497,29 @@ private Stream<HoodieInstant> getInstantsToArchive() {
       }
     }
 
-    // If this is a metadata table, do not archive the commits that live in data set
-    // active timeline. This is required by metadata table,
-    // see HoodieTableMetadataUtil#processRollbackMetadata for details.
     if (HoodieTableMetadata.isMetadataTable(config.getBasePath())) {
       HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
           .setBasePath(HoodieTableMetadata.getDatasetBasePath(config.getBasePath()))
           .setConf(metaClient.getHadoopConf())
           .build();
-      Option<String> earliestActiveDatasetCommit = dataMetaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp);
-      if (earliestActiveDatasetCommit.isPresent()) {
-        instants = instants.filter(instant ->
-            HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, earliestActiveDatasetCommit.get()));
+      Option<HoodieInstant> earliestActiveDatasetCommit = dataMetaClient.getActiveTimeline().firstInstant();
+
+      if (config.shouldArchiveBeyondSavepoint()) {
+        // There are chances that there could be holes in the timeline due to archival and savepoint interplay.
+        // So, the first non-savepoint commit in the data timeline is considered as beginning of the active timeline.
+        Option<HoodieInstant> firstNonSavepointCommit = dataMetaClient.getActiveTimeline().getFirstNonSavepointCommit();
+        if (firstNonSavepointCommit.isPresent()) {
+          String firstNonSavepointCommitTime = firstNonSavepointCommit.get().getTimestamp();
+          instants = instants.filter(instant ->
+              compareTimestamps(instant.getTimestamp(), LESSER_THAN, firstNonSavepointCommitTime));
+        }
+      } else {
+        // Do not archive the commits that live in data set active timeline.
+        // This is required by metadata table, see HoodieTableMetadataUtil#processRollbackMetadata for details.
+        if (earliestActiveDatasetCommit.isPresent()) {
+          instants = instants.filter(instant ->
+              compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, earliestActiveDatasetCommit.get().getTimestamp()));
+        }
       }
     }
 
@@ -589,7 +610,7 @@ private boolean deleteAllInstantsOlderOrEqualsInAuxMetaFolder(HoodieInstant thresholdInstant) {
     }
 
     List<HoodieInstant> instantsToBeDeleted =
-        instants.stream().filter(instant1 -> HoodieTimeline.compareTimestamps(instant1.getTimestamp(),
+        instants.stream().filter(instant1 -> compareTimestamps(instant1.getTimestamp(),
             LESSER_THAN_OR_EQUALS, thresholdInstant.getTimestamp())).collect(Collectors.toList());
 
     for (HoodieInstant deleteInstant : instantsToBeDeleted) {
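For reviewers: the behavioral split introduced above is easiest to see in isolation. Below is a minimal, self-contained sketch (plain JDK; class and method names are hypothetical, and the real archiver additionally bounds the result by min/max instants to keep, pending compactions, and inflight instants):

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ArchiveFilterSketch {

  // Default behavior: archival stops at the first savepointed instant.
  static List<String> stopAtFirstSavepoint(List<String> commits, Set<String> savepoints) {
    String firstSavepoint = commits.stream().filter(savepoints::contains).findFirst().orElse(null);
    return commits.stream()
        .filter(c -> firstSavepoint == null || c.compareTo(firstSavepoint) < 0)
        .collect(Collectors.toList());
  }

  // hoodie.archive.beyond.savepoint=true: only the savepointed instants themselves are
  // excluded; archival proceeds around them, deliberately leaving "holes" in the timeline.
  static List<String> skipSavepointedCommits(List<String> commits, Set<String> savepoints) {
    return commits.stream().filter(c -> !savepoints.contains(c)).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> commits = Arrays.asList("100", "101", "102", "103", "104", "105");
    Set<String> savepoints = new HashSet<>(Arrays.asList("101"));
    System.out.println(stopAtFirstSavepoint(commits, savepoints));   // [100]
    System.out.println(skipSavepointedCommits(commits, savepoints)); // [100, 102, 103, 104, 105]
  }
}

The holes this creates are what the timeline-side changes further down (getFirstNonSavepointCommit) account for.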
It's" + " useful when storage scheme doesn't support append operation."); + public static final ConfigProperty ARCHIVE_BEYOND_SAVEPOINT = ConfigProperty + .key("hoodie.archive.beyond.savepoint") + .defaultValue(false) + .sinceVersion("0.12.0") + .withDocumentation("If enabled, archival will proceed beyond savepoint, skipping savepoint commits. " + + "If disabled, archival will stop at the earliest savepoint commit."); + /** * @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead */ @@ -107,7 +115,9 @@ public class HoodieArchivalConfig extends HoodieConfig { */ @Deprecated public static final String COMMITS_ARCHIVAL_BATCH_SIZE_PROP = COMMITS_ARCHIVAL_BATCH_SIZE.key(); - /** @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead */ + /** + * @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead + */ @Deprecated private static final String DEFAULT_MAX_COMMITS_TO_KEEP = MAX_COMMITS_TO_KEEP.defaultValue(); /** @@ -186,6 +196,11 @@ public HoodieArchivalConfig.Builder withCommitsArchivalBatchSize(int batchSize) return this; } + public Builder withArchiveBeyondSavepoint(boolean archiveBeyondSavepoint) { + archivalConfig.setValue(ARCHIVE_BEYOND_SAVEPOINT, String.valueOf(archiveBeyondSavepoint)); + return this; + } + public HoodieArchivalConfig build() { archivalConfig.setDefaults(HoodieArchivalConfig.class.getName()); return archivalConfig; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index f787232c50f8f..4902c3861ff91 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1209,7 +1209,11 @@ public boolean isAutoClean() { } public boolean getArchiveMergeEnable() { - return getBoolean(HoodieArchivalConfig.ARCHIVE_MERGE_ENABLE); + return getBooleanOrDefault(HoodieArchivalConfig.ARCHIVE_MERGE_ENABLE); + } + + public boolean shouldArchiveBeyondSavepoint() { + return getBooleanOrDefault(HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT); } public long getArchiveMergeSmallFileLimitBytes() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 1e68f820d949e..5ca3aee764afe 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -368,10 +368,10 @@ public HoodieTimeline getCompletedSavepointTimeline() { } /** - * Get the list of savepoints in this table. + * Get the list of savepoint timestamps in this table. 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 1e68f820d949e..5ca3aee764afe 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -368,10 +368,10 @@ public HoodieTimeline getCompletedSavepointTimeline() {
   }
 
   /**
-   * Get the list of savepoints in this table.
+   * Get the set of savepoint timestamps in this table.
    */
-  public List<String> getSavepoints() {
-    return getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+  public Set<String> getSavepointTimestamps() {
+    return getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
   }
 
   public HoodieActiveTimeline getActiveTimeline() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 79eef43b3c00a..f837d08afc02a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -104,7 +104,7 @@ public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieTable,
    * Get the list of data file names savepointed.
    */
   public Stream<String> getSavepointedDataFiles(String savepointTime) {
-    if (!hoodieTable.getSavepoints().contains(savepointTime)) {
+    if (!hoodieTable.getSavepointTimestamps().contains(savepointTime)) {
       throw new HoodieSavepointException(
           "Could not get data files for savepoint " + savepointTime + ". No such savepoint.");
     }
@@ -227,7 +227,7 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(String partitionPath) {
           + " file versions. ");
     List<CleanFileInfo> deletePaths = new ArrayList<>();
     // Collect all the datafiles savepointed by all the savepoints
-    List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
+    List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
         .flatMap(this::getSavepointedDataFiles)
         .collect(Collectors.toList());
@@ -295,7 +295,7 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath) {
     List<CleanFileInfo> deletePaths = new ArrayList<>();
 
     // Collect all the datafiles savepointed by all the savepoints
-    List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
+    List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
         .flatMap(this::getSavepointedDataFiles)
        .collect(Collectors.toList());
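The cleaner contract is untouched by this change: any file referenced by a savepoint must survive cleaning regardless of the archival mode. A self-contained sketch of that guard (plain JDK; names are hypothetical, mirroring how getSavepointedDataFiles is consumed above):

import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SavepointCleanGuardSketch {
  // deleteCandidates: file names the cleaner would like to remove.
  // savepointedDataFiles: per-savepoint lookup of protected file names.
  static List<String> filterDeletable(List<String> deleteCandidates,
                                      Set<String> savepointTimestamps,
                                      Function<String, Stream<String>> savepointedDataFiles) {
    // Collect every data file referenced by any savepoint.
    Set<String> savepointedFiles = savepointTimestamps.stream()
        .flatMap(savepointedDataFiles)
        .collect(Collectors.toSet());
    // A candidate may be deleted only if no savepoint references it.
    return deleteCandidates.stream()
        .filter(file -> !savepointedFiles.contains(file))
        .collect(Collectors.toList());
  }
}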
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index 4f41c4a44d9fb..0af05b2d6b0de 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.io;
 
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.HoodieTimelineArchiver;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.client.utils.MetadataConversionUtils;
@@ -44,9 +45,9 @@
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -65,7 +66,9 @@
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
@@ -88,6 +91,7 @@
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.testutils.HoodieTestUtils.createCompactionCommitInMetadataTable;
+import static org.apache.hudi.config.HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -180,6 +184,33 @@ private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
                                                            long size,
                                                            HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
                                                            WriteConcurrencyMode writeConcurrencyMode) throws Exception {
+    return initTestTableAndGetWriteConfig(
+        enableMetadata,
+        minArchivalCommits,
+        maxArchivalCommits,
+        maxDeltaCommits,
+        maxDeltaCommitsMetadataTable,
+        tableType,
+        enableArchiveMerge,
+        archiveFilesBatch,
+        size,
+        failedWritesCleaningPolicy,
+        writeConcurrencyMode,
+        ARCHIVE_BEYOND_SAVEPOINT.defaultValue());
+  }
+
+  private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
+                                                           int minArchivalCommits,
+                                                           int maxArchivalCommits,
+                                                           int maxDeltaCommits,
+                                                           int maxDeltaCommitsMetadataTable,
+                                                           HoodieTableType tableType,
+                                                           boolean enableArchiveMerge,
+                                                           int archiveFilesBatch,
+                                                           long size,
+                                                           HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
+                                                           WriteConcurrencyMode writeConcurrencyMode,
+                                                           boolean archiveProceedBeyondSavepoints) throws Exception {
     init(tableType);
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
@@ -188,7 +219,8 @@ private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
             .withArchiveMergeEnable(enableArchiveMerge)
             .withArchiveMergeFilesBatchSize(archiveFilesBatch)
             .withArchiveMergeSmallFileLimit(size)
-            .archiveCommitsWith(minArchivalCommits, maxArchivalCommits).build())
+            .archiveCommitsWith(minArchivalCommits, maxArchivalCommits)
+            .withArchiveBeyondSavepoint(archiveProceedBeyondSavepoints).build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits).build())
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
@@ -249,6 +281,59 @@ public void testArchiveTableWithArchival(boolean enableMetadata) throws Exception {
     }
   }
 
Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + } + + // savepoint 3rd commit + String commitToSavepoint = String.format("%08d", 3); + HoodieSavepointMetadata savepointMetadata = testTable.doSavepoint(commitToSavepoint); + testTable.addSavepoint(commitToSavepoint, savepointMetadata); + + for (int i = 5; i < 7; i++) { + testTable.doWriteOperation(String.format("%08d", i), WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + } + // trigger archival + Pair, List> commitsList = archiveAndGetCommitsList(writeConfig); + List originalCommits = commitsList.getKey(); + List commitsAfterArchival = commitsList.getValue(); + + if (archiveBeyondSavepoint) { + // retains only 2 commits. C3 and C8. and savepointed commit for C3. + verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000004", "00000005")), + Stream.concat(getActiveCommitInstants(Arrays.asList("00000003", "00000006")).stream(), getActiveSavepointedCommitInstants(Arrays.asList("00000003")).stream()) + .collect(Collectors.toList()), commitsAfterArchival); + } else { + // archives only C1 and C2. stops at first savepointed commit C3. + verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002")), + Stream.concat(getActiveCommitInstants(Arrays.asList("00000003", "00000004", "00000005", "00000006")).stream(), + getActiveSavepointedCommitInstants(Arrays.asList("00000003")).stream()) + .collect(Collectors.toList()), commitsAfterArchival); + } + + for (int i = 7; i < 10; i++) { + testTable.doWriteOperation(String.format("%08d", i), WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + } + + // once savepoint is removed. C3 will be archived. + testTable.deleteSavepoint(commitToSavepoint); + commitsList = archiveAndGetCommitsList(writeConfig); + originalCommits = commitsList.getKey(); + commitsAfterArchival = commitsList.getValue(); + + metaClient.reloadActiveTimeline(); + verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002","00000003", "00000004", "00000005", "00000006", "00000007")), + getActiveCommitInstants(Arrays.asList("00000008", "00000009")), commitsAfterArchival); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testMergeSmallArchiveFilesRecoverFromBuildPlanFailed(boolean enableArchiveMerge) throws Exception { @@ -563,13 +648,22 @@ public void testNoArchivalUntilMaxArchiveConfigWithExtraInflightCommits(boolean assertEquals(originalCommits, commitsAfterArchival); } + private static Stream archiveCommitSavepointNoHoleParams() { + return Arrays.stream(new Boolean[][] { + {true, true}, + {false, true}, + {true, false}, + {false, false} + }).map(Arguments::of); + } + @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testArchiveCommitSavepointNoHole(boolean enableMetadataTable) throws Exception { + @MethodSource("archiveCommitSavepointNoHoleParams") + public void testArchiveCommitSavepointNoHole(boolean enableMetadataTable, boolean archiveBeyondSavepoint) throws Exception { init(); HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table") - .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 5).build()) + .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 5).withArchiveBeyondSavepoint(archiveBeyondSavepoint).build()) 
         .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             .withRemoteServerPort(timelineServicePort).build())
@@ -596,14 +690,30 @@ public void testArchiveCommitSavepointNoHole(boolean enableMetadataTable) throws Exception {
     assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match");
     assertTrue(archiver.archiveIfRequired(context));
     timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
-    assertEquals(5, timeline.countInstants(),
-        "Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)");
-    assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "101")),
-        "Archived commits should always be safe");
-    assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "102")),
-        "Archived commits should always be safe");
-    assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "103")),
-        "Archived commits should always be safe");
+    if (archiveBeyondSavepoint) {
+      // commits in active timeline: 101 and 105.
+      assertEquals(2, timeline.countInstants(),
+          "Since the archiveBeyondSavepoint config is enabled, we will archive commits 102, 103 and 104");
+      assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "101")),
+          "Savepointed commits should always be safe");
+      assertFalse(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "102")),
+          "102 expected to be archived");
+      assertFalse(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "103")),
+          "103 expected to be archived");
+      assertFalse(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "104")),
+          "104 expected to be archived");
+      assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "105")),
+          "105 expected to be in the active timeline");
+    } else {
+      assertEquals(5, timeline.countInstants(),
+          "Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)");
+      assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "101")),
+          "Archived commits should always be safe");
+      assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "102")),
+          "Archived commits should always be safe");
+      assertTrue(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "103")),
+          "Archived commits should always be safe");
+    }
   }
 
   @ParameterizedTest
@@ -934,7 +1044,7 @@ public void testArchiveCompletedRollbackAndClean(boolean isEmpty, boolean enableMetadataTable) throws Exception {
     HoodieInstant firstInstant = metaClient.reloadActiveTimeline().firstInstant().get();
     expectedArchivedInstants = expectedArchivedInstants.stream()
         .filter(entry -> HoodieTimeline.compareTimestamps(entry.getTimestamp(), HoodieTimeline.LESSER_THAN, firstInstant.getTimestamp()
-            )).collect(Collectors.toList());
+        )).collect(Collectors.toList());
     expectedArchivedInstants.forEach(entry -> assertTrue(metaClient.getArchivedTimeline().containsInstant(entry)));
   }
 
@@ -1283,7 +1393,7 @@ private void verifyArchival(List<HoodieInstant> expectedArchivedInstants, List<HoodieInstant> expectedActiveInstants, List<HoodieInstant> commitsAfterArchival) {
         .forEach(entry -> {
           // check safety
-          if (entry.getAction() != HoodieTimeline.ROLLBACK_ACTION) {
+          if (!entry.getAction().equals(HoodieTimeline.ROLLBACK_ACTION)) {
             assertTrue(timeline.containsOrBeforeTimelineStarts(entry.getTimestamp()), "Archived commits should always be safe");
should always be safe"); } } @@ -1315,6 +1425,10 @@ private List getActiveCommitInstants(List commitTimes) { return getActiveCommitInstants(commitTimes, HoodieTimeline.COMMIT_ACTION); } + private List getActiveSavepointedCommitInstants(List commitTimes) { + return getActiveCommitInstants(commitTimes, HoodieTimeline.SAVEPOINT_ACTION); + } + private List getActiveCommitInstants(List commitTimes, String action) { List allInstants = new ArrayList<>(); commitTimes.forEach(entry -> allInstants.add(new HoodieInstant(State.COMPLETED, action, entry))); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java index bf6ce611e805d..9e407aa7660e5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java @@ -28,6 +28,10 @@ import java.util.TreeMap; import java.util.stream.Stream; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps; + /** * A set of data/base files + set of log files, that make up an unit for all operations. */ @@ -118,21 +122,22 @@ public HoodieFileGroupId getFileGroupId() { * some log files, that are based off a commit or delta commit. */ private boolean isFileSliceCommitted(FileSlice slice) { - String maxCommitTime = lastInstant.get().getTimestamp(); - return timeline.containsOrBeforeTimelineStarts(slice.getBaseInstantTime()) - && HoodieTimeline.compareTimestamps(slice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, maxCommitTime); + if (!compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, lastInstant.get().getTimestamp())) { + return false; + } + return timeline.containsOrBeforeTimelineStarts(slice.getBaseInstantTime()); } /** - * Get all the the file slices including in-flight ones as seen in underlying file-system. + * Get all the file slices including in-flight ones as seen in underlying file system. */ public Stream getAllFileSlicesIncludingInflight() { return fileSlices.values().stream(); } /** - * Get latest file slices including in-flight ones. + * Get the latest file slices including inflight ones. */ public Option getLatestFileSlicesIncludingInflight() { return Option.fromJavaOptional(getAllFileSlicesIncludingInflight().findFirst()); @@ -169,8 +174,7 @@ public Option getLatestDataFile() { * Obtain the latest file slice, upto a instantTime i.e <= maxInstantTime. 
    */
   public Option<FileSlice> getLatestFileSliceBeforeOrOn(String maxInstantTime) {
-    return Option.fromJavaOptional(getAllFileSlices().filter(slice -> HoodieTimeline
-        .compareTimestamps(slice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, maxInstantTime)).findFirst());
+    return Option.fromJavaOptional(getAllFileSlices().filter(slice -> compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, maxInstantTime)).findFirst());
   }
 
   /**
@@ -181,7 +185,7 @@ public Option<FileSlice> getLatestFileSliceBeforeOrOn(String maxInstantTime) {
    */
   public Option<FileSlice> getLatestFileSliceBefore(String maxInstantTime) {
     return Option.fromJavaOptional(getAllFileSlices().filter(
-        slice -> HoodieTimeline.compareTimestamps(slice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN, maxInstantTime))
+        slice -> compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN, maxInstantTime))
         .findFirst());
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index ac1dd007d0527..e7970baf673e8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -35,6 +35,7 @@
 import java.util.stream.Stream;
 
 import static java.util.Collections.reverse;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
 
 /**
  * HoodieDefaultTimeline is a default implementation of the HoodieTimeline. It provides methods to inspect a
@@ -118,7 +119,7 @@ public HoodieTimeline getContiguousCompletedWriteTimeline() {
     Option<HoodieInstant> earliestPending = getWriteTimeline().filterInflightsAndRequested().firstInstant();
     if (earliestPending.isPresent()) {
       return getWriteTimeline().filterCompletedInstants()
-          .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), LESSER_THAN, earliestPending.get().getTimestamp()));
+          .filter(instant -> compareTimestamps(instant.getTimestamp(), LESSER_THAN, earliestPending.get().getTimestamp()));
     }
     return getWriteTimeline().filterCompletedInstants();
   }
@@ -156,34 +157,34 @@ public HoodieDefaultTimeline findInstantsInRange(String startTs, String endTs) {
   @Override
   public HoodieDefaultTimeline findInstantsAfter(String instantTime, int numCommits) {
     return new HoodieDefaultTimeline(instants.stream()
-        .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, instantTime)).limit(numCommits),
+        .filter(s -> compareTimestamps(s.getTimestamp(), GREATER_THAN, instantTime)).limit(numCommits),
         details);
   }
 
   @Override
   public HoodieTimeline findInstantsAfter(String instantTime) {
     return new HoodieDefaultTimeline(instants.stream()
-        .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, instantTime)), details);
+        .filter(s -> compareTimestamps(s.getTimestamp(), GREATER_THAN, instantTime)), details);
   }
 
   @Override
   public HoodieDefaultTimeline findInstantsAfterOrEquals(String commitTime, int numCommits) {
     return new HoodieDefaultTimeline(instants.stream()
-        .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, commitTime))
+        .filter(s -> compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, commitTime))
         .limit(numCommits), details);
   }
 
   @Override
   public HoodieDefaultTimeline findInstantsBefore(String instantTime) {
     return new HoodieDefaultTimeline(instants.stream()
-        .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantTime)),
+        .filter(s -> compareTimestamps(s.getTimestamp(), LESSER_THAN, instantTime)),
         details);
   }
 
   @Override
   public HoodieDefaultTimeline findInstantsBeforeOrEquals(String instantTime) {
     return new HoodieDefaultTimeline(instants.stream()
-        .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN_OR_EQUALS, instantTime)),
+        .filter(s -> compareTimestamps(s.getTimestamp(), LESSER_THAN_OR_EQUALS, instantTime)),
         details);
   }
 
@@ -362,11 +363,28 @@ public Stream<HoodieInstant> getReverseOrderedInstants() {
   @Override
   public boolean isBeforeTimelineStarts(String instant) {
-    Option<HoodieInstant> firstCommit = firstInstant();
-    return firstCommit.isPresent()
-        && HoodieTimeline.compareTimestamps(instant, LESSER_THAN, firstCommit.get().getTimestamp());
+    Option<HoodieInstant> firstNonSavepointCommit = getFirstNonSavepointCommit();
+    return firstNonSavepointCommit.isPresent()
+        && compareTimestamps(instant, LESSER_THAN, firstNonSavepointCommit.get().getTimestamp());
   }
 
+  public Option<HoodieInstant> getFirstNonSavepointCommit() {
+    Option<HoodieInstant> firstCommit = firstInstant();
+    Set<String> savepointTimestamps = instants.stream()
+        .filter(entry -> entry.getAction().equals(HoodieTimeline.SAVEPOINT_ACTION))
+        .map(HoodieInstant::getTimestamp)
+        .collect(Collectors.toSet());
+    Option<HoodieInstant> firstNonSavepointCommit = firstCommit;
+    if (!savepointTimestamps.isEmpty()) {
+      // There are chances that there could be holes in the timeline due to archival and savepoint interplay.
+      // So, the first non-savepoint commit is considered as beginning of the active timeline.
+      firstNonSavepointCommit = Option.fromJavaOptional(instants.stream()
+          .filter(entry -> !savepointTimestamps.contains(entry.getTimestamp()))
+          .findFirst());
+    }
+    return firstNonSavepointCommit;
+  }
+
   @Override
   public Option<byte[]> getInstantDetails(HoodieInstant instant) {
     return details.apply(instant);
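The contract of the new getFirstNonSavepointCommit() above is easy to reproduce standalone. A sketch over (timestamp, action) pairs (plain JDK; names are hypothetical):

import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class FirstNonSavepointCommitSketch {
  // instants: (timestamp, action) pairs in timeline order
  static Optional<String> firstNonSavepointCommit(List<Map.Entry<String, String>> instants) {
    Set<String> savepointTimestamps = instants.stream()
        .filter(e -> e.getValue().equals("savepoint"))
        .map(Map.Entry::getKey)
        .collect(Collectors.toSet());
    // The first instant whose timestamp is not savepointed marks the start of the
    // contiguous part of the active timeline.
    return instants.stream()
        .map(Map.Entry::getKey)
        .filter(ts -> !savepointTimestamps.contains(ts))
        .findFirst();
  }

  public static void main(String[] args) {
    // Archived: C1, C2, C4; savepointed: C3, C5 -> the active timeline effectively starts at C6.
    List<Map.Entry<String, String>> instants = Arrays.asList(
        new SimpleEntry<>("03", "commit"), new SimpleEntry<>("03", "savepoint"),
        new SimpleEntry<>("05", "commit"), new SimpleEntry<>("05", "savepoint"),
        new SimpleEntry<>("06", "commit"), new SimpleEntry<>("07", "commit"));
    System.out.println(firstNonSavepointCommit(instants)); // Optional[06]
  }
}

This is the same example spelled out in the interface javadoc that follows.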
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index c3fbd97312c0e..e52a2795969ab 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -305,6 +305,15 @@ public interface HoodieTimeline extends Serializable {
    */
   boolean isBeforeTimelineStarts(String ts);
 
+  /**
+   * First non-savepoint commit in the active data timeline. Examples:
+   * 1. An active data timeline C1, C2, C3, C4, C5 returns C1.
+   * 2. If archival is allowed beyond savepoint and, say, C1, C2, C4 have been archived
+   *    while C3, C5 have been savepointed, then for the data timeline
+   *    C3, C3_Savepoint, C5, C5_Savepoint, C6, C7 this returns C6.
+   */
+  Option<HoodieInstant> getFirstNonSavepointCommit();
+
   /**
    * Read the completed instant details.
    */
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java
index 8ea9ad94ab46b..91a2019f10b7c 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java
@@ -18,10 +18,15 @@
 package org.apache.hudi.common.model;
 
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.MockHoodieTimeline;
+
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -47,4 +52,25 @@ public void testCommittedFileSlices() {
     assertTrue(fileGroup.getLatestFileSlice().get().getBaseInstantTime().equals("001"));
     assertTrue((new HoodieFileGroup(fileGroup)).getLatestFileSlice().get().getBaseInstantTime().equals("001"));
   }
+
+  @Test
+  public void testCommittedFileSlicesWithSavepointAndHoles() {
+    MockHoodieTimeline activeTimeline = new MockHoodieTimeline(Stream.of(
+        new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "01"),
+        new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, "01"),
+        new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "03"),
+        new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, "03"),
+        new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "05") // this can be DELTA_COMMIT/REPLACE_COMMIT as well
+    ).collect(Collectors.toList()));
+    HoodieFileGroup fileGroup = new HoodieFileGroup("", "data", activeTimeline.filterCompletedAndCompactionInstants());
+    for (int i = 0; i < 7; i++) {
+      HoodieBaseFile baseFile = new HoodieBaseFile("data_1_0" + i);
+      fileGroup.addBaseFile(baseFile);
+    }
+    List<FileSlice> allFileSlices = fileGroup.getAllFileSlices().collect(Collectors.toList());
+    assertEquals(6, allFileSlices.size());
+    assertTrue(!allFileSlices.stream().anyMatch(s -> s.getBaseInstantTime().equals("06")));
+    assertEquals(7, fileGroup.getAllFileSlicesIncludingInflight().count());
+    assertTrue(fileGroup.getLatestFileSlice().get().getBaseInstantTime().equals("05"));
+  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
index 5692337471a9b..628aeb8e804b8 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
@@ -262,6 +262,57 @@ public void testGetContiguousCompletedWriteTimeline() {
     assertEquals(instant7.getTimestamp(), timeline.getContiguousCompletedWriteTimeline().lastInstant().get().getTimestamp());
   }
 
+  @Test
+  public void testTimelineWithSavepointAndHoles() {
+    timeline = new MockHoodieTimeline(Stream.of(
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "01"),
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, "01"),
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "03"),
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, "03"),
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "05") // this can be DELTA_COMMIT/REPLACE_COMMIT as well
+    ).collect(Collectors.toList()));
+    assertTrue(timeline.isBeforeTimelineStarts("00"));
+    assertTrue(timeline.isBeforeTimelineStarts("01"));
+    assertTrue(timeline.isBeforeTimelineStarts("02"));
+    assertTrue(timeline.isBeforeTimelineStarts("03"));
+    assertTrue(timeline.isBeforeTimelineStarts("04"));
+    assertFalse(timeline.isBeforeTimelineStarts("05"));
+    assertFalse(timeline.isBeforeTimelineStarts("06"));
+
+    // with an inflight savepoint in between
+    timeline = new MockHoodieTimeline(Stream.of(
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "01"),
+        new HoodieInstant(State.INFLIGHT, HoodieTimeline.SAVEPOINT_ACTION, "01"),
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "03"),
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, "03"),
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "05")
+    ).collect(Collectors.toList()));
+    assertTrue(timeline.isBeforeTimelineStarts("00"));
+    assertTrue(timeline.isBeforeTimelineStarts("01"));
+    assertTrue(timeline.isBeforeTimelineStarts("02"));
+    assertTrue(timeline.isBeforeTimelineStarts("03"));
+    assertTrue(timeline.isBeforeTimelineStarts("04"));
+    assertFalse(timeline.isBeforeTimelineStarts("05"));
+    assertFalse(timeline.isBeforeTimelineStarts("06"));
+
+    // with a pending replacecommit after savepoints
+    timeline = new MockHoodieTimeline(Stream.of(
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "01"),
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, "01"),
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "03"),
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.SAVEPOINT_ACTION, "03"),
+        new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "05"),
+        new HoodieInstant(State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, "06")
+    ).collect(Collectors.toList()));
+    assertTrue(timeline.isBeforeTimelineStarts("00"));
+    assertTrue(timeline.isBeforeTimelineStarts("01"));
+    assertTrue(timeline.isBeforeTimelineStarts("02"));
+    assertTrue(timeline.isBeforeTimelineStarts("03"));
+    assertTrue(timeline.isBeforeTimelineStarts("04"));
+    assertFalse(timeline.isBeforeTimelineStarts("05"));
+    assertFalse(timeline.isBeforeTimelineStarts("06"));
+  }
+
   @Test
   public void testTimelineGetOperations() {
     List<HoodieInstant> allInstants = getAllInstants();
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index 290753ef52006..f631ec94b0e4b 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -26,6 +26,7 @@
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -35,6 +36,7 @@
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.util.Option;
@@ -156,6 +158,10 @@ public static void createCommit(String basePath, String instantTime, Option<HoodieCommitMetadata> metadata) throws Exception {
+  public static void createSavepointCommit(String basePath, String instantTime, HoodieSavepointMetadata savepointMetadata) throws IOException {
+    createMetaFile(basePath, instantTime, HoodieTimeline.SAVEPOINT_EXTENSION, TimelineMetadataUtils.serializeSavepointMetadata(savepointMetadata).get());
+  }
+
+  public static void createInflightSavepoint(String basePath, String instantTime) throws IOException {
+    createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_SAVEPOINT_EXTENSION);
+  }
+
@@ ... @@ public static Map<String, Long> getBaseFileCountsForPaths(String basePath, FileSystem fs,
 
   public static void deleteDeltaCommit(String basePath, String instantTime, FileSystem fs) throws IOException {
     deleteMetaFile(basePath, instantTime, HoodieTimeline.DELTA_COMMIT_EXTENSION, fs);
   }
+
+  public static void deleteSavepointCommit(String basePath, String instantTime, FileSystem fs) throws IOException {
+    deleteMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_SAVEPOINT_EXTENSION, fs);
+    deleteMetaFile(basePath, instantTime, HoodieTimeline.SAVEPOINT_EXTENSION, fs);
+  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 412a69c94cca8..1351d1681212d 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -99,6 +99,7 @@
 import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightReplaceCommit;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightRollbackFile;
+import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightSavepoint;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createMarkerFile;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createReplaceCommit;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCleanFile;
@@ -109,6 +110,8 @@
 import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedRollbackFile;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createRestoreFile;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createRollbackFile;
+import static org.apache.hudi.common.testutils.FileCreateUtils.createSavepointCommit;
+import static org.apache.hudi.common.testutils.FileCreateUtils.deleteSavepointCommit;
 import static org.apache.hudi.common.testutils.FileCreateUtils.logFileName;
 import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata;
 import static org.apache.hudi.common.util.CollectionUtils.createImmutableMap;
@@ -199,6 +202,12 @@ public HoodieTestTable addCommit(String instantTime, Option<HoodieCommitMetadata> metadata) throws Exception {
+  public HoodieTestTable addSavepoint(String instantTime, HoodieSavepointMetadata savepointMetadata) throws IOException {
+    createInflightSavepoint(basePath, instantTime);
+    createSavepointCommit(basePath, instantTime, savepointMetadata);
+    return this;
+  }
+
+  public HoodieTestTable deleteSavepoint(String instantTime) throws IOException {
+    deleteSavepointCommit(basePath, instantTime, fs);
+    return this;
+  }
+
@@ ... @@ public FileStatus[] getWrittenLogFiles(String instant, Map.Entry<String, List<String>> partitionToFilesMeta) {
     HoodieSavepointMetadata savepointMetadata = new HoodieSavepointMetadata();
-    savepointMetadata.setSavepointedAt(Long.valueOf(instant));
+    savepointMetadata.setSavepointedAt(12345L);
     Map<String, HoodieSavepointPartitionMetadata> partitionMetadataMap = new HashMap<>();
     for (Map.Entry<String, List<String>> entry : partitionToFilesMeta.entrySet()) {
       HoodieSavepointPartitionMetadata savepointPartitionMetadata = new HoodieSavepointPartitionMetadata();
@@ -404,6 +413,7 @@ public HoodieSavepointMetadata getSavepointMetadata(String instant, Map<String, List<String>> partitionToFilesMeta) {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/MockHoodieTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/MockHoodieTimeline.java
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/MockHoodieTimeline.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/MockHoodieTimeline.java
@@ ... @@ public MockHoodieTimeline(Stream<String> completed, Stream<String> inflights) {
         inflights.map(s -> new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, s)))
         .sorted(Comparator.comparing(HoodieInstant::getFileName)).collect(Collectors.toList()));
   }
+
+  public MockHoodieTimeline(List<HoodieInstant> instants) {
+    super();
+    this.setInstants(instants);
+  }
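Putting the pieces together, the operator-facing flow this patch enables looks roughly like the following. This is a sketch only: the savepoint(instantTime, user, comment) and deleteSavepoint(instantTime) calls exist on Hudi's Spark write client, but the engine context, the upserts, and the instant times shown are illustrative assumptions.

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class SavepointArchivalFlowSketch {
  public static void run(HoodieSparkEngineContext engineContext) {
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample-table") // placeholder base path
        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
            .archiveCommitsWith(2, 4)
            .withArchiveBeyondSavepoint(true).build())
        .build();
    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, config)) {
      // ... upserts produce commits 00000001..00000004 (hypothetical instant times) ...
      client.savepoint("00000003", "ops-user", "retain for backup/audit");
      // Subsequent commits trigger archival: with the flag on, 00000001, 00000002 and
      // 00000004 may be archived while 00000003 stays, leaving a deliberate hole.
      client.deleteSavepoint("00000003"); // afterwards, 00000003 becomes eligible for archival
    }
  }
}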