@@ -26,6 +26,7 @@
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.annotation.VisibleForTesting;
@@ -80,18 +81,18 @@ public void processElement(StreamRecord<Object> streamRecord) {
  }

  @Override
-  public void notifyCheckpointComplete(long checkpointId) throws IOException {
+  public void notifyCheckpointComplete(long checkpointId) {
    try {
-      scheduleCompaction(checkpointId);
+      HoodieFlinkTable hoodieTable = writeClient.getHoodieTable();
+      CompactionUtil.rollbackCompaction(hoodieTable, conf);
+      scheduleCompaction(hoodieTable, checkpointId);
    } catch (Throwable throwable) {
      // make it fail safe
      LOG.error("Error while scheduling compaction at instant: " + compactionInstantTime, throwable);
    }
  }

-  private void scheduleCompaction(long checkpointId) throws IOException {
-    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
-
+  private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) throws IOException {
    // the last instant takes the highest priority.
    Option<HoodieInstant> lastRequested = table.getActiveTimeline().filterPendingCompactionTimeline()
        .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).lastInstant();
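For readability, this is how the reworked checkpoint hook reads once the two hunks above are applied. It is an assembled sketch, not an excerpt from the PR, and it only uses fields already visible in this operator (writeClient, conf, compactionInstantTime, LOG):

  @Override
  public void notifyCheckpointComplete(long checkpointId) {
    try {
      // resolve the table once and reuse it for both the rollback and the scheduling step
      HoodieFlinkTable<?> hoodieTable = writeClient.getHoodieTable();
      // roll back compactions that have stayed INFLIGHT for too long, then schedule a new one
      CompactionUtil.rollbackCompaction(hoodieTable, conf);
      scheduleCompaction(hoodieTable, checkpointId);
    } catch (Throwable throwable) {
      // make it fail safe: a scheduling error must not fail the checkpoint
      LOG.error("Error while scheduling compaction at instant: " + compactionInstantTime, throwable);
    }
  }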
HoodieFlinkCompactor.java
@@ -21,7 +21,6 @@
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
@@ -46,7 +45,6 @@ public class HoodieFlinkCompactor {

  protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCompactor.class);

-  @SuppressWarnings("unchecked, rawtypes")
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

@@ -73,17 +71,7 @@ public static void main(String[] args) throws Exception {

    // rolls back inflight compaction first
    // condition: the schedule compaction is in INFLIGHT state for max delta seconds.
-    String curInstantTime = HoodieActiveTimeline.createNewInstantTime();
-    int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS);
-    HoodieTimeline inflightCompactionTimeline = metaClient.getActiveTimeline()
-        .filterPendingCompactionTimeline()
-        .filter(instant ->
-            instant.getState() == HoodieInstant.State.INFLIGHT
-                && StreamerUtil.instantTimeDiff(curInstantTime, instant.getTimestamp()) >= deltaSeconds);
-    inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
-      writeClient.rollbackInflightCompaction(inflightInstant, table);
-      table.getMetaClient().reloadActiveTimeline();
-    });
+    CompactionUtil.rollbackCompaction(table, conf);

    // judge whether have operation
    // to compute the compaction instant time and do compaction.
@@ -94,6 +82,8 @@
      LOG.info("No compaction plan for this job ");
      return;
    }
+
+    table.getMetaClient().reloadActiveTimeline();
    // generate compaction plan
    // should support configurable commit metadata
    HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
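Condensed, the main() flow of the standalone compactor after this change becomes the sequence below. This is a sketch rather than the PR's exact code; the pending-operation check and the getCompactionPlan arguments are elided because those lines are collapsed in this view:

  // 1. roll back compactions that have stayed INFLIGHT longer than COMPACTION_DELTA_SECONDS
  CompactionUtil.rollbackCompaction(table, conf);

  // 2. decide whether there is any compaction operation to execute; if not, log and return
  //    (the exact check is collapsed in this diff view)

  // 3. reload the timeline so the rollbacks above are visible before reading the plan
  table.getMetaClient().reloadActiveTimeline();

  // 4. load the compaction plan for the chosen instant
  //    HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(...);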
hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java (17 additions, 0 deletions)
@@ -22,13 +22,15 @@
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieIOException;

import org.apache.avro.Schema;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hudi.table.HoodieFlinkTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -96,4 +98,19 @@ public static void cleanInstant(HoodieTableMetaClient metaClient, HoodieInstant
      throw new HoodieIOException("Could not remove requested commit " + commitFilePath, e);
    }
  }
+
+  public static void rollbackCompaction(HoodieFlinkTable<?> table, Configuration conf) {
+    String curInstantTime = HoodieActiveTimeline.createNewInstantTime();
+    int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS);
+    HoodieTimeline inflightCompactionTimeline = table.getActiveTimeline()
+        .filterPendingCompactionTimeline()
+        .filter(instant ->
+            instant.getState() == HoodieInstant.State.INFLIGHT
+                && StreamerUtil.instantTimeDiff(curInstantTime, instant.getTimestamp()) >= deltaSeconds);
+    inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
+      LOG.info("Rollback the pending compaction instant: " + inflightInstant);
+      table.rollback(table.getContext(), HoodieActiveTimeline.createNewInstantTime(), inflightInstant, true);
+      table.getMetaClient().reloadActiveTimeline();
+    });
+  }
}
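
A hedged usage sketch of the new helper follows. The writeClient handle and the Flink job Configuration are assumed to be available, as they are in the operators above, and the 600-second value is only an illustrative setting, not a documented default:

  // Example: roll back stale compactions before scheduling a new one.
  Configuration conf = new Configuration();
  conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, 600); // illustrative value: 10 minutes

  HoodieFlinkTable<?> table = writeClient.getHoodieTable();
  // Every compaction instant that is still INFLIGHT and whose timestamp is at least
  // COMPACTION_DELTA_SECONDS older than "now" is rolled back, and the active timeline
  // is reloaded after each rollback so later scheduling sees a consistent view.
  CompactionUtil.rollbackCompaction(table, conf);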