Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.om.lock;

import com.google.common.annotations.VisibleForTesting;
import java.util.Collection;

/**
* Interface for OM Metadata locks.
Expand All @@ -27,19 +28,31 @@ public interface IOzoneManagerLock {
OMLockDetails acquireReadLock(OzoneManagerLock.Resource resource,
String... resources);

OMLockDetails acquireReadLocks(OzoneManagerLock.Resource resource, Collection<String[]> resources);


OMLockDetails acquireWriteLock(OzoneManagerLock.Resource resource,
String... resources);

OMLockDetails acquireWriteLocks(OzoneManagerLock.Resource resource,
Collection<String[]> resources);

boolean acquireMultiUserLock(String firstUser, String secondUser);

void releaseMultiUserLock(String firstUser, String secondUser);

OMLockDetails releaseWriteLock(OzoneManagerLock.Resource resource,
String... resources);

OMLockDetails releaseWriteLocks(OzoneManagerLock.Resource resource,
Collection<String[]> resources);

OMLockDetails releaseReadLock(OzoneManagerLock.Resource resource,
String... resources);

OMLockDetails releaseReadLocks(OzoneManagerLock.Resource resource,
Collection<String[]> resources);

@VisibleForTesting
int getReadHoldCount(OzoneManagerLock.Resource resource,
String... resources);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.hadoop.ozone.om.lock.OMLockDetails.EMPTY_DETAILS_LOCK_ACQUIRED;
import static org.apache.hadoop.ozone.om.lock.OMLockDetails.EMPTY_DETAILS_LOCK_NOT_ACQUIRED;

import java.util.Collection;
import org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource;

/**
Expand All @@ -34,12 +35,22 @@ public OMLockDetails acquireReadLock(Resource resource, String... resources) {
return EMPTY_DETAILS_LOCK_ACQUIRED;
}

@Override
public OMLockDetails acquireReadLocks(Resource resource, Collection<String[]> resources) {
return EMPTY_DETAILS_LOCK_ACQUIRED;
}

@Override
public OMLockDetails acquireWriteLock(Resource resource,
String... resources) {
return EMPTY_DETAILS_LOCK_NOT_ACQUIRED;
}

@Override
public OMLockDetails acquireWriteLocks(Resource resource, Collection<String[]> resources) {
return EMPTY_DETAILS_LOCK_NOT_ACQUIRED;
}

@Override
public boolean acquireMultiUserLock(String firstUser, String secondUser) {
return false;
Expand All @@ -56,11 +67,21 @@ public OMLockDetails releaseWriteLock(Resource resource,
return EMPTY_DETAILS_LOCK_NOT_ACQUIRED;
}

@Override
public OMLockDetails releaseWriteLocks(Resource resource, Collection<String[]> resources) {
return EMPTY_DETAILS_LOCK_NOT_ACQUIRED;
}

@Override
public OMLockDetails releaseReadLock(Resource resource, String... resources) {
return EMPTY_DETAILS_LOCK_NOT_ACQUIRED;
}

@Override
public OMLockDetails releaseReadLocks(Resource resource, Collection<String[]> resources) {
return EMPTY_DETAILS_LOCK_NOT_ACQUIRED;
}

@Override
public int getReadHoldCount(Resource resource, String... resources) {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,19 @@
import com.google.common.util.concurrent.Striped;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.utils.CompositeKey;
import org.apache.hadoop.hdds.utils.SimpleStriped;
import org.apache.hadoop.ipc.ProcessingDetails.Timing;
import org.apache.hadoop.ipc.Server;
Expand Down Expand Up @@ -122,6 +127,17 @@ private Striped<ReadWriteLock> createStripeLock(Resource r,
return SimpleStriped.readWriteLock(size, fair);
}

private Iterable<ReadWriteLock> bulkGetLock(Resource resource, Collection<String[]> keys) {
Striped<ReadWriteLock> striped = stripedLockByResource.get(resource);
List<Object> lockKeys = new ArrayList<>(keys.size());
for (String[] key : keys) {
if (Objects.nonNull(key)) {
lockKeys.add(CompositeKey.combineKeys(key));
}
}
return striped.bulkGet(lockKeys);
}

private ReentrantReadWriteLock getLock(Resource resource, String... keys) {
Striped<ReadWriteLock> striped = stripedLockByResource.get(resource);
Object key = combineKeys(keys);
Expand Down Expand Up @@ -150,6 +166,28 @@ public OMLockDetails acquireReadLock(Resource resource, String... keys) {
return acquireLock(resource, true, keys);
}

/**
* Acquire read locks on a list of resources.
*
* For S3_BUCKET_LOCK, VOLUME_LOCK, BUCKET_LOCK type resource, same
* thread acquiring lock again is allowed.
*
* For USER_LOCK, PREFIX_LOCK, S3_SECRET_LOCK type resource, same thread
* acquiring lock again is not allowed.
*
* Special Note for USER_LOCK: Single thread can acquire single user lock/
* multi user lock. But not both at the same time.
* @param resource - Type of the resource.
* @param keys - A list of Resource names on which user want to acquire locks.
* For Resource type BUCKET_LOCK, first param should be volume, second param
* should be bucket name. For remaining all resource only one param should
* be passed.
*/
@Override
public OMLockDetails acquireReadLocks(Resource resource, Collection<String[]> keys) {
return acquireLocks(resource, true, keys);
}

