diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index ae805ca016e89..91acd3075377a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -73,9 +73,17 @@ public class HoodieClusteringConfig extends DefaultHoodieConfig {
   public static final String CLUSTERING_TARGET_FILE_MAX_BYTES = CLUSTERING_STRATEGY_PARAM_PREFIX + "target.file.max.bytes";
   public static final String DEFAULT_CLUSTERING_TARGET_FILE_MAX_BYTES = String.valueOf(1 * 1024 * 1024 * 1024L); // 1GB
 
-  // constants related to clustering that may be used by more than 1 strategy.
+  // Constants related to clustering that may be used by more than 1 strategy.
   public static final String CLUSTERING_SORT_COLUMNS_PROPERTY = HoodieClusteringConfig.CLUSTERING_STRATEGY_PARAM_PREFIX + "sort.columns";
 
+  // When a file group is under pending clustering, updates to it need to be handled. The default strategy simply rejects the update.
+  public static final String CLUSTERING_UPDATES_STRATEGY_PROP = "hoodie.clustering.updates.strategy";
+  public static final String DEFAULT_CLUSTERING_UPDATES_STRATEGY = "org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy";
+
+  // Async clustering
+  public static final String ASYNC_CLUSTERING_ENABLE_OPT_KEY = "hoodie.clustering.async.enabled";
+  public static final String DEFAULT_ASYNC_CLUSTERING_ENABLE_OPT_VAL = "false";
+
   public HoodieClusteringConfig(Properties props) {
     super(props);
   }
@@ -135,8 +143,8 @@ public Builder withClusteringTargetFileMaxBytes(long targetFileSize) {
       return this;
     }
 
-    public Builder withInlineClustering(Boolean inlineCompaction) {
-      props.setProperty(INLINE_CLUSTERING_PROP, String.valueOf(inlineCompaction));
+    public Builder withInlineClustering(Boolean inlineClustering) {
+      props.setProperty(INLINE_CLUSTERING_PROP, String.valueOf(inlineClustering));
       return this;
     }
 
@@ -150,8 +158,19 @@ public Builder fromProperties(Properties props) {
       return this;
     }
 
+    public Builder withClusteringUpdatesStrategy(String updatesStrategyClass) {
+      props.setProperty(CLUSTERING_UPDATES_STRATEGY_PROP, updatesStrategyClass);
+      return this;
+    }
+
+    public Builder withAsyncClustering(Boolean asyncClustering) {
+      props.setProperty(ASYNC_CLUSTERING_ENABLE_OPT_KEY, String.valueOf(asyncClustering));
+      return this;
+    }
+
     public HoodieClusteringConfig build() {
       HoodieClusteringConfig config = new HoodieClusteringConfig(props);
+
       setDefaultOnCondition(props, !props.containsKey(CLUSTERING_PLAN_STRATEGY_CLASS), CLUSTERING_PLAN_STRATEGY_CLASS, DEFAULT_CLUSTERING_PLAN_STRATEGY_CLASS);
       setDefaultOnCondition(props, !props.containsKey(CLUSTERING_EXECUTION_STRATEGY_CLASS),
@@ -170,6 +189,10 @@ public HoodieClusteringConfig build() {
           DEFAULT_CLUSTERING_TARGET_PARTITIONS);
       setDefaultOnCondition(props, !props.containsKey(CLUSTERING_PLAN_SMALL_FILE_LIMIT), CLUSTERING_PLAN_SMALL_FILE_LIMIT, DEFAULT_CLUSTERING_PLAN_SMALL_FILE_LIMIT);
+      setDefaultOnCondition(props, !props.containsKey(CLUSTERING_UPDATES_STRATEGY_PROP), CLUSTERING_UPDATES_STRATEGY_PROP,
+          DEFAULT_CLUSTERING_UPDATES_STRATEGY);
+      setDefaultOnCondition(props, !props.containsKey(ASYNC_CLUSTERING_ENABLE_OPT_KEY), ASYNC_CLUSTERING_ENABLE_OPT_KEY,
+          DEFAULT_ASYNC_CLUSTERING_ENABLE_OPT_VAL);
       return config;
     }
   }
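For illustration (not part of this patch): the two new keys above are normally set through the builder methods this hunk adds. A minimal sketch of wiring them into a write config, assuming the existing HoodieClusteringConfig.newBuilder() and HoodieWriteConfig.Builder#withClusteringConfig entry points and a hypothetical base path:

    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie_table")   // hypothetical base path
        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
            .withAsyncClustering(true)   // hoodie.clustering.async.enabled
            .withClusteringUpdatesStrategy(
                "org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy")
            .build())
        .build();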
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index bf9e203627e6b..ff43965235068 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -50,6 +50,7 @@
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+
 /**
  * Class storing configs for the HoodieWriteClient.
  */
@@ -394,6 +395,15 @@ public boolean isInlineClustering() {
     return Boolean.parseBoolean(props.getProperty(HoodieClusteringConfig.INLINE_CLUSTERING_PROP));
   }
 
+  public boolean isAsyncClusteringEnabled() {
+    return Boolean.parseBoolean(props.getProperty(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY));
+  }
+
+  public boolean isClusteringEnabled() {
+    // TODO: support async clustering in the future
+    return isInlineClustering() || isAsyncClusteringEnabled();
+  }
+
   public int getInlineClusterMaxCommits() {
     return Integer.parseInt(props.getProperty(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP));
   }
@@ -414,6 +424,10 @@ public Boolean shouldCleanBootstrapBaseFile() {
     return Boolean.valueOf(props.getProperty(HoodieCompactionConfig.CLEANER_BOOTSTRAP_BASE_FILE_ENABLED));
   }
 
+  public String getClusteringUpdatesStrategyClass() {
+    return props.getProperty(HoodieClusteringConfig.CLUSTERING_UPDATES_STRATEGY_PROP);
+  }
+
   /**
    * Clustering properties.
    */
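For illustration (not part of this patch), a small helper mirroring how the executor change later in this diff consults the new accessors before applying the configured strategy:

    // Sketch only: isClusteringEnabled() is true when either inline or async clustering is on,
    // and the strategy class defaults to SparkRejectUpdateStrategy via HoodieClusteringConfig.
    static boolean shouldCheckPendingClustering(HoodieWriteConfig config) {
      return config.isClusteringEnabled() && config.getClusteringUpdatesStrategyClass() != null;
    }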
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieClusteringUpdateException.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieClusteringUpdateException.java
new file mode 100644
index 0000000000000..68b62a5421706
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieClusteringUpdateException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+public class HoodieClusteringUpdateException extends HoodieException {
+  public HoodieClusteringUpdateException(String msg) {
+    super(msg);
+  }
+
+  public HoodieClusteringUpdateException(String msg, Throwable e) {
+    super(msg, e);
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
new file mode 100644
index 0000000000000..667a58b355781
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster.strategy;
+
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+
+import java.util.Set;
+
+/**
+ * When a file group is under pending clustering, records written to that file group need to be checked by an update strategy.
+ */
+public abstract class UpdateStrategy<T extends HoodieRecordPayload<T>, I> {
+
+  protected final HoodieEngineContext engineContext;
+  protected Set<HoodieFileGroupId> fileGroupsInPendingClustering;
+
+  protected UpdateStrategy(HoodieEngineContext engineContext, Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
+    this.engineContext = engineContext;
+    this.fileGroupsInPendingClustering = fileGroupsInPendingClustering;
+  }
+
+  /**
+   * Check the records that update file groups under pending clustering.
+   * @param taggedRecordsRDD the records to write, tagged with their target file id;
+   *                         in the future, the strategy may re-tag records to a different fileId.
+   * @return the records after the strategy has been applied
+   */
+  public abstract I handleUpdate(I taggedRecordsRDD);
+
+}
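UpdateStrategy is the extension point behind hoodie.clustering.updates.strategy. As a rough, hypothetical sketch (not shipped by this patch), a permissive Spark-side strategy that lets conflicting updates pass through could look like this:

    package org.apache.hudi.client.clustering.update.strategy;   // hypothetical location

    import org.apache.hudi.client.common.HoodieSparkEngineContext;
    import org.apache.hudi.common.model.HoodieFileGroupId;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordPayload;
    import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
    import org.apache.spark.api.java.JavaRDD;

    import java.util.Set;

    public class SparkAllowUpdateStrategy<T extends HoodieRecordPayload<T>>
        extends UpdateStrategy<T, JavaRDD<HoodieRecord<T>>> {

      public SparkAllowUpdateStrategy(HoodieSparkEngineContext engineContext,
                                      Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
        super(engineContext, fileGroupsInPendingClustering);
      }

      @Override
      public JavaRDD<HoodieRecord<T>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
        // Pass records through unchanged; a real strategy could re-tag them to other file groups.
        return taggedRecordsRDD;
      }
    }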
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java
new file mode 100644
index 0000000000000..134e490246680
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.clustering.update.strategy;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.exception.HoodieClusteringUpdateException;
+import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.util.HashSet;
+import java.util.List;
+
+/**
+ * Update strategy that rejects updates to file groups under pending clustering:
+ * if any such file group receives an update record, an exception is thrown.
+ */
+public class SparkRejectUpdateStrategy<T extends HoodieRecordPayload<T>> extends UpdateStrategy<T, JavaRDD<HoodieRecord<T>>> {
+  private static final Logger LOG = LogManager.getLogger(SparkRejectUpdateStrategy.class);
+
+  public SparkRejectUpdateStrategy(HoodieSparkEngineContext engineContext, HashSet<HoodieFileGroupId> fileGroupsInPendingClustering) {
+    super(engineContext, fileGroupsInPendingClustering);
+  }
+
+  private List<HoodieFileGroupId> getGroupIdsWithUpdate(JavaRDD<HoodieRecord<T>> inputRecords) {
+    List<HoodieFileGroupId> fileGroupIdsWithUpdates = inputRecords
+        .filter(record -> record.getCurrentLocation() != null)
+        .map(record -> new HoodieFileGroupId(record.getPartitionPath(), record.getCurrentLocation().getFileId())).distinct().collect();
+    return fileGroupIdsWithUpdates;
+  }
+
+  @Override
+  public JavaRDD<HoodieRecord<T>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
+    List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
+    fileGroupIdsWithRecordUpdate.forEach(fileGroupIdWithRecordUpdate -> {
+      if (fileGroupsInPendingClustering.contains(fileGroupIdWithRecordUpdate)) {
+        String msg = String.format("Not allowed to update the clustering file group %s. "
+                + "For pending clustering operations, we are not going to support update for now.",
+            fileGroupIdWithRecordUpdate.toString());
+        LOG.error(msg);
+        throw new HoodieClusteringUpdateException(msg);
+      }
+    });
+    return taggedRecordsRDD;
+  }
+
+}
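To make the rejection condition concrete, here is an illustrative helper (not part of the patch; the caller is assumed to supply the engine context, pending file groups, and tagged records) showing how the strategy is driven and when it throws:

    // Sketch only: throws HoodieClusteringUpdateException if any tagged record's current
    // location points at a file group that is also pending clustering; otherwise returns
    // the input RDD unchanged.
    static <T extends HoodieRecordPayload<T>> JavaRDD<HoodieRecord<T>> rejectConflictingUpdates(
        HoodieSparkEngineContext engineContext,
        HashSet<HoodieFileGroupId> fileGroupsInPendingClustering,
        JavaRDD<HoodieRecord<T>> taggedRecords) {
      SparkRejectUpdateStrategy<T> strategy =
          new SparkRejectUpdateStrategy<>(engineContext, fileGroupsInPendingClustering);
      return strategy.handleUpdate(taggedRecords);
    }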
" + + "For pending clustering operations, we are not going to support update for now.", + fileGroupIdWithRecordUpdate.toString()); + LOG.error(msg); + throw new HoodieClusteringUpdateException(msg); + } + }); + return taggedRecordsRDD; + } + +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index 73be8d4127413..1fd5dad688553 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -28,11 +28,13 @@ import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; @@ -46,6 +48,7 @@ import org.apache.hudi.table.WorkloadProfile; import org.apache.hudi.table.WorkloadStat; import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.Partitioner; @@ -59,11 +62,13 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; +import java.util.stream.Collectors; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Set; import java.util.Map; public abstract class BaseSparkCommitActionExecutor extends @@ -88,6 +93,18 @@ public BaseSparkCommitActionExecutor(HoodieEngineContext context, super(context, config, table, instantTime, operationType, extraMetadata); } + private JavaRDD> clusteringHandleUpdate(JavaRDD> inputRecordsRDD) { + if (config.isClusteringEnabled()) { + Set fileGroupsInPendingClustering = + table.getFileSystemView().getFileGroupsInPendingClustering().map(entry -> entry.getKey()).collect(Collectors.toSet()); + UpdateStrategy updateStrategy = (UpdateStrategy)ReflectionUtils + .loadClass(config.getClusteringUpdatesStrategyClass(), this.context, fileGroupsInPendingClustering); + return (JavaRDD>)updateStrategy.handleUpdate(inputRecordsRDD); + } else { + return inputRecordsRDD; + } + } + @Override public HoodieWriteMetadata> execute(JavaRDD> inputRecordsRDD) { HoodieWriteMetadata> result = new HoodieWriteMetadata<>(); @@ -107,9 +124,12 @@ public HoodieWriteMetadata> execute(JavaRDD saveWorkloadProfileMetadataToInflight(profile, instantTime); } + // handle records update with clustering + JavaRDD> inputRecordsRDDWithClusteringUpdate = clusteringHandleUpdate(inputRecordsRDD); + // partition using the insert partitioner final Partitioner partitioner = getPartitioner(profile); - JavaRDD> partitionedRecords = partition(inputRecordsRDD, partitioner); + JavaRDD> partitionedRecords = 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index b28c89a536469..a84e9127e2dcd 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -129,6 +129,34 @@ private int addUpdateBucket(String partitionPath, String fileIdHint) {
     return bucket;
   }
 
+  /**
+   * Get the ids of the file groups under pending clustering for each partition path.
+   * @return a map from partition path to the ids of its file groups under pending clustering
+   */
+  private Map<String, Set<String>> getPartitionPathToPendingClusteringFileGroupsId() {
+    Map<String, Set<String>> partitionPathToInPendingClusteringFileId =
+        table.getFileSystemView().getFileGroupsInPendingClustering()
+            .map(fileGroupIdAndInstantPair ->
+                Pair.of(fileGroupIdAndInstantPair.getKey().getPartitionPath(), fileGroupIdAndInstantPair.getKey().getFileId()))
+            .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toSet())));
+    return partitionPathToInPendingClusteringFileId;
+  }
+
+  /**
+   * Exclude file groups under pending clustering from small-file handling, since the update path is not supported for them.
+   * @param pendingClusteringFileGroupsId ids of the partition's file groups under pending clustering
+   * @param smallFiles small files of the partition
+   * @return the small files that are not under pending clustering
+   */
+  private List<SmallFile> filterSmallFilesInClustering(final Set<String> pendingClusteringFileGroupsId, final List<SmallFile> smallFiles) {
+    if (this.config.isClusteringEnabled()) {
+      return smallFiles.stream()
+          .filter(smallFile -> !pendingClusteringFileGroupsId.contains(smallFile.location.getFileId())).collect(Collectors.toList());
+    } else {
+      return smallFiles;
+    }
+  }
+
   private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) {
     // for new inserts, compute buckets depending on how many records we have for each partition
     Set<String> partitionPaths = profile.getPartitionPaths();
@@ -140,11 +168,16 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context)
     Map<String, List<SmallFile>> partitionSmallFilesMap =
         getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), context);
 
+    Map<String, Set<String>> partitionPathToPendingClusteringFileGroupsId = getPartitionPathToPendingClusteringFileGroupsId();
+
     for (String partitionPath : partitionPaths) {
       WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
       if (pStat.getNumInserts() > 0) {
 
-        List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath);
+        List<SmallFile> smallFiles =
+            filterSmallFilesInClustering(partitionPathToPendingClusteringFileGroupsId.getOrDefault(partitionPath, Collections.emptySet()),
+                partitionSmallFilesMap.get(partitionPath));
+
         this.smallFiles.addAll(smallFiles);
 
         LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);
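The partition-to-file-group-id map above relies on a groupingBy/mapping collector; the self-contained snippet below (illustrative, with made-up partition paths and file ids) shows the same pattern in isolation:

    import org.apache.hudi.common.util.collection.Pair;

    import java.util.Arrays;
    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class PendingClusteringGroupingExample {
      public static void main(String[] args) {
        // (partitionPath, fileId) pairs, as derived from the pending-clustering file groups.
        Map<String, Set<String>> byPartition = Arrays.asList(
                Pair.of("2016/09/26", "fg-1"),
                Pair.of("2016/09/26", "fg-2"),
                Pair.of("2016/09/27", "fg-3"))
            .stream()
            .collect(Collectors.groupingBy(Pair::getKey,
                Collectors.mapping(Pair::getValue, Collectors.toSet())));
        System.out.println(byPartition);   // e.g. {2016/09/26=[fg-1, fg-2], 2016/09/27=[fg-3]}
      }
    }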
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index d9a396d0f64b5..c201efd709a31 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -21,6 +21,8 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -30,15 +32,19 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ParquetUtils;
@@ -52,10 +58,12 @@
 import org.apache.hudi.exception.HoodieCorruptedDataException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.MarkerFiles;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -78,6 +86,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -86,6 +95,7 @@
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import java.util.Properties;
 
 import static org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_0;
 import static org.apache.hudi.common.testutils.FileCreateUtils.getBaseFileCountsForPaths;
@@ -97,6 +107,8 @@
 import static org.apache.hudi.common.testutils.Transformations.randomSelectAsHoodieKeys;
 import static org.apache.hudi.common.testutils.Transformations.recordsToRecordKeySet;
 import static org.apache.hudi.common.util.ParquetUtils.readRowKeysFromParquet;
+import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY;
+import static org.apache.hudi.config.HoodieClusteringConfig.DEFAULT_CLUSTERING_EXECUTION_STRATEGY_CLASS;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -110,6 +122,12 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
 
   private static final Logger LOG = LogManager.getLogger(TestHoodieClientOnCopyOnWriteStorage.class);
 
+  private static final Map<String, String> STRATEGY_PARAMS = new HashMap<String, String>() {
+    {
+      put("sortColumn", "record_key");
+    }
+  };
+
   private HoodieTestTable testTable;
 
   @BeforeEach
@@ -681,6 +699,70 @@ private void assertActualAndExpectedPartitionPathRecordKeyMatches(Set
 
+  private Pair<List<WriteStatus>, List<HoodieRecord>> insertBatchRecords(SparkRDDWriteClient client, String commitTime,
+                                                                         Integer recordNum, int expectedStatusSize) {
+    client.startCommitWithTime(commitTime);
+    List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime, recordNum);
+    JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
+    List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime).collect();
+    assertNoWriteErrors(statuses);
+    assertEquals(expectedStatusSize, statuses.size(), "check expected status size.");
+    return Pair.of(statuses, inserts1);
+  }
+
+  @Test
+  public void testUpdateRejectForClustering() throws IOException {
+    final String testPartitionPath = "2016/09/26";
+    dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
+    Properties props = new Properties();
+    props.setProperty(ASYNC_CLUSTERING_ENABLE_OPT_KEY, "true");
+    HoodieWriteConfig config = getSmallInsertWriteConfig(100,
+        TRIP_EXAMPLE_SCHEMA, dataGen.getEstimatedFileSizeInBytes(150), props);
+    SparkRDDWriteClient client = getHoodieWriteClient(config, false);
+    HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable) HoodieSparkTable.create(config, context, metaClient);
+
+    // 1. insert to generate 2 file groups
+    String commitTime1 = "001";
+    Pair<List<WriteStatus>, List<HoodieRecord>> upsertResult = insertBatchRecords(client, commitTime1, 600, 2);
+    List<HoodieRecord> inserts1 = upsertResult.getValue();
+    List<String> fileGroupIds1 = table.getFileSystemView().getAllFileGroups(testPartitionPath)
+        .map(fileGroup -> fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList());
+    assertEquals(2, fileGroupIds1.size());
+
+    // 2. generate a clustering plan for the fileGroupIds1 file groups
+    String commitTime2 = "002";
+    List<List<FileSlice>> firstInsertFileSlicesList = table.getFileSystemView().getAllFileGroups(testPartitionPath)
+        .map(fileGroup -> fileGroup.getAllFileSlices().collect(Collectors.toList())).collect(Collectors.toList());
+    List<FileSlice>[] fileSlices = (List<FileSlice>[]) firstInsertFileSlicesList.toArray(new List[firstInsertFileSlicesList.size()]);
+    createRequestedReplaceInstant(this.metaClient, commitTime2, fileSlices);
+
+    // 3. insert one record: no update-reject exception is thrown, and the small files are not merged; a new file group is created
+    String commitTime3 = "003";
+    insertBatchRecords(client, commitTime3, 1, 1);
+    List<String> fileGroupIds2 = table.getFileSystemView().getAllFileGroups(testPartitionPath)
+        .map(fileGroup -> fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList());
+    assertEquals(3, fileGroupIds2.size());
+
+    // 4. update one record belonging to the two file groups under clustering: a reject-update exception is thrown
+    String commitTime4 = "004";
+    client.startCommitWithTime(commitTime4);
+    List<HoodieRecord> insertsAndUpdates3 = new ArrayList<>();
+    insertsAndUpdates3.addAll(dataGen.generateUpdates(commitTime4, inserts1));
+    String assertMsg = String.format("Not allowed to update the clustering files in partition: %s. "
+        + "For pending clustering operations, we are not going to support update for now.", testPartitionPath);
+    assertThrows(HoodieUpsertException.class, () -> {
+      client.upsert(jsc.parallelize(insertsAndUpdates3, 1), commitTime4).collect();
+    }, assertMsg);
+
+    // 5. insert one record: no update-reject exception is thrown, and the small file is merged
+    String commitTime5 = "005";
+    List<WriteStatus> statuses = insertBatchRecords(client, commitTime5, 1, 1).getKey();
+    fileGroupIds2.removeAll(fileGroupIds1);
+    assertEquals(fileGroupIds2.get(0), statuses.get(0).getFileId());
+    List<String> firstInsertFileGroupIds4 = table.getFileSystemView().getAllFileGroups(testPartitionPath)
+        .map(fileGroup -> fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList());
+    assertEquals(3, firstInsertFileGroupIds4.size());
+  }
+
   /**
    * Test scenario of new file-group getting added during upsert().
    */
@@ -1467,8 +1549,12 @@ private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean
     String schemaStr = useNullSchema ? NULL_SCHEMA : TRIP_EXAMPLE_SCHEMA;
     return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize);
   }
-
+
   private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize) {
+    return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize, new Properties());
+  }
+
+  private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, Properties props) {
     HoodieWriteConfig.Builder builder = getConfigBuilder(schemaStr);
     return builder
         .withCompactionConfig(
@@ -1479,6 +1565,19 @@ private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String
             HoodieStorageConfig.newBuilder()
                 .hfileMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200))
                 .parquetMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200)).build())
+        .withProps(props)
         .build();
   }
+
+  protected HoodieInstant createRequestedReplaceInstant(HoodieTableMetaClient metaClient, String clusterTime, List<FileSlice>[] fileSlices) throws IOException {
+    HoodieClusteringPlan clusteringPlan =
+        ClusteringUtils.createClusteringPlan(DEFAULT_CLUSTERING_EXECUTION_STRATEGY_CLASS, STRATEGY_PARAMS, fileSlices, Collections.emptyMap());
+
+    HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusterTime);
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
+        .setClusteringPlan(clusteringPlan).setOperationType(WriteOperationType.CLUSTER.name()).build();
+    metaClient.getActiveTimeline().saveToPendingReplaceCommit(clusteringInstant, TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
+    return clusteringInstant;
+  }
+
 }
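For completeness, the new behavior can be enabled purely through properties, as the test above does; an illustrative application-side sketch (hypothetical base path, property keys and defaults taken from this patch) is:

    Properties props = new Properties();
    props.setProperty("hoodie.clustering.async.enabled", "true");
    props.setProperty("hoodie.clustering.updates.strategy",
        "org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy");

    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie_table")   // hypothetical base path
        .withProps(props)
        .build();
    // Upserts that touch file groups with a pending clustering (replacecommit) plan now fail
    // fast with HoodieClusteringUpdateException, surfaced to callers as a HoodieUpsertException.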