diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionTriggerStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionTriggerStrategy.java
index 6a4e634bc73a7..ec85978552c23 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionTriggerStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionTriggerStrategy.java
@@ -21,6 +21,8 @@ public enum CompactionTriggerStrategy {
     // trigger compaction when reach N delta commits
     NUM_COMMITS,
+    // trigger compaction when reach N delta commits since last compaction request
+    NUM_COMMITS_AFTER_LAST_REQUEST,
     // trigger compaction when time elapsed > N seconds since last compaction
     TIME_ELAPSED,
     // trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
index 0f1c811ee9862..4fb5f9f7ddba5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -140,6 +140,17 @@ private Option<Pair<Integer, String>> getLatestDeltaCommitInfo() {
     return Option.empty();
   }
 
+  private Option<Pair<Integer, String>> getLatestDeltaCommitInfoSinceLastCompactionRequest() {
+    Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
+        CompactionUtils.getDeltaCommitsSinceLatestCompactionRequest(table.getActiveTimeline());
+    if (deltaCommitsInfo.isPresent()) {
+      return Option.of(Pair.of(
+          deltaCommitsInfo.get().getLeft().countInstants(),
+          deltaCommitsInfo.get().getRight().getTimestamp()));
+    }
+    return Option.empty();
+  }
+
   private boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy) {
     boolean compactable;
     // get deltaCommitsSinceLastCompaction and lastCompactionTs
@@ -157,6 +168,18 @@ private boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy)
           LOG.info(String.format("The delta commits >= %s, trigger compaction scheduler.", inlineCompactDeltaCommitMax));
         }
         break;
+      case NUM_COMMITS_AFTER_LAST_REQUEST:
+        latestDeltaCommitInfoOption = getLatestDeltaCommitInfoSinceLastCompactionRequest();
+
+        if (!latestDeltaCommitInfoOption.isPresent()) {
+          return false;
+        }
+        latestDeltaCommitInfo = latestDeltaCommitInfoOption.get();
+        compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft();
+        if (compactable) {
+          LOG.info(String.format("The delta commits >= %s since the last compaction request, trigger compaction scheduler.", inlineCompactDeltaCommitMax));
+        }
+        break;
       case TIME_ELAPSED:
         compactable = inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - parsedToSeconds(latestDeltaCommitInfo.getRight());
         if (compactable) {
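For reference, a minimal sketch of how a writer could opt into the new trigger strategy. The HoodieCompactionConfig builder calls are the ones exercised by the test change below; the wrapper class, table name, and basePath are illustrative, and other options a real job needs (schema, key generator, etc.) are omitted.

import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;

public class NumCommitsAfterLastRequestConfigSketch {
  // Schedule compaction once 4 delta commits have accumulated since the last
  // compaction request, rather than since the last completed compaction.
  static HoodieWriteConfig buildWriteConfig(String basePath) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .forTable("example_table")
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withInlineCompaction(true)
            .withMaxNumDeltaCommitsBeforeCompaction(4)
            .withInlineCompactionTriggerStrategy(CompactionTriggerStrategy.NUM_COMMITS_AFTER_LAST_REQUEST)
            .build())
        .build();
  }
}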
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
index 7f1046ba90ce4..1f4f568843f20 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
@@ -20,15 +20,18 @@
 
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
@@ -52,6 +55,18 @@ private HoodieWriteConfig getConfigForInlineCompaction(int maxDeltaCommits, int
         .build();
   }
 
+  private HoodieWriteConfig getConfigDisableCompaction(int maxDeltaCommits, int maxDeltaTime, CompactionTriggerStrategy inlineCompactionType) {
+    return getConfigBuilder(false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(false)
+            .withScheduleInlineCompaction(false)
+            .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits)
+            .withMaxDeltaSecondsBeforeCompaction(maxDeltaTime)
+            .withInlineCompactionTriggerStrategy(inlineCompactionType).build())
+        .build();
+  }
+
   @Test
   public void testCompactionIsNotScheduledEarly() throws Exception {
     // Given: make two commits
@@ -93,6 +108,65 @@ public void testSuccessfulCompactionBasedOnNumCommits() throws Exception {
     }
   }
 
+  @Test
+  public void testSuccessfulCompactionBasedOnNumAfterCompactionRequest() throws Exception {
+    // Given: make 4 commits
+    HoodieWriteConfig cfg = getConfigDisableCompaction(4, 60, CompactionTriggerStrategy.NUM_COMMITS_AFTER_LAST_REQUEST);
+    // turn off the compaction table service to mimic the compaction service being down or very slow
+    List<String> instants = IntStream.range(0, 4).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(cfg)) {
+      List<HoodieRecord> records = dataGen.generateInserts(instants.get(0), 100);
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+
+      // step 1: create and complete 4 delta commits, then create 1 compaction request
+      runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
+
+      String requestInstant = HoodieActiveTimeline.createNewInstantTime();
+      scheduleCompaction(requestInstant, writeClient, cfg);
+
+      metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
+      assertEquals(metaClient.getActiveTimeline().getInstants()
+          .filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)
+              && hoodieInstant.getState() == HoodieInstant.State.REQUESTED).count(), 1);
+
+      // step 2: try to schedule another compaction; it should not be scheduled because of the
+      // NUM_COMMITS_AFTER_LAST_REQUEST strategy, and scheduleCompaction will throw an AssertionError
+      // because it checks that the last instant is a compaction request
+      requestInstant = HoodieActiveTimeline.createNewInstantTime();
+      try {
+        scheduleCompaction(requestInstant, writeClient, cfg);
+        Assertions.fail();
+      } catch (AssertionError error) {
+        // expected
+      }
+
+      // step 3: complete another 4 delta commits; there should be 2 compaction requests after this
+      instants = IntStream.range(0, 4).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+      records = dataGen.generateInsertsForPartition(instants.get(0), 100, "2022/03/15");
+      for (String instant : instants) {
+        createNextDeltaCommit(instant, records, writeClient, metaClient, cfg, false);
+      }
+      requestInstant = HoodieActiveTimeline.createNewInstantTime();
+      scheduleCompaction(requestInstant, writeClient, cfg);
+
+      // step 4: restore the compaction table service; completing one more delta commit will trigger
+      // execution of all pending compaction requests
+      cfg = getConfigForInlineCompaction(4, 60, CompactionTriggerStrategy.NUM_COMMITS_AFTER_LAST_REQUEST);
+      try (SparkRDDWriteClient newWriteClient = getHoodieWriteClient(cfg)) {
+        String finalInstant = HoodieActiveTimeline.createNewInstantTime();
+        createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 100), newWriteClient, metaClient, cfg, false);
+      }
+
+      metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
+      // step 5: there should be exactly 2 commits and no pending compaction;
+      // the last instant should be a delta commit since the compaction request instants are earlier
+      assertEquals(metaClient.getActiveTimeline().getCommitsTimeline().filter(instant -> instant.getAction().equals(HoodieTimeline.COMMIT_ACTION))
+          .countInstants(), 2);
+      assertEquals(metaClient.getActiveTimeline().getCommitsTimeline().filterPendingCompactionTimeline().countInstants(), 0);
+      assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION, metaClient.getActiveTimeline().lastInstant().get().getAction());
+    }
+  }
+
   @Test
   public void testSuccessfulCompactionBasedOnTime() throws Exception {
     // Given: make one commit
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
index 14308d5df3b58..cf9b5fb3ced8c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
@@ -237,6 +237,36 @@ public static Option<Pair<HoodieTimeline, HoodieInstant>> getDeltaCommitsSinceLa
     }
   }
 
+  public static Option<Pair<HoodieTimeline, HoodieInstant>> getDeltaCommitsSinceLatestCompactionRequest(
+      HoodieActiveTimeline activeTimeline) {
+    Option<HoodieInstant> lastCompaction = activeTimeline.getCommitTimeline()
+        .filterCompletedInstants().lastInstant();
+    Option<HoodieInstant> lastRequestCompaction = activeTimeline.getAllCommitsTimeline()
+        .filterPendingCompactionTimeline().lastInstant();
+    if (lastRequestCompaction.isPresent()) {
+      lastCompaction = lastRequestCompaction;
+    }
+    HoodieTimeline deltaCommits = activeTimeline.getDeltaCommitTimeline();
+
+    HoodieInstant latestInstant;
+    if (lastCompaction.isPresent()) {
+      latestInstant = lastCompaction.get();
+      // timeline containing the delta commits after the latest compaction instant
+      // (requested or completed), and that compaction instant
+      return Option.of(Pair.of(deltaCommits.findInstantsAfter(
+          latestInstant.getTimestamp(), Integer.MAX_VALUE), lastCompaction.get()));
+    } else {
+      if (deltaCommits.countInstants() > 0) {
+        latestInstant = deltaCommits.firstInstant().get();
+        // timeline containing all the delta commits, and the first delta commit instant
+        return Option.of(Pair.of(deltaCommits.findInstantsAfterOrEquals(
+            latestInstant.getTimestamp(), Integer.MAX_VALUE), latestInstant));
+      } else {
+        return Option.empty();
+      }
+    }
+  }
+
   /**
    * Gets the oldest instant to retain for MOR compaction.
    * If there is no completed compaction,
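For reference, a minimal sketch of how the new CompactionUtils helper can be consumed to count delta commits since the last compaction request, mirroring what ScheduleCompactionActionExecutor does for the NUM_COMMITS_AFTER_LAST_REQUEST case; the wrapper class and method name are illustrative only.

import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;

public class DeltaCommitsSinceRequestSketch {
  // Number of delta commits since the last compaction request (or since the first
  // delta commit when no compaction exists); 0 when the timeline has no delta commits.
  static int deltaCommitsSinceLastRequest(HoodieActiveTimeline activeTimeline) {
    Option<Pair<HoodieTimeline, HoodieInstant>> info =
        CompactionUtils.getDeltaCommitsSinceLatestCompactionRequest(activeTimeline);
    return info.isPresent() ? info.get().getLeft().countInstants() : 0;
  }
}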