@@ -21,6 +21,8 @@
public enum CompactionTriggerStrategy {
// trigger compaction when reach N delta commits
NUM_COMMITS,
// trigger compaction when reaching N delta commits since the last compaction request
NUM_COMMITS_AFTER_LAST_REQUEST,
// trigger compaction when time elapsed > N seconds since last compaction
TIME_ELAPSED,
// trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied
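For context, a minimal sketch of how a writer could opt into the new strategy; this is an assumption for illustration rather than code from this diff (withPath is assumed from the write-config builder API, while the compaction builder calls mirror those used in the tests below, and basePath is a placeholder for an existing MOR table path):

// Illustrative sketch only, not part of this change.
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath(basePath)
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withInlineCompaction(true)
        .withMaxNumDeltaCommitsBeforeCompaction(4)
        .withInlineCompactionTriggerStrategy(CompactionTriggerStrategy.NUM_COMMITS_AFTER_LAST_REQUEST)
        .build())
    .build();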
@@ -140,6 +140,17 @@ private Option<Pair<Integer, String>> getLatestDeltaCommitInfo() {
return Option.empty();
}

private Option<Pair<Integer, String>> getLatestDeltaCommitInfoSinceLastCompactionRequest() {
Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
CompactionUtils.getDeltaCommitsSinceLatestCompactionRequest(table.getActiveTimeline());
if (deltaCommitsInfo.isPresent()) {
return Option.of(Pair.of(
deltaCommitsInfo.get().getLeft().countInstants(),
Contributor:
So the compaction scheduling would be affected by the progress of the compaction executions? Is that reasonable? I don't think so.

Contributor Author:
For now, the NUM_COMMITS and TIME_ELAPSED compaction trigger strategies check the number of delta commits (or the time elapsed) since the last successful compaction. So if the offline compaction application crashes for a while (or async compaction is very slow), a lot of compaction requests accumulate in the timeline (one request per delta commit), and that has a side effect on performance.

So this PR provides a new strategy that checks against the last compaction request, if one exists, rather than the last successful compaction (a worked example follows this file's diff).

Contributor:
@danny0405 Any input or question?

deltaCommitsInfo.get().getRight().getTimestamp()));
}
return Option.empty();
}

private boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy) {
boolean compactable;
// get deltaCommitsSinceLastCompaction and lastCompactionTs
@@ -157,6 +168,18 @@ private boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy)
LOG.info(String.format("The delta commits >= %s, trigger compaction scheduler.", inlineCompactDeltaCommitMax));
}
break;
case NUM_COMMITS_AFTER_LAST_REQUEST:
latestDeltaCommitInfoOption = getLatestDeltaCommitInfoSinceLastCompactionRequest();

if (!latestDeltaCommitInfoOption.isPresent()) {
return false;
}
latestDeltaCommitInfo = latestDeltaCommitInfoOption.get();
compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft();
if (compactable) {
LOG.info(String.format("The delta commits >= %s since the last compaction request, trigger compaction scheduler.", inlineCompactDeltaCommitMax));
}
break;
case TIME_ELAPSED:
compactable = inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - parsedToSeconds(latestDeltaCommitInfo.getRight());
if (compactable) {
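To make the motivation in the review thread above concrete, here is a small worked example of the difference between the two strategies; it is an illustration assuming a threshold of 2 delta commits, a stalled compaction executor, and (per the author's description) that each scheduling attempt may create a new request once the threshold is met:

// NUM_COMMITS counts delta commits since the last *completed* compaction, which never arrives:
//   dc1 dc2          -> 2 >= 2, request c1
//   dc1 dc2 dc3      -> 3 >= 2, request c2
//   dc1 dc2 dc3 dc4  -> 4 >= 2, request c3   (one request per delta commit piles up)
//
// NUM_COMMITS_AFTER_LAST_REQUEST counts delta commits since the last compaction *request* (c1):
//   dc3              -> 1 <  2, no new request
//   dc3 dc4          -> 2 >= 2, request c2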
@@ -20,15 +20,18 @@

import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.marker.WriteMarkersFactory;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
@@ -52,6 +55,18 @@ private HoodieWriteConfig getConfigForInlineCompaction(int maxDeltaCommits, int
.build();
}

private HoodieWriteConfig getConfigDisableCompaction(int maxDeltaCommits, int maxDeltaTime, CompactionTriggerStrategy inlineCompactionType) {
return getConfigBuilder(false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withInlineCompaction(false)
.withScheduleInlineCompaction(false)
.withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits)
.withMaxDeltaSecondsBeforeCompaction(maxDeltaTime)
.withInlineCompactionTriggerStrategy(inlineCompactionType).build())
.build();
}

@Test
public void testCompactionIsNotScheduledEarly() throws Exception {
// Given: make two commits
@@ -93,6 +108,65 @@ public void testSuccessfulCompactionBasedOnNumCommits() throws Exception {
}
}

@Test
public void testSuccessfulCompactionBasedOnNumAfterCompactionRequest() throws Exception {
// Given: make 4 commits
HoodieWriteConfig cfg = getConfigDisableCompaction(4, 60, CompactionTriggerStrategy.NUM_COMMITS_AFTER_LAST_REQUEST);
// turn off the compaction table service to simulate the compaction service being down or very slow
List<String> instants = IntStream.range(0, 4).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());

try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
List<HoodieRecord> records = dataGen.generateInserts(instants.get(0), 100);
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());

// step 1: create and complete 4 delta commits, then create 1 compaction request after this
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());

String requestInstant = HoodieActiveTimeline.createNewInstantTime();
scheduleCompaction(requestInstant, writeClient, cfg);

metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
assertEquals(1, metaClient.getActiveTimeline().getInstants()
.filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)
&& hoodieInstant.getState() == HoodieInstant.State.REQUESTED).count());

// step 2: try to schedule another compaction; under the NUM_COMMITS_AFTER_LAST_REQUEST strategy this should fail,
// and scheduleCompaction will throw an AssertionError because it checks whether the last instant is the expected compaction request
final String secondRequestInstant = HoodieActiveTimeline.createNewInstantTime();
final HoodieWriteConfig initialCfg = cfg;
Assertions.assertThrows(AssertionError.class, () -> scheduleCompaction(secondRequestInstant, writeClient, initialCfg));

// step 3: complete another 4 delta commits; there should be 2 compaction requests after this
instants = IntStream.range(0, 4).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
records = dataGen.generateInsertsForPartition(instants.get(0), 100, "2022/03/15");
for (String instant : instants) {
createNextDeltaCommit(instant, records, writeClient, metaClient, cfg, false);
}
// runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, gotPendingCompactionInstants);
requestInstant = HoodieActiveTimeline.createNewInstantTime();
scheduleCompaction(requestInstant, writeClient, cfg);

// step 4: re-enable the compaction table service and complete the last delta commit; this commit will trigger execution of all pending compaction requests
cfg = getConfigForInlineCompaction(4, 60, CompactionTriggerStrategy.NUM_COMMITS_AFTER_LAST_REQUEST);
try (SparkRDDWriteClient<?> newWriteClient = getHoodieWriteClient(cfg)) {
String finalInstant = HoodieActiveTimeline.createNewInstantTime();
createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 100), newWriteClient, metaClient, cfg, false);
}

metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
// step 5: there should be exactly 2 commit instants (the executed compactions) and no pending compaction.
// the last instant should be a delta commit since the compaction requests are earlier.
assertEquals(2, metaClient.getActiveTimeline().getCommitsTimeline().filter(instant -> instant.getAction().equals(HoodieTimeline.COMMIT_ACTION))
.countInstants());
assertEquals(0, metaClient.getActiveTimeline().getCommitsTimeline().filterPendingCompactionTimeline().countInstants());
assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION, metaClient.getActiveTimeline().lastInstant().get().getAction());
}
}

@Test
public void testSuccessfulCompactionBasedOnTime() throws Exception {
// Given: make one commit
@@ -237,6 +237,36 @@ public static Option<Pair<HoodieTimeline, HoodieInstant>> getDeltaCommitsSinceLa
}
}

/**
 * Returns the delta commits since the latest compaction request (or the latest completed compaction
 * if no request is pending), together with that baseline instant.
 */
public static Option<Pair<HoodieTimeline, HoodieInstant>> getDeltaCommitsSinceLatestCompactionRequest(
HoodieActiveTimeline activeTimeline) {
Option<HoodieInstant> lastCompaction = activeTimeline.getCommitTimeline()
.filterCompletedInstants().lastInstant();
Option<HoodieInstant> lastRequestCompaction = activeTimeline.getAllCommitsTimeline()
.filterPendingCompactionTimeline().lastInstant();
if (lastRequestCompaction.isPresent()) {
lastCompaction = lastRequestCompaction;
}
HoodieTimeline deltaCommits = activeTimeline.getDeltaCommitTimeline();

HoodieInstant latestInstant;
if (lastCompaction.isPresent()) {
latestInstant = lastCompaction.get();
// timeline containing the delta commits after the latest compaction request (or the latest completed compaction commit),
// and that compaction instant
return Option.of(Pair.of(deltaCommits.findInstantsAfter(
latestInstant.getTimestamp(), Integer.MAX_VALUE), lastCompaction.get()));
} else {
if (deltaCommits.countInstants() > 0) {
latestInstant = deltaCommits.firstInstant().get();
// timeline containing all the delta commits, and the first delta commit instant
return Option.of(Pair.of(deltaCommits.findInstantsAfterOrEquals(
latestInstant.getTimestamp(), Integer.MAX_VALUE), latestInstant));
} else {
return Option.empty();
}
}
}

/**
* Gets the oldest instant to retain for MOR compaction.
* If there is no completed compaction,
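Finally, a small usage sketch of the new helper; this is an assumption for illustration rather than code from this change (hadoopConf, basePath, and LOG are placeholders, and the calls mirror ones that appear elsewhere in this diff):

// Illustrative sketch: report how many delta commits have accumulated since the last compaction request.
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
    .setConf(hadoopConf)
    .setBasePath(basePath)
    .build();
Option<Pair<HoodieTimeline, HoodieInstant>> info =
    CompactionUtils.getDeltaCommitsSinceLatestCompactionRequest(metaClient.getActiveTimeline());
if (info.isPresent()) {
  int deltaCommitsSinceLastRequest = info.get().getLeft().countInstants();
  String baselineInstant = info.get().getRight().getTimestamp();
  LOG.info(deltaCommitsSinceLastRequest + " delta commits since instant " + baselineInstant);
}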