diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index a20efbaf54b36..ba6fb446a893a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -27,6 +27,7 @@
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.LockConfiguration;
+import org.apache.hudi.common.lock.LockProvider;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -461,7 +462,7 @@ private void latchCountDownAndWait(CountDownLatch latch, long waitTimeMillis) {
 
   @ParameterizedTest
   @MethodSource("providerClassResolutionStrategyAndTableType")
-  public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType, Class providerClass,
+  public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType, Class<? extends LockProvider<?>> providerClass,
                                                                  ConflictResolutionStrategy resolutionStrategy) throws Exception {
     // create inserts X 1
     if (tableType == HoodieTableType.MERGE_ON_READ) {
@@ -526,6 +527,11 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
     final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
     final SparkRDDWriteClient client3 = getHoodieWriteClient(cfg);
 
+    // Tests with concurrent operations can be flaky; stagger the latch wait times to reduce the chance of wrong ordering.
+    // For InProcessLockProvider the waits can be shorter.
+    final int waitAndRunFirst = providerClass.isAssignableFrom(InProcessLockProvider.class) ? 2000 : 20000;
+    final int waitAndRunSecond = providerClass.isAssignableFrom(InProcessLockProvider.class) ? 3000 : 30000;
+
     // Create upserts, schedule cleaning, schedule compaction in parallel
     Future future1 = executors.submit(() -> {
       final String newCommitTime = client1.createNewInstantTime();
@@ -534,7 +540,7 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
 
       // We want the upsert to go through only after the compaction
      // and cleaning schedule completion. So, waiting on latch here.
-      latchCountDownAndWait(scheduleCountDownLatch, 30000);
+      latchCountDownAndWait(scheduleCountDownLatch, waitAndRunSecond);
       if (tableType == HoodieTableType.MERGE_ON_READ && !(resolutionStrategy instanceof PreferWriterConflictResolutionStrategy)) {
         // HUDI-6897: Improve SimpleConcurrentFileWritesConflictResolutionStrategy for NB-CC
         // There is no need to throw concurrent modification exception for the simple strategy under NB-CC, because the compactor would finally resolve the conflicts instead.
@@ -561,12 +567,12 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
           client2.scheduleTableService(compactionTimeStamp, Option.empty(), TableServiceType.COMPACT);
         });
       }
-      latchCountDownAndWait(scheduleCountDownLatch, 30000);
+      latchCountDownAndWait(scheduleCountDownLatch, waitAndRunFirst);
     });
 
     Future future3 = executors.submit(() -> {
       assertDoesNotThrow(() -> {
-        latchCountDownAndWait(scheduleCountDownLatch, 30000);
+        latchCountDownAndWait(scheduleCountDownLatch, waitAndRunFirst);
         String cleanCommitTime = client3.createNewInstantTime();
         client3.scheduleTableService(cleanCommitTime, Option.empty(), TableServiceType.CLEAN);
       });
@@ -590,7 +596,7 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
     future1 = executors.submit(() -> {
       final String newCommitTime = client1.createNewInstantTime();
       final int numRecords = 100;
-      latchCountDownAndWait(runCountDownLatch, 30000);
+      latchCountDownAndWait(runCountDownLatch, waitAndRunSecond);
       assertDoesNotThrow(() -> {
         createCommitWithInserts(cfg, client1, thirdCommitTime, newCommitTime, numRecords, true);
         validInstants.add(newCommitTime);
@@ -598,7 +604,7 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
       });
     });
     future2 = executors.submit(() -> {
-      latchCountDownAndWait(runCountDownLatch, 30000);
+      latchCountDownAndWait(runCountDownLatch, waitAndRunFirst);
       if (tableType == HoodieTableType.MERGE_ON_READ) {
         assertDoesNotThrow(() -> {
           HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client2.compact(pendingCompactionTime);
@@ -609,7 +615,7 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
     });
 
     future3 = executors.submit(() -> {
-      latchCountDownAndWait(runCountDownLatch, 30000);
+      latchCountDownAndWait(runCountDownLatch, waitAndRunFirst);
       assertDoesNotThrow(() -> {
         client3.clean(pendingCleanTime, false);
         validInstants.add(pendingCleanTime);
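
Note on the latch helper: the hunk headers above reference latchCountDownAndWait(CountDownLatch, long), but its body is not part of this diff. Below is a minimal sketch of the coordination pattern the test appears to rely on, not the actual Hudi implementation: each task counts the shared latch down and then blocks until the other tasks have checked in or until the timeout chosen above expires (waitAndRunFirst for the table-service tasks, the longer waitAndRunSecond for the upsert), so with InProcessLockProvider the worst-case stall is bounded at 2-3 seconds instead of 20-30 seconds.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class LatchBarrierSketch {
  // Hypothetical stand-in for the latchCountDownAndWait helper in TestHoodieClientMultiWriter;
  // the real body is not shown in this diff.
  static void latchCountDownAndWait(CountDownLatch latch, long waitTimeMillis) {
    latch.countDown();                                    // signal that this task reached the barrier
    try {
      latch.await(waitTimeMillis, TimeUnit.MILLISECONDS); // bounded wait for the remaining tasks
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();                 // restore interrupt status and return
    }
  }
}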