diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/CreateFixedFileHandleFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/CreateFixedFileHandleFactory.java
new file mode 100644
index 0000000000000..cb16c7162bd1b
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/CreateFixedFileHandleFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A CreateFixedFileHandleFactory is used to write all data in the Spark partition into a single data file.
+ *
+ * Please use this with caution. This can end up creating very large files if not used correctly.
+ */
+public class CreateFixedFileHandleFactory<T extends HoodieRecordPayload, I, K, O> extends WriteHandleFactory<T, I, K, O> {
+
+  private AtomicBoolean isHandleCreated = new AtomicBoolean(false);
+  private String fileId;
+
+  public CreateFixedFileHandleFactory(String fileId) {
+    super();
+    this.fileId = fileId;
+  }
+
+  @Override
+  public HoodieWriteHandle<T, I, K, O> create(final HoodieWriteConfig hoodieConfig, final String commitTime,
+      final HoodieTable<T, I, K, O> hoodieTable, final String partitionPath,
+      final String fileIdPrefix, TaskContextSupplier taskContextSupplier) {
+
+    if (isHandleCreated.compareAndSet(false, true)) {
+      return new HoodieCreateFixedHandle(hoodieConfig, commitTime, hoodieTable, partitionPath,
+          fileId, // ignore idPfx, always use same fileId
+          taskContextSupplier);
+    }
+
+    throw new HoodieIOException("Fixed handle create is only expected to be invoked once");
+  }
+}
\ No newline at end of file
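The factory above guards create() with an AtomicBoolean compare-and-set so that exactly one handle is produced per factory instance; a second call fails fast instead of silently opening another file. A minimal standalone sketch of that single-use guard (plain Java with illustrative names, not Hudi code):

import java.util.concurrent.atomic.AtomicBoolean;

// One-shot guard: only the first caller wins the compareAndSet; later calls fail fast.
class OneShotGuard {
  private final AtomicBoolean used = new AtomicBoolean(false);

  String acquire(String fileId) {
    if (used.compareAndSet(false, true)) {
      return fileId;
    }
    throw new IllegalStateException("acquire() may only be invoked once");
  }

  public static void main(String[] args) {
    OneShotGuard guard = new OneShotGuard();
    System.out.println(guard.acquire("file-0001")); // succeeds
    try {
      guard.acquire("file-0001");                   // second call is rejected
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}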
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateFixedHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateFixedHandle.java
new file mode 100644
index 0000000000000..5ec4d26129806
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateFixedHandle.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.Map;
+
+/**
+ * A HoodieCreateHandle which writes all data into a single file.
+ *
+ * Please use this with caution. This can end up creating very large files if not used correctly.
+ */
+public class HoodieCreateFixedHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieCreateHandle<T, I, K, O> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieCreateFixedHandle.class);
+
+  public HoodieCreateFixedHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                                 String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, partitionPath, fileId, getWriterSchemaIncludingAndExcludingMetadataPair(config),
+        taskContextSupplier);
+  }
+
+  public HoodieCreateFixedHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                                 String partitionPath, String fileId, Pair<Schema, Schema> writerSchemaIncludingAndExcludingMetadataPair,
+                                 TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, partitionPath, fileId, writerSchemaIncludingAndExcludingMetadataPair,
+        taskContextSupplier);
+  }
+
+  /**
+   * Called by the compactor code path.
+   */
+  public HoodieCreateFixedHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                                 String partitionPath, String fileId, Map<String, HoodieRecord<T>> recordMap,
+                                 TaskContextSupplier taskContextSupplier) {
+    this(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier);
+  }
+
+  @Override
+  public boolean canWrite(HoodieRecord record) {
+    return true;
+  }
+}
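HoodieCreateFixedHandle pins all records to one file by overriding canWrite() to always return true; a size-bounded handle would eventually report itself full and let the caller roll over to a new file, which is why the javadoc warns about very large files. A simplified sketch of that contract (hypothetical interfaces, not Hudi's API):

// A bounded handle stops accepting records at a threshold; the fixed handle never does.
class CanWriteSketch {
  interface Handle {
    boolean canWrite();
    void write(String record);
  }

  static class SizeBoundedHandle implements Handle {
    private final long maxRecords;
    private long written;
    SizeBoundedHandle(long maxRecords) { this.maxRecords = maxRecords; }
    public boolean canWrite() { return written < maxRecords; }
    public void write(String record) { written++; }
  }

  static class FixedHandle implements Handle {
    public boolean canWrite() { return true; } // never asks the caller to roll over
    public void write(String record) { }
  }

  public static void main(String[] args) {
    Handle bounded = new SizeBoundedHandle(2);
    Handle fixed = new FixedHandle();
    for (String r : new String[]{"r1", "r2", "r3"}) {
      System.out.println("bounded=" + bounded.canWrite() + " fixed=" + fixed.canWrite());
      if (bounded.canWrite()) {
        bounded.write(r);
      }
      fixed.write(r);
    }
  }
}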
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
index dce6eeac3bd0b..cbecd2ea46411 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
@@ -20,6 +20,7 @@
 
 import org.apache.avro.Schema;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
@@ -27,6 +28,7 @@
 import org.apache.log4j.Logger;
 
 import java.io.Serializable;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -51,7 +53,7 @@ public ClusteringExecutionStrategy(HoodieTable table, HoodieEngineContext engine
    * Note that commit is not done as part of the strategy; committing the result is the caller's responsibility.
    */
   public abstract O performClustering(final I inputRecords, final int numOutputGroups, final String instantTime,
-                                      final Map<String, String> strategyParams, final Schema schema);
+                                      final Map<String, String> strategyParams, final Schema schema, final List<HoodieFileGroupId> inputFileIds);
 
   protected HoodieTable<T, I, K, O> getHoodieTable() {
     return this.hoodieTable;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
index 07f9bc14f05d6..11612872c60d3 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
@@ -22,6 +22,7 @@
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -38,6 +39,7 @@
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -66,7 +68,7 @@ public SparkSortAndSizeExecutionStrategy(HoodieSparkMergeOnReadTable table,
 
   @Override
   public JavaRDD<WriteStatus> performClustering(final JavaRDD<HoodieRecord<T>> inputRecords, final int numOutputGroups,
-                                                final String instantTime, final Map<String, String> strategyParams, final Schema schema) {
+                                                final String instantTime, final Map<String, String> strategyParams, final Schema schema, final List<HoodieFileGroupId> inputFileIds) {
     LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
     Properties props = getWriteConfig().getProps();
     props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM, String.valueOf(numOutputGroups));
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
index bf9f90a0c948f..b18f9ad4d2a3d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
@@ -57,8 +57,20 @@ public SparkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
                                  String idPrefix,
                                  TaskContextSupplier taskContextSupplier,
                                  WriteHandleFactory writeHandleFactory) {
+    this(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier, writeHandleFactory, false);
+  }
+
+  public SparkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
+                                 boolean areRecordsSorted,
+                                 HoodieWriteConfig config,
+                                 String instantTime,
+                                 HoodieTable hoodieTable,
+                                 String idPrefix,
+                                 TaskContextSupplier taskContextSupplier,
+                                 WriteHandleFactory writeHandleFactory,
+                                 boolean useWriterSchema) {
     super(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier, writeHandleFactory);
-    this.useWriterSchema = false;
+    this.useWriterSchema = useWriterSchema;
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java
index 9f6e39afc19ea..4cfe02829db8e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java
@@ -32,6 +32,7 @@
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.ClusteringOperation;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -61,9 +62,11 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -98,7 +101,7 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
 
     JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusRDDStream);
     JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = buildWriteMetadata(writeStatusRDD);
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
     JavaRDD<WriteStatus> statuses = updateIndex(writeStatusRDD, writeMetadata);
     writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect());
     // validate clustering action before committing result
@@ -148,9 +151,12 @@ private CompletableFuture<JavaRDD<WriteStatus>> runClusteringForGroupAsync(Hoodi
       JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
       JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> inputRecords = readRecordsForGroup(jsc, clusteringGroup);
       Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
+      List<HoodieFileGroupId> inputFileIds = clusteringGroup.getSlices().stream()
+          .map(info -> new HoodieFileGroupId(info.getPartitionPath(), info.getFileId()))
+          .collect(Collectors.toList());
       return ((ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>>)
           ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(), table, context, config))
-          .performClustering(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema);
+          .performClustering(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema, inputFileIds);
     });
 
     return writeStatusesFuture;
@@ -163,8 +169,10 @@ protected String getCommitActionType() {
 
   @Override
   protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteStatus> writeStatuses) {
-    return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan).collect(
-        Collectors.groupingBy(fg -> fg.getPartitionPath(), Collectors.mapping(fg -> fg.getFileId(), Collectors.toList())));
+    Set<String> newFilesWritten = new HashSet<>(writeStatuses.map(s -> s.getFileId()).collect());
+    return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
+        .filter(fg -> !newFilesWritten.contains(fg.getFileId()))
+        .collect(Collectors.groupingBy(fg -> fg.getPartitionPath(), Collectors.mapping(fg -> fg.getFileId(), Collectors.toList())));
   }
 
   /**
@@ -257,12 +265,4 @@ private HoodieRecord<? extends HoodieRecordPayload> transform(IndexedRecord inde
     return hoodieRecord;
   }
 
-  private HoodieWriteMetadata<JavaRDD<WriteStatus>> buildWriteMetadata(JavaRDD<WriteStatus> writeStatusJavaRDD) {
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> result = new HoodieWriteMetadata<>();
-    result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeStatusJavaRDD));
-    result.setWriteStatuses(writeStatusJavaRDD);
-    result.setCommitMetadata(Option.empty());
-    result.setCommitted(false);
-    return result;
-  }
 }
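The reworked getPartitionToReplacedFileIds above first collects the fileIds that were rewritten in place, filters them out of the clustering plan, and then groups the remaining replaced fileIds by partition. A standalone sketch of that filter-then-group step, with plain strings standing in for Hudi's HoodieFileGroupId and WriteStatus (illustrative only):

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class ReplacedFileIdsSketch {
  public static void main(String[] args) {
    // Each entry is {partitionPath, fileId} taken from the clustering plan.
    List<String[]> planFileGroups = Arrays.asList(
        new String[]{"2021/01/01", "fg-1"},
        new String[]{"2021/01/01", "fg-2"},
        new String[]{"2021/01/02", "fg-3"});
    // fileIds the strategy rewrote in place, which must not be marked as replaced.
    Set<String> newFilesWritten = new HashSet<>(Arrays.asList("fg-2"));

    Map<String, List<String>> partitionToReplaced = planFileGroups.stream()
        .filter(fg -> !newFilesWritten.contains(fg[1]))
        .collect(Collectors.groupingBy(fg -> fg[0],
            Collectors.mapping(fg -> fg[1], Collectors.toList())));

    // Prints {2021/01/01=[fg-1], 2021/01/02=[fg-3]} (map order may vary).
    System.out.println(partitionToReplaced);
  }
}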
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/ClusteringIdentityTestExecutionStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/ClusteringIdentityTestExecutionStrategy.java
new file mode 100644
index 0000000000000..5264289f35137
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/ClusteringIdentityTestExecutionStrategy.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.execution.SparkLazyInsertIterable;
+import org.apache.hudi.io.CreateFixedFileHandleFactory;
+import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
+import org.apache.hudi.table.HoodieSparkMergeOnReadTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Sample clustering strategy for testing. This doesn't transform the data; it simply rewrites the same data
+ * into a new base file.
+ */
+public class ClusteringIdentityTestExecutionStrategy<T extends HoodieRecordPayload<T>>
+    extends ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
+
+  private static final Logger LOG = LogManager.getLogger(ClusteringIdentityTestExecutionStrategy.class);
+
+  public ClusteringIdentityTestExecutionStrategy(HoodieSparkCopyOnWriteTable table,
+                                                 HoodieSparkEngineContext engineContext,
+                                                 HoodieWriteConfig writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+  public ClusteringIdentityTestExecutionStrategy(HoodieSparkMergeOnReadTable table,
+                                                 HoodieSparkEngineContext engineContext,
+                                                 HoodieWriteConfig writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+  @Override
+  public JavaRDD<WriteStatus> performClustering(
+      final JavaRDD<HoodieRecord<T>> inputRecords,
+      final int numOutputGroups,
+      final String instantTime,
+      final Map<String, String> strategyParams,
+      final Schema schema,
+      final List<HoodieFileGroupId> inputFileIds) {
+    if (inputRecords.getNumPartitions() != 1 || inputFileIds.size() != 1) {
+      throw new HoodieClusteringException("Expect only one partition for test strategy: " + getClass().getName());
+    }
+
+    Properties props = getWriteConfig().getProps();
+    HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
+    TaskContextSupplier taskContextSupplier = getEngineContext().getTaskContextSupplier();
+    final HoodieTable hoodieTable = getHoodieTable();
+    final String schemaString = schema.toString();
+
+    JavaRDD<WriteStatus> writeStatus = inputRecords.mapPartitions(recordItr ->
+        insertRecords(recordItr, newConfig, instantTime, hoodieTable, schemaString, inputFileIds.get(0).getFileId(), taskContextSupplier))
+        .flatMap(List::iterator);
+    return writeStatus;
+  }
+
+  private static <T extends HoodieRecordPayload<T>> Iterator<List<WriteStatus>> insertRecords(final Iterator<HoodieRecord<T>> recordItr,
+                                                                                              final HoodieWriteConfig newConfig,
+                                                                                              final String instantTime,
+                                                                                              final HoodieTable hoodieTable,
+                                                                                              final String schema,
+                                                                                              final String fileId,
+                                                                                              final TaskContextSupplier taskContextSupplier) {
+
+    return new SparkLazyInsertIterable(recordItr, true, newConfig, instantTime, hoodieTable,
+        fileId, taskContextSupplier, new CreateFixedFileHandleFactory(fileId), true);
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index bee5c82c81821..44ddf2c039e1b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -1114,6 +1114,16 @@ public void testClusteringWithSortColumns() throws Exception {
         .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
     testClustering(clusteringConfig);
   }
+
+  @Test
+  public void testClusteringWithOneFilePerGroup() throws Exception {
+    HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
+        .withClusteringMaxBytesInGroup(10) // small value so each file is considered a separate clustering group
+        .withClusteringExecutionStrategyClass("org.apache.hudi.ClusteringIdentityTestExecutionStrategy")
+        .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
+
+    testClustering(clusteringConfig, true);
+  }
 
   @Test
   public void testPendingClusteringRollback() throws Exception {
@@ -1145,7 +1155,7 @@ public void testPendingClusteringRollback() throws Exception {
   }
 
   private List<HoodieRecord> testClustering(HoodieClusteringConfig clusteringConfig) throws Exception {
-    return testClustering(clusteringConfig, false);
+    return testClustering(clusteringConfig, true);
   }
 
   private List<HoodieRecord> testClustering(HoodieClusteringConfig clusteringConfig, boolean completeClustering) throws Exception {
@@ -1167,7 +1177,7 @@ private List<HoodieRecord> testClustering(HoodieClusteringConfi
     fileIdIntersection.retainAll(fileIds2);
     assertEquals(0, fileIdIntersection.size());
 
-    config = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAutoCommit(completeClustering)
+    config = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAutoCommit(false)
         .withClusteringConfig(clusteringConfig).build();
     // create client with new config.
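The new test drives the identity strategy by setting withClusteringMaxBytesInGroup(10), so every candidate file exceeds the group budget on its own and ends up in a single-file clustering group. A simplified sketch of how such a byte budget splits files into groups (not Hudi's planner, just the grouping idea):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class GroupBySizeSketch {
  // Greedily packs file sizes into groups capped at maxBytesPerGroup.
  static List<List<Long>> groupBySize(List<Long> fileSizes, long maxBytesPerGroup) {
    List<List<Long>> groups = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentBytes = 0;
    for (long size : fileSizes) {
      if (!current.isEmpty() && currentBytes + size > maxBytesPerGroup) {
        groups.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(size);
      currentBytes += size;
    }
    if (!current.isEmpty()) {
      groups.add(current);
    }
    return groups;
  }

  public static void main(String[] args) {
    // With a 10-byte cap, three ~1 MB files end up in three single-file groups.
    System.out.println(groupBySize(Arrays.asList(1_000_000L, 1_200_000L, 900_000L), 10L));
  }
}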