@@ -354,6 +354,12 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("When enabled, we allow duplicate keys even if inserts are routed to merge with an existing file (for ensuring file sizing)."
+ " This is only relevant for insert operation, since upsert, delete operations will ensure unique key constraints are maintained.");

public static final ConfigProperty<Integer> MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT = ConfigProperty
.key("hoodie.merge.small.file.group.candidates.limit")
.defaultValue(1)
.withDocumentation("Limits number of file groups, whose base file satisfies small-file limit, to consider for appending records during upsert operation. "
+ "Only applicable to MOR tables");

public static final ConfigProperty<Integer> CLIENT_HEARTBEAT_INTERVAL_IN_MS = ConfigProperty
.key("hoodie.client.heartbeat.interval_in_ms")
.defaultValue(60 * 1000)
@@ -1035,6 +1041,10 @@ public boolean allowDuplicateInserts() {
return getBoolean(MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE);
}

public int getSmallFileGroupCandidatesLimit() {
return getInt(MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT);
}

public EngineType getEngineType() {
return engineType;
}
@@ -2116,6 +2126,11 @@ public Builder withMergeAllowDuplicateOnInserts(boolean routeInsertsToNewFiles)
return this;
}

public Builder withMergeSmallFileGroupCandidatesLimit(int limit) {
writeConfig.setValue(MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT, String.valueOf(limit));
return this;
}

public Builder withHeartbeatIntervalInMs(Integer heartbeatIntervalInMs) {
writeConfig.setValue(CLIENT_HEARTBEAT_INTERVAL_IN_MS, String.valueOf(heartbeatIntervalInMs));
return this;
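The new hoodie.merge.small.file.group.candidates.limit property defaults to 1, so out of the box the behavior matches the old single-candidate padding; raising it lets more small MOR file groups per partition absorb inserts. A minimal sketch of wiring it up through the builder method added above; the base path and schema below are placeholders, not values from this change:

import org.apache.hudi.config.HoodieWriteConfig;

public class SmallFileCandidatesLimitExample {
  public static void main(String[] args) {
    // Let up to three small MOR file groups per partition absorb new inserts
    // instead of the default single candidate.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/example_table")                              // placeholder base path
        .withSchema("{\"type\":\"record\",\"name\":\"rec\",\"fields\":[]}") // placeholder schema
        .withMergeSmallFileGroupCandidatesLimit(3)                          // builder method added in this change
        .build();

    System.out.println(writeConfig.getSmallFileGroupCandidatesLimit()); // 3
  }
}

The same value can also be supplied as the plain writer property hoodie.merge.small.file.group.candidates.limit.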
@@ -288,6 +288,10 @@ protected List<SmallFile> getSmallFiles(String partitionPath) {
return smallFileLocations;
}

public List<BucketInfo> getBucketInfos() {
return Collections.unmodifiableList(new ArrayList<>(bucketInfoMap.values()));
}

public BucketInfo getBucketInfo(int bucketNumber) {
return bucketInfoMap.get(bucketNumber);
}
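The new getBucketInfos() accessor copies the bucket map's values before wrapping them, so callers receive a read-only snapshot rather than a live view of the partitioner's state. A small standalone sketch of that copy-then-wrap pattern, using plain strings instead of BucketInfo, to show what each half of the idiom buys:

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DefensiveSnapshotSketch {
  public static void main(String[] args) {
    Map<Integer, String> bucketInfoMap = new LinkedHashMap<>();
    bucketInfoMap.put(0, "bucket-0");

    // Copy first so later mutations of the map are not visible through the returned list,
    // then wrap so callers cannot add or remove elements.
    List<String> snapshot = Collections.unmodifiableList(new ArrayList<>(bucketInfoMap.values()));

    bucketInfoMap.put(1, "bucket-1");
    System.out.println(snapshot.size()); // prints 1: the snapshot is decoupled from the map

    try {
      snapshot.add("bucket-2");
    } catch (UnsupportedOperationException e) {
      System.out.println("snapshot is read-only"); // the wrapper rejects mutation
    }
  }
}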
@@ -26,15 +26,16 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;

import org.apache.hudi.table.action.commit.SmallFile;
import org.apache.hudi.table.action.commit.UpsertPartitioner;

import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

@@ -51,68 +52,68 @@ public SparkUpsertDeltaCommitPartitioner(WorkloadProfile profile, HoodieSparkEng

@Override
protected List<SmallFile> getSmallFiles(String partitionPath) {

// smallFiles only for partitionPath
List<SmallFile> smallFileLocations = new ArrayList<>();

// Init here since this class (and member variables) might not have been initialized
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();

// Find out all eligible small file slices
if (!commitTimeline.empty()) {
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
// find smallest file in partition and append to it
List<FileSlice> allSmallFileSlices = new ArrayList<>();
// If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to
// it. Doing this over time for a partition, we ensure that we handle small file issues
if (!table.getIndex().canIndexLogFiles()) {
// TODO : choose last N small files since there can be multiple small files written to a single partition
// by different spark partitions in a single batch
Option<FileSlice> smallFileSlice = Option.fromJavaOptional(table.getSliceView()
.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
.filter(
fileSlice -> fileSlice.getLogFiles().count() < 1 && fileSlice.getBaseFile().get().getFileSize() < config
.getParquetSmallFileLimit())
.min((FileSlice left, FileSlice right) ->
left.getBaseFile().get().getFileSize() < right.getBaseFile().get().getFileSize() ? -1 : 1));
if (smallFileSlice.isPresent()) {
allSmallFileSlices.add(smallFileSlice.get());
}
if (commitTimeline.empty()) {
return Collections.emptyList();
}

HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();

// Find out all eligible small file slices, looking for
// smallest file in the partition to append to
List<FileSlice> smallFileSlicesCandidates = getSmallFileCandidates(partitionPath, latestCommitTime);
List<SmallFile> smallFileLocations = new ArrayList<>();

// Create SmallFiles from the eligible file slices
for (FileSlice smallFileSlice : smallFileSlicesCandidates) {
SmallFile sf = new SmallFile();
if (smallFileSlice.getBaseFile().isPresent()) {
// TODO : Move logic of file name, file id, base commit time handling inside file slice
String filename = smallFileSlice.getBaseFile().get().getFileName();
sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
sf.sizeBytes = getTotalFileSize(smallFileSlice);
smallFileLocations.add(sf);
} else {
// If we can index log files, we can add more inserts to log files for fileIds NOT including those under
// pending compaction
List<FileSlice> allFileSlices =
table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
.collect(Collectors.toList());
for (FileSlice fileSlice : allFileSlices) {
if (isSmallFile(fileSlice)) {
allSmallFileSlices.add(fileSlice);
}
}
}
// Create SmallFiles from the eligible file slices
for (FileSlice smallFileSlice : allSmallFileSlices) {
SmallFile sf = new SmallFile();
if (smallFileSlice.getBaseFile().isPresent()) {
// TODO : Move logic of file name, file id, base commit time handling inside file slice
String filename = smallFileSlice.getBaseFile().get().getFileName();
sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
sf.sizeBytes = getTotalFileSize(smallFileSlice);
smallFileLocations.add(sf);
} else {
HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
FSUtils.getFileIdFromLogPath(logFile.getPath()));
sf.sizeBytes = getTotalFileSize(smallFileSlice);
smallFileLocations.add(sf);
}
HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
FSUtils.getFileIdFromLogPath(logFile.getPath()));
sf.sizeBytes = getTotalFileSize(smallFileSlice);
smallFileLocations.add(sf);
}
}
return smallFileLocations;
}

@Nonnull
private List<FileSlice> getSmallFileCandidates(String partitionPath, HoodieInstant latestCommitInstant) {
// If we can index log files, we can add more inserts to log files for fileIds NOT including those under
// pending compaction
if (table.getIndex().canIndexLogFiles()) {
return table.getSliceView()
.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitInstant.getTimestamp(), false)
.filter(this::isSmallFile)
.collect(Collectors.toList());
}

// If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to
// it. Doing this over time for a partition, we ensure that we handle small file issues
return table.getSliceView()
.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitInstant.getTimestamp(), false)
.filter(
fileSlice ->
// NOTE: We can not pad slices with existing log-files w/o compacting these,
// hence skipping
fileSlice.getLogFiles().count() < 1
&& fileSlice.getBaseFile().get().getFileSize() < config.getParquetSmallFileLimit())
.sorted(Comparator.comparing(fileSlice -> fileSlice.getBaseFile().get().getFileSize()))
.limit(config.getSmallFileGroupCandidatesLimit())
.collect(Collectors.toList());
}

public List<String> getSmallFileIds() {
return (List<String>) smallFiles.stream().map(smallFile -> ((SmallFile) smallFile).location.getFileId())
return smallFiles.stream().map(smallFile -> smallFile.location.getFileId())
.collect(Collectors.toList());
}

@@ -132,8 +133,12 @@ private boolean isSmallFile(FileSlice fileSlice) {

// TODO (NA) : Make this static part of utility
public long convertLogFilesSizeToExpectedParquetSize(List<HoodieLogFile> hoodieLogFiles) {
long totalSizeOfLogFiles = hoodieLogFiles.stream().map(HoodieLogFile::getFileSize)
.filter(size -> size > 0).reduce(Long::sum).orElse(0L);
long totalSizeOfLogFiles =
hoodieLogFiles.stream()
.map(HoodieLogFile::getFileSize)
.filter(size -> size > 0)
.reduce(Long::sum)
.orElse(0L);
// Here we assume that if there is no base parquet file, all log files contain only inserts.
// We can then just get the parquet equivalent size of these log files, compare that with
// {@link config.getParquetMaxFileSize()} and decide if there is scope to insert more rows
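For tables whose index cannot look up records in log files, the refactored getSmallFileCandidates() boils down to a filter/sort/limit pipeline over the latest file slices: keep slices that have no log files and whose base file is under the small-file limit, prefer the smallest ones, and stop at the configured candidate count (the old code effectively fixed that count at one via min()). A self-contained sketch of the same pipeline on plain file sizes; the 100 MB threshold and the limit of 3 are assumed example values rather than values taken from this change:

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class CandidateSelectionSketch {
  public static void main(String[] args) {
    long smallFileLimitBytes = 100L * 1024 * 1024; // assumed stand-in for hoodie.parquet.small.file.limit
    int candidatesLimit = 3;                       // assumed stand-in for hoodie.merge.small.file.group.candidates.limit

    // Base file sizes (bytes) of the latest file slices in a partition.
    List<Long> baseFileSizes = Arrays.asList(150L << 20, 10L << 20, 40L << 20, 5L << 20, 90L << 20);

    // Keep only files under the small-file threshold, prefer the smallest ones,
    // and cap how many file groups may absorb new inserts.
    List<Long> candidates = baseFileSizes.stream()
        .filter(size -> size < smallFileLimitBytes)
        .sorted(Comparator.naturalOrder())
        .limit(candidatesLimit)
        .collect(Collectors.toList());

    System.out.println(candidates); // [5242880, 10485760, 41943040]
  }
}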
@@ -56,6 +56,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
@@ -336,7 +337,6 @@ public void testUpsertPartitionerWithSmallFileHandlingWithInflightCompactionWith
HoodieCompactionPlan plan = CompactionTestUtils.createCompactionPlan(metaClient, "001", "002", 1, true, false);
FileCreateUtils.createRequestedCompactionCommit(basePath, "002", plan);
// Simulate one more commit so that inflight compaction is considered when building file groups in file system view
//
FileCreateUtils.createBaseFile(basePath, testPartitionPath, "003", "2", 1);
FileCreateUtils.createCommit(basePath, "003");

@@ -434,6 +434,49 @@ public void testUpsertPartitionerWithSmallFileHandlingWithCanIndexLogFiles() thr
"Insert should be assigned to fg1");
}

@Test
public void testUpsertPartitionerWithSmallFileHandlingPickingMultipleCandidates() throws Exception {
final String partitionPath = DEFAULT_PARTITION_PATHS[0];

Review comment (Contributor): I will land this for now. But when you get a chance, can you make this test configurable w/ diff values for the config that we just added, just to make sure the config is honored for diff values. Just 2 diff values would suffice.

Review comment (Member): @alexeykudinkin let's keep the code styling more consistent with what's there already?

HoodieWriteConfig config = makeHoodieClientConfigBuilder()

instead of

HoodieWriteConfig config =
                makeHoodieClientConfigBuilder

HoodieWriteConfig config =
makeHoodieClientConfigBuilder()
.withMergeSmallFileGroupCandidatesLimit(3)
.withStorageConfig(
HoodieStorageConfig.newBuilder()
.parquetMaxFileSize(2048)
.build()
)
.build();

// Bootstrap base files ("small-file targets")
FileCreateUtils.createBaseFile(basePath, partitionPath, "002", "fg-1", 1024);
FileCreateUtils.createBaseFile(basePath, partitionPath, "002", "fg-2", 1024);
FileCreateUtils.createBaseFile(basePath, partitionPath, "002", "fg-3", 1024);

FileCreateUtils.createCommit(basePath, "002");

HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {partitionPath});
// Default estimated record size will be 1024 based on last file group created.
// Only 1 record can be added to small file
WorkloadProfile profile =
new WorkloadProfile(buildProfile(jsc.parallelize(dataGenerator.generateInserts("003", 3))));

HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(this.metaClient);

HoodieSparkTable<?> table = HoodieSparkTable.create(config, context, reloadedMetaClient);

SparkUpsertDeltaCommitPartitioner<?> partitioner = new SparkUpsertDeltaCommitPartitioner<>(profile, context, table, config);

assertEquals(3, partitioner.numPartitions());
assertEquals(
Arrays.asList(
new BucketInfo(BucketType.UPDATE, "fg-1", partitionPath),
new BucketInfo(BucketType.UPDATE, "fg-2", partitionPath),
new BucketInfo(BucketType.UPDATE, "fg-3", partitionPath)
),
partitioner.getBucketInfos());
}

private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() {
// Prepare the AvroParquetIO
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(SCHEMA.toString());
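As a possible follow-up to the Contributor's review comment above, which asks that the new config be exercised with more than one value, a sketch of a parameterized variant of the test added in this change. It assumes JUnit 5's @ParameterizedTest and @ValueSource are available in this module, reuses the helpers and fields of the surrounding test class, and uses an illustrative method name, so treat it as an outline rather than a drop-in test:

// Sketch only: assumes JUnit 5 @ParameterizedTest/@ValueSource; method name is illustrative.
@ParameterizedTest
@ValueSource(ints = {1, 3})
public void testUpsertPartitionerHonorsSmallFileGroupCandidatesLimit(int candidatesLimit) throws Exception {
  final String partitionPath = DEFAULT_PARTITION_PATHS[0];

  HoodieWriteConfig config = makeHoodieClientConfigBuilder()
      .withMergeSmallFileGroupCandidatesLimit(candidatesLimit)
      .withStorageConfig(HoodieStorageConfig.newBuilder().parquetMaxFileSize(2048).build())
      .build();

  // Same bootstrap as the test above: three small base files in a single partition.
  FileCreateUtils.createBaseFile(basePath, partitionPath, "002", "fg-1", 1024);
  FileCreateUtils.createBaseFile(basePath, partitionPath, "002", "fg-2", 1024);
  FileCreateUtils.createBaseFile(basePath, partitionPath, "002", "fg-3", 1024);
  FileCreateUtils.createCommit(basePath, "002");

  HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {partitionPath});
  WorkloadProfile profile =
      new WorkloadProfile(buildProfile(jsc.parallelize(dataGenerator.generateInserts("003", 3))));

  HoodieSparkTable<?> table =
      HoodieSparkTable.create(config, context, HoodieTableMetaClient.reload(metaClient));
  SparkUpsertDeltaCommitPartitioner<?> partitioner =
      new SparkUpsertDeltaCommitPartitioner<>(profile, context, table, config);

  // The number of small file groups chosen to absorb inserts should track the configured limit.
  assertEquals(candidatesLimit, partitioner.getSmallFileIds().size());
}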