diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
index 12a00181dcf6a..123f790d36df4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -123,13 +123,13 @@ private HoodieCompactionPlan scheduleCompaction() {
     return new HoodieCompactionPlan();
   }
 
-  private Pair<Integer, String> getLatestDeltaCommitInfo(CompactionTriggerStrategy compactionTriggerStrategy) {
+  private Pair<Integer, String> getLatestDeltaCommitInfo() {
     Option<HoodieInstant> lastCompaction = table.getActiveTimeline().getCommitTimeline()
         .filterCompletedInstants().lastInstant();
     HoodieTimeline deltaCommits = table.getActiveTimeline().getDeltaCommitTimeline();
 
     String latestInstantTs;
-    int deltaCommitsSinceLastCompaction = 0;
+    final int deltaCommitsSinceLastCompaction;
     if (lastCompaction.isPresent()) {
       latestInstantTs = lastCompaction.get().getTimestamp();
       deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfter(latestInstantTs, Integer.MAX_VALUE).countInstants();
@@ -143,7 +143,7 @@ private Pair<Integer, String> getLatestDeltaCommitInfo(CompactionTriggerStrategy
   private boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy) {
     boolean compactable;
     // get deltaCommitsSinceLastCompaction and lastCompactionTs
-    Pair<Integer, String> latestDeltaCommitInfo = getLatestDeltaCommitInfo(compactionTriggerStrategy);
+    Pair<Integer, String> latestDeltaCommitInfo = getLatestDeltaCommitInfo();
     int inlineCompactDeltaCommitMax = config.getInlineCompactDeltaCommitMax();
     int inlineCompactDeltaSecondsMax = config.getInlineCompactDeltaSecondsMax();
     switch (compactionTriggerStrategy) {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 075736fe38a17..376b36e3dcd80 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -84,4 +84,12 @@ public static String getPreCombineField(Configuration conf) {
     final String preCombineField = conf.getString(FlinkOptions.PRECOMBINE_FIELD);
     return preCombineField.equals(FlinkOptions.NO_PRE_COMBINE) ? null : preCombineField;
   }
+
+  /**
+   * Returns whether the compaction strategy is based on elapsed delta time.
+   */
+  public static boolean isDeltaTimeCompaction(Configuration conf) {
+    final String strategy = conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY).toLowerCase(Locale.ROOT);
+    return FlinkOptions.TIME_ELAPSED.equals(strategy) || FlinkOptions.NUM_OR_TIME.equals(strategy);
+  }
 }
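Note: both TIME_ELAPSED and NUM_OR_TIME qualify as delta-time compaction, since either can fire on elapsed time alone. A minimal usage sketch (illustrative only, not part of the patch):

    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY, FlinkOptions.TIME_ELAPSED);
    boolean deltaTime = OptionsResolver.isDeltaTimeCompaction(conf); // true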
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 0af38c41fbc5d..08a04e3f84bef 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -29,11 +29,13 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.event.CommitAckEvent;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.utils.HiveSyncContext;
 import org.apache.hudi.sink.utils.NonThrownExecutor;
+import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
@@ -221,11 +223,13 @@ public void notifyCheckpointComplete(long checkpointId) {
           // the stream write task snapshot and flush the data buffer synchronously in sequence,
           // so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)
           final boolean committed = commitInstant(this.instant, checkpointId);
+
+          if (tableState.scheduleCompaction) {
+            // if async compaction is on, schedule the compaction
+            CompactionUtil.scheduleCompaction(metaClient, writeClient, tableState.isDeltaTimeCompaction, committed);
+          }
+
           if (committed) {
-            if (tableState.scheduleCompaction) {
-              // if async compaction is on, schedule the compaction
-              writeClient.scheduleCompaction(Option.empty());
-            }
             // start new instant.
             startInstant();
             // sync Hive if is enabled
@@ -557,6 +561,7 @@ private static class TableState implements Serializable {
     final boolean scheduleCompaction;
     final boolean syncHive;
     final boolean syncMetadata;
+    final boolean isDeltaTimeCompaction;
 
     private TableState(Configuration conf) {
       this.operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
@@ -566,6 +571,7 @@ private TableState(Configuration conf) {
       this.scheduleCompaction = StreamerUtil.needsScheduleCompaction(conf);
       this.syncHive = conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED);
       this.syncMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED);
+      this.isDeltaTimeCompaction = OptionsResolver.isDeltaTimeCompaction(conf);
     }
 
     public static TableState create(Configuration conf) {
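Note: the commit outcome now selects the scheduling path instead of gating it. A summary sketch of CompactionUtil.scheduleCompaction as wired above (illustrative, not part of the patch):

    // committed == true                 -> writeClient.scheduleCompaction(Option.empty()), the old behavior
    // committed == false && delta-time  -> reload the timeline and schedule at an explicit instant anyway,
    //                                      since elapsed time alone can make the table compactable
    // committed == false && num-commits -> no-op, an empty checkpoint adds no delta commits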
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index aebcc7d6ee98f..a22bea9f31b69 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -75,14 +75,16 @@ public static void main(String[] args) throws Exception {
     // judge whether have operation
     // to compute the compaction instant time and do compaction.
     if (cfg.schedule) {
-      String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
-      boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
-      if (!scheduled) {
-        // do nothing.
-        LOG.info("No compaction plan for this job ");
-        return;
+      Option<String> compactionInstantTimeOption = CompactionUtil.getCompactionInstantTime(metaClient);
+      if (compactionInstantTimeOption.isPresent()) {
+        boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTimeOption.get(), Option.empty());
+        if (!scheduled) {
+          // do nothing.
+          LOG.info("No compaction plan for this job ");
+          return;
+        }
+        table.getMetaClient().reloadActiveTimeline();
       }
-      table.getMetaClient().reloadActiveTimeline();
     }
 
     // fetch the instant based on the configured execution sequence
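Note: callers of the now Option-returning CompactionUtil.getCompactionInstantTime are expected to skip scheduling on an empty timeline. A caller-side sketch (illustrative, not part of the patch):

    Option<String> instantTime = CompactionUtil.getCompactionInstantTime(metaClient);
    if (instantTime.isPresent()) {
      writeClient.scheduleCompactionAtInstant(instantTime.get(), Option.empty());
    } else {
      // no completed instants yet: nothing to compact, skip the scheduling step
    }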
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index e0056f9a1d841..d04937bf7d66f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.util;
 
+import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
@@ -46,10 +47,36 @@ public class CompactionUtil {
 
   private static final Logger LOG = LoggerFactory.getLogger(CompactionUtil.class);
 
+  /**
+   * Schedules a new compaction instant.
+   *
+   * @param metaClient          The metadata client
+   * @param writeClient         The write client
+   * @param deltaTimeCompaction Whether the compaction is triggered by elapsed delta time
+   * @param committed           Whether the last instant was committed successfully
+   */
+  public static void scheduleCompaction(
+      HoodieTableMetaClient metaClient,
+      HoodieFlinkWriteClient writeClient,
+      boolean deltaTimeCompaction,
+      boolean committed) {
+    if (committed) {
+      writeClient.scheduleCompaction(Option.empty());
+    } else if (deltaTimeCompaction) {
+      // if there are no new commits and the compaction trigger strategy is based on elapsed delta time,
+      // schedule the compaction anyway.
+      metaClient.reloadActiveTimeline();
+      Option<String> compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
+      if (compactionInstantTime.isPresent()) {
+        writeClient.scheduleCompactionAtInstant(compactionInstantTime.get(), Option.empty());
+      }
+    }
+  }
+
   /**
    * Gets compaction Instant time.
    */
-  public static String getCompactionInstantTime(HoodieTableMetaClient metaClient) {
+  public static Option<String> getCompactionInstantTime(HoodieTableMetaClient metaClient) {
     Option<HoodieInstant> firstPendingInstant = metaClient.getCommitsTimeline()
         .filterPendingExcludingCompaction().firstInstant();
     Option<HoodieInstant> lastCompleteInstant = metaClient.getActiveTimeline().getWriteTimeline()
@@ -59,8 +86,11 @@ public static String getCompactionInstantTime(HoodieTableMetaClient metaClient)
       String lastCompleteTimestamp = lastCompleteInstant.get().getTimestamp();
       // Committed and pending compaction instants should have strictly lower timestamps
       return StreamerUtil.medianInstantTime(firstPendingTimestamp, lastCompleteTimestamp);
+    } else if (!lastCompleteInstant.isPresent()) {
+      LOG.info("No instants to schedule the compaction plan");
+      return Option.empty();
     } else {
-      return HoodieActiveTimeline.createNewInstantTime();
+      return Option.of(HoodieActiveTimeline.createNewInstantTime());
     }
   }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 3a31253a2c891..867621a66d8f4 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -407,14 +407,14 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throw
   /**
    * Returns the median instant time between the given two instant time.
    */
-  public static String medianInstantTime(String highVal, String lowVal) {
+  public static Option<String> medianInstantTime(String highVal, String lowVal) {
     try {
       long high = HoodieActiveTimeline.parseInstantTime(highVal).getTime();
       long low = HoodieActiveTimeline.parseInstantTime(lowVal).getTime();
       ValidationUtils.checkArgument(high > low,
           "Instant [" + highVal + "] should have newer timestamp than instant [" + lowVal + "]");
       long median = low + (high - low) / 2;
-      return HoodieActiveTimeline.formatInstantTime(new Date(median));
+      return low >= median ? Option.empty() : Option.of(HoodieActiveTimeline.formatInstantTime(new Date(median)));
     } catch (ParseException e) {
       throw new HoodieException("Get median instant time with interval [" + lowVal + ", " + highVal + "] error", e);
     }
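Note: a worked example of the median computation, matching testMedianInstantTime below (timestamps use the yyyyMMddHHmmss instant format):

    Option<String> median = StreamerUtil.medianInstantTime("20210705125921", "20210705125806");
    // 12:59:21 - 12:58:06 = 75 s, so median = 12:58:06 + 37.5 s = 12:58:43(.5) -> "20210705125843"
    // Option.empty() is returned only when the two instants are so close that the
    // median would not be strictly greater than the lower bound.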
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index 52002b1180bcc..e0c574bd655fb 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -109,11 +109,17 @@ public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception {
     // infer changelog mode
     CompactionUtil.inferChangelogMode(conf, metaClient);
 
+    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
+
+    boolean scheduled = false;
     // judge whether have operation
     // To compute the compaction instant time and do compaction.
-    String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
-    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
-    boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+    Option<String> compactionInstantTimeOption = CompactionUtil.getCompactionInstantTime(metaClient);
+    if (compactionInstantTimeOption.isPresent()) {
+      scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTimeOption.get(), Option.empty());
+    }
+
+    String compactionInstantTime = compactionInstantTimeOption.get();
 
     assertTrue(scheduled, "The compaction plan should be scheduled");
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 54a142a25b4c2..7da4c9e119a39 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -223,6 +223,10 @@ public void checkpointFunction(long checkpointId) throws Exception {
     stateInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
   }
 
+  public void endInput() {
+    writeFunction.endInput();
+  }
+
   public void checkpointComplete(long checkpointId) {
     stateInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
     coordinator.notifyCheckpointComplete(checkpointId);
@@ -248,6 +252,11 @@ public void subTaskFails(int taskID) throws Exception {
   public void close() throws Exception {
     coordinator.close();
     ioManager.close();
+    bucketAssignerFunction.close();
+    writeFunction.close();
+    if (compactFunctionWrapper != null) {
+      compactFunctionWrapper.close();
+    }
   }
 
   public StreamWriteOperatorCoordinator getCoordinator() {
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
index a5fed83ea15de..9559b8c8c5863 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
@@ -20,11 +20,13 @@
 
 import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieFlinkTable;
@@ -33,14 +35,16 @@
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.configuration.Configuration;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -60,16 +64,24 @@ public class TestCompactionUtil {
   @TempDir
   File tempFile;
 
-  @BeforeEach
   void beforeEach() throws IOException {
+    beforeEach(Collections.emptyMap());
+  }
+
+  void beforeEach(Map<String, String> options) throws IOException {
     this.conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    options.forEach((k, v) -> conf.setString(k, v));
+
+    StreamerUtil.initTableIfNotExists(conf);
+
     this.table = FlinkTables.createTable(conf);
     this.metaClient = table.getMetaClient();
   }
 
   @Test
-  void rollbackCompaction() {
+  void rollbackCompaction() throws Exception {
+    beforeEach();
     List<String> oriInstants = IntStream.range(0, 3)
         .mapToObj(i -> generateCompactionPlan()).collect(Collectors.toList());
     List<HoodieInstant> instants = metaClient.getActiveTimeline()
@@ -88,7 +100,8 @@ void rollbackCompaction() {
   }
 
   @Test
-  void rollbackEarliestCompaction() {
+  void rollbackEarliestCompaction() throws Exception {
+    beforeEach();
     conf.setInteger(FlinkOptions.COMPACTION_TIMEOUT_SECONDS, 0);
     List<String> oriInstants = IntStream.range(0, 3)
         .mapToObj(i -> generateCompactionPlan()).collect(Collectors.toList());
@@ -109,6 +122,33 @@ void rollbackEarliestCompaction() {
     assertThat(instantTime, is(oriInstants.get(0)));
   }
 
+  @Test
+  void testScheduleCompaction() throws Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
+    options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), FlinkOptions.TIME_ELAPSED);
+    options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), "0");
+    beforeEach(options);
+
+    // write a commit with data first
+    TestData.writeDataAsBatch(TestData.DATA_SET_SINGLE_INSERT, conf);
+
+    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
+    CompactionUtil.scheduleCompaction(metaClient, writeClient, true, true);
+
+    Option<HoodieInstant> pendingCompactionInstant = metaClient.reloadActiveTimeline().filterPendingCompactionTimeline().lastInstant();
+    assertTrue(pendingCompactionInstant.isPresent(), "A compaction plan is expected to be scheduled");
+
+    // write another commit with data and start a new instant
+    TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf);
+    TimeUnit.SECONDS.sleep(3); // in case the instant time interval is too close
+    writeClient.startCommit();
+
+    CompactionUtil.scheduleCompaction(metaClient, writeClient, true, false);
+    int numCompactionCommits = metaClient.reloadActiveTimeline().filterPendingCompactionTimeline().countInstants();
+    assertThat("Two compaction plans are expected to be scheduled", numCompactionCommits, is(2));
+  }
+
   /**
    * Generates a compaction plan on the timeline and returns its instant time.
    */
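Note: the second scheduleCompaction call exercises the committed == false branch. The expected timeline shape (an illustrative sketch, inferred from the test setup):

    // deltacommit (completed) -> compaction.requested   first plan, committed == true path
    // deltacommit (inflight)  -> compaction.requested   second plan, committed == false path;
    //                            its instant time is the median between the pending delta commit
    //                            and the last completed instant, hence the 3-second sleep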
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index e8e177b823626..c1e924056cfa2 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -343,6 +343,37 @@ public static void writeData(
     funcWrapper.close();
   }
 
+  /**
+   * Write a list of row data with Hoodie format based on the given configuration.
+   *
+   * <p>The difference with {@link #writeData} is that it flushes the data using #endInput
+   * and does not generate an inflight instant.
+   *
+   * @param dataBuffer The data buffer to write
+   * @param conf       The flink configuration
+   * @throws Exception if error occurs
+   */
+  public static void writeDataAsBatch(
+      List<RowData> dataBuffer,
+      Configuration conf) throws Exception {
+    StreamWriteFunctionWrapper<RowData> funcWrapper = new StreamWriteFunctionWrapper<>(
+        conf.getString(FlinkOptions.PATH),
+        conf);
+    funcWrapper.openFunction();
+
+    for (RowData rowData : dataBuffer) {
+      funcWrapper.invoke(rowData);
+    }
+
+    // this triggers the data write and event send
+    funcWrapper.endInput();
+
+    final OperatorEvent nextEvent = funcWrapper.getNextEvent();
+    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
+
+    funcWrapper.close();
+  }
+
   private static String toStringSafely(Object obj) {
     return obj == null ? "null" : obj.toString();
   }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
index af40a8dd82e60..c05e5b056344a 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
@@ -80,7 +80,7 @@ void testInitTableIfNotExists() throws IOException {
   void testMedianInstantTime() {
     String higher = "20210705125921";
     String lower = "20210705125806";
-    String median1 = StreamerUtil.medianInstantTime(higher, lower);
+    String median1 = StreamerUtil.medianInstantTime(higher, lower).get();
     assertThat(median1, is("20210705125843"));
     // test symmetry
     assertThrows(IllegalArgumentException.class,