@@ -27,6 +27,7 @@
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -461,7 +462,7 @@ private void latchCountDownAndWait(CountDownLatch latch, long waitTimeMillis) {

@ParameterizedTest
@MethodSource("providerClassResolutionStrategyAndTableType")
-public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType, Class providerClass,
+public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType, Class<? extends LockProvider<?>> providerClass,
ConflictResolutionStrategy resolutionStrategy) throws Exception {
// create inserts X 1
if (tableType == HoodieTableType.MERGE_ON_READ) {
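
The signature change above replaces the raw Class parameter with the bounded wildcard Class<? extends LockProvider<?>>, so only lock-provider classes can be supplied as the parameterized test argument. A minimal, self-contained sketch of what the bound buys at compile time; LockProvider and InProcessLock below are stand-ins for the Hudi types, not the real classes:

// Stand-in for org.apache.hudi.common.lock.LockProvider, for illustration only.
interface LockProvider<T> { }

class InProcessLock implements LockProvider<Object> { }

public class WildcardSketch {
  // With the old raw type, any Class was accepted:
  //   static void useProvider(Class providerClass) { ... }
  // The bounded wildcard restricts callers to lock-provider classes.
  static void useProvider(Class<? extends LockProvider<?>> providerClass) {
    System.out.println(providerClass.getName());
  }

  public static void main(String[] args) {
    useProvider(InProcessLock.class); // compiles
    // useProvider(String.class);     // rejected at compile time under the new bound
  }
}
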
@@ -526,6 +527,11 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
final SparkRDDWriteClient client3 = getHoodieWriteClient(cfg);

+// Tests with concurrent operations can be flaky; to reduce the chance of wrong ordering,
+// the tasks are staggered with different wait times, effectively queueing them up.
+// For InProcessLockProvider the waits can be much shorter.
+final int waitAndRunFirst = providerClass.isAssignableFrom(InProcessLockProvider.class) ? 2000 : 20000;
+final int waitAndRunSecond = providerClass.isAssignableFrom(InProcessLockProvider.class) ? 3000 : 30000;

// Create upserts, schedule cleaning, schedule compaction in parallel
Future future1 = executors.submit(() -> {
final String newCommitTime = client1.createNewInstantTime();
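
The ordering in these futures hinges on latchCountDownAndWait(CountDownLatch, long), whose body lies outside this diff. A minimal sketch of the presumed behavior, based only on the signature shown in the hunk header above: each task counts the shared latch down and then blocks until all tasks have arrived or the timeout elapses, so the shorter timeouts chosen for InProcessLockProvider simply bound how long a straggler is awaited.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchSketch {
  // Presumed body, not taken from the PR: count down, then wait for the rest.
  static void latchCountDownAndWait(CountDownLatch latch, long waitTimeMillis) {
    latch.countDown();
    try {
      latch.await(waitTimeMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) throws Exception {
    CountDownLatch latch = new CountDownLatch(2);
    Thread other = new Thread(() -> latchCountDownAndWait(latch, 1000));
    other.start();
    latchCountDownAndWait(latch, 1000); // returns once both sides have counted down
    other.join();
  }
}
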
@@ -534,7 +540,7 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta

// We want the upsert to go through only after the compaction
// and cleaning schedule completion. So, waiting on latch here.
-latchCountDownAndWait(scheduleCountDownLatch, 30000);
+latchCountDownAndWait(scheduleCountDownLatch, waitAndRunSecond);
if (tableType == HoodieTableType.MERGE_ON_READ && !(resolutionStrategy instanceof PreferWriterConflictResolutionStrategy)) {
// HUDI-6897: Improve SimpleConcurrentFileWritesConflictResolutionStrategy for NB-CC
// There is no need to throw concurrent modification exception for the simple strategy under NB-CC, because the compactor would finally resolve the conflicts instead.
@@ -561,12 +567,12 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
client2.scheduleTableService(compactionTimeStamp, Option.empty(), TableServiceType.COMPACT);
});
}
-latchCountDownAndWait(scheduleCountDownLatch, 30000);
+latchCountDownAndWait(scheduleCountDownLatch, waitAndRunFirst);
});

Future future3 = executors.submit(() -> {
assertDoesNotThrow(() -> {
-latchCountDownAndWait(scheduleCountDownLatch, 30000);
+latchCountDownAndWait(scheduleCountDownLatch, waitAndRunFirst);
String cleanCommitTime = client3.createNewInstantTime();
client3.scheduleTableService(cleanCommitTime, Option.empty(), TableServiceType.CLEAN);
});
@@ -590,15 +596,15 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
future1 = executors.submit(() -> {
final String newCommitTime = client1.createNewInstantTime();
final int numRecords = 100;
-latchCountDownAndWait(runCountDownLatch, 30000);
+latchCountDownAndWait(runCountDownLatch, waitAndRunSecond);
assertDoesNotThrow(() -> {
createCommitWithInserts(cfg, client1, thirdCommitTime, newCommitTime, numRecords, true);
validInstants.add(newCommitTime);
});
});

future2 = executors.submit(() -> {
-latchCountDownAndWait(runCountDownLatch, 30000);
+latchCountDownAndWait(runCountDownLatch, waitAndRunFirst);
if (tableType == HoodieTableType.MERGE_ON_READ) {
assertDoesNotThrow(() -> {
HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client2.compact(pendingCompactionTime);
Expand All @@ -609,7 +615,7 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
});

future3 = executors.submit(() -> {
-latchCountDownAndWait(runCountDownLatch, 30000);
+latchCountDownAndWait(runCountDownLatch, waitAndRunFirst);
assertDoesNotThrow(() -> {
client3.clean(pendingCleanTime, false);
validInstants.add(pendingCleanTime);
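
One subtlety in the added wait logic: providerClass.isAssignableFrom(InProcessLockProvider.class) asks whether InProcessLockProvider is the same as, or a subtype of, providerClass, which matches when the parameterized test passes InProcessLockProvider.class itself. A quick self-contained illustration of the direction of that check; the types here are illustrative, not Hudi's:

public class AssignableSketch {
  interface Provider { }
  static class InProcess implements Provider { }

  public static void main(String[] args) {
    Class<?> providerClass = InProcess.class;
    // a.isAssignableFrom(b) asks: is b a subtype of (or equal to) a?
    System.out.println(providerClass.isAssignableFrom(InProcess.class));  // true: same class
    System.out.println(Provider.class.isAssignableFrom(InProcess.class)); // true: supertype on the left
    System.out.println(InProcess.class.isAssignableFrom(Provider.class)); // false: direction reversed
  }
}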