diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index b36885106cce0..9b0fbf9fc8b35 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -236,7 +236,7 @@ public static Option getAffectedPartitions(List commitsTo
           return false;
         })
         .collect(Collectors.joining(","));
-    return Option.of(incrementalInputPaths);
+    return StringUtils.isNullOrEmpty(incrementalInputPaths) ? Option.empty() : Option.of(incrementalInputPaths);
   }
 
   /**
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
index 27838897d5ec3..59214ac445049 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
@@ -117,7 +117,6 @@ public void testPendingCompactionWithActiveCommits() throws IOException {
     assertFalse(filteredTimeline.containsInstant(t5));
     assertFalse(filteredTimeline.containsInstant(t6));
 
-    // remove compaction instant and setup timeline again
     instants.remove(t3);
     timeline = new HoodieActiveTimeline(metaClient);
 
@@ -239,6 +238,33 @@ public void testIncrementalSimple() throws IOException {
         "We should exclude commit 100 when returning incremental pull with start commit time as 100");
   }
 
+  @Test
+  public void testIncrementalEmptyPartitions() throws IOException {
+    // initial commit
+    File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
+    createCommitFile(basePath, "100", "2016/05/01");
+
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+
+    InputFormatTestUtil.setupIncremental(jobConf, "000", 1);
+
+    FileStatus[] files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length,
+        "We should include only 1 commit 100 when returning incremental pull with start commit time as 100");
+    ensureFilesInCommit("Pulling 1 commits from 000, should get us the 10 files from 100 commit", files, "100", 10);
+
+    // Add new commit only to a new partition
+    partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "200");
+    createCommitFile(basePath, "200", "2017/05/01");
+
+    InputFormatTestUtil.setupIncremental(jobConf, "100", 1);
+    files = inputFormat.listStatus(jobConf);
+
+    assertEquals(0, files.length,
+        "We should exclude commit 200 when returning incremental pull with start commit time as 100 as filePaths does not include new partition");
+  }
+
   private void createCommitFile(java.nio.file.Path basePath, String commitNumber, String partitionPath)
       throws IOException {
     List writeStats = HoodieTestUtils.generateFakeHoodieWriteStat(1);
@@ -355,7 +381,7 @@ public void testGetIncrementalTableNames() throws IOException {
     String incrementalMode1 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[0]);
     conf.set(incrementalMode1, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
    String incrementalMode2 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, expectedincrTables[1]);
-    conf.set(incrementalMode2,HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
+    conf.set(incrementalMode2, HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
     String incrementalMode3 = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.model_trips");
     conf.set(incrementalMode3, HoodieHiveUtils.INCREMENTAL_SCAN_MODE.toLowerCase());
     String defaultmode = String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips");
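
Note: the sketch below is not part of the patch; it is a minimal, self-contained illustration of the contract the one-line fix in HoodieInputFormatUtils establishes. When an incremental pull's commits touch no partition under the configured input paths, the joined path string is empty, and the method now returns Option.empty() instead of a present-but-empty Option.of(""). Option and StringUtils here are the real org.apache.hudi.common.util classes the patch itself uses; the class, method, and variable names in the sketch are hypothetical.

import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the tail end of getAffectedPartitions.
public class EmptyPartitionsSketch {

  static Option<String> joinAffectedPartitions(List<String> partitionPaths) {
    String joined = String.join(",", partitionPaths);
    // The guard added by the patch: an empty join result means "no affected
    // partitions", which callers should see as an absent Option, not as "".
    return StringUtils.isNullOrEmpty(joined) ? Option.empty() : Option.of(joined);
  }

  public static void main(String[] args) {
    // No matching partitions: absent Option, so the input format can return
    // zero splits instead of scanning a bogus empty path.
    System.out.println(joinAffectedPartitions(Collections.emptyList()).isPresent()); // false

    // Matching partitions: unchanged behavior, a comma-separated path list.
    System.out.println(joinAffectedPartitions(Arrays.asList("2016/05/01", "2017/05/01")).get());
    // prints 2016/05/01,2017/05/01
  }
}

This mirrors what the new testIncrementalEmptyPartitions test asserts: a commit ("200") that only creates files in a partition outside the job's input paths yields zero file statuses for an incremental pull starting at "100".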