diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java index 9eab061ddfa29..8e57190d1a9b9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/InProcessLockProvider.java @@ -124,7 +124,6 @@ public void close() { lock.writeLock().unlock(); } LOG.info(getLogMessage(LockState.ALREADY_RELEASED)); - LOCK_INSTANCE_PER_BASEPATH.remove(basePath); } private String getLogMessage(LockState state) { diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java index 8d39b8b5f2da0..d1d43d7f3ae0b 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestInProcessLockProvider.java @@ -24,12 +24,15 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieLockException; +import junit.framework.AssertionFailedError; import org.apache.hadoop.conf.Configuration; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -52,6 +55,120 @@ public TestInProcessLockProvider() { lockConfiguration2 = new LockConfiguration(properties); } + @Test + public void testLockIdentity() throws InterruptedException { + // The lifecycle of an InProcessLockProvider should not affect the singleton lock + // for a single table, i.e., all three writers should hold the same underlying lock instance + // on the same table. + // Writer 1: lock |----------------| unlock and close + // Writer 2: try lock | ... lock |------| unlock and close + // Writer 3: try lock | ... lock |------| unlock and close + List lockProviderList = new ArrayList<>(); + InProcessLockProvider lockProvider1 = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration); + lockProviderList.add(lockProvider1); + AtomicBoolean writer1Completed = new AtomicBoolean(false); + AtomicBoolean writer2TryLock = new AtomicBoolean(false); + AtomicBoolean writer2Locked = new AtomicBoolean(false); + AtomicBoolean writer2Completed = new AtomicBoolean(false); + AtomicBoolean writer3TryLock = new AtomicBoolean(false); + AtomicBoolean writer3Completed = new AtomicBoolean(false); + + // Writer 1 + assertDoesNotThrow(() -> { + LOG.info("Writer 1 tries to acquire the lock."); + lockProvider1.lock(); + LOG.info("Writer 1 acquires the lock."); + }); + // Writer 2 thread in parallel, should block + // and later acquire the lock once it is released + Thread writer2 = new Thread(() -> { + InProcessLockProvider lockProvider2 = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration); + lockProviderList.add(lockProvider2); + assertDoesNotThrow(() -> { + LOG.info("Writer 2 tries to acquire the lock."); + writer2TryLock.set(true); + lockProvider2.lock(); + LOG.info("Writer 2 acquires the lock."); + }); + writer2Locked.set(true); + + while (!writer3TryLock.get()) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + assertDoesNotThrow(() -> { + lockProvider2.unlock(); + LOG.info("Writer 2 releases the lock."); + }); + lockProvider2.close(); + LOG.info("Writer 2 closes the lock provider."); + writer2Completed.set(true); + }); + + Thread writer3 = new Thread(() -> { + while (!writer2Locked.get() || !writer1Completed.get()) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + // Lock instance of Writer 3 should be held by Writer 2 + InProcessLockProvider lockProvider3 = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration); + lockProviderList.add(lockProvider3); + boolean isLocked = lockProvider3.getLock().isWriteLocked(); + if (!isLocked) { + writer3TryLock.set(true); + throw new AssertionFailedError("The lock instance in Writer 3 should be held by Writer 2: " + + lockProvider3.getLock()); + } + assertDoesNotThrow(() -> { + LOG.info("Writer 3 tries to acquire the lock."); + writer3TryLock.set(true); + lockProvider3.lock(); + LOG.info("Writer 3 acquires the lock."); + }); + + assertDoesNotThrow(() -> { + lockProvider3.unlock(); + LOG.info("Writer 3 releases the lock."); + }); + lockProvider3.close(); + LOG.info("Writer 3 closes the lock provider."); + writer3Completed.set(true); + }); + + writer2.start(); + writer3.start(); + + while (!writer2TryLock.get()) { + Thread.sleep(100); + } + + assertDoesNotThrow(() -> { + lockProvider1.unlock(); + LOG.info("Writer 1 releases the lock."); + lockProvider1.close(); + LOG.info("Writer 1 closes the lock provider."); + writer1Completed.set(true); + }); + + try { + writer2.join(); + writer3.join(); + } catch (InterruptedException e) { + // Ignore any exception + } + Assertions.assertTrue(writer2Completed.get()); + Assertions.assertTrue(writer3Completed.get()); + Assertions.assertEquals(lockProviderList.get(0).getLock(), lockProviderList.get(1).getLock()); + Assertions.assertEquals(lockProviderList.get(1).getLock(), lockProviderList.get(2).getLock()); + } + @Test public void testLockAcquisition() { InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);