CompactionPlanSourceFunction.java
@@ -20,6 +20,7 @@

import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.util.collection.Pair;

import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
@@ -28,8 +29,7 @@
import org.slf4j.LoggerFactory;

import java.util.List;

import static java.util.stream.Collectors.toList;
import java.util.stream.Collectors;

/**
* Flink hudi compaction source function.
@@ -53,18 +53,12 @@ public class CompactionPlanSourceFunction extends AbstractRichFunction implement
protected static final Logger LOG = LoggerFactory.getLogger(CompactionPlanSourceFunction.class);

/**
* Compaction instant time.
*/
private final String compactionInstantTime;

/**
* The compaction plan.
* compaction plan instant -> compaction plan
*/
private final HoodieCompactionPlan compactionPlan;
private final List<Pair<String, HoodieCompactionPlan>> compactionPlans;

public CompactionPlanSourceFunction(HoodieCompactionPlan compactionPlan, String compactionInstantTime) {
this.compactionPlan = compactionPlan;
this.compactionInstantTime = compactionInstantTime;
public CompactionPlanSourceFunction(List<Pair<String, HoodieCompactionPlan>> compactionPlans) {
this.compactionPlans = compactionPlans;
}

@Override
@@ -74,11 +68,14 @@ public void open(Configuration parameters) throws Exception {

@Override
public void run(SourceContext sourceContext) throws Exception {
List<CompactionOperation> operations = this.compactionPlan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
LOG.info("CompactionPlanFunction compacting " + operations + " files");
for (CompactionOperation operation : operations) {
sourceContext.collect(new CompactionPlanEvent(compactionInstantTime, operation));
for (Pair<String, HoodieCompactionPlan> pair : compactionPlans) {
HoodieCompactionPlan compactionPlan = pair.getRight();
List<CompactionOperation> operations = compactionPlan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
LOG.info("CompactionPlanFunction compacting " + operations + " files");
for (CompactionOperation operation : operations) {
sourceContext.collect(new CompactionPlanEvent(pair.getLeft(), operation));
}
}
}

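For orientation, the following is a condensed sketch of how the reworked source function is now fed. It mirrors the driver changes further down in this diff; the `env`/`table`/instant-list parameters and the wrapper class name are assumptions standing in for the driver's own fields, not part of the change itself.

```java
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieFlinkTable;

// Sketch only: assumes it lives in org.apache.hudi.sink.compact next to
// CompactionPlanSourceFunction, and that env/table/instant times come from the driver.
final class CompactionSourceWiringSketch {

  static void wire(StreamExecutionEnvironment env,
                   HoodieFlinkTable<?> table,
                   List<String> compactionInstantTimes) {
    // one (instant time -> compaction plan) pair per pending compaction instant
    List<Pair<String, HoodieCompactionPlan>> compactionPlans = compactionInstantTimes.stream()
        .map(timestamp -> {
          try {
            return Pair.of(timestamp, CompactionUtils.getCompactionPlan(table.getMetaClient(), timestamp));
          } catch (IOException e) {
            throw new HoodieException("Get compaction plan at instant " + timestamp + " error", e);
          }
        })
        .collect(Collectors.toList());

    // a single source now emits CompactionPlanEvents for every plan, each event
    // tagged with its own plan instant (pair.getLeft() in the run() loop above)
    env.addSource(new CompactionPlanSourceFunction(compactionPlans))
        .name("compaction_source")
        .uid("uid_compaction_source");
  }
}
```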
FlinkCompactionConfig.java
@@ -20,10 +20,10 @@

import org.apache.hudi.config.HoodieMemoryConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.compact.strategy.CompactionPlanStrategy;

import com.beust.jcommander.Parameter;
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.sink.compact.strategy.SingleCompactionPlanSelectStrategy;

/**
* Configurations for Hoodie Flink compaction.
@@ -102,7 +102,7 @@ public class FlinkCompactionConfig extends Configuration {
@Parameter(names = {"--seq"}, description = "Compaction plan execution sequence, two options are supported:\n"
+ "1). FIFO: execute the oldest plan first;\n"
+ "2). LIFO: execute the latest plan first, by default LIFO", required = false)
public String compactionSeq = SEQ_LIFO;
public String compactionSeq = SEQ_FIFO;

@Parameter(names = {"--service"}, description = "Flink Compaction runs in service mode, disable by default")
public Boolean serviceMode = false;
@@ -111,21 +111,21 @@ public class FlinkCompactionConfig extends Configuration {
description = "Min compaction interval of async compaction service, default 10 minutes")
public Integer minCompactionIntervalSeconds = 600;

@Parameter(names = {"--select-strategy"}, description = "The strategy define how to select compaction plan to compact.\n"
+ "1). SingleCompactionPlanSelectStrategy: Select first or last compaction plan."
+ "2). MultiCompactionPlanSelectStrategy: Select first or last n compaction plan (n is defined by compactionPlanMaxSelect)."
+ "3). AllPendingCompactionPlanSelectStrategy: Select all pending compaction plan"
+ "4). InstantCompactionPlanSelectStrategy: Select the compaction plan that instant is specified by compactionPlanInstant")
public String compactionPlanSelectStrategy = SingleCompactionPlanSelectStrategy.class.getName();
@Parameter(names = {"--plan-select-strategy"}, description = "The strategy define how to select compaction plan to compact.\n"
+ "1). num_instants: select plans by specific number of instants, it's the default strategy with 1 instant at a time;\n"
+ "3). all: Select all pending compaction plan;\n"
+ "4). instants: Select the compaction plan by specific instants")
public String compactionPlanSelectStrategy = CompactionPlanStrategy.NUM_INSTANTS;

@Parameter(names = {"--select-max-number"}, description = "Max number of compaction plan would be selected in compaction."
@Parameter(names = {"--max-num-plans"}, description = "Max number of compaction plan would be selected in compaction."
+ "It's only effective for MultiCompactionPlanSelectStrategy.")
public Integer compactionPlanMaxSelect = 10;
public Integer maxNumCompactionPlans = 1;

@Parameter(names = {"--select-instant"}, description = "Specify the compaction plan instant to compact and you can specify more than"
+ "one instant in a time by using comma."
+ "It's only effective for InstantCompactionPlanSelectStrategy.")
@Parameter(names = {"--target-instants"}, description = "Specify the compaction plan instants to compact,\n"
+ "Multiple instants are supported by comma separated instant time.\n"
+ "It's only effective for 'instants' plan selection strategy.")
public String compactionPlanInstant;

@Parameter(names = {"--spillable_map_path"}, description = "Default file path prefix for spillable map.", required = false)
public String spillableMapPath = HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue();

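To make the renamed options concrete, here is a minimal parsing sketch using standard JCommander against the `@Parameter`-annotated config. Only `--plan-select-strategy`, `--target-instants` and `--seq` are taken from this diff; the `--path` option, the table path and the instant times are assumptions for illustration.

```java
import com.beust.jcommander.JCommander;

import org.apache.hudi.sink.compact.FlinkCompactionConfig;

public class CompactionConfigParseSketch {
  public static void main(String[] args) {
    // Hypothetical CLI arguments; the table path and instant times are made up.
    String[] demoArgs = new String[] {
        "--path", "hdfs:///tmp/hudi/demo_table",              // assumed pre-existing option, not part of this diff
        "--plan-select-strategy", "instants",                  // new strategy key introduced by this PR
        "--target-instants", "20230601120000,20230601130000", // comma separated instant times
        "--seq", "FIFO"
    };

    FlinkCompactionConfig cfg = new FlinkCompactionConfig();
    // Standard JCommander parsing of the @Parameter-annotated config object.
    JCommander.newBuilder().addObject(cfg).build().parse(demoArgs);

    System.out.println(cfg.compactionPlanSelectStrategy); // instants
    System.out.println(cfg.compactionPlanInstant);        // 20230601120000,20230601130000
  }
}
```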
HoodieFlinkCompactor.java
@@ -18,11 +18,6 @@

package org.apache.hudi.sink.compact;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.hudi.async.HoodieAsyncTableService;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
@@ -31,11 +26,10 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.compact.strategy.CompactionPlanSelectStrategy;
import org.apache.hudi.sink.compact.strategy.CompactionPlanStrategies;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
@@ -228,9 +222,8 @@ private void compact() throws Exception {
}

// fetch the instant based on the configured execution sequence
HoodieTimeline timeline = table.getActiveTimeline();
List<HoodieInstant> requested = ((CompactionPlanSelectStrategy) ReflectionUtils.loadClass(cfg.compactionPlanSelectStrategy))
.select(timeline.filterPendingCompactionTimeline(), cfg);
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
List<HoodieInstant> requested = CompactionPlanStrategies.getStrategy(cfg).select(pendingCompactionTimeline);
if (requested.isEmpty()) {
// do nothing.
LOG.info("No compaction plan scheduled, turns on the compaction plan schedule with --schedule option");
@@ -240,7 +233,7 @@ private void compact() throws Exception {
List<String> compactionInstantTimes = requested.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
compactionInstantTimes.forEach(timestamp -> {
HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(timestamp);
if (timeline.containsInstant(inflightInstant)) {
if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
LOG.info("Rollback inflight compaction instant: [" + timestamp + "]");
table.rollbackInflightCompaction(inflightInstant);
table.getMetaClient().reloadActiveTimeline();
@@ -254,13 +247,11 @@ private void compact() throws Exception {
try {
return Pair.of(timestamp, CompactionUtils.getCompactionPlan(table.getMetaClient(), timestamp));
} catch (IOException e) {
throw new HoodieException(e);
throw new HoodieException("Get compaction plan at instant " + timestamp + " error", e);
}
})
// reject empty compaction plan
.filter(pair -> !(pair.getRight() == null
|| pair.getRight().getOperations() == null
|| pair.getRight().getOperations().isEmpty()))
.filter(pair -> validCompactionPlan(pair.getRight()))
.collect(Collectors.toList());

if (compactionPlans.isEmpty()) {
@@ -270,7 +261,6 @@ private void compact() throws Exception {
}

List<HoodieInstant> instants = compactionInstantTimes.stream().map(HoodieTimeline::getCompactionRequestedInstant).collect(Collectors.toList());
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
for (HoodieInstant instant : instants) {
if (!pendingCompactionTimeline.containsInstant(instant)) {
// this means that the compaction plan was written to auxiliary path(.tmp)
@@ -297,34 +287,19 @@ private void compact() throws Exception {
}
table.getMetaClient().reloadActiveTimeline();

// use side-output to make operations that is in the same plan to be placed in the same stream
// keyby() cannot sure that different operations are in the different stream
DataStream<CompactionPlanEvent> source = env.addSource(new MultiCompactionPlanSourceFunction(compactionPlans))
env.addSource(new CompactionPlanSourceFunction(compactionPlans))
.name("compaction_source")
.uid("uid_compaction_source");

SingleOutputStreamOperator<Void> operator = source.rebalance()
.uid("uid_compaction_source")
.rebalance()
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
new ProcessOperator<>(new CompactFunction(conf)))
.setParallelism(compactionParallelism)
.process(new ProcessFunction<CompactionCommitEvent, Void>() {
@Override
public void processElement(CompactionCommitEvent event, ProcessFunction<CompactionCommitEvent, Void>.Context context, Collector<Void> out) {
context.output(new OutputTag<>(event.getInstant(), TypeInformation.of(CompactionCommitEvent.class)), event);
}
})
.name("group_by_compaction_plan")
.uid("uid_group_by_compaction_plan")
.addSink(new CompactionCommitSink(conf))
.name("compaction_commit")
.uid("uid_compaction_commit")
.setParallelism(1);

compactionPlans.forEach(pair ->
operator.getSideOutput(new OutputTag<>(pair.getLeft(), TypeInformation.of(CompactionCommitEvent.class)))
.addSink(new CompactionCommitSink(conf))
.name("clean_commits " + pair.getLeft())
.uid("uid_clean_commits_" + pair.getLeft())
.setParallelism(1));

env.execute("flink_hudi_compaction_" + String.join(",", compactionInstantTimes));
}

@@ -342,4 +317,8 @@ public void shutDown() {
shutdownAsyncService(false);
}
}

private static boolean validCompactionPlan(HoodieCompactionPlan plan) {
return plan != null && plan.getOperations() != null && plan.getOperations().size() > 0;
}
}
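The new CompactionPlanStrategy and CompactionPlanStrategies classes referenced above are not shown in this diff. Purely as an illustration of the 'instants' selection driven by --target-instants, a helper might look like the sketch below; the class and method names, and the choice to pass an already-materialized instant list instead of a HoodieTimeline, are assumptions of this sketch and not the PR's actual API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.hudi.common.table.timeline.HoodieInstant;

// Hypothetical sketch of selecting pending compaction instants whose timestamps
// were named in --target-instants (comma separated instant times).
final class TargetInstantsSelectionSketch {

  static List<HoodieInstant> select(List<HoodieInstant> pendingInstants, String targetInstantsCsv) {
    Set<String> targets = Arrays.stream(targetInstantsCsv.split(","))
        .map(String::trim)
        .collect(Collectors.toSet());
    // keep only the pending compaction instants explicitly requested by the user
    return pendingInstants.stream()
        .filter(instant -> targets.contains(instant.getTimestamp()))
        .collect(Collectors.toList());
  }
}
```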

This file was deleted.

This file was deleted.
