Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ public static String readStartCommitTime(JobContext job, String tableName) {
return job.getConfiguration().get(startCommitTimestampName);
}

/**
 * Gets the n'th parent of the given path. Assumes the path has at least n components.
 *
 * @param path the path whose ancestor is wanted
 * @param n    how many levels to walk up
 * @return the n'th parent of {@code path}
 */
public static Path getNthParent(Path path, int n) {
Path parent = path;
for (int i = 0; i < n; i++) {
Expand All @@ -105,6 +112,12 @@ public static Path getNthParent(Path path, int n) {
return parent;
}

/**
 * Returns the list of table names for which hoodie.&lt;tableName&gt;.consume.mode is set to
 * incremental; returns an empty list if none are configured.
 *
 * @param job the job context whose configuration is scanned
 * @return table names configured for incremental consumption (possibly empty)
 */
public static List<String> getIncrementalTableNames(JobContext job) {
Map<String, String> tablesModeMap = job.getConfiguration()
.getValByRegex(HOODIE_CONSUME_MODE_PATTERN_STRING.pattern());
Expand All @@ -122,19 +135,42 @@ public static List<String> getIncrementalTableNames(JobContext job) {
return result;
}

/**
 * Resolves the timeline to query, based on the job configs
 * hoodie.&lt;tableName&gt;.consume.pending.commits and hoodie.&lt;tableName&gt;.consume.commit:
 *
 * (hoodie.&lt;tableName&gt;.consume.pending.commits, hoodie.&lt;tableName&gt;.consume.commit) ->
 * (true, validCommit)       -> returns the active timeline filtered up to validCommit
 * (true, invalidCommit)     -> throws HoodieIOException
 * (true, notSet)            -> throws HoodieIOException
 * (false, validCommit)      -> returns the completed timeline filtered up to validCommit
 * (false, invalidCommit)    -> throws HoodieIOException
 * (false or notSet, notSet) -> returns the completed timeline unfiltered
 *
 * A commit is "valid" when it exists in the timeline being checked, and "invalid" otherwise.
 *
 * @param tableName  name of the table being queried
 * @param job        job configuration holding the consume properties
 * @param metaClient meta client giving access to the table's timeline
 * @return the resolved (and possibly filtered) timeline
 */
public static HoodieTimeline getTableTimeline(final String tableName, final JobConf job, final HoodieTableMetaClient metaClient) {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline();

boolean includePendingCommits = job.getBoolean(String.format(HOODIE_CONSUME_PENDING_COMMITS, tableName), false);
if (includePendingCommits) {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline();
String maxCommit = job.get(String.format(HOODIE_CONSUME_COMMIT, tableName));
if (maxCommit == null || !timeline.containsInstant(maxCommit)) {
LOG.info("Timestamp configured for validation: " + maxCommit + " commits timeline:" + timeline + " table: " + tableName);
throw new HoodieIOException("Valid timestamp is required for " + HOODIE_CONSUME_COMMIT + " in validate mode");
}
return timeline.findInstantsBeforeOrEquals(maxCommit);
String maxCommit = job.get(String.format(HOODIE_CONSUME_COMMIT, tableName));

if (!includePendingCommits && maxCommit == null) {
return timeline.filterCompletedInstants();
}

// by default return all completed commits.
return metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
return filterIfInstantExists(tableName, includePendingCommits ? timeline : timeline.filterCompletedInstants(), maxCommit);
}

/**
 * Returns the given timeline truncated to instants at or before {@code maxCommit}.
 *
 * @param tableName table name, used only in the diagnostic log message
 * @param timeline  timeline to validate against and filter
 * @param maxCommit commit timestamp that must exist in {@code timeline}
 * @return {@code timeline} restricted to instants before or equal to {@code maxCommit}
 * @throws HoodieIOException if {@code maxCommit} is null or absent from {@code timeline}
 */
private static HoodieTimeline filterIfInstantExists(String tableName, HoodieTimeline timeline, String maxCommit) {
  if (maxCommit == null || !timeline.containsInstant(maxCommit)) {
    // WARN rather than INFO: this log always immediately precedes a thrown exception,
    // so it should stand out when diagnosing the failure.
    LOG.warn("Timestamp " + maxCommit + " doesn't exist in the commits timeline:" + timeline + " table: " + tableName);
    throw new HoodieIOException("Valid timestamp is required for " + HOODIE_CONSUME_COMMIT + " in snapshot mode");
  }
  return timeline.findInstantsBeforeOrEquals(maxCommit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

Expand Down Expand Up @@ -120,7 +121,6 @@ public void testPendingCompactionWithActiveCommits() throws IOException {
assertFalse(filteredTimeline.containsInstant(t4));
assertFalse(filteredTimeline.containsInstant(t5));
assertFalse(filteredTimeline.containsInstant(t6));

// remove compaction instant and setup timeline again
instants.remove(t3);
timeline = new HoodieActiveTimeline(metaClient);
Expand Down Expand Up @@ -193,6 +193,55 @@ public void testInputFormatUpdates() throws IOException {
+ "files from 200 commit", files, "100", 5);
}

/**
 * Verifies that snapshot queries reject a configured consume commit that does not
 * exist on the table's timeline, in both pending-commits and point-in-time modes.
 */
@Test
public void testSnapshotWithInvalidCommitShouldThrowException() throws IOException {
  // Single committed write; "1" is deliberately not a commit time on the timeline.
  File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
  InputFormatTestUtil.commit(basePath, "100");
  FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());

  // Pending-commits snapshot mode must reject the unknown commit time.
  InputFormatTestUtil.setupSnapshotIncludePendingCommits(jobConf, "1");
  assertEquals("Valid timestamp is required for hoodie.%s.consume.commit in snapshot mode",
      assertThrows(HoodieIOException.class, () -> inputFormat.listStatus(jobConf)).getMessage());

  // Point-in-time (max commit time) snapshot mode must reject it as well.
  InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf, "1");
  assertEquals("Valid timestamp is required for hoodie.%s.consume.commit in snapshot mode",
      assertThrows(HoodieIOException.class, () -> inputFormat.listStatus(jobConf)).getMessage());
}

/**
 * Verifies point-in-time (max commit time) snapshot queries: with
 * hoodie.<table>.consume.commit set, listStatus should return only the file
 * versions written at or before that commit.
 */
@Test
public void testPointInTimeQueryWithUpdates() throws IOException {
// initial commit
File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
InputFormatTestUtil.commit(basePath, "100");

// Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());

// Without any max-commit config, all 10 base files from commit 100 are visible.
FileStatus[] files = inputFormat.listStatus(jobConf);
assertEquals(10, files.length);

// update files
InputFormatTestUtil.simulateUpdates(partitionDir, baseFileExtension, "100", 5, "200", true);
// Before the commit
files = inputFormat.listStatus(jobConf);
assertEquals(10, files.length);
ensureFilesInCommit("Commit 200 has not been committed. We should not see files from this commit", files, "200", 0);
InputFormatTestUtil.commit(basePath, "200");

// Point-in-time at 100: the 5 updated files must surface their commit-100 versions.
InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf, "100");

files = inputFormat.listStatus(jobConf);
assertEquals(10, files.length);
ensureFilesInCommit("We shouldn't have any file pertaining to commit 200", files, "200", 0);
ensureFilesInCommit("All files should be from commit 100", files, "100", 10);

// Point-in-time at 200: updated files appear at 200, the untouched ones stay at 100.
InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf, "200");
files = inputFormat.listStatus(jobConf);
assertEquals(10, files.length);
ensureFilesInCommit("5 files for commit 200", files, "200", 5);
ensureFilesInCommit("5 files for commit 100", files, "100", 5);
}

@Test
public void testInputFormatWithCompaction() throws IOException {
// initial commit
Expand Down Expand Up @@ -496,6 +545,11 @@ public void testSnapshotPreCommitValidate() throws IOException {
// expected because validate is called with invalid instantTime
}

// Create a new JobConf because the old one has hoodie.<table>.consume.commit set
jobConf = new JobConf();
inputFormat.setConf(jobConf);
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());

// verify that snapshot mode skips uncommitted files
InputFormatTestUtil.setupSnapshotScanMode(jobConf);
files = inputFormat.listStatus(jobConf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,19 +141,30 @@ public static void setupSnapshotIncludePendingCommits(JobConf jobConf, String in
jobConf.set(validateTimestampName, instantTime);
}

/**
 * Configures the job for snapshot scan mode with an upper-bound commit time
 * (point-in-time query) on the raw-trips test table.
 */
public static void setupSnapshotMaxCommitTimeQueryMode(JobConf jobConf, String maxInstantTime) {
  setUpScanMode(jobConf);
  jobConf.set(
      String.format(HoodieHiveUtils.HOODIE_CONSUME_COMMIT, HoodieTestUtils.RAW_TRIPS_TEST_NAME),
      maxInstantTime);
}

/**
 * Configures the job for plain snapshot scan mode (pending commits excluded).
 */
public static void setupSnapshotScanMode(JobConf jobConf) {
  setupSnapshotScanMode(jobConf, false);
}

/**
 * Configures the job for snapshot scan mode on the raw-trips test table,
 * optionally including pending (not yet completed) commits.
 */
private static void setupSnapshotScanMode(JobConf jobConf, boolean includePending) {
  setUpScanMode(jobConf);
  String pendingCommitsProperty =
      String.format(HoodieHiveUtils.HOODIE_CONSUME_PENDING_COMMITS, HoodieTestUtils.RAW_TRIPS_TEST_NAME);
  jobConf.setBoolean(pendingCommitsProperty, includePending);
}

/**
 * Sets the consume mode of the raw-trips test table to snapshot scan.
 */
private static void setUpScanMode(JobConf jobConf) {
  jobConf.set(
      String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME),
      HoodieHiveUtils.SNAPSHOT_SCAN_MODE);
}

public static File prepareParquetTable(java.nio.file.Path basePath, Schema schema, int numberOfFiles,
int numberOfRecords, String commitNumber) throws IOException {
return prepareParquetTable(basePath, schema, numberOfFiles, numberOfRecords, commitNumber, HoodieTableType.COPY_ON_WRITE);
Expand Down