diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java index 5342f0b3e9cc8..f0c2e6a1fe2f5 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java @@ -94,8 +94,10 @@ private void parseInputPaths(Path[] inputPaths, List<String> incrementalTables) throws IOException { for (Path inputPath : inputPaths) { boolean basePathKnown = false; + String inputPathStr = inputPath.toString(); for (HoodieTableMetaClient metaClient : tableMetaClientMap.values()) { - if (inputPath.toString().contains(metaClient.getBasePath().toString())) { + String basePathStr = metaClient.getBasePath().toString(); + if (inputPathStr.equals(basePathStr) || inputPathStr.startsWith(basePathStr + "/")) { // We already know the base path for this inputPath. basePathKnown = true; // Check if this is for a snapshot query diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java index c3e9a8504d072..aa24bd36fd92b 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java @@ -436,44 +436,6 @@ public static List<FileStatus> filterIncrementalFileStatus(Job job, HoodieTableM return returns; } - /** - * Takes in a list of filesStatus and a list of table metadata. Groups the files status list - * based on given table metadata. 
- * - * @param fileStatuses - * @param fileExtension - * @param metaClientList - * @return - * @throws IOException - */ - public static Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatusForSnapshotPaths( - FileStatus[] fileStatuses, String fileExtension, Collection<HoodieTableMetaClient> metaClientList) { - // This assumes the paths for different tables are grouped together - Map<HoodieTableMetaClient, List<FileStatus>> grouped = new HashMap<>(); - HoodieTableMetaClient metadata = null; - for (FileStatus status : fileStatuses) { - Path inputPath = status.getPath(); - if (!inputPath.getName().endsWith(fileExtension)) { - //FIXME(vc): skip non data files for now. This wont be needed once log file name start - // with "." - continue; - } - if ((metadata == null) || (!inputPath.toString().contains(metadata.getBasePath().toString()))) { - for (HoodieTableMetaClient metaClient : metaClientList) { - if (inputPath.toString().contains(metaClient.getBasePath().toString())) { - metadata = metaClient; - if (!grouped.containsKey(metadata)) { - grouped.put(metadata, new ArrayList<>()); - } - break; - } - } - } - grouped.get(metadata).add(status); - } - return grouped; - } - public static Map<HoodieTableMetaClient, List<Path>> groupSnapshotPathsByMetaClient( Collection<HoodieTableMetaClient> metaClientList, List<Path> snapshotPaths @@ -481,9 +443,14 @@ public static Map<HoodieTableMetaClient, List<Path>> groupSnapshotPathsByMetaCli Map<HoodieTableMetaClient, List<Path>> grouped = new HashMap<>(); metaClientList.forEach(metaClient -> grouped.put(metaClient, new ArrayList<>())); for (Path path : snapshotPaths) { + String inputPathStr = path.toString(); // Find meta client associated with the input path - metaClientList.stream().filter(metaClient -> path.toString().contains(metaClient.getBasePath().toString())) - .forEach(metaClient -> grouped.get(metaClient).add(path)); + Option<HoodieTableMetaClient> matchedMetaClient = Option.fromJavaOptional(metaClientList.stream() + .filter(metaClient -> { + String basePathStr = metaClient.getBasePath().toString(); + return inputPathStr.equals(basePathStr) || inputPathStr.startsWith(basePathStr + "/"); }) + .findFirst()); + matchedMetaClient.ifPresent(metaClient -> 
grouped.get(metaClient).add(path)); } return grouped; } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java index e1bdc6829320b..2a9380b321402 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java @@ -54,6 +54,7 @@ public class TestInputPathHandler { // snapshot Table public static final String ETL_TRIPS_TEST_NAME = "etl_trips"; + public static final String MODEL_TRIPS_COW_TEST_NAME = "model_trips_cow"; // non Hoodie table public static final String TRIPS_STATS_TEST_NAME = "trips_stats"; @@ -77,6 +78,7 @@ public class TestInputPathHandler { private static String basePathTable4 = null; // non hoodie Path private static String basePathTable5 = null; private static String basePathTable6 = null; + private static String basePathTable7 = null; private static List<String> incrementalTables; private static List<Path> incrementalPaths; private static List<Path> snapshotPaths; @@ -123,6 +125,7 @@ static void initTables() throws IOException { String tempPath = "/tmp/"; basePathTable5 = tempPath + EMPTY_SNAPSHOT_TEST_NAME; basePathTable6 = tempPath + EMPTY_INCREMENTAL_TEST_NAME; + basePathTable7 = parentPath.resolve(MODEL_TRIPS_COW_TEST_NAME).toAbsolutePath().toString(); dfs.mkdirs(new Path(basePathTable1)); initTableType(dfs.getConf(), basePathTable1, RAW_TRIPS_TEST_NAME, HoodieTableType.MERGE_ON_READ); @@ -145,6 +148,10 @@ static void initTables() throws IOException { initTableType(dfs.getConf(), basePathTable6, EMPTY_INCREMENTAL_TEST_NAME, HoodieTableType.MERGE_ON_READ); incrementalPaths.add(new Path(basePathTable6)); + dfs.mkdirs(new Path(basePathTable7)); + initTableType(dfs.getConf(), basePathTable7, MODEL_TRIPS_COW_TEST_NAME, HoodieTableType.COPY_ON_WRITE); + snapshotPaths.addAll(generatePartitions(dfs, basePathTable7)); + 
inputPaths.addAll(incrementalPaths); inputPaths.addAll(snapshotPaths); inputPaths.addAll(nonHoodiePaths);