diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/IOzoneManagerLock.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/IOzoneManagerLock.java index 808b9a4321ab..7e8ed7c78171 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/IOzoneManagerLock.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/IOzoneManagerLock.java @@ -36,6 +36,8 @@ OMLockDetails acquireWriteLock(Resource resource, OMLockDetails acquireWriteLocks(Resource resource, Collection resources); + OMLockDetails acquireResourceWriteLock(Resource resource); + boolean acquireMultiUserLock(String firstUser, String secondUser); void releaseMultiUserLock(String firstUser, String secondUser); @@ -46,6 +48,8 @@ OMLockDetails releaseWriteLock(Resource resource, OMLockDetails releaseWriteLocks(Resource resource, Collection resources); + OMLockDetails releaseResourceWriteLock(Resource resource); + OMLockDetails releaseReadLock(Resource resource, String... resources); diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OmReadOnlyLock.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OmReadOnlyLock.java index 0c289cb18890..faf5ca99b8cd 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OmReadOnlyLock.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OmReadOnlyLock.java @@ -50,6 +50,11 @@ public OMLockDetails acquireWriteLocks(Resource resource, Collection r return EMPTY_DETAILS_LOCK_NOT_ACQUIRED; } + @Override + public OMLockDetails acquireResourceWriteLock(Resource resource) { + return EMPTY_DETAILS_LOCK_NOT_ACQUIRED; + } + @Override public boolean acquireMultiUserLock(String firstUser, String secondUser) { return false; @@ -71,6 +76,11 @@ public OMLockDetails releaseWriteLocks(Resource resource, Collection r return EMPTY_DETAILS_LOCK_NOT_ACQUIRED; } + @Override + public OMLockDetails releaseResourceWriteLock(Resource resource) { + return EMPTY_DETAILS_LOCK_NOT_ACQUIRED; + } + @Override public OMLockDetails releaseReadLock(Resource resource, String... resources) { return EMPTY_DETAILS_LOCK_NOT_ACQUIRED; diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java index 2a62836bed9b..6cd96f73238a 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java @@ -37,7 +37,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import java.util.stream.StreamSupport; import org.apache.commons.lang3.tuple.Pair; @@ -140,9 +142,11 @@ private Striped createStripeLock(Resource r, return SimpleStriped.readWriteLock(size, fair); } - private Iterable bulkGetLock(Map> lockMap, Resource resource, - Collection keys) { - Striped striped = lockMap.get(resource); + private Iterable getAllLocks(Striped striped) { + return IntStream.range(0, striped.size()).mapToObj(striped::getAt).collect(Collectors.toList()); + } + + private Iterable bulkGetLock(Striped striped, Collection keys) { List lockKeys = new ArrayList<>(keys.size()); for (String[] key : keys) { if (Objects.nonNull(key)) { @@ -200,7 +204,7 @@ public OMLockDetails acquireReadLock(Resource resource, String... keys) { */ @Override public OMLockDetails acquireReadLocks(Resource resource, Collection keys) { - return acquireLocks(resource, true, keys); + return acquireLocks(resource, true, striped -> bulkGetLock(striped, keys)); } /** @@ -244,7 +248,17 @@ public OMLockDetails acquireWriteLock(Resource resource, String... keys) { */ @Override public OMLockDetails acquireWriteLocks(Resource resource, Collection keys) { - return acquireLocks(resource, false, keys); + return acquireLocks(resource, false, striped -> bulkGetLock(striped, keys)); + } + + /** + * Acquires all write locks for a specified resource. + * + * @param resource The resource for which the write lock is to be acquired. + */ + @Override + public OMLockDetails acquireResourceWriteLock(Resource resource) { + return acquireLocks(resource, false, this::getAllLocks); } private void acquireLock(Resource resource, boolean isReadLock, ReadWriteLock lock, @@ -258,7 +272,8 @@ private void acquireLock(Resource resource, boolean isReadLock, ReadWriteLock lo } } - private OMLockDetails acquireLocks(Resource resource, boolean isReadLock, Collection keys) { + private OMLockDetails acquireLocks(Resource resource, boolean isReadLock, + Function, Iterable> lockListProvider) { Pair>, ResourceLockManager> resourceLockPair = resourcelockMap.get(resource.getClass()); ResourceLockManager resourceLockManager = resourceLockPair.getRight(); @@ -271,7 +286,7 @@ private OMLockDetails acquireLocks(Resource resource, boolean isReadLock, Collec long startWaitingTimeNanos = Time.monotonicNowNanos(); - for (ReadWriteLock lock : bulkGetLock(resourceLockPair.getKey(), resource, keys)) { + for (ReadWriteLock lock : lockListProvider.apply(resourceLockPair.getKey().get(resource))) { acquireLock(resource, isReadLock, lock, startWaitingTimeNanos); } return resourceLockManager.lockResource(resource); @@ -342,7 +357,6 @@ private String getErrorMessage(Resource resource) { return "Thread '" + Thread.currentThread().getName() + "' cannot " + "acquire " + resource.getName() + " lock while holding " + getCurrentLocks().toString() + " lock(s)."; - } @VisibleForTesting @@ -397,7 +411,17 @@ public OMLockDetails releaseWriteLock(Resource resource, String... keys) { */ @Override public OMLockDetails releaseWriteLocks(Resource resource, Collection keys) { - return releaseLocks(resource, false, keys); + return releaseLocks(resource, false, striped -> bulkGetLock(striped, keys)); + } + + /** + * Releases a write lock acquired on the entire Stripe for a specified resource. + * + * @param resource The resource for which the write lock is to be acquired. + */ + @Override + public OMLockDetails releaseResourceWriteLock(Resource resource) { + return releaseLocks(resource, false, this::getAllLocks); } /** @@ -423,7 +447,7 @@ public OMLockDetails releaseReadLock(Resource resource, String... keys) { */ @Override public OMLockDetails releaseReadLocks(Resource resource, Collection keys) { - return releaseLocks(resource, true, keys); + return releaseLocks(resource, true, striped -> bulkGetLock(striped, keys)); } private OMLockDetails releaseLock(Resource resource, boolean isReadLock, @@ -445,12 +469,12 @@ private OMLockDetails releaseLock(Resource resource, boolean isReadLock, } private OMLockDetails releaseLocks(Resource resource, boolean isReadLock, - Collection keys) { + Function, Iterable> lockListProvider) { Pair>, ResourceLockManager> resourceLockPair = resourcelockMap.get(resource.getClass()); ResourceLockManager resourceLockManager = resourceLockPair.getRight(); resourceLockManager.clearLockDetails(); - List locks = StreamSupport.stream(bulkGetLock(resourceLockPair.getKey(), resource, keys) + List locks = StreamSupport.stream(lockListProvider.apply(resourceLockPair.getKey().get(resource)) .spliterator(), false).collect(Collectors.toList()); // Release locks in reverse order. Collections.reverse(locks); @@ -558,7 +582,10 @@ public OMLockMetrics getOMLockMetrics() { * Flat Resource defined in Ozone. Locks can be acquired on a resource independent of one another. */ public enum FlatResource implements Resource { - SNAPSHOT_GC_LOCK("SNAPSHOT_GC_LOCK"); + // Background services lock on a Snapshot. + SNAPSHOT_GC_LOCK("SNAPSHOT_GC_LOCK"), + // Lock acquired on a Snapshot's RocksDB Handle. + SNAPSHOT_DB_LOCK("SNAPSHOT_DB_LOCK"); private String name; private ResourceManager resourceManager; diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java index 500a96e29a46..a1d853eb6b39 100644 --- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java +++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java @@ -31,6 +31,7 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.metrics2.MetricsRecord; import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; @@ -39,7 +40,9 @@ import org.apache.hadoop.ozone.om.lock.OzoneManagerLock.LeveledResource; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; /** * Class tests OzoneManagerLock. @@ -278,34 +281,119 @@ void acquireUserLockAfterMultiUserLock() { lock.releaseMultiUserLock("user1", "user2"); } - @Test - void testLockResourceParallel() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testLockResourceParallel(boolean fullResourceLock) throws Exception { OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration()); - for (LeveledResource resource : - LeveledResource.values()) { + for (Resource resource : Stream.of(LeveledResource.values(), FlatResource.values()) + .flatMap(Arrays::stream).collect(Collectors.toList())) { final String[] resourceName = generateResourceName(resource); - lock.acquireWriteLock(resource, resourceName); + if (fullResourceLock) { + lock.acquireResourceWriteLock(resource); + } else { + lock.acquireWriteLock(resource, resourceName); + } AtomicBoolean gotLock = new AtomicBoolean(false); new Thread(() -> { - lock.acquireWriteLock(resource, resourceName); + if (fullResourceLock) { + lock.acquireResourceWriteLock(resource); + } else { + lock.acquireWriteLock(resource, resourceName); + } gotLock.set(true); - lock.releaseWriteLock(resource, resourceName); + if (fullResourceLock) { + lock.releaseResourceWriteLock(resource); + } else { + lock.releaseWriteLock(resource, resourceName); + } + }).start(); // Let's give some time for the new thread to run Thread.sleep(100); // Since the new thread is trying to get lock on same resource, // it will wait. assertFalse(gotLock.get()); - lock.releaseWriteLock(resource, resourceName); + if (fullResourceLock) { + lock.releaseResourceWriteLock(resource); + } else { + lock.releaseWriteLock(resource, resourceName); + } // Since we have released the lock, the new thread should have the lock // now. // Let's give some time for the new thread to run Thread.sleep(100); assertTrue(gotLock.get()); } + } + + @ParameterizedTest + @CsvSource(value = { + "true, true", + "true, false", + "false, true", + "false, false" + }) + void testResourceLockFullResourceLockParallel(boolean mainThreadAcquireResourceLock, boolean acquireWriteLock) + throws Exception { + OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration()); + + for (Resource resource : Stream.of(LeveledResource.values(), FlatResource.values()) + .flatMap(Arrays::stream).collect(Collectors.toList())) { + final String[] resourceName = generateResourceName(resource); + if (mainThreadAcquireResourceLock) { + lock.acquireResourceWriteLock(resource); + } else { + if (acquireWriteLock) { + lock.acquireWriteLock(resource, resourceName); + } else { + lock.acquireReadLock(resource, resourceName); + } + } + AtomicBoolean gotLock = new AtomicBoolean(false); + new Thread(() -> { + if (!mainThreadAcquireResourceLock) { + lock.acquireResourceWriteLock(resource); + } else { + if (acquireWriteLock) { + lock.acquireWriteLock(resource, resourceName); + } else { + lock.acquireReadLock(resource, resourceName); + } + } + gotLock.set(true); + if (!mainThreadAcquireResourceLock) { + lock.releaseResourceWriteLock(resource); + } else { + if (acquireWriteLock) { + lock.releaseWriteLock(resource, resourceName); + } else { + lock.releaseReadLock(resource, resourceName); + } + } + }).start(); + // Let's give some time for the new thread to run + Thread.sleep(100); + // Since the new thread is trying to get lock on same resource, + // it will wait. + assertFalse(gotLock.get()); + if (mainThreadAcquireResourceLock) { + lock.releaseResourceWriteLock(resource); + } else { + if (acquireWriteLock) { + lock.releaseWriteLock(resource, resourceName); + } else { + lock.releaseReadLock(resource, resourceName); + } + } + // Since we have released the lock, the new thread should have the lock + // now. + // Let's give some time for the new thread to run + Thread.sleep(100); + assertTrue(gotLock.get()); + } } @Test diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java index 262750e5c2f2..1de509e99394 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java @@ -295,7 +295,7 @@ public OmSnapshotManager(OzoneManager ozoneManager) { .getBoolean(OZONE_OM_SNAPSHOT_COMPACT_NON_SNAPSHOT_DIFF_TABLES, OZONE_OM_SNAPSHOT_COMPACT_NON_SNAPSHOT_DIFF_TABLES_DEFAULT); this.snapshotCache = new SnapshotCache(loader, softCacheSize, ozoneManager.getMetrics(), - cacheCleanupServiceInterval, compactNonSnapshotDiffTables); + cacheCleanupServiceInterval, compactNonSnapshotDiffTables, ozoneManager.getMetadataManager().getLock()); this.snapshotDiffManager = new SnapshotDiffManager(snapshotDiffDb, differ, ozoneManager, snapDiffJobCf, snapDiffReportCf, diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java index ea46366d9182..b94fd45bf7fb 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java @@ -19,7 +19,7 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK; import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT; -import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.LeveledResource.SNAPSHOT_LOCK; +import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_DB_LOCK; import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.getColumnFamilyToKeyPrefixMap; import com.google.common.annotations.VisibleForTesting; @@ -135,8 +135,7 @@ private void markSSTFilteredFlagForSnapshot(SnapshotInfo snapshotInfo) throws IO // in OmSnapshotPurgeResponse. Any operation apart from delete can run in parallel along with this operation. //TODO. Revisit other SNAPSHOT_LOCK and see if we can change write locks to read locks to further optimize it. OMLockDetails omLockDetails = ozoneManager.getMetadataManager().getLock() - .acquireReadLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(), snapshotInfo.getBucketName(), - snapshotInfo.getName()); + .acquireReadLock(SNAPSHOT_DB_LOCK, snapshotInfo.getSnapshotId().toString()); boolean acquiredSnapshotLock = omLockDetails.isLockAcquired(); if (acquiredSnapshotLock) { String snapshotDir = OmSnapshotManager.getSnapshotPath(ozoneManager.getConfiguration(), snapshotInfo); @@ -147,8 +146,7 @@ private void markSSTFilteredFlagForSnapshot(SnapshotInfo snapshotInfo) throws IO } } finally { ozoneManager.getMetadataManager().getLock() - .releaseReadLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(), - snapshotInfo.getBucketName(), snapshotInfo.getName()); + .releaseReadLock(SNAPSHOT_DB_LOCK, snapshotInfo.getSnapshotId().toString()); } } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java index 5a530ee11880..2503b291c00e 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java @@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.om.response.snapshot; import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.SNAPSHOT_INFO_TABLE; -import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.LeveledResource.SNAPSHOT_LOCK; +import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_DB_LOCK; import com.google.common.annotations.VisibleForTesting; import jakarta.annotation.Nonnull; @@ -121,8 +121,7 @@ private void deleteCheckpointDirectory(OMMetadataManager omMetadataManager, // inside the snapshot directory. Any operation apart which doesn't create/delete files under this snapshot // directory can run in parallel along with this operation. OMLockDetails omLockDetails = omMetadataManager.getLock() - .acquireWriteLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(), snapshotInfo.getBucketName(), - snapshotInfo.getName()); + .acquireWriteLock(SNAPSHOT_DB_LOCK, snapshotInfo.getSnapshotId().toString()); boolean acquiredSnapshotLock = omLockDetails.isLockAcquired(); if (acquiredSnapshotLock) { Path snapshotDirPath = OmSnapshotManager.getSnapshotPath(omMetadataManager, snapshotInfo); @@ -132,8 +131,7 @@ private void deleteCheckpointDirectory(OMMetadataManager omMetadataManager, LOG.error("Failed to delete snapshot directory {} for snapshot {}", snapshotDirPath, snapshotInfo.getTableKey(), ex); } finally { - omMetadataManager.getLock().releaseWriteLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(), - snapshotInfo.getBucketName(), snapshotInfo.getName()); + omMetadataManager.getLock().releaseWriteLock(SNAPSHOT_DB_LOCK, snapshotInfo.getSnapshotId().toString()); } } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java index 4bdca5d04ffe..b465956f35e6 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.om.snapshot; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND; +import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_DB_LOCK; import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.COLUMN_FAMILIES_TO_TRACK_IN_DAG; import com.google.common.annotations.VisibleForTesting; @@ -28,12 +29,15 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import org.apache.hadoop.hdds.utils.Scheduler; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OMMetrics; import org.apache.hadoop.ozone.om.OmSnapshot; import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock; +import org.apache.hadoop.ozone.om.lock.OMLockDetails; import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,6 +62,7 @@ public class SnapshotCache implements ReferenceCountedCallback, AutoCloseable { private final int cacheSizeLimit; private final Set pendingEvictionQueue; private final Scheduler scheduler; + private final IOzoneManagerLock lock; private static final String SNAPSHOT_CACHE_CLEANUP_SERVICE = "SnapshotCacheCleanupService"; private final boolean compactNonSnapshotDiffTables; @@ -86,7 +91,7 @@ private void compactSnapshotDB(OmSnapshot snapshot) throws IOException { try { metadataManager.getStore().compactTable(table.getName()); } catch (IOException e) { - LOG.warn("Failed to compact table {} in snapshot {}: {}", + LOG.warn("Failed to compact table {} in snapshot {}: {}", table.getName(), snapshot.getSnapshotID(), e.getMessage()); } } @@ -94,17 +99,18 @@ private void compactSnapshotDB(OmSnapshot snapshot) throws IOException { } public SnapshotCache(CacheLoader cacheLoader, int cacheSizeLimit, OMMetrics omMetrics, - long cleanupInterval, boolean compactNonSnapshotDiffTables) { + long cleanupInterval, boolean compactNonSnapshotDiffTables, IOzoneManagerLock lock) { this.dbMap = new ConcurrentHashMap<>(); this.cacheLoader = cacheLoader; this.cacheSizeLimit = cacheSizeLimit; this.omMetrics = omMetrics; + this.lock = lock; this.pendingEvictionQueue = ConcurrentHashMap.newKeySet(); this.compactNonSnapshotDiffTables = compactNonSnapshotDiffTables; if (cleanupInterval > 0) { this.scheduler = new Scheduler(SNAPSHOT_CACHE_CLEANUP_SERVICE, true, 1); - this.scheduler.scheduleWithFixedDelay(this::cleanup, cleanupInterval, + this.scheduler.scheduleWithFixedDelay(() -> this.cleanup(false), cleanupInterval, cleanupInterval, TimeUnit.MILLISECONDS); } else { this.scheduler = null; @@ -172,7 +178,8 @@ public enum Reason { } /** - * Get or load OmSnapshot. Shall be close()d after use. + * Get or load OmSnapshot. Shall be close()d after use. This would acquire a read lock on the Snapshot Database + * during the entire lifecycle of the returned OmSnapshot instance. * TODO: [SNAPSHOT] Can add reason enum to param list later. * @param key SnapshotId * @return an OmSnapshot instance, or null on error @@ -183,6 +190,11 @@ public UncheckedAutoCloseableSupplier get(UUID key) throws IOExcepti LOG.warn("Snapshot cache size ({}) exceeds configured soft-limit ({}).", size(), cacheSizeLimit); } + OMLockDetails lockDetails = lock.acquireReadLock(SNAPSHOT_DB_LOCK, key.toString()); + if (!lockDetails.isLockAcquired()) { + throw new OMException("Unable to acquire readlock on snapshot db with key " + key, + OMException.ResultCodes.INTERNAL_ERROR); + } // Atomic operation to initialize the OmSnapshot instance (once) if the key // does not exist, and increment the reference count on the instance. ReferenceCounted rcOmSnapshot = @@ -214,11 +226,12 @@ public UncheckedAutoCloseableSupplier get(UUID key) throws IOExcepti if (rcOmSnapshot == null) { // The only exception that would fall through the loader logic above // is OMException with FILE_NOT_FOUND. + lock.releaseReadLock(SNAPSHOT_DB_LOCK, key.toString()); throw new OMException("SnapshotId: '" + key + "' not found, or the snapshot is no longer active.", OMException.ResultCodes.FILE_NOT_FOUND); } return new UncheckedAutoCloseableSupplier() { - private AtomicReference closed = new AtomicReference<>(false); + private final AtomicReference closed = new AtomicReference<>(false); @Override public OmSnapshot get() { return rcOmSnapshot.get(); @@ -229,6 +242,7 @@ public void close() { closed.updateAndGet(alreadyClosed -> { if (!alreadyClosed) { rcOmSnapshot.decrementRefCount(); + lock.releaseReadLock(SNAPSHOT_DB_LOCK, key.toString()); } return true; }); @@ -249,21 +263,59 @@ public void release(UUID key) { val.decrementRefCount(); } + /** + * Acquires a write lock on the snapshot database and returns an auto-closeable supplier + * for lock details. The lock ensures that the operations accessing the snapshot database + * are performed in a thread-safe manner. The returned supplier automatically releases the + * lock when closed, preventing potential resource contention or deadlocks. + */ + public UncheckedAutoCloseableSupplier lock() { + return lock(() -> lock.acquireResourceWriteLock(SNAPSHOT_DB_LOCK), + () -> lock.releaseResourceWriteLock(SNAPSHOT_DB_LOCK)); + } + + private UncheckedAutoCloseableSupplier lock( + Supplier lockFunction, Supplier unlockFunction) { + AtomicReference lockDetails = new AtomicReference<>(lockFunction.get()); + if (lockDetails.get().isLockAcquired()) { + cleanup(true); + if (!dbMap.isEmpty()) { + lockDetails.set(unlockFunction.get()); + } + } + + return new UncheckedAutoCloseableSupplier() { + + @Override + public void close() { + lockDetails.updateAndGet((prevLock) -> { + if (prevLock != null && prevLock.isLockAcquired()) { + return unlockFunction.get(); + } + return prevLock; + }); + } + + @Override + public OMLockDetails get() { + return lockDetails.get(); + } + }; + } /** * If cache size exceeds soft limit, attempt to clean up and close the instances that has zero reference count. */ - @VisibleForTesting - void cleanup() { - if (dbMap.size() > cacheSizeLimit) { + private synchronized void cleanup(boolean force) { + if (force || dbMap.size() > cacheSizeLimit) { for (UUID evictionKey : pendingEvictionQueue) { ReferenceCounted snapshot = dbMap.get(evictionKey); if (snapshot != null && snapshot.getTotalRefCount() == 0) { try { compactSnapshotDB(snapshot.get()); } catch (IOException e) { - LOG.warn("Failed to compact snapshot DB for snapshotId {}: {}", + LOG.warn("Failed to compact snapshot DB for snapshotId {}: {}", evictionKey, e.getMessage()); } } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java index 195bcd105a92..2a87f081ea94 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java @@ -17,15 +17,20 @@ package org.apache.hadoop.ozone.om.snapshot; +import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_DB_LOCK; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -41,6 +46,10 @@ import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OMMetrics; import org.apache.hadoop.ozone.om.OmSnapshot; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock; +import org.apache.hadoop.ozone.om.lock.OMLockDetails; +import org.apache.hadoop.ozone.om.lock.OmReadOnlyLock; import org.apache.ozone.test.GenericTestUtils; import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; import org.junit.jupiter.api.AfterEach; @@ -50,6 +59,8 @@ import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.stubbing.Answer; import org.slf4j.event.Level; @@ -61,6 +72,7 @@ class TestSnapshotCache { private static final int CACHE_SIZE_LIMIT = 3; private static CacheLoader cacheLoader; + private static IOzoneManagerLock lock; private SnapshotCache snapshotCache; private OMMetrics omMetrics; @@ -93,20 +105,21 @@ static void beforeAll() throws Exception { tables.add(table2); tables.add(keyTable); when(store.listTables()).thenReturn(tables); - + return omSnapshot; } ); // Set SnapshotCache log level. Set to DEBUG for verbose output GenericTestUtils.setLogLevel(SnapshotCache.class, Level.DEBUG); + lock = spy(new OmReadOnlyLock()); } @BeforeEach void setUp() { // Reset cache for each test case omMetrics = OMMetrics.create(); - snapshotCache = new SnapshotCache(cacheLoader, CACHE_SIZE_LIMIT, omMetrics, 50, true); + snapshotCache = new SnapshotCache(cacheLoader, CACHE_SIZE_LIMIT, omMetrics, 50, true, lock); } @AfterEach @@ -128,6 +141,45 @@ void testGet() throws IOException { assertEquals(1, omMetrics.getNumSnapshotCacheSize()); } + @Test + @DisplayName("Tests get() fails on read lock failure") + public void testGetFailsOnReadLock() throws IOException { + final UUID dbKey1 = UUID.randomUUID(); + final UUID dbKey2 = UUID.randomUUID(); + when(lock.acquireReadLock(eq(SNAPSHOT_DB_LOCK), eq(dbKey1.toString()))) + .thenReturn(OMLockDetails.EMPTY_DETAILS_LOCK_NOT_ACQUIRED); + assertThrows(OMException.class, () -> snapshotCache.get(dbKey1)); + snapshotCache.get(dbKey2); + assertEquals(1, snapshotCache.size()); + } + + @ParameterizedTest + @ValueSource(ints = {0, 1, 5, 10}) + @DisplayName("Tests get() holds a read lock") + public void testGetHoldsReadLock(int numberOfLocks) throws IOException { + clearInvocations(lock); + final UUID dbKey1 = UUID.randomUUID(); + final UUID dbKey2 = UUID.randomUUID(); + for (int i = 0; i < numberOfLocks; i++) { + snapshotCache.get(dbKey1); + snapshotCache.get(dbKey2); + } + assertEquals(numberOfLocks > 0 ? 2 : 0, snapshotCache.size()); + verify(lock, times(numberOfLocks)).acquireReadLock(eq(SNAPSHOT_DB_LOCK), eq(dbKey1.toString())); + verify(lock, times(numberOfLocks)).acquireReadLock(eq(SNAPSHOT_DB_LOCK), eq(dbKey2.toString())); + } + + @ParameterizedTest + @ValueSource(ints = {0, 1, 5, 10}) + @DisplayName("Tests lock() holds a write lock") + public void testGetHoldsWriteLock(int numberOfLocks) { + clearInvocations(lock); + for (int i = 0; i < numberOfLocks; i++) { + snapshotCache.lock(); + } + verify(lock, times(numberOfLocks)).acquireResourceWriteLock(eq(SNAPSHOT_DB_LOCK)); + } + @Test @DisplayName("get() same entry twice yields one cache entry only") void testGetTwice() throws IOException { @@ -266,7 +318,7 @@ void testEviction1() throws IOException, InterruptedException, TimeoutException assertEquals(1, snapshotCache.size()); assertEquals(1, omMetrics.getNumSnapshotCacheSize()); assertEntryExistence(dbKey1, false); - + // Verify compaction was called on the tables org.apache.hadoop.hdds.utils.db.DBStore store1 = snapshot1.get().getMetadataManager().getStore(); verify(store1, times(1)).compactTable("table1"); @@ -371,7 +423,8 @@ void testEviction3WithClose() throws IOException, InterruptedException, TimeoutE @DisplayName("Snapshot operations not blocked during compaction") void testSnapshotOperationsNotBlockedDuringCompaction() throws IOException, InterruptedException, TimeoutException { omMetrics = OMMetrics.create(); - snapshotCache = new SnapshotCache(cacheLoader, 1, omMetrics, 50, true); + snapshotCache = new SnapshotCache(cacheLoader, 1, omMetrics, 50, true, + lock); final UUID dbKey1 = UUID.randomUUID(); UncheckedAutoCloseableSupplier snapshot1 = snapshotCache.get(dbKey1); assertEquals(1, snapshotCache.size()); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java index 57666d3665e2..2ffbaa44d828 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java @@ -135,6 +135,7 @@ import org.apache.hadoop.ozone.om.helpers.SnapshotDiffJob; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; import org.apache.hadoop.ozone.om.helpers.WithParentObjectId; +import org.apache.hadoop.ozone.om.lock.OmReadOnlyLock; import org.apache.hadoop.ozone.om.snapshot.SnapshotTestUtils.StubbedPersistentMap; import org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse; import org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse.CancelMessage; @@ -370,7 +371,8 @@ public void init() throws RocksDBException, IOException, ExecutionException { omSnapshotManager = mock(OmSnapshotManager.class); when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager); - SnapshotCache snapshotCache = new SnapshotCache(mockCacheLoader(), 10, omMetrics, 0, true); + SnapshotCache snapshotCache = new SnapshotCache(mockCacheLoader(), 10, omMetrics, 0, true, + new OmReadOnlyLock()); when(omSnapshotManager.getActiveSnapshot(anyString(), anyString(), anyString())) .thenAnswer(invocationOnMock -> {