@@ -537,7 +537,8 @@ protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, Stri
*/
protected void mayBeCleanAndArchive(HoodieTable table) {
autoCleanOnCommit();
-autoArchiveOnCommit(table);
+// reload table so that the timeline reflects the clean commit
+autoArchiveOnCommit(createTable(config, hadoopConf));
}

protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
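Read roughly, the hunk moves archival onto a freshly created table so that the timeline it inspects already contains the clean instant written by autoCleanOnCommit(). A sketch of that intent, with explanatory comments added (the comments are my reading of the diff, not text from the PR; only the identifiers shown in the hunk are real):

```java
// Sketch only, not a verbatim copy of the PR.
protected void mayBeCleanAndArchive(HoodieTable table) {
  // Cleaning may append a new *.clean instant to the timeline on storage.
  autoCleanOnCommit();
  // The `table` argument was built before the clean ran, so its cached timeline
  // would not contain that clean instant. Recreating the table gives the archiver
  // a timeline that reflects the clean commit before it decides what to archive.
  autoArchiveOnCommit(createTable(config, hadoopConf));
}
```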
@@ -56,14 +56,14 @@ class TestCommitsProcedure extends HoodieSparkProcedureTestBase {

// collect active commits for table
val commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
-assertResult(5) {
+assertResult(4) {
wombatu-kun (Contributor) commented on Jul 18, 2024:

@nsivabalan @yihua
Can somebody explain it? What is the root cause of the change in the results here? Why, when "insert data to table, will generate 5 active commits and 2 archived commits", do we get only 4 commits here?

commits.length
}

// collect archived commits for table
val endTs = commits(0).get(0).toString
val archivedCommits = spark.sql(s"""call show_archived_commits(table => '$tableName', end_ts => '$endTs')""").collect()
-assertResult(2) {
+assertResult(3) {
archivedCommits.length
}
}
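
On the question raised in the thread above, one consistent reading of the diff (an inference, not an explanation from the PR authors) is that the setup still produces the same seven commits in total; because archival now runs against a timeline that already contains the clean instant, one more commit crosses the archival boundary and moves from the active to the archived timeline, which is why both this test and the metadata variant below shift from 5/2 to 4/3. A toy accounting of that shift:

```java
import java.util.List;

// Toy accounting for the 5 -> 4 and 2 -> 3 shift asserted above; the commit labels
// and the "one extra commit gets archived" reading are assumptions, only the
// asserted counts come from the diff.
public class CommitCountShift {
  public static void main(String[] args) {
    List<String> allCommits = List.of("c1", "c2", "c3", "c4", "c5", "c6", "c7"); // illustrative labels
    int archivedBefore = 2;                          // old assertions: 5 active + 2 archived
    int archivedAfter = archivedBefore + 1;          // archival now sees the clean instant
    System.out.println(allCommits.size() - archivedAfter); // 4 active   -> assertResult(4)
    System.out.println(archivedAfter);                      // 3 archived -> assertResult(3)
  }
}
```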
@@ -106,14 +106,14 @@ class TestCommitsProcedure extends HoodieSparkProcedureTestBase {

// collect active commits for table
val commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
-assertResult(5) {
+assertResult(4) {
commits.length
}

// collect archived commits for table
val endTs = commits(0).get(0).toString
val archivedCommits = spark.sql(s"""call show_archived_commits_metadata(table => '$tableName', end_ts => '$endTs')""").collect()
-assertResult(2) {
+assertResult(3) {
archivedCommits.length
}
}
@@ -1109,15 +1109,31 @@ public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean, HoodieR
TestHelpers.addRecordMerger(recordType, configs);
cfg.configs = configs;
cfg.continuousMode = false;
+// timeline as of now. no cleaner and archival kicked in.
+// c1, c2, rc3, c4, c5, rc6,

-for (int i = 0; i < 2; i++) {
-ds = new HoodieDeltaStreamer(cfg, jsc);
-ds.sync();
-}

-// Step 5 : FirstReplaceHoodieInstant is retained for clean.
+ds = new HoodieDeltaStreamer(cfg, jsc);
+ds.sync();
+// after 1 round of sync, timeline will be as follows
+// just before clean
+// c1, c2, rc3, c4, c5, rc6, c7
+// after clean
+// c1, c2, rc3, c4, c5, rc6, c7, c8.clean (earliest commit to retain is c7)
+// after archival (retain 4 commits)
+// c4, c5, rc6, c7, c8.clean
+
+// old code has 2 sync() calls. book-keeping the sequence for now.
+// after 2nd round of sync
+// just before clean
+// c4, c5, rc6, c7, c8.clean, c9
+// after clean
+// c4, c5, rc6, c7, c8.clean, c9, c10.clean (earliest commit to retain c9)
+// after archival
+// c5, rc6, c7, c8.clean, c9, c10.clean

+// Step 5 : FirstReplaceHoodieInstant should not be retained.
long count = meta.reloadActiveTimeline().getCompletedReplaceTimeline().getInstantsAsStream().filter(instant -> firstReplaceHoodieInstant.get().equals(instant)).count();
-assertEquals(1, count);
+assertEquals(0, count);
Contributor commented:

Do we want to construct a test where the replacecommit should be retained?

Contributor commented:

@nsivabalan I'll land this PR to unblock CI. Let's follow up on this.

Contributor (author) commented:

sure.
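
For that follow-up, a rough sketch of what a test where the replacecommit is retained might look like; this is not part of the PR. It reuses only identifiers and config keys that already appear in this test, and simply widens the retention windows so the first replacecommit never leaves the active timeline (the concrete values are assumptions):

```java
// Hypothetical follow-up test body (not in this PR): keep enough commits active
// that the first replacecommit is neither cleaned nor archived.
cfg.configs.add("hoodie.cleaner.commits.retained=8");
cfg.configs.add("hoodie.keep.min.commits=9");
cfg.configs.add("hoodie.keep.max.commits=10");

ds = new HoodieDeltaStreamer(cfg, jsc);
ds.sync();

// With the wider retention window, the first replacecommit should still be on
// the active timeline, so the original assertion of 1 would hold here.
long retained = meta.reloadActiveTimeline().getCompletedReplaceTimeline()
    .getInstantsAsStream()
    .filter(instant -> firstReplaceHoodieInstant.get().equals(instant))
    .count();
assertEquals(1, retained);
```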


// Step 6 : All the replaced files in firstReplaceHoodieInstant should be deleted through sync/async cleaner.
for (String replacedFilePath : replacedFilePaths) {
@@ -2387,7 +2403,7 @@ private void insertInTable(String tableBasePath, int count, WriteOperationType o
if (cfg.configs == null) {
cfg.configs = new ArrayList<>();
}
cfg.configs.add("hoodie.cleaner.commits.retained=3");
cfg.configs.add("hoodie.cleaner.commits.retained=2");
cfg.configs.add("hoodie.keep.min.commits=4");
cfg.configs.add("hoodie.keep.max.commits=5");
cfg.configs.add("hoodie.test.source.generate.inserts=true");
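
The settings listed above drive the timeline walk-through in the comments of the first hunk of this file: the cleaner setting determines the earliest commit to retain, and hoodie.keep.min.commits=4 lets archival trim the active commits down to the newest four. A toy computation of that "retain 4 commits" step (an illustration only, not Hudi's actual archival logic; clean instants are left out):

```java
import java.util.List;

// Toy model of the "retain 4 commits" step from the walk-through above.
public class ArchivalSketch {
  static List<String> afterArchival(List<String> activeCommits, int keepMinCommits) {
    int from = Math.max(0, activeCommits.size() - keepMinCommits);
    return activeCommits.subList(from, activeCommits.size()); // keep only the newest N
  }

  public static void main(String[] args) {
    // Just before archival (after the first sync plus clean): c1..c7, with c8.clean alongside.
    List<String> active = List.of("c1", "c2", "rc3", "c4", "c5", "rc6", "c7");
    // hoodie.keep.min.commits=4 -> [c4, c5, rc6, c7], matching the comment
    // "after archival (retain 4 commits): c4, c5, rc6, c7, c8.clean".
    System.out.println(afterArchival(active, 4));
  }
}
```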