diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index b200c0b5bf1f..462b1e4331f4 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -4838,4 +4838,14 @@ warm up edek cache if none of key successful on OM start up. </description> </property> + <property> + <name>ozone.om.hierarchical.resource.locks.soft.limit</name> + <value>1024</value> + <description>Soft limit for number of lock objects that could be idle in the pool.</description> + </property> + <property> + <name>ozone.om.hierarchical.resource.locks.hard.limit</name> + <value>10000</value> + <description>Maximum number of lock objects that could be present in the pool.</description> + </property> </configuration> diff --git a/hadoop-ozone/common/pom.xml b/hadoop-ozone/common/pom.xml index cb082b9d6c44..1ecafebb8b3f 100644 --- a/hadoop-ozone/common/pom.xml +++ b/hadoop-ozone/common/pom.xml @@ -77,6 +77,10 @@ <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-pool2</artifactId> + </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java index 969288ed92c8..254a49ea9a99 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java @@ -674,6 +674,14 @@ public final class OMConfigKeys { "ozone.om.snapshot.compact.non.snapshot.diff.tables"; public static final boolean OZONE_OM_SNAPSHOT_COMPACT_NON_SNAPSHOT_DIFF_TABLES_DEFAULT = false; + public static final String OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_SOFT_LIMIT = + "ozone.om.hierarchical.resource.locks.soft.limit"; + public static final int OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_SOFT_LIMIT_DEFAULT = 1024; + + public static final String OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_HARD_LIMIT = + "ozone.om.hierarchical.resource.locks.hard.limit"; + public static final int OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_HARD_LIMIT_DEFAULT = 10000; + /** * Never 
constructed. */ diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/HierachicalResourceLockManager.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/HierarchicalResourceLockManager.java similarity index 97% rename from hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/HierachicalResourceLockManager.java rename to hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/HierarchicalResourceLockManager.java index 0cc8df45e2c7..d34b199113c9 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/HierachicalResourceLockManager.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/HierarchicalResourceLockManager.java @@ -25,7 +25,7 @@ * there is no cyclic lock ordering on resources. * Typically, this can be used for locking elements which form a DAG like structure.(E.g. FSO tree, Snapshot chain etc.) */ -public interface HierachicalResourceLockManager extends AutoCloseable { +public interface HierarchicalResourceLockManager extends AutoCloseable { /** * Acquires a read lock on the specified resource using the provided key. diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/PoolBasedHierarchicalResourceLockManager.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/PoolBasedHierarchicalResourceLockManager.java new file mode 100644 index 000000000000..d601e31e6343 --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/PoolBasedHierarchicalResourceLockManager.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.lock; + +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_HARD_LIMIT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_HARD_LIMIT_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_SOFT_LIMIT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_SOFT_LIMIT_DEFAULT; + +import com.google.common.base.Preconditions; +import java.io.Closeable; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; +import org.apache.commons.pool2.BasePooledObjectFactory; +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; + +/** + * A lock manager implementation that manages hierarchical resource locks + * using a pool of reusable {@link ReadWriteLock} instances. 
The implementation
+ * ensures deterministic lock ordering for resources, avoiding cyclic
+ * lock dependencies, and is typically useful for structures like
+ * DAGs (e.g., File System trees or snapshot chains).
+ */
+public class PoolBasedHierarchicalResourceLockManager implements HierarchicalResourceLockManager {
+  /** Pool that the {@link ReadWriteLock} objects are borrowed from and returned to. */
+  private final GenericObjectPool<ReadWriteLock> lockPool;
+  /** Reference-counted locks currently handed out, indexed by resource and then by key. */
+  private final Map<FlatResource, Map<String, LockReferenceCountPair>> lockMap;
+
+  /**
+   * Builds the lock pool from the soft limit (maximum idle lock objects) and the
+   * hard limit (maximum lock objects overall) found in the configuration.
+   *
+   * @param conf configuration to read both limits from
+   */
+  public PoolBasedHierarchicalResourceLockManager(OzoneConfiguration conf) {
+    int softLimit = conf.getInt(OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_SOFT_LIMIT,
+        OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_SOFT_LIMIT_DEFAULT);
+    int hardLimit = conf.getInt(OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_HARD_LIMIT,
+        OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_HARD_LIMIT_DEFAULT);
+    GenericObjectPoolConfig<ReadWriteLock> config = new GenericObjectPoolConfig<>();
+    config.setMaxIdle(softLimit);
+    config.setMaxTotal(hardLimit);
+    // Once the hard limit is reached, borrowers wait until a lock object is returned.
+    config.setBlockWhenExhausted(true);
+    this.lockPool = new GenericObjectPool<>(new ReadWriteLockFactory(), config);
+    this.lockMap = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Applies {@code function} to the reference-counted lock registered for {@code key} under
+   * {@code resource}. When no lock is registered yet, one is borrowed from the pool first.
+   * When the count is zero after {@code function} ran, the lock object is handed back to the
+   * pool and the entry is removed.
+   *
+   * @param resource resource the key belongs to
+   * @param key key identifying the lock within the resource
+   * @param function reference-count update to apply (increment or decrement)
+   * @return the lock still registered for the key, or {@code null} when the entry was removed
+   * @throws IOException if a lock object could not be borrowed from the pool
+   */
+  private ReadWriteLock operateOnLock(FlatResource resource, String key, Consumer<LockReferenceCountPair> function)
+      throws IOException {
+    // compute() cannot throw checked exceptions, so a borrow failure is carried out through this holder.
+    AtomicReference<IOException> exception = new AtomicReference<>();
+    Map<String, LockReferenceCountPair> resourceLockMap =
+        this.lockMap.computeIfAbsent(resource, k -> new ConcurrentHashMap<>());
+    LockReferenceCountPair lockRef = resourceLockMap.compute(key, (k, v) -> {
+      if (v == null) {
+        try {
+          // NOTE(review): with blockWhenExhausted this call can wait while compute() is in progress;
+          // ConcurrentHashMap documents that other updates may be blocked meanwhile, which could
+          // include the close() that would free a pooled lock. Confirm this cannot stall under load.
+          ReadWriteLock readWriteLock = this.lockPool.borrowObject();
+          v = new LockReferenceCountPair(readWriteLock);
+        } catch (Exception e) {
+          exception.set(new IOException("Exception while initializing lock object.", e));
+          return null;
+        }
+      }
+      function.accept(v);
+      Preconditions.checkState(v.getCount() >= 0);
+      if (v.getCount() == 0) {
+        // Last holder is gone: recycle the lock object and drop the map entry.
+        this.lockPool.returnObject(v.getLock());
+        return null;
+      }
+      return v;
+    });
+    if (exception.get() != null) {
+      throw exception.get();
+    }
+    return lockRef == null ? null : lockRef.getLock();
+  }
+
+  @Override
+  public HierarchicalResourceLock acquireReadLock(FlatResource resource, String key) throws IOException {
+    return acquireLock(resource, key, true);
+  }
+
+  @Override
+  public HierarchicalResourceLock acquireWriteLock(FlatResource resource, String key) throws IOException {
+    return acquireLock(resource, key, false);
+  }
+
+  /**
+   * Registers one more holder for the key's lock and then takes its read or write side.
+   *
+   * @param isReadLock {@code true} for the read side, {@code false} for the write side
+   * @throws IOException if no lock object could be obtained for the key
+   */
+  private HierarchicalResourceLock acquireLock(FlatResource resource, String key, boolean isReadLock)
+      throws IOException {
+    ReadWriteLock readWriteLock = operateOnLock(resource, key, LockReferenceCountPair::increment);
+    if (readWriteLock == null) {
+      throw new IOException("Unable to acquire " + (isReadLock ? "read" : "write") + " lock on resource " +
+          resource + " and key " + key);
+    }
+    return new PoolBasedHierarchicalResourceLock(resource, key,
+        isReadLock ? readWriteLock.readLock() : readWriteLock.writeLock());
+  }
+
+  @Override
+  public void close() {
+    this.lockPool.close();
+  }
+
+  /**
+   * Represents a hierarchical resource lock mechanism that operates
+   * using a resource pool for acquiring and releasing locks. This class
+   * provides thread-safe management of read and write locks associated
+   * with specific hierarchical resources.
+   *
+   * A lock can either be a read lock or a write lock. This is determined
+   * at the time of instantiation. The lifecycle of the lock is managed
+   * through this class, and the lock is automatically released when the
+   * `close` method is invoked.
+   *
+   * This is designed to work in conjunction with the containing manager
+   * class, {@code PoolBasedHierarchicalResourceLockManager}, which oversees
+   * the lifecycle of multiple such locks.
+   */
+  public class PoolBasedHierarchicalResourceLock implements HierarchicalResourceLock, Closeable {
+
+    // volatile: isLockAcquired() reads this without holding the monitor that close() holds.
+    private volatile boolean isLockAcquired;
+    private final Lock lock;
+    private final FlatResource resource;
+    private final String key;
+
+    /**
+     * Takes {@code lock} immediately; the constructor returns once the lock is held.
+     */
+    public PoolBasedHierarchicalResourceLock(FlatResource resource, String key, Lock lock) {
+      this.isLockAcquired = true;
+      this.lock = lock;
+      this.resource = resource;
+      this.key = key;
+      this.lock.lock();
+    }
+
+    @Override
+    public boolean isLockAcquired() {
+      return isLockAcquired;
+    }
+
+    /**
+     * Releases the lock and drops this holder's reference. Calling it again is a no-op.
+     */
+    @Override
+    public synchronized void close() throws IOException {
+      if (isLockAcquired) {
+        this.lock.unlock();
+        // The lock is released at this point, so record it before touching the reference count:
+        // if the bookkeeping below throws, a repeated close() must not unlock a second time.
+        isLockAcquired = false;
+        operateOnLock(resource, key, (LockReferenceCountPair::decrement));
+      }
+    }
+  }
+
+  /**
+   * A pooled lock together with the number of holders currently referencing it.
+   * The count is only changed through the functions passed to {@code operateOnLock}.
+   */
+  private static final class LockReferenceCountPair {
+    private int count;
+    private final ReadWriteLock lock;
+
+    private LockReferenceCountPair(ReadWriteLock lock) {
+      this.count = 0;
+      this.lock = lock;
+    }
+
+    private void increment() {
+      count++;
+    }
+
+    private void decrement() {
+      count--;
+    }
+
+    private int getCount() {
+      return count;
+    }
+
+    private ReadWriteLock getLock() {
+      return lock;
+    }
+  }
+
+  /** Pool factory that creates a new {@link ReentrantReadWriteLock} for every pooled object. */
+  private static class ReadWriteLockFactory extends BasePooledObjectFactory<ReadWriteLock> {
+
+    @Override
+    public ReadWriteLock create() throws Exception {
+      return new ReentrantReadWriteLock();
+    }
+
+    @Override
+    public PooledObject<ReadWriteLock> wrap(ReadWriteLock obj) {
+      return new DefaultPooledObject<>(obj);
+    }
+  }
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/ReadOnlyHierarchicalResourceLockManager.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/ReadOnlyHierarchicalResourceLockManager.java
new file mode 100644
index 000000000000..19e114ae52ec
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/ReadOnlyHierarchicalResourceLockManager.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license
agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.lock;
+
+import java.io.IOException;
+
+/**
+ * Lock manager for read-only use: it never takes a real lock and only hands out
+ * shared placeholder lock objects.
+ */
+public class ReadOnlyHierarchicalResourceLockManager implements HierarchicalResourceLockManager {
+
+  /** Placeholder returned for read locks; always reports itself as acquired. */
+  private static final HierarchicalResourceLock EMPTY_LOCK_ACQUIRED = new PlaceholderLock(true);
+
+  /** Placeholder returned for write locks; always reports itself as not acquired. */
+  private static final HierarchicalResourceLock EMPTY_LOCK_NOT_ACQUIRED = new PlaceholderLock(false);
+
+  @Override
+  public HierarchicalResourceLock acquireReadLock(FlatResource resource, String key) throws IOException {
+    return EMPTY_LOCK_ACQUIRED;
+  }
+
+  @Override
+  public HierarchicalResourceLock acquireWriteLock(FlatResource resource, String key) throws IOException {
+    return EMPTY_LOCK_NOT_ACQUIRED;
+  }
+
+  @Override
+  public void close() throws Exception {
+    // The manager holds no resources, so there is nothing to release.
+  }
+
+  /**
+   * Lock object that holds nothing; it only answers with a fixed acquired state.
+   */
+  private static final class PlaceholderLock implements HierarchicalResourceLock {
+    private final boolean acquired;
+
+    private PlaceholderLock(boolean acquired) {
+      this.acquired = acquired;
+    }
+
+    @Override
+    public boolean isLockAcquired() {
+      return acquired;
+    }
+
+    @Override
+    public void close() {
+      // Nothing was locked, so closing does nothing.
+    }
+  }
+}
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestPoolBasedHierarchicalResourceLockManager.java
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestPoolBasedHierarchicalResourceLockManager.java new file mode 100644 index 000000000000..d9edd003504c --- /dev/null +++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestPoolBasedHierarchicalResourceLockManager.java @@ -0,0 +1,583 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.om.lock; + +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_HARD_LIMIT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_SOFT_LIMIT; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.lock.HierarchicalResourceLockManager.HierarchicalResourceLock; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +/** + * Test class for {@link PoolBasedHierarchicalResourceLockManager}. + * + * This class tests the functionality of the pool-based hierarchical resource lock manager, + * including basic lock operations, concurrency scenarios, resource pool management, + * and error conditions. 
+ */
+public class TestPoolBasedHierarchicalResourceLockManager {
+
+  private PoolBasedHierarchicalResourceLockManager lockManager;
+
+  @BeforeEach
+  public void setUp() {
+    lockManager = new PoolBasedHierarchicalResourceLockManager(new OzoneConfiguration());
+  }
+
+  @AfterEach
+  public void tearDown() {
+    if (lockManager == null) {
+      return;
+    }
+    lockManager.close();
+  }
+
+  /**
+   * A read lock can be taken and reports itself as acquired while it is held.
+   */
+  @Test
+  public void testBasicReadLockAcquisition() throws Exception {
+    try (HierarchicalResourceLock readLock =
+        lockManager.acquireReadLock(FlatResource.SNAPSHOT_GC_LOCK, "test-key-1")) {
+      assertNotNull(readLock);
+      assertTrue(readLock.isLockAcquired());
+    }
+  }
+
+  /**
+   * A write lock can be taken and reports itself as acquired while it is held.
+   */
+  @Test
+  public void testBasicWriteLockAcquisition() throws Exception {
+    try (HierarchicalResourceLock writeLock =
+        lockManager.acquireWriteLock(FlatResource.SNAPSHOT_DB_LOCK, "test-key-2")) {
+      assertNotNull(writeLock);
+      assertTrue(writeLock.isLockAcquired());
+    }
+  }
+
+  /**
+   * Two read locks on the same resource and key can be held at the same time.
+   */
+  @Test
+  public void testMultipleReadLocks() throws Exception {
+    final String sharedKey = "test-key-3";
+
+    try (HierarchicalResourceLock first = lockManager.acquireReadLock(FlatResource.SNAPSHOT_GC_LOCK, sharedKey)) {
+      try (HierarchicalResourceLock second = lockManager.acquireReadLock(FlatResource.SNAPSHOT_GC_LOCK, sharedKey)) {
+        assertNotNull(first);
+        assertNotNull(second);
+        assertTrue(first.isLockAcquired());
+        assertTrue(second.isLockAcquired());
+      }
+    }
+  }
+
+  /**
+   * Test write lock exclusivity - only one write lock can be acquired at a time.
+   */
+  @Test
+  @Timeout(10)
+  public void testWriteLockExclusivity() throws Exception {
+    String key = "test-key-4";
+    CountDownLatch latch1 = new CountDownLatch(1);
+    CountDownLatch latch2 = new CountDownLatch(1);
+    AtomicBoolean secondLockAcquired = new AtomicBoolean(false);
+
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+
+    try {
+      // First thread acquires write lock
+      CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
+        try (HierarchicalResourceLock lock = lockManager.acquireWriteLock(FlatResource.SNAPSHOT_DB_LOCK, key)) {
+          latch1.countDown();
+          // Hold lock for a short time
+          Thread.sleep(100);
+        } catch (Exception e) {
+          fail("First thread failed to acquire lock: " + e.getMessage());
+        }
+      }, executor);
+
+      // Wait for first lock to be acquired
+      latch1.await();
+
+      // Second thread tries to acquire write lock
+      CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
+        try (HierarchicalResourceLock lock = lockManager.acquireWriteLock(FlatResource.SNAPSHOT_DB_LOCK, key)) {
+          secondLockAcquired.set(true);
+          latch2.countDown();
+        } catch (Exception e) {
+          fail("Second thread failed to acquire lock: " + e.getMessage());
+        }
+      }, executor);
+
+      // Wait for both threads to complete; a fail() inside a task surfaces here as ExecutionException
+      future1.get(5, TimeUnit.SECONDS);
+      future2.get(5, TimeUnit.SECONDS);
+
+      // Second lock should have been acquired after first was released
+      assertTrue(secondLockAcquired.get());
+
+    } finally {
+      executor.shutdown();
+    }
+  }
+
+  /**
+   * Test read-write lock interaction - write lock blocks read locks.
+   */
+  @Test
+  @Timeout(10)
+  public void testReadWriteLockInteraction() throws Exception {
+    String key = "test-key-5";
+    CountDownLatch writeLockAcquired = new CountDownLatch(1);
+    CountDownLatch readLockAcquired = new CountDownLatch(1);
+    AtomicBoolean readLockBlocked = new AtomicBoolean(false);
+
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+
+    try {
+      // First thread acquires write lock
+      CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
+        try (HierarchicalResourceLock lock = lockManager.acquireWriteLock(FlatResource.SNAPSHOT_GC_LOCK, key)) {
+          writeLockAcquired.countDown();
+          // Hold lock for a short time
+          Thread.sleep(200);
+        } catch (Exception e) {
+          fail("Write lock acquisition failed: " + e.getMessage());
+        }
+      }, executor);
+
+      // Wait for write lock to be acquired
+      writeLockAcquired.await();
+
+      // Second thread tries to acquire read lock
+      CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
+        try {
+          // This should block until write lock is released
+          // NOTE(review): the flag is set before the acquire call, so it only shows the task started,
+          // not that the acquire actually waited.
+          readLockBlocked.set(true);
+          try (HierarchicalResourceLock lock = lockManager.acquireReadLock(FlatResource.SNAPSHOT_GC_LOCK, key)) {
+            readLockAcquired.countDown();
+          }
+        } catch (Exception e) {
+          fail("Read lock acquisition failed: " + e.getMessage());
+        }
+      }, executor);
+
+      // Wait for both threads to complete
+      future1.get(5, TimeUnit.SECONDS);
+      future2.get(5, TimeUnit.SECONDS);
+
+      assertTrue(readLockBlocked.get());
+      assertEquals(0, readLockAcquired.getCount());
+
+    } finally {
+      executor.shutdown();
+    }
+  }
+
+  /**
+   * Test lock state after closing.
+   */
+  @Test
+  public void testLockStateAfterClose() throws Exception {
+    String key = "test-key-6";
+
+    HierarchicalResourceLock lock = lockManager.acquireReadLock(FlatResource.SNAPSHOT_DB_LOCK, key);
+    assertTrue(lock.isLockAcquired());
+
+    lock.close();
+    assertFalse(lock.isLockAcquired());
+  }
+
+  /**
+   * Test double close doesn't cause issues.
+   */
+  @Test
+  public void testDoubleClose() throws Exception {
+    String key = "test-key-7";
+
+    HierarchicalResourceLock lock = lockManager.acquireWriteLock(FlatResource.SNAPSHOT_GC_LOCK, key);
+    assertTrue(lock.isLockAcquired());
+
+    // First close
+    lock.close();
+    assertFalse(lock.isLockAcquired());
+
+    // Second close should not throw exception
+    lock.close();
+    assertFalse(lock.isLockAcquired());
+  }
+
+  /**
+   * Test different resource types can be locked independently.
+   */
+  @Test
+  public void testDifferentResourceTypes() throws Exception {
+
+    List<HierarchicalResourceLock> locks = new ArrayList<>();
+    try {
+      for (FlatResource otherResource : FlatResource.values()) {
+        String key = "test-key";
+        locks.add(lockManager.acquireWriteLock(otherResource, key));
+      }
+      for (HierarchicalResourceLock lock : locks) {
+        assertNotNull(lock);
+        assertTrue(lock.isLockAcquired());
+      }
+    } finally {
+      // Release whatever was acquired even when an acquisition or assertion above failed.
+      for (HierarchicalResourceLock lock : locks) {
+        lock.close();
+      }
+    }
+  }
+
+
+  /**
+   * Test different keys on same resource type can be locked concurrently.
+   */
+  @Test
+  public void testDifferentKeysOnSameResource() throws Exception {
+    String key1 = "test-key-8a";
+    String key2 = "test-key-8b";
+
+    try (HierarchicalResourceLock lock1 = lockManager.acquireWriteLock(FlatResource.SNAPSHOT_GC_LOCK, key1);
+         HierarchicalResourceLock lock2 = lockManager.acquireWriteLock(FlatResource.SNAPSHOT_GC_LOCK, key2)) {
+
+      assertNotNull(lock1);
+      assertNotNull(lock2);
+      assertTrue(lock1.isLockAcquired());
+      assertTrue(lock2.isLockAcquired());
+    }
+  }
+
+  /**
+   * Test configuration parameters are respected.
+   */
+  @Test
+  public void testHardLimitsWithCustomConfiguration()
+      throws InterruptedException, IOException, ExecutionException, TimeoutException {
+    // Kept in one place so the number of locks acquired below always matches the configured limit.
+    final int hardLimit = 500;
+    OzoneConfiguration customConf = new OzoneConfiguration();
+    customConf.setInt(OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_SOFT_LIMIT, 100);
+    customConf.setInt(OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_HARD_LIMIT, hardLimit);
+
+    try (PoolBasedHierarchicalResourceLockManager customLockManager =
+        new PoolBasedHierarchicalResourceLockManager(customConf)) {
+
+      // Test that manager can be created with custom configuration
+      List<HierarchicalResourceLock> locks = new ArrayList<>();
+      assertNotNull(customLockManager);
+      // Take one lock per distinct key until the pool's hard limit is used up.
+      for (int i = 0; i < hardLimit; i++) {
+        try {
+          locks.add(customLockManager.acquireReadLock(FlatResource.SNAPSHOT_DB_LOCK, "test" + i));
+        } catch (IOException e) {
+          fail("Lock acquisition failed with custom configuration: " + e.getMessage());
+        }
+      }
+      CountDownLatch latch = new CountDownLatch(1);
+      CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+        // Basic functionality test with custom configuration
+        latch.countDown();
+        // One more distinct key than the pool can serve: expected to wait until a lock is released.
+        try (HierarchicalResourceLock lock = customLockManager.acquireReadLock(FlatResource.SNAPSHOT_DB_LOCK,
+            "test" + (hardLimit + 1))) {
+          assertTrue(lock.isLockAcquired());
+        } catch (Exception e) {
+          fail("Lock acquisition failed with custom configuration: " + e.getMessage());
+        }
+      });
+      Thread.sleep(1000);
+      latch.await();
+      assertFalse(future.isDone());
+      locks.get(0).close();
+      future.get(5, TimeUnit.SECONDS);
+      // Closing the first lock again here is harmless: close() is a no-op once released.
+      for (HierarchicalResourceLock lock : locks) {
+        lock.close();
+      }
+    }
+  }
+
+  /**
+   * Test concurrent access with multiple threads.
+   */
+  @Test
+  @Timeout(30)
+  public void testConcurrentAccess() throws Exception {
+    int numThreads = 10;
+    int operationsPerThread = 50;
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+    CountDownLatch latch = new CountDownLatch(numThreads);
+    AtomicInteger successCount = new AtomicInteger(0);
+    AtomicReference<Exception> exception = new AtomicReference<>();
+
+    try {
+      List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+      for (int i = 0; i < numThreads; i++) {
+        final int threadId = i;
+        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+          try {
+            for (int j = 0; j < operationsPerThread; j++) {
+              String key = "thread-" + threadId + "-op-" + j;
+              FlatResource resource = FlatResource.values()[j % FlatResource.values().length];
+
+              // Alternate between read locks (even j) and write locks (odd j)
+              boolean isReadLock = (j % 2 == 0);
+
+              try (HierarchicalResourceLock lock = isReadLock ?
+                  lockManager.acquireReadLock(resource, key) :
+                  lockManager.acquireWriteLock(resource, key)) {
+
+                assertTrue(lock.isLockAcquired());
+
+                // Simulate some work
+                Thread.sleep(1);
+
+                successCount.incrementAndGet();
+              }
+            }
+          } catch (Exception e) {
+            exception.set(e);
+          } finally {
+            latch.countDown();
+          }
+        }, executor);
+
+        futures.add(future);
+      }
+
+      // Wait for all threads to complete
+      assertTrue(latch.await(25, TimeUnit.SECONDS));
+
+      // Check for exceptions
+      if (exception.get() != null) {
+        fail("Concurrent access test failed: " + exception.get().getMessage());
+      }
+
+      // Verify all operations succeeded
+      assertEquals(numThreads * operationsPerThread, successCount.get());
+      // Surfaces assertion errors raised inside a task, which the catch above does not record.
+      for (CompletableFuture<Void> future : futures) {
+        future.get();
+      }
+    } finally {
+      executor.shutdown();
+    }
+  }
+
+  /**
+   * Test resource pool behavior under stress.
+   */
+  @Test
+  @Timeout(20)
+  public void testResourcePoolStress() throws Exception {
+    // Use smaller pool limits for stress testing
+    OzoneConfiguration stressConf = new OzoneConfiguration();
+    stressConf.setInt(OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_SOFT_LIMIT, 10);
+    stressConf.setInt(OZONE_OM_HIERARCHICAL_RESOURCE_LOCKS_HARD_LIMIT, 20);
+
+    try (PoolBasedHierarchicalResourceLockManager stressLockManager =
+        new PoolBasedHierarchicalResourceLockManager(stressConf)) {
+
+      int numThreads = 5;
+      int operationsPerThread = 20;
+      ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+      CountDownLatch latch = new CountDownLatch(numThreads);
+      AtomicInteger successCount = new AtomicInteger(0);
+      AtomicReference<Exception> exception = new AtomicReference<>();
+
+      try {
+        for (int i = 0; i < numThreads; i++) {
+          final int threadId = i;
+          executor.submit(() -> {
+            try {
+              for (int j = 0; j < operationsPerThread; j++) {
+                String key = "stress-" + threadId + "-" + j;
+
+                try (HierarchicalResourceLock lock =
+                    stressLockManager.acquireWriteLock(FlatResource.SNAPSHOT_GC_LOCK, key)) {
+
+                  assertTrue(lock.isLockAcquired());
+
+                  // Hold lock for a bit to stress the pool
+                  Thread.sleep(10);
+
+                  successCount.incrementAndGet();
+                }
+              }
+            } catch (Exception e) {
+              exception.set(e);
+            } finally {
+              latch.countDown();
+            }
+          });
+        }
+
+        // Wait for all threads to complete
+        assertTrue(latch.await(15, TimeUnit.SECONDS));
+
+        // Check for exceptions
+        if (exception.get() != null) {
+          fail("Resource pool stress test failed: " + exception.get().getMessage());
+        }
+
+        // Verify all operations succeeded; a task that died on an assertion shows up as a short count
+        assertEquals(numThreads * operationsPerThread, successCount.get());
+
+      } finally {
+        executor.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test manager close functionality.
+   */
+  @Test
+  public void testManagerClose() throws Exception {
+    // Take a lock, then release it before the manager goes away
+    HierarchicalResourceLock heldLock =
+        lockManager.acquireReadLock(FlatResource.SNAPSHOT_DB_LOCK, "test-key-close");
+    assertTrue(heldLock.isLockAcquired());
+
+    heldLock.close();
+    assertFalse(heldLock.isLockAcquired());
+
+    // Closing the manager must succeed without error.
+    // Acquiring locks after the manager is closed is deliberately not tested: that behavior is undefined.
+    lockManager.close();
+  }
+
+  /**
+   * A null key is rejected with a NullPointerException.
+   */
+  @Test
+  public void testNullKey() {
+    assertThrows(NullPointerException.class,
+        () -> lockManager.acquireReadLock(FlatResource.SNAPSHOT_GC_LOCK, null));
+  }
+
+  /**
+   * A null resource is rejected with a NullPointerException.
+   */
+  @Test
+  public void testNullResource() {
+    assertThrows(NullPointerException.class,
+        () -> lockManager.acquireWriteLock(null, "test-key"));
+  }
+
+  /**
+   * The empty string is accepted as a key.
+   */
+  @Test
+  public void testEmptyKey() throws Exception {
+    final String emptyKey = "";
+    try (HierarchicalResourceLock readLock = lockManager.acquireReadLock(FlatResource.SNAPSHOT_GC_LOCK, emptyKey)) {
+      assertNotNull(readLock);
+      assertTrue(readLock.isLockAcquired());
+    }
+  }
+
+  /**
+   * Keys of differing shapes (separators, digits, spaces, long names) can all be locked.
+   */
+  @ParameterizedTest
+  @ValueSource(strings = {
+      "simple",
+      "key-with-dashes",
+      "key_with_underscores",
+      "key.with.dots",
+      "key/with/slashes",
+      "123456789",
+      "key with spaces",
+      "very-long-key-name-that-exceeds-normal-length-expectations"
+  })
+  public void testVariousKeyFormats(String key) throws Exception {
+    try (HierarchicalResourceLock writeLock = lockManager.acquireWriteLock(FlatResource.SNAPSHOT_DB_LOCK, key)) {
+      assertNotNull(writeLock);
+      assertTrue(writeLock.isLockAcquired());
+    }
+  }
+
+  /**
+   * Test reentrant lock behavior - same thread can acquire multiple locks on same resource.
+   */
+  @Test
+  public void testReentrantLockBehavior() throws Exception {
+    String key = "reentrant-test";
+
+    // Acquire first lock
+    try (HierarchicalResourceLock lock1 = lockManager.acquireReadLock(FlatResource.SNAPSHOT_GC_LOCK, key)) {
+      assertTrue(lock1.isLockAcquired());
+
+      // Acquire second lock on same resource from same thread
+      try (HierarchicalResourceLock lock2 = lockManager.acquireReadLock(FlatResource.SNAPSHOT_GC_LOCK, key)) {
+        assertTrue(lock2.isLockAcquired());
+
+        // Both locks should be active
+        assertTrue(lock1.isLockAcquired());
+        assertTrue(lock2.isLockAcquired());
+      }
+
+      // First lock should still be active after second is released
+      assertTrue(lock1.isLockAcquired());
+    }
+  }
+
+  /**
+   * Test that IOException is properly propagated from pool operations.
+   */
+  @Test
+  public void testIOExceptionPropagation() {
+    // This test verifies that IOExceptions from pool operations are properly handled
+    // In normal circumstances, the pool should not throw IOExceptions during basic operations
+    // but the code should handle them gracefully if they occur
+
+    String key = "exception-test";
+
+    try (HierarchicalResourceLock lock = lockManager.acquireReadLock(FlatResource.SNAPSHOT_DB_LOCK, key)) {
+      assertNotNull(lock);
+      assertTrue(lock.isLockAcquired());
+      // If we reach here, no IOException was thrown, which is expected for normal operation
+    } catch (Exception e) {
+      // Nothing here is set up to make the pool fail, so any exception is a real failure.
+      // Report it with its cause instead of letting the test pass silently.
+      fail("Unexpected exception while acquiring or releasing the lock", e);
+    }
+  }
+}
diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index 5713f218bd5c..7a0872277341 100644
--- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -51,6 +51,7 @@
 import
org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; +import org.apache.hadoop.ozone.om.lock.HierarchicalResourceLockManager; import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket; import org.apache.hadoop.ozone.security.OzoneTokenIdentifier; @@ -90,6 +91,11 @@ public interface OMMetadataManager extends DBStoreHAManager, AutoCloseable { */ IOzoneManagerLock getLock(); + /** + * Returns the hierarchical resource lock manager used on the Metadata DB. + */ + HierarchicalResourceLockManager getHierarchicalLockManager(); + /** * Returns the epoch associated with current OM process. */ diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java index ca9f45f8d24c..c7b071a6e8d9 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java @@ -103,9 +103,12 @@ import org.apache.hadoop.ozone.om.helpers.S3SecretValue; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; import org.apache.hadoop.ozone.om.helpers.WithMetadata; +import org.apache.hadoop.ozone.om.lock.HierarchicalResourceLockManager; import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock; import org.apache.hadoop.ozone.om.lock.OmReadOnlyLock; import org.apache.hadoop.ozone.om.lock.OzoneManagerLock; +import org.apache.hadoop.ozone.om.lock.PoolBasedHierarchicalResourceLockManager; +import org.apache.hadoop.ozone.om.lock.ReadOnlyHierarchicalResourceLockManager; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB; import org.apache.hadoop.ozone.om.request.file.OMFileRequest; import 
org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils; @@ -133,6 +136,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager, private DBStore store; private final IOzoneManagerLock lock; + private final HierarchicalResourceLockManager hierarchicalLockManager; private TypedTable userTable; private TypedTable volumeTable; @@ -197,6 +201,7 @@ public OmMetadataManagerImpl(OzoneConfiguration conf, this.perfMetrics = this.ozoneManager.getPerfMetrics(); } this.lock = new OzoneManagerLock(conf); + this.hierarchicalLockManager = new PoolBasedHierarchicalResourceLockManager(conf); this.omEpoch = OmUtils.getOMEpoch(); start(conf); } @@ -207,6 +212,7 @@ public OmMetadataManagerImpl(OzoneConfiguration conf, protected OmMetadataManagerImpl() { OzoneConfiguration conf = new OzoneConfiguration(); this.lock = new OzoneManagerLock(conf); + this.hierarchicalLockManager = new PoolBasedHierarchicalResourceLockManager(conf); this.omEpoch = 0; perfMetrics = null; } @@ -239,6 +245,7 @@ public static OmMetadataManagerImpl createCheckpointMetadataManager( protected OmMetadataManagerImpl(OzoneConfiguration conf, File dir, String name) throws IOException { lock = new OmReadOnlyLock(); + hierarchicalLockManager = new ReadOnlyHierarchicalResourceLockManager(); omEpoch = 0; int maxOpenFiles = conf.getInt(OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES, OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES_DEFAULT); @@ -258,6 +265,7 @@ protected OmMetadataManagerImpl(OzoneConfiguration conf, File dir, String name) OmMetadataManagerImpl(OzoneConfiguration conf, String snapshotDirName, int maxOpenFiles) throws IOException { try { lock = new OmReadOnlyLock(); + hierarchicalLockManager = new ReadOnlyHierarchicalResourceLockManager(); omEpoch = 0; String snapshotDir = OMStorage.getOmDbDir(conf) + OM_KEY_PREFIX + OM_SNAPSHOT_CHECKPOINT_DIR; @@ -475,6 +483,11 @@ public void stop() throws IOException { store.close(); store = null; } + /* Best-effort close: a failure is logged so the remaining shutdown steps below still run. */ + try { + hierarchicalLockManager.close(); + } catch (Exception e) { +
LOG.error("Error closing hierarchical lock manager", e); + } tableCacheMetricsMap.values().forEach(TableCacheMetrics::unregister); // OzoneManagerLock cleanup lock.cleanup(); @@ -644,6 +657,11 @@ public IOzoneManagerLock getLock() { return lock; } + @Override + public HierarchicalResourceLockManager getHierarchicalLockManager() { + return hierarchicalLockManager; + } + @Override public long getOmEpoch() { return omEpoch;