Commit 7129974

address comments
1 parent 452be51 commit 7129974

File tree

2 files changed: +17 additions, -18 deletions

hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java

Lines changed: 15 additions & 16 deletions
@@ -472,40 +472,39 @@ private static HoodieBaseFile refreshFileStatus(Configuration conf, HoodieBaseFi
   }
 
   /**
-   * List affected file status based on given commits.
+   * Iterate through a list of commits in ascending order, and extract the file status of
+   * all affected files from the commits metadata grouping by partition path. If the files has
+   * been touched multiple times in the given commits, the return value will keep the one
+   * from the latest commit.
    * @param basePath
    * @param commitsToCheck
    * @param timeline
    * @return HashMap<partitionPath, HashMap<fileName, FileStatus>>
    * @throws IOException
    */
-  public static HashMap<String, HashMap<String, FileStatus>> listStatusForAffectedPartitions(
+  public static HashMap<String, HashMap<String, FileStatus>> listAffectedFilesForCommits(
       Path basePath, List<HoodieInstant> commitsToCheck, HoodieTimeline timeline) throws IOException {
-    // Extract files touched by these commits.
-    // TODO This might need to be done in parallel like listStatus parallelism ?
+    // TODO: Use HoodieMetaTable to extract affected file directly.
     HashMap<String, HashMap<String, FileStatus>> partitionToFileStatusesMap = new HashMap<>();
-    for (HoodieInstant commit: commitsToCheck) {
+    List<HoodieInstant> sortedCommitsToCheck = new ArrayList<>(commitsToCheck);
+    sortedCommitsToCheck.sort(HoodieInstant::compareTo);
+    // Iterate through the given commits.
+    for (HoodieInstant commit: sortedCommitsToCheck) {
       HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
           HoodieCommitMetadata.class);
+      // Iterate through all the affected partitions of a commit.
       for (Map.Entry<String, List<HoodieWriteStat>> entry: commitMetadata.getPartitionToWriteStats().entrySet()) {
         if (!partitionToFileStatusesMap.containsKey(entry.getKey())) {
           partitionToFileStatusesMap.put(entry.getKey(), new HashMap<>());
         }
+        // Iterate through all the written files of this partition.
         for (HoodieWriteStat stat : entry.getValue()) {
           String relativeFilePath = stat.getPath();
           Path fullPath = relativeFilePath != null ? FSUtils.getPartitionPath(basePath, relativeFilePath) : null;
           if (fullPath != null) {
-            if (partitionToFileStatusesMap.get(entry.getKey()).containsKey(fullPath.getName())) {
-              // If filesystem support Append. Update the FileStatus of log file if being appended.
-              FileStatus prevFileStatus = partitionToFileStatusesMap.get(entry.getKey()).get(fullPath.getName());
-              FileStatus combinedFs = new FileStatus(prevFileStatus.getLen() + stat.getTotalWriteBytes(),
-                  false, 0, 0, 0, fullPath);
-              partitionToFileStatusesMap.get(entry.getKey()).put(fullPath.getName(), combinedFs);
-            } else {
-              FileStatus fs = new FileStatus(stat.getTotalWriteBytes(), false, 0, 0,
-                  0, fullPath);
-              partitionToFileStatusesMap.get(entry.getKey()).put(fullPath.getName(), fs);
-            }
+            FileStatus fs = new FileStatus(stat.getFileSizeInBytes(), false, 0, 0,
+                0, fullPath);
+            partitionToFileStatusesMap.get(entry.getKey()).put(fullPath.getName(), fs);
           }
         }
       }

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala

Lines changed: 2 additions & 2 deletions
@@ -24,7 +24,7 @@ import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.exception.HoodieException
-import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.listStatusForAffectedPartitions
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.listAffectedFilesForCommits
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
 import org.apache.log4j.LogManager
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -152,7 +152,7 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
   }
 
   def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = {
-    val partitionsWithFileStatus = listStatusForAffectedPartitions(new Path(metaClient.getBasePath),
+    val partitionsWithFileStatus = listAffectedFilesForCommits(new Path(metaClient.getBasePath),
       commitsToReturn, commitsTimelineToReturn)
     val affectedFileStatus = new ListBuffer[FileStatus]
     partitionsWithFileStatus.iterator.foreach(p =>
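For readers coming from the Java side, here is a rough sketch of how a caller might consume the renamed utility, flattening the returned partition-to-files map much as buildFileIndex does above. The wrapper class and method name are invented for illustration; only listAffectedFilesForCommits, its parameters, and its return type are taken from this commit:

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;

public class AffectedFilesSketch {

  // Collects every FileStatus from the partition -> (fileName -> FileStatus) map
  // returned by the utility into one flat list, mirroring the Scala caller above.
  public static List<FileStatus> affectedFiles(Path basePath,
                                               List<HoodieInstant> commitsToCheck,
                                               HoodieTimeline timeline) throws IOException {
    HashMap<String, HashMap<String, FileStatus>> byPartition =
        HoodieInputFormatUtils.listAffectedFilesForCommits(basePath, commitsToCheck, timeline);
    List<FileStatus> flattened = new ArrayList<>();
    for (HashMap<String, FileStatus> filesInPartition : byPartition.values()) {
      flattened.addAll(filesInPartition.values());
    }
    return flattened;
  }
}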
