diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java index b2b873f377b5c..e97cec6d2bcb2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java @@ -125,7 +125,13 @@ public class HoodieClusteringConfig extends HoodieConfig { .sinceVersion("0.7.0") .withDocumentation("Enable running of clustering service, asynchronously as inserts happen on the table."); - private HoodieClusteringConfig() { + public static final ConfigProperty CLUSTERING_PRESERVE_HOODIE_COMMIT_METADATA = ConfigProperty + .key("hoodie.clustering.preserve.commit.metadata") + .defaultValue(false) + .sinceVersion("0.9.0") + .withDocumentation("When rewriting data, preserves existing hoodie_commit_time"); + + public HoodieClusteringConfig() { super(); } @@ -214,6 +220,11 @@ public Builder withAsyncClustering(Boolean asyncClustering) { return this; } + public Builder withPreserveHoodieCommitMetadata(Boolean preserveHoodieCommitMetadata) { + clusteringConfig.setValue(CLUSTERING_PRESERVE_HOODIE_COMMIT_METADATA, String.valueOf(preserveHoodieCommitMetadata)); + return this; + } + public HoodieClusteringConfig build() { clusteringConfig.setDefaults(HoodieClusteringConfig.class.getName()); return clusteringConfig; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 4c7a3c5aff0de..6e87257b3f428 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -700,6 +700,10 @@ public boolean isAsyncClusteringEnabled() { return getBoolean(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE); } + public boolean isPreserveHoodieCommitMetadata() { + return getBoolean(HoodieClusteringConfig.CLUSTERING_PRESERVE_HOODIE_COMMIT_METADATA); + } + public boolean isClusteringEnabled() { // TODO: future support async clustering return inlineClusteringEnabled() || isAsyncClusteringEnabled(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/CreateHandleFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/CreateHandleFactory.java index 3a04c061a51a3..9e23b6f2ceacb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/CreateHandleFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/CreateHandleFactory.java @@ -25,12 +25,22 @@ public class CreateHandleFactory extends WriteHandleFactory { + private boolean preserveMetadata = false; + + public CreateHandleFactory() { + this(false); + } + + public CreateHandleFactory(boolean preserveMetadata) { + this.preserveMetadata = preserveMetadata; + } + @Override public HoodieWriteHandle create(final HoodieWriteConfig hoodieConfig, final String commitTime, - final HoodieTable hoodieTable, final String partitionPath, - final String fileIdPrefix, TaskContextSupplier taskContextSupplier) { + final HoodieTable hoodieTable, final String partitionPath, + final String fileIdPrefix, TaskContextSupplier taskContextSupplier) { return new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, partitionPath, - getNextFileId(fileIdPrefix), 
taskContextSupplier); + getNextFileId(fileIdPrefix), taskContextSupplier, preserveMetadata); } -} \ No newline at end of file +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index b57a43991fa5b..eef5b3d38c97c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -59,18 +59,33 @@ public class HoodieCreateHandle extends protected long recordsDeleted = 0; private Map> recordMap; private boolean useWriterSchema = false; + private boolean preserveHoodieMetadata = false; public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) { this(config, instantTime, hoodieTable, partitionPath, fileId, Option.empty(), - taskContextSupplier); + taskContextSupplier, false); + } + + public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, + String partitionPath, String fileId, TaskContextSupplier taskContextSupplier, + boolean preserveHoodieMetadata) { + this(config, instantTime, hoodieTable, partitionPath, fileId, Option.empty(), + taskContextSupplier, preserveHoodieMetadata); } public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, String partitionPath, String fileId, Option overriddenSchema, TaskContextSupplier taskContextSupplier) { + this(config, instantTime, hoodieTable, partitionPath, fileId, overriddenSchema, taskContextSupplier, false); + } + + public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, + String partitionPath, String fileId, Option overriddenSchema, + TaskContextSupplier taskContextSupplier, boolean preserveHoodieMetadata) { super(config, instantTime, partitionPath, fileId, hoodieTable, overriddenSchema, taskContextSupplier); + this.preserveHoodieMetadata = preserveHoodieMetadata; writeStatus.setFileId(fileId); writeStatus.setPartitionPath(partitionPath); writeStatus.setStat(new HoodieWriteStat()); @@ -119,7 +134,11 @@ public void write(HoodieRecord record, Option avroRecord) { } // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get()); - fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record); + if (preserveHoodieMetadata) { + fileWriter.writeAvro(record.getRecordKey(), recordWithMetadataInSchema); + } else { + fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record); + } // update the new location of record, so we know where to find it next record.unseal(); record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId())); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java new file mode 100644 index 0000000000000..9ab44d0f62f1b --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieUnboundedCreateHandle.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +/** + * A HoodieCreateHandle which writes all data into a single file. + *

+ * Please use this with caution. This can end up creating very large files if not used correctly. + */ +public class HoodieUnboundedCreateHandle extends HoodieCreateHandle { + + private static final Logger LOG = LogManager.getLogger(HoodieUnboundedCreateHandle.class); + + public HoodieUnboundedCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, + String partitionPath, String fileId, TaskContextSupplier taskContextSupplier, + boolean preserveHoodieMetadata) { + super(config, instantTime, hoodieTable, partitionPath, fileId, Option.empty(), + taskContextSupplier, preserveHoodieMetadata); + } + + @Override + public boolean canWrite(HoodieRecord record) { + return true; + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/SingleFileHandleCreateFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/SingleFileHandleCreateFactory.java new file mode 100644 index 0000000000000..dd917cb3c1e4f --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/SingleFileHandleCreateFactory.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.HoodieTable; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A SingleFileHandleCreateFactory is used to write all data in the spark partition into a single data file. + *

+ * Please use this with caution. This can end up creating very large files if not used correctly. + */ +public class SingleFileHandleCreateFactory extends WriteHandleFactory { + + private AtomicBoolean isHandleCreated = new AtomicBoolean(false); + private String fileId; + private boolean preserveHoodieMetadata; + + public SingleFileHandleCreateFactory(String fileId, boolean preserveHoodieMetadata) { + super(); + this.fileId = fileId; + this.preserveHoodieMetadata = preserveHoodieMetadata; + } + + @Override + public HoodieWriteHandle create(final HoodieWriteConfig hoodieConfig, final String commitTime, + final HoodieTable hoodieTable, final String partitionPath, + final String fileIdPrefix, TaskContextSupplier taskContextSupplier) { + + if (isHandleCreated.compareAndSet(false, true)) { + return new HoodieUnboundedCreateHandle(hoodieConfig, commitTime, hoodieTable, partitionPath, + fileId, // ignore idPfx, always use same fileId + taskContextSupplier, preserveHoodieMetadata); + } + + throw new HoodieIOException("Fixed handle create is only expected to be invoked once"); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java index dce6eeac3bd0b..163947fa34481 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java @@ -18,25 +18,27 @@ package org.apache.hudi.table.action.cluster.strategy; -import org.apache.avro.Schema; +import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; + +import org.apache.avro.Schema; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.Serializable; -import java.util.Map; /** * Pluggable implementation for writing data into new file groups based on ClusteringPlan. */ -public abstract class ClusteringExecutionStrategy implements Serializable { +public abstract class ClusteringExecutionStrategy implements Serializable { private static final Logger LOG = LogManager.getLogger(ClusteringExecutionStrategy.class); - private final HoodieTable hoodieTable; - private final HoodieEngineContext engineContext; + private final HoodieTable hoodieTable; + private final transient HoodieEngineContext engineContext; private final HoodieWriteConfig writeConfig; public ClusteringExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { @@ -50,10 +52,9 @@ public ClusteringExecutionStrategy(HoodieTable table, HoodieEngineContext engine * file groups created is bounded by numOutputGroups. * Note that commit is not done as part of strategy. commit is callers responsibility. 
*/ - public abstract O performClustering(final I inputRecords, final int numOutputGroups, final String instantTime, - final Map strategyParams, final Schema schema); + public abstract HoodieWriteMetadata performClustering(final HoodieClusteringPlan clusteringPlan, final Schema schema, final String instantTime); - protected HoodieTable getHoodieTable() { + protected HoodieTable getHoodieTable() { return this.hoodieTable; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java index d3b39f59eda0c..16954e5bb4b00 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java @@ -97,6 +97,7 @@ public Option generateClusteringPlan() { .setInputGroups(clusteringGroups) .setExtraMetadata(getExtraMetadata()) .setVersion(getPlanVersion()) + .setPreserveHoodieMetadata(getWriteConfig().isPreserveHoodieCommitMetadata()) .build()); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractBulkInsertHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractBulkInsertHelper.java index d2aa8627bd275..850f3e0761851 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractBulkInsertHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractBulkInsertHelper.java @@ -43,5 +43,6 @@ public abstract O bulkInsert(I inputRecords, String instantTime, boolean performDedupe, Option> userDefinedBulkInsertPartitioner, boolean addMetadataFields, - int parallelism); + int parallelism, + boolean preserveMetadata); } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java index e12bfafa2a7ed..aec84a50e18af 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java @@ -216,7 +216,7 @@ protected void commit(Option> extraMetadata, HoodieWriteMeta } } - protected Map> getPartitionToReplacedFileIds(List writeStatuses) { + protected Map> getPartitionToReplacedFileIds(HoodieWriteMetadata> writeMetadata) { return Collections.emptyMap(); } @@ -330,7 +330,7 @@ public void updateIndexAndCommitIfNeeded(List writeStatuses, Hoodie List statuses = table.getIndex().updateLocation(writeStatuses, context, table); result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now())); result.setWriteStatuses(statuses); - result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(statuses)); + result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(result)); commitOnAutoCommit(result); } } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java index 
cce8ad1b000df..9142569f96072 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java @@ -71,7 +71,7 @@ public HoodieWriteMetadata> bulkInsert(final List writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism()); + List writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), false); //update index ((BaseJavaCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result); return result; @@ -85,7 +85,8 @@ public List bulkInsert(List> inputRecords, boolean performDedupe, Option> userDefinedBulkInsertPartitioner, boolean useWriterSchema, - int parallelism) { + int parallelism, + boolean preserveHoodieMetadata) { // De-dupe/merge if needed List> dedupedRecords = inputRecords; diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteCommitActionExecutor.java index 519cb76fc24af..b80191909351c 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteCommitActionExecutor.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteCommitActionExecutor.java @@ -64,9 +64,9 @@ protected String getCommitActionType() { } @Override - protected Map> getPartitionToReplacedFileIds(List writeStatuses) { + protected Map> getPartitionToReplacedFileIds(HoodieWriteMetadata> writeResult) { return context.mapToPair( - writeStatuses.stream().map(status -> status.getStat().getPartitionPath()).distinct().collect(Collectors.toList()), + writeResult.getWriteStatuses().stream().map(status -> status.getStat().getPartitionPath()).distinct().collect(Collectors.toList()), partitionPath -> Pair.of(partitionPath, getAllExistingFileIds(partitionPath)), 1 ); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteTableCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteTableCommitActionExecutor.java index ca6885ccf52d2..54259c94175a3 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteTableCommitActionExecutor.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteTableCommitActionExecutor.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; import java.util.HashMap; import java.util.List; @@ -48,7 +49,7 @@ protected List getAllExistingFileIds(String partitionPath) { } @Override - protected Map> getPartitionToReplacedFileIds(List writeStatuses) { + protected Map> getPartitionToReplacedFileIds(HoodieWriteMetadata> writeResult) { Map> partitionToExistingFileIds = new HashMap<>(); List partitionPaths = FSUtils.getAllPartitionPaths(context, table.getMetaClient().getBasePath(), config.useFileListingMetadata(), diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index f706412a59561..a88da49dcb405 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -70,6 +70,7 @@ import java.text.ParseException; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; @SuppressWarnings("checkstyle:LineLength") public class SparkRDDWriteClient extends @@ -357,11 +358,13 @@ public HoodieWriteMetadata> cluster(String clusteringInstan private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD writeStatuses, HoodieTable>, JavaRDD, JavaRDD> table, String clusteringCommitTime) { + + List writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e -> + e.getValue().stream()).collect(Collectors.toList()); - List writeStats = writeStatuses.map(WriteStatus::getStat).collect(); - if (!writeStatuses.filter(WriteStatus::hasErrors).isEmpty()) { + if (writeStats.stream().mapToLong(s -> s.getTotalWriteErrors()).sum() > 0) { throw new HoodieClusteringException("Clustering failed to write to files:" - + writeStatuses.filter(WriteStatus::hasErrors).map(WriteStatus::getFileId).collect()); + + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(","))); } finalizeWrite(table, clusteringCommitTime, writeStats); try { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java index 63bcc6915b941..21f1609e86274 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java @@ -18,41 +18,23 @@ package org.apache.hudi.client.clustering.plan.strategy; -import org.apache.hudi.avro.model.HoodieClusteringGroup; -import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.model.FileSlice; -import org.apache.hudi.common.model.HoodieBaseFile; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.StringUtils; -import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieSparkCopyOnWriteTable; import org.apache.hudi.table.HoodieSparkMergeOnReadTable; -import org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.spark.api.java.JavaRDD; -import java.util.ArrayList; import java.util.Comparator; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static org.apache.hudi.config.HoodieClusteringConfig.CLUSTERING_SORT_COLUMNS_PROPERTY; /** - * Clustering Strategy based on following. 
- * 1) Only looks at latest 'daybased.lookback.partitions' partitions. - * 2) Excludes files that are greater than 'small.file.limit' from clustering plan. + * Clustering Strategy that only looks at latest 'daybased.lookback.partitions' partitions. */ public class SparkRecentDaysClusteringPlanStrategy> - extends PartitionAwareClusteringPlanStrategy>, JavaRDD, JavaRDD> { + extends SparkSizeBasedClusteringPlanStrategy { private static final Logger LOG = LogManager.getLogger(SparkRecentDaysClusteringPlanStrategy.class); public SparkRecentDaysClusteringPlanStrategy(HoodieSparkCopyOnWriteTable table, @@ -67,49 +49,6 @@ public SparkRecentDaysClusteringPlanStrategy(HoodieSparkMergeOnReadTable tabl super(table, engineContext, writeConfig); } - @Override - protected Stream buildClusteringGroupsForPartition(String partitionPath, List fileSlices) { - List, Integer>> fileSliceGroups = new ArrayList<>(); - List currentGroup = new ArrayList<>(); - long totalSizeSoFar = 0; - for (FileSlice currentSlice : fileSlices) { - // assume each filegroup size is ~= parquet.max.file.size - totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : getWriteConfig().getParquetMaxFileSize(); - // check if max size is reached and create new group, if needed. - if (totalSizeSoFar >= getWriteConfig().getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) { - int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes()); - LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: " - + getWriteConfig().getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups); - fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups)); - currentGroup = new ArrayList<>(); - totalSizeSoFar = 0; - } - currentGroup.add(currentSlice); - } - if (!currentGroup.isEmpty()) { - int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes()); - LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: " - + getWriteConfig().getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups); - fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups)); - } - - return fileSliceGroups.stream().map(fileSliceGroup -> HoodieClusteringGroup.newBuilder() - .setSlices(getFileSliceInfo(fileSliceGroup.getLeft())) - .setNumOutputFileGroups(fileSliceGroup.getRight()) - .setMetrics(buildMetrics(fileSliceGroup.getLeft())) - .build()); - } - - @Override - protected Map getStrategyParams() { - Map params = new HashMap<>(); - if (!StringUtils.isNullOrEmpty(getWriteConfig().getClusteringSortColumns())) { - params.put(CLUSTERING_SORT_COLUMNS_PROPERTY.key(), getWriteConfig().getClusteringSortColumns()); - } - return params; - } - - @Override protected List filterPartitionPaths(List partitionPaths) { int targetPartitionsForClustering = getWriteConfig().getTargetPartitionsForClustering(); return partitionPaths.stream() @@ -117,15 +56,4 @@ protected List filterPartitionPaths(List partitionPaths) { .limit(targetPartitionsForClustering > 0 ? targetPartitionsForClustering : partitionPaths.size()) .collect(Collectors.toList()); } - - @Override - protected Stream getFileSlicesEligibleForClustering(final String partition) { - return super.getFileSlicesEligibleForClustering(partition) - // Only files that have basefile size smaller than small file size are eligible. 
- .filter(slice -> slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < getWriteConfig().getClusteringSmallFileLimit()); - } - - private int getNumberOfOutputFileGroups(long groupSize, long targetFileSize) { - return (int) Math.ceil(groupSize / (double) targetFileSize); - } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSelectedPartitionsClusteringPlanStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSelectedPartitionsClusteringPlanStrategy.java new file mode 100644 index 0000000000000..549935d2fdfca --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSelectedPartitionsClusteringPlanStrategy.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.clustering.plan.strategy; + +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieSparkCopyOnWriteTable; +import org.apache.hudi.table.HoodieSparkMergeOnReadTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.hudi.config.HoodieClusteringConfig.CLUSTERING_STRATEGY_PARAM_PREFIX; + +/** + * Clustering Strategy to filter just specified partitions from [begin, end]. Note both begin and end are inclusive. 
+ */ +public class SparkSelectedPartitionsClusteringPlanStrategy> + extends SparkSizeBasedClusteringPlanStrategy { + private static final Logger LOG = LogManager.getLogger(SparkSelectedPartitionsClusteringPlanStrategy.class); + + public static final String CONF_BEGIN_PARTITION = CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.begin.partition"; + public static final String CONF_END_PARTITION = CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.end.partition"; + + public SparkSelectedPartitionsClusteringPlanStrategy(HoodieSparkCopyOnWriteTable table, + HoodieSparkEngineContext engineContext, + HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + public SparkSelectedPartitionsClusteringPlanStrategy(HoodieSparkMergeOnReadTable table, + HoodieSparkEngineContext engineContext, + HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + @Override + protected List filterPartitionPaths(List partitionPaths) { + String beginPartition = getWriteConfig().getProps().getProperty(CONF_BEGIN_PARTITION); + String endPartition = getWriteConfig().getProps().getProperty(CONF_END_PARTITION); + List filteredPartitions = partitionPaths.stream() + .filter(path -> path.compareTo(beginPartition) >= 0 && path.compareTo(endPartition) <= 0) + .collect(Collectors.toList()); + LOG.info("Filtered to the following partitions: " + filteredPartitions); + return filteredPartitions; + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java new file mode 100644 index 0000000000000..aa68f3022d2fc --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.client.clustering.plan.strategy; + +import org.apache.hudi.avro.model.HoodieClusteringGroup; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieSparkCopyOnWriteTable; +import org.apache.hudi.table.HoodieSparkMergeOnReadTable; +import org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static org.apache.hudi.config.HoodieClusteringConfig.CLUSTERING_SORT_COLUMNS_PROPERTY; + +/** + * Clustering Strategy based on following. + * 1) Creates clustering groups based on max size allowed per group. + * 2) Excludes files that are greater than 'small.file.limit' from clustering plan. + */ +public class SparkSizeBasedClusteringPlanStrategy> + extends PartitionAwareClusteringPlanStrategy>, JavaRDD, JavaRDD> { + private static final Logger LOG = LogManager.getLogger(SparkSizeBasedClusteringPlanStrategy.class); + + public SparkSizeBasedClusteringPlanStrategy(HoodieSparkCopyOnWriteTable table, + HoodieSparkEngineContext engineContext, + HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + public SparkSizeBasedClusteringPlanStrategy(HoodieSparkMergeOnReadTable table, + HoodieSparkEngineContext engineContext, + HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + @Override + protected Stream buildClusteringGroupsForPartition(String partitionPath, List fileSlices) { + List, Integer>> fileSliceGroups = new ArrayList<>(); + List currentGroup = new ArrayList<>(); + long totalSizeSoFar = 0; + for (FileSlice currentSlice : fileSlices) { + // assume each filegroup size is ~= parquet.max.file.size + totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : getWriteConfig().getParquetMaxFileSize(); + // check if max size is reached and create new group, if needed. 
+ if (totalSizeSoFar >= getWriteConfig().getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) { + int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes()); + LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: " + + getWriteConfig().getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups); + fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups)); + currentGroup = new ArrayList<>(); + totalSizeSoFar = 0; + } + currentGroup.add(currentSlice); + } + if (!currentGroup.isEmpty()) { + int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes()); + LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: " + + getWriteConfig().getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups); + fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups)); + } + + return fileSliceGroups.stream().map(fileSliceGroup -> HoodieClusteringGroup.newBuilder() + .setSlices(getFileSliceInfo(fileSliceGroup.getLeft())) + .setNumOutputFileGroups(fileSliceGroup.getRight()) + .setMetrics(buildMetrics(fileSliceGroup.getLeft())) + .build()); + } + + @Override + protected Map getStrategyParams() { + Map params = new HashMap<>(); + if (!StringUtils.isNullOrEmpty(getWriteConfig().getClusteringSortColumns())) { + params.put(CLUSTERING_SORT_COLUMNS_PROPERTY.key(), getWriteConfig().getClusteringSortColumns()); + } + return params; + } + + @Override + protected List filterPartitionPaths(List partitionPaths) { + return partitionPaths; + } + + @Override + protected Stream getFileSlicesEligibleForClustering(final String partition) { + return super.getFileSlicesEligibleForClustering(partition) + // Only files that have basefile size smaller than small file size are eligible. + .filter(slice -> slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < getWriteConfig().getClusteringSmallFileLimit()); + } + + private int getNumberOfOutputFileGroups(long groupSize, long targetFileSize) { + return (int) Math.ceil(groupSize / (double) targetFileSize); + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java new file mode 100644 index 0000000000000..b66ce7692d0df --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.client.clustering.run.strategy; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.avro.model.HoodieClusteringGroup; +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.client.SparkTaskContextSupplier; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.client.utils.ConcatenatingIterator; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.ClusteringOperation; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.RewriteAvroPayload; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.log.HoodieFileSliceReader; +import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieClusteringException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.IOUtils; +import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.hudi.keygen.BaseKeyGenerator; +import org.apache.hudi.keygen.KeyGenUtils; +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Clustering strategy to submit multiple spark jobs and union the results. 
+ */ +public abstract class MultipleSparkJobExecutionStrategy> + extends ClusteringExecutionStrategy>, JavaRDD, JavaRDD> { + private static final Logger LOG = LogManager.getLogger(MultipleSparkJobExecutionStrategy.class); + + public MultipleSparkJobExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + @Override + public HoodieWriteMetadata> performClustering(final HoodieClusteringPlan clusteringPlan, final Schema schema, final String instantTime) { + // execute clustering for each group async and collect WriteStatus + JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext()); + // execute clustering for each group async and collect WriteStatus + Stream> writeStatusRDDStream = clusteringPlan.getInputGroups().stream() + .map(inputGroup -> runClusteringForGroupAsync(inputGroup, + clusteringPlan.getStrategy().getStrategyParams(), + Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false), + instantTime)) + .map(CompletableFuture::join); + + JavaRDD[] writeStatuses = convertStreamToArray(writeStatusRDDStream); + JavaRDD writeStatusRDD = engineContext.union(writeStatuses); + + HoodieWriteMetadata> writeMetadata = new HoodieWriteMetadata<>(); + writeMetadata.setWriteStatuses(writeStatusRDD); + return writeMetadata; + } + + + /** + * Execute clustering to write inputRecords into new files as defined by rules in strategy parameters. The number of new + * file groups created is bounded by numOutputGroups. + * Note that commit is not done as part of strategy. commit is callers responsibility. + */ + public abstract JavaRDD performClusteringWithRecordsRDD(final JavaRDD> inputRecords, final int numOutputGroups, final String instantTime, + final Map strategyParams, final Schema schema, + final List fileGroupIdList, final boolean preserveHoodieMetadata); + + + /** + * Submit job to execute clustering for the group. + */ + private CompletableFuture> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map strategyParams, + boolean preserveHoodieMetadata, String instantTime) { + CompletableFuture> writeStatusesFuture = CompletableFuture.supplyAsync(() -> { + JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext()); + JavaRDD> inputRecords = readRecordsForGroup(jsc, clusteringGroup, instantTime); + Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema())); + List inputFileIds = clusteringGroup.getSlices().stream() + .map(info -> new HoodieFileGroupId(info.getPartitionPath(), info.getFileId())) + .collect(Collectors.toList()); + return performClusteringWithRecordsRDD(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema, inputFileIds, preserveHoodieMetadata); + }); + + return writeStatusesFuture; + } + + /** + * Get RDD of all records for the group. This includes all records from file slice (Apply updates from log files, if any). + */ + private JavaRDD> readRecordsForGroup(JavaSparkContext jsc, HoodieClusteringGroup clusteringGroup, String instantTime) { + List clusteringOps = clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList()); + boolean hasLogFiles = clusteringOps.stream().filter(op -> op.getDeltaFilePaths().size() > 0).findAny().isPresent(); + if (hasLogFiles) { + // if there are log files, we read all records into memory for a file group and apply updates. 
+ return readRecordsForGroupWithLogs(jsc, clusteringOps, instantTime); + } else { + // We want to optimize reading records for case there are no log files. + return readRecordsForGroupBaseFiles(jsc, clusteringOps); + } + } + + /** + * Read records from baseFiles, apply updates and convert to RDD. + */ + private JavaRDD> readRecordsForGroupWithLogs(JavaSparkContext jsc, + List clusteringOps, + String instantTime) { + HoodieWriteConfig config = getWriteConfig(); + HoodieTable table = getHoodieTable(); + return jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> { + List>> recordIterators = new ArrayList<>(); + clusteringOpsPartition.forEachRemaining(clusteringOp -> { + long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), config); + LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction); + try { + Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); + HoodieFileReader baseFileReader = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())); + HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder() + .withFileSystem(table.getMetaClient().getFs()) + .withBasePath(table.getMetaClient().getBasePath()) + .withLogFilePaths(clusteringOp.getDeltaFilePaths()) + .withReaderSchema(readerSchema) + .withLatestInstantTime(instantTime) + .withMaxMemorySizeInBytes(maxMemoryPerCompaction) + .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled()) + .withReverseReader(config.getCompactionReverseLogReadEnabled()) + .withBufferSize(config.getMaxDFSStreamBufferSize()) + .withSpillableMapBasePath(config.getSpillableMapBasePath()) + .build(); + + HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig(); + recordIterators.add(HoodieFileSliceReader.getFileSliceReader(baseFileReader, scanner, readerSchema, + tableConfig.getPayloadClass(), + tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(), + tableConfig.getPartitionFieldProp())))); + } catch (IOException e) { + throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + + " and " + clusteringOp.getDeltaFilePaths(), e); + } + }); + + return new ConcatenatingIterator<>(recordIterators); + }); + } + + /** + * Read records from baseFiles and convert to RDD. + */ + private JavaRDD> readRecordsForGroupBaseFiles(JavaSparkContext jsc, + List clusteringOps) { + return jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> { + List> iteratorsForPartition = new ArrayList<>(); + clusteringOpsPartition.forEachRemaining(clusteringOp -> { + try { + Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema())); + HoodieFileReader baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath())); + iteratorsForPartition.add(baseFileReader.getRecordIterator(readerSchema)); + } catch (IOException e) { + throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + + " and " + clusteringOp.getDeltaFilePaths(), e); + } + }); + + return new ConcatenatingIterator<>(iteratorsForPartition); + }).map(this::transform); + } + + /** + * Stream to array conversion with generic type is not straightforward. 
+ * Implement a utility method to abstract high level logic. This needs to be improved in future + */ + private JavaRDD[] convertStreamToArray(Stream> writeStatusRDDStream) { + Object[] writeStatusObjects = writeStatusRDDStream.toArray(Object[]::new); + JavaRDD[] writeStatusRDDArray = new JavaRDD[writeStatusObjects.length]; + for (int i = 0; i < writeStatusObjects.length; i++) { + writeStatusRDDArray[i] = (JavaRDD) writeStatusObjects[i]; + } + return writeStatusRDDArray; + } + + /** + * Transform IndexedRecord into HoodieRecord. + */ + private HoodieRecord transform(IndexedRecord indexedRecord) { + GenericRecord record = (GenericRecord) indexedRecord; + Option keyGeneratorOpt = Option.empty(); + if (!getWriteConfig().populateMetaFields()) { + try { + keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(getWriteConfig().getProps()))); + } catch (IOException e) { + throw new HoodieIOException("Only BaseKeyGenerators are supported when meta columns are disabled ", e); + } + } + String key = KeyGenUtils.getRecordKeyFromGenericRecord(record, keyGeneratorOpt); + String partition = KeyGenUtils.getPartitionPathFromGenericRecord(record, keyGeneratorOpt); + HoodieKey hoodieKey = new HoodieKey(key, partition); + + HoodieRecordPayload avroPayload = new RewriteAvroPayload(record); + HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, avroPayload); + return hoodieRecord; + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java new file mode 100644 index 0000000000000..98bf9151fc9ef --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.client.clustering.run.strategy; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.client.utils.ConcatenatingIterator; +import org.apache.hudi.common.config.SerializableSchema; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.model.ClusteringGroupInfo; +import org.apache.hudi.common.model.ClusteringOperation; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.RewriteAvroPayload; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieClusteringException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.hudi.keygen.BaseKeyGenerator; +import org.apache.hudi.keygen.KeyGenUtils; +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +/** + * Clustering strategy to submit single spark jobs. 
+ * MultipleSparkJobExecution strategy is not ideal for use cases that require large number of clustering groups + */ +public abstract class SingleSparkJobExecutionStrategy> + extends ClusteringExecutionStrategy>, JavaRDD, JavaRDD> { + private static final Logger LOG = LogManager.getLogger(SingleSparkJobExecutionStrategy.class); + + public SingleSparkJobExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + @Override + public HoodieWriteMetadata> performClustering(final HoodieClusteringPlan clusteringPlan, final Schema schema, final String instantTime) { + JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext()); + final TaskContextSupplier taskContextSupplier = getEngineContext().getTaskContextSupplier(); + final SerializableSchema serializableSchema = new SerializableSchema(schema); + final List clusteringGroupInfos = clusteringPlan.getInputGroups().stream().map(clusteringGroup -> + ClusteringGroupInfo.create(clusteringGroup)).collect(Collectors.toList()); + + String umask = engineContext.hadoopConfiguration().get("fs.permissions.umask-mode"); + Broadcast umaskBroadcastValue = engineContext.broadcast(umask); + + JavaRDD groupInfoJavaRDD = engineContext.parallelize(clusteringGroupInfos, clusteringGroupInfos.size()); + LOG.info("number of partitions for clustering " + groupInfoJavaRDD.getNumPartitions()); + JavaRDD writeStatusRDD = groupInfoJavaRDD + .mapPartitions(clusteringOps -> { + Configuration configuration = new Configuration(); + configuration.set("fs.permissions.umask-mode", umaskBroadcastValue.getValue()); + Iterable clusteringOpsIterable = () -> clusteringOps; + List groupsInPartition = StreamSupport.stream(clusteringOpsIterable.spliterator(), false).collect(Collectors.toList()); + return groupsInPartition.stream().flatMap(clusteringOp -> + runClusteringForGroup(clusteringOp, clusteringPlan.getStrategy().getStrategyParams(), + Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false), + serializableSchema, taskContextSupplier, instantTime) + ).iterator(); + }); + + HoodieWriteMetadata> writeMetadata = new HoodieWriteMetadata<>(); + writeMetadata.setWriteStatuses(writeStatusRDD); + return writeMetadata; + } + + + /** + * Submit job to execute clustering for the group. + */ + private Stream runClusteringForGroup(ClusteringGroupInfo clusteringOps, Map strategyParams, + boolean preserveHoodieMetadata, SerializableSchema schema, + TaskContextSupplier taskContextSupplier, String instantTime) { + + List inputFileIds = clusteringOps.getOperations().stream() + .map(op -> new HoodieFileGroupId(op.getPartitionPath(), op.getFileId())) + .collect(Collectors.toList()); + + Iterator> inputRecords = readRecordsForGroupBaseFiles(clusteringOps.getOperations()); + Iterator> writeStatuses = performClusteringWithRecordsIterator(inputRecords, clusteringOps.getNumOutputGroups(), instantTime, + strategyParams, schema.get(), inputFileIds, preserveHoodieMetadata, taskContextSupplier); + + Iterable> writestatusIterable = () -> writeStatuses; + return StreamSupport.stream(writestatusIterable.spliterator(), false) + .flatMap(writeStatusList -> writeStatusList.stream()); + } + + + /** + * Execute clustering to write inputRecords into new files as defined by rules in strategy parameters. + * The number of new file groups created is bounded by numOutputGroups. + * Note that commit is not done as part of strategy. commit is callers responsibility. 
+ */ + public abstract Iterator> performClusteringWithRecordsIterator(final Iterator> records, final int numOutputGroups, + final String instantTime, + final Map strategyParams, final Schema schema, + final List fileGroupIdList, final boolean preserveHoodieMetadata, + final TaskContextSupplier taskContextSupplier); + + /** + * Read records from baseFiles and get iterator. + */ + private Iterator> readRecordsForGroupBaseFiles(List clusteringOps) { + List>> iteratorsForPartition = clusteringOps.stream().map(clusteringOp -> { + + Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema())); + Iterable indexedRecords = () -> { + try { + return HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath())).getRecordIterator(readerSchema); + } catch (IOException e) { + throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + + " and " + clusteringOp.getDeltaFilePaths(), e); + } + }; + + return StreamSupport.stream(indexedRecords.spliterator(), false).map(record -> transform(record)).iterator(); + }).collect(Collectors.toList()); + + return new ConcatenatingIterator<>(iteratorsForPartition); + } + + /** + * Transform IndexedRecord into HoodieRecord. + */ + private HoodieRecord transform(IndexedRecord indexedRecord) { + GenericRecord record = (GenericRecord) indexedRecord; + Option keyGeneratorOpt = Option.empty(); + if (!getWriteConfig().populateMetaFields()) { + try { + keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(getWriteConfig().getProps()))); + } catch (IOException e) { + throw new HoodieIOException("Only BaseKeyGenerators are supported when meta columns are disabled ", e); + } + } + String key = KeyGenUtils.getRecordKeyFromGenericRecord(record, keyGeneratorOpt); + String partition = KeyGenUtils.getPartitionPathFromGenericRecord(record, keyGeneratorOpt); + HoodieKey hoodieKey = new HoodieKey(key, partition); + + HoodieRecordPayload avroPayload = new RewriteAvroPayload(record); + HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, avroPayload); + return hoodieRecord; + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java index 9ab40fc44401f..1c1cf5ef3e583 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java @@ -21,8 +21,8 @@ import org.apache.avro.Schema; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.Option; @@ -30,14 +30,13 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner; import org.apache.hudi.table.BulkInsertPartitioner; -import 
org.apache.hudi.table.HoodieSparkCopyOnWriteTable; -import org.apache.hudi.table.HoodieSparkMergeOnReadTable; -import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy; +import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.commit.SparkBulkInsertHelper; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; +import java.util.List; import java.util.Map; import java.util.Properties; @@ -49,24 +48,19 @@ * 2) Uses bulk_insert to write data into new files. */ public class SparkSortAndSizeExecutionStrategy> - extends ClusteringExecutionStrategy>, JavaRDD, JavaRDD> { + extends MultipleSparkJobExecutionStrategy { private static final Logger LOG = LogManager.getLogger(SparkSortAndSizeExecutionStrategy.class); - public SparkSortAndSizeExecutionStrategy(HoodieSparkCopyOnWriteTable table, - HoodieSparkEngineContext engineContext, - HoodieWriteConfig writeConfig) { - super(table, engineContext, writeConfig); - } - - public SparkSortAndSizeExecutionStrategy(HoodieSparkMergeOnReadTable table, - HoodieSparkEngineContext engineContext, + public SparkSortAndSizeExecutionStrategy(HoodieTable table, + HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { super(table, engineContext, writeConfig); } @Override - public JavaRDD performClustering(final JavaRDD> inputRecords, final int numOutputGroups, - final String instantTime, final Map strategyParams, final Schema schema) { + public JavaRDD performClusteringWithRecordsRDD(final JavaRDD> inputRecords, final int numOutputGroups, + final String instantTime, final Map strategyParams, final Schema schema, + final List fileGroupIdList, final boolean preserveHoodieMetadata) { LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime); Properties props = getWriteConfig().getProps(); props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM.key(), String.valueOf(numOutputGroups)); @@ -75,7 +69,7 @@ public JavaRDD performClustering(final JavaRDD> inp props.put(HoodieStorageConfig.PARQUET_FILE_MAX_BYTES.key(), String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes())); HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build(); return (JavaRDD) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, - false, getPartitioner(strategyParams, schema), true, numOutputGroups); + false, getPartitioner(strategyParams, schema), true, numOutputGroups, preserveHoodieMetadata); } /** diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java new file mode 100644 index 0000000000000..1547e8c24466d --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.clustering.update.strategy; + +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy; +import org.apache.spark.api.java.JavaRDD; + +import java.util.HashSet; + +/** + * Allow ingestion commits during clustering job. + */ +public class SparkAllowUpdateStrategy> extends UpdateStrategy>> { + + public SparkAllowUpdateStrategy( + HoodieSparkEngineContext engineContext, HashSet fileGroupsInPendingClustering) { + super(engineContext, fileGroupsInPendingClustering); + } + + @Override + public JavaRDD> handleUpdate(JavaRDD> taggedRecordsRDD) { + return taggedRecordsRDD; + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java index f1d1416559160..088872bbd4381 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java @@ -57,8 +57,20 @@ public SparkLazyInsertIterable(Iterator> recordItr, String idPrefix, TaskContextSupplier taskContextSupplier, WriteHandleFactory writeHandleFactory) { + this(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier, false, writeHandleFactory); + } + + public SparkLazyInsertIterable(Iterator> recordItr, + boolean areRecordsSorted, + HoodieWriteConfig config, + String instantTime, + HoodieTable hoodieTable, + String idPrefix, + TaskContextSupplier taskContextSupplier, + boolean useWriterSchema, + WriteHandleFactory writeHandleFactory) { super(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier, writeHandleFactory); - this.useWriterSchema = false; + this.useWriterSchema = useWriterSchema; } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java index 96da3969eecb2..f7d04df132ae2 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.execution.SparkLazyInsertIterable; +import org.apache.hudi.io.CreateHandleFactory; import org.apache.hudi.table.HoodieTable; import org.apache.spark.api.java.function.Function2; @@ -42,21 +43,25 @@ public class BulkInsertMapFunction private HoodieTable hoodieTable; private List fileIDPrefixes; private boolean useWriterSchema; + private boolean 
preserveMetadata; public BulkInsertMapFunction(String instantTime, boolean areRecordsSorted, HoodieWriteConfig config, HoodieTable hoodieTable, - List fileIDPrefixes, boolean useWriterSchema) { + List fileIDPrefixes, boolean useWriterSchema, + boolean preserveMetadata) { this.instantTime = instantTime; this.areRecordsSorted = areRecordsSorted; this.config = config; this.hoodieTable = hoodieTable; this.fileIDPrefixes = fileIDPrefixes; this.useWriterSchema = useWriterSchema; + this.preserveMetadata = preserveMetadata; } @Override public Iterator> call(Integer partition, Iterator> recordItr) { return new SparkLazyInsertIterable<>(recordItr, areRecordsSorted, config, instantTime, hoodieTable, - fileIDPrefixes.get(partition), hoodieTable.getTaskContextSupplier(), useWriterSchema); + fileIDPrefixes.get(partition), hoodieTable.getTaskContextSupplier(), useWriterSchema, + new CreateHandleFactory(preserveMetadata)); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java index 94c7e937d013e..e734b4a406a67 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java @@ -21,20 +21,14 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieClusteringGroup; import org.apache.hudi.avro.model.HoodieClusteringPlan; -import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.client.utils.ConcatenatingIterator; import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.model.ClusteringOperation; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; -import org.apache.hudi.common.table.HoodieTableConfig; -import org.apache.hudi.common.table.log.HoodieFileSliceReader; -import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.ClusteringUtils; @@ -44,32 +38,21 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieClusteringException; -import org.apache.hudi.io.IOUtils; -import org.apache.hudi.io.storage.HoodieFileReader; -import org.apache.hudi.io.storage.HoodieFileReaderFactory; -import org.apache.hudi.keygen.KeyGenUtils; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy; import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import 
org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; +import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; +import java.util.Set; import java.util.stream.Collectors; -import java.util.stream.Stream; public class SparkExecuteClusteringCommitActionExecutor> extends BaseSparkCommitActionExecutor { @@ -92,46 +75,28 @@ public HoodieWriteMetadata> execute() { table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty()); table.getMetaClient().reloadActiveTimeline(); - JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(context); - // execute clustering for each group async and collect WriteStatus - Stream> writeStatusRDDStream = clusteringPlan.getInputGroups().stream() - .map(inputGroup -> runClusteringForGroupAsync(inputGroup, clusteringPlan.getStrategy().getStrategyParams())) - .map(CompletableFuture::join); - - JavaRDD[] writeStatuses = convertStreamToArray(writeStatusRDDStream); - JavaRDD writeStatusRDD = engineContext.union(writeStatuses); - - HoodieWriteMetadata> writeMetadata = buildWriteMetadata(writeStatusRDD); + final Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); + HoodieWriteMetadata> writeMetadata = ((ClusteringExecutionStrategy>, JavaRDD, JavaRDD>) + ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(), + new Class[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config)) + .performClustering(clusteringPlan, schema, instantTime); + JavaRDD writeStatusRDD = writeMetadata.getWriteStatuses(); JavaRDD statuses = updateIndex(writeStatusRDD, writeMetadata); writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect()); - // validate clustering action before committing result + writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeMetadata)); validateWriteResult(writeMetadata); commitOnAutoCommit(writeMetadata); if (!writeMetadata.getCommitMetadata().isPresent()) { - HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeStatusRDD.map(WriteStatus::getStat).collect(), writeMetadata.getPartitionToReplaceFileIds(), + HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), writeMetadata.getPartitionToReplaceFileIds(), extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType()); writeMetadata.setCommitMetadata(Option.of(commitMetadata)); } return writeMetadata; } - /** - * Stream to array conversion with generic type is not straightforward. - * Implement a utility method to abstract high level logic. This needs to be improved in future - */ - private JavaRDD[] convertStreamToArray(Stream> writeStatusRDDStream) { - Object[] writeStatusObjects = writeStatusRDDStream.toArray(Object[]::new); - JavaRDD[] writeStatusRDDArray = new JavaRDD[writeStatusObjects.length]; - for (int i = 0; i < writeStatusObjects.length; i++) { - writeStatusRDDArray[i] = (JavaRDD) writeStatusObjects[i]; - } - return writeStatusRDDArray; - } - /** * Validate actions taken by clustering. In the first implementation, we validate at least one new file is written. * But we can extend this to add more validation. E.g. number of records read = number of records written etc. 
- * * We can also make these validations in BaseCommitActionExecutor to reuse pre-commit hooks for multiple actions. */ private void validateWriteResult(HoodieWriteMetadata> writeMetadata) { @@ -143,134 +108,18 @@ private void validateWriteResult(HoodieWriteMetadata> write } } - /** - * Submit job to execute clustering for the group. - */ - private CompletableFuture> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map strategyParams) { - CompletableFuture> writeStatusesFuture = CompletableFuture.supplyAsync(() -> { - JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context); - JavaRDD> inputRecords = readRecordsForGroup(jsc, clusteringGroup); - Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); - return ((ClusteringExecutionStrategy>, JavaRDD, JavaRDD>) - ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(), table, context, config)) - .performClustering(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema); - }); - - return writeStatusesFuture; - } - @Override protected String getCommitActionType() { return HoodieTimeline.REPLACE_COMMIT_ACTION; } @Override - protected Map> getPartitionToReplacedFileIds(JavaRDD writeStatuses) { - return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan).collect( - Collectors.groupingBy(fg -> fg.getPartitionPath(), Collectors.mapping(fg -> fg.getFileId(), Collectors.toList()))); - } - - /** - * Get RDD of all records for the group. This includes all records from file slice (Apply updates from log files, if any). - */ - private JavaRDD> readRecordsForGroup(JavaSparkContext jsc, HoodieClusteringGroup clusteringGroup) { - List clusteringOps = clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList()); - boolean hasLogFiles = clusteringOps.stream().filter(op -> op.getDeltaFilePaths().size() > 0).findAny().isPresent(); - if (hasLogFiles) { - // if there are log files, we read all records into memory for a file group and apply updates. - return readRecordsForGroupWithLogs(jsc, clusteringOps); - } else { - // We want to optimize reading records for case there are no log files. - return readRecordsForGroupBaseFiles(jsc, clusteringOps); - } - } - - /** - * Read records from baseFiles, apply updates and convert to RDD. 
- */ - private JavaRDD> readRecordsForGroupWithLogs(JavaSparkContext jsc, - List clusteringOps) { - return jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> { - List>> recordIterators = new ArrayList<>(); - clusteringOpsPartition.forEachRemaining(clusteringOp -> { - long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), config); - LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction); - try { - Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); - HoodieFileReader baseFileReader = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())); - HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder() - .withFileSystem(table.getMetaClient().getFs()) - .withBasePath(table.getMetaClient().getBasePath()) - .withLogFilePaths(clusteringOp.getDeltaFilePaths()) - .withReaderSchema(readerSchema) - .withLatestInstantTime(instantTime) - .withMaxMemorySizeInBytes(maxMemoryPerCompaction) - .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled()) - .withReverseReader(config.getCompactionReverseLogReadEnabled()) - .withBufferSize(config.getMaxDFSStreamBufferSize()) - .withSpillableMapBasePath(config.getSpillableMapBasePath()) - .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType()) - .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled()) - .build(); - - HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig(); - recordIterators.add(HoodieFileSliceReader.getFileSliceReader(baseFileReader, scanner, readerSchema, - tableConfig.getPayloadClass(), - tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(), - tableConfig.getPartitionFieldProp())))); - } catch (IOException e) { - throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() - + " and " + clusteringOp.getDeltaFilePaths(), e); - } - }); - - return new ConcatenatingIterator<>(recordIterators); - }); - } - - /** - * Read records from baseFiles and convert to RDD. - */ - private JavaRDD> readRecordsForGroupBaseFiles(JavaSparkContext jsc, - List clusteringOps) { - return jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> { - List> iteratorsForPartition = new ArrayList<>(); - clusteringOpsPartition.forEachRemaining(clusteringOp -> { - try { - Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); - HoodieFileReader baseFileReader = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())); - iteratorsForPartition.add(baseFileReader.getRecordIterator(readerSchema)); - } catch (IOException e) { - throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() - + " and " + clusteringOp.getDeltaFilePaths(), e); - } - }); - - return new ConcatenatingIterator<>(iteratorsForPartition); - }).map(this::transform); - } - - /** - * Transform IndexedRecord into HoodieRecord. 
- */ - private HoodieRecord transform(IndexedRecord indexedRecord) { - GenericRecord record = (GenericRecord) indexedRecord; - String key = KeyGenUtils.getRecordKeyFromGenericRecord(record, keyGeneratorOpt); - String partition = KeyGenUtils.getPartitionPathFromGenericRecord(record, keyGeneratorOpt); - HoodieKey hoodieKey = new HoodieKey(key, partition); - - HoodieRecordPayload avroPayload = ReflectionUtils.loadPayload(table.getMetaClient().getTableConfig().getPayloadClass(), - new Object[] {Option.of(record)}, Option.class); - HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, avroPayload); - return hoodieRecord; - } - - private HoodieWriteMetadata> buildWriteMetadata(JavaRDD writeStatusJavaRDD) { - HoodieWriteMetadata> result = new HoodieWriteMetadata<>(); - result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeStatusJavaRDD)); - result.setWriteStatuses(writeStatusJavaRDD); - result.setCommitMetadata(Option.empty()); - result.setCommitted(false); - return result; + protected Map> getPartitionToReplacedFileIds(HoodieWriteMetadata> writeMetadata) { + Set newFilesWritten = new HashSet(writeMetadata.getWriteStats().get().stream() + .map(s -> new HoodieFileGroupId(s.getPartitionPath(),s.getFileId())) + .collect(Collectors.toList())); + return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan) + .filter(fg -> !newFilesWritten.contains(fg)) + .collect(Collectors.groupingBy(fg -> fg.getPartitionPath(), Collectors.mapping(fg -> fg.getFileId(), Collectors.toList()))); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index a9ddab398d9ce..2bc1f0302798e 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -237,12 +237,12 @@ protected JavaRDD updateIndex(JavaRDD writeStatusRDD, JavaRDD statuses = table.getIndex().updateLocation(writeStatusRDD, context, table); result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now())); result.setWriteStatuses(statuses); - result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(statuses)); return statuses; } protected void updateIndexAndCommitIfNeeded(JavaRDD writeStatusRDD, HoodieWriteMetadata result) { updateIndex(writeStatusRDD, result); + result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(result)); commitOnAutoCommit(result); } @@ -281,7 +281,7 @@ protected void commit(Option> extraMetadata, HoodieWriteMeta } } - protected Map> getPartitionToReplacedFileIds(JavaRDD writeStatuses) { + protected Map> getPartitionToReplacedFileIds(HoodieWriteMetadata> writeStatuses) { return Collections.emptyMap(); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java index 66fd68e15496c..322d19194ae81 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java @@ -72,7 +72,7 @@ public HoodieWriteMetadata> bulkInsert(final JavaRDD writeStatuses = bulkInsert(inputRecords, 
instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism()); + JavaRDD writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), false); //update index ((BaseSparkCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result); return result; @@ -86,7 +86,8 @@ public JavaRDD bulkInsert(JavaRDD> inputRecords, boolean performDedupe, Option> userDefinedBulkInsertPartitioner, boolean useWriterSchema, - int parallelism) { + int parallelism, + boolean preserveMetadata) { // De-dupe/merge if needed JavaRDD> dedupedRecords = inputRecords; @@ -108,7 +109,7 @@ public JavaRDD bulkInsert(JavaRDD> inputRecords, JavaRDD writeStatusRDD = repartitionedRecords .mapPartitionsWithIndex(new BulkInsertMapFunction(instantTime, - partitioner.arePartitionRecordsSorted(), config, table, fileIDPrefixes, useWriterSchema), true) + partitioner.arePartitionRecordsSorted(), config, table, fileIDPrefixes, useWriterSchema, preserveMetadata), true) .flatMap(List::iterator); return writeStatusRDD; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java index 7419a87f59e01..bff85e7fe1c91 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java @@ -72,8 +72,8 @@ protected String getCommitActionType() { } @Override - protected Map> getPartitionToReplacedFileIds(JavaRDD writeStatuses) { - return writeStatuses.map(status -> status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath -> + protected Map> getPartitionToReplacedFileIds(HoodieWriteMetadata> writeMetadata) { + return writeMetadata.getWriteStatuses().map(status -> status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap(); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java index 150d2f41665ee..f7c98d5373360 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; @@ -45,7 +46,7 @@ public SparkInsertOverwriteTableCommitActionExecutor(HoodieEngineContext context } @Override - protected Map> getPartitionToReplacedFileIds(JavaRDD writeStatuses) { + protected Map> getPartitionToReplacedFileIds(HoodieWriteMetadata> writeMetadata) { Map> partitionToExistingFileIds = new HashMap<>(); 
List partitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath()); JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index a53b83e43f86a..fd3b47a2f3caf 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -98,6 +98,7 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -165,6 +166,15 @@ private static Stream populateMetaFieldsParams() { return Arrays.stream(new Boolean[][] {{true}, {false}}).map(Arguments::of); } + private static Stream populateMetaFieldsAndPreserveMetadataParams() { + return Arrays.stream(new Boolean[][] { + {true, true}, + {false, true}, + {true, false}, + {false, false} + }).map(Arguments::of); + } + private static Stream rollbackFailedCommitsParams() { return Stream.of( Arguments.of(HoodieFailedWritesCleaningPolicy.LAZY, true), @@ -1292,21 +1302,23 @@ public void testDeletesWithDeleteApi() throws Exception { } @ParameterizedTest - @MethodSource("populateMetaFieldsParams") - public void testSimpleClustering(boolean populateMetaFields) throws Exception { + @MethodSource("populateMetaFieldsAndPreserveMetadataParams") + public void testSimpleClustering(boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception { // setup clustering config. HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) - .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build(); + .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1) + .withPreserveHoodieCommitMetadata(preserveCommitMetadata).build(); testInsertAndClustering(clusteringConfig, populateMetaFields, true, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, ""); } @ParameterizedTest - @MethodSource("populateMetaFieldsParams") - public void testClusteringWithSortColumns(boolean populateMetaFields) throws Exception { + @MethodSource("populateMetaFieldsAndPreserveMetadataParams") + public void testClusteringWithSortColumns(boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception { // setup clustering config. HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) .withClusteringSortColumns(populateMetaFields ? 
"_hoodie_record_key" : "_row_key") - .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build(); + .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1) + .withPreserveHoodieCommitMetadata(preserveCommitMetadata).build(); testInsertAndClustering(clusteringConfig, populateMetaFields, true, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, ""); } @@ -1401,37 +1413,37 @@ private List testInsertAndClustering(HoodieClusteringConfig clust private List testInsertAndClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, boolean completeClustering, String validatorClasses, String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation) throws Exception { - List allRecords = testInsertTwoBatches(populateMetaFields); + Pair, List> allRecords = testInsertTwoBatches(populateMetaFields); testClustering(clusteringConfig, populateMetaFields, completeClustering, validatorClasses, sqlQueryForEqualityValidation, sqlQueryForSingleResultValidation, allRecords); - return allRecords; + return allRecords.getLeft(); } - private List testInsertTwoBatches(boolean populateMetaFields) throws IOException { + private Pair, List> testInsertTwoBatches(boolean populateMetaFields) throws IOException { // create config to not update small files. HoodieWriteConfig config = getSmallInsertWriteConfig(2000, TRIP_EXAMPLE_SCHEMA, 10, false, populateMetaFields, populateMetaFields ? new Properties() : getPropertiesForKeyGen()); SparkRDDWriteClient client = getHoodieWriteClient(config); dataGen = new HoodieTestDataGenerator(new String[] {"2015/03/16"}); - String commitTime = HoodieActiveTimeline.createNewInstantTime(); - List records1 = dataGen.generateInserts(commitTime, 200); - List statuses1 = writeAndVerifyBatch(client, records1, commitTime, populateMetaFields); + String commitTime1 = HoodieActiveTimeline.createNewInstantTime(); + List records1 = dataGen.generateInserts(commitTime1, 200); + List statuses1 = writeAndVerifyBatch(client, records1, commitTime1, populateMetaFields); Set fileIds1 = getFileGroupIdsFromWriteStatus(statuses1); - commitTime = HoodieActiveTimeline.createNewInstantTime(); - List records2 = dataGen.generateInserts(commitTime, 200); - List statuses2 = writeAndVerifyBatch(client, records2, commitTime, populateMetaFields); + String commitTime2 = HoodieActiveTimeline.createNewInstantTime(); + List records2 = dataGen.generateInserts(commitTime2, 200); + List statuses2 = writeAndVerifyBatch(client, records2, commitTime2, populateMetaFields); Set fileIds2 = getFileGroupIdsFromWriteStatus(statuses2); //verify new files are created for 2nd write Set fileIdIntersection = new HashSet<>(fileIds1); fileIdIntersection.retainAll(fileIds2); assertEquals(0, fileIdIntersection.size()); - return Stream.concat(records1.stream(), records2.stream()).collect(Collectors.toList()); + return Pair.of(Stream.concat(records1.stream(), records2.stream()).collect(Collectors.toList()), Arrays.asList(commitTime1, commitTime2)); } - private String testClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, boolean completeClustering, - String validatorClasses, String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation, - List allRecords) throws IOException { + private void testClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, boolean completeClustering, + String validatorClasses, String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation, + 
Pair, List> allRecords) throws IOException { HoodieWriteConfig config = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAutoCommit(false) .withClusteringConfig(clusteringConfig) @@ -1442,10 +1454,7 @@ private String testClustering(HoodieClusteringConfig clusteringConfig, boolean p if (completeClustering) { String clusteringCommitTime = metaClient.reloadActiveTimeline().getCompletedReplaceTimeline() .getReverseOrderedInstants().findFirst().get().getTimestamp(); - verifyRecordsWritten(clusteringCommitTime, populateMetaFields, allRecords, clusterMetadata.getWriteStatuses().collect(), config); - return clusteringCommitTime; - } else { - return ""; + verifyRecordsWritten(clusteringCommitTime, populateMetaFields, allRecords.getLeft(), clusterMetadata.getWriteStatuses().collect(), config); } } @@ -1454,7 +1463,7 @@ private HoodieWriteMetadata> performClustering(HoodieCluste boolean completeClustering, String validatorClasses, String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation, - List allRecords) throws IOException { + Pair, List> allRecords) throws IOException { HoodiePreCommitValidatorConfig validatorConfig = HoodiePreCommitValidatorConfig.newBuilder() .withPreCommitValidator(StringUtils.nullToEmpty(validatorClasses)) .withPrecommitValidatorEqualitySqlQueries(sqlQueryForEqualityValidation) @@ -1470,7 +1479,11 @@ private HoodieWriteMetadata> performClustering(HoodieCluste SparkRDDWriteClient client = getHoodieWriteClient(config); String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString(); HoodieWriteMetadata> clusterMetadata = client.cluster(clusteringCommitTime, completeClustering); - verifyRecordsWritten(clusteringCommitTime, populateMetaFields, allRecords, clusterMetadata.getWriteStatuses().collect(), config); + if (config.isPreserveHoodieCommitMetadata() && config.populateMetaFields()) { + verifyRecordsWrittenWithPreservedMetadata(new HashSet<>(allRecords.getRight()), allRecords.getLeft(), clusterMetadata.getWriteStatuses().collect()); + } else { + verifyRecordsWritten(clusteringCommitTime, populateMetaFields, allRecords.getLeft(), clusterMetadata.getWriteStatuses().collect(), config); + } Set replacedFileIds = new HashSet<>(); clusterMetadata.getPartitionToReplaceFileIds().entrySet().forEach(partitionFiles -> @@ -1663,13 +1676,7 @@ private void verifyDeletePartitionsHandling(int batch1RecordsCount, int batch2Re private void verifyRecordsWritten(String commitTime, boolean populateMetadataField, List expectedRecords, List allStatus, HoodieWriteConfig config) throws IOException { List records = new ArrayList<>(); - for (WriteStatus status : allStatus) { - Path filePath = new Path(basePath, status.getStat().getPath()); - records.addAll(BaseFileUtils.getInstance(metaClient).readAvroRecords(jsc.hadoopConfiguration(), filePath)); - } - - Set expectedKeys = recordsToRecordKeySet(expectedRecords); - assertEquals(records.size(), expectedKeys.size()); + Set expectedKeys = verifyRecordKeys(expectedRecords, allStatus, records); if (config.populateMetaFields()) { for (GenericRecord record : records) { String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); @@ -1689,6 +1696,29 @@ private void verifyRecordsWritten(String commitTime, boolean populateMetadataFie } } + @NotNull + private Set verifyRecordKeys(List expectedRecords, List allStatus, List records) { + for (WriteStatus status : allStatus) { + Path filePath = new Path(basePath, status.getStat().getPath()); + 
records.addAll(BaseFileUtils.getInstance(metaClient).readAvroRecords(jsc.hadoopConfiguration(), filePath)); + } + Set expectedKeys = recordsToRecordKeySet(expectedRecords); + assertEquals(records.size(), expectedKeys.size()); + return expectedKeys; + } + + private void verifyRecordsWrittenWithPreservedMetadata(Set commitTimes, List expectedRecords, List allStatus) { + List records = new ArrayList<>(); + Set expectedKeys = verifyRecordKeys(expectedRecords, allStatus, records); + Map> recordsByCommitTime = records.stream() + .collect(Collectors.groupingBy(r -> r.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString())); + assertTrue(commitTimes.containsAll(recordsByCommitTime.keySet())); + for (GenericRecord record : records) { + String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); + assertTrue(expectedKeys.contains(recordKey)); + } + } + private List writeAndVerifyBatch(SparkRDDWriteClient client, List inserts, String commitTime, boolean populateMetaFields) throws IOException { client.startCommitWithTime(commitTime); JavaRDD insertRecordsRDD1 = jsc.parallelize(inserts, 2); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index e7deb044d5503..5bc293a4630ce 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -152,6 +152,15 @@ private static Stream populateMetaFieldsParams() { return Arrays.stream(new Boolean[][] {{true}, {false}}).map(Arguments::of); } + private static Stream populateMetaFieldsAndPreserveMetadataParams() { + return Arrays.stream(new Boolean[][] { + {true, true}, + {false, true}, + {true, false}, + {false, false} + }).map(Arguments::of); + } + @ParameterizedTest @MethodSource("populateMetaFieldsParams") public void testSimpleInsertAndUpdate(boolean populateMetaFields) throws Exception { @@ -254,25 +263,25 @@ public void testSimpleInsertAndUpdateHFile() throws Exception { } @ParameterizedTest - @MethodSource("populateMetaFieldsParams") - public void testSimpleClusteringNoUpdates(boolean populateMetaFields) throws Exception { + @MethodSource("populateMetaFieldsAndPreserveMetadataParams") + public void testSimpleClusteringNoUpdates(boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception { clean(); init(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue(), populateMetaFields); - testClustering(false, populateMetaFields); + testClustering(false, populateMetaFields, preserveCommitMetadata); } @ParameterizedTest - @MethodSource("populateMetaFieldsParams") - public void testSimpleClusteringWithUpdates(boolean populateMetaFields) throws Exception { + @MethodSource("populateMetaFieldsAndPreserveMetadataParams") + public void testSimpleClusteringWithUpdates(boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception { clean(); init(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue(), populateMetaFields); - testClustering(true, populateMetaFields); + testClustering(true, populateMetaFields, preserveCommitMetadata); } - private void testClustering(boolean doUpdates, boolean populateMetaFields) throws Exception { + private void testClustering(boolean doUpdates, boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception { // set low compaction small File Size to 
generate more file groups. HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) - .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build(); + .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withPreserveHoodieCommitMetadata(preserveCommitMetadata).build(); HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true, 10L, clusteringConfig); addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); HoodieWriteConfig cfg = cfgBuilder.build(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java index 823d651aa1589..a12470d99375c 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java @@ -225,7 +225,7 @@ public void testCompactionRetryOnFailureBasedOnTime() throws Exception { // Then: 1 delta commit is done, the failed compaction is retried metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build(); - assertEquals(4, metaClient.getActiveTimeline().getWriteTimeline().countInstants()); + assertEquals(5, metaClient.getActiveTimeline().getWriteTimeline().countInstants()); assertEquals(instantTime, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp()); } @@ -262,7 +262,7 @@ public void testCompactionRetryOnFailureBasedOnNumAndTime() throws Exception { // Then: 1 delta commit is done, the failed compaction is retried metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build(); - assertEquals(4, metaClient.getActiveTimeline().getWriteTimeline().countInstants()); + assertEquals(5, metaClient.getActiveTimeline().getWriteTimeline().countInstants()); assertEquals(instantTime, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp()); } } diff --git a/hudi-common/src/main/avro/HoodieClusteringPlan.avsc b/hudi-common/src/main/avro/HoodieClusteringPlan.avsc index bc8ed0eda6950..87486267d1ce5 100644 --- a/hudi-common/src/main/avro/HoodieClusteringPlan.avsc +++ b/hudi-common/src/main/avro/HoodieClusteringPlan.avsc @@ -45,6 +45,11 @@ "name":"version", "type":["int", "null"], "default": 1 + }, + { + "name":"preserveHoodieMetadata", + "type":["null", "boolean"], + "default": null } ] } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/ClusteringGroupInfo.java b/hudi-common/src/main/java/org/apache/hudi/common/model/ClusteringGroupInfo.java new file mode 100644 index 0000000000000..24a666a532ffe --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/ClusteringGroupInfo.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.hudi.avro.model.HoodieClusteringGroup; + +import java.io.Serializable; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Encapsulates all the needed information about a clustering group. This is needed because spark serialization + * does not work with avro objects. + */ +public class ClusteringGroupInfo implements Serializable { + + private List operations; + private int numOutputGroups; + + public static ClusteringGroupInfo create(HoodieClusteringGroup clusteringGroup) { + List operations = clusteringGroup.getSlices().stream() + .map(ClusteringOperation::create).collect(Collectors.toList()); + + return new ClusteringGroupInfo(operations, clusteringGroup.getNumOutputFileGroups()); + } + + // Only for serialization/de-serialization + @Deprecated + public ClusteringGroupInfo() {} + + private ClusteringGroupInfo(final List operations, final int numOutputGroups) { + this.operations = operations; + this.numOutputGroups = numOutputGroups; + } + + public List getOperations() { + return this.operations; + } + + public void setOperations(final List operations) { + this.operations = operations; + } + + public int getNumOutputGroups() { + return this.numOutputGroups; + } + + public void setNumOutputGroups(final int numOutputGroups) { + this.numOutputGroups = numOutputGroups; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ClusteringGroupInfo that = (ClusteringGroupInfo) o; + return Objects.equals(getFilePathsInGroup(), that.getFilePathsInGroup()); + } + + @Override + public int hashCode() { + return Objects.hash(getFilePathsInGroup()); + } + + private String getFilePathsInGroup() { + return getOperations().stream().map(op -> op.getDataFilePath()).collect(Collectors.joining(",")); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/RewriteAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/RewriteAvroPayload.java new file mode 100644 index 0000000000000..d5c19b9116bbc --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/RewriteAvroPayload.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.model; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hudi.common.util.Option; + +import java.io.IOException; + +/** + * Default payload used for rewrite use cases where we dont change schema. We dont need to serialize/deserialize avro record in payload. + */ +public class RewriteAvroPayload implements HoodieRecordPayload { + + private GenericRecord record; + + public RewriteAvroPayload(GenericRecord record) { + this.record = record; + } + + @Override + public RewriteAvroPayload preCombine(RewriteAvroPayload another) { + throw new UnsupportedOperationException("precombine is not expected for rewrite payload"); + } + + @Override + public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException { + return getInsertValue(schema); + } + + @Override + public Option getInsertValue(Schema schema) throws IOException { + return Option.of(record); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java index 1453d09a91f7b..76d1cc3d07d24 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java @@ -35,16 +35,16 @@ /** * Reads records from base file and merges any updates from log files and provides iterable over all records in the file slice. */ -public class HoodieFileSliceReader implements Iterator> { - private Iterator> recordsIterator; +public class HoodieFileSliceReader implements Iterator> { + private Iterator> recordsIterator; - public static HoodieFileSliceReader getFileSliceReader( + public static HoodieFileSliceReader getFileSliceReader( HoodieFileReader baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass, Option> simpleKeyGenFieldsOpt) throws IOException { Iterator baseIterator = baseFileReader.getRecordIterator(schema); while (baseIterator.hasNext()) { GenericRecord record = (GenericRecord) baseIterator.next(); - HoodieRecord hoodieRecord = simpleKeyGenFieldsOpt.isPresent() + HoodieRecord hoodieRecord = simpleKeyGenFieldsOpt.isPresent() ? SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, simpleKeyGenFieldsOpt.get()) : SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass); scanner.processNextRecord(hoodieRecord); @@ -52,7 +52,7 @@ public static HoodieFil return new HoodieFileSliceReader(scanner.iterator()); } - private HoodieFileSliceReader(Iterator> recordsItr) { + private HoodieFileSliceReader(Iterator> recordsItr) { this.recordsIterator = recordsItr; } @@ -62,7 +62,7 @@ public boolean hasNext() { } @Override - public HoodieRecord next() { + public HoodieRecord next() { return recordsIterator.next(); } }
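
Taken together, the changes above thread the new `hoodie.clustering.preserve.commit.metadata` flag from `HoodieClusteringConfig` through `SparkBulkInsertHelper`, `BulkInsertMapFunction` and `CreateHandleFactory` down to `HoodieCreateHandle`, which then calls `writeAvro` instead of `writeAvroWithMetadata` so the original `_hoodie_commit_time` survives the rewrite. The sketch below is illustrative only, not part of this patch: it assumes the builder methods exercised in the tests above (`withClusteringMaxNumGroups`, `withClusteringTargetPartitions`, `withInlineClusteringNumCommits`, `withPreserveHoodieCommitMetadata`, `withClusteringConfig`) and uses a hypothetical table path.

```java
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;

/**
 * Minimal sketch of a writer configuration that opts into the behavior added in this patch.
 * The base path below is hypothetical; everything else mirrors the builder calls used in the
 * tests in this diff.
 */
public class PreserveCommitMetadataExample {
  public static void main(String[] args) {
    HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder()
        .withClusteringMaxNumGroups(10)
        .withClusteringTargetPartitions(0)
        .withInlineClusteringNumCommits(1)
        // New in this patch: keep the existing _hoodie_commit_time when clustering rewrites data.
        .withPreserveHoodieCommitMetadata(true)
        .build();

    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_table")          // hypothetical base path for illustration
        .withClusteringConfig(clusteringConfig)
        .build();

    // The accessor added to HoodieWriteConfig in this patch exposes the flag to the
    // clustering executors, which pass it through bulk insert to the create handles.
    boolean preserve = writeConfig.isPreserveHoodieCommitMetadata();
    System.out.println("preserve commit metadata during clustering: " + preserve);
  }
}
```

When the flag is left at its default of false, clustering keeps the pre-existing path (`writeAvroWithMetadata`), so rewritten records pick up the clustering instant as their `_hoodie_commit_time`, which is what the `verifyRecordsWritten` branch of the tests checks; with the flag on, `verifyRecordsWrittenWithPreservedMetadata` instead asserts that the original commit times are retained.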