diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java
index c3cd5742482a0..2a60473c82c8d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java
@@ -23,7 +23,8 @@
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.lock.LockProvider;
import org.apache.hudi.common.lock.LockState;
-import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieLockException;
import org.apache.hadoop.conf.Configuration;
@@ -32,12 +33,15 @@
import org.jetbrains.annotations.NotNull;
import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* InProcess level lock. This {@link LockProvider} implementation is to
* guard table from concurrent operations happening in the local JVM process.
+ * A separate lock is maintained per "table basepath".
*
* Note: This Lock provider implementation doesn't allow lock reentrancy.
* Attempting to reacquire the lock from the same thread will throw
@@ -47,11 +51,16 @@
public class InProcessLockProvider implements LockProvider, Serializable {
private static final Logger LOG = LogManager.getLogger(InProcessLockProvider.class);
- private static final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock();
+ private static final Map LOCK_INSTANCE_PER_BASEPATH = new ConcurrentHashMap<>();
+ private final ReentrantReadWriteLock lock;
+ private final String basePath;
private final long maxWaitTimeMillis;
public InProcessLockProvider(final LockConfiguration lockConfiguration, final Configuration conf) {
TypedProperties typedProperties = lockConfiguration.getConfig();
+ basePath = lockConfiguration.getConfig().getProperty(HoodieWriteConfig.BASE_PATH.key());
+ ValidationUtils.checkArgument(basePath != null);
+ lock = LOCK_INSTANCE_PER_BASEPATH.computeIfAbsent(basePath, (ignore) -> new ReentrantReadWriteLock());
maxWaitTimeMillis = typedProperties.getLong(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
LockConfiguration.DEFAULT_ACQUIRE_LOCK_WAIT_TIMEOUT_MS);
}
@@ -59,10 +68,10 @@ public InProcessLockProvider(final LockConfiguration lockConfiguration, final Co
@Override
public void lock() {
LOG.info(getLogMessage(LockState.ACQUIRING));
- if (LOCK.isWriteLockedByCurrentThread()) {
+ if (lock.isWriteLockedByCurrentThread()) {
throw new HoodieLockException(getLogMessage(LockState.ALREADY_ACQUIRED));
}
- LOCK.writeLock().lock();
+ lock.writeLock().lock();
LOG.info(getLogMessage(LockState.ACQUIRED));
}
@@ -74,13 +83,13 @@ public boolean tryLock() {
@Override
public boolean tryLock(long time, @NotNull TimeUnit unit) {
LOG.info(getLogMessage(LockState.ACQUIRING));
- if (LOCK.isWriteLockedByCurrentThread()) {
+ if (lock.isWriteLockedByCurrentThread()) {
throw new HoodieLockException(getLogMessage(LockState.ALREADY_ACQUIRED));
}
boolean isLockAcquired;
try {
- isLockAcquired = LOCK.writeLock().tryLock(time, unit);
+ isLockAcquired = lock.writeLock().tryLock(time, unit);
} catch (InterruptedException e) {
throw new HoodieLockException(getLogMessage(LockState.FAILED_TO_ACQUIRE));
}
@@ -93,8 +102,8 @@ public boolean tryLock(long time, @NotNull TimeUnit unit) {
public void unlock() {
LOG.info(getLogMessage(LockState.RELEASING));
try {
- if (LOCK.isWriteLockedByCurrentThread()) {
- LOCK.writeLock().unlock();
+ if (lock.isWriteLockedByCurrentThread()) {
+ lock.writeLock().unlock();
LOG.info(getLogMessage(LockState.RELEASED));
} else {
LOG.warn("Cannot unlock because the current thread does not hold the lock.");
@@ -106,18 +115,19 @@ public void unlock() {
@Override
public ReentrantReadWriteLock getLock() {
- return LOCK;
+ return lock;
}
@Override
public void close() {
- if (LOCK.isWriteLockedByCurrentThread()) {
- LOCK.writeLock().unlock();
+ if (lock.isWriteLockedByCurrentThread()) {
+ lock.writeLock().unlock();
}
+ LOG.info(getLogMessage(LockState.ALREADY_RELEASED));
+ LOCK_INSTANCE_PER_BASEPATH.remove(basePath);
}
private String getLogMessage(LockState state) {
- return StringUtils.join("Thread ", String.valueOf(Thread.currentThread().getName()), " ",
- state.name(), " in-process lock.");
+ return String.format("Base Path %s, Lock Instance %s, Thread %s, In-process lock state %s", basePath, getLock().toString(), Thread.currentThread().getName(), state.name());
}
}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java
index 6d6c526785dc5..99ab0887e7033 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java
@@ -22,6 +22,7 @@
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieLockException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -39,11 +40,20 @@ public class TestInProcessLockProvider {
private static final Logger LOG = LogManager.getLogger(TestInProcessLockProvider.class);
private final Configuration hadoopConfiguration = new Configuration();
- private final LockConfiguration lockConfiguration = new LockConfiguration(new TypedProperties());
+ private final LockConfiguration lockConfiguration1;
+ private final LockConfiguration lockConfiguration2;
+
+ public TestInProcessLockProvider() {
+ TypedProperties properties = new TypedProperties();
+ properties.put(HoodieWriteConfig.BASE_PATH.key(), "table1");
+ lockConfiguration1 = new LockConfiguration(properties);
+ properties.put(HoodieWriteConfig.BASE_PATH.key(), "table2");
+ lockConfiguration2 = new LockConfiguration(properties);
+ }
@Test
public void testLockAcquisition() {
- InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
assertDoesNotThrow(() -> {
inProcessLockProvider.lock();
});
@@ -54,7 +64,7 @@ public void testLockAcquisition() {
@Test
public void testLockReAcquisitionBySameThread() {
- InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
assertDoesNotThrow(() -> {
inProcessLockProvider.lock();
});
@@ -66,9 +76,34 @@ public void testLockReAcquisitionBySameThread() {
});
}
+ @Test
+ public void testLockReAcquisitionBySameThreadWithTwoTables() {
+ InProcessLockProvider inProcessLockProvider1 = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
+ InProcessLockProvider inProcessLockProvider2 = new InProcessLockProvider(lockConfiguration2, hadoopConfiguration);
+
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider1.lock();
+ });
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider2.lock();
+ });
+ assertThrows(HoodieLockException.class, () -> {
+ inProcessLockProvider2.lock();
+ });
+ assertThrows(HoodieLockException.class, () -> {
+ inProcessLockProvider1.lock();
+ });
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider1.unlock();
+ });
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider2.unlock();
+ });
+ }
+
@Test
public void testLockReAcquisitionByDifferentThread() {
- InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
final AtomicBoolean writer2Completed = new AtomicBoolean(false);
// Main test thread
@@ -104,9 +139,72 @@ public void run() {
Assertions.assertTrue(writer2Completed.get());
}
+ @Test
+ public void testLockReAcquisitionByDifferentThreadWithTwoTables() {
+ InProcessLockProvider inProcessLockProvider1 = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
+ InProcessLockProvider inProcessLockProvider2 = new InProcessLockProvider(lockConfiguration2, hadoopConfiguration);
+
+ final AtomicBoolean writer2Stream1Completed = new AtomicBoolean(false);
+ final AtomicBoolean writer2Stream2Completed = new AtomicBoolean(false);
+
+ // Main test thread
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider1.lock();
+ });
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider2.lock();
+ });
+
+ // Another writer thread in parallel, should block
+ // and later acquire the lock once it is released
+ Thread writer2Stream1 = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider1.lock();
+ });
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider1.unlock();
+ });
+ writer2Stream1Completed.set(true);
+ }
+ });
+ Thread writer2Stream2 = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider2.lock();
+ });
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider2.unlock();
+ });
+ writer2Stream2Completed.set(true);
+ }
+ });
+
+ writer2Stream1.start();
+ writer2Stream2.start();
+
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider1.unlock();
+ });
+ assertDoesNotThrow(() -> {
+ inProcessLockProvider2.unlock();
+ });
+
+ try {
+ writer2Stream1.join();
+ writer2Stream2.join();
+ } catch (InterruptedException e) {
+ //
+ }
+ Assertions.assertTrue(writer2Stream1Completed.get());
+ Assertions.assertTrue(writer2Stream2Completed.get());
+ }
+
@Test
public void testTryLockAcquisition() {
- InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
Assertions.assertTrue(inProcessLockProvider.tryLock());
assertDoesNotThrow(() -> {
inProcessLockProvider.unlock();
@@ -115,7 +213,7 @@ public void testTryLockAcquisition() {
@Test
public void testTryLockAcquisitionWithTimeout() {
- InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
Assertions.assertTrue(inProcessLockProvider.tryLock(1, TimeUnit.MILLISECONDS));
assertDoesNotThrow(() -> {
inProcessLockProvider.unlock();
@@ -124,7 +222,7 @@ public void testTryLockAcquisitionWithTimeout() {
@Test
public void testTryLockReAcquisitionBySameThread() {
- InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
Assertions.assertTrue(inProcessLockProvider.tryLock());
assertThrows(HoodieLockException.class, () -> {
inProcessLockProvider.tryLock(1, TimeUnit.MILLISECONDS);
@@ -136,7 +234,7 @@ public void testTryLockReAcquisitionBySameThread() {
@Test
public void testTryLockReAcquisitionByDifferentThread() {
- InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
final AtomicBoolean writer2Completed = new AtomicBoolean(false);
// Main test thread
@@ -162,7 +260,7 @@ public void testTryLockReAcquisitionByDifferentThread() {
@Test
public void testTryUnLockByDifferentThread() {
- InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
final AtomicBoolean writer3Completed = new AtomicBoolean(false);
// Main test thread
@@ -203,7 +301,7 @@ public void testTryUnLockByDifferentThread() {
@Test
public void testTryLockAcquisitionBeforeTimeOutFromTwoThreads() {
- final InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ final InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
final int threadCount = 3;
final long awaitMaxTimeoutMs = 2000L;
final CountDownLatch latch = new CountDownLatch(threadCount);
@@ -261,7 +359,7 @@ public void testTryLockAcquisitionBeforeTimeOutFromTwoThreads() {
@Test
public void testLockReleaseByClose() {
- InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
assertDoesNotThrow(() -> {
inProcessLockProvider.lock();
});
@@ -272,7 +370,7 @@ public void testLockReleaseByClose() {
@Test
public void testRedundantUnlock() {
- InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
assertDoesNotThrow(() -> {
inProcessLockProvider.lock();
});
@@ -286,7 +384,7 @@ public void testRedundantUnlock() {
@Test
public void testUnlockWithoutLock() {
- InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
+ InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
assertDoesNotThrow(() -> {
inProcessLockProvider.unlock();
});