@@ -517,8 +517,8 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
     final SparkRDDWriteClient client3 = getHoodieWriteClient(cfg);
 
     // Create upserts, schedule cleaning, schedule compaction in parallel
+    String fourthCommitTime = client.createNewInstantTime();
     Future future1 = executors.submit(() -> {
-      final String newCommitTime = client1.createNewInstantTime();
       final int numRecords = 100;
       final String commitTimeBetweenPrevAndNew = secondCommitTime;
 
@@ -531,34 +531,35 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
 
         // Since the concurrent modifications went in, this upsert has
         // to fail
+        // Note: the start time of upsert must be smaller than the compaction scheduling time
[Inline review comment from a Contributor, on the added "Note" line above]
Hi, why must the start time of this upsert commit be smaller than the compaction scheduling time? Could you describe the problem in detail?
         assertThrows(HoodieWriteConflictException.class, () -> {
-          createCommitWithUpserts(cfg, client1, thirdCommitTime, commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
+          createCommitWithUpserts(cfg, client1, thirdCommitTime, commitTimeBetweenPrevAndNew, fourthCommitTime, numRecords);
         });
       } else {
         // We don't have the compaction for COW and so this upsert
         // has to pass
         assertDoesNotThrow(() -> {
-          createCommitWithUpserts(cfg, client1, thirdCommitTime, commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
+          createCommitWithUpserts(cfg, client1, thirdCommitTime, commitTimeBetweenPrevAndNew, fourthCommitTime, numRecords);
         });
-        validInstants.add(newCommitTime);
+        validInstants.add(fourthCommitTime);
       }
     });
 
+    String fifthCommitTime = client2.createNewInstantTime();
     Future future2 = executors.submit(() -> {
       if (tableType == HoodieTableType.MERGE_ON_READ) {
         assertDoesNotThrow(() -> {
-          String compactionTimeStamp = client2.createNewInstantTime();
-          client2.scheduleTableService(compactionTimeStamp, Option.empty(), TableServiceType.COMPACT);
+          client2.scheduleTableService(fifthCommitTime, Option.empty(), TableServiceType.COMPACT);
         });
       }
       latchCountDownAndWait(scheduleCountDownLatch, 30000);
     });
 
+    String sixthCommitTime = client3.createNewInstantTime();
     Future future3 = executors.submit(() -> {
       assertDoesNotThrow(() -> {
         latchCountDownAndWait(scheduleCountDownLatch, 30000);
-        String cleanCommitTime = client3.createNewInstantTime();
-        client3.scheduleTableService(cleanCommitTime, Option.empty(), TableServiceType.CLEAN);
+        client3.scheduleTableService(sixthCommitTime, Option.empty(), TableServiceType.CLEAN);
       });
     });
     future1.get();
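The intent of the change, as the "Note" comment and the reviewer's question suggest, is to make the instant-time ordering deterministic: fourthCommitTime (the upsert), fifthCommitTime (the compaction schedule), and sixthCommitTime (the clean schedule) are now allocated on the test's main thread, in that order, before the executor tasks start; previously each lambda allocated its own instant time, so the relative ordering depended on thread scheduling. The following is a minimal sketch of that pattern, not the actual test code: client1, client2, executors, Option, and TableServiceType are taken from the diff above, while writeUpserts(...) is a hypothetical stand-in for createCommitWithUpserts(...), and the snippet is assumed to run inside a test method that declares throws Exception.

// Sketch only, assuming the surrounding test context from the diff above.
// 1. Allocate instant times up front on the driver thread so their ordering is fixed:
//    upsertInstant is strictly older than compactionInstant, regardless of how the
//    executor interleaves the two tasks.
String upsertInstant = client1.createNewInstantTime();
String compactionInstant = client2.createNewInstantTime();

// 2. Hand the pre-allocated instants to the concurrent tasks.
Future<?> upsertTask = executors.submit(() ->
    writeUpserts(client1, upsertInstant));  // hypothetical helper standing in for createCommitWithUpserts
Future<?> compactionTask = executors.submit(() ->
    client2.scheduleTableService(compactionInstant, Option.empty(), TableServiceType.COMPACT));

// Before this change, each lambda called createNewInstantTime() itself, so the upsert's
// instant could end up newer than the compaction's; per the "Note" comment, the expected
// conflict on MERGE_ON_READ might then not be raised and the assertion could flake.
upsertTask.get();
compactionTask.get();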