diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 7988e9307522..7d96ec388fbe 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -354,6 +354,12 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("When enabled, we allow duplicate keys even if inserts are routed to merge with an existing file (for ensuring file sizing)."
           + " This is only relevant for insert operation, since upsert, delete operations will ensure unique key constraints are maintained.");

+  public static final ConfigProperty<Integer> MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT = ConfigProperty
+      .key("hoodie.merge.small.file.group.candidates.limit")
+      .defaultValue(1)
+      .withDocumentation("Limits number of file groups, whose base file satisfies small-file limit, to consider for appending records during upsert operation. "
+          + "Only applicable to MOR tables");
+
   public static final ConfigProperty<Integer> CLIENT_HEARTBEAT_INTERVAL_IN_MS = ConfigProperty
       .key("hoodie.client.heartbeat.interval_in_ms")
       .defaultValue(60 * 1000)
@@ -1035,6 +1041,10 @@ public boolean allowDuplicateInserts() {
     return getBoolean(MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE);
   }

+  public int getSmallFileGroupCandidatesLimit() {
+    return getInt(MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT);
+  }
+
   public EngineType getEngineType() {
     return engineType;
   }
@@ -2116,6 +2126,11 @@ public Builder withMergeAllowDuplicateOnInserts(boolean routeInsertsToNewFiles)
       return this;
     }

+    public Builder withMergeSmallFileGroupCandidatesLimit(int limit) {
+      writeConfig.setValue(MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT, String.valueOf(limit));
+      return this;
+    }
+
     public Builder withHeartbeatIntervalInMs(Integer heartbeatIntervalInMs) {
       writeConfig.setValue(CLIENT_HEARTBEAT_INTERVAL_IN_MS, String.valueOf(heartbeatIntervalInMs));
       return this;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index 35a8bddf94fb..a30fe302b9f5 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -288,6 +288,10 @@ protected List<SmallFile> getSmallFiles(String partitionPath) {
     return smallFileLocations;
   }

+  public List<BucketInfo> getBucketInfos() {
+    return Collections.unmodifiableList(new ArrayList<>(bucketInfoMap.values()));
+  }
+
   public BucketInfo getBucketInfo(int bucketNumber) {
     return bucketInfoMap.get(bucketNumber);
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
index a2bcbf5d2cc4..8dd3146f5161 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
@@ -26,15 +26,16 @@
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
-
 import org.apache.hudi.table.action.commit.SmallFile;
 import org.apache.hudi.table.action.commit.UpsertPartitioner;

+import javax.annotation.Nonnull;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;

@@ -51,68 +52,68 @@ public SparkUpsertDeltaCommitPartitioner(WorkloadProfile profile, HoodieSparkEng
   @Override
   protected List<SmallFile> getSmallFiles(String partitionPath) {
-
-    // smallFiles only for partitionPath
-    List<SmallFile> smallFileLocations = new ArrayList<>();
-
     // Init here since this class (and member variables) might not have been initialized
     HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();

-    // Find out all eligible small file slices
-    if (!commitTimeline.empty()) {
-      HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
-      // find smallest file in partition and append to it
-      List<FileSlice> allSmallFileSlices = new ArrayList<>();
-      // If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to
-      // it. Doing this overtime for a partition, we ensure that we handle small file issues
-      if (!table.getIndex().canIndexLogFiles()) {
-        // TODO : choose last N small files since there can be multiple small files written to a single partition
-        // by different spark partitions in a single batch
-        Option<FileSlice> smallFileSlice = Option.fromJavaOptional(table.getSliceView()
-            .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
-            .filter(
-                fileSlice -> fileSlice.getLogFiles().count() < 1 && fileSlice.getBaseFile().get().getFileSize() < config
-                    .getParquetSmallFileLimit())
-            .min((FileSlice left, FileSlice right) ->
-                left.getBaseFile().get().getFileSize() < right.getBaseFile().get().getFileSize() ? -1 : 1));
-        if (smallFileSlice.isPresent()) {
-          allSmallFileSlices.add(smallFileSlice.get());
-        }
+    if (commitTimeline.empty()) {
+      return Collections.emptyList();
+    }
+
+    HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
+
+    // Find out all eligible small file slices, looking for
+    // smallest file in the partition to append to
+    List<FileSlice> smallFileSlicesCandidates = getSmallFileCandidates(partitionPath, latestCommitTime);
+    List<SmallFile> smallFileLocations = new ArrayList<>();
+
+    // Create SmallFiles from the eligible file slices
+    for (FileSlice smallFileSlice : smallFileSlicesCandidates) {
+      SmallFile sf = new SmallFile();
+      if (smallFileSlice.getBaseFile().isPresent()) {
+        // TODO : Move logic of file name, file id, base commit time handling inside file slice
+        String filename = smallFileSlice.getBaseFile().get().getFileName();
+        sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
+        sf.sizeBytes = getTotalFileSize(smallFileSlice);
+        smallFileLocations.add(sf);
       } else {
-        // If we can index log files, we can add more inserts to log files for fileIds NOT including those under
-        // pending compaction
-        List<FileSlice> allFileSlices =
-            table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
-                .collect(Collectors.toList());
-        for (FileSlice fileSlice : allFileSlices) {
-          if (isSmallFile(fileSlice)) {
-            allSmallFileSlices.add(fileSlice);
-          }
-        }
-      }
-      // Create SmallFiles from the eligible file slices
-      for (FileSlice smallFileSlice : allSmallFileSlices) {
-        SmallFile sf = new SmallFile();
-        if (smallFileSlice.getBaseFile().isPresent()) {
-          // TODO : Move logic of file name, file id, base commit time handling inside file slice
-          String filename = smallFileSlice.getBaseFile().get().getFileName();
-          sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
-          sf.sizeBytes = getTotalFileSize(smallFileSlice);
-          smallFileLocations.add(sf);
-        } else {
-          HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
-          sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
-              FSUtils.getFileIdFromLogPath(logFile.getPath()));
-          sf.sizeBytes = getTotalFileSize(smallFileSlice);
-          smallFileLocations.add(sf);
-        }
+        HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
+        sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
+            FSUtils.getFileIdFromLogPath(logFile.getPath()));
+        sf.sizeBytes = getTotalFileSize(smallFileSlice);
+        smallFileLocations.add(sf);
       }
     }
     return smallFileLocations;
   }

+  @Nonnull
+  private List<FileSlice> getSmallFileCandidates(String partitionPath, HoodieInstant latestCommitInstant) {
+    // If we can index log files, we can add more inserts to log files for fileIds NOT including those under
+    // pending compaction
+    if (table.getIndex().canIndexLogFiles()) {
+      return table.getSliceView()
+          .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitInstant.getTimestamp(), false)
+          .filter(this::isSmallFile)
+          .collect(Collectors.toList());
+    }
+
+    // If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to
+    // it. Doing this overtime for a partition, we ensure that we handle small file issues
+    return table.getSliceView()
+        .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitInstant.getTimestamp(), false)
+        .filter(
+            fileSlice ->
+                // NOTE: We can not pad slices with existing log-files w/o compacting these,
+                //       hence skipping
+                fileSlice.getLogFiles().count() < 1
+                    && fileSlice.getBaseFile().get().getFileSize() < config.getParquetSmallFileLimit())
+        .sorted(Comparator.comparing(fileSlice -> fileSlice.getBaseFile().get().getFileSize()))
+        .limit(config.getSmallFileGroupCandidatesLimit())
+        .collect(Collectors.toList());
+  }
+
   public List<String> getSmallFileIds() {
-    return (List<String>) smallFiles.stream().map(smallFile -> ((SmallFile) smallFile).location.getFileId())
+    return smallFiles.stream().map(smallFile -> smallFile.location.getFileId())
         .collect(Collectors.toList());
   }

@@ -132,8 +133,12 @@ private boolean isSmallFile(FileSlice fileSlice) {

   // TODO (NA) : Make this static part of utility
   public long convertLogFilesSizeToExpectedParquetSize(List<HoodieLogFile> hoodieLogFiles) {
-    long totalSizeOfLogFiles = hoodieLogFiles.stream().map(HoodieLogFile::getFileSize)
-        .filter(size -> size > 0).reduce(Long::sum).orElse(0L);
+    long totalSizeOfLogFiles =
+        hoodieLogFiles.stream()
+            .map(HoodieLogFile::getFileSize)
+            .filter(size -> size > 0)
+            .reduce(Long::sum)
+            .orElse(0L);
     // Here we assume that if there is no base parquet file, all log files contain only inserts.
     // We can then just get the parquet equivalent size of these log files, compare that with
     // {@link config.getParquetMaxFileSize()} and decide if there is scope to insert more rows
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
index 7b5cc27d3728..1e5f8029a714 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -56,6 +56,7 @@
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -336,7 +337,6 @@ public void testUpsertPartitionerWithSmallFileHandlingWithInflightCompactionWith
     HoodieCompactionPlan plan = CompactionTestUtils.createCompactionPlan(metaClient, "001", "002", 1, true, false);
     FileCreateUtils.createRequestedCompactionCommit(basePath, "002", plan);
     // Simulate one more commit so that inflight compaction is considered when building file groups in file system view
-    //
     FileCreateUtils.createBaseFile(basePath, testPartitionPath, "003", "2", 1);
     FileCreateUtils.createCommit(basePath, "003");

@@ -434,6 +434,49 @@ public void testUpsertPartitionerWithSmallFileHandlingWithCanIndexLogFiles() thr
         "Insert should be assigned to fg1");
   }

+  @Test
+  public void testUpsertPartitionerWithSmallFileHandlingPickingMultipleCandidates() throws Exception {
+    final String partitionPath = DEFAULT_PARTITION_PATHS[0];
+
+    HoodieWriteConfig config =
+        makeHoodieClientConfigBuilder()
+            .withMergeSmallFileGroupCandidatesLimit(3)
+            .withStorageConfig(
+                HoodieStorageConfig.newBuilder()
+                    .parquetMaxFileSize(2048)
+                    .build()
+            )
+            .build();
+
+    // Bootstrap base files ("small-file targets")
+    FileCreateUtils.createBaseFile(basePath, partitionPath, "002", "fg-1", 1024);
+    FileCreateUtils.createBaseFile(basePath, partitionPath, "002", "fg-2", 1024);
+    FileCreateUtils.createBaseFile(basePath, partitionPath, "002", "fg-3", 1024);
+
+    FileCreateUtils.createCommit(basePath, "002");
+
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {partitionPath});
+    // Default estimated record size will be 1024 based on last file group created.
+    // Only 1 record can be added to small file
+    WorkloadProfile profile =
+        new WorkloadProfile(buildProfile(jsc.parallelize(dataGenerator.generateInserts("003", 3))));
+
+    HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(this.metaClient);
+
+    HoodieSparkTable table = HoodieSparkTable.create(config, context, reloadedMetaClient);
+
+    SparkUpsertDeltaCommitPartitioner partitioner = new SparkUpsertDeltaCommitPartitioner<>(profile, context, table, config);
+
+    assertEquals(3, partitioner.numPartitions());
+    assertEquals(
+        Arrays.asList(
+            new BucketInfo(BucketType.UPDATE, "fg-1", partitionPath),
+            new BucketInfo(BucketType.UPDATE, "fg-2", partitionPath),
+            new BucketInfo(BucketType.UPDATE, "fg-3", partitionPath)
+        ),
+        partitioner.getBucketInfos());
+  }
+
   private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() {
     // Prepare the AvroParquetIO
     return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(SCHEMA.toString());
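
Usage note (not part of the patch): the sketch below is a hypothetical example of how a writer could raise the new limit when building its write configuration. Only the builder method withMergeSmallFileGroupCandidatesLimit, the getter, and the config key hoodie.merge.small.file.group.candidates.limit come from this change; the class name, table path, and Avro schema are placeholders.

import org.apache.hudi.config.HoodieWriteConfig;

public class SmallFileCandidatesLimitExample {

  // Placeholder Avro schema; any valid record schema for the target table would do.
  private static final String EXAMPLE_SCHEMA = "{\"type\":\"record\",\"name\":\"ExampleRecord\","
      + "\"fields\":[{\"name\":\"key\",\"type\":\"string\"},{\"name\":\"ts\",\"type\":\"long\"}]}";

  public static void main(String[] args) {
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi/example_mor_table")   // placeholder base path
        .withSchema(EXAMPLE_SCHEMA)
        // Consider up to 3 small file groups per partition when appending upsert records on a MOR table;
        // the default of 1 preserves the previous single-candidate behavior.
        .withMergeSmallFileGroupCandidatesLimit(3)
        .build();

    // The same setting expressed as a raw property, e.g. in a properties file:
    //   hoodie.merge.small.file.group.candidates.limit=3
    System.out.println(writeConfig.getSmallFileGroupCandidatesLimit()); // prints 3
  }
}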