diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java
index dae43602b66a3..d9983bd65f86b 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java
@@ -97,6 +97,13 @@ public static String readStartCommitTime(JobContext job, String tableName) {
     return job.getConfiguration().get(startCommitTimestampName);
   }
 
+  /**
+   * Gets the n-th parent of the given path. Assumes the path has at least n components.
+   *
+   * @param path the path to start from
+   * @param n how many levels to go up
+   * @return the n-th parent of {@code path}
+   */
   public static Path getNthParent(Path path, int n) {
     Path parent = path;
     for (int i = 0; i < n; i++) {
@@ -105,6 +112,12 @@ public static Path getNthParent(Path path, int n) {
     return parent;
   }
 
+  /**
+   * Returns the list of table names for which hoodie.<table_name>.consume.mode is set to incremental, else an empty list.
+   *
+   * @param job the job context holding the configuration
+   * @return table names configured for incremental consumption
+   */
   public static List<String> getIncrementalTableNames(JobContext job) {
     Map<String, String> tablesModeMap = job.getConfiguration()
         .getValByRegex(HOODIE_CONSUME_MODE_PATTERN_STRING.pattern());
@@ -122,19 +135,42 @@ public static List<String> getIncrementalTableNames(JobContext job) {
     return result;
   }
 
+  /**
+   * Returns the timeline to consume, based on the job configs hoodie.<table_name>.consume.pending.commits and hoodie.<table_name>.consume.commit:
+   *
+   * (hoodie.<table_name>.consume.pending.commits, hoodie.<table_name>.consume.commit) ->
+   * (true, validCommit) -> returns the active timeline filtered up to validCommit
+   * (true, invalidCommit) -> raises HoodieIOException
+   * (true, notSet) -> raises HoodieIOException
+   * (false, validCommit) -> returns the completed timeline filtered up to validCommit
+   * (false, invalidCommit) -> raises HoodieIOException
+   * (false or notSet, notSet) -> returns the completed timeline unfiltered
+   *
+   * A validCommit is one that exists in the timeline being checked; an invalidCommit is one that does not.
+   *
+   * @param tableName the table whose timeline is requested
+   * @param job the job conf holding the consume configs
+   * @param metaClient the meta client for the table
+   * @return the (possibly filtered) timeline
+   */
   public static HoodieTimeline getTableTimeline(final String tableName, final JobConf job, final HoodieTableMetaClient metaClient) {
+    HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline();
+
     boolean includePendingCommits = job.getBoolean(String.format(HOODIE_CONSUME_PENDING_COMMITS, tableName), false);
-    if (includePendingCommits) {
-      HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline();
-      String maxCommit = job.get(String.format(HOODIE_CONSUME_COMMIT, tableName));
-      if (maxCommit == null || !timeline.containsInstant(maxCommit)) {
-        LOG.info("Timestamp configured for validation: " + maxCommit + " commits timeline:" + timeline + " table: " + tableName);
-        throw new HoodieIOException("Valid timestamp is required for " + HOODIE_CONSUME_COMMIT + " in validate mode");
-      }
-      return timeline.findInstantsBeforeOrEquals(maxCommit);
+    String maxCommit = job.get(String.format(HOODIE_CONSUME_COMMIT, tableName));
+
+    if (!includePendingCommits && maxCommit == null) {
+      return timeline.filterCompletedInstants();
     }
-    // by default return all completed commits.
-    return metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+    return filterIfInstantExists(tableName, includePendingCommits ?
+        timeline : timeline.filterCompletedInstants(), maxCommit);
+  }
+
+  private static HoodieTimeline filterIfInstantExists(String tableName, HoodieTimeline timeline, String maxCommit) {
+    if (maxCommit == null || !timeline.containsInstant(maxCommit)) {
+      LOG.info("Timestamp " + maxCommit + " doesn't exist in the commits timeline: " + timeline + " table: " + tableName);
+      throw new HoodieIOException("Valid timestamp is required for " + HOODIE_CONSUME_COMMIT + " in snapshot mode");
+    }
+    return timeline.findInstantsBeforeOrEquals(maxCommit);
+  }
 }
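Reviewer note: a minimal sketch (not part of the patch) of how the decision table documented on getTableTimeline plays out for a caller. The table name "raw_trips", the instant "200", and the in-scope metaClient are hypothetical placeholders.

    // Sketch only: walks the (consume.pending.commits, consume.commit) combinations
    // from the javadoc above. "raw_trips" and "200" are placeholders; metaClient is
    // assumed to be an existing HoodieTableMetaClient for the table.
    JobConf job = new JobConf();
    String table = "raw_trips";

    // (false or notSet, notSet) -> completed timeline, unfiltered
    HoodieTimeline completed = HoodieHiveUtils.getTableTimeline(table, job, metaClient);

    // (false, validCommit) -> completed timeline capped at instant "200"
    job.set(String.format(HoodieHiveUtils.HOODIE_CONSUME_COMMIT, table), "200");
    HoodieTimeline upTo200 = HoodieHiveUtils.getTableTimeline(table, job, metaClient);

    // (true, validCommit) -> active timeline (pending commits included) capped at "200"
    job.setBoolean(String.format(HoodieHiveUtils.HOODIE_CONSUME_PENDING_COMMITS, table), true);
    HoodieTimeline withPending = HoodieHiveUtils.getTableTimeline(table, job, metaClient);

    // (true, notSet), or any commit missing from the timeline -> HoodieIOException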
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
index fb095fb2dbac1..c4fed987dd011 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
@@ -59,6 +59,7 @@
 import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -120,7 +121,6 @@ public void testPendingCompactionWithActiveCommits() throws IOException {
     assertFalse(filteredTimeline.containsInstant(t4));
     assertFalse(filteredTimeline.containsInstant(t5));
     assertFalse(filteredTimeline.containsInstant(t6));
-
     // remove compaction instant and setup timeline again
     instants.remove(t3);
     timeline = new HoodieActiveTimeline(metaClient);
@@ -193,6 +193,55 @@ public void testInputFormatUpdates() throws IOException {
         + "files from 200 commit", files, "100", 5);
   }
 
+  @Test
+  public void testSnapshotWithInvalidCommitShouldThrowException() throws IOException {
+    File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
+    InputFormatTestUtil.commit(basePath, "100");
+
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+    InputFormatTestUtil.setupSnapshotIncludePendingCommits(jobConf, "1");
+    Exception exception = assertThrows(HoodieIOException.class, () -> inputFormat.listStatus(jobConf));
+    assertEquals("Valid timestamp is required for hoodie.%s.consume.commit in snapshot mode", exception.getMessage());
+
+    InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf, "1");
+    exception = assertThrows(HoodieIOException.class, () -> inputFormat.listStatus(jobConf));
+    assertEquals("Valid timestamp is required for hoodie.%s.consume.commit in snapshot mode", exception.getMessage());
+  }
+
+  @Test
+  public void testPointInTimeQueryWithUpdates() throws IOException {
+    // initial commit
+    File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
+    InputFormatTestUtil.commit(basePath, "100");
+
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+
+    FileStatus[] files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length);
+
+    // update files
+    InputFormatTestUtil.simulateUpdates(partitionDir, baseFileExtension, "100", 5, "200", true);
+    // Before the commit
+    files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length);
+    ensureFilesInCommit("Commit 200 has not been committed. We should not see files from this commit",
+        files, "200", 0);
+    InputFormatTestUtil.commit(basePath, "200");
+
+    InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf, "100");
+
+    files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length);
+    ensureFilesInCommit("We shouldn't have any file pertaining to commit 200", files, "200", 0);
+    ensureFilesInCommit("All files should be from commit 100", files, "100", 10);
+
+    InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf, "200");
+    files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length);
+    ensureFilesInCommit("5 files for commit 200", files, "200", 5);
+    ensureFilesInCommit("5 files for commit 100", files, "100", 5);
+  }
+
   @Test
   public void testInputFormatWithCompaction() throws IOException {
     // initial commit
@@ -496,6 +545,11 @@ public void testSnapshotPreCommitValidate() throws IOException {
       // expected because validate is called with invalid instantTime
     }
 
+    // Create a new JobConf because the old one has hoodie.%s.consume.commit set
+    jobConf = new JobConf();
+    inputFormat.setConf(jobConf);
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+
     // verify that snapshot mode skips uncommitted files
     InputFormatTestUtil.setupSnapshotScanMode(jobConf);
     files = inputFormat.listStatus(jobConf);
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index 5d3c4469b6dad..19568938d7ed7 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -141,19 +141,30 @@ public static void setupSnapshotIncludePendingCommits(JobConf jobConf, String instantTime) {
     jobConf.set(validateTimestampName, instantTime);
   }
 
+  public static void setupSnapshotMaxCommitTimeQueryMode(JobConf jobConf, String maxInstantTime) {
+    setUpScanMode(jobConf);
+    String validateTimestampName =
+        String.format(HoodieHiveUtils.HOODIE_CONSUME_COMMIT, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+    jobConf.set(validateTimestampName, maxInstantTime);
+  }
+
   public static void setupSnapshotScanMode(JobConf jobConf) {
     setupSnapshotScanMode(jobConf, false);
   }
 
   private static void setupSnapshotScanMode(JobConf jobConf, boolean includePending) {
-    String modePropertyName =
-        String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
-    jobConf.set(modePropertyName, HoodieHiveUtils.SNAPSHOT_SCAN_MODE);
+    setUpScanMode(jobConf);
     String includePendingCommitsName =
         String.format(HoodieHiveUtils.HOODIE_CONSUME_PENDING_COMMITS, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
     jobConf.setBoolean(includePendingCommitsName, includePending);
   }
 
+  private static void setUpScanMode(JobConf jobConf) {
+    String modePropertyName =
+        String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
+    jobConf.set(modePropertyName, HoodieHiveUtils.SNAPSHOT_SCAN_MODE);
+  }
+
   public static File prepareParquetTable(java.nio.file.Path basePath, Schema schema, int numberOfFiles,
       int numberOfRecords, String commitNumber) throws IOException {
     return prepareParquetTable(basePath, schema, numberOfFiles, numberOfRecords, commitNumber,
         HoodieTableType.COPY_ON_WRITE);
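Reviewer note: and a rough end-to-end sketch (also not part of the patch) of the point-in-time read that testPointInTimeQueryWithUpdates exercises, written without the test utilities. The partition path and the table name "raw_trips" are hypothetical placeholders.

    // Sketch only: point-in-time snapshot read through the input format.
    JobConf jobConf = new JobConf();
    FileInputFormat.setInputPaths(jobConf, new Path("/tmp/hudi/raw_trips/2016/05/01"));

    // Put the table in snapshot scan mode...
    jobConf.set(String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, "raw_trips"),
        HoodieHiveUtils.SNAPSHOT_SCAN_MODE);
    // ...capped at instant "100"; an instant missing from the timeline now fails
    // fast with HoodieIOException instead of being silently ignored
    jobConf.set(String.format(HoodieHiveUtils.HOODIE_CONSUME_COMMIT, "raw_trips"), "100");

    HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat();
    inputFormat.setConf(jobConf);
    // Returns only the file slices committed at or before instant "100"
    FileStatus[] files = inputFormat.listStatus(jobConf);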