
testMultiWriterWithAsyncTableServicesWithConflict is flaky #16883


Description


{code:java}
public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType, Class<?> providerClass,
    ConflictResolutionStrategy resolutionStrategy) throws Exception {
  // create inserts X 1
  if (tableType == MERGE_ON_READ) {
    setUpMORTestTable();
  }

  // Use RDD API to perform clustering (TODO: Fix row-writer API)
  Properties properties = new Properties();
  properties.put("hoodie.datasource.write.row.writer.enable", String.valueOf(false));

  // Disabling embedded timeline server, it doesn't work with multiwriter
  HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
      .withCleanConfig(HoodieCleanConfig.newBuilder()
          .withAutoClean(false)
          .withAsyncClean(true)
          .retainCommits(0)
          .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
      .withCompactionConfig(HoodieCompactionConfig.newBuilder()
          .withInlineCompaction(false)
          .withMaxNumDeltaCommitsBeforeCompaction(2).build())
      .withEmbeddedTimelineServerEnabled(false)
      // Timeline-server-based markers are not used for multi-writer tests
      .withMarkersType(MarkerType.DIRECT.name())
      .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(
          FileSystemViewStorageType.MEMORY).build())
      .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
      .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(providerClass)
          .withConflictResolutionStrategy(resolutionStrategy)
          .build()).withAutoCommit(false).withProperties(lockProperties)
      .withProperties(properties);

  Set<String> validInstants = new HashSet<>();

  // Create the first commit with inserts
  HoodieWriteConfig cfg = writeConfigBuilder.build();
  SparkRDDWriteClient client = getHoodieWriteClient(cfg);
  String firstCommitTime = client.createNewInstantTime();
  createCommitWithInserts(cfg, client, "000", firstCommitTime, 200, true);
  validInstants.add(firstCommitTime);

  // Create 2 commits with upserts
  String secondCommitTime = client.createNewInstantTime();
  createCommitWithUpserts(cfg, client, firstCommitTime, Option.of("000"), secondCommitTime, 100);
  String thirdCommitTime = client.createNewInstantTime();
  createCommitWithUpserts(cfg, client, secondCommitTime, Option.of("000"), thirdCommitTime, 100);
  validInstants.add(secondCommitTime);
  validInstants.add(thirdCommitTime);

  // Three clients running actions in parallel
  final int threadCount = 3;
  final CountDownLatch scheduleCountDownLatch = new CountDownLatch(threadCount);
  final ExecutorService executors = Executors.newFixedThreadPool(threadCount);

  // Write config with clustering enabled
  final HoodieWriteConfig cfg2 = writeConfigBuilder
      .withClusteringConfig(HoodieClusteringConfig.newBuilder()
          .withInlineClustering(true)
          .withInlineClusteringNumCommits(1)
          .build())
      .build();
  final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2);
  final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
  final SparkRDDWriteClient client3 = getHoodieWriteClient(cfg);

  // Tests with concurrent operations can be flaky, so a latch is used to reduce
  // the possibility of wrong ordering. For InProcessLockProvider we can wait less.
  final int waitAndRunFirst = providerClass.isAssignableFrom(InProcessLockProvider.class) ? 2000 : 20000;
  final int waitAndRunSecond = providerClass.isAssignableFrom(InProcessLockProvider.class) ? 3000 : 30000;

  // Create upserts, schedule cleaning, schedule compaction in parallel
  Future<?> future1 = executors.submit(() -> {
    final String newCommitTime = client1.createNewInstantTime();
    final int numRecords = 100;
    final String commitTimeBetweenPrevAndNew = secondCommitTime;

    // We want the upsert to go through only after the compaction
    // and cleaning schedule completion. So, waiting on latch here.
    latchCountDownAndWait(scheduleCountDownLatch, waitAndRunSecond);
    if (tableType == HoodieTableType.MERGE_ON_READ && !(resolutionStrategy instanceof PreferWriterConflictResolutionStrategy)) {
      // HUDI-6897: Improve SimpleConcurrentFileWritesConflictResolutionStrategy for NB-CC
      // There is no need to throw concurrent modification exception for the simple strategy under NB-CC,
      // because the compactor would finally resolve the conflicts instead.

      // vvvvvvvvvvvv this assertion failed vvvvvvvvvvvv
      // Since the concurrent modifications went in, this upsert has to fail
      assertThrows(HoodieWriteConflictException.class, () -> {
        createCommitWithUpserts(cfg, client1, thirdCommitTime, Option.of(commitTimeBetweenPrevAndNew), newCommitTime, numRecords);
      });
    } else {
      // We don't have the compaction for COW and so this upsert has to pass
      assertDoesNotThrow(() -> {
        createCommitWithUpserts(cfg, client1, thirdCommitTime, Option.of(commitTimeBetweenPrevAndNew), newCommitTime, numRecords);
      });
      validInstants.add(newCommitTime);
    }
  });

  Future<?> future2 = executors.submit(() -> {
    if (tableType == MERGE_ON_READ) {
      assertDoesNotThrow(() -> {
        String compactionTimeStamp = client2.createNewInstantTime();
        client2.scheduleTableService(compactionTimeStamp, Option.empty(), TableServiceType.COMPACT);
      });
    }
    latchCountDownAndWait(scheduleCountDownLatch, waitAndRunFirst);
  });

  Future<?> future3 = executors.submit(() -> {
    assertDoesNotThrow(() -> {
      latchCountDownAndWait(scheduleCountDownLatch, waitAndRunFirst);
      String cleanCommitTime = client3.createNewInstantTime();
      client3.scheduleTableService(cleanCommitTime, Option.empty(), TableServiceType.CLEAN);
    });
  });
  future1.get();
{code}
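
For context, `latchCountDownAndWait` is not part of the snippet above; presumably it counts the latch down and then waits on it with a timeout, along these lines (a sketch only, not the actual Hudi helper):

{code:java}
// Sketch of the presumed helper, using java.util.concurrent.
private static void latchCountDownAndWait(CountDownLatch latch, long waitTimeMs) {
  latch.countDown();
  try {
    // Returns as soon as all three threads have counted down, but gives up
    // after waitTimeMs. The timeout is what lets threads proceed out of the
    // intended order on a slow CI machine.
    latch.await(waitTimeMs, TimeUnit.MILLISECONDS);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
  }
}
{code}

If that reading is right, future1 only sees the scheduled compaction when future2 manages to schedule it within waitAndRunSecond milliseconds; otherwise the await times out and the upsert runs against a timeline with nothing to conflict with.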
 

Failure from CI job spark-tests (scala-2.12, spark3.5):
{code:java}
Caused by: org.opentest4j.AssertionFailedError: Expected org.apache.hudi.exception.HoodieWriteConflictException to be thrown, but nothing was thrown.
	at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:71)
	at org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:37)
	at org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3082)
	at org.apache.hudi.client.TestHoodieClientMultiWriter.lambda$testMultiWriterWithAsyncTableServicesWithConflict$12(TestHoodieClientMultiWriter.java:837)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
{code}
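
The error means the upsert committed cleanly, i.e. no compaction schedule was visible when conflict resolution ran. If the timed latch is the culprit, one direction is to replace the timeout with an explicit happens-before latch. A minimal sketch, reusing names from the test plus an invented scheduleDone latch (not a proposed patch):

{code:java}
final CountDownLatch scheduleDone = new CountDownLatch(1);

Future<?> scheduler = executors.submit(() -> {
  String compactionTimeStamp = client2.createNewInstantTime();
  client2.scheduleTableService(compactionTimeStamp, Option.empty(), TableServiceType.COMPACT);
  scheduleDone.countDown(); // signal only once the schedule is on the timeline
});

Future<?> writer = executors.submit(() -> {
  try {
    scheduleDone.await(); // no timeout, so the ordering cannot be skipped
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    return;
  }
  // With the compaction guaranteed to be scheduled first, the conflict
  // assertion no longer depends on wall-clock timing.
  assertThrows(HoodieWriteConflictException.class, () ->
      createCommitWithUpserts(cfg, client1, thirdCommitTime,
          Option.of(secondCommitTime), client1.createNewInstantTime(), 100));
});
{code}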
 



Comments

codope (17/Mar/25 11:48): We had already added a countdown latch in the test and fixed the order of operations in the test:

[https://github.com//pull/4046]

[https://github.com//pull/12137]

Did we run the test locally N times? Does it fail for an external lock provider? Should we try to increase the wait time?
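
On the "run it locally N times" question, one rough way to stress the failing combination is a wrapper that invokes the scenario in a loop (JUnit 5, which the stack trace shows the suite already uses; the wrapper name and the direct reuse of setUp/tearDown are assumptions):

{code:java}
@Test
public void repeatMultiWriterConflictScenario() throws Exception {
  for (int i = 0; i < 50; i++) {
    setUp();   // assumes the existing per-test setup can be re-invoked
    testMultiWriterWithAsyncTableServicesWithConflict(
        HoodieTableType.MERGE_ON_READ,
        InProcessLockProvider.class,
        new SimpleConcurrentFileWritesConflictResolutionStrategy());
    tearDown();
  }
}
{code}

Running the single test via Maven surefire's -Dtest filter in a shell loop would give the same signal without touching the code.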
