Merged
Changes from 3 commits
@@ -18,10 +18,14 @@

package org.apache.hudi.sink.bucket;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.index.bucket.BucketIdentifier;
import org.apache.hudi.sink.StreamWriteFunction;

@@ -32,11 +36,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Arrays;
import java.util.function.Predicate;

/**
* A stream write function with bucket hash index.
@@ -68,6 +75,12 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
*/
private Map<String, Map<Integer, String>> bucketIndex;

/**
* BucketID to file group mapping in each partition, loaded from the file system.
* Map(partition -> Map(bucketId, fileID)).
*/
private Map<String, Map<Integer, String>> fsBucketIndex;

/**
* Incremental bucket index of the current checkpoint interval;
* it is needed because the bucket type('I' or 'U') should be decided based on the committed files view,
@@ -100,6 +113,7 @@ public void open(Configuration parameters) throws IOException {
this.bucketIndex = new HashMap<>();
this.incBucketIndex = new HashSet<>();
this.isEmptyTable = !this.metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().isPresent();
this.fsBucketIndex = new HashMap<>();
}

@Override
@@ -130,7 +144,11 @@ public void processElement(I i, ProcessFunction<I, Object>.Context context, Coll
} else if (bucketToFileId.containsKey(bucketNum)) {
location = new HoodieRecordLocation("U", bucketToFileId.get(bucketNum));
} else {
Map<Integer, String> fsBucketToFileId = fsBucketIndex.computeIfAbsent(partition, p -> new HashMap<>());
String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
if (fsBucketToFileId.containsKey(bucketNum)) {
newFileId = fsBucketToFileId.get(bucketNum);
}
location = new HoodieRecordLocation("I", newFileId);
bucketToFileId.put(bucketNum, newFileId);
incBucketIndex.add(bucketId);
@@ -161,7 +179,7 @@ private Set<Integer> getBucketToLoad() {
* Get partition_bucket -> fileID mapping from the existing hudi table.
* This is a required operation for each restart to avoid having duplicate file ids for one bucket.
*/
-  private void bootstrapIndexIfNeed(String partition) {
+  private void bootstrapIndexIfNeed(String partition) throws IOException {
if (isEmptyTable || bucketIndex.containsKey(partition)) {
return;
}
@@ -185,5 +203,30 @@ private void bootstrapIndexIfNeed(String partition) {
}
});
bucketIndex.put(partition, bucketToFileIDMap);

Contributor:
I guess we only need to fix line 197 using:

getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime, boolean includeFileSlicesInPendingCompaction)

with includeFileSlicesInPendingCompaction as true.
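
For reference, a minimal sketch of that suggestion as it might look inside bootstrapIndexIfNeed; the maxCommitTime derivation from the latest completed instant and the view access via writeClient are assumptions, mirroring the author's snippet below:

// Sketch only: assumes the latest completed instant supplies maxCommitTime.
String latestCommit = this.metaClient.getActiveTimeline()
    .filterCompletedInstants().lastInstant().get().getTimestamp();
this.writeClient.getHoodieTable().getFileSystemView()
    // includeFileSlicesInPendingCompaction = true, per the suggestion above
    .getLatestFileSlicesBeforeOrOn(partition, latestCommit, true)
    .forEach(fileSlice -> {
      String fileID = fileSlice.getFileId();
      int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileID);
      if (bucketToLoad.contains(bucketNumber)) {
        bucketToFileIDMap.put(bucketNumber, fileID);
      }
    });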

Contributor Author:
It was empty after testing getLatestFileSlicesBeforeOrOn, and I also tested:

this.writeClient.getHoodieTable().getFileSystemView().getAllFileGroups(partition).forEach(group -> {
    String fileID = group.getFileGroupId().getFileId();
    // ... (rest of the loop truncated in the original comment)
});

which works. What do you think?
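
For reference, a minimal sketch of how that getAllFileGroups approach could slot into bootstrapIndexIfNeed; the integration shape is an assumption, with bucketToLoad and fsBucketIndex in scope as in the surrounding method:

// Sketch only: builds the per-partition bucket -> fileID map from all file groups.
Map<Integer, String> partitionFsBucketToFileIDMap = new HashMap<>();
this.writeClient.getHoodieTable().getFileSystemView()
    .getAllFileGroups(partition)
    .forEach(group -> {
      String fileID = group.getFileGroupId().getFileId();
      int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileID);
      // keep only the buckets this subtask is responsible for
      if (bucketToLoad.contains(bucketNumber)) {
        partitionFsBucketToFileIDMap.put(bucketNumber, fileID);
      }
    });
fsBucketIndex.put(partition, partitionFsBucketToFileIDMap);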

// no need to load from the file system
boolean noNeedLoadFiles = bucketToFileIDMap.size() == bucketToLoad.size();
if (noNeedLoadFiles || OptionsResolver.isCowTable(config)) {
return;
}
// reuse uncommitted log file ids
try {
Map<Integer, String> partitionFsBucketToFileIDMap = new HashMap<>();
Predicate<FileStatus> rtFilePredicate = fileStatus -> fileStatus.getPath().getName()
.contains(metaClient.getTableConfig().getLogFileFormat().getFileExtension());
Arrays.stream(this.metaClient.getFs()
.listStatus(new Path(this.metaClient.getBasePath() + "/" + partition)))
.filter(rtFilePredicate)
.map(HoodieLogFile::new)
.forEach(s -> {
int bucketNumber = BucketIdentifier.bucketIdFromFileId(s.getFileId());
if (bucketToLoad.contains(bucketNumber)) {
partitionFsBucketToFileIDMap.put(bucketNumber, s.getFileId());
}
});
fsBucketIndex.put(partition, partitionFsBucketToFileIDMap);
} catch (FileNotFoundException fileNotFoundException) {
LOG.warn("May be table was not initialed");
}
}
}