Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,78 +36,83 @@
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;

/**
* This lock provider is used for testing purposes only. It provides a simple file system based lock using HDFS atomic
* create operation. This lock does not support cleaning/expiring the lock after a failed write hence cannot be used
* in production environments.
* This lock provider is used for testing purposes only. It provides a simple file system based lock
* using filesystem's atomic create operation. This lock does not support cleaning/expiring the lock
* after a failed write. Must not be used in production environments.
*/
public class FileSystemBasedLockProviderTestClass implements LockProvider<String>, Serializable {

private static final String LOCK_NAME = "acquired";
private static final String LOCK = "lock";

private String lockPath;
private final int retryMaxCount;
private final int retryWaitTimeMs;
private transient FileSystem fs;
private transient Path lockFile;
protected LockConfiguration lockConfiguration;

public FileSystemBasedLockProviderTestClass(final LockConfiguration lockConfiguration, final Configuration configuration) {
this.lockConfiguration = lockConfiguration;
this.lockPath = lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY);
this.fs = FSUtils.getFs(this.lockPath, configuration);
}

public void acquireLock() {
try {
fs.create(new Path(lockPath + "/" + LOCK_NAME), false).close();
} catch (IOException e) {
throw new HoodieIOException("Failed to acquire lock", e);
}
final String lockDirectory = lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY);
this.retryWaitTimeMs = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY);
this.retryMaxCount = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY);
this.lockFile = new Path(lockDirectory + "/" + LOCK);
this.fs = FSUtils.getFs(this.lockFile.toString(), configuration);
}

