Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
b86ec7e
HDDS-13031. Remove Lock set bits for SNAPSHOT_GC_LOCK
swamirishi May 13, 2025
40b6a83
HDDS-13031. Fix Locking setbit issue
swamirishi May 14, 2025
c962507
HDDS-13031. Fix checkstyle
swamirishi May 14, 2025
5dc9edc
Merge remote-tracking branch 'apache/master' into HEAD
swamirishi May 16, 2025
6f12aae
HDDS-13031. Implement a Flat Lock resource in OzoneManagerLock
swamirishi May 16, 2025
c8ea2e6
Merge remote-tracking branch 'apache/master' into HEAD
swamirishi May 16, 2025
588dab3
HDDS-13031. Fix pmd
swamirishi May 16, 2025
2cad0f1
HDDS-13031. Fix lock details code
swamirishi May 16, 2025
6dcf230
Merge remote-tracking branch 'apache/master' into HEAD
swamirishi May 17, 2025
6cb1123
HDDS-13065. Refactor SnapshotCache to return AutoCloseSupplier instea…
swamirishi May 17, 2025
bd5025c
Merge remote-tracking branch 'origin/HDDS-13065' into HEAD
swamirishi May 17, 2025
9e5604e
HDDS-12134. Implement Snapshot Cache lock for OM Bootstrap
swamirishi May 17, 2025
78d63ed
HDDS-12134. Implement a separate SNAPSHOT_DB_LOCK
swamirishi May 17, 2025
6df8f03
HDDS-12134. Check if lock acquired before opening db
swamirishi May 17, 2025
a85b794
HDDS-13065. Fix test failure
swamirishi May 17, 2025
2d1e9b1
Merge remote-tracking branch 'origin/HDDS-13065' into HEAD
swamirishi May 17, 2025
75c4764
HDDS-13031. Fix alignment
swamirishi May 17, 2025
3750ef9
Merge remote-tracking branch 'origin/HDDS-13031' into HEAD
swamirishi May 17, 2025
2c7d0b9
HDDS-13065. Make close thread safe
swamirishi May 17, 2025
5d1a70c
Merge remote-tracking branch 'origin/HDDS-13065' into HEAD
swamirishi May 17, 2025
9eab3da
Merge remote-tracking branch 'apache/master' into HEAD
swamirishi May 22, 2025
bf926d9
HDDS-12134. Fix merge conflicts
swamirishi May 22, 2025
61152cd
HDDS-12134. Add tests
swamirishi May 23, 2025
901f1a5
Merge remote-tracking branch 'apache/master' into HEAD
swamirishi Jun 4, 2025
1ff2bca
HDDS-12134. Address review comments
swamirishi Jun 4, 2025
f9dd246
HDDS-12134. Address review comments
swamirishi Jun 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ OMLockDetails acquireWriteLock(Resource resource,
OMLockDetails acquireWriteLocks(Resource resource,
Collection<String[]> resources);

/**
 * Acquires a write lock on the given resource as a whole, rather than on individual keys of that resource.
 *
 * @param resource the resource to write-lock.
 * @return details describing the outcome of the lock operation.
 */
OMLockDetails acquireResourceWriteLock(Resource resource);

boolean acquireMultiUserLock(String firstUser, String secondUser);

void releaseMultiUserLock(String firstUser, String secondUser);
Expand All @@ -46,6 +48,8 @@ OMLockDetails releaseWriteLock(Resource resource,
OMLockDetails releaseWriteLocks(Resource resource,
Collection<String[]> resources);

/**
 * Releases a write lock that was taken on the given resource as a whole, rather than on individual keys.
 *
 * @param resource the resource whose write lock is to be released.
 * @return details describing the outcome of the release operation.
 */
OMLockDetails releaseResourceWriteLock(Resource resource);

OMLockDetails releaseReadLock(Resource resource,
String... resources);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public OMLockDetails acquireWriteLocks(Resource resource, Collection<String[]> r
return EMPTY_DETAILS_LOCK_NOT_ACQUIRED;
}

/**
 * Performs no locking: the resource argument is not used and the shared
 * EMPTY_DETAILS_LOCK_NOT_ACQUIRED constant is returned unconditionally.
 *
 * @param resource not used by this implementation.
 * @return EMPTY_DETAILS_LOCK_NOT_ACQUIRED, always.
 */
@Override
public OMLockDetails acquireResourceWriteLock(Resource resource) {
return EMPTY_DETAILS_LOCK_NOT_ACQUIRED;
}

@Override
public boolean acquireMultiUserLock(String firstUser, String secondUser) {
return false;
Expand All @@ -71,6 +76,11 @@ public OMLockDetails releaseWriteLocks(Resource resource, Collection<String[]> r
return EMPTY_DETAILS_LOCK_NOT_ACQUIRED;
}

/**
 * Performs no unlocking: the resource argument is not used and the shared
 * EMPTY_DETAILS_LOCK_NOT_ACQUIRED constant is returned unconditionally.
 *
 * @param resource not used by this implementation.
 * @return EMPTY_DETAILS_LOCK_NOT_ACQUIRED, always.
 */
@Override
public OMLockDetails releaseResourceWriteLock(Resource resource) {
return EMPTY_DETAILS_LOCK_NOT_ACQUIRED;
}

@Override
public OMLockDetails releaseReadLock(Resource resource, String... resources) {
return EMPTY_DETAILS_LOCK_NOT_ACQUIRED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -140,9 +142,11 @@ private Striped<ReadWriteLock> createStripeLock(Resource r,
return SimpleStriped.readWriteLock(size, fair);
}

private Iterable<ReadWriteLock> bulkGetLock(Map<Resource, Striped<ReadWriteLock>> lockMap, Resource resource,
Collection<String[]> keys) {
Striped<ReadWriteLock> striped = lockMap.get(resource);
/**
 * Lists every lock of the given striped lock, ordered by ascending stripe index.
 *
 * @param striped the striped lock whose individual stripes are to be enumerated.
 * @return a list holding the lock at each index from 0 (inclusive) to striped.size() (exclusive).
 */
private Iterable<ReadWriteLock> getAllLocks(Striped<ReadWriteLock> striped) {
  final int stripeCount = striped.size();
  return IntStream.range(0, stripeCount)
      .mapToObj(index -> striped.getAt(index))
      .collect(Collectors.toList());
}

private Iterable<ReadWriteLock> bulkGetLock(Striped<ReadWriteLock> striped, Collection<String[]> keys) {
List<Object> lockKeys = new ArrayList<>(keys.size());
for (String[] key : keys) {
if (Objects.nonNull(key)) {
Expand Down Expand Up @@ -200,7 +204,7 @@ public OMLockDetails acquireReadLock(Resource resource, String... keys) {
*/
@Override
public OMLockDetails acquireReadLocks(Resource resource, Collection<String[]> keys) {
return acquireLocks(resource, true, keys);
return acquireLocks(resource, true, striped -> bulkGetLock(striped, keys));
}

/**
Expand Down Expand Up @@ -244,7 +248,17 @@ public OMLockDetails acquireWriteLock(Resource resource, String... keys) {
*/
@Override
public OMLockDetails acquireWriteLocks(Resource resource, Collection<String[]> keys) {
return acquireLocks(resource, false, keys);
return acquireLocks(resource, false, striped -> bulkGetLock(striped, keys));
}

/**
 * Acquires the write lock of every stripe backing the specified resource, i.e. a write lock on the
 * resource as a whole rather than on individual keys.
 *
 * @param resource The resource for which the write lock is to be acquired.
 * @return the lock details produced by the underlying lock acquisition for this resource.
 */
@Override
public OMLockDetails acquireResourceWriteLock(Resource resource) {
// getAllLocks supplies every stripe of the resource instead of the stripes mapped to particular keys.
return acquireLocks(resource, false, this::getAllLocks);
}

private void acquireLock(Resource resource, boolean isReadLock, ReadWriteLock lock,
Expand All @@ -258,7 +272,8 @@ private void acquireLock(Resource resource, boolean isReadLock, ReadWriteLock lo
}
}

private OMLockDetails acquireLocks(Resource resource, boolean isReadLock, Collection<String[]> keys) {
private OMLockDetails acquireLocks(Resource resource, boolean isReadLock,
Function<Striped<ReadWriteLock>, Iterable<ReadWriteLock>> lockListProvider) {
Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockManager> resourceLockPair =
resourcelockMap.get(resource.getClass());
ResourceLockManager<Resource> resourceLockManager = resourceLockPair.getRight();
Expand All @@ -271,7 +286,7 @@ private OMLockDetails acquireLocks(Resource resource, boolean isReadLock, Collec

long startWaitingTimeNanos = Time.monotonicNowNanos();

for (ReadWriteLock lock : bulkGetLock(resourceLockPair.getKey(), resource, keys)) {
for (ReadWriteLock lock : lockListProvider.apply(resourceLockPair.getKey().get(resource))) {
acquireLock(resource, isReadLock, lock, startWaitingTimeNanos);
}
return resourceLockManager.lockResource(resource);
Expand Down Expand Up @@ -342,7 +357,6 @@ private String getErrorMessage(Resource resource) {
return "Thread '" + Thread.currentThread().getName() + "' cannot " +
"acquire " + resource.getName() + " lock while holding " +
getCurrentLocks().toString() + " lock(s).";

}

@VisibleForTesting
Expand Down Expand Up @@ -397,7 +411,17 @@ public OMLockDetails releaseWriteLock(Resource resource, String... keys) {
*/
@Override
public OMLockDetails releaseWriteLocks(Resource resource, Collection<String[]> keys) {
return releaseLocks(resource, false, keys);
return releaseLocks(resource, false, striped -> bulkGetLock(striped, keys));
}

/**
 * Releases a write lock acquired on the entire Stripe for a specified resource.
 *
 * @param resource The resource for which the write lock is to be released.
 * @return the lock details produced by the underlying lock release for this resource.
 */
@Override
public OMLockDetails releaseResourceWriteLock(Resource resource) {
// getAllLocks supplies every stripe of the resource, mirroring what acquireResourceWriteLock locked.
return releaseLocks(resource, false, this::getAllLocks);
}

/**
Expand All @@ -423,7 +447,7 @@ public OMLockDetails releaseReadLock(Resource resource, String... keys) {
*/
@Override
public OMLockDetails releaseReadLocks(Resource resource, Collection<String[]> keys) {
return releaseLocks(resource, true, keys);
return releaseLocks(resource, true, striped -> bulkGetLock(striped, keys));
}

private OMLockDetails releaseLock(Resource resource, boolean isReadLock,
Expand All @@ -445,12 +469,12 @@ private OMLockDetails releaseLock(Resource resource, boolean isReadLock,
}

private OMLockDetails releaseLocks(Resource resource, boolean isReadLock,
Collection<String[]> keys) {
Function<Striped<ReadWriteLock>, Iterable<ReadWriteLock>> lockListProvider) {
Pair<Map<Resource, Striped<ReadWriteLock>>, ResourceLockManager> resourceLockPair =
resourcelockMap.get(resource.getClass());
ResourceLockManager<Resource> resourceLockManager = resourceLockPair.getRight();
resourceLockManager.clearLockDetails();
List<ReadWriteLock> locks = StreamSupport.stream(bulkGetLock(resourceLockPair.getKey(), resource, keys)
List<ReadWriteLock> locks = StreamSupport.stream(lockListProvider.apply(resourceLockPair.getKey().get(resource))
.spliterator(), false).collect(Collectors.toList());
// Release locks in reverse order.
Collections.reverse(locks);
Expand Down Expand Up @@ -558,7 +582,10 @@ public OMLockMetrics getOMLockMetrics() {
* Flat Resource defined in Ozone. Locks can be acquired on a resource independent of one another.
*/
public enum FlatResource implements Resource {
SNAPSHOT_GC_LOCK("SNAPSHOT_GC_LOCK");
// Background services lock on a Snapshot.
SNAPSHOT_GC_LOCK("SNAPSHOT_GC_LOCK"),
// Lock acquired on a Snapshot's RocksDB Handle.
SNAPSHOT_DB_LOCK("SNAPSHOT_DB_LOCK");

private String name;
private ResourceManager resourceManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
Expand All @@ -39,7 +40,9 @@
import org.apache.hadoop.ozone.om.lock.OzoneManagerLock.LeveledResource;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;

/**
* Class tests OzoneManagerLock.
Expand Down Expand Up @@ -278,34 +281,119 @@ void acquireUserLockAfterMultiUserLock() {
lock.releaseMultiUserLock("user1", "user2");
}

@Test
void testLockResourceParallel() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testLockResourceParallel(boolean fullResourceLock) throws Exception {
OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());

for (LeveledResource resource :
LeveledResource.values()) {
for (Resource resource : Stream.of(LeveledResource.values(), FlatResource.values())
.flatMap(Arrays::stream).collect(Collectors.toList())) {
final String[] resourceName = generateResourceName(resource);
lock.acquireWriteLock(resource, resourceName);
if (fullResourceLock) {
lock.acquireResourceWriteLock(resource);
} else {
lock.acquireWriteLock(resource, resourceName);
}

AtomicBoolean gotLock = new AtomicBoolean(false);
new Thread(() -> {
lock.acquireWriteLock(resource, resourceName);
if (fullResourceLock) {
lock.acquireResourceWriteLock(resource);
} else {
lock.acquireWriteLock(resource, resourceName);
}
gotLock.set(true);
lock.releaseWriteLock(resource, resourceName);
if (fullResourceLock) {
lock.releaseResourceWriteLock(resource);
} else {
lock.releaseWriteLock(resource, resourceName);
}

}).start();
// Let's give some time for the new thread to run
Thread.sleep(100);
// Since the new thread is trying to get lock on same resource,
// it will wait.
assertFalse(gotLock.get());
lock.releaseWriteLock(resource, resourceName);
if (fullResourceLock) {
lock.releaseResourceWriteLock(resource);
} else {
lock.releaseWriteLock(resource, resourceName);
}
// Since we have released the lock, the new thread should have the lock
// now.
// Let's give some time for the new thread to run
Thread.sleep(100);
assertTrue(gotLock.get());
}
}

/**
 * Checks that a whole-resource write lock and a single-key lock on the same resource exclude each other.
 *
 * <p>For every leveled and flat resource, the main thread takes one of the two locks and a second thread
 * requests the other one. The second thread must not obtain its lock until the main thread has released.
 * Progress of the second thread is observed after fixed 100 ms sleeps.
 *
 * @param mainThreadAcquireResourceLock if true, the main thread takes the whole-resource write lock and the
 *     second thread takes the per-key lock; if false, the roles are swapped.
 * @param acquireWriteLock if true, the per-key lock is a write lock; otherwise it is a read lock.
 */
@ParameterizedTest
@CsvSource(value = {
"true, true",
"true, false",
"false, true",
"false, false"
})
void testResourceLockFullResourceLockParallel(boolean mainThreadAcquireResourceLock, boolean acquireWriteLock)
throws Exception {
OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());

for (Resource resource : Stream.of(LeveledResource.values(), FlatResource.values())
.flatMap(Arrays::stream).collect(Collectors.toList())) {
final String[] resourceName = generateResourceName(resource);
// The main thread takes its lock first: either the whole-resource write lock or a per-key lock.
if (mainThreadAcquireResourceLock) {
lock.acquireResourceWriteLock(resource);
} else {
if (acquireWriteLock) {
lock.acquireWriteLock(resource, resourceName);
} else {
lock.acquireReadLock(resource, resourceName);
}
}

AtomicBoolean gotLock = new AtomicBoolean(false);
new Thread(() -> {
// The second thread requests the opposite kind of lock from the one held by the main thread.
if (!mainThreadAcquireResourceLock) {
lock.acquireResourceWriteLock(resource);
} else {
if (acquireWriteLock) {
lock.acquireWriteLock(resource, resourceName);
} else {
lock.acquireReadLock(resource, resourceName);
}
}
gotLock.set(true);
// Release what the second thread acquired so the next loop iteration starts with no lock held by it.
if (!mainThreadAcquireResourceLock) {
lock.releaseResourceWriteLock(resource);
} else {
if (acquireWriteLock) {
lock.releaseWriteLock(resource, resourceName);
} else {
lock.releaseReadLock(resource, resourceName);
}
}
}).start();
// Let's give some time for the new thread to run
Thread.sleep(100);
// Since the new thread is trying to get lock on same resource,
// it will wait.
assertFalse(gotLock.get());
// Release the main thread's lock using the release call that matches how it was acquired.
if (mainThreadAcquireResourceLock) {
lock.releaseResourceWriteLock(resource);
} else {
if (acquireWriteLock) {
lock.releaseWriteLock(resource, resourceName);
} else {
lock.releaseReadLock(resource, resourceName);
}
}
// Since we have released the lock, the new thread should have the lock
// now.
// Let's give some time for the new thread to run
Thread.sleep(100);
assertTrue(gotLock.get());
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ public OmSnapshotManager(OzoneManager ozoneManager) {
.getBoolean(OZONE_OM_SNAPSHOT_COMPACT_NON_SNAPSHOT_DIFF_TABLES,
OZONE_OM_SNAPSHOT_COMPACT_NON_SNAPSHOT_DIFF_TABLES_DEFAULT);
this.snapshotCache = new SnapshotCache(loader, softCacheSize, ozoneManager.getMetrics(),
cacheCleanupServiceInterval, compactNonSnapshotDiffTables);
cacheCleanupServiceInterval, compactNonSnapshotDiffTables, ozoneManager.getMetadataManager().getLock());

this.snapshotDiffManager = new SnapshotDiffManager(snapshotDiffDb, differ,
ozoneManager, snapDiffJobCf, snapDiffReportCf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK;
import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT;
import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.LeveledResource.SNAPSHOT_LOCK;
import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_DB_LOCK;
import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.getColumnFamilyToKeyPrefixMap;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -135,8 +135,7 @@ private void markSSTFilteredFlagForSnapshot(SnapshotInfo snapshotInfo) throws IO
// in OmSnapshotPurgeResponse. Any operation apart from delete can run in parallel along with this operation.
//TODO. Revisit other SNAPSHOT_LOCK and see if we can change write locks to read locks to further optimize it.
OMLockDetails omLockDetails = ozoneManager.getMetadataManager().getLock()
.acquireReadLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(), snapshotInfo.getBucketName(),
snapshotInfo.getName());
.acquireReadLock(SNAPSHOT_DB_LOCK, snapshotInfo.getSnapshotId().toString());
boolean acquiredSnapshotLock = omLockDetails.isLockAcquired();
if (acquiredSnapshotLock) {
String snapshotDir = OmSnapshotManager.getSnapshotPath(ozoneManager.getConfiguration(), snapshotInfo);
Expand All @@ -147,8 +146,7 @@ private void markSSTFilteredFlagForSnapshot(SnapshotInfo snapshotInfo) throws IO
}
} finally {
ozoneManager.getMetadataManager().getLock()
.releaseReadLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(),
snapshotInfo.getBucketName(), snapshotInfo.getName());
.releaseReadLock(SNAPSHOT_DB_LOCK, snapshotInfo.getSnapshotId().toString());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.hadoop.ozone.om.response.snapshot;

import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.SNAPSHOT_INFO_TABLE;
import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.LeveledResource.SNAPSHOT_LOCK;
import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_DB_LOCK;

import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nonnull;
Expand Down Expand Up @@ -121,8 +121,7 @@ private void deleteCheckpointDirectory(OMMetadataManager omMetadataManager,
// inside the snapshot directory. Any operation which doesn't create/delete files under this snapshot
// directory can run in parallel along with this operation.
OMLockDetails omLockDetails = omMetadataManager.getLock()
.acquireWriteLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(), snapshotInfo.getBucketName(),
snapshotInfo.getName());
.acquireWriteLock(SNAPSHOT_DB_LOCK, snapshotInfo.getSnapshotId().toString());
boolean acquiredSnapshotLock = omLockDetails.isLockAcquired();
if (acquiredSnapshotLock) {
Path snapshotDirPath = OmSnapshotManager.getSnapshotPath(omMetadataManager, snapshotInfo);
Expand All @@ -132,8 +131,7 @@ private void deleteCheckpointDirectory(OMMetadataManager omMetadataManager,
LOG.error("Failed to delete snapshot directory {} for snapshot {}",
snapshotDirPath, snapshotInfo.getTableKey(), ex);
} finally {
omMetadataManager.getLock().releaseWriteLock(SNAPSHOT_LOCK, snapshotInfo.getVolumeName(),
snapshotInfo.getBucketName(), snapshotInfo.getName());
omMetadataManager.getLock().releaseWriteLock(SNAPSHOT_DB_LOCK, snapshotInfo.getSnapshotId().toString());
}
}
}
Expand Down
Loading
Loading