@@ -18,6 +18,7 @@

package org.apache.hudi.client;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -37,11 +38,8 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.testutils.HoodieClientTestBase;

import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -53,12 +51,16 @@
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

@@ -132,7 +134,7 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E
}
}

@Disabled
@Test
public void testMultiWriterWithAsyncTableServicesWithConflictCOW() throws Exception {
testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType.COPY_ON_WRITE);
}
@@ -202,6 +204,21 @@ public void testMultiWriterWithInsertsToDistinctPartitions(HoodieTableType table
});
}

/**
* Count down the latch and wait for the remaining threads to count down as well.
*
* @param latch - Count down latch
* @param waitTimeMillis - Maximum time to wait, in milliseconds
*/
private void latchCountDownAndWait(CountDownLatch latch, long waitTimeMillis) {
latch.countDown();
try {
latch.await(waitTimeMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// Ignore the interruption; the caller simply proceeds after the bounded wait.
}
}
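For reference, here is a minimal standalone sketch of the count-down-and-wait barrier idiom this helper implements, using only java.util.concurrent and outside the Hudi test harness (the class and worker names are illustrative, not part of the PR):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LatchBarrierSketch {

  // Each worker counts down once and then waits (with a bound) for the
  // remaining workers, so all of them pass the barrier roughly together.
  private static void latchCountDownAndWait(CountDownLatch latch, long waitTimeMillis) {
    latch.countDown();
    try {
      latch.await(waitTimeMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve interrupt status
    }
  }

  public static void main(String[] args) throws InterruptedException {
    final int threadCount = 3;
    final CountDownLatch latch = new CountDownLatch(threadCount);
    ExecutorService executors = Executors.newFixedThreadPool(threadCount);
    for (int i = 0; i < threadCount; i++) {
      final int workerId = i;
      executors.submit(() -> {
        latchCountDownAndWait(latch, 30_000L);
        System.out.println("worker " + workerId + " passed the barrier");
      });
    }
    executors.shutdown();
    executors.awaitTermination(1, TimeUnit.MINUTES);
  }
}

The bounded await mirrors the 30000 ms timeout the test passes: a stuck participant cannot hang the whole run, since the others move on once the wait expires.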

private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType) throws Exception {
// create inserts X 1
if (tableType == HoodieTableType.MERGE_ON_READ) {
@@ -238,82 +255,101 @@ private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType t
createCommitWithUpserts(cfg, client, "002", "000", "003", 100);
validInstants.add("002");
validInstants.add("003");
ExecutorService executors = Executors.newFixedThreadPool(2);
// write config with clustering enabled
HoodieWriteConfig cfg2 = writeConfigBuilder
.withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(1).build())

// Three clients running actions in parallel
final int threadCount = 3;
final CountDownLatch scheduleCountDownLatch = new CountDownLatch(threadCount);
final ExecutorService executors = Executors.newFixedThreadPool(threadCount);

// Write config with clustering enabled
final HoodieWriteConfig cfg2 = writeConfigBuilder
.withClusteringConfig(HoodieClusteringConfig.newBuilder()
.withInlineClustering(true)
.withInlineClusteringNumCommits(1)
.build())
.build();
SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2);
SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
final SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2);
final SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
final SparkRDDWriteClient client3 = getHoodieWriteClient(cfg);

// Create upserts, schedule cleaning, schedule compaction in parallel
Future future1 = executors.submit(() -> {
String newCommitTime = "004";
int numRecords = 100;
String commitTimeBetweenPrevAndNew = "002";
try {
createCommitWithUpserts(cfg2, client1, "003", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
if (tableType == HoodieTableType.MERGE_ON_READ) {
fail("Conflicts not handled correctly");
}
validInstants.add("004");
} catch (Exception e1) {
if (tableType == HoodieTableType.MERGE_ON_READ) {
assertTrue(e1 instanceof HoodieWriteConflictException);
}
final String newCommitTime = "004";
final int numRecords = 100;
final String commitTimeBetweenPrevAndNew = "002";

// We want the upsert to go through only after the compaction
// and cleaning schedule completion. So, waiting on latch here.
latchCountDownAndWait(scheduleCountDownLatch, 30000);
if (tableType == HoodieTableType.MERGE_ON_READ) {
// Since the compaction has already been scheduled, this upsert
// has to fail
assertThrows(IllegalArgumentException.class, () -> {
createCommitWithUpserts(cfg, client1, "003", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
});
} else {
// There is no compaction for COW tables, so this upsert
// has to pass
assertDoesNotThrow(() -> {
createCommitWithUpserts(cfg, client1, "003", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
});
validInstants.add(newCommitTime);
}
});

Future future2 = executors.submit(() -> {
try {
client2.scheduleTableService("005", Option.empty(), TableServiceType.COMPACT);
} catch (Exception e2) {
if (tableType == HoodieTableType.MERGE_ON_READ) {
throw new RuntimeException(e2);
}
if (tableType == HoodieTableType.MERGE_ON_READ) {
assertDoesNotThrow(() -> {
client2.scheduleTableService("005", Option.empty(), TableServiceType.COMPACT);
});
}
latchCountDownAndWait(scheduleCountDownLatch, 30000);
});

Future future3 = executors.submit(() -> {
try {
client2.scheduleTableService("006", Option.empty(), TableServiceType.CLEAN);
} catch (Exception e2) {
throw new RuntimeException(e2);
}
assertDoesNotThrow(() -> {
latchCountDownAndWait(scheduleCountDownLatch, 30000);
client3.scheduleTableService("006", Option.empty(), TableServiceType.CLEAN);
});
});
future1.get();
future2.get();
future3.get();

CountDownLatch runCountDownLatch = new CountDownLatch(threadCount);
// Create inserts, run cleaning, run compaction in parallel
future1 = executors.submit(() -> {
String newCommitTime = "007";
int numRecords = 100;
try {
createCommitWithInserts(cfg2, client1, "003", newCommitTime, numRecords);
final String newCommitTime = "007";
final int numRecords = 100;
latchCountDownAndWait(runCountDownLatch, 30000);
assertDoesNotThrow(() -> {
createCommitWithInserts(cfg, client1, "003", newCommitTime, numRecords);
validInstants.add("007");
} catch (Exception e1) {
throw new RuntimeException(e1);
}
});
});

future2 = executors.submit(() -> {
try {
JavaRDD<WriteStatus> writeStatusJavaRDD = (JavaRDD<WriteStatus>) client2.compact("005");
client2.commitCompaction("005", writeStatusJavaRDD, Option.empty());
validInstants.add("005");
} catch (Exception e2) {
if (tableType == HoodieTableType.MERGE_ON_READ) {
throw new RuntimeException(e2);
}
latchCountDownAndWait(runCountDownLatch, 30000);
if (tableType == HoodieTableType.MERGE_ON_READ) {
assertDoesNotThrow(() -> {
JavaRDD<WriteStatus> writeStatusJavaRDD = (JavaRDD<WriteStatus>) client2.compact("005");
client2.commitCompaction("005", writeStatusJavaRDD, Option.empty());
validInstants.add("005");
});
}
});

future3 = executors.submit(() -> {
try {
client2.clean("006", false);
latchCountDownAndWait(runCountDownLatch, 30000);
assertDoesNotThrow(() -> {
client3.clean("006", false);
validInstants.add("006");
} catch (Exception e2) {
throw new RuntimeException(e2);
}
});
});
future1.get();
future2.get();
future3.get();

validInstants.addAll(
metaClient.reloadActiveTimeline().getCompletedReplaceTimeline()
.filterCompletedInstants().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()));
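This change also swaps the earlier try/catch-plus-fail() checks for JUnit 5's assertThrows and assertDoesNotThrow. A minimal sketch of that idiom in isolation (the exception type, message, and lambda bodies are placeholders, not Hudi code):

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.junit.jupiter.api.Test;

class AssertionIdiomSketch {

  @Test
  void expectedFailureAndExpectedSuccess() {
    // assertThrows fails unless the lambda throws the expected type (or a
    // subtype), and returns the thrown exception for further inspection.
    IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> {
      throw new IllegalArgumentException("conflicting upsert");
    });
    assertEquals("conflicting upsert", ex.getMessage());

    // assertDoesNotThrow fails the test if the lambda throws anything,
    // replacing the try { ... } catch (Exception e) { fail(...); } pattern.
    assertDoesNotThrow(() -> {
      int records = 100; // placeholder work that completes normally
    });
  }
}

Keeping one assertion per expectation makes the intent of each branch (MOR must conflict, COW must succeed) explicit in the test body rather than buried in exception-handling boilerplate.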