diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java index 6d3a170f4c437..a8e3d07789b58 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java @@ -22,7 +22,9 @@ import org.apache.hudi.common.config.ConfigGroups; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieNotSupportedException; import java.io.File; import java.io.FileReader; @@ -41,6 +43,14 @@ public class HoodieClusteringConfig extends HoodieConfig { // Any strategy specific params can be saved with this prefix public static final String CLUSTERING_STRATEGY_PARAM_PREFIX = "hoodie.clustering.plan.strategy."; + public static final String SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY = + "org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy"; + public static final String JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY = + "org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy"; + public static final String SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY = + "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy"; + public static final String JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY = + "org.apache.hudi.client.clustering.run.strategy.JavaSortAndSizeExecutionStrategy"; // Any Space-filling curves optimize(z-order/hilbert) params can be saved with this prefix public static final String LAYOUT_OPTIMIZE_PARAM_PREFIX = "hoodie.layout.optimize."; @@ -59,7 +69,7 @@ public class HoodieClusteringConfig extends HoodieConfig { public static final ConfigProperty PLAN_STRATEGY_CLASS_NAME = ConfigProperty .key("hoodie.clustering.plan.strategy.class") - .defaultValue("org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy") + .defaultValue(SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY) .sinceVersion("0.7.0") .withDocumentation("Config to provide a strategy class (subclass of ClusteringPlanStrategy) to create clustering plan " + "i.e select what file groups are being clustered. Default strategy, looks at the clustering small file size limit (determined by " @@ -67,7 +77,7 @@ public class HoodieClusteringConfig extends HoodieConfig { public static final ConfigProperty EXECUTION_STRATEGY_CLASS_NAME = ConfigProperty .key("hoodie.clustering.execution.strategy.class") - .defaultValue("org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy") + .defaultValue(SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY) .sinceVersion("0.7.0") .withDocumentation("Config to provide a strategy class (subclass of RunClusteringStrategy) to define how the " + " clustering plan is executed. 
By default, we sort the file groups in th plan by the specified columns, while " @@ -325,6 +335,12 @@ public static Builder newBuilder() { public static class Builder { private final HoodieClusteringConfig clusteringConfig = new HoodieClusteringConfig(); + private EngineType engineType = EngineType.SPARK; + + public Builder withEngineType(EngineType engineType) { + this.engineType = engineType; + return this; + } public Builder fromFile(File propertiesFile) throws IOException { try (FileReader reader = new FileReader(propertiesFile)) { @@ -444,9 +460,37 @@ public Builder withDataOptimizeDataSkippingEnable(boolean dataSkipping) { } public HoodieClusteringConfig build() { + clusteringConfig.setDefaultValue( + PLAN_STRATEGY_CLASS_NAME, getDefaultPlanStrategyClassName(engineType)); + clusteringConfig.setDefaultValue( + EXECUTION_STRATEGY_CLASS_NAME, getDefaultExecutionStrategyClassName(engineType)); clusteringConfig.setDefaults(HoodieClusteringConfig.class.getName()); return clusteringConfig; } + + private String getDefaultPlanStrategyClassName(EngineType engineType) { + switch (engineType) { + case SPARK: + return SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY; + case FLINK: + case JAVA: + return JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY; + default: + throw new HoodieNotSupportedException("Unsupported engine " + engineType); + } + } + + private String getDefaultExecutionStrategyClassName(EngineType engineType) { + switch (engineType) { + case SPARK: + return SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY; + case FLINK: + case JAVA: + return JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY; + default: + throw new HoodieNotSupportedException("Unsupported engine " + engineType); + } + } } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index d4966104d1398..17386e9e986d2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -2182,7 +2182,8 @@ protected void setDefaults() { writeConfig.setDefaultOnCondition(!isCompactionConfigSet, HoodieCompactionConfig.newBuilder().fromProperties(writeConfig.getProps()).build()); writeConfig.setDefaultOnCondition(!isClusteringConfigSet, - HoodieClusteringConfig.newBuilder().fromProperties(writeConfig.getProps()).build()); + HoodieClusteringConfig.newBuilder().withEngineType(engineType) + .fromProperties(writeConfig.getProps()).build()); writeConfig.setDefaultOnCondition(!isMetricsConfigSet, HoodieMetricsConfig.newBuilder().fromProperties( writeConfig.getProps()).build()); writeConfig.setDefaultOnCondition(!isBootstrapConfigSet, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java index 97407e3464d79..8071bfb83a0d7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java @@ -27,10 +27,15 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.util.Option; +import 
org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.BaseActionExecutor; +import org.apache.hudi.table.action.cluster.strategy.ClusteringPlanStrategy; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.io.IOException; import java.util.Collections; @@ -38,6 +43,8 @@ public abstract class BaseClusteringPlanActionExecutor extends BaseActionExecutor> { + private static final Logger LOG = LogManager.getLogger(BaseClusteringPlanActionExecutor.class); + private final Option> extraMetadata; public BaseClusteringPlanActionExecutor(HoodieEngineContext context, @@ -49,7 +56,32 @@ public BaseClusteringPlanActionExecutor(HoodieEngineContext context, this.extraMetadata = extraMetadata; } - protected abstract Option createClusteringPlan(); + protected Option createClusteringPlan() { + LOG.info("Checking if clustering needs to be run on " + config.getBasePath()); + Option lastClusteringInstant = table.getActiveTimeline().getCompletedReplaceTimeline().lastInstant(); + + int commitsSinceLastClustering = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants() + .findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"), Integer.MAX_VALUE) + .countInstants(); + if (config.inlineClusteringEnabled() && config.getInlineClusterMaxCommits() > commitsSinceLastClustering) { + LOG.info("Not scheduling inline clustering as only " + commitsSinceLastClustering + + " commits was found since last clustering " + lastClusteringInstant + ". Waiting for " + + config.getInlineClusterMaxCommits()); + return Option.empty(); + } + + if (config.isAsyncClusteringEnabled() && config.getAsyncClusterMaxCommits() > commitsSinceLastClustering) { + LOG.info("Not scheduling async clustering as only " + commitsSinceLastClustering + + " commits was found since last clustering " + lastClusteringInstant + ". 
Waiting for " + + config.getAsyncClusterMaxCommits()); + return Option.empty(); + } + + LOG.info("Generating clustering plan for table " + config.getBasePath()); + ClusteringPlanStrategy strategy = (ClusteringPlanStrategy) + ReflectionUtils.loadClass(config.getClusteringPlanStrategyClass(), table, context, config); + return strategy.generateClusteringPlan(); + } @Override public Option execute() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java index 87c5a7c310445..5adaf5ae3b003 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java @@ -19,6 +19,7 @@ package org.apache.hudi.table.action.compact; import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -68,12 +69,15 @@ public ScheduleCompactionActionExecutor(HoodieEngineContext context, public Option execute() { if (!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() && !config.getFailedWritesCleanPolicy().isLazy()) { - // if there are inflight writes, their instantTime must not be less than that of compaction instant time - table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant() - .ifPresent(earliestInflight -> ValidationUtils.checkArgument( - HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), HoodieTimeline.GREATER_THAN, instantTime), - "Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight - + ", Compaction scheduled at " + instantTime)); + // TODO(yihua): this validation is removed for Java client used by kafka-connect. Need to revisit this. + if (config.getEngineType() != EngineType.JAVA) { + // if there are inflight writes, their instantTime must not be less than that of compaction instant time + table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant() + .ifPresent(earliestInflight -> ValidationUtils.checkArgument( + HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), HoodieTimeline.GREATER_THAN, instantTime), + "Earliest write inflight instant time must be later than compaction time. 
Earliest :" + earliestInflight + + ", Compaction scheduled at " + instantTime)); + } // Committed and pending compaction instants should have strictly lower timestamps List conflictingInstants = table.getActiveTimeline() .getWriteTimeline().filterCompletedAndCompactionInstants().getInstants() diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java index ed6b9e6cc5352..1df71e812d6b2 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java @@ -20,7 +20,6 @@ import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.config.HoodieWriteConfig.Builder; - import org.apache.hudi.index.HoodieIndex; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -81,6 +80,52 @@ public void testDefaultIndexAccordingToEngineType() { assertEquals(HoodieIndex.IndexType.INMEMORY, writeConfig.getIndexType()); } + @Test + public void testDefaultClusteringPlanStrategyClassAccordingToEngineType() { + // Default (as Spark) + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").build(); + assertEquals( + HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY, + writeConfig.getClusteringPlanStrategyClass()); + + // Spark + writeConfig = HoodieWriteConfig.newBuilder().withEngineType(EngineType.SPARK).withPath("/tmp").build(); + assertEquals( + HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY, + writeConfig.getClusteringPlanStrategyClass()); + + // Flink and Java + for (EngineType engineType : new EngineType[] {EngineType.FLINK, EngineType.JAVA}) { + writeConfig = HoodieWriteConfig.newBuilder().withEngineType(engineType).withPath("/tmp").build(); + assertEquals( + HoodieClusteringConfig.JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY, + writeConfig.getClusteringPlanStrategyClass()); + } + } + + @Test + public void testDefaultClusteringExecutionStrategyClassAccordingToEngineType() { + // Default (as Spark) + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").build(); + assertEquals( + HoodieClusteringConfig.SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY, + writeConfig.getClusteringExecutionStrategyClass()); + + // Spark + writeConfig = HoodieWriteConfig.newBuilder().withEngineType(EngineType.SPARK).withPath("/tmp").build(); + assertEquals( + HoodieClusteringConfig.SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY, + writeConfig.getClusteringExecutionStrategyClass()); + + // Flink and Java + for (EngineType engineType : new EngineType[] {EngineType.FLINK, EngineType.JAVA}) { + writeConfig = HoodieWriteConfig.newBuilder().withEngineType(engineType).withPath("/tmp").build(); + assertEquals( + HoodieClusteringConfig.JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY, + writeConfig.getClusteringExecutionStrategyClass()); + } + } + private ByteArrayOutputStream saveParamsIntoOutputStream(Map params) throws IOException { Properties properties = new Properties(); properties.putAll(params); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaRecentDaysClusteringPlanStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaRecentDaysClusteringPlanStrategy.java new file mode 100644 index 0000000000000..6d9b2eedc094d --- /dev/null +++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaRecentDaysClusteringPlanStrategy.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.client.clustering.plan.strategy; + +import org.apache.hudi.client.common.HoodieJavaEngineContext; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieJavaCopyOnWriteTable; +import org.apache.hudi.table.HoodieJavaMergeOnReadTable; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Clustering Strategy that only looks at latest 'daybased.lookback.partitions' partitions + * for Java engine. + */ +public class JavaRecentDaysClusteringPlanStrategy> + extends JavaSizeBasedClusteringPlanStrategy { + private static final Logger LOG = LogManager.getLogger(JavaRecentDaysClusteringPlanStrategy.class); + + public JavaRecentDaysClusteringPlanStrategy(HoodieJavaCopyOnWriteTable table, + HoodieJavaEngineContext engineContext, + HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + public JavaRecentDaysClusteringPlanStrategy(HoodieJavaMergeOnReadTable table, + HoodieJavaEngineContext engineContext, + HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + @Override + protected List filterPartitionPaths(List partitionPaths) { + int targetPartitionsForClustering = getWriteConfig().getTargetPartitionsForClustering(); + int skipPartitionsFromLatestForClustering = getWriteConfig().getSkipPartitionsFromLatestForClustering(); + return partitionPaths.stream() + .sorted(Comparator.reverseOrder()) + .skip(Math.max(skipPartitionsFromLatestForClustering, 0)) + .limit(targetPartitionsForClustering > 0 ? targetPartitionsForClustering : partitionPaths.size()) + .collect(Collectors.toList()); + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java new file mode 100644 index 0000000000000..9052f030f4616 --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
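As an illustration of the filterPartitionPaths override above (not part of the patch): the strategy orders partition paths in reverse lexical order so the most recent date-based partitions come first, optionally skips the newest ones, and caps the result at the configured target count. A minimal standalone sketch with made-up partition values:

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;
    import java.util.stream.Collectors;

    public class RecentPartitionSelectionSketch {
      // Mirrors the stream pipeline in JavaRecentDaysClusteringPlanStrategy#filterPartitionPaths:
      // newest partitions first, skip the most recent N, bound the result by the target count.
      static List<String> selectPartitions(List<String> partitionPaths, int targetPartitions, int skipFromLatest) {
        return partitionPaths.stream()
            .sorted(Comparator.reverseOrder())
            .skip(Math.max(skipFromLatest, 0))
            .limit(targetPartitions > 0 ? targetPartitions : partitionPaths.size())
            .collect(Collectors.toList());
      }

      public static void main(String[] args) {
        List<String> partitions = Arrays.asList("2021/11/01", "2021/11/02", "2021/11/03", "2021/11/04");
        // Skip the single most recent partition, then cluster at most two partitions.
        System.out.println(selectPartitions(partitions, 2, 1)); // [2021/11/03, 2021/11/02]
      }
    }
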
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.client.clustering.plan.strategy; + +import org.apache.hudi.avro.model.HoodieClusteringGroup; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieJavaEngineContext; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieJavaCopyOnWriteTable; +import org.apache.hudi.table.HoodieJavaMergeOnReadTable; +import org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS; + +/** + * Clustering Strategy for Java engine based on following. + * 1) Creates clustering groups based on max size allowed per group. + * 2) Excludes files that are greater than 'small.file.limit' from clustering plan. + */ +public class JavaSizeBasedClusteringPlanStrategy> + extends PartitionAwareClusteringPlanStrategy>, List, List> { + private static final Logger LOG = LogManager.getLogger(JavaSizeBasedClusteringPlanStrategy.class); + + public JavaSizeBasedClusteringPlanStrategy(HoodieJavaCopyOnWriteTable table, + HoodieJavaEngineContext engineContext, + HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + public JavaSizeBasedClusteringPlanStrategy(HoodieJavaMergeOnReadTable table, + HoodieJavaEngineContext engineContext, + HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + @Override + protected Stream buildClusteringGroupsForPartition(String partitionPath, List fileSlices) { + List, Integer>> fileSliceGroups = new ArrayList<>(); + List currentGroup = new ArrayList<>(); + long totalSizeSoFar = 0; + HoodieWriteConfig writeConfig = getWriteConfig(); + for (FileSlice currentSlice : fileSlices) { + // assume each filegroup size is ~= parquet.max.file.size + totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize(); + // check if max size is reached and create new group, if needed. 
+ if (totalSizeSoFar >= writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) { + int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes()); + LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: " + + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups); + fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups)); + currentGroup = new ArrayList<>(); + totalSizeSoFar = 0; + } + currentGroup.add(currentSlice); + // totalSizeSoFar could be 0 when new group was created in the previous conditional block. + // reset to the size of current slice, otherwise the number of output file group will become 0 even though current slice is present. + if (totalSizeSoFar == 0) { + totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize(); + } + } + if (!currentGroup.isEmpty()) { + int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes()); + LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: " + + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups); + fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups)); + } + + return fileSliceGroups.stream().map(fileSliceGroup -> HoodieClusteringGroup.newBuilder() + .setSlices(getFileSliceInfo(fileSliceGroup.getLeft())) + .setNumOutputFileGroups(fileSliceGroup.getRight()) + .setMetrics(buildMetrics(fileSliceGroup.getLeft())) + .build()); + } + + @Override + protected Map getStrategyParams() { + Map params = new HashMap<>(); + if (!StringUtils.isNullOrEmpty(getWriteConfig().getClusteringSortColumns())) { + params.put(PLAN_STRATEGY_SORT_COLUMNS.key(), getWriteConfig().getClusteringSortColumns()); + } + return params; + } + + @Override + protected List filterPartitionPaths(List partitionPaths) { + return partitionPaths; + } + + @Override + protected Stream getFileSlicesEligibleForClustering(final String partition) { + return super.getFileSlicesEligibleForClustering(partition) + // Only files that have basefile size smaller than small file size are eligible. + .filter(slice -> slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < getWriteConfig().getClusteringSmallFileLimit()); + } + + private int getNumberOfOutputFileGroups(long groupSize, long targetFileSize) { + return (int) Math.ceil(groupSize / (double) targetFileSize); + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java new file mode 100644 index 0000000000000..c830925419b57 --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
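For context on the sizing above (illustrative only, not part of the patch): each clustering group is capped at getClusteringMaxBytesInGroup(), and the number of output file groups is the ceiling of the group's byte size divided by getClusteringTargetFileMaxBytes(). A short sketch of that arithmetic with assumed sizes:

    public class OutputFileGroupMathSketch {
      // Same arithmetic as JavaSizeBasedClusteringPlanStrategy#getNumberOfOutputFileGroups.
      static int numberOfOutputFileGroups(long groupSizeBytes, long targetFileSizeBytes) {
        return (int) Math.ceil(groupSizeBytes / (double) targetFileSizeBytes);
      }

      public static void main(String[] args) {
        long targetFileMaxBytes = 1024L * 1024 * 1024; // assume a 1 GB clustering target file size
        // A ~2.2 GB clustering group is rewritten into ceil(2200 / 1024) = 3 output file groups.
        System.out.println(numberOfOutputFileGroups(2200L * 1024 * 1024, targetFileMaxBytes)); // 3
        // A group smaller than the target still yields one output file group.
        System.out.println(numberOfOutputFileGroups(300L * 1024 * 1024, targetFileMaxBytes)); // 1
      }
    }
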
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.client.clustering.run.strategy; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.avro.model.HoodieClusteringGroup; +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.JavaTaskContextSupplier; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.ClusteringOperation; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.RewriteAvroPayload; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieClusteringException; +import org.apache.hudi.execution.bulkinsert.JavaCustomColumnsSortPartitioner; +import org.apache.hudi.io.IOUtils; +import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.hudi.keygen.BaseKeyGenerator; +import org.apache.hudi.keygen.KeyGenUtils; +import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.table.log.HoodieFileSliceReader.getFileSliceReader; +import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS; + +/** + * Clustering strategy for Java engine. 
+ */ +public abstract class JavaExecutionStrategy> + extends ClusteringExecutionStrategy>, List, List> { + + private static final Logger LOG = LogManager.getLogger(JavaExecutionStrategy.class); + + public JavaExecutionStrategy( + HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + @Override + public HoodieWriteMetadata> performClustering( + HoodieClusteringPlan clusteringPlan, Schema schema, String instantTime) { + // execute clustering for each group and collect WriteStatus + List writeStatusList = new ArrayList<>(); + clusteringPlan.getInputGroups().forEach( + inputGroup -> writeStatusList.addAll(runClusteringForGroup( + inputGroup, clusteringPlan.getStrategy().getStrategyParams(), + Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false), + instantTime))); + HoodieWriteMetadata> writeMetadata = new HoodieWriteMetadata<>(); + writeMetadata.setWriteStatuses(writeStatusList); + return writeMetadata; + } + + /** + * Execute clustering to write inputRecords into new files as defined by rules in strategy parameters. + * The number of new file groups created is bounded by numOutputGroups. + * Note that commit is not done as part of strategy. commit is callers responsibility. + * + * @param inputRecords List of {@link HoodieRecord}. + * @param numOutputGroups Number of output file groups. + * @param instantTime Clustering (replace commit) instant time. + * @param strategyParams Strategy parameters containing columns to sort the data by when clustering. + * @param schema Schema of the data including metadata fields. + * @param fileGroupIdList File group id corresponding to each out group. + * @param preserveHoodieMetadata Whether to preserve commit metadata while clustering. + * @return List of {@link WriteStatus}. + */ + public abstract List performClusteringWithRecordList( + final List> inputRecords, final int numOutputGroups, final String instantTime, + final Map strategyParams, final Schema schema, + final List fileGroupIdList, final boolean preserveHoodieMetadata); + + /** + * Create {@link BulkInsertPartitioner} based on strategy params. + * + * @param strategyParams Strategy parameters containing columns to sort the data by when clustering. + * @param schema Schema of the data including metadata fields. + * @return empty for now. + */ + protected Option> getPartitioner(Map strategyParams, Schema schema) { + if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) { + return Option.of(new JavaCustomColumnsSortPartitioner( + strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","), + HoodieAvroUtils.addMetadataFields(schema))); + } else { + return Option.empty(); + } + } + + /** + * Executes clustering for the group. + */ + private List runClusteringForGroup( + HoodieClusteringGroup clusteringGroup, Map strategyParams, + boolean preserveHoodieMetadata, String instantTime) { + List> inputRecords = readRecordsForGroup(clusteringGroup, instantTime); + Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema())); + List inputFileIds = clusteringGroup.getSlices().stream() + .map(info -> new HoodieFileGroupId(info.getPartitionPath(), info.getFileId())) + .collect(Collectors.toList()); + return performClusteringWithRecordList(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema, inputFileIds, preserveHoodieMetadata); + } + + /** + * Get a list of all records for the group. 
This includes all records from file slice + * (Apply updates from log files, if any). + */ + private List> readRecordsForGroup(HoodieClusteringGroup clusteringGroup, String instantTime) { + List clusteringOps = clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList()); + boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0); + if (hasLogFiles) { + // if there are log files, we read all records into memory for a file group and apply updates. + return readRecordsForGroupWithLogs(clusteringOps, instantTime); + } else { + // We want to optimize reading records for case there are no log files. + return readRecordsForGroupBaseFiles(clusteringOps); + } + } + + /** + * Read records from baseFiles and apply updates. + */ + private List> readRecordsForGroupWithLogs(List clusteringOps, + String instantTime) { + HoodieWriteConfig config = getWriteConfig(); + HoodieTable table = getHoodieTable(); + List> records = new ArrayList<>(); + + clusteringOps.forEach(clusteringOp -> { + long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new JavaTaskContextSupplier(), config); + LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction); + try { + Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); + HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder() + .withFileSystem(table.getMetaClient().getFs()) + .withBasePath(table.getMetaClient().getBasePath()) + .withLogFilePaths(clusteringOp.getDeltaFilePaths()) + .withReaderSchema(readerSchema) + .withLatestInstantTime(instantTime) + .withMaxMemorySizeInBytes(maxMemoryPerCompaction) + .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled()) + .withReverseReader(config.getCompactionReverseLogReadEnabled()) + .withBufferSize(config.getMaxDFSStreamBufferSize()) + .withSpillableMapBasePath(config.getSpillableMapBasePath()) + .withPartition(clusteringOp.getPartitionPath()) + .build(); + + Option baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath()) + ? Option.empty() + : Option.of(HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()))); + HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig(); + Iterator> fileSliceReader = getFileSliceReader(baseFileReader, scanner, readerSchema, + tableConfig.getPayloadClass(), + tableConfig.getPreCombineField(), + tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(), + tableConfig.getPartitionFieldProp()))); + fileSliceReader.forEachRemaining(records::add); + } catch (IOException e) { + throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + + " and " + clusteringOp.getDeltaFilePaths(), e); + } + }); + return records; + } + + /** + * Read records from baseFiles. 
+ */ + private List> readRecordsForGroupBaseFiles(List clusteringOps) { + List> records = new ArrayList<>(); + clusteringOps.forEach(clusteringOp -> { + try { + Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema())); + HoodieFileReader baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath())); + Iterator recordIterator = baseFileReader.getRecordIterator(readerSchema); + recordIterator.forEachRemaining(record -> records.add(transform(record))); + } catch (IOException e) { + throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + + " and " + clusteringOp.getDeltaFilePaths(), e); + } + }); + return records; + } + + /** + * Transform IndexedRecord into HoodieRecord. + */ + private HoodieRecord transform(IndexedRecord indexedRecord) { + GenericRecord record = (GenericRecord) indexedRecord; + Option keyGeneratorOpt = Option.empty(); + String key = KeyGenUtils.getRecordKeyFromGenericRecord(record, keyGeneratorOpt); + String partition = KeyGenUtils.getPartitionPathFromGenericRecord(record, keyGeneratorOpt); + HoodieKey hoodieKey = new HoodieKey(key, partition); + + HoodieRecordPayload avroPayload = new RewriteAvroPayload(record); + HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, avroPayload); + return hoodieRecord; + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java new file mode 100644 index 0000000000000..a33af7ccd0214 --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.client.clustering.run.strategy; + +import org.apache.avro.Schema; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieStorageConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.CreateHandleFactory; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.commit.JavaBulkInsertHelper; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * Clustering Strategy based on following. 
+ * 1) Java execution engine. + * 2) Uses bulk_insert to write data into new files. + */ +public class JavaSortAndSizeExecutionStrategy> + extends JavaExecutionStrategy { + private static final Logger LOG = LogManager.getLogger(JavaSortAndSizeExecutionStrategy.class); + + public JavaSortAndSizeExecutionStrategy(HoodieTable table, + HoodieEngineContext engineContext, + HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + @Override + public List performClusteringWithRecordList( + final List> inputRecords, final int numOutputGroups, + final String instantTime, final Map strategyParams, final Schema schema, + final List fileGroupIdList, final boolean preserveHoodieMetadata) { + LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime); + Properties props = getWriteConfig().getProps(); + props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), String.valueOf(numOutputGroups)); + // We are calling another action executor - disable auto commit. Strategy is only expected to write data in new files. + props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString()); + props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes())); + HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build(); + return (List) JavaBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, + false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata)); + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java new file mode 100644 index 0000000000000..bb7cd5e23a5b9 --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.execution.bulkinsert; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.table.BulkInsertPartitioner; + +import org.apache.avro.Schema; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * A partitioner that does sorting based on specified column values for Java client. 
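As a simplified illustration of the ordering this partitioner applies (not part of the patch; plain Java maps and hypothetical "driver"/"ts" fields stand in for HoodieRecord and Avro column lookups): records are sorted by the string form of their selected column values, which is why the partitioner can report its output as sorted. The implementation follows below.

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class ColumnSortSketch {
      // Stand-in for JavaCustomColumnsSortPartitioner#repartitionRecords: order records by the
      // concatenated string form of the configured sort columns.
      static List<Map<String, Object>> sortByColumns(List<Map<String, Object>> records, String[] sortColumns) {
        Comparator<Map<String, Object>> byColumns = Comparator.comparing(
            record -> Arrays.stream(sortColumns)
                .map(col -> String.valueOf(record.get(col)))
                .collect(Collectors.joining(",")));
        return records.stream().sorted(byColumns).collect(Collectors.toList());
      }

      public static void main(String[] args) {
        List<Map<String, Object>> records = Arrays.asList(
            Map.of("driver", "d2", "ts", 10L),
            Map.of("driver", "d1", "ts", 30L));
        // Sorting on the "driver" column puts the d1 record before d2, regardless of input order.
        System.out.println(sortByColumns(records, new String[] {"driver"}));
      }
    }
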
+ * + * @param HoodieRecordPayload type + */ +public class JavaCustomColumnsSortPartitioner + implements BulkInsertPartitioner>> { + + private final String[] sortColumnNames; + private final Schema schema; + + public JavaCustomColumnsSortPartitioner(String[] columnNames, Schema schema) { + this.sortColumnNames = columnNames; + this.schema = schema; + } + + @Override + public List> repartitionRecords( + List> records, int outputSparkPartitions) { + return records.stream().sorted((o1, o2) -> { + Object values1 = HoodieAvroUtils.getRecordColumnValues(o1, sortColumnNames, schema); + Object values2 = HoodieAvroUtils.getRecordColumnValues(o2, sortColumnNames, schema); + return values1.toString().compareTo(values2.toString()); + }).collect(Collectors.toList()); + } + + @Override + public boolean arePartitionRecordsSorted() { + return true; + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java index 4107adb8fb488..cead7aa477332 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java @@ -29,6 +29,7 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieJavaEngineContext; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -39,17 +40,24 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieNotSupportedException; +import org.apache.hudi.exception.HoodieUpsertException; +import org.apache.hudi.io.HoodieCreateHandle; +import org.apache.hudi.io.HoodieMergeHandle; +import org.apache.hudi.io.HoodieSortedMergeHandle; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata; import org.apache.hudi.table.action.clean.CleanActionExecutor; import org.apache.hudi.table.action.clean.CleanPlanActionExecutor; -import org.apache.hudi.table.action.commit.JavaDeleteCommitActionExecutor; +import org.apache.hudi.table.action.cluster.JavaClusteringPlanActionExecutor; +import org.apache.hudi.table.action.cluster.JavaExecuteClusteringCommitActionExecutor; import org.apache.hudi.table.action.commit.JavaBulkInsertCommitActionExecutor; import org.apache.hudi.table.action.commit.JavaBulkInsertPreppedCommitActionExecutor; +import org.apache.hudi.table.action.commit.JavaDeleteCommitActionExecutor; import org.apache.hudi.table.action.commit.JavaInsertCommitActionExecutor; import org.apache.hudi.table.action.commit.JavaInsertOverwriteCommitActionExecutor; import org.apache.hudi.table.action.commit.JavaInsertOverwriteTableCommitActionExecutor; import org.apache.hudi.table.action.commit.JavaInsertPreppedCommitActionExecutor; +import org.apache.hudi.table.action.commit.JavaMergeHelper; import org.apache.hudi.table.action.commit.JavaUpsertCommitActionExecutor; import org.apache.hudi.table.action.commit.JavaUpsertPreppedCommitActionExecutor; import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor; @@ -57,10 +65,20 @@ import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor; import 
org.apache.hudi.table.action.savepoint.SavepointActionExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; -public class HoodieJavaCopyOnWriteTable extends HoodieJavaTable { +public class HoodieJavaCopyOnWriteTable + extends HoodieJavaTable implements HoodieCompactionHandler { + + private static final Logger LOG = LoggerFactory.getLogger(HoodieJavaCopyOnWriteTable.class); + protected HoodieJavaCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) { @@ -160,23 +178,23 @@ public void updateStatistics(HoodieEngineContext context, List public Option scheduleCompaction(HoodieEngineContext context, String instantTime, Option> extraMetadata) { - throw new HoodieNotSupportedException("ScheduleCompaction is not supported yet"); + throw new HoodieNotSupportedException("ScheduleCompaction is not supported on a CopyOnWrite table"); } @Override public HoodieWriteMetadata> compact(HoodieEngineContext context, String compactionInstantTime) { - throw new HoodieNotSupportedException("Compact is not supported yet"); + throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table"); } @Override public Option scheduleClustering(final HoodieEngineContext context, final String instantTime, final Option> extraMetadata) { - throw new HoodieNotSupportedException("Clustering is not supported yet"); + return new JavaClusteringPlanActionExecutor<>(context, config, this, instantTime, extraMetadata).execute(); } @Override public HoodieWriteMetadata> cluster(final HoodieEngineContext context, final String clusteringInstantTime) { - throw new HoodieNotSupportedException("Clustering is not supported yet"); + return new JavaExecuteClusteringCommitActionExecutor<>(context, config, this, clusteringInstantTime).execute(); } @Override @@ -235,4 +253,53 @@ public HoodieRestoreMetadata restore(HoodieEngineContext context, return new CopyOnWriteRestoreActionExecutor( context, config, this, restoreInstantTime, instantToRestore).execute(); } + + @Override + public Iterator> handleUpdate( + String instantTime, String partitionPath, String fileId, + Map> keyToNewRecords, HoodieBaseFile oldDataFile) + throws IOException { + // these are updates + HoodieMergeHandle upsertHandle = getUpdateHandle(instantTime, partitionPath, fileId, keyToNewRecords, oldDataFile); + return handleUpdateInternal(upsertHandle, instantTime, fileId); + } + + protected Iterator> handleUpdateInternal(HoodieMergeHandle upsertHandle, String instantTime, + String fileId) throws IOException { + if (upsertHandle.getOldFilePath() == null) { + throw new HoodieUpsertException( + "Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId); + } else { + JavaMergeHelper.newInstance().runMerge(this, upsertHandle); + } + + // TODO(yihua): This needs to be revisited + if (upsertHandle.getPartitionPath() == null) { + LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", " + + upsertHandle.writeStatuses()); + } + + return Collections.singletonList(upsertHandle.writeStatuses()).iterator(); + } + + protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId, + Map> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) { + if (requireSortedRecords()) { + return new HoodieSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, 
fileId, + dataFileToBeMerged, taskContextSupplier, Option.empty()); + } else { + return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId, + dataFileToBeMerged, taskContextSupplier, Option.empty()); + } + } + + @Override + public Iterator> handleInsert( + String instantTime, String partitionPath, String fileId, + Map> recordMap) { + HoodieCreateHandle createHandle = + new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordMap, taskContextSupplier); + createHandle.write(); + return Collections.singletonList(createHandle.close()).iterator(); + } } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java index b219ba1a99016..136c25b8cdb2a 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java @@ -18,6 +18,7 @@ package org.apache.hudi.table; +import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieJavaEngineContext; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -29,9 +30,13 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.commit.JavaBulkInsertPreppedCommitActionExecutor; +import org.apache.hudi.table.action.compact.HoodieJavaMergeOnReadTableCompactor; +import org.apache.hudi.table.action.compact.RunCompactionActionExecutor; +import org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor; import org.apache.hudi.table.action.deltacommit.JavaUpsertPreppedDeltaCommitActionExecutor; import java.util.List; +import java.util.Map; public class HoodieJavaMergeOnReadTable extends HoodieJavaCopyOnWriteTable { protected HoodieJavaMergeOnReadTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) { @@ -60,4 +65,21 @@ public HoodieWriteMetadata> bulkInsertPrepped(HoodieEngineCont return new JavaBulkInsertPreppedCommitActionExecutor((HoodieJavaEngineContext) context, config, this, instantTime, preppedRecords, bulkInsertPartitioner).execute(); } + + @Override + public Option scheduleCompaction(HoodieEngineContext context, String instantTime, Option> extraMetadata) { + ScheduleCompactionActionExecutor scheduleCompactionExecutor = new ScheduleCompactionActionExecutor( + context, config, this, instantTime, extraMetadata, + new HoodieJavaMergeOnReadTableCompactor()); + return scheduleCompactionExecutor.execute(); + } + + @Override + public HoodieWriteMetadata> compact( + HoodieEngineContext context, String compactionInstantTime) { + RunCompactionActionExecutor compactionExecutor = new RunCompactionActionExecutor( + context, config, this, compactionInstantTime, new HoodieJavaMergeOnReadTableCompactor(), + new HoodieJavaCopyOnWriteTable(config, context, getMetaClient())); + return convertMetadata(compactionExecutor.execute()); + } } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java index 8b0a7a95ef87c..f9c7caff6ebbc 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java +++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java @@ -20,6 +20,7 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieJavaEngineContext; +import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; @@ -31,9 +32,12 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.JavaHoodieIndexFactory; +import org.apache.hudi.table.action.HoodieWriteMetadata; import java.util.List; +import static org.apache.hudi.common.data.HoodieList.getList; + public abstract class HoodieJavaTable extends HoodieTable>, List, List> { protected HoodieJavaTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) { @@ -61,6 +65,11 @@ public static HoodieJavaTable create(HoodieWr } } + public static HoodieWriteMetadata> convertMetadata( + HoodieWriteMetadata> metadata) { + return metadata.clone(getList(metadata.getWriteStatuses())); + } + @Override protected HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext context) { return JavaHoodieIndexFactory.createIndex(config); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/cluster/JavaClusteringPlanActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/cluster/JavaClusteringPlanActionExecutor.java new file mode 100644 index 0000000000000..1d78ecc2bf41c --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/cluster/JavaClusteringPlanActionExecutor.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.table.action.cluster; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; + +import java.util.List; +import java.util.Map; + +public class JavaClusteringPlanActionExecutor extends + BaseClusteringPlanActionExecutor>, List, List> { + + public JavaClusteringPlanActionExecutor( + HoodieEngineContext context, HoodieWriteConfig config, + HoodieTable>, List, List> table, + String instantTime, Option> extraMetadata) { + super(context, config, table, instantTime, extraMetadata); + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/cluster/JavaExecuteClusteringCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/cluster/JavaExecuteClusteringCommitActionExecutor.java new file mode 100644 index 0000000000000..83364bdc3ad35 --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/cluster/JavaExecuteClusteringCommitActionExecutor.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.table.action.cluster; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.avro.model.HoodieClusteringGroup; +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ClusteringUtils; +import org.apache.hudi.common.util.CommitUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieClusteringException; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy; +import org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor; + +import org.apache.avro.Schema; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public class JavaExecuteClusteringCommitActionExecutor> + extends BaseJavaCommitActionExecutor { + + private final HoodieClusteringPlan clusteringPlan; + + public JavaExecuteClusteringCommitActionExecutor( + HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table, + String instantTime) { + super(context, config, table, instantTime, WriteOperationType.CLUSTER); + this.clusteringPlan = ClusteringUtils.getClusteringPlan( + table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(instantTime)) + .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException( + "Unable to read clustering plan for instant: " + instantTime)); + } + + @Override + public HoodieWriteMetadata> execute() { + HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(instantTime); + // Mark instant as clustering inflight + table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty()); + table.getMetaClient().reloadActiveTimeline(); + + final Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); + HoodieWriteMetadata> writeMetadata = ( + (ClusteringExecutionStrategy>, List, List>) + ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(), + new Class[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config)) + .performClustering(clusteringPlan, schema, instantTime); + List writeStatusList = writeMetadata.getWriteStatuses(); + List statuses = updateIndex(writeStatusList, writeMetadata); + writeMetadata.setWriteStats(statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList())); + writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeMetadata)); + validateWriteResult(writeMetadata); + commitOnAutoCommit(writeMetadata); + if (!writeMetadata.getCommitMetadata().isPresent()) { + HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), 
+          extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
+      writeMetadata.setCommitMetadata(Option.of(commitMetadata));
+    }
+    return writeMetadata;
+  }
+
+  /**
+   * Validate actions taken by clustering. In the first implementation, we validate at least one new file is written.
+   * But we can extend this to add more validation. E.g. number of records read = number of records written etc.
+   * We can also make these validations in BaseCommitActionExecutor to reuse pre-commit hooks for multiple actions.
+   */
+  private void validateWriteResult(HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
+    if (writeMetadata.getWriteStatuses().isEmpty()) {
+      throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime
+          + " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
+          + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
+          + " write statuses");
+    }
+  }
+
+  @Override
+  protected String getCommitActionType() {
+    return HoodieTimeline.REPLACE_COMMIT_ACTION;
+  }
+
+  @Override
+  protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
+    Set<HoodieFileGroupId> newFilesWritten = writeMetadata.getWriteStats().get().stream()
+        .map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId())).collect(Collectors.toSet());
+    return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
+        .filter(fg -> !newFilesWritten.contains(fg))
+        .collect(Collectors.groupingBy(fg -> fg.getPartitionPath(), Collectors.mapping(fg -> fg.getFileId(), Collectors.toList())));
+  }
+}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
index 66cb40758bdc0..2a93c5012ce1e 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
@@ -126,13 +126,14 @@ public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
     return result;
   }
 
-  protected void updateIndex(List<WriteStatus> writeStatuses, HoodieWriteMetadata<List<WriteStatus>> result) {
+  protected List<WriteStatus> updateIndex(List<WriteStatus> writeStatuses, HoodieWriteMetadata<List<WriteStatus>> result) {
     Instant indexStartTime = Instant.now();
     // Update the index back
     List<WriteStatus> statuses = HoodieList.getList(
         table.getIndex().updateLocation(HoodieList.of(writeStatuses), context, table));
     result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
     result.setWriteStatuses(statuses);
+    return statuses;
   }
 
   @Override
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java
new file mode 100644
index 0000000000000..30bdcda759ce0
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+
+/**
+ * Compacts a Hudi table with merge-on-read storage using the Java engine. Computes all possible
+ * compactions, passes them through a CompactionFilter, executes the selected compactions, and
+ * writes a new version of the base files in a normal commit.
+ */
+public class HoodieJavaMergeOnReadTableCompactor<T extends HoodieRecordPayload>
+    extends HoodieCompactor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+
+  @Override
+  public void preCompact(
+      HoodieTable table, HoodieTimeline pendingCompactionTimeline, String compactionInstantTime) {
+    HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
+    if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
+      table.rollbackInflightCompaction(inflightInstant);
+      table.getMetaClient().reloadActiveTimeline();
+    }
+  }
+
+  @Override
+  public void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieWriteConfig config) {
+    // No OP
+  }
+}
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java
new file mode 100644
index 0000000000000..5d6f21164589d
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.testutils.HoodieJavaClientTestBase;
+
+import org.apache.avro.Schema;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestJavaBulkInsertInternalPartitioner extends HoodieJavaClientTestBase {
+  private static final Comparator<HoodieRecord> KEY_COMPARATOR =
+      Comparator.comparing(o -> (o.getPartitionPath() + "+" + o.getRecordKey()));
+
+  public static List<HoodieRecord> generateTestRecordsForBulkInsert(int numRecords) {
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    List<HoodieRecord> records = dataGenerator.generateInserts("0", numRecords);
+    return records;
+  }
+
+  public static Map<String, Long> generatePartitionNumRecords(List<HoodieRecord> records) {
+    return records.stream().map(record -> record.getPartitionPath())
+        .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = {"rider", "rider,driver"})
+  public void testCustomColumnSortPartitioner(String sortColumnString) throws Exception {
+    String[] sortColumns = sortColumnString.split(",");
+    Comparator<HoodieRecord> columnComparator =
+        getCustomColumnComparator(HoodieTestDataGenerator.AVRO_SCHEMA, sortColumns);
+
+    List<HoodieRecord> records = generateTestRecordsForBulkInsert(1000);
+    testBulkInsertInternalPartitioner(
+        new JavaCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA),
+        records, true, generatePartitionNumRecords(records), Option.of(columnComparator));
+  }
+
+  private Comparator<HoodieRecord> getCustomColumnComparator(Schema schema, String[] sortColumns) {
+    return Comparator.comparing(
+        record -> HoodieAvroUtils.getRecordColumnValues(record, sortColumns, schema).toString());
+  }
+
+  private void verifyRecordAscendingOrder(List<HoodieRecord> records,
+                                          Option<Comparator<HoodieRecord>> comparator) {
+    List<HoodieRecord> expectedRecords = new ArrayList<>(records);
+    Collections.sort(expectedRecords, comparator.orElse(KEY_COMPARATOR));
+    assertEquals(expectedRecords, records);
+  }
+
+  private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner,
+                                                 List<HoodieRecord> records,
+                                                 boolean isSorted,
+                                                 Map<String, Long> expectedPartitionNumRecords,
+                                                 Option<Comparator<HoodieRecord>> comparator) {
+    List<HoodieRecord> actualRecords =
+        (List<HoodieRecord>) partitioner.repartitionRecords(records, 1);
+    if (isSorted) {
+      // Verify global order
+      verifyRecordAscendingOrder(actualRecords, comparator);
+    }
+
+    // Verify number of records per partition path
+    assertEquals(expectedPartitionNumRecords, generatePartitionNumRecords(actualRecords));
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java
index 5c132773c9ced..ad19824e1e290 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java @@ -23,6 +23,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieSparkCopyOnWriteTable; import org.apache.hudi.table.HoodieSparkMergeOnReadTable; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -49,6 +50,7 @@ public SparkRecentDaysClusteringPlanStrategy(HoodieSparkMergeOnReadTable tabl super(table, engineContext, writeConfig); } + @Override protected List filterPartitionPaths(List partitionPaths) { int targetPartitionsForClustering = getWriteConfig().getTargetPartitionsForClustering(); int skipPartitionsFromLatestForClustering = getWriteConfig().getSkipPartitionsFromLatestForClustering(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java index fb3c5ec0dc3ca..a36da5fd445cc 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java @@ -18,18 +18,15 @@ package org.apache.hudi.execution.bulkinsert; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.config.SerializableSchema; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.BulkInsertPartitioner; -import org.apache.spark.api.java.JavaRDD; -import java.io.IOException; +import org.apache.avro.Schema; +import org.apache.spark.api.java.JavaRDD; /** * A partitioner that does sorting based on specified column values for each RDD partition. 
@@ -57,7 +54,8 @@ public JavaRDD> repartitionRecords(JavaRDD> reco int outputSparkPartitions) { final String[] sortColumns = this.sortColumnNames; final SerializableSchema schema = this.serializableSchema; - return records.sortBy(record -> getRecordSortColumnValues(record, sortColumns, schema), + return records.sortBy( + record -> HoodieAvroUtils.getRecordColumnValues(record, sortColumns, schema), true, outputSparkPartitions); } @@ -66,26 +64,6 @@ public boolean arePartitionRecordsSorted() { return true; } - private static Object getRecordSortColumnValues(HoodieRecord record, - String[] sortColumns, - SerializableSchema schema) { - try { - GenericRecord genericRecord = (GenericRecord) record.getData().getInsertValue(schema.get()).get(); - if (sortColumns.length == 1) { - return HoodieAvroUtils.getNestedFieldVal(genericRecord, sortColumns[0], true); - } else { - StringBuilder sb = new StringBuilder(); - for (String col : sortColumns) { - sb.append(HoodieAvroUtils.getNestedFieldValAsString(genericRecord, col, true)); - } - - return sb.toString(); - } - } catch (IOException e) { - throw new HoodieIOException("Unable to read record with key:" + record.getKey(), e); - } - } - private String[] getSortColumnName(HoodieWriteConfig config) { return config.getUserDefinedBulkInsertPartitionerSortColumns().split(","); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java index 683d852131f6d..81a0a74aee1d3 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java @@ -18,20 +18,14 @@ package org.apache.hudi.table.action.cluster; -import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.action.cluster.strategy.ClusteringPlanStrategy; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import java.util.Map; @@ -40,8 +34,6 @@ public class SparkClusteringPlanActionExecutor extends BaseClusteringPlanActionExecutor>, JavaRDD, JavaRDD> { - private static final Logger LOG = LogManager.getLogger(SparkClusteringPlanActionExecutor.class); - public SparkClusteringPlanActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable>, JavaRDD, JavaRDD> table, @@ -49,33 +41,4 @@ public SparkClusteringPlanActionExecutor(HoodieEngineContext context, Option> extraMetadata) { super(context, config, table, instantTime, extraMetadata); } - - @Override - protected Option createClusteringPlan() { - LOG.info("Checking if clustering needs to be run on " + config.getBasePath()); - Option lastClusteringInstant = table.getActiveTimeline().getCompletedReplaceTimeline().lastInstant(); - - int commitsSinceLastClustering = 
table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants() - .findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"), Integer.MAX_VALUE) - .countInstants(); - if (config.inlineClusteringEnabled() && config.getInlineClusterMaxCommits() > commitsSinceLastClustering) { - LOG.info("Not scheduling inline clustering as only " + commitsSinceLastClustering - + " commits was found since last clustering " + lastClusteringInstant + ". Waiting for " - + config.getInlineClusterMaxCommits()); - return Option.empty(); - } - - if (config.isAsyncClusteringEnabled() && config.getAsyncClusterMaxCommits() > commitsSinceLastClustering) { - LOG.info("Not scheduling async clustering as only " + commitsSinceLastClustering - + " commits was found since last clustering " + lastClusteringInstant + ". Waiting for " - + config.getAsyncClusterMaxCommits()); - return Option.empty(); - } - - LOG.info("Generating clustering plan for table " + config.getBasePath()); - ClusteringPlanStrategy strategy = (ClusteringPlanStrategy) - ReflectionUtils.loadClass(config.getClusteringPlanStrategyClass(), table, context, config); - return strategy.generateClusteringPlan(); - } - } diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index a602b452c3247..ff8aefe97e85a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -18,8 +18,10 @@ package org.apache.hudi.avro; +import org.apache.hudi.common.config.SerializableSchema; import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; @@ -229,10 +231,10 @@ public static Schema removeMetadataFields(Schema schema) { public static Schema removeFields(Schema schema, List fieldsToRemove) { List filteredFields = schema.getFields() - .stream() - .filter(field -> !fieldsToRemove.contains(field.name())) - .map(field -> new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal())) - .collect(Collectors.toList()); + .stream() + .filter(field -> !fieldsToRemove.contains(field.name())) + .map(field -> new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal())) + .collect(Collectors.toList()); Schema filteredSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false); filteredSchema.setFields(filteredFields); return filteredSchema; @@ -289,7 +291,7 @@ public static Schema getSchemaForFields(Schema fileSchema, List fields) } public static GenericRecord addHoodieKeyToRecord(GenericRecord record, String recordKey, String partitionPath, - String fileName) { + String fileName) { record.put(HoodieRecord.FILENAME_METADATA_FIELD, fileName); record.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, partitionPath); record.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recordKey); @@ -551,7 +553,7 @@ private static Object convertValueForAvroLogicalTypes(Schema fieldSchema, Object } else if (fieldSchema.getType() == Schema.Type.BYTES) { ByteBuffer byteBuffer = (ByteBuffer) fieldValue; BigDecimal convertedValue = decimalConversion.fromBytes(byteBuffer, fieldSchema, - LogicalTypes.decimal(dc.getPrecision(), dc.getScale())); + LogicalTypes.decimal(dc.getPrecision(), 
dc.getScale())); byteBuffer.rewind(); return convertedValue; } @@ -570,9 +572,51 @@ public static Schema getNullSchema() { * @return sanitized name */ public static String sanitizeName(String name) { - if (name.substring(0,1).matches(INVALID_AVRO_FIRST_CHAR_IN_NAMES)) { + if (name.substring(0, 1).matches(INVALID_AVRO_FIRST_CHAR_IN_NAMES)) { name = name.replaceFirst(INVALID_AVRO_FIRST_CHAR_IN_NAMES, MASK_FOR_INVALID_CHARS_IN_NAMES); } return name.replaceAll(INVALID_AVRO_CHARS_IN_NAMES, MASK_FOR_INVALID_CHARS_IN_NAMES); } + + /** + * Gets record column values into one object. + * + * @param record Hoodie record. + * @param columns Names of the columns to get values. + * @param schema {@link Schema} instance. + * @return Column value if a single column, or concatenated String values by comma. + */ + public static Object getRecordColumnValues(HoodieRecord record, + String[] columns, + Schema schema) { + try { + GenericRecord genericRecord = (GenericRecord) record.getData().getInsertValue(schema).get(); + if (columns.length == 1) { + return HoodieAvroUtils.getNestedFieldVal(genericRecord, columns[0], true); + } else { + StringBuilder sb = new StringBuilder(); + for (String col : columns) { + sb.append(HoodieAvroUtils.getNestedFieldValAsString(genericRecord, col, true)); + } + + return sb.toString(); + } + } catch (IOException e) { + throw new HoodieIOException("Unable to read record with key:" + record.getKey(), e); + } + } + + /** + * Gets record column values into one object. + * + * @param record Hoodie record. + * @param columns Names of the columns to get values. + * @param schema {@link SerializableSchema} instance. + * @return Column value if a single column, or concatenated String values by comma. + */ + public static Object getRecordColumnValues(HoodieRecord record, + String[] columns, + SerializableSchema schema) { + return getRecordColumnValues(record, columns, schema.get()); + } } diff --git a/hudi-kafka-connect/README.md b/hudi-kafka-connect/README.md index f5087ce0f9b6d..1754eb8252729 100644 --- a/hudi-kafka-connect/README.md +++ b/hudi-kafka-connect/README.md @@ -106,7 +106,7 @@ to generate, with each batch containing a number of messages and idle time betwe bash setupKafka.sh -n -b ``` -### 4 - Run the Sink connector worker (multiple workers can be run) +### 5 - Run the Sink connector worker (multiple workers can be run) The Kafka connect is a distributed platform, with the ability to run one or more workers (each running multiple tasks) that parallely process the records from the Kafka partitions for the same topic. We provide a properties file with @@ -120,7 +120,7 @@ cd $KAFKA_HOME ./bin/connect-distributed.sh $HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties ``` -### 5- To add the Hudi Sink to the Connector (delete it if you want to re-configure) +### 6 - To add the Hudi Sink to the Connector (delete it if you want to re-configure) Once the Connector has started, it will not run the Sink, until the Hudi sink is added using the web api. The following curl APIs can be used to delete and add a new Hudi Sink. Again, a default configuration is provided for the Hudi Sink, @@ -170,4 +170,132 @@ total 5168 -rw-r--r-- 1 user wheel 440214 Sep 13 21:43 E200FA75DCD1CED60BE86BCE6BF5D23A-0_0-0-0_20210913214114.parquet ``` +### 7 - Run async compaction and clustering if scheduled +When using Merge-On-Read (MOR) as the table type, async compaction and clustering can be scheduled when the Sink is +running. 
+running. Inline compaction and clustering are disabled by default for performance reasons. By default, async
+compaction scheduling is enabled, and you can disable it by setting `hoodie.kafka.compaction.async.enable` to `false`.
+Async clustering scheduling is disabled by default, and you can enable it by setting `hoodie.clustering.async.enabled`
+to `true`.
+
+For performance, the Sink only schedules compaction and clustering when necessary; it does not execute them. You need
+to execute the scheduled compaction and clustering using separate Spark jobs or the Hudi CLI.
+
+After the compaction is scheduled, you can see the requested compaction instant (`20211111111410.compaction.requested`)
+below:
+
+```
+ls -l /tmp/hoodie/hudi-test-topic/.hoodie
+total 280
+-rw-r--r-- 1 user wheel 21172 Nov 11 11:09 20211111110807.deltacommit
+-rw-r--r-- 1 user wheel 0 Nov 11 11:08 20211111110807.deltacommit.inflight
+-rw-r--r-- 1 user wheel 0 Nov 11 11:08 20211111110807.deltacommit.requested
+-rw-r--r-- 1 user wheel 22458 Nov 11 11:11 20211111110940.deltacommit
+-rw-r--r-- 1 user wheel 0 Nov 11 11:09 20211111110940.deltacommit.inflight
+-rw-r--r-- 1 user wheel 0 Nov 11 11:09 20211111110940.deltacommit.requested
+-rw-r--r-- 1 user wheel 21445 Nov 11 11:13 20211111111110.deltacommit
+-rw-r--r-- 1 user wheel 0 Nov 11 11:11 20211111111110.deltacommit.inflight
+-rw-r--r-- 1 user wheel 0 Nov 11 11:11 20211111111110.deltacommit.requested
+-rw-r--r-- 1 user wheel 24943 Nov 11 11:14 20211111111303.deltacommit
+-rw-r--r-- 1 user wheel 0 Nov 11 11:13 20211111111303.deltacommit.inflight
+-rw-r--r-- 1 user wheel 0 Nov 11 11:13 20211111111303.deltacommit.requested
+-rw-r--r-- 1 user wheel 9885 Nov 11 11:14 20211111111410.compaction.requested
+-rw-r--r-- 1 user wheel 21192 Nov 11 11:15 20211111111411.deltacommit
+-rw-r--r-- 1 user wheel 0 Nov 11 11:14 20211111111411.deltacommit.inflight
+-rw-r--r-- 1 user wheel 0 Nov 11 11:14 20211111111411.deltacommit.requested
+-rw-r--r-- 1 user wheel 0 Nov 11 11:15 20211111111530.deltacommit.inflight
+-rw-r--r-- 1 user wheel 0 Nov 11 11:15 20211111111530.deltacommit.requested
+drwxr-xr-x 2 user wheel 64 Nov 11 11:08 archived
+-rw-r--r-- 1 user wheel 387 Nov 11 11:08 hoodie.properties
+```
+
+Then you can run the async compaction job with `HoodieCompactor` and `spark-submit`:
+
+```
+spark-submit \
+  --class org.apache.hudi.utilities.HoodieCompactor \
+  hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.10.0-SNAPSHOT.jar \
+  --base-path /tmp/hoodie/hudi-test-topic \
+  --table-name hudi-test-topic \
+  --schema-file /Users/user/repo/hudi/docker/demo/config/schema.avsc \
+  --instant-time 20211111111410 \
+  --parallelism 2 \
+  --spark-memory 1g
+```
+
+Note that you don't have to provide the instant time through `--instant-time`. In that case, the earliest scheduled
+compaction will be executed.
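+
+If you prefer to inspect the timeline programmatically rather than listing `.hoodie`, the following is a minimal,
+illustrative sketch (not part of this patch) that uses `HoodieTableMetaClient` to print pending compaction instants;
+it assumes the demo base path used above and that `hudi-common` and the Hadoop client are on the classpath:
+
+```java
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+
+public class ListPendingCompactions {
+  public static void main(String[] args) {
+    // Load the table's timeline from the demo base path (adjust as needed).
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(new Configuration())
+        .setBasePath("/tmp/hoodie/hudi-test-topic")
+        .build();
+    // Print every compaction instant that has been scheduled but not yet completed.
+    metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants()
+        .forEach(instant -> System.out.println(instant.getTimestamp() + " " + instant.getState()));
+  }
+}
+```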
+ +Alternatively, you can use Hudi CLI to execute compaction: + +``` +hudi-> connect --path /tmp/hoodie/hudi-test-topic +hudi:hudi-test-topic-> compactions show all +╔═════════════════════════╤═══════════╤═══════════════════════════════╗ +║ Compaction Instant Time │ State │ Total FileIds to be Compacted ║ +╠═════════════════════════╪═══════════╪═══════════════════════════════╣ +║ 20211111111410 │ REQUESTED │ 12 ║ +╚═════════════════════════╧═══════════╧═══════════════════════════════╝ + +compaction validate --instant 20211111111410 +compaction run --compactionInstant 20211111111410 --parallelism 2 --schemaFilePath /Users/user/repo/hudi/docker/demo/config/schema.avsc +``` + +Similarly, you can see the requested clustering instant (`20211111111813.replacecommit.requested`) after it is scheduled +by the Sink: + +``` +ls -l /tmp/hoodie/hudi-test-topic/.hoodie +total 736 +-rw-r--r-- 1 user wheel 24943 Nov 11 11:14 20211111111303.deltacommit +-rw-r--r-- 1 user wheel 0 Nov 11 11:13 20211111111303.deltacommit.inflight +-rw-r--r-- 1 user wheel 0 Nov 11 11:13 20211111111303.deltacommit.requested +-rw-r--r-- 1 user wheel 18681 Nov 11 11:17 20211111111410.commit +-rw-r--r-- 1 user wheel 0 Nov 11 11:17 20211111111410.compaction.inflight +-rw-r--r-- 1 user wheel 9885 Nov 11 11:14 20211111111410.compaction.requested +-rw-r--r-- 1 user wheel 21192 Nov 11 11:15 20211111111411.deltacommit +-rw-r--r-- 1 user wheel 0 Nov 11 11:14 20211111111411.deltacommit.inflight +-rw-r--r-- 1 user wheel 0 Nov 11 11:14 20211111111411.deltacommit.requested +-rw-r--r-- 1 user wheel 22460 Nov 11 11:17 20211111111530.deltacommit +-rw-r--r-- 1 user wheel 0 Nov 11 11:15 20211111111530.deltacommit.inflight +-rw-r--r-- 1 user wheel 0 Nov 11 11:15 20211111111530.deltacommit.requested +-rw-r--r-- 1 user wheel 21357 Nov 11 11:18 20211111111711.deltacommit +-rw-r--r-- 1 user wheel 0 Nov 11 11:17 20211111111711.deltacommit.inflight +-rw-r--r-- 1 user wheel 0 Nov 11 11:17 20211111111711.deltacommit.requested +-rw-r--r-- 1 user wheel 6516 Nov 11 11:18 20211111111813.replacecommit.requested +-rw-r--r-- 1 user wheel 26070 Nov 11 11:20 20211111111815.deltacommit +-rw-r--r-- 1 user wheel 0 Nov 11 11:18 20211111111815.deltacommit.inflight +-rw-r--r-- 1 user wheel 0 Nov 11 11:18 20211111111815.deltacommit.requested +``` + +Then you can run async clustering job with `HoodieClusteringJob` and `spark-submit` by: + +``` +spark-submit \ + --class org.apache.hudi.utilities.HoodieClusteringJob \ + hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.10.0-SNAPSHOT.jar \ + --props clusteringjob.properties \ + --mode execute \ + --base-path /tmp/hoodie/hudi-test-topic \ + --table-name sample_table \ + --instant-time 20211111111813 \ + --spark-memory 1g +``` + +Sample `clusteringjob.properties`: + +``` +hoodie.datasource.write.recordkey.field=volume +hoodie.datasource.write.partitionpath.field=date +hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/hudi-test-topic/versions/latest + +hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824 +hoodie.clustering.plan.strategy.small.file.limit=629145600 +hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy +hoodie.clustering.plan.strategy.sort.columns=volume + +hoodie.write.concurrency.mode=single_writer +``` + +Note that you don't have to provide the instant time through `--instant-time`. In that case, the earliest scheduled +clustering is going to be executed. 
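+
+Similarly, a minimal, illustrative sketch (not part of this patch) for inspecting pending clustering plans
+programmatically with `ClusteringUtils`, again assuming the demo base path:
+
+```java
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ClusteringUtils;
+
+public class ListPendingClustering {
+  public static void main(String[] args) {
+    // Load the table's timeline from the demo base path (adjust as needed).
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(new Configuration())
+        .setBasePath("/tmp/hoodie/hudi-test-topic")
+        .build();
+    // Each entry pairs a pending replacecommit instant with its clustering plan.
+    ClusteringUtils.getAllPendingClusteringPlans(metaClient)
+        .forEach(pair -> System.out.println(pair.getLeft().getTimestamp()
+            + " -> " + pair.getRight().getInputGroups().size() + " input file group(s)"));
+  }
+}
+```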
diff --git a/hudi-kafka-connect/demo/config-sink.json b/hudi-kafka-connect/demo/config-sink.json index 2d2be00f89358..12a2f5053f308 100644 --- a/hudi-kafka-connect/demo/config-sink.json +++ b/hudi-kafka-connect/demo/config-sink.json @@ -10,6 +10,7 @@ "topics": "hudi-test-topic", "hoodie.table.name": "hudi-test-topic", "hoodie.table.type": "MERGE_ON_READ", + "hoodie.metadata.enable": "false", "hoodie.base.path": "file:///tmp/hoodie/hudi-test-topic", "hoodie.datasource.write.recordkey.field": "volume", "hoodie.datasource.write.partitionpath.field": "date", diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java index 773ce1e048a7d..714d4d2a85f5c 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java @@ -73,6 +73,11 @@ public class KafkaConnectConfigs extends HoodieConfig { + "the coordinator will wait for the write statuses from all the partitions" + "to ignore the current commit and start a new commit."); + public static final ConfigProperty ASYNC_COMPACT_ENABLE = ConfigProperty + .key("hoodie.kafka.compaction.async.enable") + .defaultValue("true") + .withDocumentation("Controls whether async compaction should be turned on for MOR table writing."); + public static final ConfigProperty META_SYNC_ENABLE = ConfigProperty .key("hoodie.meta.sync.enable") .defaultValue("false") @@ -121,6 +126,10 @@ public String getKafkaValueConverter() { return getString(KAFKA_VALUE_CONVERTER); } + public Boolean isAsyncCompactEnabled() { + return getBoolean(ASYNC_COMPACT_ENABLE); + } + public Boolean isMetaSyncEnabled() { return getBoolean(META_SYNC_ENABLE); } diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java index 8039e56d37ba5..7ec3034bfedcb 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java @@ -22,6 +22,7 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieJavaEngineContext; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -54,6 +55,8 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic private final Option tableMetaClient; private final Configuration hadoopConf; + private final HoodieWriteConfig writeConfig; + private final KafkaConnectConfigs connectConfigs; private final String tableBasePath; private final String tableName; private final HoodieEngineContext context; @@ -61,8 +64,11 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic private final HoodieJavaWriteClient javaClient; public KafkaConnectTransactionServices(KafkaConnectConfigs connectConfigs) throws HoodieException { - HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() - .withProperties(connectConfigs.getProps()).build(); + this.connectConfigs = connectConfigs; + this.writeConfig = 
HoodieWriteConfig.newBuilder() + .withEngineType(EngineType.JAVA) + .withProperties(connectConfigs.getProps()) + .build(); tableBasePath = writeConfig.getBasePath(); tableName = writeConfig.getTableName(); @@ -95,6 +101,7 @@ public KafkaConnectTransactionServices(KafkaConnectConfigs connectConfigs) throw } } + @Override public String startCommit() { String newCommitTime = javaClient.startCommit(); javaClient.transitionInflight(newCommitTime); @@ -102,11 +109,23 @@ public String startCommit() { return newCommitTime; } + @Override public void endCommit(String commitTime, List writeStatuses, Map extraMetadata) { javaClient.commit(commitTime, writeStatuses, Option.of(extraMetadata)); LOG.info("Ending Hudi commit " + commitTime); + + // Schedule clustering and compaction as needed. + if (writeConfig.isAsyncClusteringEnabled()) { + javaClient.scheduleClustering(Option.empty()).ifPresent( + instantTs -> LOG.info("Scheduled clustering at instant time:" + instantTs)); + } + if (isAsyncCompactionEnabled()) { + javaClient.scheduleCompaction(Option.empty()).ifPresent( + instantTs -> LOG.info("Scheduled compaction at instant time:" + instantTs)); + } } + @Override public Map fetchLatestExtraCommitMetadata() { if (tableMetaClient.isPresent()) { Option metadata = KafkaConnectUtils.getCommitMetadataForLatestInstant(tableMetaClient.get()); @@ -119,4 +138,10 @@ public Map fetchLatestExtraCommitMetadata() { } throw new HoodieException("Fatal error retrieving Hoodie Extra Metadata since Table Meta Client is absent"); } + + private boolean isAsyncCompactionEnabled() { + return tableMetaClient.isPresent() + && HoodieTableType.MERGE_ON_READ.equals(tableMetaClient.get().getTableType()) + && connectConfigs.isAsyncCompactEnabled(); + } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java index 75f55bb26544f..ce69eff043212 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java @@ -164,7 +164,7 @@ private int doSchedule(JavaSparkContext jsc) throws Exception { // Get schema. SparkRDDWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism, Option.of(cfg.strategyClassName), props); - if (cfg.compactionInstantTime == null) { + if (StringUtils.isNullOrEmpty(cfg.compactionInstantTime)) { throw new IllegalArgumentException("No instant time is provided for scheduling compaction. " + "Please specify the compaction instant time by using --instant-time."); }
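For context on the `HoodieCompactor` change above: switching from a plain null check to `StringUtils.isNullOrEmpty`
also rejects an empty `--instant-time` argument, not just a missing one. A minimal, illustrative sketch of the
difference (not part of the patch; it only exercises `org.apache.hudi.common.util.StringUtils`):

```java
import org.apache.hudi.common.util.StringUtils;

public class InstantTimeCheck {
  public static void main(String[] args) {
    // e.g. the job was invoked with `--instant-time ""`
    String compactionInstantTime = "";
    System.out.println(compactionInstantTime == null);                    // false: the old check lets it through
    System.out.println(StringUtils.isNullOrEmpty(compactionInstantTime)); // true: the new check fails fast
  }
}
```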