@@ -123,13 +123,13 @@ private HoodieCompactionPlan scheduleCompaction() {
return new HoodieCompactionPlan();
}

private Pair<Integer, String> getLatestDeltaCommitInfo(CompactionTriggerStrategy compactionTriggerStrategy) {
private Pair<Integer, String> getLatestDeltaCommitInfo() {
Option<HoodieInstant> lastCompaction = table.getActiveTimeline().getCommitTimeline()
.filterCompletedInstants().lastInstant();
HoodieTimeline deltaCommits = table.getActiveTimeline().getDeltaCommitTimeline();

String latestInstantTs;
int deltaCommitsSinceLastCompaction = 0;
final int deltaCommitsSinceLastCompaction;
if (lastCompaction.isPresent()) {
latestInstantTs = lastCompaction.get().getTimestamp();
deltaCommitsSinceLastCompaction = deltaCommits.findInstantsAfter(latestInstantTs, Integer.MAX_VALUE).countInstants();
@@ -143,7 +143,7 @@ private Pair<Integer, String> getLatestDeltaCommitInfo(CompactionTriggerStrategy
private boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy) {
boolean compactable;
// get deltaCommitsSinceLastCompaction and lastCompactionTs
Pair<Integer, String> latestDeltaCommitInfo = getLatestDeltaCommitInfo(compactionTriggerStrategy);
Pair<Integer, String> latestDeltaCommitInfo = getLatestDeltaCommitInfo();
int inlineCompactDeltaCommitMax = config.getInlineCompactDeltaCommitMax();
int inlineCompactDeltaSecondsMax = config.getInlineCompactDeltaSecondsMax();
switch (compactionTriggerStrategy) {

@@ -84,4 +84,12 @@ public static String getPreCombineField(Configuration conf) {
final String preCombineField = conf.getString(FlinkOptions.PRECOMBINE_FIELD);
return preCombineField.equals(FlinkOptions.NO_PRE_COMBINE) ? null : preCombineField;
}

/**
* Returns whether the compaction strategy is based on elapsed delta time.
*/
public static boolean isDeltaTimeCompaction(Configuration conf) {
final String strategy = conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY).toLowerCase(Locale.ROOT);
return FlinkOptions.TIME_ELAPSED.equals(strategy) || FlinkOptions.NUM_OR_TIME.equals(strategy);
}
}
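
For context, a minimal usage sketch of the new helper (illustrative only, not part of this change; the configuration setup is an assumption for the example):

// Not part of the PR: usage sketch for the new OptionsResolver#isDeltaTimeCompaction helper.
// Assumes the same Configuration and FlinkOptions imports used elsewhere in this diff.
static boolean usesDeltaTimeCompaction() {
  Configuration conf = new Configuration();
  conf.setString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY, FlinkOptions.TIME_ELAPSED);
  // true for the TIME_ELAPSED and NUM_OR_TIME strategies, false for the others
  return OptionsResolver.isDeltaTimeCompaction(conf);
}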

@@ -29,11 +29,13 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.HiveSyncContext;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.annotation.VisibleForTesting;
@@ -221,11 +223,13 @@ public void notifyCheckpointComplete(long checkpointId) {
// the stream write task snapshot and flush the data buffer synchronously in sequence,
// so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)
final boolean committed = commitInstant(this.instant, checkpointId);

if (tableState.scheduleCompaction) {
// if async compaction is on, schedule the compaction
CompactionUtil.scheduleCompaction(metaClient, writeClient, tableState.isDeltaTimeCompaction, committed);
}

if (committed) {
if (tableState.scheduleCompaction) {
// if async compaction is on, schedule the compaction
writeClient.scheduleCompaction(Option.empty());
}
Reviewer comment (Contributor): can we also add a UT here, for both commits with / without data.
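(The new TestCompactionUtil#testScheduleCompaction later in this diff adds such a test, exercising CompactionUtil.scheduleCompaction both with committed = true and with committed = false.)
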
// start new instant.
startInstant();
// sync Hive if is enabled
@@ -557,6 +561,7 @@ private static class TableState implements Serializable {
final boolean scheduleCompaction;
final boolean syncHive;
final boolean syncMetadata;
final boolean isDeltaTimeCompaction;

private TableState(Configuration conf) {
this.operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
@@ -566,6 +571,7 @@ private TableState(Configuration conf) {
this.scheduleCompaction = StreamerUtil.needsScheduleCompaction(conf);
this.syncHive = conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED);
this.syncMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED);
this.isDeltaTimeCompaction = OptionsResolver.isDeltaTimeCompaction(conf);
}

public static TableState create(Configuration conf) {

@@ -75,14 +75,16 @@ public static void main(String[] args) throws Exception {
// judge whether have operation
// to compute the compaction instant time and do compaction.
if (cfg.schedule) {
String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
if (!scheduled) {
// do nothing.
LOG.info("No compaction plan for this job ");
return;
Option<String> compactionInstantTimeOption = CompactionUtil.getCompactionInstantTime(metaClient);
if (compactionInstantTimeOption.isPresent()) {
Reviewer comment (Contributor): Why change this logic ?

boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTimeOption.get(), Option.empty());
if (!scheduled) {
// do nothing.
LOG.info("No compaction plan for this job ");
return;
}
table.getMetaClient().reloadActiveTimeline();
}
table.getMetaClient().reloadActiveTimeline();
}

// fetch the instant based on the configured execution sequence
34 changes: 32 additions & 2 deletions hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -18,6 +18,7 @@

package org.apache.hudi.util;

import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
@@ -46,10 +47,36 @@ public class CompactionUtil {

private static final Logger LOG = LoggerFactory.getLogger(CompactionUtil.class);

/**
* Schedules a new compaction instant.
*
* @param metaClient The metadata client
* @param writeClient The write client
* @param deltaTimeCompaction Whether the compaction is triggered by elapsed delta time
* @param committed Whether the last instant was committed successfully
*/
public static void scheduleCompaction(
HoodieTableMetaClient metaClient,
HoodieFlinkWriteClient<?> writeClient,
boolean deltaTimeCompaction,
boolean committed) {
if (committed) {
writeClient.scheduleCompaction(Option.empty());
} else if (deltaTimeCompaction) {
// if there are no new commits and the compaction trigger strategy is based on elapsed delta time,
// schedules the compaction anyway.
metaClient.reloadActiveTimeline();
Option<String> compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
if (compactionInstantTime.isPresent()) {
writeClient.scheduleCompactionAtInstant(compactionInstantTime.get(), Option.empty());
}
}
}

/**
* Gets compaction Instant time.
*/
public static String getCompactionInstantTime(HoodieTableMetaClient metaClient) {
public static Option<String> getCompactionInstantTime(HoodieTableMetaClient metaClient) {
Option<HoodieInstant> firstPendingInstant = metaClient.getCommitsTimeline()
.filterPendingExcludingCompaction().firstInstant();
Option<HoodieInstant> lastCompleteInstant = metaClient.getActiveTimeline().getWriteTimeline()
@@ -59,8 +86,11 @@ public static String getCompactionInstantTime(HoodieTableMetaClient metaClient)
String lastCompleteTimestamp = lastCompleteInstant.get().getTimestamp();
// Committed and pending compaction instants should have strictly lower timestamps
return StreamerUtil.medianInstantTime(firstPendingTimestamp, lastCompleteTimestamp);
} else if (!lastCompleteInstant.isPresent()) {
Reviewer comment (Contributor): Why this change ? For what reason the lastCompleteInstant can be null ?

LOG.info("No instants to schedule the compaction plan");
return Option.empty();
} else {
return HoodieActiveTimeline.createNewInstantTime();
return Option.of(HoodieActiveTimeline.createNewInstantTime());
}
}


@@ -407,14 +407,14 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throw
/**
* Returns the median instant time between the given two instant times.
*/
public static String medianInstantTime(String highVal, String lowVal) {
public static Option<String> medianInstantTime(String highVal, String lowVal) {
try {
long high = HoodieActiveTimeline.parseInstantTime(highVal).getTime();
long low = HoodieActiveTimeline.parseInstantTime(lowVal).getTime();
ValidationUtils.checkArgument(high > low,
"Instant [" + highVal + "] should have newer timestamp than instant [" + lowVal + "]");
long median = low + (high - low) / 2;
return HoodieActiveTimeline.formatInstantTime(new Date(median));
return low >= median ? Option.empty() : Option.of(HoodieActiveTimeline.formatInstantTime(new Date(median)));
} catch (ParseException e) {
throw new HoodieException("Get median instant time with interval [" + lowVal + ", " + highVal + "] error", e);
}
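
For context (not part of this change): with the arithmetic above, medianInstantTime can only return Option.empty() when the two parsed instant times are a single millisecond apart, i.e. when no strictly intermediate timestamp exists. A rough sketch, reusing the parse call from the method and the timestamps from the unit test at the end of this diff:

// Not part of the PR: illustrates the new Option.empty() guard in medianInstantTime.
long high = HoodieActiveTimeline.parseInstantTime("20210705125921").getTime();
long low = HoodieActiveTimeline.parseInstantTime("20210705125806").getTime();
long median = low + (high - low) / 2; // 75 seconds apart, so median > low and Option.of(...) is returned
// Only when high - low == 1 ms does (high - low) / 2 collapse to 0, making median == low
// and the result Option.empty().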

@@ -109,11 +109,17 @@ public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception {
// infer changelog mode
CompactionUtil.inferChangelogMode(conf, metaClient);

HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);

boolean scheduled = false;
// judge whether have operation
// To compute the compaction instant time and do compaction.
String compactionInstantTime = CompactionUtil.getCompactionInstantTime(metaClient);
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
boolean scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
Option<String> compactionInstantTimeOption = CompactionUtil.getCompactionInstantTime(metaClient);
if (compactionInstantTimeOption.isPresent()) {
scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTimeOption.get(), Option.empty());
}

String compactionInstantTime = compactionInstantTimeOption.get();

assertTrue(scheduled, "The compaction plan should be scheduled");


@@ -223,6 +223,10 @@ public void checkpointFunction(long checkpointId) throws Exception {
stateInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
}

public void endInput() {
writeFunction.endInput();
}

public void checkpointComplete(long checkpointId) {
stateInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
coordinator.notifyCheckpointComplete(checkpointId);
@@ -248,6 +252,11 @@ public void subTaskFails(int taskID) throws Exception {
public void close() throws Exception {
coordinator.close();
ioManager.close();
bucketAssignerFunction.close();
writeFunction.close();
if (compactFunctionWrapper != null) {
compactFunctionWrapper.close();
}
}

public StreamWriteOperatorCoordinator getCoordinator() {

@@ -20,11 +20,13 @@

import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieFlinkTable;
@@ -33,14 +35,16 @@
import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.configuration.Configuration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@@ -60,16 +64,24 @@ public class TestCompactionUtil {
@TempDir
File tempFile;

@BeforeEach
void beforeEach() throws IOException {
beforeEach(Collections.emptyMap());
}

void beforeEach(Map<String, String> options) throws IOException {
this.conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
options.forEach((k, v) -> conf.setString(k, v));

StreamerUtil.initTableIfNotExists(conf);

this.table = FlinkTables.createTable(conf);
this.metaClient = table.getMetaClient();
}

@Test
void rollbackCompaction() {
void rollbackCompaction() throws Exception {
beforeEach();
List<String> oriInstants = IntStream.range(0, 3)
.mapToObj(i -> generateCompactionPlan()).collect(Collectors.toList());
List<HoodieInstant> instants = metaClient.getActiveTimeline()
@@ -88,7 +100,8 @@ void rollbackCompaction() {
}

@Test
void rollbackEarliestCompaction() {
void rollbackEarliestCompaction() throws Exception {
beforeEach();
conf.setInteger(FlinkOptions.COMPACTION_TIMEOUT_SECONDS, 0);
List<String> oriInstants = IntStream.range(0, 3)
.mapToObj(i -> generateCompactionPlan()).collect(Collectors.toList());
@@ -109,6 +122,33 @@ void rollbackEarliestCompaction() {
assertThat(instantTime, is(oriInstants.get(0)));
}

@Test
void testScheduleCompaction() throws Exception {
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), FlinkOptions.TIME_ELAPSED);
options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), "0");
beforeEach(options);

// write a commit with data first
TestData.writeDataAsBatch(TestData.DATA_SET_SINGLE_INSERT, conf);

HoodieFlinkWriteClient<?> writeClient = StreamerUtil.createWriteClient(conf);
CompactionUtil.scheduleCompaction(metaClient, writeClient, true, true);

Option<HoodieInstant> pendingCompactionInstant = metaClient.reloadActiveTimeline().filterPendingCompactionTimeline().lastInstant();
assertTrue(pendingCompactionInstant.isPresent(), "A compaction plan expects to be scheduled");

// write another commit with data and start a new instant
TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf);
TimeUnit.SECONDS.sleep(3); // in case the instant time interval is too close
writeClient.startCommit();

CompactionUtil.scheduleCompaction(metaClient, writeClient, true, false);
int numCompactionCommits = metaClient.reloadActiveTimeline().filterPendingCompactionTimeline().countInstants();
assertThat("Two compaction plan expects to be scheduled", numCompactionCommits, is(2));
}

/**
* Generates a compaction plan on the timeline and returns its instant time.
*/
31 changes: 31 additions & 0 deletions hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -343,6 +343,37 @@ public static void writeData(
funcWrapper.close();
}

/**
* Writes a list of row data in Hoodie format based on the given configuration.
*
* <p>The difference from {@link #writeData} is that it flushes the data using #endInput and
* does not generate an inflight instant.
*
* @param dataBuffer The data buffer to write
* @param conf The flink configuration
* @throws Exception if error occurs
*/
public static void writeDataAsBatch(
List<RowData> dataBuffer,
Configuration conf) throws Exception {
StreamWriteFunctionWrapper<RowData> funcWrapper = new StreamWriteFunctionWrapper<>(
conf.getString(FlinkOptions.PATH),
conf);
funcWrapper.openFunction();

for (RowData rowData : dataBuffer) {
funcWrapper.invoke(rowData);
}

// this triggers the data write and event send
funcWrapper.endInput();

final OperatorEvent nextEvent = funcWrapper.getNextEvent();
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);

funcWrapper.close();
}
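
A short usage sketch (illustrative only, not part of this change), mirroring the calls already made in TestCompactionUtil#testScheduleCompaction above; it assumes that test's @TempDir tempFile and the existing TestConfigurations helper:

// Not part of the PR: write one batch of rows and complete the instant via endInput.
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
TestData.writeDataAsBatch(TestData.DATA_SET_INSERT, conf);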

private static String toStringSafely(Object obj) {
return obj == null ? "null" : obj.toString();
}

@@ -80,7 +80,7 @@ void testInitTableIfNotExists() throws IOException {
void testMedianInstantTime() {
String higher = "20210705125921";
String lower = "20210705125806";
String median1 = StreamerUtil.medianInstantTime(higher, lower);
String median1 = StreamerUtil.medianInstantTime(higher, lower).get();
assertThat(median1, is("20210705125843"));
// test symmetry
assertThrows(IllegalArgumentException.class,