Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,24 @@ public static Path getPartitionPath(Path basePath, String partitionPath) {
return StringUtils.isNullOrEmpty(partitionPath) ? basePath : new Path(basePath, partitionPath);
}

/**
* Extracts the file name from the relative path based on the table base path. For example:
* "/2022/07/29/file1.parquet", "/2022/07/29" -> "file1.parquet"
* "2022/07/29/file2.parquet", "2022/07/29" -> "file2.parquet"
* "/file3.parquet", "" -> "file3.parquet"
* "file4.parquet", "" -> "file4.parquet"
*
* @param filePathWithPartition the relative file path based on the table base path.
* @param partition the relative partition path. For partitioned table, `partition` contains the relative partition path;
* for non-partitioned table, `partition` is empty
* @return Extracted file name in String.
*/
public static String getFileName(String filePathWithPartition, String partition) {
int offset = StringUtils.isNullOrEmpty(partition)
? (filePathWithPartition.startsWith("/") ? 1 : 0) : partition.length() + 1;
return filePathWithPartition.substring(offset);
}

/**
* Get DFS full partition path (e.g. hdfs://ip-address:8020:/<absolute path>)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,16 +325,13 @@ public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCo
return map;
}

int offset = partition.equals(NON_PARTITIONED_NAME)
? (pathWithPartition.startsWith("/") ? 1 : 0)
: partition.length() + 1;
String filename = pathWithPartition.substring(offset);
String fileName = FSUtils.getFileName(pathWithPartition, partitionStatName);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before the change, the partition identifier is used, instead of partitionStatName. For a partitioned table, there is no difference; for a non-partitioned table, the partition identifier is . while partitionStatName could be empty or /. The new logic depends on partitionStatName instead of partition identified, and the file name extracted is not affected.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the new util getFileName() uses partition, right? why not pass partition to check? as how previously done

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot use partition here which is generated by getPartitionIdentifier(partitionStatName), changing the empty relative partition path to . partition identifier. getFileName() expects plain relative partition path, instead of the partition identifier used in the metadata table.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok i see. i was confused by the var name partition, which should actually be called partitionIdentifier


// Since write-stats are coming in no particular order, if the same
// file have previously been appended to w/in the txn, we simply pick max
// of the sizes as reported after every write, since file-sizes are
// monotonically increasing (ie file-size never goes down, unless deleted)
map.merge(filename, stat.getFileSizeInBytes(), Math::max);
map.merge(fileName, stat.getFileSizeInBytes(), Math::max);

return map;
},
Expand Down Expand Up @@ -410,12 +407,7 @@ public static HoodieData<HoodieRecord> convertMetadataToBloomFilterRecords(
return Collections.emptyListIterator();
}

// For partitioned table, "partition" contains the relative partition path;
// for non-partitioned table, "partition" is empty
int offset = StringUtils.isNullOrEmpty(partition)
? (pathWithPartition.startsWith("/") ? 1 : 0) : partition.length() + 1;

final String fileName = pathWithPartition.substring(offset);
String fileName = FSUtils.getFileName(pathWithPartition, partition);
if (!FSUtils.isBaseFile(new Path(fileName))) {
return Collections.emptyListIterator();
}
Expand Down Expand Up @@ -1162,13 +1154,8 @@ private static Stream<HoodieRecord> getColumnStatsRecords(String partitionPath,
HoodieTableMetaClient datasetMetaClient,
List<String> columnsToIndex,
boolean isDeleted) {
String partitionName = getPartitionIdentifier(partitionPath);
// NOTE: We have to chop leading "/" to make sure Hadoop does not treat it like
// absolute path
String filePartitionPath = filePath.startsWith("/") ? filePath.substring(1) : filePath;
String fileName = partitionName.equals(NON_PARTITIONED_NAME)
? filePartitionPath
: filePartitionPath.substring(partitionName.length() + 1);
String fileName = FSUtils.getFileName(filePath, partitionPath);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same here, using partitionPath directly instead of the partition identified.


if (isDeleted) {
// TODO we should delete records instead of stubbing them
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,17 @@ public void testFileNameRelatedFunctions() throws Exception {
Files.createFile(partitionPath.resolve(log3));

assertEquals(3, (int) FSUtils.getLatestLogVersion(FSUtils.getFs(basePath, new Configuration()),
new Path(partitionPath.toString()), fileId, LOG_EXTENTION, instantTime).get().getLeft());
new Path(partitionPath.toString()), fileId, LOG_EXTENTION, instantTime).get().getLeft());
assertEquals(4, FSUtils.computeNextLogVersion(FSUtils.getFs(basePath, new Configuration()),
new Path(partitionPath.toString()), fileId, LOG_EXTENTION, instantTime));
new Path(partitionPath.toString()), fileId, LOG_EXTENTION, instantTime));
}

@Test
public void testGetFilename() {
assertEquals("file1.parquet", FSUtils.getFileName("/2022/07/29/file1.parquet", "/2022/07/29"));
assertEquals("file2.parquet", FSUtils.getFileName("2022/07/29/file2.parquet", "2022/07/29"));
assertEquals("file3.parquet", FSUtils.getFileName("/file3.parquet", ""));
assertEquals("file4.parquet", FSUtils.getFileName("file4.parquet", ""));
}

private void prepareTestDirectory(FileSystem fileSystem, String rootDir) throws IOException {
Expand Down