diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java index 651385955657..02041690f1de 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java @@ -23,6 +23,7 @@ import com.beust.jcommander.Parameter; import org.apache.flink.configuration.Configuration; +import org.apache.hudi.sink.compact.strategy.SingleCompactionPlanSelectStrategy; /** * Configurations for Hoodie Flink compaction. @@ -110,6 +111,21 @@ public class FlinkCompactionConfig extends Configuration { description = "Min compaction interval of async compaction service, default 10 minutes") public Integer minCompactionIntervalSeconds = 600; + @Parameter(names = {"--select-strategy"}, description = "The strategy defines how to select the compaction plans to compact.\n" + + "1). SingleCompactionPlanSelectStrategy: Select the first or last compaction plan. " + + "2). MultiCompactionPlanSelectStrategy: Select the first or last n compaction plans (n is defined by compactionPlanMaxSelect). " + + "3). AllPendingCompactionPlanSelectStrategy: Select all pending compaction plans. " + + "4). InstantCompactionPlanSelectStrategy: Select the compaction plans whose instants are specified by compactionPlanInstant.") + public String compactionPlanSelectStrategy = SingleCompactionPlanSelectStrategy.class.getName(); + + @Parameter(names = {"--select-max-number"}, description = "Max number of compaction plans to be selected for compaction. " + + "It's only effective for MultiCompactionPlanSelectStrategy.") + public Integer compactionPlanMaxSelect = 10; + + @Parameter(names = {"--select-instant"}, description = "Specify the compaction plan instant(s) to compact; more than " + + "one instant can be specified at a time, separated by commas. "
+ + "It's only effective for InstantCompactionPlanSelectStrategy.") + public String compactionPlanInstant; @Parameter(names = {"--spillable_map_path"}, description = "Default file path prefix for spillable map.", required = false) public String spillableMapPath = HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue(); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java index 546136e416b7..f56b5a2f0fb1 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java @@ -18,6 +18,11 @@ package org.apache.hudi.sink.compact; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; import org.apache.hudi.async.HoodieAsyncTableService; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.HoodieFlinkWriteClient; @@ -26,9 +31,11 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.sink.compact.strategy.CompactionPlanSelectStrategy; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.CompactionUtil; import org.apache.hudi.util.StreamerUtil; @@ -42,9 +49,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.stream.Collectors; /** * Flink hudi compaction program that can be executed manually. @@ -218,74 +228,104 @@ private void compact() throws Exception { } // fetch the instant based on the configured execution sequence - HoodieTimeline timeline = table.getActiveTimeline().filterPendingCompactionTimeline(); - Option requested = CompactionUtil.isLIFO(cfg.compactionSeq) ? timeline.lastInstant() : timeline.firstInstant(); - if (!requested.isPresent()) { + HoodieTimeline timeline = table.getActiveTimeline(); + List requested = ((CompactionPlanSelectStrategy) ReflectionUtils.loadClass(cfg.compactionPlanSelectStrategy)) + .select(timeline.filterPendingCompactionTimeline(), cfg); + if (requested.isEmpty()) { // do nothing. 
LOG.info("No compaction plan scheduled, turns on the compaction plan schedule with --schedule option"); return; } - String compactionInstantTime = requested.get().getTimestamp(); - - HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime); - if (timeline.containsInstant(inflightInstant)) { - LOG.info("Rollback inflight compaction instant: [" + compactionInstantTime + "]"); - table.rollbackInflightCompaction(inflightInstant); - table.getMetaClient().reloadActiveTimeline(); - } + List compactionInstantTimes = requested.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + compactionInstantTimes.forEach(timestamp -> { + HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(timestamp); + if (timeline.containsInstant(inflightInstant)) { + LOG.info("Rollback inflight compaction instant: [" + timestamp + "]"); + table.rollbackInflightCompaction(inflightInstant); + table.getMetaClient().reloadActiveTimeline(); + } + }); - // generate compaction plan + // generate timestamp and compaction plan pair // should support configurable commit metadata - HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan( - table.getMetaClient(), compactionInstantTime); - - if (compactionPlan == null || (compactionPlan.getOperations() == null) - || (compactionPlan.getOperations().isEmpty())) { + List> compactionPlans = compactionInstantTimes.stream() + .map(timestamp -> { + try { + return Pair.of(timestamp, CompactionUtils.getCompactionPlan(table.getMetaClient(), timestamp)); + } catch (IOException e) { + throw new HoodieException(e); + } + }) + // reject empty compaction plan + .filter(pair -> !(pair.getRight() == null + || pair.getRight().getOperations() == null + || pair.getRight().getOperations().isEmpty())) + .collect(Collectors.toList()); + + if (compactionPlans.isEmpty()) { // No compaction plan, do nothing and return. - LOG.info("No compaction plan for instant " + compactionInstantTime); + LOG.info("No compaction plan for instant " + String.join(",", compactionInstantTimes)); return; } - HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime); + List instants = compactionInstantTimes.stream().map(HoodieTimeline::getCompactionRequestedInstant).collect(Collectors.toList()); HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); - if (!pendingCompactionTimeline.containsInstant(instant)) { - // this means that the compaction plan was written to auxiliary path(.tmp) - // but not the meta path(.hoodie), this usually happens when the job crush - // exceptionally. - - // clean the compaction plan in auxiliary path and cancels the compaction. - - LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n" - + "Clean the compaction plan in auxiliary path and cancels the compaction"); - CompactionUtil.cleanInstant(table.getMetaClient(), instant); - return; + for (HoodieInstant instant : instants) { + if (!pendingCompactionTimeline.containsInstant(instant)) { + // this means that the compaction plan was written to auxiliary path(.tmp) + // but not the meta path(.hoodie), this usually happens when the job crush + // exceptionally. + // clean the compaction plan in auxiliary path and cancels the compaction. 
+ LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n" + + "Clean the compaction plan in auxiliary path and cancels the compaction"); + CompactionUtil.cleanInstant(table.getMetaClient(), instant); + return; + } } // get compactionParallelism. int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1 - ? compactionPlan.getOperations().size() : conf.getInteger(FlinkOptions.COMPACTION_TASKS); + ? Math.toIntExact(compactionPlans.stream().mapToLong(pair -> pair.getRight().getOperations().size()).sum()) + : conf.getInteger(FlinkOptions.COMPACTION_TASKS); - LOG.info("Start to compaction for instant " + compactionInstantTime); + LOG.info("Start to compaction for instant " + compactionInstantTimes); // Mark instant as compaction inflight - table.getActiveTimeline().transitionCompactionRequestedToInflight(instant); + for (HoodieInstant instant : instants) { + table.getActiveTimeline().transitionCompactionRequestedToInflight(instant); + } table.getMetaClient().reloadActiveTimeline(); - env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime)) + // use side-output to make operations that is in the same plan to be placed in the same stream + // keyby() cannot sure that different operations are in the different stream + DataStream source = env.addSource(new MultiCompactionPlanSourceFunction(compactionPlans)) .name("compaction_source") - .uid("uid_compaction_source") - .rebalance() + .uid("uid_compaction_source"); + + SingleOutputStreamOperator operator = source.rebalance() .transform("compact_task", TypeInformation.of(CompactionCommitEvent.class), new ProcessOperator<>(new CompactFunction(conf))) .setParallelism(compactionParallelism) - .addSink(new CompactionCommitSink(conf)) - .name("clean_commits") - .uid("uid_clean_commits") + .process(new ProcessFunction() { + @Override + public void processElement(CompactionCommitEvent event, ProcessFunction.Context context, Collector out) { + context.output(new OutputTag<>(event.getInstant(), TypeInformation.of(CompactionCommitEvent.class)), event); + } + }) + .name("group_by_compaction_plan") + .uid("uid_group_by_compaction_plan") .setParallelism(1); - env.execute("flink_hudi_compaction_" + compactionInstantTime); + compactionPlans.forEach(pair -> + operator.getSideOutput(new OutputTag<>(pair.getLeft(), TypeInformation.of(CompactionCommitEvent.class))) + .addSink(new CompactionCommitSink(conf)) + .name("clean_commits " + pair.getLeft()) + .uid("uid_clean_commits_" + pair.getLeft()) + .setParallelism(1)); + + env.execute("flink_hudi_compaction_" + String.join(",", compactionInstantTimes)); } /** diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/MultiCompactionPlanSourceFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/MultiCompactionPlanSourceFunction.java new file mode 100644 index 000000000000..8a8c3f6b4eeb --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/MultiCompactionPlanSourceFunction.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.compact; + +import static java.util.stream.Collectors.toList; + +import java.util.List; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.common.model.CompactionOperation; +import org.apache.hudi.common.util.collection.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Flink hudi compaction source function. + * + *
<p>This function reads the compaction plan as {@link CompactionOperation}s then assigns the compaction task + * event {@link CompactionPlanEvent} to downstream operators. + * + * <p>The compaction instant time is specified explicitly with strategies: + * + * <ul> + *   <li>If the timeline has no inflight instants, + *   use {@link org.apache.hudi.common.table.timeline.HoodieActiveTimeline#createNewInstantTime()} + *   as the instant time;</li> + *   <li>If the timeline has inflight instants, + *   use the median instant time between [last complete instant time, earliest inflight instant time] + *   as the instant time.</li> + * </ul>
+ */ +public class MultiCompactionPlanSourceFunction extends AbstractRichFunction implements SourceFunction<CompactionPlanEvent> { + + protected static final Logger LOG = LoggerFactory.getLogger(MultiCompactionPlanSourceFunction.class); + + /** + * Compaction plan instant -> compaction plan. + */ + private final List<Pair<String, HoodieCompactionPlan>> compactionPlans; + + public MultiCompactionPlanSourceFunction(List<Pair<String, HoodieCompactionPlan>> compactionPlans) { + this.compactionPlans = compactionPlans; + } + + @Override + public void open(Configuration parameters) throws Exception { + // no operation + } + + @Override + public void run(SourceContext<CompactionPlanEvent> sourceContext) throws Exception { + for (Pair<String, HoodieCompactionPlan> pair : compactionPlans) { + HoodieCompactionPlan compactionPlan = pair.getRight(); + List<CompactionOperation> operations = compactionPlan.getOperations().stream() + .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList()); + LOG.info("MultiCompactionPlanSourceFunction compacting " + operations + " files"); + for (CompactionOperation operation : operations) { + sourceContext.collect(new CompactionPlanEvent(pair.getLeft(), operation)); + } + } + } + + @Override + public void close() throws Exception { + // no operation + } + + @Override + public void cancel() { + // no operation + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/AllPendingCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/AllPendingCompactionPlanSelectStrategy.java new file mode 100644 index 000000000000..23b6708ff304 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/AllPendingCompactionPlanSelectStrategy.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hudi.sink.compact.strategy; + +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.sink.compact.FlinkCompactionConfig; + +/** + * Select all pending compaction plans to compact. + */ +public class AllPendingCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy { + @Override + public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) { + return pendingCompactionTimeline.getInstants().collect(Collectors.toList()); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanSelectStrategy.java new file mode 100644 index 000000000000..a41fcef19813 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanSelectStrategy.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.compact.strategy; + +import java.util.List; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.sink.compact.FlinkCompactionConfig; + +/** + * Strategy that defines how to select the compaction plans to compact. + */ +public interface CompactionPlanSelectStrategy { + /** + * Defines how to select the compaction plans to compact. + */ + List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config); +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java new file mode 100644 index 000000000000..45382b70c4de --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.compact.strategy; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.sink.compact.FlinkCompactionConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Select the compaction plans whose instants are specified by compactionPlanInstant. + */ +public class InstantCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy { + protected static final Logger LOG = LoggerFactory.getLogger(InstantCompactionPlanSelectStrategy.class); + + @Override + public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) { + if (StringUtils.isNullOrEmpty(config.compactionPlanInstant)) { + LOG.warn("No compaction plan instant is specified, so no plan is selected"); + return Collections.emptyList(); + } + List<String> instants = Arrays.asList(config.compactionPlanInstant.split(",")); + return pendingCompactionTimeline.getInstants() + .filter(instant -> instants.contains(instant.getTimestamp())) + .collect(Collectors.toList()); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/MultiCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/MultiCompactionPlanSelectStrategy.java new file mode 100644 index 000000000000..ee0e93653f87 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/MultiCompactionPlanSelectStrategy.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hudi.sink.compact.strategy; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.sink.compact.FlinkCompactionConfig; +import org.apache.hudi.util.CompactionUtil; + +/** + * Select multiple compaction plans to compact. + */ +public class MultiCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy { + @Override + public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) { + List<HoodieInstant> pendingCompactionPlanInstants = pendingCompactionTimeline.getInstants().collect(Collectors.toList()); + if (CompactionUtil.isLIFO(config.compactionSeq)) { + Collections.reverse(pendingCompactionPlanInstants); + } + int range = Math.min(config.compactionPlanMaxSelect, pendingCompactionPlanInstants.size()); + return pendingCompactionPlanInstants.subList(0, range); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/SingleCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/SingleCompactionPlanSelectStrategy.java new file mode 100644 index 000000000000..7ca939866cee --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/SingleCompactionPlanSelectStrategy.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.compact.strategy; + +import java.util.Collections; +import java.util.List; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.sink.compact.FlinkCompactionConfig; +import org.apache.hudi.util.CompactionUtil; + +/** + * Select one compaction plan to compact. + */ +public class SingleCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy { + @Override + public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) { + Option<HoodieInstant> compactionPlanInstant = CompactionUtil.isLIFO(config.compactionSeq) + ?
pendingCompactionTimeline.lastInstant() + : pendingCompactionTimeline.firstInstant(); + if (compactionPlanInstant.isPresent()) { + return Collections.singletonList(compactionPlanInstant.get()); + } + return Collections.emptyList(); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java index e1e86ce32bd8..43e4ed511452 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java @@ -18,6 +18,11 @@ package org.apache.hudi.sink.compact; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -25,6 +30,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.CompactionUtil; @@ -49,6 +55,7 @@ import java.io.File; import java.util.Arrays; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -67,6 +74,8 @@ public class ITTestHoodieFlinkCompactor { private static final Map<String, List<String>> EXPECTED2 = new HashMap<>(); + private static final Map<String, List<String>> EXPECTED3 = new HashMap<>(); + static { EXPECTED1.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1")); EXPECTED1.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2")); @@ -77,6 +86,12 @@ public class ITTestHoodieFlinkCompactor { EXPECTED2.put("par2", Arrays.asList("id3,par2,id3,Julian,54,3000,par2", "id4,par2,id4,Fabian,32,4000,par2")); EXPECTED2.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3", "id9,par3,id9,Jane,19,6000,par3")); EXPECTED2.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4", "id10,par4,id10,Ella,38,7000,par4", "id11,par4,id11,Phoebe,52,8000,par4")); + + EXPECTED3.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1")); + EXPECTED3.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2")); + EXPECTED3.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3")); + EXPECTED3.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4")); + EXPECTED3.put("par5", Arrays.asList("id12,par5,id12,Tony,27,9000,par5", "id13,par5,id13,Jenny,72,10000,par5")); } @TempDir @@ -203,4 +218,106 @@ public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exce TestData.checkWrittenFullData(tempFile, EXPECTED2); } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangelog)
throws Exception { + // Create hoodie table and insert data. + EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); + TableEnvironment tableEnv = TableEnvironmentImpl.create(settings); + tableEnv.getConfig().getConfiguration() + .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + Map<String, String> options = new HashMap<>(); + options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false"); + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ"); + options.put(FlinkOptions.CHANGELOG_ENABLED.key(), enableChangelog + ""); + String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); + tableEnv.executeSql(hoodieTableDDL); + tableEnv.executeSql(TestSQL.INSERT_T1).await(); + + TimeUnit.SECONDS.sleep(3); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + FlinkCompactionConfig cfg = new FlinkCompactionConfig(); + cfg.path = tempFile.getAbsolutePath(); + Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg); + conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ"); + + HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf); + conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName()); + CompactionUtil.setAvroSchema(conf, metaClient); + CompactionUtil.inferChangelogMode(conf, metaClient); + + List<String> compactionInstantTimeList = new ArrayList<>(2); + + HoodieFlinkWriteClient<?> writeClient = StreamerUtil.createWriteClient(conf); + + compactionInstantTimeList.add(scheduleCompactionPlan(metaClient, writeClient)); + + // insert new records into a new partition, so that we can generate a new compaction plan + String insertT1ForNewPartition = "insert into t1 values\n" + + "('id12','Tony',27,TIMESTAMP '1970-01-01 00:00:09','par5'),\n" + + "('id13','Jenny',72,TIMESTAMP '1970-01-01 00:00:10','par5')"; + tableEnv.executeSql(insertT1ForNewPartition).await(); + + TimeUnit.SECONDS.sleep(3); + + compactionInstantTimeList.add(scheduleCompactionPlan(metaClient, writeClient)); + + HoodieFlinkTable<?> table = writeClient.getHoodieTable(); + + List<Pair<String, HoodieCompactionPlan>> compactionPlans = new ArrayList<>(2); + for (String compactionInstantTime : compactionInstantTimeList) { + HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(table.getMetaClient(), compactionInstantTime); + compactionPlans.add(Pair.of(compactionInstantTime, plan)); + } + + // Mark instant as compaction inflight + for (String compactionInstantTime : compactionInstantTimeList) { + HoodieInstant hoodieInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime); + table.getActiveTimeline().transitionCompactionRequestedToInflight(hoodieInstant); + } + table.getMetaClient().reloadActiveTimeline(); + + DataStream<CompactionPlanEvent> source = env.addSource(new MultiCompactionPlanSourceFunction(compactionPlans)) + .name("compaction_source") + .uid("uid_compaction_source"); + SingleOutputStreamOperator<CompactionCommitEvent> operator = source.rebalance() + .transform("compact_task", + TypeInformation.of(CompactionCommitEvent.class), + new ProcessOperator<>(new CompactFunction(conf))) + .setParallelism(1) + .process(new ProcessFunction<CompactionCommitEvent, CompactionCommitEvent>() { + @Override + public void processElement(CompactionCommitEvent event, ProcessFunction<CompactionCommitEvent, CompactionCommitEvent>.Context context, Collector<CompactionCommitEvent> out) { + context.output(new OutputTag<>(event.getInstant(), TypeInformation.of(CompactionCommitEvent.class)), event); + } + }) + .name("group_by_compaction_plan") + .uid("uid_group_by_compaction_plan") +
.setParallelism(1); + compactionPlans.forEach(pair -> + operator.getSideOutput(new OutputTag<>(pair.getLeft(), TypeInformation.of(CompactionCommitEvent.class))) + .addSink(new CompactionCommitSink(conf)) + .name("clean_commits " + pair.getLeft()) + .uid("uid_clean_commits_" + pair.getLeft()) + .setParallelism(1)); + + env.execute("flink_hudi_compaction"); + writeClient.close(); + TestData.checkWrittenFullData(tempFile, EXPECTED3); + } + + private String scheduleCompactionPlan(HoodieTableMetaClient metaClient, HoodieFlinkWriteClient<?> writeClient) { + boolean scheduled = false; + // compute the compaction instant time and schedule the compaction plan at that instant. + Option<String> compactionInstantTimeOption = CompactionUtil.getCompactionInstantTime(metaClient); + if (compactionInstantTimeOption.isPresent()) { + scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTimeOption.get(), Option.empty()); + } + assertTrue(scheduled, "The compaction plan should be scheduled"); + return compactionInstantTimeOption.get(); + } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanSelectStrategy.java new file mode 100644 index 000000000000..3ac9f6c6663e --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanSelectStrategy.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hudi.sink.compact; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Arrays; +import java.util.List; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.sink.compact.strategy.AllPendingCompactionPlanSelectStrategy; +import org.apache.hudi.sink.compact.strategy.CompactionPlanSelectStrategy; +import org.apache.hudi.sink.compact.strategy.InstantCompactionPlanSelectStrategy; +import org.apache.hudi.sink.compact.strategy.MultiCompactionPlanSelectStrategy; +import org.apache.hudi.sink.compact.strategy.SingleCompactionPlanSelectStrategy; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Test cases for every {@link CompactionPlanSelectStrategy} implementation. + */ +public class TestCompactionPlanSelectStrategy { + private HoodieTimeline timeline; + private HoodieTimeline emptyTimeline; + private HoodieTimeline allCompleteTimeline; + + private static final HoodieInstant INSTANT_001 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "001"); + private static final HoodieInstant INSTANT_002 = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "002"); + private static final HoodieInstant INSTANT_003 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "003"); + private static final HoodieInstant INSTANT_004 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "004"); + private static final HoodieInstant INSTANT_005 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMPACTION_ACTION, "005"); + private static final HoodieInstant INSTANT_006 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "006"); + + @BeforeEach + public void beforeEach() { + timeline = new MockHoodieActiveTimeline(INSTANT_001, INSTANT_002, INSTANT_003, INSTANT_004, INSTANT_005, INSTANT_006); + emptyTimeline = new MockHoodieActiveTimeline(); + allCompleteTimeline = new MockHoodieActiveTimeline(INSTANT_001, INSTANT_005); + } + + @Test + void testSingleCompactionPlanSelectStrategy() { + HoodieTimeline pendingCompactionTimeline = this.timeline.filterPendingCompactionTimeline(); + FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig(); + + SingleCompactionPlanSelectStrategy strategy = new SingleCompactionPlanSelectStrategy(); + assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_006}, strategy.select(pendingCompactionTimeline, compactionConfig)); + + compactionConfig.compactionSeq = FlinkCompactionConfig.SEQ_FIFO; + assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002}, strategy.select(pendingCompactionTimeline, compactionConfig)); + + HoodieTimeline emptyPendingCompactionTimeline = emptyTimeline.filterPendingCompactionTimeline(); + assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(emptyPendingCompactionTimeline, compactionConfig)); + + HoodieTimeline allCompleteCompactionTimeline = allCompleteTimeline.filterPendingCompactionTimeline(); + assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(allCompleteCompactionTimeline, compactionConfig)); + } + + @Test + void testMultiCompactionPlanSelectStrategy() { + HoodieTimeline pendingCompactionTimeline = this.timeline.filterPendingCompactionTimeline(); + FlinkCompactionConfig compactionConfig = new
FlinkCompactionConfig(); + compactionConfig.compactionPlanMaxSelect = 2; + + MultiCompactionPlanSelectStrategy strategy = new MultiCompactionPlanSelectStrategy(); + assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_006, INSTANT_004}, strategy.select(pendingCompactionTimeline, compactionConfig)); + + compactionConfig.compactionSeq = FlinkCompactionConfig.SEQ_FIFO; + assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002, INSTANT_003}, strategy.select(pendingCompactionTimeline, compactionConfig)); + + HoodieTimeline emptyPendingCompactionTimeline = emptyTimeline.filterPendingCompactionTimeline(); + assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(emptyPendingCompactionTimeline, compactionConfig)); + + HoodieTimeline allCompleteCompactionTimeline = allCompleteTimeline.filterPendingCompactionTimeline(); + assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(allCompleteCompactionTimeline, compactionConfig)); + } + + @Test + void testAllPendingCompactionPlanSelectStrategy() { + HoodieTimeline pendingCompactionTimeline = this.timeline.filterPendingCompactionTimeline(); + FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig(); + + AllPendingCompactionPlanSelectStrategy strategy = new AllPendingCompactionPlanSelectStrategy(); + assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002, INSTANT_003, INSTANT_004, INSTANT_006}, + strategy.select(pendingCompactionTimeline, compactionConfig)); + + HoodieTimeline emptyPendingCompactionTimeline = emptyTimeline.filterPendingCompactionTimeline(); + assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(emptyPendingCompactionTimeline, compactionConfig)); + + HoodieTimeline allCompleteCompactionTimeline = allCompleteTimeline.filterPendingCompactionTimeline(); + assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(allCompleteCompactionTimeline, compactionConfig)); + } + + @Test + void testInstantCompactionPlanSelectStrategy() { + HoodieTimeline pendingCompactionTimeline = this.timeline.filterPendingCompactionTimeline(); + FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig(); + compactionConfig.compactionPlanInstant = "004"; + + InstantCompactionPlanSelectStrategy strategy = new InstantCompactionPlanSelectStrategy(); + assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_004}, strategy.select(pendingCompactionTimeline, compactionConfig)); + + compactionConfig.compactionPlanInstant = "002,003"; + assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002, INSTANT_003}, strategy.select(pendingCompactionTimeline, compactionConfig)); + + compactionConfig.compactionPlanInstant = "002,005"; + assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002}, strategy.select(pendingCompactionTimeline, compactionConfig)); + + compactionConfig.compactionPlanInstant = "005"; + assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(pendingCompactionTimeline, compactionConfig)); + } + + private void assertHoodieInstantsEquals(HoodieInstant[] expected, List<HoodieInstant> actual) { + assertEquals(expected.length, actual.size()); + for (int index = 0; index < expected.length; index++) { + assertHoodieInstantEquals(expected[index], actual.get(index)); + } + } + + private void assertHoodieInstantEquals(HoodieInstant expected, HoodieInstant actual) { + assertEquals(expected.getState(), actual.getState()); + assertEquals(expected.getAction(), actual.getAction()); + assertEquals(expected.getTimestamp(), actual.getTimestamp()); + } + + private static final class MockHoodieActiveTimeline extends
HoodieActiveTimeline { + public MockHoodieActiveTimeline(HoodieInstant... instants) { + super(); + setInstants(Arrays.asList(instants)); + } + } +}
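For reviewers who want to try the new options end to end, here is a minimal sketch of how the --select-strategy and --select-max-number flags map onto the strategy classes above. It is not part of this patch; the class name SelectStrategyExample and the hard-coded values are hypothetical, but the reflective dispatch is the same one HoodieFlinkCompactor#compact() performs.

import java.util.List;

import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;
import org.apache.hudi.sink.compact.strategy.CompactionPlanSelectStrategy;
import org.apache.hudi.sink.compact.strategy.MultiCompactionPlanSelectStrategy;

public class SelectStrategyExample {

  // Select up to 3 pending plans, equivalent to passing
  // --select-strategy ...MultiCompactionPlanSelectStrategy --select-max-number 3
  public static List<HoodieInstant> selectPlans(HoodieTimeline pendingCompactionTimeline) {
    FlinkCompactionConfig cfg = new FlinkCompactionConfig();
    cfg.compactionPlanSelectStrategy = MultiCompactionPlanSelectStrategy.class.getName();
    cfg.compactionPlanMaxSelect = 3;
    // same reflective dispatch as HoodieFlinkCompactor#compact(): the config carries
    // the strategy class name, which is instantiated and asked to select the instants
    CompactionPlanSelectStrategy strategy =
        (CompactionPlanSelectStrategy) ReflectionUtils.loadClass(cfg.compactionPlanSelectStrategy);
    return strategy.select(pendingCompactionTimeline, cfg);
  }
}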