Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.lock.LockProvider;
import org.apache.hudi.common.lock.LockState;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieLockException;

import org.apache.hadoop.conf.Configuration;
Expand All @@ -32,12 +33,15 @@
import org.jetbrains.annotations.NotNull;

import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* InProcess level lock. This {@link LockProvider} implementation is to
* guard table from concurrent operations happening in the local JVM process.
* A separate lock is maintained per "table basepath".
* <p>
* Note: This Lock provider implementation doesn't allow lock reentrancy.
* Attempting to reacquire the lock from the same thread will throw
Expand All @@ -47,22 +51,27 @@
public class InProcessLockProvider implements LockProvider<ReentrantReadWriteLock>, Serializable {

private static final Logger LOG = LogManager.getLogger(InProcessLockProvider.class);
private static final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock();
private static final Map<String, ReentrantReadWriteLock> LOCK_INSTANCE_PER_BASEPATH = new ConcurrentHashMap<>();
private final ReentrantReadWriteLock lock;
private final String basePath;
private final long maxWaitTimeMillis;

public InProcessLockProvider(final LockConfiguration lockConfiguration, final Configuration conf) {
TypedProperties typedProperties = lockConfiguration.getConfig();
basePath = lockConfiguration.getConfig().getProperty(HoodieWriteConfig.BASE_PATH.key());
ValidationUtils.checkArgument(basePath != null);
lock = LOCK_INSTANCE_PER_BASEPATH.computeIfAbsent(basePath, (ignore) -> new ReentrantReadWriteLock());
maxWaitTimeMillis = typedProperties.getLong(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
LockConfiguration.DEFAULT_ACQUIRE_LOCK_WAIT_TIMEOUT_MS);
}

@Override
public void lock() {
LOG.info(getLogMessage(LockState.ACQUIRING));
if (LOCK.isWriteLockedByCurrentThread()) {
if (lock.isWriteLockedByCurrentThread()) {
throw new HoodieLockException(getLogMessage(LockState.ALREADY_ACQUIRED));
}
LOCK.writeLock().lock();
lock.writeLock().lock();
LOG.info(getLogMessage(LockState.ACQUIRED));
}

Expand All @@ -74,13 +83,13 @@ public boolean tryLock() {
@Override
public boolean tryLock(long time, @NotNull TimeUnit unit) {
LOG.info(getLogMessage(LockState.ACQUIRING));
if (LOCK.isWriteLockedByCurrentThread()) {
if (lock.isWriteLockedByCurrentThread()) {
throw new HoodieLockException(getLogMessage(LockState.ALREADY_ACQUIRED));
}

boolean isLockAcquired;
try {
isLockAcquired = LOCK.writeLock().tryLock(time, unit);
isLockAcquired = lock.writeLock().tryLock(time, unit);
} catch (InterruptedException e) {
throw new HoodieLockException(getLogMessage(LockState.FAILED_TO_ACQUIRE));
}
Expand All @@ -93,8 +102,8 @@ public boolean tryLock(long time, @NotNull TimeUnit unit) {
public void unlock() {
LOG.info(getLogMessage(LockState.RELEASING));
try {
if (LOCK.isWriteLockedByCurrentThread()) {
LOCK.writeLock().unlock();
if (lock.isWriteLockedByCurrentThread()) {
lock.writeLock().unlock();
LOG.info(getLogMessage(LockState.RELEASED));
} else {
LOG.warn("Cannot unlock because the current thread does not hold the lock.");
Expand All @@ -106,18 +115,19 @@ public void unlock() {

@Override
public ReentrantReadWriteLock getLock() {
return LOCK;
return lock;
}

@Override
public void close() {
if (LOCK.isWriteLockedByCurrentThread()) {
LOCK.writeLock().unlock();
if (lock.isWriteLockedByCurrentThread()) {
lock.writeLock().unlock();
}
LOG.info(getLogMessage(LockState.ALREADY_RELEASED));
LOCK_INSTANCE_PER_BASEPATH.remove(basePath);
}

private String getLogMessage(LockState state) {
return StringUtils.join("Thread ", String.valueOf(Thread.currentThread().getName()), " ",
state.name(), " in-process lock.");
return String.format("Base Path %s, Lock Instance %s, Thread %s, In-process lock state %s", basePath, getLock().toString(), Thread.currentThread().getName(), state.name());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieLockException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
Expand All @@ -39,11 +40,20 @@ public class TestInProcessLockProvider {

private static final Logger LOG = LogManager.getLogger(TestInProcessLockProvider.class);
private final Configuration hadoopConfiguration = new Configuration();
private final LockConfiguration lockConfiguration = new LockConfiguration(new TypedProperties());
private final LockConfiguration lockConfiguration1;
private final LockConfiguration lockConfiguration2;

public TestInProcessLockProvider() {
TypedProperties properties = new TypedProperties();
properties.put(HoodieWriteConfig.BASE_PATH.key(), "table1");
lockConfiguration1 = new LockConfiguration(properties);
properties.put(HoodieWriteConfig.BASE_PATH.key(), "table2");
lockConfiguration2 = new LockConfiguration(properties);
}

@Test
public void testLockAcquisition() {
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
assertDoesNotThrow(() -> {
inProcessLockProvider.lock();
});
Expand All @@ -54,7 +64,7 @@ public void testLockAcquisition() {

@Test
public void testLockReAcquisitionBySameThread() {
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
assertDoesNotThrow(() -> {
inProcessLockProvider.lock();
});
Expand All @@ -66,9 +76,34 @@ public void testLockReAcquisitionBySameThread() {
});
}

@Test
public void testLockReAcquisitionBySameThreadWithTwoTables() {
InProcessLockProvider inProcessLockProvider1 = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
InProcessLockProvider inProcessLockProvider2 = new InProcessLockProvider(lockConfiguration2, hadoopConfiguration);

assertDoesNotThrow(() -> {
inProcessLockProvider1.lock();
});
assertDoesNotThrow(() -> {
inProcessLockProvider2.lock();
});
assertThrows(HoodieLockException.class, () -> {
inProcessLockProvider2.lock();
});
assertThrows(HoodieLockException.class, () -> {
inProcessLockProvider1.lock();
});
assertDoesNotThrow(() -> {
inProcessLockProvider1.unlock();
});
assertDoesNotThrow(() -> {
inProcessLockProvider2.unlock();
});
}

@Test
public void testLockReAcquisitionByDifferentThread() {
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
final AtomicBoolean writer2Completed = new AtomicBoolean(false);

// Main test thread
Expand Down Expand Up @@ -104,9 +139,72 @@ public void run() {
Assertions.assertTrue(writer2Completed.get());
}

@Test
public void testLockReAcquisitionByDifferentThreadWithTwoTables() {
InProcessLockProvider inProcessLockProvider1 = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
InProcessLockProvider inProcessLockProvider2 = new InProcessLockProvider(lockConfiguration2, hadoopConfiguration);

final AtomicBoolean writer2Stream1Completed = new AtomicBoolean(false);
final AtomicBoolean writer2Stream2Completed = new AtomicBoolean(false);

// Main test thread
assertDoesNotThrow(() -> {
inProcessLockProvider1.lock();
});
assertDoesNotThrow(() -> {
inProcessLockProvider2.lock();
});

// Another writer thread in parallel, should block
// and later acquire the lock once it is released
Thread writer2Stream1 = new Thread(new Runnable() {
@Override
public void run() {
assertDoesNotThrow(() -> {
inProcessLockProvider1.lock();
});
assertDoesNotThrow(() -> {
inProcessLockProvider1.unlock();
});
writer2Stream1Completed.set(true);
}
});
Thread writer2Stream2 = new Thread(new Runnable() {
@Override
public void run() {
assertDoesNotThrow(() -> {
inProcessLockProvider2.lock();
});
assertDoesNotThrow(() -> {
inProcessLockProvider2.unlock();
});
writer2Stream2Completed.set(true);
}
});

writer2Stream1.start();
writer2Stream2.start();

assertDoesNotThrow(() -> {
inProcessLockProvider1.unlock();
});
assertDoesNotThrow(() -> {
inProcessLockProvider2.unlock();
});

try {
writer2Stream1.join();
writer2Stream2.join();
} catch (InterruptedException e) {
//
}
Assertions.assertTrue(writer2Stream1Completed.get());
Assertions.assertTrue(writer2Stream2Completed.get());
}

@Test
public void testTryLockAcquisition() {
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
Assertions.assertTrue(inProcessLockProvider.tryLock());
assertDoesNotThrow(() -> {
inProcessLockProvider.unlock();
Expand All @@ -115,7 +213,7 @@ public void testTryLockAcquisition() {

@Test
public void testTryLockAcquisitionWithTimeout() {
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
Assertions.assertTrue(inProcessLockProvider.tryLock(1, TimeUnit.MILLISECONDS));
assertDoesNotThrow(() -> {
inProcessLockProvider.unlock();
Expand All @@ -124,7 +222,7 @@ public void testTryLockAcquisitionWithTimeout() {

@Test
public void testTryLockReAcquisitionBySameThread() {
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
Assertions.assertTrue(inProcessLockProvider.tryLock());
assertThrows(HoodieLockException.class, () -> {
inProcessLockProvider.tryLock(1, TimeUnit.MILLISECONDS);
Expand All @@ -136,7 +234,7 @@ public void testTryLockReAcquisitionBySameThread() {

@Test
public void testTryLockReAcquisitionByDifferentThread() {
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
final AtomicBoolean writer2Completed = new AtomicBoolean(false);

// Main test thread
Expand All @@ -162,7 +260,7 @@ public void testTryLockReAcquisitionByDifferentThread() {

@Test
public void testTryUnLockByDifferentThread() {
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
final AtomicBoolean writer3Completed = new AtomicBoolean(false);

// Main test thread
Expand Down Expand Up @@ -203,7 +301,7 @@ public void testTryUnLockByDifferentThread() {

@Test
public void testTryLockAcquisitionBeforeTimeOutFromTwoThreads() {
final InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
final InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
final int threadCount = 3;
final long awaitMaxTimeoutMs = 2000L;
final CountDownLatch latch = new CountDownLatch(threadCount);
Expand Down Expand Up @@ -261,7 +359,7 @@ public void testTryLockAcquisitionBeforeTimeOutFromTwoThreads() {

@Test
public void testLockReleaseByClose() {
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
assertDoesNotThrow(() -> {
inProcessLockProvider.lock();
});
Expand All @@ -272,7 +370,7 @@ public void testLockReleaseByClose() {

@Test
public void testRedundantUnlock() {
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
assertDoesNotThrow(() -> {
inProcessLockProvider.lock();
});
Expand All @@ -286,7 +384,7 @@ public void testRedundantUnlock() {

@Test
public void testUnlockWithoutLock() {
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
assertDoesNotThrow(() -> {
inProcessLockProvider.unlock();
});
Expand Down