diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index 27f83bd145297..6dd9ff12b5310 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.client;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass;
 import org.apache.hudi.common.config.LockConfiguration;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -37,11 +38,8 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.testutils.HoodieClientTestBase;
-
-import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -53,12 +51,16 @@
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -132,7 +134,7 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E
     }
   }
 
-  @Disabled
+  @Test
   public void testMultiWriterWithAsyncTableServicesWithConflictCOW() throws Exception {
     testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType.COPY_ON_WRITE);
   }
@@ -202,6 +204,21 @@ public void testMultiWriterWithInsertsToDistinctPartitions(HoodieTableType table
     });
   }
 
+  /**
+   * Count down the latch and wait for all the needed threads to join.
+   *
+   * @param latch - Count down latch
+   * @param waitTimeMillis - Maximum wait time in milliseconds
+   */
+  private void latchCountDownAndWait(CountDownLatch latch, long waitTimeMillis) {
+    latch.countDown();
+    try {
+      latch.await(waitTimeMillis, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      // Ignore the interruption and proceed; the test assertions surface any failure.
+    }
+  }
+
   private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType) throws Exception {
     // create inserts X 1
     if (tableType == HoodieTableType.MERGE_ON_READ) {
@@ -238,82 +255,101 @@ private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType t
     createCommitWithUpserts(cfg, client, "002", "000", "003", 100);
     validInstants.add("002");
     validInstants.add("003");
-    ExecutorService executors = Executors.newFixedThreadPool(2);
-    // write config with clustering enabled
-    HoodieWriteConfig cfg2 = writeConfigBuilder
-        .withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(1).build())
+
+    // Three clients running actions in parallel
+    final int threadCount = 3;
+    final CountDownLatch scheduleCountDownLatch = new CountDownLatch(threadCount);
+    final ExecutorService executors = Executors.newFixedThreadPool(threadCount);
+
+    // Write config with clustering enabled
+    final HoodieWriteConfig cfg2 = writeConfigBuilder
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            .withInlineClustering(true)
+            .withInlineClusteringNumCommits(1)
+            .build())
         .build();
-    SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2);
-    SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+    final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2);
+    final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
+    final SparkRDDWriteClient client3 = getHoodieWriteClient(cfg);
+
     // Create upserts, schedule cleaning, schedule compaction in parallel
     Future future1 = executors.submit(() -> {
-      String newCommitTime = "004";
-      int numRecords = 100;
-      String commitTimeBetweenPrevAndNew = "002";
-      try {
-        createCommitWithUpserts(cfg2, client1, "003", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
-        if (tableType == HoodieTableType.MERGE_ON_READ) {
-          fail("Conflicts not handled correctly");
-        }
-        validInstants.add("004");
-      } catch (Exception e1) {
-        if (tableType == HoodieTableType.MERGE_ON_READ) {
-          assertTrue(e1 instanceof HoodieWriteConflictException);
-        }
+      final String newCommitTime = "004";
+      final int numRecords = 100;
+      final String commitTimeBetweenPrevAndNew = "002";
+
+      // We want the upsert to go through only after the compaction
+      // and cleaning scheduling completes, so wait on the latch here.
+      latchCountDownAndWait(scheduleCountDownLatch, 30000);
+      if (tableType == HoodieTableType.MERGE_ON_READ) {
+        // Since the compaction already went in, this upsert has
+        // to fail
+        assertThrows(IllegalArgumentException.class, () -> {
+          createCommitWithUpserts(cfg, client1, "003", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
+        });
+      } else {
+        // We don't have the compaction for COW and so this upsert
+        // has to pass
+        assertDoesNotThrow(() -> {
+          createCommitWithUpserts(cfg, client1, "003", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
+        });
+        validInstants.add(newCommitTime);
       }
     });
+
     Future future2 = executors.submit(() -> {
-      try {
-        client2.scheduleTableService("005", Option.empty(), TableServiceType.COMPACT);
-      } catch (Exception e2) {
-        if (tableType == HoodieTableType.MERGE_ON_READ) {
-          throw new RuntimeException(e2);
-        }
+      if (tableType == HoodieTableType.MERGE_ON_READ) {
+        assertDoesNotThrow(() -> {
+          client2.scheduleTableService("005", Option.empty(), TableServiceType.COMPACT);
+        });
       }
+      latchCountDownAndWait(scheduleCountDownLatch, 30000);
     });
+
     Future future3 = executors.submit(() -> {
-      try {
-        client2.scheduleTableService("006", Option.empty(), TableServiceType.CLEAN);
-      } catch (Exception e2) {
-        throw new RuntimeException(e2);
-      }
+      assertDoesNotThrow(() -> {
+        latchCountDownAndWait(scheduleCountDownLatch, 30000);
+        client3.scheduleTableService("006", Option.empty(), TableServiceType.CLEAN);
+      });
     });
     future1.get();
     future2.get();
     future3.get();
+
+    CountDownLatch runCountDownLatch = new CountDownLatch(threadCount);
     // Create inserts, run cleaning, run compaction in parallel
     future1 = executors.submit(() -> {
-      String newCommitTime = "007";
-      int numRecords = 100;
-      try {
-        createCommitWithInserts(cfg2, client1, "003", newCommitTime, numRecords);
+      final String newCommitTime = "007";
+      final int numRecords = 100;
+      latchCountDownAndWait(runCountDownLatch, 30000);
+      assertDoesNotThrow(() -> {
+        createCommitWithInserts(cfg, client1, "003", newCommitTime, numRecords);
         validInstants.add("007");
-      } catch (Exception e1) {
-        throw new RuntimeException(e1);
-      }
+      });
     });
+
     future2 = executors.submit(() -> {
-      try {
-        JavaRDD writeStatusJavaRDD = (JavaRDD) client2.compact("005");
-        client2.commitCompaction("005", writeStatusJavaRDD, Option.empty());
-        validInstants.add("005");
-      } catch (Exception e2) {
-        if (tableType == HoodieTableType.MERGE_ON_READ) {
-          throw new RuntimeException(e2);
-        }
+      latchCountDownAndWait(runCountDownLatch, 30000);
+      if (tableType == HoodieTableType.MERGE_ON_READ) {
+        assertDoesNotThrow(() -> {
+          JavaRDD writeStatusJavaRDD = (JavaRDD) client2.compact("005");
+          client2.commitCompaction("005", writeStatusJavaRDD, Option.empty());
+          validInstants.add("005");
+        });
       }
     });
+
    future3 = executors.submit(() -> {
-      try {
-        client2.clean("006", false);
+      latchCountDownAndWait(runCountDownLatch, 30000);
+      assertDoesNotThrow(() -> {
+        client3.clean("006", false);
         validInstants.add("006");
-      } catch (Exception e2) {
-        throw new RuntimeException(e2);
-      }
+      });
     });
     future1.get();
     future2.get();
     future3.get();
+
     validInstants.addAll(
         metaClient.reloadActiveTimeline().getCompletedReplaceTimeline()
             .filterCompletedInstants().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()));
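
For context on the coordination idiom the diff introduces: `latchCountDownAndWait` turns a shared `CountDownLatch` into a simple barrier, so every writer thread finishes its scheduling step before any of them starts the potentially conflicting work. Below is a minimal, self-contained sketch of that pattern; the class and task names are illustrative only and are not part of the Hudi test.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LatchBarrierSketch {

  // Same shape as the helper added in the test: count down, then wait
  // (bounded) for the other participants to count down as well.
  private static void latchCountDownAndWait(CountDownLatch latch, long waitTimeMillis) {
    latch.countDown();
    try {
      latch.await(waitTimeMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) throws Exception {
    final int threadCount = 3;
    final CountDownLatch scheduleBarrier = new CountDownLatch(threadCount);
    ExecutorService executors = Executors.newFixedThreadPool(threadCount);

    for (int i = 0; i < threadCount; i++) {
      final int id = i;
      executors.submit(() -> {
        // Phase 1: each task does its "scheduling" work.
        System.out.println("task " + id + ": schedule step done");
        // Barrier: nobody enters phase 2 until all three tasks have finished
        // phase 1 (or the 30s timeout expires).
        latchCountDownAndWait(scheduleBarrier, 30_000);
        // Phase 2: the potentially conflicting work runs only after the barrier.
        System.out.println("task " + id + ": running after barrier");
      });
    }

    executors.shutdown();
    executors.awaitTermination(1, TimeUnit.MINUTES);
  }
}
```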