Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ public void close() {
lock.writeLock().unlock();
}
LOG.info(getLogMessage(LockState.ALREADY_RELEASED));
LOCK_INSTANCE_PER_BASEPATH.remove(basePath);
}

private String getLogMessage(LockState state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieLockException;

import junit.framework.AssertionFailedError;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -52,6 +55,120 @@ public TestInProcessLockProvider() {
lockConfiguration2 = new LockConfiguration(properties);
}

@Test
public void testLockIdentity() throws InterruptedException {
// The lifecycle of an InProcessLockProvider should not affect the singleton lock
// for a single table, i.e., all three writers should hold the same underlying lock instance
// on the same table.
// Writer 1: lock |----------------| unlock and close
// Writer 2: try lock | ... lock |------| unlock and close
// Writer 3: try lock | ... lock |------| unlock and close
List<InProcessLockProvider> lockProviderList = new ArrayList<>();
InProcessLockProvider lockProvider1 = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
lockProviderList.add(lockProvider1);
AtomicBoolean writer1Completed = new AtomicBoolean(false);
AtomicBoolean writer2TryLock = new AtomicBoolean(false);
AtomicBoolean writer2Locked = new AtomicBoolean(false);
AtomicBoolean writer2Completed = new AtomicBoolean(false);
AtomicBoolean writer3TryLock = new AtomicBoolean(false);
AtomicBoolean writer3Completed = new AtomicBoolean(false);

// Writer 1
assertDoesNotThrow(() -> {
LOG.info("Writer 1 tries to acquire the lock.");
lockProvider1.lock();
LOG.info("Writer 1 acquires the lock.");
});
// Writer 2 thread in parallel, should block
// and later acquire the lock once it is released
Thread writer2 = new Thread(() -> {
InProcessLockProvider lockProvider2 = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
lockProviderList.add(lockProvider2);
assertDoesNotThrow(() -> {
LOG.info("Writer 2 tries to acquire the lock.");
writer2TryLock.set(true);
lockProvider2.lock();
LOG.info("Writer 2 acquires the lock.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CDL for writer2 lock acquisition : count down by 1.

});
writer2Locked.set(true);

while (!writer3TryLock.get()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CDL for writer3 start. await for it

try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

assertDoesNotThrow(() -> {
lockProvider2.unlock();
LOG.info("Writer 2 releases the lock.");
});
lockProvider2.close();
LOG.info("Writer 2 closes the lock provider.");
writer2Completed.set(true);
});

Thread writer3 = new Thread(() -> {
while (!writer2Locked.get() || !writer1Completed.get()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait for CDL for writer2 lock.
CDL for writer3 start : count down by 1

try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// Lock instance of Writer 3 should be held by Writer 2
InProcessLockProvider lockProvider3 = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
lockProviderList.add(lockProvider3);
boolean isLocked = lockProvider3.getLock().isWriteLocked();
if (!isLocked) {
writer3TryLock.set(true);
throw new AssertionFailedError("The lock instance in Writer 3 should be held by Writer 2: "
+ lockProvider3.getLock());
}
assertDoesNotThrow(() -> {
LOG.info("Writer 3 tries to acquire the lock.");
writer3TryLock.set(true);
lockProvider3.lock();
LOG.info("Writer 3 acquires the lock.");
});

assertDoesNotThrow(() -> {
lockProvider3.unlock();
LOG.info("Writer 3 releases the lock.");
});
lockProvider3.close();
LOG.info("Writer 3 closes the lock provider.");
writer3Completed.set(true);
});

writer2.start();
writer3.start();

while (!writer2TryLock.get()) {
Thread.sleep(100);
}

assertDoesNotThrow(() -> {
lockProvider1.unlock();
LOG.info("Writer 1 releases the lock.");
lockProvider1.close();
LOG.info("Writer 1 closes the lock provider.");
writer1Completed.set(true);
});

try {
writer2.join();
writer3.join();
} catch (InterruptedException e) {
// Ignore any exception
}
Assertions.assertTrue(writer2Completed.get());
Assertions.assertTrue(writer3Completed.get());
Assertions.assertEquals(lockProviderList.get(0).getLock(), lockProviderList.get(1).getLock());
Assertions.assertEquals(lockProviderList.get(1).getLock(), lockProviderList.get(2).getLock());
}

@Test
public void testLockAcquisition() {
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration1, hadoopConfiguration);
Expand Down