Merged
@@ -21,6 +21,7 @@
import org.apache.hudi.common.util.Option;

import java.io.Serializable;
import java.util.List;
import java.util.Objects;
import java.util.TreeSet;
import java.util.stream.Stream;
@@ -71,6 +72,15 @@ public FileSlice(HoodieFileGroupId fileGroupId, String baseInstantTime) {
this.logFiles = new TreeSet<>(HoodieLogFile.getReverseLogFileComparator());
}

public FileSlice(HoodieFileGroupId fileGroupId, String baseInstantTime,
HoodieBaseFile baseFile, List<HoodieLogFile> logFiles) {
this.fileGroupId = fileGroupId;
this.baseInstantTime = baseInstantTime;
this.baseFile = baseFile;
this.logFiles = new TreeSet<>(HoodieLogFile.getReverseLogFileComparator());
this.logFiles.addAll(logFiles);
}

public void setBaseFile(HoodieBaseFile baseFile) {
this.baseFile = baseFile;
}
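
As a usage note: a minimal sketch of building a slice with the new constructor, where the partition path, file ID, instant time, and file paths are made-up values for illustration (imports for Collections and the Hoodie file types omitted):

    // Hypothetical inputs; in practice these come from the table's file system view.
    HoodieFileGroupId fgId = new HoodieFileGroupId("2022/09/24", "fg-0001");
    HoodieBaseFile baseFile = new HoodieBaseFile("/tmp/hudi/2022/09/24/fg-0001_20220924.parquet");
    List<HoodieLogFile> logFiles = Collections.singletonList(
        new HoodieLogFile("/tmp/hudi/2022/09/24/.fg-0001_20220924.log.1"));

    // One-shot construction instead of setBaseFile(...) plus per-file addLogFile(...).
    FileSlice slice = new FileSlice(fgId, "20220924100000", baseFile, logFiles);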
@@ -24,9 +24,13 @@
import org.apache.hudi.common.util.collection.Pair;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
@danny0405 (Contributor) commented on Sep 24, 2022:

Please fix the import sequences of all the files. org.apache.hudi package should be in the front.

Contributor (Author) replied:

done

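For illustration only, the ordering being requested groups org.apache.hudi imports first, then third-party packages, then java.* (a sketch of the convention, not the exact contents of this file):

    import org.apache.hudi.common.util.collection.Pair;
    import org.apache.hudi.exception.HoodieException;

    import com.fasterxml.jackson.databind.JsonNode;
    import org.apache.hadoop.conf.Configuration;

    import java.util.List;
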
import org.apache.log4j.Logger;

@@ -37,6 +41,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -236,6 +241,44 @@ public static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws Exception {
return JsonUtils.getObjectMapper().readValue(jsonStr, clazz);
}

/**
 * Parses the bytes of a deltacommit and returns the base file and the log files belonging to
 * the provided file group.
 */
// TODO: refactor this method to avoid doing the json tree walking (HUDI-4822).
public static Option<Pair<String, List<String>>> getFileSliceForFileGroupFromDeltaCommit(
byte[] bytes, HoodieFileGroupId fileGroupId) {
String jsonStr = new String(bytes, StandardCharsets.UTF_8);
if (jsonStr.isEmpty()) {
return Option.empty();
}

try {
JsonNode ptToWriteStatsMap = JsonUtils.getObjectMapper().readTree(jsonStr).get("partitionToWriteStats");
Contributor commented:

Actually, why do we even need this method? We have the utils to deser the HoodieCommitMetadata, right?

Contributor (Author) replied:

we don't have any usable method to get the log files and base file from the commit metadata. so discussed with @xushiyan @prasannarajaperumal, leave a ticket to solve this.

Contributor replied:

@YannByron we might not have the method to get all the log files but we shouldn't be producing new methods for deserializing HoodieCommitMetadata if we already have the methods to do so. Let's follow-up on this one.

Iterator<Map.Entry<String, JsonNode>> pts = ptToWriteStatsMap.fields();
while (pts.hasNext()) {
Map.Entry<String, JsonNode> ptToWriteStats = pts.next();
if (ptToWriteStats.getValue().isArray()) {
for (JsonNode writeStat : ptToWriteStats.getValue()) {
HoodieFileGroupId fgId = new HoodieFileGroupId(ptToWriteStats.getKey(), writeStat.get("fileId").asText());
if (fgId.equals(fileGroupId)) {
String baseFile = writeStat.get("baseFile").asText();
ArrayNode logFilesNode = (ArrayNode) writeStat.get("logFiles");
List<String> logFiles = new ArrayList<>();
for (JsonNode logFile : logFilesNode) {
logFiles.add(logFile.asText());
}
return Option.of(Pair.of(baseFile, logFiles));
}
}
}
}
return Option.empty();
} catch (Exception e) {
throw new HoodieException("Failed to parse the base file and log files from DeltaCommit", e);
}
}
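
As a usage note: a minimal sketch of calling the new helper, assuming it lives in HoodieCommitMetadata (as the review thread suggests); metaClient and deltaCommitInstant are hypothetical plumbing from the timeline API:

    // Read the raw deltacommit content from the active timeline (hypothetical setup).
    byte[] bytes = metaClient.getActiveTimeline().getInstantDetails(deltaCommitInstant).get();
    HoodieFileGroupId fgId = new HoodieFileGroupId("2022/09/24", "fg-0001");

    Option<Pair<String, List<String>>> slice =
        HoodieCommitMetadata.getFileSliceForFileGroupFromDeltaCommit(bytes, fgId);
    if (slice.isPresent()) {
      String baseFile = slice.get().getLeft();        // may be empty if the slice is log-only
      List<String> logFiles = slice.get().getRight(); // log file names from the write stats
    }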

// Here the functions are named "fetch" instead of "get" to avoid the JSON conversion.
public long fetchTotalPartitionsWritten() {
return partitionToWriteStats.size();