diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index d26546e47ee4..009234b0483b 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -100,6 +100,7 @@ import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult; import org.apache.hadoop.ozone.om.helpers.SnapshotDiffJob; import org.apache.hadoop.ozone.om.lock.OMLockDetails; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.ozone.om.ratis_snapshot.OmRatisSnapshotProvider; import org.apache.hadoop.ozone.om.ha.OMHAMetrics; import org.apache.hadoop.ozone.om.helpers.KeyInfoWithVolumeContext; @@ -472,6 +473,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl private final OzoneLockProvider ozoneLockProvider; private final OMPerformanceMetrics perfMetrics; private final BucketUtilizationMetrics bucketUtilizationMetrics; + private final OmLockOpr omLockOpr; private boolean fsSnapshotEnabled; @@ -625,6 +627,7 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption) configuration); this.ozoneLockProvider = new OzoneLockProvider(getKeyPathLockEnabled(), getEnableFileSystemPaths()); + this.omLockOpr = new OmLockOpr(threadPrefix); // For testing purpose only, not hit scm from om as Hadoop UGI can't login // two principals in the same JVM. @@ -1752,6 +1755,8 @@ public void start() throws IOException { bootstrap(omNodeDetails); } + omLockOpr.start(); + omState = State.RUNNING; auditMap.put("NewOmState", omState.name()); SYSTEMAUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMSystemAction.STARTUP, auditMap)); @@ -1829,6 +1834,7 @@ public void restart() throws IOException { } startJVMPauseMonitor(); setStartTime(); + omLockOpr.start(); omState = State.RUNNING; auditMap.put("NewOmState", omState.name()); SYSTEMAUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMSystemAction.STARTUP, auditMap)); @@ -2294,6 +2300,7 @@ public boolean stop() { } keyManager.stop(); stopSecretManager(); + omLockOpr.stop(); if (scmTopologyClient != null) { scmTopologyClient.stop(); @@ -4529,6 +4536,10 @@ public OzoneLockProvider getOzoneLockProvider() { return this.ozoneLockProvider; } + public OmLockOpr getOmLockOpr() { + return this.omLockOpr; + } + public ReplicationConfig getDefaultReplicationConfig() { if (this.defaultReplicationConfig != null) { return this.defaultReplicationConfig; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/KeyLock.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/KeyLock.java new file mode 100644 index 000000000000..4e0fa6fe88ef --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/KeyLock.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.om.lock; + +import com.google.common.util.concurrent.Striped; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * key locking. + */ +public class KeyLock { + private static final Logger LOG = LoggerFactory.getLogger(KeyLock.class); + private static final long LOCK_TIMEOUT_DEFAULT = 10 * 60 * 1000; + private final Striped fileStripedLock; + private final long lockTimeout; + + public KeyLock(int stripLockSize) { + this(stripLockSize, LOCK_TIMEOUT_DEFAULT); + } + + public KeyLock(int stripLockSize, long timeout) { + fileStripedLock = Striped.readWriteLock(stripLockSize); + lockTimeout = timeout; + } + + public List lock(List keyList) throws IOException { + List locks = new ArrayList<>(); + boolean isSuccess = false; + try { + Iterable readWriteLocks = fileStripedLock.bulkGet(keyList); + for (ReadWriteLock rwLock : readWriteLocks) { + Lock lockObj = rwLock.writeLock(); + boolean b = lockObj.tryLock(lockTimeout, TimeUnit.MILLISECONDS); + if (!b) { + LOG.error("Key write lock is failed for {} after wait of {}ms", this, lockTimeout); + throw new OMException("Unable to get write lock after " + lockTimeout + "ms" + + ", read lock info: " + rwLock.readLock(), + OMException.ResultCodes.TIMEOUT); + } + locks.add(lockObj); + } + isSuccess = true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new OMException("Unable to get write lock as interrupted", OMException.ResultCodes.INTERNAL_ERROR); + } finally { + if (!isSuccess) { + Collections.reverse(locks); + locks.forEach(Lock::unlock); + locks.clear(); + } + } + return locks; + } + + public Lock lock(String key) throws IOException { + LOG.debug("Key {} is locked for instance {} {}", key, this, fileStripedLock.get(key)); + try { + Lock lockObj = fileStripedLock.get(key).writeLock(); + boolean b = lockObj.tryLock(lockTimeout, TimeUnit.MILLISECONDS); + if (!b) { + LOG.error("Key {} lock is failed for {} after wait of {}ms", key, this, lockTimeout); + throw new OMException("Unable to get write lock for " + key + " after " + lockTimeout + "ms" + + ", read lock info: " + fileStripedLock.get(key).readLock(), + OMException.ResultCodes.TIMEOUT); + } + return lockObj; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new OMException("Unable to get read lock for " + key + " is interrupted", + OMException.ResultCodes.INTERNAL_ERROR); + } + } + + public List readLock(List keyList) throws OMException { + List locks = new ArrayList<>(); + boolean isSuccess = false; + try { + Iterable readWriteLocks = fileStripedLock.bulkGet(keyList); + for (ReadWriteLock rwLock : readWriteLocks) { + Lock lockObj = rwLock.readLock(); + boolean b = lockObj.tryLock(lockTimeout, TimeUnit.MILLISECONDS); + if (!b) { + LOG.error("Key read lock is failed for {} after wait of {}ms", this, lockTimeout); + throw new OMException("Unable to get read lock after " + lockTimeout + "ms" + + ", write lock info: " + rwLock.writeLock(), + OMException.ResultCodes.TIMEOUT); + } + locks.add(lockObj); + } + isSuccess = true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new OMException("Unable to get read lock as interrupted", OMException.ResultCodes.INTERNAL_ERROR); + } finally { + if (!isSuccess) { + Collections.reverse(locks); + locks.forEach(Lock::unlock); + locks.clear(); + } + } + return locks; + } + + public Lock readLock(String key) throws OMException { + try { + LOG.debug("Key {} is read locked for instance {} {}", key, this, fileStripedLock.get(key)); + Lock lockObj = fileStripedLock.get(key).readLock(); + boolean b = lockObj.tryLock(lockTimeout, TimeUnit.MILLISECONDS); + if (!b) { + throw new OMException("Unable to get read lock for " + key + " after " + lockTimeout + "ms", + OMException.ResultCodes.TIMEOUT); + } + return lockObj; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new OMException("Unable to get read lock for " + key + " is interrupted", + OMException.ResultCodes.INTERNAL_ERROR); + } + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOpr.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOpr.java new file mode 100644 index 000000000000..e8c8de32f0b4 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/lock/OmLockOpr.java @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.om.lock; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import org.apache.hadoop.util.Time; +import org.jheaps.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * key locking. + */ +public class OmLockOpr { + private static final Logger LOG = LoggerFactory.getLogger(OmLockOpr.class); + private static final long MONITOR_DELAY = 10 * 60 * 1000; + private static final long MONITOR_LOCK_THRESHOLD_NS = 10 * 60 * 1000_000_000L; + private final KeyLock keyLocking; + private final KeyLock bucketLocking; + private final KeyLock volumeLocking; + private final String threadNamePrefix; + private ScheduledExecutorService executorService; + private final Map lockMonitorMap = new ConcurrentHashMap<>(); + + public OmLockOpr(String threadNamePrefix) { + this.threadNamePrefix = threadNamePrefix; + keyLocking = new KeyLock(102400); + bucketLocking = new KeyLock(1024); + volumeLocking = new KeyLock(1024); + } + + public void start() { + ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat(threadNamePrefix + "OmLockOpr-Monitor-%d").build(); + executorService = Executors.newScheduledThreadPool(1, threadFactory); + executorService.scheduleWithFixedDelay(this::monitor, 0, MONITOR_DELAY, TimeUnit.MILLISECONDS); + } + + public void stop() { + executorService.shutdown(); + } + + public OmLockInfo volumeReadLock(String volumeName) throws IOException { + return lockOneKey(volumeLocking::readLock, volumeName, OmLockInfo.LockOpType.WAIT); + } + + public OmLockInfo volumeWriteLock(String volumeName) throws IOException { + return lockOneKey(volumeLocking::lock, volumeName, OmLockInfo.LockOpType.WAIT); + } + + public OmLockInfo volBucketRWLock(String volumeName, String bucketName) throws IOException { + OmLockInfo omLockInfo = new OmLockInfo(); + List locks = omLockInfo.getLocks(); + long startTime = Time.monotonicNowNanos(); + locks.add(volumeLocking.readLock(volumeName)); + try { + locks.add(bucketLocking.lock(bucketName)); + long endTime = Time.monotonicNowNanos(); + omLockInfo.add(endTime - startTime, OmLockInfo.LockOpType.WAIT); + omLockInfo.setLockTakenTime(endTime); + lockMonitorMap.put(omLockInfo, omLockInfo); + return omLockInfo; + } catch (IOException ex) { + writeUnlock(omLockInfo); + throw ex; + } + } + + public OmLockInfo bucketWriteLock(String bucketName) throws IOException { + return lockOneKey(bucketLocking::lock, bucketName, OmLockInfo.LockOpType.WAIT); + } + + public OmLockInfo bucketReadLock(String bucketName) throws IOException { + return lockOneKey(bucketLocking::readLock, bucketName, OmLockInfo.LockOpType.WAIT); + } + + private OmLockInfo lockOneKey(LockFunction lockFunction, String name, OmLockInfo.LockOpType type) + throws IOException { + OmLockInfo omLockInfo = new OmLockInfo(); + List locks = omLockInfo.getLocks(); + long startTime = Time.monotonicNowNanos(); + locks.add(lockFunction.apply(name)); + long endTime = Time.monotonicNowNanos(); + omLockInfo.add(endTime - startTime, type); + omLockInfo.setLockTakenTime(endTime); + lockMonitorMap.put(omLockInfo, omLockInfo); + return omLockInfo; + } + + public OmLockInfo obsLock(String bucketName, String keyName) throws IOException { + OmLockInfo omLockInfo = new OmLockInfo(); + List locks = omLockInfo.getLocks(); + // bucket read lock + long startTime = Time.monotonicNowNanos(); + locks.add(bucketLocking.readLock(bucketName)); + try { + // key lock with bucket uniqueness as same key can be present across bucket + locks.add(keyLocking.lock(bucketName + "/" + keyName)); + long endTime = Time.monotonicNowNanos(); + omLockInfo.add(endTime - startTime, OmLockInfo.LockOpType.WAIT); + omLockInfo.setLockTakenTime(endTime); + lockMonitorMap.put(omLockInfo, omLockInfo); + return omLockInfo; + } catch (IOException ex) { + writeUnlock(omLockInfo); + throw ex; + } + } + + public OmLockInfo obsLock(String bucketName, List keyList) throws IOException { + OmLockInfo omLockInfo = new OmLockInfo(); + List locks = omLockInfo.getLocks(); + // bucket read lock + long startTime = Time.monotonicNowNanos(); + locks.add(bucketLocking.readLock(bucketName)); + try { + // key lock with bucket uniqueness as same key can be present across bucket + List bucketKeyList = new ArrayList<>(); + keyList.forEach(e -> bucketKeyList.add(bucketName + "/" + e)); + locks.addAll(keyLocking.lock(bucketKeyList)); + long endTime = Time.monotonicNowNanos(); + omLockInfo.add(endTime - startTime, OmLockInfo.LockOpType.WAIT); + omLockInfo.setLockTakenTime(endTime); + lockMonitorMap.put(omLockInfo, omLockInfo); + return omLockInfo; + } catch (IOException ex) { + writeUnlock(omLockInfo); + throw ex; + } + } + + public void writeUnlock(OmLockInfo lockInfo) { + unlock(lockInfo, OmLockInfo.LockOpType.WRITE); + } + + public void readUnlock(OmLockInfo lockInfo) { + unlock(lockInfo, OmLockInfo.LockOpType.READ); + } + + private void unlock(OmLockInfo lockInfo, OmLockInfo.LockOpType type) { + Collections.reverse(lockInfo.getLocks()); + lockInfo.getLocks().forEach(Lock::unlock); + if (lockInfo.getLockTakenTime() > 0) { + lockInfo.add(Time.monotonicNowNanos() - lockInfo.getLockTakenTime(), type); + } + lockInfo.getLocks().clear(); + lockMonitorMap.remove(lockInfo); + } + + public void monitor() { + long curTime = Time.monotonicNowNanos() - MONITOR_LOCK_THRESHOLD_NS; + for (OmLockInfo lockInfo : lockMonitorMap.values()) { + if ((curTime - lockInfo.getLockTakenTime()) > 0) { + LOG.warn("Lock {} is crossing threshold {}: ", lockInfo, MONITOR_LOCK_THRESHOLD_NS); + } + } + } + + @VisibleForTesting + public Map getLockMonitorMap() { + return lockMonitorMap; + } + /** + * Lock information. + */ + public static class OmLockInfo { + private String key; + private long lockTakenTime; + private long waitLockNanos; + private long readLockNanos; + private long writeLockNanos; + private List locks = new ArrayList<>(); + + public void setKey(String key) { + this.key = key; + } + + public String getKey() { + return key; + } + + public long getWaitLockNanos() { + return waitLockNanos; + } + + public long getReadLockNanos() { + return readLockNanos; + } + + public long getWriteLockNanos() { + return writeLockNanos; + } + + public List getLocks() { + return locks; + } + + public long getLockTakenTime() { + return lockTakenTime; + } + + public void setLockTakenTime(long lockTakenTime) { + this.lockTakenTime = lockTakenTime; + } + + void add(long timeNanos, LockOpType lockOpType) { + switch (lockOpType) { + case WAIT: + waitLockNanos += timeNanos; + break; + case READ: + readLockNanos += timeNanos; + break; + case WRITE: + writeLockNanos += timeNanos; + break; + default: + } + } + + @Override + public String toString() { + return "OMLockDetails{" + + "key=" + key + + ", waitLockNanos=" + waitLockNanos + + ", readLockNanos=" + readLockNanos + + ", writeLockNanos=" + writeLockNanos + + '}'; + } + + enum LockOpType { + WAIT, + READ, + WRITE + } + } + + @FunctionalInterface + private interface LockFunction { + R apply(T t) throws IOException; + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java index 2fcb19f39d10..bcef1eac0ea8 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.utils.TransactionInfo; import org.apache.hadoop.ozone.om.helpers.OMAuditLogger; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ozone.OmUtils; import org.apache.hadoop.ozone.OzoneConsts; @@ -122,6 +123,13 @@ public OMRequest preExecute(OzoneManager ozoneManager) return omRequest; } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + return null; + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + } + /** * Performs any request specific failure handling during request * submission. An example of this would be an undo of any steps diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java index 38a4d78b5385..5bd468ef2728 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketCreateRequest.java @@ -21,6 +21,7 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; import org.apache.hadoop.hdds.client.DefaultReplicationConfig; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.hdds.utils.db.cache.CacheKey; import org.apache.hadoop.hdds.utils.db.cache.CacheValue; import org.apache.hadoop.ozone.ClientVersion; @@ -161,6 +162,15 @@ private static void validateMaxBucket(OzoneManager ozoneManager) } } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + BucketInfo bucketInfo = getOmRequest().getCreateBucketRequest().getBucketInfo(); + return lockOpr.volBucketRWLock(bucketInfo.getVolumeName(), bucketInfo.getBucketName()); + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + lockOpr.writeUnlock(lockInfo); + } + @Override public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { final long transactionLogIndex = context.getIndex(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketDeleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketDeleteRequest.java index c984c66a2590..10b55cf082ec 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketDeleteRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketDeleteRequest.java @@ -26,43 +26,39 @@ import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.hdds.utils.db.cache.CacheKey; +import org.apache.hadoop.hdds.utils.db.cache.CacheValue; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.audit.AuditLogger; +import org.apache.hadoop.ozone.audit.OMAction; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OMMetrics; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; +import org.apache.hadoop.ozone.om.request.OMClientRequest; import org.apache.hadoop.ozone.om.request.util.OmResponseUtil; import org.apache.hadoop.ozone.om.request.validation.RequestFeatureValidator; import org.apache.hadoop.ozone.om.request.validation.RequestProcessingPhase; import org.apache.hadoop.ozone.om.request.validation.ValidationCondition; import org.apache.hadoop.ozone.om.request.validation.ValidationContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer; -import org.apache.hadoop.ozone.security.acl.OzoneObj; -import org.apache.hadoop.ozone.audit.AuditLogger; -import org.apache.hadoop.ozone.audit.OMAction; -import org.apache.hadoop.ozone.om.request.OMClientRequest; -import org.apache.hadoop.ozone.om.OMMetadataManager; -import org.apache.hadoop.ozone.om.OMMetrics; -import org.apache.hadoop.ozone.om.OzoneManager; -import org.apache.hadoop.ozone.om.exceptions.OMException; -import org.apache.hadoop.ozone.om.response.bucket.OMBucketDeleteResponse; import org.apache.hadoop.ozone.om.response.OMClientResponse; -import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.om.response.bucket.OMBucketDeleteResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteBucketRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteBucketResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .DeleteBucketRequest; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .DeleteBucketResponse; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .OMRequest; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .OMResponse; -import org.apache.hadoop.hdds.utils.db.cache.CacheKey; -import org.apache.hadoop.hdds.utils.db.cache.CacheValue; +import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer; +import org.apache.hadoop.ozone.security.acl.OzoneObj; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.CONTAINS_SNAPSHOT; @@ -80,6 +76,14 @@ public OMBucketDeleteRequest(OMRequest omRequest) { super(omRequest); } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + return lockOpr.bucketWriteLock(getOmRequest().getDeleteBucketRequest().getBucketName()); + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + lockOpr.writeUnlock(lockInfo); + } + @Override public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { final long transactionLogIndex = context.getIndex(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketSetOwnerRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketSetOwnerRequest.java index 239083a58c8c..3cab92fed903 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketSetOwnerRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketSetOwnerRequest.java @@ -19,6 +19,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.hdds.utils.db.cache.CacheKey; import org.apache.hadoop.hdds.utils.db.cache.CacheValue; import org.apache.hadoop.ozone.audit.AuditLogger; @@ -76,6 +77,14 @@ public OMRequest preExecute(OzoneManager ozoneManager) .build(); } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + return lockOpr.bucketWriteLock(getOmRequest().getSetBucketPropertyRequest().getBucketArgs().getBucketName()); + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + lockOpr.writeUnlock(lockInfo); + } + @Override public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { final long transactionLogIndex = context.getIndex(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketSetPropertyRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketSetPropertyRequest.java index e76aa0d7093d..8b71ed4e6270 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketSetPropertyRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/OMBucketSetPropertyRequest.java @@ -26,6 +26,7 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; import org.apache.hadoop.hdds.client.DefaultReplicationConfig; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.audit.AuditLogger; import org.apache.hadoop.ozone.audit.OMAction; @@ -108,6 +109,14 @@ public OMRequest preExecute(OzoneManager ozoneManager) .build(); } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + return lockOpr.bucketWriteLock(getOmRequest().getSetBucketPropertyRequest().getBucketArgs().getBucketName()); + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + lockOpr.writeUnlock(lockInfo); + } + @Override @SuppressWarnings("methodlength") public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketAclRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketAclRequest.java index 23c92b8ae545..cb36179788a5 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketAclRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/bucket/acl/OMBucketAclRequest.java @@ -23,9 +23,10 @@ import java.util.List; import java.util.Map; import java.util.function.BiPredicate; - import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; +import org.apache.hadoop.hdds.utils.db.cache.CacheKey; +import org.apache.hadoop.hdds.utils.db.cache.CacheValue; import org.apache.hadoop.ozone.OzoneAcl; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.audit.AuditLogger; @@ -35,17 +36,16 @@ import org.apache.hadoop.ozone.om.ResolvedBucket; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.ozone.om.request.OMClientRequest; -import org.apache.hadoop.ozone.om.response.bucket.acl.OMBucketAclResponse; import org.apache.hadoop.ozone.om.request.util.ObjectParser; import org.apache.hadoop.ozone.om.response.OMClientResponse; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneObj.ObjectType; +import org.apache.hadoop.ozone.om.response.bucket.acl.OMBucketAclResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneObj.ObjectType; import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer; import org.apache.hadoop.ozone.security.acl.OzoneObj; -import org.apache.hadoop.hdds.utils.db.cache.CacheKey; -import org.apache.hadoop.hdds.utils.db.cache.CacheValue; import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK; @@ -62,6 +62,17 @@ public OMBucketAclRequest(OMRequest omRequest, omBucketAclOp = aclOp; } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + ObjectParser objectParser = new ObjectParser(getPath(), ObjectType.BUCKET); + ResolvedBucket resolvedBucket = ozoneManager.resolveBucketLink( + Pair.of(objectParser.getVolume(), objectParser.getBucket()), false, false); + return lockOpr.volBucketRWLock(resolvedBucket.realVolume(), resolvedBucket.realBucket()); + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + lockOpr.writeUnlock(lockInfo); + } + @Override public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { final long transactionLogIndex = context.getIndex(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java index cf07bc7d4d65..de424dfdb9a4 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java @@ -27,6 +27,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; @@ -121,6 +122,18 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException { } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + if (getBucketLayout() == BucketLayout.OBJECT_STORE) { + KeyArgs keyArgs = getOmRequest().getCreateDirectoryRequest().getKeyArgs(); + return lockOpr.bucketReadLock(keyArgs.getBucketName()); + } + return null; + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + lockOpr.readUnlock(lockInfo); + } + @Override public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { final long trxnLogIndex = context.getIndex(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java index f3df379103d3..448650dddccd 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java @@ -30,6 +30,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.om.OzoneConfigUtil; import org.apache.hadoop.ozone.om.helpers.BucketLayout; @@ -160,6 +161,18 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException { .build(); } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + if (getBucketLayout() == BucketLayout.OBJECT_STORE) { + KeyArgs keyArgs = getOmRequest().getCreateFileRequest().getKeyArgs(); + return lockOpr.bucketReadLock(keyArgs.getBucketName()); + } + return null; + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + lockOpr.readUnlock(lockInfo); + } + @Override @SuppressWarnings("methodlength") public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java index b995f7934537..6028eb0309db 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java @@ -27,6 +27,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.QuotaUtil; @@ -149,6 +150,18 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException { } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + if (getBucketLayout() == BucketLayout.OBJECT_STORE) { + KeyArgs keyArgs = getOmRequest().getAllocateBlockRequest().getKeyArgs(); + return lockOpr.obsLock(keyArgs.getBucketName(), keyArgs.getKeyName()); + } + return null; + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + lockOpr.writeUnlock(lockInfo); + } + @Override public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { final long trxnLogIndex = context.getIndex(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java index 25b09a203ec2..e4c55216778a 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java @@ -30,6 +30,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.ozone.OzoneManagerVersion; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.om.helpers.KeyValueUtil; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; @@ -131,6 +132,18 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException { .setKeyArgs(resolvedKeyArgs)).build(); } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + if (getBucketLayout() == BucketLayout.OBJECT_STORE) { + KeyArgs keyArgs = getOmRequest().getCommitKeyRequest().getKeyArgs(); + return lockOpr.obsLock(keyArgs.getBucketName(), keyArgs.getKeyName()); + } + return null; + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + lockOpr.writeUnlock(lockInfo); + } + @Override @SuppressWarnings("methodlength") public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java index 4ac619a3a47e..865d461379f5 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.ozone.OzoneManagerVersion; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.ozone.om.OzoneConfigUtil; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.BucketLayout; @@ -184,6 +185,18 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException { .build(); } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + if (getBucketLayout() == BucketLayout.OBJECT_STORE) { + KeyArgs keyArgs = getOmRequest().getCreateKeyRequest().getKeyArgs(); + return lockOpr.bucketReadLock(keyArgs.getBucketName()); + } + return null; + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + lockOpr.readUnlock(lockInfo); + } + @Override @SuppressWarnings("methodlength") public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java index 3885c18aff3c..a43ca0afdc96 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyDeleteRequest.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.ozone.OmUtils; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; @@ -106,6 +107,18 @@ protected KeyArgs resolveBucketAndCheckAcls(OzoneManager ozoneManager, () -> resolveBucketAndCheckKeyAcls(newKeyArgs.build(), ozoneManager, ACLType.DELETE)); } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + if (getBucketLayout() == BucketLayout.OBJECT_STORE) { + KeyArgs keyArgs = getOmRequest().getDeleteKeyRequest().getKeyArgs(); + return lockOpr.obsLock(keyArgs.getBucketName(), keyArgs.getKeyName()); + } + return null; + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + lockOpr.writeUnlock(lockInfo); + } + @Override @SuppressWarnings("methodlength") public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequest.java index 1c99fc1814a4..8faa9bd810ae 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequest.java @@ -20,10 +20,12 @@ import java.io.IOException; import java.nio.file.InvalidPathException; +import java.util.Arrays; import java.util.Map; import com.google.common.base.Preconditions; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.request.util.OmResponseUtil; @@ -118,6 +120,18 @@ protected KeyArgs resolveBucketAndCheckAcls(KeyArgs keyArgs, return resolvedArgs; } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + if (getBucketLayout() == BucketLayout.OBJECT_STORE) { + KeyArgs keyArgs = getOmRequest().getRenameKeyRequest().getKeyArgs(); + return lockOpr.obsLock(keyArgs.getBucketName(), Arrays.asList(keyArgs.getKeyName(), + getOmRequest().getRenameKeyRequest().getToKeyName())); + } + return null; + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + lockOpr.writeUnlock(lockInfo); + } @Override @SuppressWarnings("methodlength") diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeySetTimesRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeySetTimesRequest.java index 7c548029ce0e..81ab3d6522ca 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeySetTimesRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeySetTimesRequest.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.hdds.utils.db.cache.CacheKey; import org.apache.hadoop.hdds.utils.db.cache.CacheValue; import org.apache.hadoop.ozone.OzoneConsts; @@ -172,6 +173,18 @@ protected void apply(OmKeyInfo omKeyInfo) { } } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + if (getBucketLayout() == BucketLayout.OBJECT_STORE) { + OzoneManagerProtocolProtos.KeyArgs keyArgs = getOmRequest().getSetTimesRequest().getKeyArgs(); + return lockOpr.obsLock(keyArgs.getBucketName(), keyArgs.getKeyName()); + } + return null; + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + lockOpr.writeUnlock(lockInfo); + } + @Override public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { final long trxnLogIndex = context.getIndex(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java index 27fcf55ef908..5851758be577 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysDeleteRequest.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.hdds.utils.db.cache.CacheKey; import org.apache.hadoop.hdds.utils.db.cache.CacheValue; import org.apache.hadoop.util.Time; @@ -87,6 +88,18 @@ public OMKeysDeleteRequest(OMRequest omRequest, BucketLayout bucketLayout) { super(omRequest, bucketLayout); } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + if (getBucketLayout() == BucketLayout.OBJECT_STORE) { + DeleteKeyArgs keyArgs = getOmRequest().getDeleteKeysRequest().getDeleteKeys(); + return lockOpr.obsLock(keyArgs.getBucketName(), keyArgs.getKeysList()); + } + return null; + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + lockOpr.writeUnlock(lockInfo); + } + @Override @SuppressWarnings("methodlength") public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { final long trxnLogIndex = context.getIndex(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysRenameRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysRenameRequest.java index 64da82412566..75f258639f71 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysRenameRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeysRenameRequest.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.om.request.key; import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; import org.apache.hadoop.hdds.utils.db.cache.CacheKey; @@ -79,6 +80,23 @@ public OMKeysRenameRequest(OMRequest omRequest, BucketLayout bucketLayout) { super(omRequest, bucketLayout); } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + if (getBucketLayout() == BucketLayout.OBJECT_STORE) { + RenameKeysArgs keyArgs = getOmRequest().getRenameKeysRequest().getRenameKeysArgs(); + List keyNameList = new ArrayList<>(); + keyArgs.getRenameKeysMapList().forEach(e -> { + keyNameList.add(e.getFromKeyName()); + keyNameList.add(e.getToKeyName()); + }); + return lockOpr.obsLock(keyArgs.getBucketName(), keyNameList); + } + return null; + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + lockOpr.writeUnlock(lockInfo); + } + @Override @SuppressWarnings("methodlength") public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAclRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAclRequest.java index c847caa9481a..7cddebb439ff 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAclRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/acl/OMKeyAclRequest.java @@ -24,6 +24,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.ozone.audit.AuditLogger; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OzoneManager; @@ -63,6 +64,20 @@ public OMKeyAclRequest(OMRequest omRequest) { super(omRequest); } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + if (getBucketLayout() == BucketLayout.OBJECT_STORE) { + ObjectParser objectParser = new ObjectParser(getPath(), ObjectType.KEY); + ResolvedBucket resolvedBucket = ozoneManager.resolveBucketLink( + Pair.of(objectParser.getVolume(), objectParser.getBucket()), false, false); + return lockOpr.obsLock(resolvedBucket.realBucket(), objectParser.getKey()); + } + return null; + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + lockOpr.writeUnlock(lockInfo); + } + @Override public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { final long trxnLogIndex = context.getIndex(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java index de9ff1db343c..97e148b83fac 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java @@ -23,6 +23,7 @@ import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.om.helpers.KeyValueUtil; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.ozone.audit.OMAction; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OzoneConfigUtil; @@ -109,6 +110,18 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException { .build(); } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + if (getBucketLayout() == BucketLayout.OBJECT_STORE) { + KeyArgs keyArgs = getOmRequest().getInitiateMultiPartUploadRequest().getKeyArgs(); + return lockOpr.bucketReadLock(keyArgs.getBucketName()); + } + return null; + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + lockOpr.readUnlock(lockInfo); + } + @Override @SuppressWarnings("methodlength") public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java index c44d95492c84..56c984ef2bc0 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java @@ -24,6 +24,7 @@ import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.QuotaUtil; @@ -99,6 +100,18 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException { } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + if (getBucketLayout() == BucketLayout.OBJECT_STORE) { + KeyArgs keyArgs = getOmRequest().getAbortMultiPartUploadRequest().getKeyArgs(); + return lockOpr.obsLock(keyArgs.getBucketName(), keyArgs.getKeyName()); + } + return null; + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + lockOpr.writeUnlock(lockInfo); + } + @Override public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { final long trxnLogIndex = context.getIndex(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java index 4997af5d7d5a..a401570e8bc9 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.audit.OMAction; import org.apache.hadoop.ozone.om.OMMetadataManager; @@ -103,6 +104,18 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException { resolvedArgs)).setUserInfo(getUserInfo()).build(); } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + if (getBucketLayout() == BucketLayout.OBJECT_STORE) { + KeyArgs keyArgs = getOmRequest().getCommitMultiPartUploadRequest().getKeyArgs(); + return lockOpr.obsLock(keyArgs.getBucketName(), keyArgs.getKeyName()); + } + return null; + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + lockOpr.writeUnlock(lockInfo); + } + @Override @SuppressWarnings("methodlength") public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java index 17b96eaf9d90..6b6422914057 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.ozone.om.OzoneConfigUtil; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.ozone.om.request.file.OMFileRequest; import org.apache.hadoop.ozone.protocolPB.OMPBHelper; import org.apache.hadoop.hdds.utils.db.cache.CacheKey; @@ -138,6 +139,18 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException { resolvedArgs)).setUserInfo(getUserInfo()).build(); } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + if (getBucketLayout() == BucketLayout.OBJECT_STORE) { + KeyArgs keyArgs = getOmRequest().getCompleteMultiPartUploadRequest().getKeyArgs(); + return lockOpr.obsLock(keyArgs.getBucketName(), keyArgs.getKeyName()); + } + return null; + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + lockOpr.writeUnlock(lockInfo); + } + @Override @SuppressWarnings("methodlength") public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeCreateRequest.java index f6cb32a45d59..edb0f86e1683 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeCreateRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeCreateRequest.java @@ -29,6 +29,7 @@ import org.apache.hadoop.ozone.OzoneAcl; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.ozone.OmUtils; import org.apache.hadoop.ozone.om.request.util.OmResponseUtil; @@ -97,6 +98,14 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException { .build(); } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + return lockOpr.volumeWriteLock(getOmRequest().getCreateVolumeRequest().getVolumeInfo().getVolume()); + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + lockOpr.writeUnlock(lockInfo); + } + @Override public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { final long transactionLogIndex = context.getIndex(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeDeleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeDeleteRequest.java index 9f1ad0f30c7b..e444f8b1ece0 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeDeleteRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeDeleteRequest.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.ozone.om.request.util.OmResponseUtil; import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos; import org.slf4j.Logger; @@ -64,6 +65,14 @@ public OMVolumeDeleteRequest(OMRequest omRequest) { super(omRequest); } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + return lockOpr.volumeWriteLock(getOmRequest().getDeleteVolumeRequest().getVolumeName()); + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + lockOpr.writeUnlock(lockInfo); + } + @Override public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { final long transactionLogIndex = context.getIndex(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetOwnerRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetOwnerRequest.java index 090b01869742..ebb666739fc5 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetOwnerRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetOwnerRequest.java @@ -20,6 +20,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.hdds.utils.db.cache.CacheKey; import org.apache.hadoop.hdds.utils.db.cache.CacheValue; import org.apache.hadoop.ozone.OzoneConsts; @@ -75,6 +76,14 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException { .build(); } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + return lockOpr.volumeWriteLock(getOmRequest().getSetVolumePropertyRequest().getVolumeName()); + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + lockOpr.writeUnlock(lockInfo); + } + @Override public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { final long transactionLogIndex = context.getIndex(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetQuotaRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetQuotaRequest.java index 2174acf63e6a..d3dc0685c9dd 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetQuotaRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/OMVolumeSetQuotaRequest.java @@ -25,6 +25,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.request.util.OmResponseUtil; @@ -82,6 +83,14 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException { .build(); } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + return lockOpr.volumeWriteLock(getOmRequest().getSetVolumePropertyRequest().getVolumeName()); + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + lockOpr.writeUnlock(lockInfo); + } + @Override public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { final long transactionLogIndex = context.getIndex(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeAclRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeAclRequest.java index 5a83720e0b09..1afcfda62182 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeAclRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/volume/acl/OMVolumeAclRequest.java @@ -19,6 +19,12 @@ package org.apache.hadoop.ozone.om.request.volume.acl; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; +import java.io.IOException; +import java.nio.file.InvalidPathException; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.hdds.utils.db.cache.CacheKey; +import org.apache.hadoop.hdds.utils.db.cache.CacheValue; import org.apache.hadoop.ozone.OzoneAcl; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.audit.AuditLogger; @@ -27,21 +33,15 @@ import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.ozone.om.request.volume.OMVolumeRequest; import org.apache.hadoop.ozone.om.response.OMClientResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer; import org.apache.hadoop.ozone.security.acl.OzoneObj; -import org.apache.hadoop.hdds.utils.db.cache.CacheKey; -import org.apache.hadoop.hdds.utils.db.cache.CacheValue; import org.apache.ratis.util.function.CheckedBiConsumer; -import java.io.IOException; -import java.nio.file.InvalidPathException; -import java.util.List; -import java.util.Map; - import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK; /** @@ -65,6 +65,14 @@ public interface VolumeAclOp extends omVolumeAclOp = aclOp; } + public OmLockOpr.OmLockInfo lock(OzoneManager ozoneManager, OmLockOpr lockOpr) throws IOException { + return lockOpr.volumeWriteLock(getVolumeName()); + } + + public void unlock(OmLockOpr lockOpr, OmLockOpr.OmLockInfo lockInfo) { + lockOpr.writeUnlock(lockInfo); + } + @Override public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { final long trxnLogIndex = context.getIndex(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java index 91d234d4d0b4..574a9fceaa07 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java @@ -40,6 +40,7 @@ import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException; import org.apache.hadoop.ozone.om.helpers.OMAuditLogger; +import org.apache.hadoop.ozone.om.lock.OmLockOpr; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; import org.apache.hadoop.ozone.om.ratis.OzoneManagerDoubleBuffer; @@ -176,9 +177,11 @@ public OMResponse processRequest(OMRequest request) throws ServiceException { private OMResponse internalProcessRequest(OMRequest request) throws ServiceException { boolean s3Auth = false; + OmLockOpr.OmLockInfo lockInfo = null; try { if (request.hasS3Authentication()) { + // set authentication information to thread local, and this will be validated OzoneManager.setS3Auth(request.getS3Authentication()); try { s3Auth = true; @@ -190,24 +193,22 @@ private OMResponse internalProcessRequest(OMRequest request) throws ServiceExcep } } - if (!isRatisEnabled()) { - return submitRequestDirectlyToOM(request); - } - if (OmUtils.isReadOnly(request)) { return submitReadRequestToOM(request); } - // To validate credentials we have already verified leader status. - // This will skip of checking leader status again if request has S3Auth. - if (!s3Auth) { - OzoneManagerRatisUtils.checkLeaderStatus(ozoneManager); - } + if (isRatisEnabled()) { + // To validate credentials we have already verified leader status. + // This will skip of checking leader status again if request has S3Auth. + if (!s3Auth) { + OzoneManagerRatisUtils.checkLeaderStatus(ozoneManager); + } - // check retry cache - final OMResponse cached = omRatisServer.checkRetryCache(); - if (cached != null) { - return cached; + // check retry cache + final OMResponse cached = omRatisServer.checkRetryCache(); + if (cached != null) { + return cached; + } } // process new request @@ -231,16 +232,44 @@ private OMResponse internalProcessRequest(OMRequest request) throws ServiceExcep return createErrorResponse(request, ex); } - final OMResponse response = omRatisServer.submitRequest(requestToSubmit); - if (!response.getSuccess()) { - omClientRequest.handleRequestFailure(ozoneManager); + lockInfo = omClientRequest.lock(ozoneManager, ozoneManager.getOmLockOpr()); + try { + if (isRatisEnabled()) { + final OMResponse response = omRatisServer.submitRequest(requestToSubmit); + if (!response.getSuccess()) { + omClientRequest.handleRequestFailure(ozoneManager); + } + return response; + } else { + return submitRequestDirectlyToOM(request); + } + } finally { + performUnlock(omClientRequest, ozoneManager.getOmLockOpr(), lockInfo); } - return response; + } catch (IOException ex) { + return createErrorResponse(request, ex); } finally { OzoneManager.setS3Auth(null); } } + private static void performUnlock( + OMClientRequest omClientRequest, OmLockOpr omLockOpr, OmLockOpr.OmLockInfo lockInfo) { + if (null == lockInfo) { + return; + } + omClientRequest.unlock(omLockOpr, lockInfo); + Server.Call call = Server.getCurCall().get(); + if (null != call) { + call.getProcessingDetails().add(Timing.LOCKWAIT, + lockInfo.getWaitLockNanos(), TimeUnit.NANOSECONDS); + call.getProcessingDetails().add(Timing.LOCKSHARED, + lockInfo.getReadLockNanos(), TimeUnit.NANOSECONDS); + call.getProcessingDetails().add(Timing.LOCKEXCLUSIVE, + lockInfo.getWriteLockNanos(), TimeUnit.NANOSECONDS); + } + } + private OMRequest preExecute(OMClientRequest finalOmClientRequest) throws IOException { return captureLatencyNs(perfMetrics.getPreExecuteLatencyNs(), @@ -254,6 +283,10 @@ public OMRequest getLastRequestToSubmit() { private OMResponse submitReadRequestToOM(OMRequest request) throws ServiceException { + if (!isRatisEnabled()) { + return handler.handleReadRequest(request); + } + // Check if this OM is the leader. RaftServerStatus raftServerStatus = omRatisServer.checkLeaderStatus(); if (raftServerStatus == LEADER_AND_READY || @@ -291,22 +324,9 @@ private ServiceException createLeaderNotReadyException() { private OMResponse submitRequestDirectlyToOM(OMRequest request) { final OMClientResponse omClientResponse; try { - if (OmUtils.isReadOnly(request)) { - return handler.handleReadRequest(request); - } else { - OMClientRequest omClientRequest = - createClientRequest(request, ozoneManager); - try { - request = omClientRequest.preExecute(ozoneManager); - } catch (IOException ex) { - // log only when audit build is complete as required - OMAuditLogger.log(omClientRequest.getAuditBuilder()); - throw ex; - } - final TermIndex termIndex = TransactionInfo.getTermIndex(transactionIndex.incrementAndGet()); - final ExecutionContext context = ExecutionContext.of(termIndex.getIndex(), termIndex); - omClientResponse = handler.handleWriteRequest(request, context, ozoneManagerDoubleBuffer); - } + final TermIndex termIndex = TransactionInfo.getTermIndex(transactionIndex.incrementAndGet()); + final ExecutionContext context = ExecutionContext.of(termIndex.getIndex(), termIndex); + omClientResponse = handler.handleWriteRequest(request, context, ozoneManagerDoubleBuffer); } catch (IOException ex) { // As some preExecute returns error. So handle here. return createErrorResponse(request, ex); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestKeyLock.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestKeyLock.java new file mode 100644 index 000000000000..d2cbd80aa67e --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestKeyLock.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.lock; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test for TestKeyLock. + */ +public class TestKeyLock { + private static final Logger LOG = LoggerFactory.getLogger(TestKeyLock.class); + + @Test + public void testWriteLock() throws IOException { + KeyLock keyLock = new KeyLock(1, 100); + + // check if same lock is tried to be taken, it will throw exception with timeout + Lock lock = keyLock.lock("test"); + CompletableFuture rst = CompletableFuture.runAsync(() -> assertThrows(OMException.class, + () -> keyLock.lock("test"))); + rst.join(); + + lock.unlock(); + } + + @Test + public void testWriteThenReadLock() throws IOException { + KeyLock keyLock = new KeyLock(1, 100); + + // check if same lock is tried to be taken, it will throw exception with timeout + Lock lock = keyLock.lock("test"); + CompletableFuture rst = CompletableFuture.runAsync(() -> { + OMException exp = assertThrows(OMException.class, () -> keyLock.readLock("test")); + assertTrue(exp.getMessage().contains("read lock")); + }); + rst.join(); + + lock.unlock(); + } + + @Test + public void testReadThenWriteLock() throws IOException { + KeyLock keyLock = new KeyLock(1, 100); + + // check if same lock is tried to be taken, it will throw exception with timeout + Lock lock = keyLock.readLock("test"); + CompletableFuture rst = CompletableFuture.runAsync(() -> { + OMException exp = assertThrows(OMException.class, () -> keyLock.lock("test")); + assertTrue(exp.getMessage().contains("write lock")); + }); + rst.join(); + + lock.unlock(); + } + + @Test + public void testLockListOrderSame() throws IOException { + KeyLock keyLock = new KeyLock(1, 100); + List locks = keyLock.lock(Arrays.asList("test", "test1")); + locks.forEach(Lock::unlock); + List lockReverseOrder = keyLock.lock(Arrays.asList("test1", "test2")); + lockReverseOrder.forEach(Lock::unlock); + + assertEquals(locks.get(0), lockReverseOrder.get(0)); + assertEquals(locks.get(1), lockReverseOrder.get(1)); + } + + @Test + public void testReadLockListOrderSame() throws IOException { + KeyLock keyLock = new KeyLock(1, 100); + List locks = keyLock.readLock(Arrays.asList("test", "test1")); + locks.forEach(Lock::unlock); + List lockReverseOrder = keyLock.readLock(Arrays.asList("test1", "test2")); + lockReverseOrder.forEach(Lock::unlock); + + assertEquals(locks.get(0), lockReverseOrder.get(0)); + assertEquals(locks.get(1), lockReverseOrder.get(1)); + } + + @Test + public void testLockListFailureOnRelock() throws IOException { + KeyLock keyLock = new KeyLock(1, 100); + List locks = keyLock.lock(Arrays.asList("test", "test1")); + + // test write lock failure + CompletableFuture rst = CompletableFuture.runAsync(() -> { + OMException exp = assertThrows(OMException.class, () -> keyLock.lock("test")); + assertTrue(exp.getMessage().contains("write lock")); + }); + rst.join(); + + // test read lock failure + rst = CompletableFuture.runAsync(() -> { + OMException exp = assertThrows(OMException.class, () -> keyLock.readLock("test1")); + assertTrue(exp.getMessage().contains("read lock")); + }); + rst.join(); + + locks.forEach(Lock::unlock); + + // verify if lock is success after unlock + Lock lock = keyLock.readLock("test"); + lock.unlock(); + } +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestOmLockOpr.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestOmLockOpr.java new file mode 100644 index 000000000000..aa657744c3af --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestOmLockOpr.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.lock; + +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +/** + * Test for TestOmLockOpr. + */ +public class TestOmLockOpr { + + @Test + public void testObsLockOprWithParallelLock() throws IOException, ExecutionException, InterruptedException { + OmLockOpr omLockOpr = new OmLockOpr("test-"); + OmLockOpr.OmLockInfo omLockInfo = omLockOpr.obsLock("bucket", "testkey"); + assertEquals(2, omLockInfo.getLocks().size()); + assertNotNull(omLockOpr.getLockMonitorMap().get(omLockInfo)); + + CompletableFuture rst = CompletableFuture.supplyAsync(() -> { + try { + OmLockOpr.OmLockInfo lockInfoAgain = omLockOpr.obsLock("bucket", "testkey"); + assertNotNull(omLockOpr.getLockMonitorMap().get(lockInfoAgain)); + omLockOpr.writeUnlock(lockInfoAgain); + return lockInfoAgain; + } catch (IOException e) { + fail("should not throw exception"); + } + return null; + }); + try { + rst.get(1000, TimeUnit.MILLISECONDS); + fail(); + } catch (TimeoutException e) { + assertTrue(true); + } + + omLockOpr.writeUnlock(omLockInfo); + assertNull(omLockOpr.getLockMonitorMap().get(omLockInfo)); + rst.get(); + assertEquals(0, omLockOpr.getLockMonitorMap().size()); + } + + @Test + public void testObsLockOprListKeyRepeated() throws IOException { + OmLockOpr omLockOpr = new OmLockOpr("test-"); + OmLockOpr.OmLockInfo omLockInfo = omLockOpr.obsLock("bucket", Arrays.asList("testkey", "testkey2")); + assertEquals(3, omLockInfo.getLocks().size()); + assertNotNull(omLockOpr.getLockMonitorMap().get(omLockInfo)); + + omLockOpr.writeUnlock(omLockInfo); + assertEquals(0, omLockOpr.getLockMonitorMap().size()); + + omLockInfo = omLockOpr.obsLock("bucket", Arrays.asList("testkey", "testkey2")); + assertEquals(3, omLockInfo.getLocks().size()); + omLockOpr.writeUnlock(omLockInfo); + assertEquals(0, omLockOpr.getLockMonitorMap().size()); + } + + @Test + public void testBucketReadLock() throws IOException { + OmLockOpr omLockOpr = new OmLockOpr("test-"); + OmLockOpr.OmLockInfo omLockInfo = omLockOpr.bucketReadLock("bucket"); + assertEquals(1, omLockInfo.getLocks().size()); + assertNotNull(omLockOpr.getLockMonitorMap().get(omLockInfo)); + + omLockOpr.readUnlock(omLockInfo); + assertEquals(0, omLockOpr.getLockMonitorMap().size()); + } + + @Test + public void testStartStopMonitorRepeated() throws IOException { + OmLockOpr omLockOpr = new OmLockOpr("test-"); + omLockOpr.start(); + OmLockOpr.OmLockInfo omLockInfo = omLockOpr.obsLock("bucket", "testkey"); + omLockOpr.monitor(); + omLockOpr.writeUnlock(omLockInfo); + omLockOpr.stop(); + + omLockOpr.start(); + omLockInfo = omLockOpr.obsLock("bucket", "testkey"); + omLockOpr.monitor(); + omLockOpr.writeUnlock(omLockInfo); + omLockOpr.stop(); + } +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestRequestLock.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestRequestLock.java new file mode 100644 index 000000000000..89e82366e02e --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/lock/TestRequestLock.java @@ -0,0 +1,327 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.lock; + +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.util.Collections; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.protocol.StorageType; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.ResolvedBucket; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.request.OMClientRequest; +import org.apache.hadoop.ozone.om.request.bucket.OMBucketCreateRequest; +import org.apache.hadoop.ozone.om.request.bucket.OMBucketDeleteRequest; +import org.apache.hadoop.ozone.om.request.bucket.OMBucketSetOwnerRequest; +import org.apache.hadoop.ozone.om.request.bucket.OMBucketSetPropertyRequest; +import org.apache.hadoop.ozone.om.request.bucket.acl.OMBucketAddAclRequest; +import org.apache.hadoop.ozone.om.request.bucket.acl.OMBucketRemoveAclRequest; +import org.apache.hadoop.ozone.om.request.bucket.acl.OMBucketSetAclRequest; +import org.apache.hadoop.ozone.om.request.file.OMDirectoryCreateRequest; +import org.apache.hadoop.ozone.om.request.file.OMFileCreateRequest; +import org.apache.hadoop.ozone.om.request.key.OMAllocateBlockRequest; +import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequest; +import org.apache.hadoop.ozone.om.request.key.OMKeyCreateRequest; +import org.apache.hadoop.ozone.om.request.key.OMKeyDeleteRequest; +import org.apache.hadoop.ozone.om.request.key.OMKeyRenameRequest; +import org.apache.hadoop.ozone.om.request.key.OMKeySetTimesRequest; +import org.apache.hadoop.ozone.om.request.key.OMKeysDeleteRequest; +import org.apache.hadoop.ozone.om.request.key.OMKeysRenameRequest; +import org.apache.hadoop.ozone.om.request.key.acl.OMKeyAddAclRequest; +import org.apache.hadoop.ozone.om.request.key.acl.OMKeyRemoveAclRequest; +import org.apache.hadoop.ozone.om.request.key.acl.OMKeySetAclRequest; +import org.apache.hadoop.ozone.om.request.s3.multipart.S3InitiateMultipartUploadRequest; +import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadAbortRequest; +import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCommitPartRequest; +import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCompleteRequest; +import org.apache.hadoop.ozone.om.request.volume.OMVolumeCreateRequest; +import org.apache.hadoop.ozone.om.request.volume.OMVolumeDeleteRequest; +import org.apache.hadoop.ozone.om.request.volume.OMVolumeSetOwnerRequest; +import org.apache.hadoop.ozone.om.request.volume.OMVolumeSetQuotaRequest; +import org.apache.hadoop.ozone.om.request.volume.acl.OMVolumeAddAclRequest; +import org.apache.hadoop.ozone.om.request.volume.acl.OMVolumeRemoveAclRequest; +import org.apache.hadoop.ozone.om.request.volume.acl.OMVolumeSetAclRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AddAclRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CommitKeyRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateBucketRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateDirectoryRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateFileRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateKeyRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateVolumeRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteBucketRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeyRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeysRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteVolumeRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartCommitUploadPartRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartInfoInitiateRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadAbortRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadCompleteRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RemoveAclRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RenameKeyRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RenameKeysRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetAclRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetBucketPropertyRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetTimesRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetVolumePropertyRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Test for obs request for lock. + */ +public class TestRequestLock { + private OzoneManager ozoneManager; + + @Test + public void testObsRequestLockUnlock() throws IOException { + ozoneManager = mock(OzoneManager.class); + ResolvedBucket resolvedBucket = new ResolvedBucket("testvol", "testbucket", + "testvol", "testbucket", "none", BucketLayout.OBJECT_STORE); + when(ozoneManager.resolveBucketLink(Mockito.any(Pair.class), Mockito.anyBoolean(), Mockito.anyBoolean())) + .thenReturn(resolvedBucket); + OMMetadataManager mockMetaManager = mock(OMMetadataManager.class); + when(ozoneManager.getMetadataManager()).thenReturn(mockMetaManager); + when(mockMetaManager.getBucketKey(anyString(), anyString())).thenReturn("bucketkey"); + Table mockTable = mock(Table.class); + when(mockMetaManager.getBucketTable()).thenReturn(mockTable); + when(mockTable.get(anyString())).thenReturn(OmBucketInfo.newBuilder().setBucketLayout(BucketLayout.OBJECT_STORE) + .setVolumeName("testvol").setBucketName("testbucket").setAcls(Collections.EMPTY_LIST) + .setStorageType(StorageType.SSD).build()); + + OMRequest omRequest; + OmLockOpr omLockOpr = new OmLockOpr("test-"); + KeyArgs keyArgs = KeyArgs.newBuilder().setVolumeName("testvol").setBucketName("testbucket").setKeyName("testkey") + .build(); + + CreateDirectoryRequest dirReq = CreateDirectoryRequest.newBuilder().setKeyArgs(keyArgs).build(); + omRequest = getReqBuilder(Type.CreateDirectory).setCreateDirectoryRequest(dirReq).build(); + validateLockUnlock(new OMDirectoryCreateRequest(omRequest, BucketLayout.OBJECT_STORE), omLockOpr, 1); + + CreateFileRequest fileReq = CreateFileRequest.newBuilder().setKeyArgs(keyArgs).setIsRecursive(true) + .setIsOverwrite(true).build(); + omRequest = getReqBuilder(Type.CreateFile).setCreateFileRequest(fileReq).build(); + validateLockUnlock(new OMFileCreateRequest(omRequest, BucketLayout.OBJECT_STORE), omLockOpr, 1); + + AllocateBlockRequest blockReq = AllocateBlockRequest.newBuilder().setKeyArgs(keyArgs).setClientID(123).build(); + omRequest = getReqBuilder(Type.AllocateBlock).setAllocateBlockRequest(blockReq).build(); + validateLockUnlock(new OMAllocateBlockRequest(omRequest, BucketLayout.OBJECT_STORE), omLockOpr, 2); + + CommitKeyRequest commitReq = CommitKeyRequest.newBuilder().setKeyArgs(keyArgs).setClientID(123).build(); + omRequest = getReqBuilder(Type.CommitKey).setCommitKeyRequest(commitReq).build(); + validateLockUnlock(new OMKeyCommitRequest(omRequest, BucketLayout.OBJECT_STORE), omLockOpr, 2); + + CreateKeyRequest createKeyRequest = CreateKeyRequest.newBuilder().setKeyArgs(keyArgs).build(); + omRequest = getReqBuilder(Type.CreateKey).setCreateKeyRequest(createKeyRequest).build(); + validateLockUnlock(new OMKeyCreateRequest(omRequest, BucketLayout.OBJECT_STORE), omLockOpr, 1); + + DeleteKeyRequest deleteKeyRequest = DeleteKeyRequest.newBuilder().setKeyArgs(keyArgs).build(); + omRequest = getReqBuilder(Type.DeleteKey).setDeleteKeyRequest(deleteKeyRequest).build(); + validateLockUnlock(new OMKeyDeleteRequest(omRequest, BucketLayout.OBJECT_STORE), omLockOpr, 2); + + RenameKeyRequest renameKeyRequest = RenameKeyRequest.newBuilder().setKeyArgs(keyArgs).setToKeyName("key2").build(); + omRequest = getReqBuilder(Type.RenameKey).setRenameKeyRequest(renameKeyRequest).build(); + validateLockUnlock(new OMKeyRenameRequest(omRequest, BucketLayout.OBJECT_STORE), omLockOpr, 3); + + OzoneManagerProtocolProtos.DeleteKeyArgs deleteKeysArgs = OzoneManagerProtocolProtos.DeleteKeyArgs.newBuilder() + .addKeys("test1").addKeys("test2").setVolumeName("testvol").setBucketName("testbucket").build(); + DeleteKeysRequest deleteKeysRequest = DeleteKeysRequest.newBuilder().setDeleteKeys(deleteKeysArgs).build(); + omRequest = getReqBuilder(Type.DeleteKeys).setDeleteKeysRequest(deleteKeysRequest).build(); + validateLockUnlock(new OMKeysDeleteRequest(omRequest, BucketLayout.OBJECT_STORE), omLockOpr, 3); + + SetTimesRequest setTimesRequest = SetTimesRequest.newBuilder().setKeyArgs(keyArgs).setMtime(123).setAtime(123) + .build(); + omRequest = getReqBuilder(Type.SetTimes).setSetTimesRequest(setTimesRequest).build(); + validateLockUnlock(new OMKeySetTimesRequest(omRequest, BucketLayout.OBJECT_STORE), omLockOpr, 2); + + OzoneManagerProtocolProtos.RenameKeysMap renameKeysMap = OzoneManagerProtocolProtos.RenameKeysMap.newBuilder() + .setFromKeyName("test2").setToKeyName("test1").build(); + OzoneManagerProtocolProtos.RenameKeysArgs renameKeyArgs = OzoneManagerProtocolProtos.RenameKeysArgs.newBuilder() + .addRenameKeysMap(renameKeysMap).setVolumeName("testvol").setBucketName("testbucket").build(); + RenameKeysRequest renameKeysRequest = RenameKeysRequest.newBuilder().setRenameKeysArgs(renameKeyArgs).build(); + omRequest = getReqBuilder(Type.RenameKeys).setRenameKeysRequest(renameKeysRequest).build(); + validateLockUnlock(new OMKeysRenameRequest(omRequest, BucketLayout.OBJECT_STORE), omLockOpr, 3); + + OzoneManagerProtocolProtos.OzoneObj objArg = OzoneManagerProtocolProtos.OzoneObj.newBuilder() + .setResType(OzoneManagerProtocolProtos.OzoneObj.ObjectType.KEY) + .setStoreType(OzoneManagerProtocolProtos.OzoneObj.StoreType.OZONE).setPath("/k/b/abc.txtß").build(); + OzoneManagerProtocolProtos.OzoneAclInfo aclObj = OzoneManagerProtocolProtos.OzoneAclInfo.newBuilder() + .setType(OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclType.USER) + .setName("testuser").setRights(ByteString.copyFromUtf8("*")) + .setAclScope(OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclScope.DEFAULT) + .build(); + AddAclRequest aclAddRequest = AddAclRequest.newBuilder().setObj(objArg).setAcl(aclObj).build(); + omRequest = getReqBuilder(Type.AddAcl).setAddAclRequest(aclAddRequest).build(); + validateLockUnlock(new OMKeyAddAclRequest(omRequest, ozoneManager), omLockOpr, 2); + + RemoveAclRequest removeAclRequest = RemoveAclRequest.newBuilder().setObj(objArg).setAcl(aclObj).build(); + omRequest = getReqBuilder(Type.RemoveAcl).setRemoveAclRequest(removeAclRequest).build(); + validateLockUnlock(new OMKeyRemoveAclRequest(omRequest, ozoneManager), omLockOpr, 2); + + SetAclRequest setAclRequest = SetAclRequest.newBuilder().setObj(objArg).build(); + omRequest = getReqBuilder(Type.SetAcl).setSetAclRequest(setAclRequest).build(); + validateLockUnlock(new OMKeySetAclRequest(omRequest, ozoneManager), omLockOpr, 2); + + MultipartInfoInitiateRequest initiateReq = MultipartInfoInitiateRequest.newBuilder().setKeyArgs(keyArgs).build(); + omRequest = getReqBuilder(Type.InitiateMultiPartUpload).setInitiateMultiPartUploadRequest(initiateReq).build(); + validateLockUnlock(new S3InitiateMultipartUploadRequest(omRequest, BucketLayout.OBJECT_STORE), omLockOpr, 1); + + MultipartUploadAbortRequest abortReq = MultipartUploadAbortRequest.newBuilder().setKeyArgs(keyArgs).build(); + omRequest = getReqBuilder(Type.AbortMultiPartUpload).setAbortMultiPartUploadRequest(abortReq).build(); + validateLockUnlock(new S3MultipartUploadAbortRequest(omRequest, BucketLayout.OBJECT_STORE), omLockOpr, 2); + + MultipartCommitUploadPartRequest multipartCommitReq = MultipartCommitUploadPartRequest.newBuilder() + .setKeyArgs(keyArgs).setClientID(123).build(); + omRequest = getReqBuilder(Type.CommitMultiPartUpload).setCommitMultiPartUploadRequest(multipartCommitReq).build(); + validateLockUnlock(new S3MultipartUploadCommitPartRequest(omRequest, BucketLayout.OBJECT_STORE), omLockOpr, 2); + + MultipartUploadCompleteRequest completeRequest = MultipartUploadCompleteRequest.newBuilder().setKeyArgs(keyArgs) + .build(); + omRequest = getReqBuilder(Type.CompleteMultiPartUpload).setCompleteMultiPartUploadRequest(completeRequest).build(); + validateLockUnlock(new S3MultipartUploadCompleteRequest(omRequest, BucketLayout.OBJECT_STORE), omLockOpr, 2); + } + + @Test + public void testBucketRequestLockUnlock() throws IOException { + ozoneManager = mock(OzoneManager.class); + ResolvedBucket resolvedBucket = new ResolvedBucket("testvol", "testbucket", + "testvol", "testbucket", "none", BucketLayout.OBJECT_STORE); + when(ozoneManager.resolveBucketLink(Mockito.any(Pair.class), Mockito.anyBoolean(), Mockito.anyBoolean())) + .thenReturn(resolvedBucket); + + OMRequest omRequest; + OmLockOpr omLockOpr = new OmLockOpr("test-"); + + OzoneManagerProtocolProtos.BucketInfo bucketInfo = OzoneManagerProtocolProtos.BucketInfo.newBuilder() + .setVolumeName("testvol").setBucketName("testbucket").setIsVersionEnabled(false) + .setStorageType(OzoneManagerProtocolProtos.StorageTypeProto.DISK).build(); + CreateBucketRequest createBucketRequest = CreateBucketRequest.newBuilder().setBucketInfo(bucketInfo).build(); + omRequest = getReqBuilder(Type.CreateBucket).setCreateBucketRequest(createBucketRequest).build(); + validateLockUnlock(new OMBucketCreateRequest(omRequest), omLockOpr, 2); + + DeleteBucketRequest deleteBucketRequest = DeleteBucketRequest.newBuilder().setBucketName("testbucket") + .setVolumeName("testvolume").build(); + omRequest = getReqBuilder(Type.DeleteBucket).setDeleteBucketRequest(deleteBucketRequest).build(); + validateLockUnlock(new OMBucketDeleteRequest(omRequest), omLockOpr, 1); + + OzoneManagerProtocolProtos.BucketArgs args = OzoneManagerProtocolProtos.BucketArgs.newBuilder() + .setVolumeName("testvol").setBucketName("tetbucket").build(); + SetBucketPropertyRequest bucketPropertyRequest = SetBucketPropertyRequest.newBuilder().setBucketArgs(args).build(); + omRequest = getReqBuilder(Type.SetBucketProperty).setSetBucketPropertyRequest(bucketPropertyRequest).build(); + validateLockUnlock(new OMBucketSetOwnerRequest(omRequest), omLockOpr, 1); + validateLockUnlock(new OMBucketSetPropertyRequest(omRequest), omLockOpr, 1); + + OzoneManagerProtocolProtos.OzoneObj objArg = OzoneManagerProtocolProtos.OzoneObj.newBuilder() + .setResType(OzoneManagerProtocolProtos.OzoneObj.ObjectType.BUCKET) + .setStoreType(OzoneManagerProtocolProtos.OzoneObj.StoreType.OZONE).setPath("/k/b").build(); + OzoneManagerProtocolProtos.OzoneAclInfo aclObj = OzoneManagerProtocolProtos.OzoneAclInfo.newBuilder() + .setType(OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclType.USER) + .setName("testuser").setRights(ByteString.copyFromUtf8("*")) + .setAclScope(OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclScope.DEFAULT) + .build(); + AddAclRequest aclAddRequest = AddAclRequest.newBuilder().setObj(objArg).setAcl(aclObj).build(); + omRequest = getReqBuilder(Type.AddAcl).setAddAclRequest(aclAddRequest).build(); + validateLockUnlock(new OMBucketAddAclRequest(omRequest), omLockOpr, 2); + + RemoveAclRequest removeAclRequest = RemoveAclRequest.newBuilder().setObj(objArg).setAcl(aclObj).build(); + omRequest = getReqBuilder(Type.RemoveAcl).setRemoveAclRequest(removeAclRequest).build(); + validateLockUnlock(new OMBucketRemoveAclRequest(omRequest), omLockOpr, 2); + + SetAclRequest setAclRequest = SetAclRequest.newBuilder().setObj(objArg).build(); + omRequest = getReqBuilder(Type.SetAcl).setSetAclRequest(setAclRequest).build(); + validateLockUnlock(new OMBucketSetAclRequest(omRequest), omLockOpr, 2); + } + + @Test + public void testVolumeRequestLockUnlock() throws IOException { + ozoneManager = mock(OzoneManager.class); + ResolvedBucket resolvedBucket = new ResolvedBucket("testvol", "testbucket", + "testvol", "testbucket", "none", BucketLayout.OBJECT_STORE); + when(ozoneManager.resolveBucketLink(Mockito.any(Pair.class), Mockito.anyBoolean(), Mockito.anyBoolean())) + .thenReturn(resolvedBucket); + + OMRequest omRequest; + OmLockOpr omLockOpr = new OmLockOpr("test-"); + + OzoneManagerProtocolProtos.VolumeInfo volumeInfo = OzoneManagerProtocolProtos.VolumeInfo.newBuilder() + .setVolume("testvol").setAdminName("admin").setOwnerName("owner").build(); + CreateVolumeRequest createVolumeRequest = CreateVolumeRequest.newBuilder().setVolumeInfo(volumeInfo).build(); + omRequest = getReqBuilder(Type.CreateVolume).setCreateVolumeRequest(createVolumeRequest).build(); + validateLockUnlock(new OMVolumeCreateRequest(omRequest), omLockOpr, 1); + + DeleteVolumeRequest deleteVolumeRequest = DeleteVolumeRequest.newBuilder().setVolumeName("testvolume").build(); + omRequest = getReqBuilder(Type.DeleteVolume).setDeleteVolumeRequest(deleteVolumeRequest).build(); + validateLockUnlock(new OMVolumeDeleteRequest(omRequest), omLockOpr, 1); + + SetVolumePropertyRequest volumePropertyRequest = SetVolumePropertyRequest.newBuilder().setVolumeName("testVolume") + .build(); + omRequest = getReqBuilder(Type.SetVolumeProperty).setSetVolumePropertyRequest(volumePropertyRequest).build(); + validateLockUnlock(new OMVolumeSetOwnerRequest(omRequest), omLockOpr, 1); + validateLockUnlock(new OMVolumeSetQuotaRequest(omRequest), omLockOpr, 1); + + OzoneManagerProtocolProtos.OzoneObj objArg = OzoneManagerProtocolProtos.OzoneObj.newBuilder() + .setResType(OzoneManagerProtocolProtos.OzoneObj.ObjectType.VOLUME) + .setStoreType(OzoneManagerProtocolProtos.OzoneObj.StoreType.OZONE).setPath("/k").build(); + OzoneManagerProtocolProtos.OzoneAclInfo aclObj = OzoneManagerProtocolProtos.OzoneAclInfo.newBuilder() + .setType(OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclType.USER) + .setName("testuser").setRights(ByteString.copyFromUtf8("*")) + .setAclScope(OzoneManagerProtocolProtos.OzoneAclInfo.OzoneAclScope.DEFAULT) + .build(); + AddAclRequest aclAddRequest = AddAclRequest.newBuilder().setObj(objArg).setAcl(aclObj).build(); + omRequest = getReqBuilder(Type.AddAcl).setAddAclRequest(aclAddRequest).build(); + validateLockUnlock(new OMVolumeAddAclRequest(omRequest), omLockOpr, 1); + + RemoveAclRequest removeAclRequest = RemoveAclRequest.newBuilder().setObj(objArg).setAcl(aclObj).build(); + omRequest = getReqBuilder(Type.RemoveAcl).setRemoveAclRequest(removeAclRequest).build(); + validateLockUnlock(new OMVolumeRemoveAclRequest(omRequest), omLockOpr, 1); + + SetAclRequest setAclRequest = SetAclRequest.newBuilder().setObj(objArg).build(); + omRequest = getReqBuilder(Type.SetAcl).setSetAclRequest(setAclRequest).build(); + validateLockUnlock(new OMVolumeSetAclRequest(omRequest), omLockOpr, 1); + } + + private OMRequest.Builder getReqBuilder(Type type) { + return OMRequest.newBuilder().setClientId("clitest").setCmdType(type); + } + + private void validateLockUnlock(OMClientRequest clientRequest, OmLockOpr omLockOpr, long lockCnt) throws IOException { + OmLockOpr.OmLockInfo lockInfo = clientRequest.lock(ozoneManager, omLockOpr); + assertEquals(lockCnt, lockInfo.getLocks().size()); + clientRequest.unlock(omLockOpr, lockInfo); + assertEquals(0, omLockOpr.getLockMonitorMap().size()); + + OmLockOpr.OmLockInfo lockInfoAgain = clientRequest.lock(ozoneManager, omLockOpr); + assertEquals(lockCnt, lockInfoAgain.getLocks().size()); + clientRequest.unlock(omLockOpr, lockInfoAgain); + assertEquals(0, omLockOpr.getLockMonitorMap().size()); + } +}