Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,26 @@

package org.apache.hudi.table.action.rollback;

import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.HoodieClientTestBase;

import org.apache.spark.api.java.JavaRDD;

import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
Expand Down Expand Up @@ -96,4 +99,61 @@ protected void twoUpsertCommitDataWithTwoPartitions(List<FileSlice> firstPartiti
assertEquals(1, secondPartitionCommit2FileSlices.size());
}
}

protected void insertOverwriteCommitDataWithTwoPartitions(List<FileSlice> firstPartitionCommit2FileSlices,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now have the insertOverwriteCommitDataWithTwoPartitions in HoodieClientRollbackTestBase.java . Is it necessary to add the unit test for TestMergeOnReadRollbackActionExecutor

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I want to add test for MOR too. But because of time constraints just added one for COW. I plan to add MOR test later on.

List<FileSlice> secondPartitionCommit2FileSlices,
HoodieWriteConfig cfg,
boolean commitSecondInsertOverwrite) throws IOException {
//just generate two partitions
dataGen = new HoodieTestDataGenerator(new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
HoodieTestDataGenerator.writePartitionMetadata(fs, new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, basePath);
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
/**
* Write 1 (upsert)
*/
String newCommitTime = "001";
List<HoodieRecord> records = dataGen.generateInsertsContainsAllPartitions(newCommitTime, 2);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this code is the same as twoUpsertCommitDataWithTwoPartitions, can we reuse it ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can only reuse two lines because some of the state generated (List) is needed for subsequent operations. So i'm not inclined to add another method for this. let me know if you have strong opinion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok,Make sense

JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
client.startCommitWithTime(newCommitTime);
JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime);
Assertions.assertNoWriteErrors(statuses.collect());
client.commit(newCommitTime, statuses);

// get fileIds written
HoodieTable table = this.getHoodieTable(metaClient, cfg);
SyncableFileSystemView fsView = getFileSystemViewWithUnCommittedSlices(table.getMetaClient());
List<HoodieFileGroup> firstPartitionCommit1FileGroups = fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
assertEquals(1, firstPartitionCommit1FileGroups.size());
Set<String> partition1Commit1FileIds = firstPartitionCommit1FileGroups.get(0).getAllFileSlices().map(FileSlice::getFileId).collect(Collectors.toSet());
List<HoodieFileGroup> secondPartitionCommit1FileGroups = fsView.getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).collect(Collectors.toList());
assertEquals(1, secondPartitionCommit1FileGroups.size());
Set<String> partition2Commit1FileIds = secondPartitionCommit1FileGroups.get(0).getAllFileSlices().map(FileSlice::getFileId).collect(Collectors.toSet());

/**
* Write 2 (one insert_overwrite)
*/
String commitActionType = HoodieTimeline.REPLACE_COMMIT_ACTION;
newCommitTime = "002";
records = dataGen.generateInsertsContainsAllPartitions(newCommitTime, 2);
writeRecords = jsc.parallelize(records, 1);
client.startCommitWithTime(newCommitTime, commitActionType);
HoodieWriteResult result = client.insertOverwrite(writeRecords, newCommitTime);
statuses = result.getWriteStatuses();
Assertions.assertNoWriteErrors(statuses.collect());
if (commitSecondInsertOverwrite) {
client.commit(newCommitTime, statuses, Option.empty(), commitActionType, result.getPartitionToReplaceFileIds());
}
metaClient.reloadActiveTimeline();
// get new fileIds written as part of insert_overwrite
fsView = getFileSystemViewWithUnCommittedSlices(metaClient);
List<HoodieFileGroup> firstPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH)
.filter(fg -> !partition1Commit1FileIds.contains(fg.getFileGroupId().getFileId())).collect(Collectors.toList());
firstPartitionCommit2FileSlices.addAll(firstPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList()));
List<HoodieFileGroup> secondPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH)
.filter(fg -> !partition2Commit1FileIds.contains(fg.getFileGroupId().getFileId())).collect(Collectors.toList());
secondPartitionCommit2FileSlices.addAll(secondPartitionCommit2FileGroups.get(0).getAllFileSlices().collect(Collectors.toList()));

assertEquals(1, firstPartitionCommit2FileSlices.size());
assertEquals(1, secondPartitionCommit2FileSlices.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -125,6 +124,19 @@ public void testCopyOnWriteRollbackActionExecutorForFileListingAsGenerateFile()
assertFalse(testTable.baseFileExists(p2, "002", "id22"));
}

// Verify that rollback works with replacecommit
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testCopyOnWriteRollbackWithReplaceCommits(boolean isUsingMarkers) throws IOException {
//1. prepare data and assert data result
List<FileSlice> firstPartitionCommit2FileSlices = new ArrayList<>();
List<FileSlice> secondPartitionCommit2FileSlices = new ArrayList<>();
HoodieWriteConfig cfg = getConfigBuilder().withRollbackUsingMarkers(isUsingMarkers).withAutoCommit(false).build();
this.insertOverwriteCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices, cfg, !isUsingMarkers);
HoodieTable table = this.getHoodieTable(metaClient, cfg);
performRollbackAndValidate(isUsingMarkers, cfg, table, firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testCopyOnWriteRollbackActionExecutor(boolean isUsingMarkers) throws IOException {
Expand All @@ -133,8 +145,14 @@ public void testCopyOnWriteRollbackActionExecutor(boolean isUsingMarkers) throws
List<FileSlice> secondPartitionCommit2FileSlices = new ArrayList<>();
HoodieWriteConfig cfg = getConfigBuilder().withRollbackUsingMarkers(isUsingMarkers).withAutoCommit(false).build();
this.twoUpsertCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices, cfg, !isUsingMarkers);
metaClient.reloadActiveTimeline();
HoodieTable table = this.getHoodieTable(metaClient, cfg);

performRollbackAndValidate(isUsingMarkers, cfg, table, firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices);
}

private void performRollbackAndValidate(boolean isUsingMarkers, HoodieWriteConfig cfg, HoodieTable table,
List<FileSlice> firstPartitionCommit2FileSlices,
List<FileSlice> secondPartitionCommit2FileSlices) throws IOException {
//2. rollback
HoodieInstant commitInstant;
if (isUsingMarkers) {
Expand Down