Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,10 @@ private void parseInputPaths(Path[] inputPaths, List<String> incrementalTables)
throws IOException {
for (Path inputPath : inputPaths) {
boolean basePathKnown = false;
String inputPathStr = inputPath.toString();
for (HoodieTableMetaClient metaClient : tableMetaClientMap.values()) {
if (inputPath.toString().contains(metaClient.getBasePath().toString())) {
String basePathStr = metaClient.getBasePath().toString();
if (inputPathStr.equals(basePathStr) || inputPathStr.startsWith(basePathStr + "/")) {
// We already know the base path for this inputPath.
basePathKnown = true;
// Check if this is for a snapshot query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,54 +436,21 @@ public static List<FileStatus> filterIncrementalFileStatus(Job job, HoodieTableM
return returns;
}

/**
 * Takes in a list of file statuses and a collection of table meta clients. Groups the
 * file statuses by the table (meta client) whose base path contains each file.
 *
 * @param fileStatuses   file statuses to be grouped
 * @param fileExtension  data-file extension; statuses whose file name does not end with
 *                       this extension are skipped
 * @param metaClientList meta clients of the candidate tables
 * @return map from each matched meta client to the file statuses under its base path;
 *         statuses that match no table are silently dropped (previously this caused an NPE)
 */
public static Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatusForSnapshotPaths(
    FileStatus[] fileStatuses, String fileExtension, Collection<HoodieTableMetaClient> metaClientList) {
  Map<HoodieTableMetaClient, List<FileStatus>> grouped = new HashMap<>();
  for (FileStatus status : fileStatuses) {
    Path inputPath = status.getPath();
    if (!inputPath.getName().endsWith(fileExtension)) {
      //FIXME(vc): skip non data files for now. This wont be needed once log file name start
      // with "."
      continue;
    }
    String inputPathStr = inputPath.toString();
    for (HoodieTableMetaClient metaClient : metaClientList) {
      String basePathStr = metaClient.getBasePath().toString();
      // Match only when the input path IS the base path or lies strictly under it.
      // A bare contains()/substring check would wrongly match a table whose base path
      // is a prefix of another table's base path (e.g. /tmp/trips vs /tmp/trips_stats).
      if (inputPathStr.equals(basePathStr) || inputPathStr.startsWith(basePathStr + "/")) {
        grouped.computeIfAbsent(metaClient, k -> new ArrayList<>()).add(status);
        break;
      }
    }
  }
  return grouped;
}

public static Map<HoodieTableMetaClient, List<Path>> groupSnapshotPathsByMetaClient(
Collection<HoodieTableMetaClient> metaClientList,
List<Path> snapshotPaths
) {
Map<HoodieTableMetaClient, List<Path>> grouped = new HashMap<>();
metaClientList.forEach(metaClient -> grouped.put(metaClient, new ArrayList<>()));
for (Path path : snapshotPaths) {
String inputPathStr = path.toString();
// Find meta client associated with the input path
metaClientList.stream().filter(metaClient -> path.toString().contains(metaClient.getBasePath().toString()))
.forEach(metaClient -> grouped.get(metaClient).add(path));
Option<HoodieTableMetaClient> matchedMetaClient = Option.fromJavaOptional(metaClientList.stream()
.filter(metaClient -> {
String basePathStr = metaClient.getBasePath().toString();
return inputPathStr.equals(basePathStr) || inputPathStr.startsWith(basePathStr + "/"); })
Copy link
Copy Markdown
Contributor

@wombatu-kun wombatu-kun Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we do the same fix for checks in groupFileStatusForSnapshotPaths (461 and 463 lines)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The groupFileStatusForSnapshotPaths is not used anywhere. In the HUDI-3094 the HoodieParquetInputFormat.listStatus() which was the caller of this method was removed and related logic was moved to InputPathHandler where it belongs now. So I deleted this method entirely.

.findFirst());
matchedMetaClient.ifPresent(metaClient -> grouped.get(metaClient).add(path));
}
return grouped;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class TestInputPathHandler {

// snapshot Table
public static final String ETL_TRIPS_TEST_NAME = "etl_trips";
public static final String MODEL_TRIPS_COW_TEST_NAME = "model_trips_cow";

// non Hoodie table
public static final String TRIPS_STATS_TEST_NAME = "trips_stats";
Expand All @@ -77,6 +78,7 @@ public class TestInputPathHandler {
private static String basePathTable4 = null; // non hoodie Path
private static String basePathTable5 = null;
private static String basePathTable6 = null;
private static String basePathTable7 = null;
private static List<String> incrementalTables;
private static List<Path> incrementalPaths;
private static List<Path> snapshotPaths;
Expand Down Expand Up @@ -123,6 +125,7 @@ static void initTables() throws IOException {
String tempPath = "/tmp/";
basePathTable5 = tempPath + EMPTY_SNAPSHOT_TEST_NAME;
basePathTable6 = tempPath + EMPTY_INCREMENTAL_TEST_NAME;
basePathTable7 = parentPath.resolve(MODEL_TRIPS_COW_TEST_NAME).toAbsolutePath().toString();

dfs.mkdirs(new Path(basePathTable1));
initTableType(dfs.getConf(), basePathTable1, RAW_TRIPS_TEST_NAME, HoodieTableType.MERGE_ON_READ);
Expand All @@ -145,6 +148,10 @@ static void initTables() throws IOException {
initTableType(dfs.getConf(), basePathTable6, EMPTY_INCREMENTAL_TEST_NAME, HoodieTableType.MERGE_ON_READ);
incrementalPaths.add(new Path(basePathTable6));

dfs.mkdirs(new Path(basePathTable7));
initTableType(dfs.getConf(), basePathTable7, MODEL_TRIPS_COW_TEST_NAME, HoodieTableType.COPY_ON_WRITE);
snapshotPaths.addAll(generatePartitions(dfs, basePathTable7));

inputPaths.addAll(incrementalPaths);
inputPaths.addAll(snapshotPaths);
inputPaths.addAll(nonHoodiePaths);
Expand Down