HoodieClusteringConfig.java
@@ -22,7 +22,9 @@
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;

import java.io.File;
import java.io.FileReader;
@@ -41,6 +43,14 @@ public class HoodieClusteringConfig extends HoodieConfig {

// Any strategy specific params can be saved with this prefix
public static final String CLUSTERING_STRATEGY_PARAM_PREFIX = "hoodie.clustering.plan.strategy.";
public static final String SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
"org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy";
public static final String JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
"org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy";
public static final String SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY =
"org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy";
public static final String JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY =
"org.apache.hudi.client.clustering.run.strategy.JavaSortAndSizeExecutionStrategy";

// Any space-filling curve optimization (z-order/hilbert) params can be saved with this prefix
public static final String LAYOUT_OPTIMIZE_PARAM_PREFIX = "hoodie.layout.optimize.";
@@ -59,15 +69,15 @@ public class HoodieClusteringConfig extends HoodieConfig {

public static final ConfigProperty<String> PLAN_STRATEGY_CLASS_NAME = ConfigProperty
.key("hoodie.clustering.plan.strategy.class")
.defaultValue("org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy")
.defaultValue(SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY)
.sinceVersion("0.7.0")
.withDocumentation("Config to provide a strategy class (subclass of ClusteringPlanStrategy) to create clustering plan "
+ "i.e select what file groups are being clustered. Default strategy, looks at the clustering small file size limit (determined by "
+ PLAN_STRATEGY_SMALL_FILE_LIMIT.key() + ") to pick the small file slices within partitions for clustering.");

public static final ConfigProperty<String> EXECUTION_STRATEGY_CLASS_NAME = ConfigProperty
.key("hoodie.clustering.execution.strategy.class")
.defaultValue("org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy")
.defaultValue(SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY)
.sinceVersion("0.7.0")
.withDocumentation("Config to provide a strategy class (subclass of RunClusteringStrategy) to define how the "
+ " clustering plan is executed. By default, we sort the file groups in th plan by the specified columns, while "
@@ -325,6 +335,12 @@ public static Builder newBuilder() {
public static class Builder {

private final HoodieClusteringConfig clusteringConfig = new HoodieClusteringConfig();
private EngineType engineType = EngineType.SPARK;

public Builder withEngineType(EngineType engineType) {
this.engineType = engineType;
return this;
}

public Builder fromFile(File propertiesFile) throws IOException {
try (FileReader reader = new FileReader(propertiesFile)) {
@@ -444,9 +460,37 @@ public Builder withDataOptimizeDataSkippingEnable(boolean dataSkipping) {
}

public HoodieClusteringConfig build() {
clusteringConfig.setDefaultValue(
PLAN_STRATEGY_CLASS_NAME, getDefaultPlanStrategyClassName(engineType));
clusteringConfig.setDefaultValue(
EXECUTION_STRATEGY_CLASS_NAME, getDefaultExecutionStrategyClassName(engineType));
clusteringConfig.setDefaults(HoodieClusteringConfig.class.getName());
return clusteringConfig;
}

private String getDefaultPlanStrategyClassName(EngineType engineType) {
switch (engineType) {
case SPARK:
return SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
case FLINK:
case JAVA:
return JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
default:
throw new HoodieNotSupportedException("Unsupported engine " + engineType);
}
}

private String getDefaultExecutionStrategyClassName(EngineType engineType) {
switch (engineType) {
case SPARK:
return SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY;
case FLINK:
case JAVA:
return JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY;
default:
throw new HoodieNotSupportedException("Unsupported engine " + engineType);
}
}
}

/**
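A minimal sketch of how the engine-aware defaults above resolve, assuming only the builder API shown in this diff (`getString(ConfigProperty)` is the generic accessor inherited from `HoodieConfig`). FLINK and JAVA deliberately share the Java strategies, so only SPARK gets the Spark classes:

```java
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.config.HoodieClusteringConfig;

public class ClusteringDefaultsSketch {
  public static void main(String[] args) {
    // FLINK (like JAVA) falls back to the Java strategies in getDefaultPlanStrategyClassName.
    HoodieClusteringConfig config = HoodieClusteringConfig.newBuilder()
        .withEngineType(EngineType.FLINK)
        .build();

    // Prints JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY, i.e.
    // org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy.
    System.out.println(config.getString(HoodieClusteringConfig.PLAN_STRATEGY_CLASS_NAME));
  }
}
```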
HoodieWriteConfig.java
@@ -2182,7 +2182,8 @@ protected void setDefaults() {
writeConfig.setDefaultOnCondition(!isCompactionConfigSet,
HoodieCompactionConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
writeConfig.setDefaultOnCondition(!isClusteringConfigSet,
HoodieClusteringConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
HoodieClusteringConfig.newBuilder().withEngineType(engineType)
.fromProperties(writeConfig.getProps()).build());
writeConfig.setDefaultOnCondition(!isMetricsConfigSet, HoodieMetricsConfig.newBuilder().fromProperties(
writeConfig.getProps()).build());
writeConfig.setDefaultOnCondition(!isBootstrapConfigSet,
BaseClusteringPlanActionExecutor.java
@@ -27,17 +27,24 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.hudi.table.action.cluster.strategy.ClusteringPlanStrategy;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;

public abstract class BaseClusteringPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieClusteringPlan>> {

private static final Logger LOG = LogManager.getLogger(BaseClusteringPlanActionExecutor.class);

private final Option<Map<String, String>> extraMetadata;

public BaseClusteringPlanActionExecutor(HoodieEngineContext context,
@@ -49,7 +56,32 @@ public BaseClusteringPlanActionExecutor(HoodieEngineContext context,
this.extraMetadata = extraMetadata;
}

protected abstract Option<HoodieClusteringPlan> createClusteringPlan();
protected Option<HoodieClusteringPlan> createClusteringPlan() {
LOG.info("Checking if clustering needs to be run on " + config.getBasePath());
Option<HoodieInstant> lastClusteringInstant = table.getActiveTimeline().getCompletedReplaceTimeline().lastInstant();

int commitsSinceLastClustering = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
.findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"), Integer.MAX_VALUE)
.countInstants();
if (config.inlineClusteringEnabled() && config.getInlineClusterMaxCommits() > commitsSinceLastClustering) {
LOG.info("Not scheduling inline clustering as only " + commitsSinceLastClustering
+ " commits was found since last clustering " + lastClusteringInstant + ". Waiting for "
+ config.getInlineClusterMaxCommits());
return Option.empty();
}

if (config.isAsyncClusteringEnabled() && config.getAsyncClusterMaxCommits() > commitsSinceLastClustering) {
LOG.info("Not scheduling async clustering as only " + commitsSinceLastClustering
+ " commits was found since last clustering " + lastClusteringInstant + ". Waiting for "
+ config.getAsyncClusterMaxCommits());
return Option.empty();
}

LOG.info("Generating clustering plan for table " + config.getBasePath());
ClusteringPlanStrategy strategy = (ClusteringPlanStrategy)
ReflectionUtils.loadClass(config.getClusteringPlanStrategyClass(), table, context, config);
return strategy.generateClusteringPlan();
}

@Override
public Option<HoodieClusteringPlan> execute() {
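Because `createClusteringPlan()` now loads the strategy reflectively from `hoodie.clustering.plan.strategy.class`, a custom strategy can be swapped in purely through configuration. A hedged sketch: `com.example.MyPlanStrategy` is hypothetical, and `withClusteringPlanStrategyClass` / `withClusteringConfig` are assumed to be the existing builder setters:

```java
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class CustomPlanStrategySketch {
  public static HoodieWriteConfig buildConfig() {
    // Hypothetical: point the reflective lookup at a custom ClusteringPlanStrategy subclass.
    // createClusteringPlan() will instantiate it via ReflectionUtils.loadClass(...).
    return HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie") // illustrative base path
        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
            .withClusteringPlanStrategyClass("com.example.MyPlanStrategy") // hypothetical class
            .build())
        .build();
  }
}
```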
ScheduleCompactionActionExecutor.java
@@ -19,6 +19,7 @@
package org.apache.hudi.table.action.compact;

import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -68,12 +69,15 @@ public ScheduleCompactionActionExecutor(HoodieEngineContext context,
public Option<HoodieCompactionPlan> execute() {
if (!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
&& !config.getFailedWritesCleanPolicy().isLazy()) {
// if there are inflight writes, their instantTime must not be less than that of compaction instant time
table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant()
.ifPresent(earliestInflight -> ValidationUtils.checkArgument(
HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), HoodieTimeline.GREATER_THAN, instantTime),
"Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
+ ", Compaction scheduled at " + instantTime));
// TODO(yihua): this validation is skipped for the Java client used by kafka-connect. Need to revisit this.
if (config.getEngineType() != EngineType.JAVA) {
// if there are inflight writes, their instantTime must not be less than the compaction instant time
table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant()
.ifPresent(earliestInflight -> ValidationUtils.checkArgument(
HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), HoodieTimeline.GREATER_THAN, instantTime),
"Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
+ ", Compaction scheduled at " + instantTime));
}
// Committed and pending compaction instants should have strictly lower timestamps
List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
.getWriteTimeline().filterCompletedAndCompactionInstants().getInstants()
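For non-Java engines, the guarded check still enforces that the earliest inflight write began strictly after the compaction instant. A small illustration of that predicate, with illustrative instant times:

```java
import org.apache.hudi.common.table.timeline.HoodieTimeline;

public class CompactionGuardSketch {
  public static void main(String[] args) {
    String earliestInflight = "20210901080000";  // illustrative inflight write instant
    String compactionInstant = "20210901090000"; // illustrative compaction instant

    // Same predicate as the ValidationUtils.checkArgument above: the earliest
    // inflight write must be strictly later than the compaction instant.
    boolean valid = HoodieTimeline.compareTimestamps(
        earliestInflight, HoodieTimeline.GREATER_THAN, compactionInstant);
    System.out.println(valid); // false here, so scheduling the compaction would fail
  }
}
```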
TestHoodieWriteConfig.java
@@ -20,7 +20,6 @@

import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.config.HoodieWriteConfig.Builder;

import org.apache.hudi.index.HoodieIndex;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -81,6 +80,52 @@ public void testDefaultIndexAccordingToEngineType() {
assertEquals(HoodieIndex.IndexType.INMEMORY, writeConfig.getIndexType());
}

@Test
public void testDefaultClusteringPlanStrategyClassAccordingToEngineType() {
// Default (as Spark)
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").build();
assertEquals(
HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY,
writeConfig.getClusteringPlanStrategyClass());

// Spark
writeConfig = HoodieWriteConfig.newBuilder().withEngineType(EngineType.SPARK).withPath("/tmp").build();
assertEquals(
HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY,
writeConfig.getClusteringPlanStrategyClass());

// Flink and Java
for (EngineType engineType : new EngineType[] {EngineType.FLINK, EngineType.JAVA}) {
writeConfig = HoodieWriteConfig.newBuilder().withEngineType(engineType).withPath("/tmp").build();
assertEquals(
HoodieClusteringConfig.JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY,
writeConfig.getClusteringPlanStrategyClass());
}
}

@Test
public void testDefaultClusteringExecutionStrategyClassAccordingToEngineType() {
// Default (as Spark)
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").build();
assertEquals(
HoodieClusteringConfig.SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY,
writeConfig.getClusteringExecutionStrategyClass());

// Spark
writeConfig = HoodieWriteConfig.newBuilder().withEngineType(EngineType.SPARK).withPath("/tmp").build();
assertEquals(
HoodieClusteringConfig.SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY,
writeConfig.getClusteringExecutionStrategyClass());

// Flink and Java
for (EngineType engineType : new EngineType[] {EngineType.FLINK, EngineType.JAVA}) {
writeConfig = HoodieWriteConfig.newBuilder().withEngineType(engineType).withPath("/tmp").build();
assertEquals(
HoodieClusteringConfig.JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY,
writeConfig.getClusteringExecutionStrategyClass());
}
}

private ByteArrayOutputStream saveParamsIntoOutputStream(Map<String, String> params) throws IOException {
Properties properties = new Properties();
properties.putAll(params);
JavaRecentDaysClusteringPlanStrategy.java
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.client.clustering.plan.strategy;

import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieJavaCopyOnWriteTable;
import org.apache.hudi.table.HoodieJavaMergeOnReadTable;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/**
* Clustering strategy for the Java engine that only looks at the latest
* 'daybased.lookback.partitions' partitions.
*/
public class JavaRecentDaysClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
extends JavaSizeBasedClusteringPlanStrategy<T> {
private static final Logger LOG = LogManager.getLogger(JavaRecentDaysClusteringPlanStrategy.class);

public JavaRecentDaysClusteringPlanStrategy(HoodieJavaCopyOnWriteTable<T> table,
HoodieJavaEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}

public JavaRecentDaysClusteringPlanStrategy(HoodieJavaMergeOnReadTable<T> table,
HoodieJavaEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}

@Override
protected List<String> filterPartitionPaths(List<String> partitionPaths) {
int targetPartitionsForClustering = getWriteConfig().getTargetPartitionsForClustering();
int skipPartitionsFromLatestForClustering = getWriteConfig().getSkipPartitionsFromLatestForClustering();
return partitionPaths.stream()
.sorted(Comparator.reverseOrder())
.skip(Math.max(skipPartitionsFromLatestForClustering, 0))
.limit(targetPartitionsForClustering > 0 ? targetPartitionsForClustering : partitionPaths.size())
.collect(Collectors.toList());
}
}
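A quick worked example of `filterPartitionPaths` above; the partition values and config numbers are illustrative only:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class PartitionFilterSketch {
  public static void main(String[] args) {
    // Suppose skipPartitionsFromLatestForClustering = 1 and
    // targetPartitionsForClustering = 2 over daily partitions:
    List<String> partitionPaths =
        Arrays.asList("2021/08/01", "2021/08/02", "2021/08/03", "2021/08/04");

    List<String> picked = partitionPaths.stream()
        .sorted(Comparator.reverseOrder()) // latest first: 08/04, 08/03, ...
        .skip(1)                           // skip the most recent partition
        .limit(2)                          // cluster the next two
        .collect(Collectors.toList());

    System.out.println(picked); // [2021/08/03, 2021/08/02]
  }
}
```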