diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/IOzoneManagerLock.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/IOzoneManagerLock.java index fac864b2135a..6926b7d9bf23 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/IOzoneManagerLock.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/IOzoneManagerLock.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.om.lock; import com.google.common.annotations.VisibleForTesting; +import java.util.Collection; /** * Interface for OM Metadata locks. @@ -27,9 +28,15 @@ public interface IOzoneManagerLock { OMLockDetails acquireReadLock(OzoneManagerLock.Resource resource, String... resources); + OMLockDetails acquireReadLocks(OzoneManagerLock.Resource resource, Collection resources); + + OMLockDetails acquireWriteLock(OzoneManagerLock.Resource resource, String... resources); + OMLockDetails acquireWriteLocks(OzoneManagerLock.Resource resource, + Collection resources); + boolean acquireMultiUserLock(String firstUser, String secondUser); void releaseMultiUserLock(String firstUser, String secondUser); @@ -37,9 +44,15 @@ OMLockDetails acquireWriteLock(OzoneManagerLock.Resource resource, OMLockDetails releaseWriteLock(OzoneManagerLock.Resource resource, String... resources); + OMLockDetails releaseWriteLocks(OzoneManagerLock.Resource resource, + Collection resources); + OMLockDetails releaseReadLock(OzoneManagerLock.Resource resource, String... resources); + OMLockDetails releaseReadLocks(OzoneManagerLock.Resource resource, + Collection resources); + @VisibleForTesting int getReadHoldCount(OzoneManagerLock.Resource resource, String... resources); diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OmReadOnlyLock.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OmReadOnlyLock.java index b1b4296cba7d..059536fe0a58 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OmReadOnlyLock.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OmReadOnlyLock.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.ozone.om.lock.OMLockDetails.EMPTY_DETAILS_LOCK_ACQUIRED; import static org.apache.hadoop.ozone.om.lock.OMLockDetails.EMPTY_DETAILS_LOCK_NOT_ACQUIRED; +import java.util.Collection; import org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource; /** @@ -34,12 +35,22 @@ public OMLockDetails acquireReadLock(Resource resource, String... resources) { return EMPTY_DETAILS_LOCK_ACQUIRED; } + @Override + public OMLockDetails acquireReadLocks(Resource resource, Collection resources) { + return EMPTY_DETAILS_LOCK_ACQUIRED; + } + @Override public OMLockDetails acquireWriteLock(Resource resource, String... resources) { return EMPTY_DETAILS_LOCK_NOT_ACQUIRED; } + @Override + public OMLockDetails acquireWriteLocks(Resource resource, Collection resources) { + return EMPTY_DETAILS_LOCK_NOT_ACQUIRED; + } + @Override public boolean acquireMultiUserLock(String firstUser, String secondUser) { return false; @@ -56,11 +67,21 @@ public OMLockDetails releaseWriteLock(Resource resource, return EMPTY_DETAILS_LOCK_NOT_ACQUIRED; } + @Override + public OMLockDetails releaseWriteLocks(Resource resource, Collection resources) { + return EMPTY_DETAILS_LOCK_NOT_ACQUIRED; + } + @Override public OMLockDetails releaseReadLock(Resource resource, String... resources) { return EMPTY_DETAILS_LOCK_NOT_ACQUIRED; } + @Override + public OMLockDetails releaseReadLocks(Resource resource, Collection resources) { + return EMPTY_DETAILS_LOCK_NOT_ACQUIRED; + } + @Override public int getReadHoldCount(Resource resource, String... resources) { return 0; diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java index ab33a4f0c24d..9fd567344cd0 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java @@ -27,14 +27,19 @@ import com.google.common.util.concurrent.Striped; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.EnumMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.utils.CompositeKey; import org.apache.hadoop.hdds.utils.SimpleStriped; import org.apache.hadoop.ipc.ProcessingDetails.Timing; import org.apache.hadoop.ipc.Server; @@ -122,6 +127,17 @@ private Striped createStripeLock(Resource r, return SimpleStriped.readWriteLock(size, fair); } + private Iterable bulkGetLock(Resource resource, Collection keys) { + Striped striped = stripedLockByResource.get(resource); + List lockKeys = new ArrayList<>(keys.size()); + for (String[] key : keys) { + if (Objects.nonNull(key)) { + lockKeys.add(CompositeKey.combineKeys(key)); + } + } + return striped.bulkGet(lockKeys); + } + private ReentrantReadWriteLock getLock(Resource resource, String... keys) { Striped striped = stripedLockByResource.get(resource); Object key = combineKeys(keys); @@ -150,6 +166,28 @@ public OMLockDetails acquireReadLock(Resource resource, String... keys) { return acquireLock(resource, true, keys); } + /** + * Acquire read locks on a list of resources. + * + * For S3_BUCKET_LOCK, VOLUME_LOCK, BUCKET_LOCK type resource, same + * thread acquiring lock again is allowed. + * + * For USER_LOCK, PREFIX_LOCK, S3_SECRET_LOCK type resource, same thread + * acquiring lock again is not allowed. + * + * Special Note for USER_LOCK: Single thread can acquire single user lock/ + * multi user lock. But not both at the same time. + * @param resource - Type of the resource. + * @param keys - A list of Resource names on which user want to acquire locks. + * For Resource type BUCKET_LOCK, first param should be volume, second param + * should be bucket name. For remaining all resource only one param should + * be passed. + */ + @Override + public OMLockDetails acquireReadLocks(Resource resource, Collection keys) { + return acquireLocks(resource, true, keys); + } + /** * Acquire write lock on resource. * @@ -172,6 +210,59 @@ public OMLockDetails acquireWriteLock(Resource resource, String... keys) { return acquireLock(resource, false, keys); } + /** + * Acquire write locks on a list of resources. + * + * For S3_BUCKET_LOCK, VOLUME_LOCK, BUCKET_LOCK type resource, same + * thread acquiring lock again is allowed. + * + * For USER_LOCK, PREFIX_LOCK, S3_SECRET_LOCK type resource, same thread + * acquiring lock again is not allowed. + * + * Special Note for USER_LOCK: Single thread can acquire single user lock/ + * multi user lock. But not both at the same time. + * @param resource - Type of the resource. + * @param keys - A list of Resource names on which user want to acquire lock. + * For Resource type BUCKET_LOCK, first param should be volume, second param + * should be bucket name. For remaining all resource only one param should + * be passed. + */ + @Override + public OMLockDetails acquireWriteLocks(Resource resource, Collection keys) { + return acquireLocks(resource, false, keys); + } + + private void acquireLock(Resource resource, boolean isReadLock, ReadWriteLock lock, + long startWaitingTimeNanos) { + if (isReadLock) { + lock.readLock().lock(); + updateReadLockMetrics(resource, (ReentrantReadWriteLock) lock, startWaitingTimeNanos); + } else { + lock.writeLock().lock(); + updateWriteLockMetrics(resource, (ReentrantReadWriteLock) lock, startWaitingTimeNanos); + } + } + + private OMLockDetails acquireLocks(Resource resource, boolean isReadLock, + Collection keys) { + omLockDetails.get().clear(); + if (!resource.canLock(lockSet.get())) { + String errorMessage = getErrorMessage(resource); + LOG.error(errorMessage); + throw new RuntimeException(errorMessage); + } + + long startWaitingTimeNanos = Time.monotonicNowNanos(); + + for (ReadWriteLock lock : bulkGetLock(resource, keys)) { + acquireLock(resource, isReadLock, lock, startWaitingTimeNanos); + } + + lockSet.set(resource.setLock(lockSet.get())); + omLockDetails.get().setLockAcquired(true); + return omLockDetails.get(); + } + private OMLockDetails acquireLock(Resource resource, boolean isReadLock, String... keys) { omLockDetails.get().clear(); @@ -184,13 +275,7 @@ private OMLockDetails acquireLock(Resource resource, boolean isReadLock, long startWaitingTimeNanos = Time.monotonicNowNanos(); ReentrantReadWriteLock lock = getLock(resource, keys); - if (isReadLock) { - lock.readLock().lock(); - updateReadLockMetrics(resource, lock, startWaitingTimeNanos); - } else { - lock.writeLock().lock(); - updateWriteLockMetrics(resource, lock, startWaitingTimeNanos); - } + acquireLock(resource, isReadLock, lock, startWaitingTimeNanos); lockSet.set(resource.setLock(lockSet.get())); omLockDetails.get().setLockAcquired(true); @@ -317,6 +402,19 @@ public OMLockDetails releaseWriteLock(Resource resource, String... keys) { return releaseLock(resource, false, keys); } + /** + * Release write lock on multiple resources. + * @param resource - Type of the resource. + * @param keys - List of resource names on which user want to acquire lock. + * For Resource type BUCKET_LOCK, first param should be volume, second param + * should be bucket name. For remaining all resource only one param should + * be passed. + */ + @Override + public OMLockDetails releaseWriteLocks(Resource resource, Collection keys) { + return releaseLocks(resource, false, keys); + } + /** * Release read lock on resource. * @param resource - Type of the resource. @@ -330,6 +428,19 @@ public OMLockDetails releaseReadLock(Resource resource, String... keys) { return releaseLock(resource, true, keys); } + /** + * Release read locks on a list of resources. + * @param resource - Type of the resource. + * @param keys - Resource names on which user want to acquire lock. + * For Resource type BUCKET_LOCK, first param should be volume, second param + * should be bucket name. For remaining all resource only one param should + * be passed. + */ + @Override + public OMLockDetails releaseReadLocks(Resource resource, Collection keys) { + return releaseLocks(resource, true, keys); + } + private OMLockDetails releaseLock(Resource resource, boolean isReadLock, String... keys) { omLockDetails.get().clear(); @@ -347,6 +458,28 @@ private OMLockDetails releaseLock(Resource resource, boolean isReadLock, return omLockDetails.get(); } + private OMLockDetails releaseLocks(Resource resource, boolean isReadLock, + Collection keys) { + omLockDetails.get().clear(); + List locks = + StreamSupport.stream(bulkGetLock(resource, keys).spliterator(), false).collect(Collectors.toList()); + // Release locks in reverse order. + Collections.reverse(locks); + for (ReadWriteLock lock : locks) { + if (isReadLock) { + lock.readLock().unlock(); + updateReadUnlockMetrics(resource, (ReentrantReadWriteLock) lock); + } else { + boolean isWriteLocked = ((ReentrantReadWriteLock)lock).isWriteLockedByCurrentThread(); + lock.writeLock().unlock(); + updateWriteUnlockMetrics(resource, (ReentrantReadWriteLock) lock, isWriteLocked); + } + } + + lockSet.set(resource.clearLock(lockSet.get())); + return omLockDetails.get(); + } + private void updateReadUnlockMetrics(Resource resource, ReentrantReadWriteLock lock) { /* diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java index 4cd44cba4b1a..ca986a84639d 100644 --- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java +++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java @@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Stack; import java.util.UUID; @@ -287,6 +288,37 @@ void testLockResourceParallel() throws Exception { } + @Test + void testMultiLocksResourceParallel() throws Exception { + OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration()); + + for (Resource resource : Resource.values()) { + final List resourceName = Arrays.asList(generateResourceName(resource), + generateResourceName(resource), generateResourceName(resource)); + lock.acquireWriteLocks(resource, resourceName.subList(1, resourceName.size())); + + AtomicBoolean gotLock = new AtomicBoolean(false); + new Thread(() -> { + lock.acquireWriteLocks(resource, resourceName.subList(0, 2)); + gotLock.set(true); + lock.releaseWriteLocks(resource, resourceName.subList(0, 2)); + }).start(); + // Let's give some time for the new thread to run + Thread.sleep(100); + // Since the new thread is trying to get lock on same resource, + // it will wait. + assertFalse(gotLock.get()); + lock.releaseWriteLocks(resource, resourceName.subList(1, resourceName.size())); + // Since we have released the lock, the new thread should have the lock + // now. + // Let's give some time for the new thread to run + Thread.sleep(100); + assertTrue(gotLock.get()); + } + + } + + @Test void testMultiLockResourceParallel() throws Exception { OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration()); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/MultiSnapshotLocks.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/MultiSnapshotLocks.java new file mode 100644 index 000000000000..aac3b097b22e --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/MultiSnapshotLocks.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.snapshot; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock; +import org.apache.hadoop.ozone.om.lock.OMLockDetails; +import org.apache.hadoop.ozone.om.lock.OzoneManagerLock; + +/** + * Class to take multiple locks on multiple snapshots. + */ +public class MultiSnapshotLocks { + private final List objectLocks; + private final IOzoneManagerLock lock; + private final OzoneManagerLock.Resource resource; + private final boolean writeLock; + private OMLockDetails lockDetails; + + public MultiSnapshotLocks(IOzoneManagerLock lock, OzoneManagerLock.Resource resource, boolean writeLock) { + this.writeLock = writeLock; + this.resource = resource; + this.lock = lock; + this.objectLocks = new ArrayList<>(); + this.lockDetails = OMLockDetails.EMPTY_DETAILS_LOCK_NOT_ACQUIRED; + } + + public synchronized OMLockDetails acquireLock(Collection ids) throws OMException { + if (this.lockDetails.isLockAcquired()) { + throw new OMException( + objectLocks.stream().map(Arrays::toString).collect(Collectors.joining(",", + "More locks cannot be acquired when locks have been already acquired. Locks acquired : [", "]")), + OMException.ResultCodes.INTERNAL_ERROR); + } + List keys = + ids.stream().filter(Objects::nonNull).map(id -> new String[] {id.toString()}) + .collect(Collectors.toList()); + OMLockDetails omLockDetails = this.writeLock ? lock.acquireWriteLocks(resource, keys) : + lock.acquireReadLocks(resource, keys); + if (omLockDetails.isLockAcquired()) { + objectLocks.addAll(keys); + } + this.lockDetails = omLockDetails; + return omLockDetails; + } + + public synchronized void releaseLock() { + if (this.writeLock) { + lockDetails = lock.releaseWriteLocks(resource, this.objectLocks); + } else { + lockDetails = lock.releaseReadLocks(resource, this.objectLocks); + } + this.objectLocks.clear(); + } + + List getObjectLocks() { + return objectLocks; + } + + public boolean isLockAcquired() { + return lockDetails.isLockAcquired(); + } +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestMultiSnapshotLocks.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestMultiSnapshotLocks.java new file mode 100644 index 000000000000..741f1d30c36e --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestMultiSnapshotLocks.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.snapshot; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock; +import org.apache.hadoop.ozone.om.lock.OMLockDetails; +import org.apache.hadoop.ozone.om.lock.OzoneManagerLock; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +/** + * Class to test class MultiLocks. + */ +@ExtendWith(MockitoExtension.class) +public class TestMultiSnapshotLocks { + @Mock + private IOzoneManagerLock mockLock; + + @Mock + private OzoneManagerLock.Resource mockResource; + + private MultiSnapshotLocks multiSnapshotLocks; + private UUID obj1 = UUID.randomUUID(); + private UUID obj2 = UUID.randomUUID(); + + @BeforeEach + void setUp() { + // Initialize MultiLocks with mock dependencies + multiSnapshotLocks = new MultiSnapshotLocks(mockLock, mockResource, true); + } + + @Test + void testAcquireLockSuccess() throws Exception { + List objects = Arrays.asList(obj1, obj2); + OMLockDetails mockLockDetails = mock(OMLockDetails.class); + when(mockLockDetails.isLockAcquired()).thenReturn(true); + + // Simulate successful lock acquisition for each object + when(mockLock.acquireWriteLocks(eq(mockResource), anyList())).thenReturn(mockLockDetails); + + OMLockDetails result = multiSnapshotLocks.acquireLock(objects); + + assertEquals(mockLockDetails, result); + verify(mockLock, times(1)).acquireWriteLocks(ArgumentMatchers.eq(mockResource), any()); + } + + @Test + void testAcquireLockFailureReleasesAll() throws Exception { + + List objects = Arrays.asList(obj1, obj2); + OMLockDetails failedLockDetails = mock(OMLockDetails.class); + when(failedLockDetails.isLockAcquired()).thenReturn(false); + + // Simulate failure during lock acquisition + when(mockLock.acquireWriteLocks(eq(mockResource), anyCollection())).thenReturn(failedLockDetails); + + OMLockDetails result = multiSnapshotLocks.acquireLock(objects); + + assertEquals(failedLockDetails, result); + assertTrue(multiSnapshotLocks.getObjectLocks().isEmpty()); + } + + @Test + void testReleaseLock() throws Exception { + List objects = Arrays.asList(obj1, obj2); + OMLockDetails mockLockDetails = mock(OMLockDetails.class); + when(mockLockDetails.isLockAcquired()).thenReturn(true); + + // Acquire locks first + when(mockLock.acquireWriteLocks(eq(mockResource), anyCollection())).thenReturn(mockLockDetails); + multiSnapshotLocks.acquireLock(objects); + assertFalse(multiSnapshotLocks.getObjectLocks().isEmpty()); + + // Now release locks + multiSnapshotLocks.releaseLock(); + + // Verify that locks are released in order + verify(mockLock).releaseWriteLocks(eq(mockResource), any()); + assertTrue(multiSnapshotLocks.getObjectLocks().isEmpty()); + } + + @Test + void testAcquireLockWhenAlreadyAcquiredThrowsException() throws Exception { + List objects = Collections.singletonList(obj1); + OMLockDetails mockLockDetails = mock(OMLockDetails.class); + when(mockLockDetails.isLockAcquired()).thenReturn(true); + + // Acquire a lock first + when(mockLock.acquireWriteLocks(any(), anyList())).thenReturn(mockLockDetails); + multiSnapshotLocks.acquireLock(objects); + + // Try acquiring locks again without releasing + OMException exception = assertThrows(OMException.class, () -> multiSnapshotLocks.acquireLock(objects)); + + assertEquals( + String.format("More locks cannot be acquired when locks have been already acquired. Locks acquired : [[%s]]", + obj1.toString()), exception.getMessage()); + } +}