diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index 15401c0292e14..66c89cfdc014a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -30,6 +30,7 @@
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
@@ -43,6 +44,7 @@
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -52,6 +54,7 @@
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 import org.apache.hudi.table.marker.WriteMarkers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 
@@ -76,6 +79,7 @@
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
 
 /**
@@ -395,6 +399,18 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() {
     // made after the first savepoint present.
     Option<HoodieInstant> firstSavepoint = table.getCompletedSavepointTimeline().firstInstant();
     if (!commitTimeline.empty() && commitTimeline.countInstants() > maxInstantsToKeep) {
+      // For a Merge-On-Read table with inline or async compaction enabled, we need
+      // to make sure that there are enough delta commits in the active timeline to
+      // trigger compaction scheduling, when the trigger strategy of compaction is
+      // NUM_COMMITS or NUM_AND_TIME.
+      Option<HoodieInstant> oldestInstantToRetainForCompaction =
+          (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ
+              && (config.getInlineCompactTriggerStrategy() == CompactionTriggerStrategy.NUM_COMMITS
+              || config.getInlineCompactTriggerStrategy() == CompactionTriggerStrategy.NUM_AND_TIME))
+              ? CompactionUtils.getOldestInstantToRetainForCompaction(
+              table.getActiveTimeline(), config.getInlineCompactDeltaCommitMax())
+              : Option.empty();
+
       // Actually do the commits
       Stream<HoodieInstant> instantToArchiveStream = commitTimeline.getInstants()
           .filter(s -> {
@@ -405,14 +421,21 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() {
             return oldestPendingCompactionAndReplaceInstant
                 .map(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
                 .orElse(true);
-          });
-      // We need this to ensure that when multiple writers are performing conflict resolution, eligible instants don't
-      // get archived, i.e, instants after the oldestInflight are retained on the timeline
-      if (config.getFailedWritesCleanPolicy() == HoodieFailedWritesCleaningPolicy.LAZY) {
-        instantToArchiveStream = instantToArchiveStream.filter(s -> oldestInflightCommitInstant.map(instant ->
-            HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
-            .orElse(true));
-      }
+          }).filter(s -> {
+            // We need this to ensure that when multiple writers are performing conflict resolution,
+            // eligible instants don't get archived, i.e., instants after the oldestInflight are retained on the timeline
+            if (config.getFailedWritesCleanPolicy() == HoodieFailedWritesCleaningPolicy.LAZY) {
+              return oldestInflightCommitInstant.map(instant ->
+                  HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
+                  .orElse(true);
+            }
+            return true;
+          }).filter(s ->
+              oldestInstantToRetainForCompaction.map(instantToRetain ->
+                  HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
+                  .orElse(true)
+          );
+
       return instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep);
     } else {
       return Stream.empty();
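To make the new retention rule concrete: an instant is archived only if it clears all three chained filters, and the third one is the compaction guard added above. Below is a minimal, self-contained sketch (plain Java with string timestamps and hypothetical variable names, not the Hudi API) of how the guards compose.

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class ArchivalGuardSketch {
  public static void main(String[] args) {
    List<String> candidates = Arrays.asList("001", "002", "003", "004", "005");
    Optional<String> oldestPendingCompaction = Optional.of("004");      // assumed pending instant
    Optional<String> oldestInflightCommit = Optional.empty();           // no inflight writers
    Optional<String> oldestToRetainForCompaction = Optional.of("003");  // new compaction guard

    List<String> toArchive = candidates.stream()
        // guard 1: nothing at or after the oldest pending compaction/replace instant
        .filter(ts -> oldestPendingCompaction.map(p -> p.compareTo(ts) > 0).orElse(true))
        // guard 2 (LAZY cleaning): nothing at or after the oldest inflight commit
        .filter(ts -> oldestInflightCommit.map(p -> p.compareTo(ts) > 0).orElse(true))
        // guard 3 (new): only instants strictly older than the oldest one to retain
        .filter(ts -> oldestToRetainForCompaction.map(p -> ts.compareTo(p) < 0).orElse(true))
        .collect(Collectors.toList());

    System.out.println(toArchive); // [001, 002]; "003" onwards stay on the timeline
  }
}

Expressing the LAZY-cleaning check as another .filter stage, as the diff does, keeps every retention rule inside one stream pipeline instead of conditionally reassigning the stream variable.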
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
index 2627454fccb60..d3cc5660bc70a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -28,6 +28,7 @@
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -128,27 +129,25 @@ private HoodieCompactionPlan scheduleCompaction() {
     return new HoodieCompactionPlan();
   }
 
-  private Pair<Integer, String> getLatestDeltaCommitInfo() {
-    Option<HoodieInstant> lastCompaction = table.getActiveTimeline().getCommitTimeline()
-        .filterCompletedInstants().lastInstant();
-    HoodieTimeline deltaCommits = table.getActiveTimeline().getDeltaCommitTimeline();
-
-    String latestInstantTs;
-    final int deltaCommitsSinceLastCompaction;
-    if (lastCompaction.isPresent()) {
-      latestInstantTs = lastCompaction.get().getTimestamp();
-      deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfter(latestInstantTs, Integer.MAX_VALUE).countInstants();
-    } else {
-      latestInstantTs = deltaCommits.firstInstant().get().getTimestamp();
-      deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfterOrEquals(latestInstantTs, Integer.MAX_VALUE).countInstants();
+  private Option<Pair<Integer, String>> getLatestDeltaCommitInfo() {
+    Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
+        CompactionUtils.getDeltaCommitsSinceLatestCompaction(table.getActiveTimeline());
+    if (deltaCommitsInfo.isPresent()) {
+      return Option.of(Pair.of(
+          deltaCommitsInfo.get().getLeft().countInstants(),
+          deltaCommitsInfo.get().getRight().getTimestamp()));
     }
-    return Pair.of(deltaCommitsSinceLastCompaction, latestInstantTs);
+    return Option.empty();
   }
 
   private boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy) {
     boolean compactable;
     // get deltaCommitsSinceLastCompaction and lastCompactionTs
-    Pair<Integer, String> latestDeltaCommitInfo = getLatestDeltaCommitInfo();
+    Option<Pair<Integer, String>> latestDeltaCommitInfoOption = getLatestDeltaCommitInfo();
+    if (!latestDeltaCommitInfoOption.isPresent()) {
+      return false;
+    }
+    Pair<Integer, String> latestDeltaCommitInfo = latestDeltaCommitInfoOption.get();
     int inlineCompactDeltaCommitMax = config.getInlineCompactDeltaCommitMax();
     int inlineCompactDeltaSecondsMax = config.getInlineCompactDeltaSecondsMax();
     switch (compactionTriggerStrategy) {
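The behavioral change worth noting here: on a timeline with no delta commits at all, the old code dereferenced deltaCommits.firstInstant().get() and could fail, while the new code reports that compaction is not needed. A minimal sketch of that short-circuit for the NUM_COMMITS strategy (plain Java, using Optional and Map.Entry as stand-ins for Hudi's Option and Pair; all names here are hypothetical):

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map.Entry;
import java.util.Optional;

public class NeedCompactSketch {
  // (deltaCommitsSinceLastCompaction, lastCompactionTs); empty when no delta commits exist
  static boolean needCompactByNumCommits(Optional<Entry<Integer, String>> info, int maxDeltaCommits) {
    if (!info.isPresent()) {
      return false; // nothing to compact, so do not schedule a plan
    }
    return info.get().getKey() >= maxDeltaCommits;
  }

  public static void main(String[] args) {
    System.out.println(needCompactByNumCommits(Optional.empty(), 5));  // false
    System.out.println(needCompactByNumCommits(
        Optional.of(new SimpleImmutableEntry<>(6, "00000006")), 5));   // true
  }
}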
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index 652dbcb155b0e..aafc538213738 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -44,6 +44,7 @@
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.HoodieSparkTable;
@@ -71,6 +72,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.testutils.HoodieTestUtils.createCompactionCommitInMetadataTable;
@@ -125,9 +127,20 @@ private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
   private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
                                                            int minArchivalCommits,
                                                            int maxArchivalCommits,
+                                                           int maxDeltaCommits,
                                                            int maxDeltaCommitsMetadataTable,
                                                            HoodieTableType tableType) throws Exception {
-    return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits, maxDeltaCommitsMetadataTable, tableType, false, 10, 209715200);
+    return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits,
+        maxDeltaCommits, maxDeltaCommitsMetadataTable, tableType, false, 10, 209715200);
+  }
+
+  private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
+                                                           int minArchivalCommits,
+                                                           int maxArchivalCommits,
+                                                           int maxDeltaCommitsMetadataTable,
+                                                           HoodieTableType tableType) throws Exception {
+    return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits,
+        5, maxDeltaCommitsMetadataTable, tableType, false, 10, 209715200);
   }
 
   private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
@@ -137,13 +150,14 @@ private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
                                                            boolean enableArchiveMerge,
                                                            int archiveFilesBatch,
                                                            long size) throws Exception {
-    return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits,
+    return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits, 5,
         maxDeltaCommitsMetadataTable, HoodieTableType.COPY_ON_WRITE, enableArchiveMerge, archiveFilesBatch, size);
   }
 
   private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
                                                            int minArchivalCommits,
                                                            int maxArchivalCommits,
+                                                           int maxDeltaCommits,
                                                            int maxDeltaCommitsMetadataTable,
                                                            HoodieTableType tableType,
                                                            boolean enableArchiveMerge,
@@ -153,6 +167,7 @@ private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minArchivalCommits, maxArchivalCommits)
+            .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits)
             .withArchiveMergeEnable(enableArchiveMerge)
             .withArchiveMergeFilesBatchSize(archiveFilesBatch)
             .withArchiveMergeSmallFileLimit(size)
@@ -546,7 +561,7 @@ public void testArchiveRollbacksTestTable(boolean enableMetadata) throws Excepti
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testNoArchivalWithInflightCompactionInMiddle(boolean enableMetadata) throws Exception {
-    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 2,
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(enableMetadata, 2, 4, 2, 2,
         HoodieTableType.MERGE_ON_READ);
 
     // when max archival commits is set to 4, even after 7 commits, if there is an inflight compaction in the middle, archival should not kick in.
@@ -946,6 +961,152 @@ public void testArchiveCommitsWithCompactionCommitInMetadataTableTimeline() thro
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testArchivalWithMaxDeltaCommitsGuaranteeForCompaction(boolean enableMetadata) throws Exception {
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(
+        enableMetadata, 2, 4, 8, 1, HoodieTableType.MERGE_ON_READ);
+
+    // When max archival commits is set to 4, archival becomes eligible after the 5th commit,
+    // but the compaction guard retains the latest 8 delta commits, so archival should not
+    // kick in until the 9th delta commit is added.
+    // instants "00000001" to "00000009"
+    for (int i = 1; i < 10; i++) {
+      testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1
+          ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+      // archival
+      Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig);
+      List<HoodieInstant> originalCommits = commitsList.getKey();
+      List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
+
+      if (i <= 8) {
+        assertEquals(originalCommits, commitsAfterArchival);
+      } else {
+        assertEquals(1, originalCommits.size() - commitsAfterArchival.size());
+        assertFalse(commitsAfterArchival.contains(
+            new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "00000001")));
+        IntStream.range(2, 10).forEach(j ->
+            assertTrue(commitsAfterArchival.contains(
+                new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j))));
+      }
+    }
+
+    testTable.doCompaction("00000010", Arrays.asList("p1", "p2"));
+
+    // instants "00000011" to "00000019"
+    for (int i = 1; i < 10; i++) {
+      testTable.doWriteOperation("0000001" + i, WriteOperationType.UPSERT, i == 1
Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + // archival + Pair, List> commitsList = archiveAndGetCommitsList(writeConfig); + List originalCommits = commitsList.getKey(); + List commitsAfterArchival = commitsList.getValue(); + + // first 9 delta commits before the completed compaction should be archived + IntStream.range(1, 10).forEach(j -> + assertFalse(commitsAfterArchival.contains( + new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j)))); + + if (i == 1) { + assertEquals(8, originalCommits.size() - commitsAfterArchival.size()); + // instant from "00000011" should be in the active timeline + assertTrue(commitsAfterArchival.contains( + new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000010"))); + assertTrue(commitsAfterArchival.contains( + new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "00000011"))); + } else if (i < 8) { + assertEquals(originalCommits, commitsAfterArchival); + } else { + assertEquals(1, originalCommits.size() - commitsAfterArchival.size()); + assertFalse(commitsAfterArchival.contains( + new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000010"))); + // i == 8 -> ["00000011", "00000018"] should be in the active timeline + // i == 9 -> ["00000012", "00000019"] should be in the active timeline + if (i == 9) { + assertFalse(commitsAfterArchival.contains( + new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "00000011"))); + } + IntStream.range(i - 7, i + 1).forEach(j -> + assertTrue(commitsAfterArchival.contains( + new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000001" + j)))); + } + } + } + + @Test + public void testArchivalAndCompactionInMetadataTable() throws Exception { + init(HoodieTableType.COPY_ON_WRITE); + // Test configs where metadata table has more aggressive archival configs than the compaction config + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(basePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .retainCommits(1).archiveCommitsWith(2, 4).build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true) + .withMaxNumDeltaCommitsBeforeCompaction(8) + .retainCommits(1).archiveCommitsWith(2, 4).build()) + .forTable("test-trip-table").build(); + initWriteConfigAndMetatableWriter(writeConfig, true); + + HoodieTableMetaClient metadataTableMetaClient = HoodieTableMetaClient.builder() + .setConf(metaClient.getHadoopConf()) + .setBasePath(HoodieTableMetadata.getMetadataTableBasePath(basePath)) + .setLoadActiveTimelineOnLoad(true).build(); + + for (int i = 1; i <= 16; i++) { + testTable.doWriteOperation("000000" + String.format("%02d", i), WriteOperationType.UPSERT, + i == 1 ? 
Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + // archival + archiveAndGetCommitsList(writeConfig); + + metadataTableMetaClient = HoodieTableMetaClient.reload(metadataTableMetaClient); + List metadataTableInstants = metadataTableMetaClient.getActiveTimeline() + .getCommitsTimeline().filterCompletedInstants().getInstants() + .collect(Collectors.toList()); + + if (i <= 7) { + // In the metadata table timeline, the first delta commit is "00000000000000" + // from metadata table init, delta commits "00000001" till "00000007" are added + // later on without archival or compaction + assertEquals(i + 1, metadataTableInstants.size()); + assertTrue(metadataTableInstants.contains( + new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "00000000000000"))); + IntStream.range(1, i + 1).forEach(j -> + assertTrue(metadataTableInstants.contains( + new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j)))); + } else if (i <= 14) { + // In the metadata table timeline, the first delta commit is "00000007001" + // from metadata table compaction, after archival, delta commits "00000008" + // till "00000014" are added later on without archival or compaction + assertEquals(i - 6, metadataTableInstants.size()); + assertTrue(metadataTableInstants.contains( + new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000007001"))); + IntStream.range(8, i + 1).forEach(j -> + assertTrue(metadataTableInstants.contains( + new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, + "000000" + String.format("%02d", j))))); + } else if (i == 15) { + // Only delta commits "00000008" till "00000015" are in the active timeline + assertEquals(8, metadataTableInstants.size()); + assertFalse(metadataTableInstants.contains( + new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000007001"))); + IntStream.range(8, 16).forEach(j -> + assertTrue(metadataTableInstants.contains( + new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, + "000000" + String.format("%02d", j))))); + } else { + // i == 16 + // Only commit "00000015001" and delta commit "00000016" are in the active timeline + assertEquals(2, metadataTableInstants.size()); + assertTrue(metadataTableInstants.contains( + new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000015001"))); + assertTrue(metadataTableInstants.contains( + new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "00000016"))); + } + } + } + private Pair, List> archiveAndGetCommitsList(HoodieWriteConfig writeConfig) throws IOException { metaClient.reloadActiveTimeline(); HoodieTimeline timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java index f3e4dc62837c1..14308d5df3b58 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
index f3e4dc62837c1..14308d5df3b58 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
@@ -25,6 +25,7 @@
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -59,13 +60,13 @@ public class CompactionUtils {
   /**
    * Generate compaction operation from file-slice.
    *
-   * @param partitionPath Partition path
-   * @param fileSlice File Slice
+   * @param partitionPath          Partition path
+   * @param fileSlice              File Slice
    * @param metricsCaptureFunction Metrics Capture function
    * @return Compaction Operation
    */
   public static HoodieCompactionOperation buildFromFileSlice(String partitionPath, FileSlice fileSlice,
-                                                            Option<Function<Pair<String, FileSlice>, Map<String, Double>>> metricsCaptureFunction) {
+                                                             Option<Function<Pair<String, FileSlice>, Map<String, Double>>> metricsCaptureFunction) {
     HoodieCompactionOperation.Builder builder = HoodieCompactionOperation.newBuilder();
     builder.setPartitionPath(partitionPath);
     builder.setFileId(fileSlice.getFileId());
@@ -87,12 +88,12 @@ public static HoodieCompactionOperation buildFromFileSlice(String partitionPath,
    * Generate compaction plan from file-slices.
    *
    * @param partitionFileSlicePairs list of partition file-slice pairs
-   * @param extraMetadata Extra Metadata
-   * @param metricsCaptureFunction Metrics Capture function
+   * @param extraMetadata           Extra Metadata
+   * @param metricsCaptureFunction  Metrics Capture function
    */
   public static HoodieCompactionPlan buildFromFileSlices(List<Pair<String, FileSlice>> partitionFileSlicePairs,
-                                                        Option<Map<String, String>> extraMetadata,
-                                                        Option<Function<Pair<String, FileSlice>, Map<String, Double>>> metricsCaptureFunction) {
+                                                         Option<Map<String, String>> extraMetadata,
+                                                         Option<Function<Pair<String, FileSlice>, Map<String, Double>>> metricsCaptureFunction) {
     HoodieCompactionPlan.Builder builder = HoodieCompactionPlan.newBuilder();
     extraMetadata.ifPresent(builder::setExtraMetadata);
@@ -195,10 +196,76 @@ public static List<HoodieInstant> getPendingCompactionInstantTimes(HoodieTableMe
     return metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants().collect(Collectors.toList());
   }
+
+  /**
+   * Returns a pair of (timeline containing the delta commits after the latest completed
+   * compaction commit, the completed compaction commit instant), if the latest completed
+   * compaction commit is present; or a pair of (timeline containing all the delta commits,
+   * the first delta commit instant), if there is no completed compaction commit.
+   *
+   * @param activeTimeline Active timeline of a table.
+   * @return Pair of the timeline containing the delta commits and the anchor instant.
+   */
+  public static Option<Pair<HoodieTimeline, HoodieInstant>> getDeltaCommitsSinceLatestCompaction(
+      HoodieActiveTimeline activeTimeline) {
+    Option<HoodieInstant> lastCompaction = activeTimeline.getCommitTimeline()
+        .filterCompletedInstants().lastInstant();
+    HoodieTimeline deltaCommits = activeTimeline.getDeltaCommitTimeline();
+
+    HoodieInstant latestInstant;
+    if (lastCompaction.isPresent()) {
+      latestInstant = lastCompaction.get();
+      // timeline containing the delta commits after the latest completed compaction commit,
+      // and the completed compaction commit instant
+      return Option.of(Pair.of(deltaCommits.findInstantsAfter(
+          latestInstant.getTimestamp(), Integer.MAX_VALUE), lastCompaction.get()));
+    } else {
+      if (deltaCommits.countInstants() > 0) {
+        latestInstant = deltaCommits.firstInstant().get();
+        // timeline containing all the delta commits, and the first delta commit instant
+        return Option.of(Pair.of(deltaCommits.findInstantsAfterOrEquals(
+            latestInstant.getTimestamp(), Integer.MAX_VALUE), latestInstant));
+      } else {
+        return Option.empty();
+      }
+    }
+  }
+
+  /**
+   * Gets the oldest instant to retain for MOR compaction, so that the active timeline
+   * keeps enough delta commits to trigger compaction scheduling:
+   * if there is no completed compaction, all delta commits are retained until
+   * num delta commits >= "hoodie.compact.inline.max.delta.commits";
+   * if there is a completed compaction, delta commits are retained so that
+   * num delta commits after the latest completed compaction >= "hoodie.compact.inline.max.delta.commits".
+   *
+   * @param activeTimeline  Active timeline of a table.
+   * @param maxDeltaCommits Maximum number of delta commits that trigger the compaction plan,
+   *                        i.e., "hoodie.compact.inline.max.delta.commits".
+   * @return The oldest instant to keep for MOR compaction.
+   */
+  public static Option<HoodieInstant> getOldestInstantToRetainForCompaction(
+      HoodieActiveTimeline activeTimeline, int maxDeltaCommits) {
+    Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfoOption =
+        CompactionUtils.getDeltaCommitsSinceLatestCompaction(activeTimeline);
+    if (deltaCommitsInfoOption.isPresent()) {
+      Pair<HoodieTimeline, HoodieInstant> deltaCommitsInfo = deltaCommitsInfoOption.get();
+      HoodieTimeline deltaCommitTimeline = deltaCommitsInfo.getLeft();
+      int numDeltaCommits = deltaCommitTimeline.countInstants();
+      if (numDeltaCommits < maxDeltaCommits) {
+        return Option.of(deltaCommitsInfo.getRight());
+      } else {
+        // delta commits with the last one being the oldest that must be kept
+        List<HoodieInstant> instants = deltaCommitTimeline.getInstants()
+            .limit(numDeltaCommits - maxDeltaCommits + 1).collect(Collectors.toList());
+        return Option.of(instants.get(instants.size() - 1));
+      }
+    }
+    return Option.empty();
+  }
 }
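The index arithmetic in getOldestInstantToRetainForCompaction can be checked by hand: with N delta commits since the last compaction and threshold M, retaining from index N - M (the last element of the first N - M + 1 instants) leaves exactly M delta commits on the timeline. A worked sketch using the same values as the tests below (plain Java lists standing in for the timeline; the helper name is hypothetical):

import java.util.Arrays;
import java.util.List;

public class OldestToRetainSketch {
  static String oldestToRetain(List<String> deltaCommits, String fallback, int maxDeltaCommits) {
    int n = deltaCommits.size();
    if (n < maxDeltaCommits) {
      return fallback; // latest completed compaction instant, or the first delta commit
    }
    return deltaCommits.get(n - maxDeltaCommits); // keeps exactly maxDeltaCommits instants
  }

  public static void main(String[] args) {
    List<String> deltas = Arrays.asList("07", "08", "09"); // delta commits after compaction "06"
    System.out.println(oldestToRetain(deltas, "06", 20)); // 06, not enough deltas yet
    System.out.println(oldestToRetain(deltas, "06", 3));  // 07, retain all three
    System.out.println(oldestToRetain(deltas, "06", 2));  // 08, retain the newest two
  }
}

This retained window is what guarantees the next NUM_COMMITS-triggered scheduling round still observes at least maxDeltaCommits delta commits after archival.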
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
index e3c89a377384b..30abe48cb4e19 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
@@ -27,6 +27,9 @@
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.versioning.compaction.CompactionPlanMigrator;
 import org.apache.hudi.common.testutils.CompactionTestUtils.DummyHoodieBaseFile;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
@@ -35,15 +38,20 @@
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import static org.apache.hudi.common.testutils.CompactionTestUtils.createCompactionPlan;
 import static org.apache.hudi.common.testutils.CompactionTestUtils.scheduleCompaction;
@@ -230,11 +238,95 @@ public void testGetAllPendingCompactionOperationsForEmptyCompactions() throws IO
     setupAndValidateCompactionOperations(metaClient, false, 0, 0, 0, 0);
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testGetDeltaCommitsSinceLatestCompaction(boolean hasCompletedCompaction) {
+    HoodieActiveTimeline timeline = prepareTimeline(hasCompletedCompaction);
+    Pair<HoodieTimeline, HoodieInstant> actual =
+        CompactionUtils.getDeltaCommitsSinceLatestCompaction(timeline).get();
+    if (hasCompletedCompaction) {
+      assertEquals(
+          Stream.of(
+              new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+              new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "08"),
+              new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "09"))
+              .collect(Collectors.toList()),
+          actual.getLeft().getInstants().collect(Collectors.toList()));
+      assertEquals(
+          new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "06"),
+          actual.getRight());
+    } else {
+      assertEquals(
+          Stream.of(
+              new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "01"),
+              new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "02"),
+              new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "03"),
+              new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "04"),
+              new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "05"),
+              new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "07"),
+              new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "08"),
+              new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "09"))
+              .collect(Collectors.toList()),
+          actual.getLeft().getInstants().collect(Collectors.toList()));
+      assertEquals(
+          new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "01"),
+          actual.getRight());
+    }
+  }
+
+  @Test
+  public void testGetDeltaCommitsSinceLatestCompactionWithEmptyDeltaCommits() {
+    HoodieActiveTimeline timeline = new MockHoodieActiveTimeline();
+    assertEquals(Option.empty(), CompactionUtils.getDeltaCommitsSinceLatestCompaction(timeline));
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testGetOldestInstantToKeepForCompaction(boolean hasCompletedCompaction) {
+    HoodieActiveTimeline timeline = prepareTimeline(hasCompletedCompaction);
+    Option<HoodieInstant> actual = CompactionUtils.getOldestInstantToRetainForCompaction(timeline, 20);
+
+    if (hasCompletedCompaction) {
+      assertEquals(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "06"), actual.get());
+    } else {
+      assertEquals(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "01"), actual.get());
+    }
+
+    actual = CompactionUtils.getOldestInstantToRetainForCompaction(timeline, 3);
+    assertEquals(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "07"), actual.get());
+
+    actual = CompactionUtils.getOldestInstantToRetainForCompaction(timeline, 2);
+    assertEquals(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "08"), actual.get());
+  }
+
+  @Test
+  public void testGetOldestInstantToKeepForCompactionWithEmptyDeltaCommits() {
+    HoodieActiveTimeline timeline = new MockHoodieActiveTimeline();
+    assertEquals(Option.empty(), CompactionUtils.getOldestInstantToRetainForCompaction(timeline, 20));
+  }
+
+  private HoodieActiveTimeline prepareTimeline(boolean hasCompletedCompaction) {
+    if (hasCompletedCompaction) {
+      return new MockHoodieActiveTimeline(
+          Stream.of("01", "02", "03", "04", "05", "07", "08"),
+          Stream.of("06"),
+          Stream.of(Pair.of("09", HoodieTimeline.DELTA_COMMIT_ACTION)));
+    } else {
+      return new MockHoodieActiveTimeline(
+          Stream.of("01", "02", "03", "04", "05", "07", "08"),
+          Stream.empty(),
+          Stream.of(
+              Pair.of("06", HoodieTimeline.COMMIT_ACTION),
+              Pair.of("09", HoodieTimeline.DELTA_COMMIT_ACTION)));
+    }
+  }
+
   /**
    * Validates if generated compaction plan matches with input file-slices.
    *
    * @param input File Slices with partition-path
-   * @param plan Compaction Plan
+   * @param plan  Compaction Plan
    */
   private void testFileSlicesCompactionPlanEquality(List<Pair<String, FileSlice>> input, HoodieCompactionPlan plan) {
     assertEquals(input.size(), plan.getOperations().size(), "All file-slices present");
@@ -245,12 +337,12 @@ private void testFileSlicesCompactionPlanEquality(List<Pair<String, FileSlice>>
   /**
    * Validates if generated compaction operation matches with input file slice and partition path.
    *
-   * @param slice File Slice
-   * @param op HoodieCompactionOperation
+   * @param slice            File Slice
+   * @param op               HoodieCompactionOperation
    * @param expPartitionPath Partition path
    */
   private void testFileSliceCompactionOpEquality(FileSlice slice, HoodieCompactionOperation op, String expPartitionPath,
-                                                int version) {
+                                                 int version) {
     assertEquals(expPartitionPath, op.getPartitionPath(), "Partition path is correct");
     assertEquals(slice.getBaseInstantTime(), op.getBaseInstantTime(), "Same base-instant");
     assertEquals(slice.getFileId(), op.getFileId(), "Same file-id");
@@ -270,4 +362,24 @@ private void testFileSliceCompactionOpEquality(FileSlice slice, HoodieCompaction
   protected HoodieTableType getTableType() {
     return HoodieTableType.MERGE_ON_READ;
   }
+
+  class MockHoodieActiveTimeline extends HoodieActiveTimeline {
+
+    public MockHoodieActiveTimeline() {
+      super();
+      this.setInstants(new ArrayList<>());
+    }
+
+    public MockHoodieActiveTimeline(
+        Stream<String> completedDeltaCommits,
+        Stream<String> completedCompactionCommits,
+        Stream<Pair<String, String>> inflights) {
+      super();
+      this.setInstants(Stream.concat(
+          Stream.concat(completedDeltaCommits.map(s -> new HoodieInstant(false, DELTA_COMMIT_ACTION, s)),
+              completedCompactionCommits.map(s -> new HoodieInstant(false, COMMIT_ACTION, s))),
+          inflights.map(s -> new HoodieInstant(true, s.getRight(), s.getLeft())))
+          .sorted(Comparator.comparing(HoodieInstant::getFileName)).collect(Collectors.toList()));
+    }
+  }
 }
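One design note on MockHoodieActiveTimeline: it sorts the synthetic instants by HoodieInstant.getFileName() rather than by raw timestamp. Assuming instant file names of the form timestamp.action with an .inflight suffix for in-flight instants (the naming scheme the sort relies on), lexicographic file-name order reproduces the active timeline's ordering, as this small stand-alone sketch illustrates:

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class MockTimelineOrderSketch {
  public static void main(String[] args) {
    // would-be instant file names for the mixed completed/inflight instants above
    List<String> fileNames = Arrays.asList(
        "08.deltacommit", "06.commit", "09.deltacommit.inflight", "07.deltacommit");
    List<String> ordered = fileNames.stream()
        .sorted(Comparator.naturalOrder())
        .collect(Collectors.toList());
    System.out.println(ordered);
    // [06.commit, 07.deltacommit, 08.deltacommit, 09.deltacommit.inflight]
  }
}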