@Override
public void close() {
try {
fs.delete(new Path(lockPath + "/" + LOCK_NAME), true);
} catch (IOException e) {
throw new HoodieLockException("Unable to release lock", e);
synchronized (LOCK) {
try {
fs.delete(this.lockFile, true);
} catch (IOException e) {
throw new HoodieLockException("Unable to release lock: " + getLock(), e);
}
}
}

@Override
public boolean tryLock(long time, TimeUnit unit) {
try {
int numRetries = 0;
while (fs.exists(new Path(lockPath + "/" + LOCK_NAME))
&& (numRetries++ <= lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY))) {
Thread.sleep(lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY));
}
synchronized (LOCK_NAME) {
if (fs.exists(new Path(lockPath + "/" + LOCK_NAME))) {
return false;
synchronized (LOCK) {
while (fs.exists(this.lockFile)) {
LOCK.wait(retryWaitTimeMs);
numRetries++;
if (numRetries > retryMaxCount) {
return false;
}
}
acquireLock();
return fs.exists(this.lockFile);
}
return true;
} catch (IOException | InterruptedException e) {
throw new HoodieLockException("Failed to acquire lock", e);
throw new HoodieLockException("Failed to acquire lock: " + getLock(), e);
}
}

@Override
public void unlock() {
try {
if (fs.exists(new Path(lockPath + "/" + LOCK_NAME))) {
fs.delete(new Path(lockPath + "/" + LOCK_NAME), true);
synchronized (LOCK) {
try {
if (fs.exists(this.lockFile)) {
fs.delete(this.lockFile, true);
}
} catch (IOException io) {
throw new HoodieIOException("Unable to delete lock " + getLock() + "on disk", io);
}
} catch (IOException io) {
throw new HoodieIOException("Unable to delete lock on disk", io);
}
}

/**
 * Returns the textual form of the lock file path this provider operates on.
 *
 * @return the result of calling {@code toString()} on the {@code lockFile} field
 */
@Override
public String getLock() {
  final Path heldLockFile = this.lockFile;
  return heldLockFile.toString();
}

private void acquireLock() {
try {
return fs.listStatus(new Path(lockPath))[0].getPath().toString();
} catch (Exception e) {
throw new HoodieLockException("Failed to retrieve lock status from lock path " + lockPath);
fs.create(this.lockFile, false).close();
} catch (IOException e) {
throw new HoodieIOException("Failed to acquire lock: " + getLock(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
Expand All @@ -40,7 +41,6 @@
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
Expand All @@ -53,6 +53,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand All @@ -61,6 +62,7 @@
import java.util.stream.Collectors;

import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -96,49 +98,72 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "250");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,"250");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY,"10");
HoodieWriteConfig cfg = getConfigBuilder()
HoodieWriteConfig writeConfig = getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
// Timeline-server-based markers are not used for multi-writer tests
.withMarkersType(MarkerType.DIRECT.name())
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class)
.build()).withAutoCommit(false).withProperties(properties).build();

// Create the first commit
createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200);
ExecutorService executors = Executors.newFixedThreadPool(2);
SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
AtomicBoolean writer1Conflict = new AtomicBoolean(false);
AtomicBoolean writer2Conflict = new AtomicBoolean(false);
createCommitWithInserts(writeConfig, getHoodieWriteClient(writeConfig), "000", "001", 200);

final int threadCount = 2;
final ExecutorService executors = Executors.newFixedThreadPool(2);
final SparkRDDWriteClient client1 = getHoodieWriteClient(writeConfig);
final SparkRDDWriteClient client2 = getHoodieWriteClient(writeConfig);

final CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
final AtomicBoolean writer1Completed = new AtomicBoolean(false);
final AtomicBoolean writer2Completed = new AtomicBoolean(false);

Future future1 = executors.submit(() -> {
String newCommitTime = "004";
int numRecords = 100;
String commitTimeBetweenPrevAndNew = "002";
try {
createCommitWithUpserts(cfg, client1, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
} catch (Exception e1) {
assertTrue(e1 instanceof HoodieWriteConflictException);
writer1Conflict.set(true);
final String nextCommitTime = "002";
final JavaRDD<WriteStatus> writeStatusList = startCommitForUpdate(writeConfig, client1, nextCommitTime, 100);

// Wait for the 2nd writer to start the commit
cyclicBarrier.await(60, TimeUnit.SECONDS);

// Commit the update before the 2nd writer
assertDoesNotThrow(() -> {
client1.commit(nextCommitTime, writeStatusList);
});

// Signal the 2nd writer to go ahead with its commit
cyclicBarrier.await(60, TimeUnit.SECONDS);
writer1Completed.set(true);
} catch (Exception e) {
writer1Completed.set(false);
}
});

Future future2 = executors.submit(() -> {
String newCommitTime = "005";
int numRecords = 100;
String commitTimeBetweenPrevAndNew = "002";
try {
createCommitWithUpserts(cfg, client2, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
} catch (Exception e2) {
assertTrue(e2 instanceof HoodieWriteConflictException);
writer2Conflict.set(true);
final String nextCommitTime = "003";

// Wait for the 1st writer to make progress with the commit
cyclicBarrier.await(60, TimeUnit.SECONDS);
final JavaRDD<WriteStatus> writeStatusList = startCommitForUpdate(writeConfig, client2, nextCommitTime, 100);

// Wait for the 1st writer to complete the commit
cyclicBarrier.await(60, TimeUnit.SECONDS);
assertThrows(HoodieWriteConflictException.class, () -> {
client2.commit(nextCommitTime, writeStatusList);
});
writer2Completed.set(true);
} catch (Exception e) {
writer2Completed.set(false);
}
});

future1.get();
future2.get();
Assertions.assertTrue(writer1Conflict.get() || writer2Conflict.get(), "Either of writer1 or writer2 should have failed "
+ "with conflict");
Assertions.assertFalse(writer1Conflict.get() && writer2Conflict.get(), "Both writer1 and writer2 should not result "
+ "in conflict");

// Both writer tasks should have run to completion. The expected write conflict for writer2
// is already asserted inside its task (the assertThrows around client2.commit).
assertTrue(writer1Completed.get() && writer2Completed.get());
}

@Test
Expand Down Expand Up @@ -443,4 +468,33 @@ private void createCommitWithUpserts(HoodieWriteConfig cfg, SparkRDDWriteClient
numRecords, 200, 2);
client.commit(newCommitTime, result);
}

/**
 * Begins a commit at the given instant time and upserts freshly generated update records,
 * without committing. The caller decides when (and whether) to call {@code commit}.
 *
 * <p>The write statuses are collected once here so that any write error is caught by
 * {@code assertNoWriteErrors} before the RDD is handed back.
 *
 * @param writeConfig   write config passed to the record generator wrapper
 * @param writeClient   write client on which the commit is started and the upsert is issued
 * @param newCommitTime instant time used both to start the commit and to generate the records
 * @param numRecords    number of update records to generate
 * @return the RDD of write statuses produced by the upsert
 * @throws Exception if record generation or the upsert fails
 */
private JavaRDD<WriteStatus> startCommitForUpdate(HoodieWriteConfig writeConfig, SparkRDDWriteClient writeClient,
    String newCommitTime, int numRecords) throws Exception {
  // Open the commit on the timeline before any data is produced for it
  writeClient.startCommitWithTime(newCommitTime);

  // Generate the updates and distribute them as a single-partition RDD
  final Function2<List<HoodieRecord>, String, Integer> updateGenerator =
      generateWrapRecordsFn(false, writeConfig, dataGen::generateUniqueUpdates);
  final List<HoodieRecord> updates = updateGenerator.apply(newCommitTime, numRecords);
  final JavaRDD<HoodieRecord> updatesRdd = jsc.parallelize(updates, 1);

  // Upsert through the client and verify that no record failed to write
  final Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn =
      SparkRDDWriteClient::upsert;
  final JavaRDD<WriteStatus> writeStatuses = upsertFn.apply(writeClient, updatesRdd, newCommitTime);
  assertNoWriteErrors(writeStatuses.collect());
  return writeStatuses;
}
}