/**
* Acquire write lock on resource.
*
Expand All @@ -172,6 +210,59 @@ public OMLockDetails acquireWriteLock(Resource resource, String... keys) {
return acquireLock(resource, false, keys);
}

/**
* Acquire write locks on a list of resources.
*
* For S3_BUCKET_LOCK, VOLUME_LOCK, BUCKET_LOCK type resource, same
* thread acquiring lock again is allowed.
*
* For USER_LOCK, PREFIX_LOCK, S3_SECRET_LOCK type resource, same thread
* acquiring lock again is not allowed.
*
* Special Note for USER_LOCK: Single thread can acquire single user lock/
* multi user lock. But not both at the same time.
* @param resource - Type of the resource.
* @param keys - A list of Resource names on which user want to acquire lock.
* For Resource type BUCKET_LOCK, first param should be volume, second param
* should be bucket name. For remaining all resource only one param should
* be passed.
*/
@Override
public OMLockDetails acquireWriteLocks(Resource resource, Collection<String[]> keys) {
return acquireLocks(resource, false, keys);
}

private void acquireLock(Resource resource, boolean isReadLock, ReadWriteLock lock,
long startWaitingTimeNanos) {
if (isReadLock) {
lock.readLock().lock();
updateReadLockMetrics(resource, (ReentrantReadWriteLock) lock, startWaitingTimeNanos);
} else {
lock.writeLock().lock();
updateWriteLockMetrics(resource, (ReentrantReadWriteLock) lock, startWaitingTimeNanos);
}
}

private OMLockDetails acquireLocks(Resource resource, boolean isReadLock,
Collection<String[]> keys) {
omLockDetails.get().clear();
if (!resource.canLock(lockSet.get())) {
String errorMessage = getErrorMessage(resource);
LOG.error(errorMessage);
throw new RuntimeException(errorMessage);
}

long startWaitingTimeNanos = Time.monotonicNowNanos();

for (ReadWriteLock lock : bulkGetLock(resource, keys)) {
acquireLock(resource, isReadLock, lock, startWaitingTimeNanos);
}

lockSet.set(resource.setLock(lockSet.get()));
omLockDetails.get().setLockAcquired(true);
return omLockDetails.get();
}

