Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class TestIncrementalReadWithFullTableScan extends HoodieSparkClientTestBase {
.option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
.option("hoodie.cleaner.commits.retained", "3")
.option("hoodie.keep.min.commits", "4")
.option("hoodie.keep.max.commits", "5")
.option("hoodie.keep.max.commits", "7")
.option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(basePath)
Expand All @@ -106,6 +106,10 @@ class TestIncrementalReadWithFullTableScan extends HoodieSparkClientTestBase {
val completedCommits = hoodieMetaClient.getCommitsTimeline.filterCompletedInstants() // C4 to C9
val archivedInstants = hoodieMetaClient.getArchivedTimeline.filterCompletedInstants()
.getInstantsAsStream.distinct().toArray // C0 to C3
val nCompletedCommits = completedCommits.getInstants.size
val nArchivedInstants = archivedInstants.size
assertTrue(nCompletedCommits >= 3)
assertTrue(nArchivedInstants >= 3)

//Anything less than 2 is a valid commit in the sense no cleanup has been done for those commit files
val startUnarchivedCommitTs = completedCommits.nthInstant(0).get().getTimestamp //C4
Expand All @@ -125,16 +129,16 @@ class TestIncrementalReadWithFullTableScan extends HoodieSparkClientTestBase {

// Test start commit is archived, end commit is not archived
shouldThrowIfFallbackIsFalse(tableType,
() => runIncrementalQueryAndCompare(startArchivedCommitTs, endUnarchivedCommitTs, 5, false))
runIncrementalQueryAndCompare(startArchivedCommitTs, endUnarchivedCommitTs, 5, true)
() => runIncrementalQueryAndCompare(startArchivedCommitTs, endUnarchivedCommitTs, nArchivedInstants + 1, false))
runIncrementalQueryAndCompare(startArchivedCommitTs, endUnarchivedCommitTs, nArchivedInstants + 1, true)

// Test both start commit and end commits are not archived but got cleaned
shouldThrowIfFallbackIsFalse(tableType,
() => runIncrementalQueryAndCompare(startUnarchivedCommitTs, endUnarchivedCommitTs, 1, false))
runIncrementalQueryAndCompare(startUnarchivedCommitTs, endUnarchivedCommitTs, 1, true)

// Test start commit is not archived, end commits is out of the timeline
runIncrementalQueryAndCompare(startUnarchivedCommitTs, endOutOfRangeCommitTs, 5, true)
runIncrementalQueryAndCompare(startUnarchivedCommitTs, endOutOfRangeCommitTs, nCompletedCommits - 1, true)

// Test both start commit and end commits are out of the timeline
runIncrementalQueryAndCompare(startOutOfRangeCommitTs, endOutOfRangeCommitTs, 0, false)
Expand Down