diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index 3ac81510b5a42..3c0a51155c384 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -146,7 +146,7 @@ private Map<String, Set<String>> getPartitionPathToPendingClusteringFileGroupsId
    * @return smallFiles not in clustering
    */
   private List<SmallFile> filterSmallFilesInClustering(final Set<String> pendingClusteringFileGroupsId, final List<SmallFile> smallFiles) {
-    if (this.config.isClusteringEnabled()) {
+    if (!pendingClusteringFileGroupsId.isEmpty()) {
       return smallFiles.stream()
           .filter(smallFile -> !pendingClusteringFileGroupsId.contains(smallFile.location.getFileId())).collect(Collectors.toList());
     } else {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
index c495ff60000fa..3be0bb34075ac 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -18,21 +18,26 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.ClusteringTestUtils;
 import org.apache.hudi.common.testutils.CompactionTestUtils;
 import org.apache.hudi.common.testutils.FileCreateUtils;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieHBaseIndexConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
@@ -350,6 +355,44 @@ public void testUpsertPartitionerWithSmallFileHandlingWithInflightCompactionWith
         "Should be assigned to only file id not pending compaction which is 2");
   }
 
+  @Test
+  public void testUpsertPartitionerWithSmallFileHandlingAndClusteringPlan() throws Exception {
+    final String testPartitionPath = DEFAULT_PARTITION_PATHS[0];
+
+    // create HoodieWriteConfig with inline and async clustering disabled.
+    HoodieWriteConfig config = makeHoodieClientConfigBuilder()
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().build())
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(false).withAsyncClustering(false).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1000 * 1024).parquetMaxFileSize(1000 * 1024).build())
+        .build();
+
+    // create a file slice with instantTime 001 and build a clustering plan that includes it.
+    HoodieClusteringPlan clusteringPlan = ClusteringTestUtils.createClusteringPlan(metaClient, "001", "1");
+    // create the requested replace commit
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
+        .setClusteringPlan(clusteringPlan).setOperationType(WriteOperationType.CLUSTER.name()).build();
+    FileCreateUtils.createRequestedReplaceCommit(basePath, "002", Option.of(requestedReplaceMetadata));
+
+    // create file slice 002
+    FileCreateUtils.createBaseFile(basePath, testPartitionPath, "002", "2", 1);
+    FileCreateUtils.createCommit(basePath, "002");
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    // generate new data to be ingested
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath});
+    List<HoodieRecord> insertRecords = dataGenerator.generateInserts("003", 100);
+    WorkloadProfile profile = new WorkloadProfile(buildProfile(jsc.parallelize(insertRecords)));
+
+    HoodieSparkTable table = HoodieSparkTable.create(config, context, metaClient);
+    // create the UpsertPartitioner
+    UpsertPartitioner partitioner = new UpsertPartitioner(profile, context, table, config);
+
+    // We now have file slice 1 and file slice 2, and file slice 1 is part of the pending clustering plan,
+    // so only file slice 2 can be used for ingestion.
+    assertEquals(1, partitioner.smallFiles.size(), "Should have 1 small file to be ingested.");
+  }
+
   @Test
   public void testUpsertPartitionerWithSmallFileHandlingWithCanIndexLogFiles() throws Exception {
     // Note this is used because it is same partition path used in CompactionTestUtils.createCompactionPlan()
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/ClusteringTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/ClusteringTestUtils.java
new file mode 100644
index 0000000000000..b142fe90b4cfa
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/ClusteringTestUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.testutils;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;
+import static org.apache.hudi.common.testutils.FileCreateUtils.createBaseFile;
+import static org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
+
+public class ClusteringTestUtils {
+
+  public static HoodieClusteringPlan createClusteringPlan(HoodieTableMetaClient metaClient, String instantTime, String fileId) {
+    try {
+      String basePath = metaClient.getBasePath();
+      String partition = DEFAULT_PARTITION_PATHS[0];
+      createBaseFile(basePath, partition, instantTime, fileId, 1);
+      FileSlice slice = new FileSlice(partition, instantTime, fileId);
+      slice.setBaseFile(new CompactionTestUtils.DummyHoodieBaseFile(Paths.get(basePath, partition,
+          baseFileName(instantTime, fileId)).toString()));
+      List<FileSlice>[] fileSliceGroups = new List[] {Collections.singletonList(slice)};
+      HoodieClusteringPlan clusteringPlan = ClusteringUtils.createClusteringPlan("strategy", new HashMap<>(),
+          fileSliceGroups, Collections.emptyMap());
+      return clusteringPlan;
+    } catch (Exception e) {
+      throw new HoodieException(e.getMessage(), e);
+    }
+  }
+}
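
The behavioral change above is that small files are now excluded from ingestion only when some file groups are actually pending clustering, rather than whenever clustering is enabled in the write config. Below is a minimal standalone sketch of that filtering logic; it is not part of the patch, and the class name FilterSketch and the plain String file ids are illustrative assumptions, not Hudi APIs.

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class FilterSketch {
  public static void main(String[] args) {
    // file group "1" is referenced by a pending clustering plan; "2" is not
    Set<String> pendingClusteringFileGroupIds = new HashSet<>(Collections.singletonList("1"));
    List<String> smallFileIds = Arrays.asList("1", "2");

    // mirrors the fixed filterSmallFilesInClustering: filter only when the pending set is non-empty
    List<String> usable = pendingClusteringFileGroupIds.isEmpty()
        ? smallFileIds
        : smallFileIds.stream()
            .filter(id -> !pendingClusteringFileGroupIds.contains(id))
            .collect(Collectors.toList());

    System.out.println(usable); // prints [2], matching the test's expectation of one usable small file
  }
}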