diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index d0896645427ec..859d568d707e2 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -517,8 +517,8 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
     final SparkRDDWriteClient client3 = getHoodieWriteClient(cfg);
 
     // Create upserts, schedule cleaning, schedule compaction in parallel
+    String fourthCommitTime = client.createNewInstantTime();
     Future future1 = executors.submit(() -> {
-      final String newCommitTime = client1.createNewInstantTime();
       final int numRecords = 100;
       final String commitTimeBetweenPrevAndNew = secondCommitTime;
 
@@ -531,34 +531,35 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
         // Since the concurrent modifications went in, this upsert has
        // to fail
+        // Note: the start time of the upsert must be smaller than the compaction scheduling time
        assertThrows(HoodieWriteConflictException.class, () -> {
-          createCommitWithUpserts(cfg, client1, thirdCommitTime, commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
+          createCommitWithUpserts(cfg, client1, thirdCommitTime, commitTimeBetweenPrevAndNew, fourthCommitTime, numRecords);
        });
      } else {
        // We don't have the compaction for COW and so this upsert
        // has to pass
        assertDoesNotThrow(() -> {
-          createCommitWithUpserts(cfg, client1, thirdCommitTime, commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
+          createCommitWithUpserts(cfg, client1, thirdCommitTime, commitTimeBetweenPrevAndNew, fourthCommitTime, numRecords);
        });
-        validInstants.add(newCommitTime);
+        validInstants.add(fourthCommitTime);
      }
    });
 
+    String fifthCommitTime = client2.createNewInstantTime();
    Future future2 = executors.submit(() -> {
      if (tableType == HoodieTableType.MERGE_ON_READ) {
        assertDoesNotThrow(() -> {
-          String compactionTimeStamp = client2.createNewInstantTime();
-          client2.scheduleTableService(compactionTimeStamp, Option.empty(), TableServiceType.COMPACT);
+          client2.scheduleTableService(fifthCommitTime, Option.empty(), TableServiceType.COMPACT);
        });
      }
      latchCountDownAndWait(scheduleCountDownLatch, 30000);
    });
 
+    String sixthCommitTime = client3.createNewInstantTime();
    Future future3 = executors.submit(() -> {
      assertDoesNotThrow(() -> {
        latchCountDownAndWait(scheduleCountDownLatch, 30000);
-        String cleanCommitTime = client3.createNewInstantTime();
-        client3.scheduleTableService(cleanCommitTime, Option.empty(), TableServiceType.CLEAN);
+        client3.scheduleTableService(sixthCommitTime, Option.empty(), TableServiceType.CLEAN);
      });
    });
 
    future1.get();
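
The ordering idea behind this patch: the instant times are now allocated on the submitting thread, in program order, before the concurrent tasks start, so the upsert's instant (fourthCommitTime) is guaranteed to sort before the compaction and clean instants regardless of how the executor schedules the three futures. Below is a minimal, self-contained sketch of that pattern; it is not Hudi code, and all class, method, and variable names in it are hypothetical stand-ins.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PreallocatedInstantTimes {

  private static long counter = 0;

  // Stand-in for createNewInstantTime(): any monotonically increasing,
  // thread-safe source of instant strings works for this sketch.
  private static synchronized String newInstantTime() {
    return String.format("%014d", ++counter);
  }

  public static void main(String[] args) throws Exception {
    ExecutorService executors = Executors.newFixedThreadPool(2);

    // Allocate both instants up front, in the order the test needs them,
    // before either task is submitted.
    String upsertTime = newInstantTime();      // smaller instant
    String compactionTime = newInstantTime();  // larger instant

    Future<?> upsert = executors.submit(() ->
        System.out.println("upsert with instant " + upsertTime));
    Future<?> compaction = executors.submit(() ->
        System.out.println("schedule compaction with instant " + compactionTime));

    upsert.get();
    compaction.get();
    executors.shutdown();

    // Holds regardless of which task the executor happened to run first.
    System.out.println("upsert < compaction: " + (upsertTime.compareTo(compactionTime) < 0));
  }
}
```

Had the instants been generated inside the submitted lambdas (as in the removed lines), the relative order of the upsert and compaction instants would depend on thread scheduling, which is what made the original test flaky for MERGE_ON_READ tables.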