From 34e25e662700a4b902528e2eed38b1c177ce9cf0 Mon Sep 17 00:00:00 2001 From: "yuzhao.cyz" Date: Thu, 7 Jul 2022 16:42:08 +0800 Subject: [PATCH] [minor] following 4152, refactor the clazz about plan selection strategy --- .../compact/CompactionPlanSourceFunction.java | 31 +++---- .../sink/compact/FlinkCompactionConfig.java | 26 +++--- .../sink/compact/HoodieFlinkCompactor.java | 53 ++++------- .../MultiCompactionPlanSourceFunction.java | 90 ------------------- ...llPendingCompactionPlanSelectStrategy.java | 35 -------- .../strategy/CompactionPlanStrategies.java | 74 +++++++++++++++ ...ategy.java => CompactionPlanStrategy.java} | 16 ++-- .../InstantCompactionPlanSelectStrategy.java | 50 ----------- .../MultiCompactionPlanSelectStrategy.java | 42 --------- .../SingleCompactionPlanSelectStrategy.java | 43 --------- .../compact/ITTestHoodieFlinkCompactor.java | 48 +++------- ...y.java => TestCompactionPlanStrategy.java} | 59 ++++++------ 12 files changed, 168 insertions(+), 399 deletions(-) delete mode 100644 hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/MultiCompactionPlanSourceFunction.java delete mode 100644 hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/AllPendingCompactionPlanSelectStrategy.java create mode 100644 hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanStrategies.java rename hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/{CompactionPlanSelectStrategy.java => CompactionPlanStrategy.java} (81%) delete mode 100644 hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java delete mode 100644 hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/MultiCompactionPlanSelectStrategy.java delete mode 100644 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/SingleCompactionPlanSelectStrategy.java rename hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/{TestCompactionPlanSelectStrategy.java => TestCompactionPlanStrategy.java} (74%) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java index fe55089988e53..883ba8bd114cd 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java @@ -20,6 +20,7 @@ import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.common.model.CompactionOperation; +import org.apache.hudi.common.util.collection.Pair; import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.configuration.Configuration; @@ -28,8 +29,7 @@ import org.slf4j.LoggerFactory; import java.util.List; - -import static java.util.stream.Collectors.toList; +import java.util.stream.Collectors; /** * Flink hudi compaction source function. @@ -53,18 +53,12 @@ public class CompactionPlanSourceFunction extends AbstractRichFunction implement protected static final Logger LOG = LoggerFactory.getLogger(CompactionPlanSourceFunction.class); /** - * Compaction instant time. - */ - private final String compactionInstantTime; - - /** - * The compaction plan. 
+ * compaction plan instant -> compaction plan */ - private final HoodieCompactionPlan compactionPlan; + private final List> compactionPlans; - public CompactionPlanSourceFunction(HoodieCompactionPlan compactionPlan, String compactionInstantTime) { - this.compactionPlan = compactionPlan; - this.compactionInstantTime = compactionInstantTime; + public CompactionPlanSourceFunction(List> compactionPlans) { + this.compactionPlans = compactionPlans; } @Override @@ -74,11 +68,14 @@ public void open(Configuration parameters) throws Exception { @Override public void run(SourceContext sourceContext) throws Exception { - List operations = this.compactionPlan.getOperations().stream() - .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList()); - LOG.info("CompactionPlanFunction compacting " + operations + " files"); - for (CompactionOperation operation : operations) { - sourceContext.collect(new CompactionPlanEvent(compactionInstantTime, operation)); + for (Pair pair : compactionPlans) { + HoodieCompactionPlan compactionPlan = pair.getRight(); + List operations = compactionPlan.getOperations().stream() + .map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList()); + LOG.info("CompactionPlanFunction compacting " + operations + " files"); + for (CompactionOperation operation : operations) { + sourceContext.collect(new CompactionPlanEvent(pair.getLeft(), operation)); + } } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java index 02041690f1dec..449b06846156c 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java @@ -20,10 +20,10 @@ import org.apache.hudi.config.HoodieMemoryConfig; import 
org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.compact.strategy.CompactionPlanStrategy; import com.beust.jcommander.Parameter; import org.apache.flink.configuration.Configuration; -import org.apache.hudi.sink.compact.strategy.SingleCompactionPlanSelectStrategy; /** * Configurations for Hoodie Flink compaction. @@ -102,7 +102,7 @@ public class FlinkCompactionConfig extends Configuration { @Parameter(names = {"--seq"}, description = "Compaction plan execution sequence, two options are supported:\n" + "1). FIFO: execute the oldest plan first;\n" + "2). LIFO: execute the latest plan first, by default LIFO", required = false) - public String compactionSeq = SEQ_LIFO; + public String compactionSeq = SEQ_FIFO; @Parameter(names = {"--service"}, description = "Flink Compaction runs in service mode, disable by default") public Boolean serviceMode = false; @@ -111,21 +111,21 @@ public class FlinkCompactionConfig extends Configuration { description = "Min compaction interval of async compaction service, default 10 minutes") public Integer minCompactionIntervalSeconds = 600; - @Parameter(names = {"--select-strategy"}, description = "The strategy define how to select compaction plan to compact.\n" - + "1). SingleCompactionPlanSelectStrategy: Select first or last compaction plan." - + "2). MultiCompactionPlanSelectStrategy: Select first or last n compaction plan (n is defined by compactionPlanMaxSelect)." - + "3). AllPendingCompactionPlanSelectStrategy: Select all pending compaction plan" - + "4). InstantCompactionPlanSelectStrategy: Select the compaction plan that instant is specified by compactionPlanInstant") - public String compactionPlanSelectStrategy = SingleCompactionPlanSelectStrategy.class.getName(); + @Parameter(names = {"--plan-select-strategy"}, description = "The strategy define how to select compaction plan to compact.\n" + + "1). 
num_instants: select plans by specific number of instants, it's the default strategy with 1 instant at a time;\n" + + "3). all: Select all pending compaction plan;\n" + + "4). instants: Select the compaction plan by specific instants") + public String compactionPlanSelectStrategy = CompactionPlanStrategy.NUM_INSTANTS; - @Parameter(names = {"--select-max-number"}, description = "Max number of compaction plan would be selected in compaction." + @Parameter(names = {"--max-num-plans"}, description = "Max number of compaction plan would be selected in compaction." + "It's only effective for MultiCompactionPlanSelectStrategy.") - public Integer compactionPlanMaxSelect = 10; + public Integer maxNumCompactionPlans = 1; - @Parameter(names = {"--select-instant"}, description = "Specify the compaction plan instant to compact and you can specify more than" - + "one instant in a time by using comma." - + "It's only effective for InstantCompactionPlanSelectStrategy.") + @Parameter(names = {"--target-instants"}, description = "Specify the compaction plan instants to compact,\n" + + "Multiple instants are supported by comma separated instant time.\n" + + "It's only effective for 'instants' plan selection strategy.") public String compactionPlanInstant; + @Parameter(names = {"--spillable_map_path"}, description = "Default file path prefix for spillable map.", required = false) public String spillableMapPath = HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue(); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java index f56b5a2f0fb1d..e2d2972a0de43 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java @@ -18,11 +18,6 @@ package 
org.apache.hudi.sink.compact; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.functions.ProcessFunction; -import org.apache.flink.util.Collector; -import org.apache.flink.util.OutputTag; import org.apache.hudi.async.HoodieAsyncTableService; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.HoodieFlinkWriteClient; @@ -31,11 +26,10 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.sink.compact.strategy.CompactionPlanSelectStrategy; +import org.apache.hudi.sink.compact.strategy.CompactionPlanStrategies; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.CompactionUtil; import org.apache.hudi.util.StreamerUtil; @@ -228,9 +222,8 @@ private void compact() throws Exception { } // fetch the instant based on the configured execution sequence - HoodieTimeline timeline = table.getActiveTimeline(); - List requested = ((CompactionPlanSelectStrategy) ReflectionUtils.loadClass(cfg.compactionPlanSelectStrategy)) - .select(timeline.filterPendingCompactionTimeline(), cfg); + HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); + List requested = CompactionPlanStrategies.getStrategy(cfg).select(pendingCompactionTimeline); if (requested.isEmpty()) { // do nothing. 
LOG.info("No compaction plan scheduled, turns on the compaction plan schedule with --schedule option"); @@ -240,7 +233,7 @@ private void compact() throws Exception { List compactionInstantTimes = requested.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()); compactionInstantTimes.forEach(timestamp -> { HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(timestamp); - if (timeline.containsInstant(inflightInstant)) { + if (pendingCompactionTimeline.containsInstant(inflightInstant)) { LOG.info("Rollback inflight compaction instant: [" + timestamp + "]"); table.rollbackInflightCompaction(inflightInstant); table.getMetaClient().reloadActiveTimeline(); @@ -254,13 +247,11 @@ private void compact() throws Exception { try { return Pair.of(timestamp, CompactionUtils.getCompactionPlan(table.getMetaClient(), timestamp)); } catch (IOException e) { - throw new HoodieException(e); + throw new HoodieException("Get compaction plan at instant " + timestamp + " error", e); } }) // reject empty compaction plan - .filter(pair -> !(pair.getRight() == null - || pair.getRight().getOperations() == null - || pair.getRight().getOperations().isEmpty())) + .filter(pair -> validCompactionPlan(pair.getRight())) .collect(Collectors.toList()); if (compactionPlans.isEmpty()) { @@ -270,7 +261,6 @@ private void compact() throws Exception { } List instants = compactionInstantTimes.stream().map(HoodieTimeline::getCompactionRequestedInstant).collect(Collectors.toList()); - HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); for (HoodieInstant instant : instants) { if (!pendingCompactionTimeline.containsInstant(instant)) { // this means that the compaction plan was written to auxiliary path(.tmp) @@ -297,34 +287,19 @@ private void compact() throws Exception { } table.getMetaClient().reloadActiveTimeline(); - // use side-output to make operations that is in the same plan to be placed in the same stream - // 
keyby() cannot sure that different operations are in the different stream - DataStream source = env.addSource(new MultiCompactionPlanSourceFunction(compactionPlans)) + env.addSource(new CompactionPlanSourceFunction(compactionPlans)) .name("compaction_source") - .uid("uid_compaction_source"); - - SingleOutputStreamOperator operator = source.rebalance() + .uid("uid_compaction_source") + .rebalance() .transform("compact_task", TypeInformation.of(CompactionCommitEvent.class), new ProcessOperator<>(new CompactFunction(conf))) .setParallelism(compactionParallelism) - .process(new ProcessFunction() { - @Override - public void processElement(CompactionCommitEvent event, ProcessFunction.Context context, Collector out) { - context.output(new OutputTag<>(event.getInstant(), TypeInformation.of(CompactionCommitEvent.class)), event); - } - }) - .name("group_by_compaction_plan") - .uid("uid_group_by_compaction_plan") + .addSink(new CompactionCommitSink(conf)) + .name("compaction_commit") + .uid("uid_compaction_commit") .setParallelism(1); - compactionPlans.forEach(pair -> - operator.getSideOutput(new OutputTag<>(pair.getLeft(), TypeInformation.of(CompactionCommitEvent.class))) - .addSink(new CompactionCommitSink(conf)) - .name("clean_commits " + pair.getLeft()) - .uid("uid_clean_commits_" + pair.getLeft()) - .setParallelism(1)); - env.execute("flink_hudi_compaction_" + String.join(",", compactionInstantTimes)); } @@ -342,4 +317,8 @@ public void shutDown() { shutdownAsyncService(false); } } + + private static boolean validCompactionPlan(HoodieCompactionPlan plan) { + return plan != null && plan.getOperations() != null && plan.getOperations().size() > 0; + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/MultiCompactionPlanSourceFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/MultiCompactionPlanSourceFunction.java deleted file mode 100644 index 8a8c3f6b4eeb3..0000000000000 --- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/MultiCompactionPlanSourceFunction.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.sink.compact; - -import static java.util.stream.Collectors.toList; - -import java.util.List; -import org.apache.flink.api.common.functions.AbstractRichFunction; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.hudi.avro.model.HoodieCompactionPlan; -import org.apache.hudi.common.model.CompactionOperation; -import org.apache.hudi.common.util.collection.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Flink hudi compaction source function. - * - *

This function read the compaction plan as {@link CompactionOperation}s then assign the compaction task - * event {@link CompactionPlanEvent} to downstream operators. - * - *

The compaction instant time is specified explicitly with strategies: - * - *

    - *
  • If the timeline has no inflight instants, - * use {@link org.apache.hudi.common.table.timeline.HoodieActiveTimeline#createNewInstantTime()} - * as the instant time;
  • - *
  • If the timeline has inflight instants, - * use the median instant time between [last complete instant time, earliest inflight instant time] - * as the instant time.
  • - *
- */ -public class MultiCompactionPlanSourceFunction extends AbstractRichFunction implements SourceFunction { - - protected static final Logger LOG = LoggerFactory.getLogger(MultiCompactionPlanSourceFunction.class); - - /** - * compaction plan instant -> compaction plan - */ - private final List> compactionPlans; - - public MultiCompactionPlanSourceFunction(List> compactionPlans) { - this.compactionPlans = compactionPlans; - } - - @Override - public void open(Configuration parameters) throws Exception { - // no operation - } - - @Override - public void run(SourceContext sourceContext) throws Exception { - for (Pair pair : compactionPlans) { - HoodieCompactionPlan compactionPlan = pair.getRight(); - List operations = compactionPlan.getOperations().stream() - .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList()); - LOG.info("CompactionPlanFunction compacting " + operations + " files"); - for (CompactionOperation operation : operations) { - sourceContext.collect(new CompactionPlanEvent(pair.getLeft(), operation)); - } - } - } - - @Override - public void close() throws Exception { - // no operation - } - - @Override - public void cancel() { - // no operation - } -} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/AllPendingCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/AllPendingCompactionPlanSelectStrategy.java deleted file mode 100644 index 23b6708ff304c..0000000000000 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/AllPendingCompactionPlanSelectStrategy.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.sink.compact.strategy; - -import java.util.List; -import java.util.stream.Collectors; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.sink.compact.FlinkCompactionConfig; - -/** - * Select all pending compaction plan to compact - */ -public class AllPendingCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy { - @Override - public List select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) { - return pendingCompactionTimeline.getInstants().collect(Collectors.toList()); - } -} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanStrategies.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanStrategies.java new file mode 100644 index 0000000000000..662dcabda3220 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanStrategies.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.compact.strategy; + +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.sink.compact.FlinkCompactionConfig; +import org.apache.hudi.util.CompactionUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.stream.Collectors; + +/** + * Factory clazz for CompactionPlanStrategy. 
+ */ +public class CompactionPlanStrategies { + private static final Logger LOG = LoggerFactory.getLogger(CompactionPlanStrategies.class); + + private CompactionPlanStrategies() { + } + + public static CompactionPlanStrategy getStrategy(FlinkCompactionConfig config) { + switch (config.compactionPlanSelectStrategy.toLowerCase(Locale.ROOT)) { + case CompactionPlanStrategy.ALL: + return pendingCompactionTimeline -> pendingCompactionTimeline.getInstants().collect(Collectors.toList()); + case CompactionPlanStrategy.INSTANTS: + return pendingCompactionTimeline -> { + if (StringUtils.isNullOrEmpty(config.compactionPlanInstant)) { + LOG.warn("None instant is selected"); + return Collections.emptyList(); + } + List instants = Arrays.asList(config.compactionPlanInstant.split(",")); + return pendingCompactionTimeline.getInstants() + .filter(instant -> instants.contains(instant.getTimestamp())) + .collect(Collectors.toList()); + }; + case CompactionPlanStrategy.NUM_INSTANTS: + return pendingCompactionTimeline -> { + List pendingCompactionPlanInstants = pendingCompactionTimeline.getInstants().collect(Collectors.toList()); + if (CompactionUtil.isLIFO(config.compactionSeq)) { + Collections.reverse(pendingCompactionPlanInstants); + } + int range = Math.min(config.maxNumCompactionPlans, pendingCompactionPlanInstants.size()); + return pendingCompactionPlanInstants.subList(0, range); + }; + default: + throw new UnsupportedOperationException("Unknown compaction plan strategy: " + + config.compactionPlanSelectStrategy + + ", supported strategies:[num_instants,instants,all]"); + } + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanStrategy.java similarity index 81% rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanSelectStrategy.java rename 
to hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanStrategy.java index a41fcef198139..e209ff53391fc 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanSelectStrategy.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanStrategy.java @@ -18,17 +18,21 @@ package org.apache.hudi.sink.compact.strategy; -import java.util.List; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.sink.compact.FlinkCompactionConfig; + +import java.util.List; /** - * CompactionRangeStrategy + * Compaction plan selection strategy. */ -public interface CompactionPlanSelectStrategy { +public interface CompactionPlanStrategy { + String ALL = "all"; + String INSTANTS = "instants"; + String NUM_INSTANTS = "num_instants"; + /** - * Define how to select compaction plan to compact + * Define how to select compaction plan to compact. */ - List select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config); + List select(HoodieTimeline pendingCompactionTimeline); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java deleted file mode 100644 index 45382b70c4def..0000000000000 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.sink.compact.strategy; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.stream.Collectors; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.util.StringUtils; -import org.apache.hudi.sink.compact.FlinkCompactionConfig; -import org.apache.hudi.sink.compact.HoodieFlinkCompactor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Specify the compaction plan instant to compact - */ -public class InstantCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy { - protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCompactor.class); - - @Override - public List select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) { - if (StringUtils.isNullOrEmpty(config.compactionPlanInstant)) { - LOG.warn("None instant is selected"); - return Collections.emptyList(); - } - List instants = Arrays.asList(config.compactionPlanInstant.split(",")); - return pendingCompactionTimeline.getInstants() - .filter(instant -> instants.contains(instant.getTimestamp())) - .collect(Collectors.toList()); - } -} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/MultiCompactionPlanSelectStrategy.java 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/MultiCompactionPlanSelectStrategy.java deleted file mode 100644 index ee0e93653f87d..0000000000000 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/MultiCompactionPlanSelectStrategy.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.sink.compact.strategy; - -import java.util.Collections; -import java.util.List; -import java.util.stream.Collectors; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.sink.compact.FlinkCompactionConfig; -import org.apache.hudi.util.CompactionUtil; - -/** - * Select multi compaction plan to compact - */ -public class MultiCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy { - @Override - public List select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) { - List pendingCompactionPlanInstants = pendingCompactionTimeline.getInstants().collect(Collectors.toList()); - if (CompactionUtil.isLIFO(config.compactionSeq)) { - Collections.reverse(pendingCompactionPlanInstants); - } - int range = Math.min(config.compactionPlanMaxSelect, pendingCompactionPlanInstants.size()); - return pendingCompactionPlanInstants.subList(0, range); - } -} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/SingleCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/SingleCompactionPlanSelectStrategy.java deleted file mode 100644 index 7ca939866ceec..0000000000000 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/SingleCompactionPlanSelectStrategy.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.sink.compact.strategy; - -import java.util.Collections; -import java.util.List; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.sink.compact.FlinkCompactionConfig; -import org.apache.hudi.util.CompactionUtil; - -/** - * Select one compaction plan to compact - */ -public class SingleCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy { - @Override - public List select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) { - Option compactionPlanInstant = CompactionUtil.isLIFO(config.compactionSeq) - ? 
pendingCompactionTimeline.lastInstant() - : pendingCompactionTimeline.firstInstant(); - if (compactionPlanInstant.isPresent()) { - return Collections.singletonList(compactionPlanInstant.get()); - } - return Collections.emptyList(); - } -} diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java index 43e4ed511452d..341a157e86c78 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java @@ -18,11 +18,6 @@ package org.apache.hudi.sink.compact; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.functions.ProcessFunction; -import org.apache.flink.util.Collector; -import org.apache.flink.util.OutputTag; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -54,8 +49,9 @@ import org.slf4j.LoggerFactory; import java.io.File; -import java.util.Arrays; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -138,17 +134,7 @@ public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception { HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf); - boolean scheduled = false; - // judge whether have operation - // To compute the compaction instant time and do compaction. 
- Option compactionInstantTimeOption = CompactionUtil.getCompactionInstantTime(metaClient); - if (compactionInstantTimeOption.isPresent()) { - scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTimeOption.get(), Option.empty()); - } - - String compactionInstantTime = compactionInstantTimeOption.get(); - - assertTrue(scheduled, "The compaction plan should be scheduled"); + String compactionInstantTime = scheduleCompactionPlan(metaClient, writeClient); HoodieFlinkTable table = writeClient.getHoodieTable(); // generate compaction plan @@ -160,7 +146,7 @@ public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception { // Mark instant as compaction inflight table.getActiveTimeline().transitionCompactionRequestedToInflight(instant); - env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime)) + env.addSource(new CompactionPlanSourceFunction(Collections.singletonList(Pair.of(compactionInstantTime, compactionPlan)))) .name("compaction_source") .uid("uid_compaction_source") .rebalance() @@ -280,29 +266,18 @@ public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangel } table.getMetaClient().reloadActiveTimeline(); - DataStream source = env.addSource(new MultiCompactionPlanSourceFunction(compactionPlans)) + env.addSource(new CompactionPlanSourceFunction(compactionPlans)) .name("compaction_source") - .uid("uid_compaction_source"); - SingleOutputStreamOperator operator = source.rebalance() + .uid("uid_compaction_source") + .rebalance() .transform("compact_task", TypeInformation.of(CompactionCommitEvent.class), new ProcessOperator<>(new CompactFunction(conf))) .setParallelism(1) - .process(new ProcessFunction() { - @Override - public void processElement(CompactionCommitEvent event, ProcessFunction.Context context, Collector out) { - context.output(new OutputTag<>(event.getInstant(), TypeInformation.of(CompactionCommitEvent.class)), event); - } - }) - .name("group_by_compaction_plan") - 
.uid("uid_group_by_compaction_plan") + .addSink(new CompactionCommitSink(conf)) + .name("compaction_commit") + .uid("uid_compaction_commit") .setParallelism(1); - compactionPlans.forEach(pair -> - operator.getSideOutput(new OutputTag<>(pair.getLeft(), TypeInformation.of(CompactionCommitEvent.class))) - .addSink(new CompactionCommitSink(conf)) - .name("clean_commits " + pair.getLeft()) - .uid("uid_clean_commits_" + pair.getLeft()) - .setParallelism(1)); env.execute("flink_hudi_compaction"); writeClient.close(); @@ -311,8 +286,7 @@ public void processElement(CompactionCommitEvent event, ProcessFunction writeClient) { boolean scheduled = false; - // judge whether have operation - // To compute the compaction instant time and do compaction. + // judge whether there are any compaction operations. Option compactionInstantTimeOption = CompactionUtil.getCompactionInstantTime(metaClient); if (compactionInstantTimeOption.isPresent()) { scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTimeOption.get(), Option.empty()); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanStrategy.java similarity index 74% rename from hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanSelectStrategy.java rename to hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanStrategy.java index 3ac9f6c6663ef..181be26d6dc60 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanSelectStrategy.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanStrategy.java @@ -25,18 +25,16 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import 
org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.sink.compact.strategy.AllPendingCompactionPlanSelectStrategy; -import org.apache.hudi.sink.compact.strategy.CompactionPlanSelectStrategy; -import org.apache.hudi.sink.compact.strategy.InstantCompactionPlanSelectStrategy; -import org.apache.hudi.sink.compact.strategy.MultiCompactionPlanSelectStrategy; -import org.apache.hudi.sink.compact.strategy.SingleCompactionPlanSelectStrategy; +import org.apache.hudi.sink.compact.strategy.CompactionPlanStrategies; +import org.apache.hudi.sink.compact.strategy.CompactionPlanStrategy; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; /** - * Test case for every {@link CompactionPlanSelectStrategy} implements + * Test case for every {@link CompactionPlanStrategy} implementation -public class TestCompactionPlanSelectStrategy { +public class TestCompactionPlanStrategy { private HoodieTimeline timeline; private HoodieTimeline emptyTimeline; private HoodieTimeline allCompleteTimeline; @@ -59,72 +57,75 @@ public void beforeEach() { void testSingleCompactionPlanSelectStrategy() { HoodieTimeline pendingCompactionTimeline = this.timeline.filterPendingCompactionTimeline(); FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig(); + CompactionPlanStrategy strategy = CompactionPlanStrategies.getStrategy(compactionConfig); - SingleCompactionPlanSelectStrategy strategy = new SingleCompactionPlanSelectStrategy(); - assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_006}, strategy.select(pendingCompactionTimeline, compactionConfig)); + assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002}, strategy.select(pendingCompactionTimeline)); - compactionConfig.compactionSeq = FlinkCompactionConfig.SEQ_FIFO; - assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002}, strategy.select(pendingCompactionTimeline, compactionConfig)); + compactionConfig.compactionSeq = FlinkCompactionConfig.SEQ_LIFO; + assertHoodieInstantsEquals(new 
HoodieInstant[]{INSTANT_006}, strategy.select(pendingCompactionTimeline)); HoodieTimeline emptyPendingCompactionTimeline = emptyTimeline.filterPendingCompactionTimeline(); - assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(emptyPendingCompactionTimeline, compactionConfig)); + assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(emptyPendingCompactionTimeline)); HoodieTimeline allCompleteCompactionTimeline = allCompleteTimeline.filterPendingCompactionTimeline(); - assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(allCompleteCompactionTimeline, compactionConfig)); + assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(allCompleteCompactionTimeline)); } @Test void testMultiCompactionPlanSelectStrategy() { HoodieTimeline pendingCompactionTimeline = this.timeline.filterPendingCompactionTimeline(); FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig(); - compactionConfig.compactionPlanMaxSelect = 2; + compactionConfig.maxNumCompactionPlans = 2; - MultiCompactionPlanSelectStrategy strategy = new MultiCompactionPlanSelectStrategy(); - assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_006, INSTANT_004}, strategy.select(pendingCompactionTimeline, compactionConfig)); + CompactionPlanStrategy strategy = CompactionPlanStrategies.getStrategy(compactionConfig); + assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002, INSTANT_003}, strategy.select(pendingCompactionTimeline)); - compactionConfig.compactionSeq = FlinkCompactionConfig.SEQ_FIFO; - assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002, INSTANT_003}, strategy.select(pendingCompactionTimeline, compactionConfig)); + compactionConfig.compactionSeq = FlinkCompactionConfig.SEQ_LIFO; + assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_006, INSTANT_004}, strategy.select(pendingCompactionTimeline)); HoodieTimeline emptyPendingCompactionTimeline = emptyTimeline.filterPendingCompactionTimeline(); - assertHoodieInstantsEquals(new 
HoodieInstant[]{}, strategy.select(emptyPendingCompactionTimeline, compactionConfig)); + assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(emptyPendingCompactionTimeline)); HoodieTimeline allCompleteCompactionTimeline = allCompleteTimeline.filterPendingCompactionTimeline(); - assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(allCompleteCompactionTimeline, compactionConfig)); + assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(allCompleteCompactionTimeline)); } @Test void testAllPendingCompactionPlanSelectStrategy() { HoodieTimeline pendingCompactionTimeline = this.timeline.filterPendingCompactionTimeline(); FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig(); + compactionConfig.compactionPlanSelectStrategy = CompactionPlanStrategy.ALL; + CompactionPlanStrategy strategy = CompactionPlanStrategies.getStrategy(compactionConfig); - AllPendingCompactionPlanSelectStrategy strategy = new AllPendingCompactionPlanSelectStrategy(); assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002, INSTANT_003, INSTANT_004, INSTANT_006}, - strategy.select(pendingCompactionTimeline, compactionConfig)); + strategy.select(pendingCompactionTimeline)); HoodieTimeline emptyPendingCompactionTimeline = emptyTimeline.filterPendingCompactionTimeline(); - assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(emptyPendingCompactionTimeline, compactionConfig)); + assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(emptyPendingCompactionTimeline)); HoodieTimeline allCompleteCompactionTimeline = allCompleteTimeline.filterPendingCompactionTimeline(); - assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(allCompleteCompactionTimeline, compactionConfig)); + assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(allCompleteCompactionTimeline)); } @Test void testInstantCompactionPlanSelectStrategy() { HoodieTimeline pendingCompactionTimeline = 
this.timeline.filterPendingCompactionTimeline(); FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig(); + + compactionConfig.compactionPlanSelectStrategy = CompactionPlanStrategy.INSTANTS; + CompactionPlanStrategy strategy = CompactionPlanStrategies.getStrategy(compactionConfig); compactionConfig.compactionPlanInstant = "004"; - InstantCompactionPlanSelectStrategy strategy = new InstantCompactionPlanSelectStrategy(); - assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_004}, strategy.select(pendingCompactionTimeline, compactionConfig)); + assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_004}, strategy.select(pendingCompactionTimeline)); compactionConfig.compactionPlanInstant = "002,003"; - assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002, INSTANT_003}, strategy.select(pendingCompactionTimeline, compactionConfig)); + assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002, INSTANT_003}, strategy.select(pendingCompactionTimeline)); compactionConfig.compactionPlanInstant = "002,005"; - assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002}, strategy.select(pendingCompactionTimeline, compactionConfig)); + assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002}, strategy.select(pendingCompactionTimeline)); compactionConfig.compactionPlanInstant = "005"; - assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(pendingCompactionTimeline, compactionConfig)); + assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(pendingCompactionTimeline)); } private void assertHoodieInstantsEquals(HoodieInstant[] expected, List actual) {