private OMLockDetails acquireLock(Resource resource, boolean isReadLock,
String... keys) {
omLockDetails.get().clear();
Expand All @@ -184,13 +275,7 @@ private OMLockDetails acquireLock(Resource resource, boolean isReadLock,
long startWaitingTimeNanos = Time.monotonicNowNanos();

ReentrantReadWriteLock lock = getLock(resource, keys);
if (isReadLock) {
lock.readLock().lock();
updateReadLockMetrics(resource, lock, startWaitingTimeNanos);
} else {
lock.writeLock().lock();
updateWriteLockMetrics(resource, lock, startWaitingTimeNanos);
}
acquireLock(resource, isReadLock, lock, startWaitingTimeNanos);

lockSet.set(resource.setLock(lockSet.get()));
omLockDetails.get().setLockAcquired(true);
Expand Down Expand Up @@ -317,6 +402,19 @@ public OMLockDetails releaseWriteLock(Resource resource, String... keys) {
return releaseLock(resource, false, keys);
}

/**
* Release write lock on multiple resources.
* @param resource - Type of the resource.
* @param keys - List of resource names on which user want to acquire lock.
* For Resource type BUCKET_LOCK, first param should be volume, second param
* should be bucket name. For remaining all resource only one param should
* be passed.
*/
@Override
public OMLockDetails releaseWriteLocks(Resource resource, Collection<String[]> keys) {
return releaseLocks(resource, false, keys);
}

/**
* Release read lock on resource.
* @param resource - Type of the resource.
Expand All @@ -330,6 +428,19 @@ public OMLockDetails releaseReadLock(Resource resource, String... keys) {
return releaseLock(resource, true, keys);
}

/**
* Release read locks on a list of resources.
* @param resource - Type of the resource.
* @param keys - Resource names on which user want to acquire lock.
* For Resource type BUCKET_LOCK, first param should be volume, second param
* should be bucket name. For remaining all resource only one param should
* be passed.
*/
@Override
public OMLockDetails releaseReadLocks(Resource resource, Collection<String[]> keys) {
return releaseLocks(resource, true, keys);
}

private OMLockDetails releaseLock(Resource resource, boolean isReadLock,
String... keys) {
omLockDetails.get().clear();
Expand All @@ -347,6 +458,28 @@ private OMLockDetails releaseLock(Resource resource, boolean isReadLock,
return omLockDetails.get();
}

private OMLockDetails releaseLocks(Resource resource, boolean isReadLock,
Collection<String[]> keys) {
omLockDetails.get().clear();
List<ReadWriteLock> locks =
StreamSupport.stream(bulkGetLock(resource, keys).spliterator(), false).collect(Collectors.toList());
// Release locks in reverse order.
Collections.reverse(locks);
for (ReadWriteLock lock : locks) {
if (isReadLock) {
lock.readLock().unlock();
updateReadUnlockMetrics(resource, (ReentrantReadWriteLock) lock);
} else {
boolean isWriteLocked = ((ReentrantReadWriteLock)lock).isWriteLockedByCurrentThread();
lock.writeLock().unlock();
updateWriteUnlockMetrics(resource, (ReentrantReadWriteLock) lock, isWriteLocked);
}
}

lockSet.set(resource.clearLock(lockSet.get()));
return omLockDetails.get();
}

private void updateReadUnlockMetrics(Resource resource,
ReentrantReadWriteLock lock) {
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Stack;
import java.util.UUID;
Expand Down Expand Up @@ -287,6 +288,37 @@ void testLockResourceParallel() throws Exception {

}

@Test
void testMultiLocksResourceParallel() throws Exception {
OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());

for (Resource resource : Resource.values()) {
final List<String[]> resourceName = Arrays.asList(generateResourceName(resource),
generateResourceName(resource), generateResourceName(resource));
lock.acquireWriteLocks(resource, resourceName.subList(1, resourceName.size()));

AtomicBoolean gotLock = new AtomicBoolean(false);
new Thread(() -> {
lock.acquireWriteLocks(resource, resourceName.subList(0, 2));
gotLock.set(true);
lock.releaseWriteLocks(resource, resourceName.subList(0, 2));
}).start();
// Let's give some time for the new thread to run
Thread.sleep(100);
// Since the new thread is trying to get lock on same resource,
// it will wait.
assertFalse(gotLock.get());
lock.releaseWriteLocks(resource, resourceName.subList(1, resourceName.size()));
// Since we have released the lock, the new thread should have the lock
// now.
// Let's give some time for the new thread to run
Thread.sleep(100);
assertTrue(gotLock.get());
}

}


@Test
void testMultiLockResourceParallel() throws Exception {
OzoneManagerLock lock = new OzoneManagerLock(new OzoneConfiguration());
Expand Down
Loading
Loading