diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 462b1e4331f4..0a2e16a5a735 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -689,6 +689,14 @@
hdds.container.ratis.datanode.storage.dir be configured separately.
+  <property>
+    <name>ozone.path.deleting.limit.per.task</name>
+    <value>20000</value>
+    <tag>OZONE, PERFORMANCE, OM</tag>
+    <description>The maximum number of paths (directories/files) to be deleted by
+      the directory deleting service per time interval.
+    </description>
+  </property>
ozone.metadata.dirs.permissions
750
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 254a49ea9a99..3ce10ec1d142 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -388,6 +388,11 @@ public final class OMConfigKeys {
public static final String OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT
= "60s";
+ public static final String OZONE_PATH_DELETING_LIMIT_PER_TASK =
+ "ozone.path.deleting.limit.per.task";
+  // default is 20000, taking into account the 32 MB buffer size
+ public static final int OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT = 20000;
+
/**
* Configuration properties for Snapshot Directory Service.
*/
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java
index 81e1dd3b4442..eb77ac1dce3e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java
@@ -41,6 +41,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import org.apache.commons.lang3.RandomStringUtils;
@@ -623,9 +624,9 @@ public void testAOSKeyDeletingWithSnapshotCreateParallelExecution()
}
return null;
}).when(service).optimizeDirDeletesAndSubmitRequest(anyLong(), anyLong(),
- anyLong(), anyList(), anyList(), eq(null), anyLong(), anyLong(), any(),
+ anyLong(), anyList(), anyList(), eq(null), anyLong(), any(),
any(ReclaimableDirFilter.class), any(ReclaimableKeyFilter.class), anyMap(), any(),
- anyLong());
+ anyLong(), any(AtomicInteger.class));
Mockito.doAnswer(i -> {
store.createSnapshot(testVolumeName, testBucketName, snap2);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DeleteKeysResult.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DeleteKeysResult.java
index 60378467d6d5..2b685edf273d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DeleteKeysResult.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DeleteKeysResult.java
@@ -27,14 +27,11 @@
public class DeleteKeysResult {
private List keysToDelete;
- private long consumedSize;
private boolean processedKeys;
- public DeleteKeysResult(List keysToDelete,
- long consumedSize, boolean processedKeys) {
+ public DeleteKeysResult(List keysToDelete, boolean processedKeys) {
this.keysToDelete = keysToDelete;
- this.consumedSize = consumedSize;
this.processedKeys = processedKeys;
}
@@ -42,11 +39,8 @@ public List getKeysToDelete() {
return keysToDelete;
}
- public long getConsumedSize() {
- return consumedSize;
- }
-
public boolean isProcessedKeys() {
return processedKeys;
}
+
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index 872a99e94b15..b0562049f715 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -306,9 +306,9 @@ default List> getDeletedDirEntries(String volu
* @return list of dirs
* @throws IOException
*/
- DeleteKeysResult getPendingDeletionSubDirs(long volumeId, long bucketId,
- OmKeyInfo parentInfo, CheckedFunction, Boolean, IOException> filter,
- long remainingBufLimit) throws IOException;
+ DeleteKeysResult getPendingDeletionSubDirs(long volumeId, long bucketId, OmKeyInfo parentInfo,
+ CheckedFunction, Boolean, IOException> filter, int remainingNum)
+ throws IOException;
/**
* Returns all sub files under the given parent directory.
@@ -317,10 +317,9 @@ DeleteKeysResult getPendingDeletionSubDirs(long volumeId, long bucketId,
* @return list of files
* @throws IOException
*/
- DeleteKeysResult getPendingDeletionSubFiles(long volumeId,
- long bucketId, OmKeyInfo parentInfo,
- CheckedFunction, Boolean, IOException> filter, long remainingBufLimit)
- throws IOException;
+ DeleteKeysResult getPendingDeletionSubFiles(long volumeId, long bucketId, OmKeyInfo parentInfo,
+ CheckedFunction, Boolean, IOException> filter, int remainingNum)
+ throws IOException;
/**
* Returns the instance of Directory Deleting Service.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index e458fa73236a..7ad2f6d2e72e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -2264,49 +2264,37 @@ private void slimLocationVersion(OmKeyInfo... keyInfos) {
}
@Override
- public DeleteKeysResult getPendingDeletionSubDirs(long volumeId, long bucketId,
- OmKeyInfo parentInfo, CheckedFunction, Boolean, IOException> filter,
- long remainingBufLimit) throws IOException {
+ public DeleteKeysResult getPendingDeletionSubDirs(long volumeId, long bucketId, OmKeyInfo parentInfo,
+ CheckedFunction, Boolean, IOException> filter, int remainingNum) throws IOException {
return gatherSubPathsWithIterator(volumeId, bucketId, parentInfo, metadataManager.getDirectoryTable(),
kv -> Table.newKeyValue(metadataManager.getOzoneDeletePathKey(kv.getValue().getObjectID(), kv.getKey()),
- OMFileRequest.getKeyInfoWithFullPath(parentInfo, kv.getValue())),
- filter, remainingBufLimit);
+ OMFileRequest.getKeyInfoWithFullPath(parentInfo, kv.getValue())), filter, remainingNum);
}
- private DeleteKeysResult gatherSubPathsWithIterator(
- long volumeId, long bucketId, OmKeyInfo parentInfo,
- Table table,
+ private DeleteKeysResult gatherSubPathsWithIterator(long volumeId, long bucketId,
+ OmKeyInfo parentInfo, Table table,
CheckedFunction, KeyValue, IOException> deleteKeyTransformer,
- CheckedFunction, Boolean, IOException> deleteKeyFilter,
- long remainingBufLimit) throws IOException {
+ CheckedFunction, Boolean, IOException> deleteKeyFilter, int remainingNum)
+ throws IOException {
List keyInfos = new ArrayList<>();
- String seekFileInDB = metadataManager.getOzonePathKey(volumeId, bucketId,
- parentInfo.getObjectID(), "");
- long consumedSize = 0;
+ String seekFileInDB = metadataManager.getOzonePathKey(volumeId, bucketId, parentInfo.getObjectID(), "");
try (TableIterator> iterator = table.iterator(seekFileInDB)) {
- while (iterator.hasNext() && remainingBufLimit > 0) {
+ while (iterator.hasNext() && remainingNum > 0) {
KeyValue entry = iterator.next();
- final long objectSerializedSize = entry.getValueByteSize();
- // No need to check the table again as the value in cache and iterator would be same when directory
- // deleting service runs.
- if (remainingBufLimit - objectSerializedSize < 0) {
- break;
- }
KeyValue keyInfo = deleteKeyTransformer.apply(entry);
if (deleteKeyFilter.apply(keyInfo)) {
keyInfos.add(keyInfo.getValue());
- remainingBufLimit -= objectSerializedSize;
- consumedSize += objectSerializedSize;
+ remainingNum--;
}
}
- return new DeleteKeysResult(keyInfos, consumedSize, !iterator.hasNext());
+ return new DeleteKeysResult(keyInfos, !iterator.hasNext());
}
}
@Override
public DeleteKeysResult getPendingDeletionSubFiles(long volumeId,
long bucketId, OmKeyInfo parentInfo,
- CheckedFunction, Boolean, IOException> filter, long remainingBufLimit)
+ CheckedFunction, Boolean, IOException> filter, int remainingNum)
throws IOException {
CheckedFunction, KeyValue, IOException> tranformer = kv -> {
OmKeyInfo keyInfo = OMFileRequest.getKeyInfoWithFullPath(parentInfo, kv.getValue());
@@ -2315,7 +2303,7 @@ public DeleteKeysResult getPendingDeletionSubFiles(long volumeId,
return Table.newKeyValue(deleteKey, keyInfo);
};
return gatherSubPathsWithIterator(volumeId, bucketId, parentInfo, metadataManager.getFileTable(), tranformer,
- filter, remainingBufLimit);
+ filter, remainingNum);
}
public boolean isBucketFSOptimized(String volName, String buckName)
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
index 001e686455f1..f73ee72e5954 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
@@ -19,6 +19,8 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT;
@@ -29,6 +31,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -43,6 +46,7 @@
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -161,6 +165,7 @@ public class DirectoryDeletingService extends AbstractKeyDeletingService {
private final AtomicLong deletedDirsCount;
private final AtomicLong movedDirsCount;
private final AtomicLong movedFilesCount;
+ private final int pathLimitPerTask;
public DirectoryDeletingService(long interval, TimeUnit unit,
long serviceTimeout, OzoneManager ozoneManager,
@@ -182,6 +187,8 @@ public DirectoryDeletingService(long interval, TimeUnit unit,
this.deletedDirsCount = new AtomicLong(0);
this.movedDirsCount = new AtomicLong(0);
this.movedFilesCount = new AtomicLong(0);
+ this.pathLimitPerTask =
+ configuration.getInt(OZONE_PATH_DELETING_LIMIT_PER_TASK, OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT);
}
public void registerReconfigCallbacks(ReconfigurationHandler handler) {
@@ -262,31 +269,28 @@ void optimizeDirDeletesAndSubmitRequest(
List> allSubDirList,
List purgePathRequestList,
String snapTableKey, long startTime,
- long remainingBufLimit, KeyManager keyManager,
+ KeyManager keyManager,
CheckedFunction, Boolean, IOException> reclaimableDirChecker,
CheckedFunction, Boolean, IOException> reclaimableFileChecker,
Map bucketNameInfoMap,
- UUID expectedPreviousSnapshotId, long rnCnt) throws InterruptedException {
+ UUID expectedPreviousSnapshotId, long rnCnt, AtomicInteger remainNum) throws InterruptedException {
// Optimization to handle delete sub-dir and keys to remove quickly
// This case will be useful to handle when depth of directory is high
int subdirDelNum = 0;
int subDirRecursiveCnt = 0;
- int consumedSize = 0;
- while (subDirRecursiveCnt < allSubDirList.size() && remainingBufLimit > 0) {
+ while (subDirRecursiveCnt < allSubDirList.size() && remainNum.get() > 0) {
try {
Pair stringOmKeyInfoPair = allSubDirList.get(subDirRecursiveCnt++);
Boolean subDirectoryReclaimable = reclaimableDirChecker.apply(Table.newKeyValue(stringOmKeyInfoPair.getKey(),
stringOmKeyInfoPair.getValue()));
Optional request = prepareDeleteDirRequest(
stringOmKeyInfoPair.getValue(), stringOmKeyInfoPair.getKey(), subDirectoryReclaimable, allSubDirList,
- keyManager, reclaimableFileChecker, remainingBufLimit);
+ keyManager, reclaimableFileChecker, remainNum);
if (!request.isPresent()) {
continue;
}
PurgePathRequest requestVal = request.get();
- consumedSize += requestVal.getSerializedSize();
- remainingBufLimit -= consumedSize;
purgePathRequestList.add(requestVal);
// Count up the purgeDeletedDir, subDirs and subFiles
if (requestVal.hasDeletedDir() && !StringUtils.isBlank(requestVal.getDeletedDir())) {
@@ -301,7 +305,7 @@ void optimizeDirDeletesAndSubmitRequest(
}
}
if (!purgePathRequestList.isEmpty()) {
- submitPurgePaths(purgePathRequestList, snapTableKey, expectedPreviousSnapshotId, bucketNameInfoMap);
+ submitPurgePathsWithBatching(purgePathRequestList, snapTableKey, expectedPreviousSnapshotId, bucketNameInfoMap);
}
if (dirNum != 0 || subDirNum != 0 || subFileNum != 0) {
@@ -379,7 +383,7 @@ private Optional prepareDeleteDirRequest(
List> subDirList,
KeyManager keyManager,
CheckedFunction, Boolean, IOException> reclaimableFileFilter,
- long remainingBufLimit) throws IOException {
+ AtomicInteger remainNum) throws IOException {
// step-0: Get one pending deleted directory
if (LOG.isDebugEnabled()) {
LOG.debug("Pending deleted dir name: {}",
@@ -390,11 +394,12 @@ private Optional prepareDeleteDirRequest(
.getVolumeBucketIdPairFSO(delDirName);
// step-1: get all sub directories under the deletedDir
+ int remainingNum = remainNum.get();
DeleteKeysResult subDirDeleteResult =
keyManager.getPendingDeletionSubDirs(volumeBucketId.getVolumeId(), volumeBucketId.getBucketId(),
- pendingDeletedDirInfo, keyInfo -> true, remainingBufLimit);
+ pendingDeletedDirInfo, keyInfo -> true, remainingNum);
List subDirs = subDirDeleteResult.getKeysToDelete();
- remainingBufLimit -= subDirDeleteResult.getConsumedSize();
+ remainNum.addAndGet(-subDirs.size());
OMMetadataManager omMetadataManager = keyManager.getMetadataManager();
for (OmKeyInfo dirInfo : subDirs) {
@@ -408,10 +413,12 @@ private Optional prepareDeleteDirRequest(
// step-2: get all sub files under the deletedDir
// Only remove sub files if the parent directory is going to be deleted or can be reclaimed.
+ remainingNum = remainNum.get();
DeleteKeysResult subFileDeleteResult =
keyManager.getPendingDeletionSubFiles(volumeBucketId.getVolumeId(), volumeBucketId.getBucketId(),
- pendingDeletedDirInfo, keyInfo -> purgeDir || reclaimableFileFilter.apply(keyInfo), remainingBufLimit);
+ pendingDeletedDirInfo, keyInfo -> purgeDir || reclaimableFileFilter.apply(keyInfo), remainingNum);
List subFiles = subFileDeleteResult.getKeysToDelete();
+ remainNum.addAndGet(-subFiles.size());
if (LOG.isDebugEnabled()) {
for (OmKeyInfo fileInfo : subFiles) {
@@ -421,11 +428,14 @@ private Optional prepareDeleteDirRequest(
// step-3: If both sub-dirs and sub-files are exhausted under a parent
// directory, only then delete the parent.
- String purgeDeletedDir = purgeDir && subDirDeleteResult.isProcessedKeys() &&
- subFileDeleteResult.isProcessedKeys() ? delDirName : null;
+ String purgeDeletedDir =
+ purgeDir && subDirDeleteResult.isProcessedKeys() && subFileDeleteResult.isProcessedKeys() ? delDirName : null;
if (purgeDeletedDir == null && subFiles.isEmpty() && subDirs.isEmpty()) {
return Optional.empty();
}
+ if (purgeDeletedDir != null) {
+ remainNum.addAndGet(-1);
+ }
return Optional.of(wrapPurgeRequest(volumeBucketId.getVolumeId(), volumeBucketId.getBucketId(),
purgeDeletedDir, subFiles, subDirs));
}
@@ -460,9 +470,51 @@ private OzoneManagerProtocolProtos.PurgePathRequest wrapPurgeRequest(
return purgePathsRequest.build();
}
- private OzoneManagerProtocolProtos.OMResponse submitPurgePaths(List requests,
+ private List submitPurgePathsWithBatching(List requests,
String snapTableKey, UUID expectedPreviousSnapshotId, Map bucketNameInfoMap)
throws InterruptedException {
+
+ List responses = new ArrayList<>();
+ List purgePathRequestBatch = new ArrayList<>();
+ long batchBytes = 0;
+
+ for (PurgePathRequest req : requests) {
+ int reqSize = req.getSerializedSize();
+
+ // If adding this request would exceed the limit, flush the current batch first
+ if (batchBytes + reqSize > ratisByteLimit && !purgePathRequestBatch.isEmpty()) {
+ OzoneManagerProtocolProtos.OMResponse resp =
+ submitPurgeRequest(snapTableKey, expectedPreviousSnapshotId, bucketNameInfoMap, purgePathRequestBatch);
+ if (!resp.getSuccess()) {
+ return Collections.emptyList();
+ }
+ responses.add(resp);
+ purgePathRequestBatch.clear();
+ batchBytes = 0;
+ }
+
+ // Add current request to batch
+ purgePathRequestBatch.add(req);
+ batchBytes += reqSize;
+ }
+
+ // Flush remaining batch if any
+ if (!purgePathRequestBatch.isEmpty()) {
+ OzoneManagerProtocolProtos.OMResponse resp =
+ submitPurgeRequest(snapTableKey, expectedPreviousSnapshotId, bucketNameInfoMap, purgePathRequestBatch);
+ if (!resp.getSuccess()) {
+ return Collections.emptyList();
+ }
+ responses.add(resp);
+ }
+
+ return responses;
+ }
+
+ @VisibleForTesting
+ OzoneManagerProtocolProtos.OMResponse submitPurgeRequest(String snapTableKey,
+ UUID expectedPreviousSnapshotId, Map bucketNameInfoMap,
+ List pathRequests) throws InterruptedException {
OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest =
OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder();
@@ -476,17 +528,14 @@ private OzoneManagerProtocolProtos.OMResponse submitPurgePaths(List
- new VolumeBucketId(purgePathRequest.getVolumeId(), purgePathRequest.getBucketId())).distinct()
- .map(bucketNameInfoMap::get).filter(Objects::nonNull).collect(Collectors.toList()));
+ purgeDirRequest.addAllDeletedPath(pathRequests);
+ purgeDirRequest.addAllBucketNameInfos(pathRequests.stream()
+ .map(purgePathRequest -> new VolumeBucketId(purgePathRequest.getVolumeId(), purgePathRequest.getBucketId()))
+ .distinct().map(bucketNameInfoMap::get).filter(Objects::nonNull).collect(Collectors.toList()));
OzoneManagerProtocolProtos.OMRequest omRequest =
- OzoneManagerProtocolProtos.OMRequest.newBuilder()
- .setCmdType(OzoneManagerProtocolProtos.Type.PurgeDirectories)
- .setPurgeDirectoriesRequest(purgeDirRequest)
- .setClientId(getClientId().toString())
- .build();
+ OzoneManagerProtocolProtos.OMRequest.newBuilder().setCmdType(OzoneManagerProtocolProtos.Type.PurgeDirectories)
+ .setPurgeDirectoriesRequest(purgeDirRequest).setClientId(getClientId().toString()).build();
// Submit Purge paths request to OM. Acquire bootstrap lock when processing deletes for snapshots.
try (BootstrapStateHandler.Lock lock = snapTableKey != null ? getBootstrapStateLock().lock() : null) {
@@ -528,8 +577,8 @@ private OzoneManagerProtocolProtos.SetSnapshotPropertyRequest getSetSnapshotRequ
* @param keyManager KeyManager of the underlying store.
*/
@VisibleForTesting
- void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, KeyManager keyManager,
- long remainingBufLimit, long rnCnt) throws IOException, ExecutionException, InterruptedException {
+ void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, KeyManager keyManager, long rnCnt, int remainNum)
+ throws IOException, ExecutionException, InterruptedException {
String volume, bucket; String snapshotTableKey;
if (currentSnapshotInfo != null) {
volume = currentSnapshotInfo.getVolumeName();
@@ -553,8 +602,8 @@ void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, KeyManager key
for (int i = 0; i < numberOfParallelThreadsPerStore; i++) {
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
try {
- return processDeletedDirectories(currentSnapshotInfo, keyManager, dirSupplier, remainingBufLimit,
- expectedPreviousSnapshotId, exclusiveSizeMap, rnCnt);
+ return processDeletedDirectories(currentSnapshotInfo, keyManager, dirSupplier,
+ expectedPreviousSnapshotId, exclusiveSizeMap, rnCnt, remainNum);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
@@ -597,16 +646,16 @@ void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, KeyManager key
* @param currentSnapshotInfo Information about the current snapshot whose deleted directories are being processed.
* @param keyManager Key manager of the underlying storage system to handle key operations.
* @param dirSupplier Supplier for fetching pending deleted directories to be processed.
- * @param remainingBufLimit Remaining buffer limit for processing directories and files.
* @param expectedPreviousSnapshotId The UUID of the previous snapshot expected in the chain.
* @param totalExclusiveSizeMap A map for storing total exclusive size and exclusive replicated size
* for each snapshot.
* @param runCount The number of times the processing task has been executed.
+   * @param remaining Maximum number of paths (directories/files) left to process in this run.
* @return A boolean indicating whether the processed directory list is empty.
*/
private boolean processDeletedDirectories(SnapshotInfo currentSnapshotInfo, KeyManager keyManager,
- DeletedDirSupplier dirSupplier, long remainingBufLimit, UUID expectedPreviousSnapshotId,
- Map> totalExclusiveSizeMap, long runCount) throws InterruptedException {
+ DeletedDirSupplier dirSupplier, UUID expectedPreviousSnapshotId,
+ Map> totalExclusiveSizeMap, long runCount, int remaining) throws InterruptedException {
OmSnapshotManager omSnapshotManager = getOzoneManager().getOmSnapshotManager();
IOzoneManagerLock lock = getOzoneManager().getMetadataManager().getLock();
String snapshotTableKey = currentSnapshotInfo == null ? null : currentSnapshotInfo.getTableKey();
@@ -618,12 +667,12 @@ private boolean processDeletedDirectories(SnapshotInfo currentSnapshotInfo, KeyM
long dirNum = 0L;
long subDirNum = 0L;
long subFileNum = 0L;
- int consumedSize = 0;
List purgePathRequestList = new ArrayList<>();
Map bucketNameInfos = new HashMap<>();
+ AtomicInteger remainNum = new AtomicInteger(remaining);
List> allSubDirList = new ArrayList<>();
- while (remainingBufLimit > 0) {
+ while (remainNum.get() > 0) {
KeyValue pendingDeletedDirInfo = dirSupplier.get();
if (pendingDeletedDirInfo == null) {
break;
@@ -642,13 +691,11 @@ private boolean processDeletedDirectories(SnapshotInfo currentSnapshotInfo, KeyM
Optional request = prepareDeleteDirRequest(
pendingDeletedDirInfo.getValue(),
pendingDeletedDirInfo.getKey(), isDirReclaimable, allSubDirList,
- getOzoneManager().getKeyManager(), reclaimableFileFilter, remainingBufLimit);
+ getOzoneManager().getKeyManager(), reclaimableFileFilter, remainNum);
if (!request.isPresent()) {
continue;
}
PurgePathRequest purgePathRequest = request.get();
- consumedSize += purgePathRequest.getSerializedSize();
- remainingBufLimit -= consumedSize;
purgePathRequestList.add(purgePathRequest);
// Count up the purgeDeletedDir, subDirs and subFiles
if (purgePathRequest.hasDeletedDir() && !StringUtils.isBlank(purgePathRequest.getDeletedDir())) {
@@ -660,9 +707,9 @@ private boolean processDeletedDirectories(SnapshotInfo currentSnapshotInfo, KeyM
optimizeDirDeletesAndSubmitRequest(dirNum, subDirNum,
subFileNum, allSubDirList, purgePathRequestList, snapshotTableKey,
- startTime, remainingBufLimit, getOzoneManager().getKeyManager(),
+ startTime, getOzoneManager().getKeyManager(),
reclaimableDirFilter, reclaimableFileFilter, bucketNameInfos, expectedPreviousSnapshotId,
- runCount);
+ runCount, remainNum);
Map exclusiveReplicatedSizeMap = reclaimableFileFilter.getExclusiveReplicatedSizeMap();
Map exclusiveSizeMap = reclaimableFileFilter.getExclusiveSizeMap();
List previousPathSnapshotsInChain =
@@ -722,7 +769,7 @@ public BackgroundTaskResult call() {
snapInfo.getName())) {
KeyManager keyManager = snapInfo == null ? getOzoneManager().getKeyManager()
: omSnapshot.get().getKeyManager();
- processDeletedDirsForStore(snapInfo, keyManager, ratisByteLimit, run);
+ processDeletedDirsForStore(snapInfo, keyManager, run, pathLimitPerTask);
}
} catch (IOException | ExecutionException e) {
LOG.error("Error while running delete files background task for store {}. Will retry at next run.",
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/MultiSnapshotLocks.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/MultiSnapshotLocks.java
index 525877306965..e353e1044666 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/MultiSnapshotLocks.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/MultiSnapshotLocks.java
@@ -61,8 +61,10 @@ public synchronized OMLockDetails acquireLock(Collection ids) throws OMExc
lock.acquireReadLocks(resource, keys);
if (omLockDetails.isLockAcquired()) {
objectLocks.addAll(keys);
+ this.lockDetails = OMLockDetails.EMPTY_DETAILS_LOCK_ACQUIRED;
+ } else {
+ this.lockDetails = OMLockDetails.EMPTY_DETAILS_LOCK_NOT_ACQUIRED;
}
- this.lockDetails = omLockDetails;
return omLockDetails;
}
@@ -72,6 +74,8 @@ public synchronized void releaseLock() {
} else {
lockDetails = lock.releaseReadLocks(resource, this.objectLocks);
}
+ this.lockDetails = lockDetails.isLockAcquired() ? OMLockDetails.EMPTY_DETAILS_LOCK_ACQUIRED :
+ OMLockDetails.EMPTY_DETAILS_LOCK_NOT_ACQUIRED;
this.objectLocks.clear();
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
index 81c9dc465544..3da9b0cf3440 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
@@ -19,6 +19,8 @@
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
import static org.apache.hadoop.ozone.om.lock.FlatResource.SNAPSHOT_DB_LOCK;
+import static org.apache.hadoop.ozone.om.lock.OMLockDetails.EMPTY_DETAILS_LOCK_ACQUIRED;
+import static org.apache.hadoop.ozone.om.lock.OMLockDetails.EMPTY_DETAILS_LOCK_NOT_ACQUIRED;
import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.COLUMN_FAMILIES_TO_TRACK_IN_DAG;
import com.google.common.annotations.VisibleForTesting;
@@ -287,16 +289,35 @@ public void release(UUID key) {
*/
public UncheckedAutoCloseableSupplier lock() {
return lock(() -> lock.acquireResourceWriteLock(SNAPSHOT_DB_LOCK),
- () -> lock.releaseResourceWriteLock(SNAPSHOT_DB_LOCK));
+ () -> lock.releaseResourceWriteLock(SNAPSHOT_DB_LOCK), () -> cleanup(true));
}
- private UncheckedAutoCloseableSupplier lock(
- Supplier lockFunction, Supplier unlockFunction) {
- AtomicReference lockDetails = new AtomicReference<>(lockFunction.get());
+ /**
+ * Acquires a write lock on a specific snapshot database and returns an auto-closeable supplier for lock details.
+ * The lock ensures that the operations accessing the snapshot database are performed in a thread safe manner. The
+ * returned supplier automatically releases the lock acquired when closed, preventing potential resource
+ * contention or deadlocks.
+ */
+ public UncheckedAutoCloseableSupplier lock(UUID snapshotId) {
+ return lock(() -> lock.acquireWriteLock(SNAPSHOT_DB_LOCK, snapshotId.toString()),
+ () -> lock.releaseWriteLock(SNAPSHOT_DB_LOCK, snapshotId.toString()),
+ () -> cleanup(snapshotId));
+ }
+
+ private OMLockDetails getEmptyOmLockDetails(OMLockDetails lockDetails) {
+ return lockDetails.isLockAcquired() ? EMPTY_DETAILS_LOCK_ACQUIRED : EMPTY_DETAILS_LOCK_NOT_ACQUIRED;
+ }
+
+ private UncheckedAutoCloseableSupplier lock(Supplier lockFunction,
+ Supplier unlockFunction, Supplier cleanupFunction) {
+ Supplier emptyLockFunction = () -> getEmptyOmLockDetails(lockFunction.get());
+ Supplier emptyUnlockFunction = () -> getEmptyOmLockDetails(unlockFunction.get());
+
+ AtomicReference lockDetails = new AtomicReference<>(emptyLockFunction.get());
if (lockDetails.get().isLockAcquired()) {
- cleanup(true);
+ cleanupFunction.get();
if (!dbMap.isEmpty()) {
- lockDetails.set(unlockFunction.get());
+ lockDetails.set(emptyUnlockFunction.get());
}
}
@@ -306,7 +327,7 @@ private UncheckedAutoCloseableSupplier lock(
public void close() {
lockDetails.updateAndGet((prevLock) -> {
if (prevLock != null && prevLock.isLockAcquired()) {
- return unlockFunction.get();
+ return emptyUnlockFunction.get();
}
return prevLock;
});
@@ -323,43 +344,49 @@ public OMLockDetails get() {
* If cache size exceeds soft limit, attempt to clean up and close the
instances that has zero reference count.
*/
- private synchronized void cleanup(boolean force) {
+ private synchronized Void cleanup(boolean force) {
if (force || dbMap.size() > cacheSizeLimit) {
for (UUID evictionKey : pendingEvictionQueue) {
- ReferenceCounted snapshot = dbMap.get(evictionKey);
- if (snapshot != null && snapshot.getTotalRefCount() == 0) {
- try {
- compactSnapshotDB(snapshot.get());
- } catch (IOException e) {
- LOG.warn("Failed to compact snapshot DB for snapshotId {}: {}",
- evictionKey, e.getMessage());
- }
- }
-
- dbMap.compute(evictionKey, (k, v) -> {
- pendingEvictionQueue.remove(k);
- if (v == null) {
- throw new IllegalStateException("SnapshotId '" + k + "' does not exist in cache. The RocksDB " +
- "instance of the Snapshot may not be closed properly.");
- }
+ cleanup(evictionKey);
+ }
+ }
+ return null;
+ }
- if (v.getTotalRefCount() > 0) {
- LOG.debug("SnapshotId {} is still being referenced ({}), skipping its clean up.", k, v.getTotalRefCount());
- return v;
- } else {
- LOG.debug("Closing SnapshotId {}. It is not being referenced anymore.", k);
- // Close the instance, which also closes its DB handle.
- try {
- v.get().close();
- } catch (IOException ex) {
- throw new IllegalStateException("Error while closing snapshot DB.", ex);
- }
- omMetrics.decNumSnapshotCacheSize();
- return null;
- }
- });
+ private synchronized Void cleanup(UUID evictionKey) {
+ ReferenceCounted snapshot = dbMap.get(evictionKey);
+ if (snapshot != null && snapshot.getTotalRefCount() == 0) {
+ try {
+ compactSnapshotDB(snapshot.get());
+ } catch (IOException e) {
+ LOG.warn("Failed to compact snapshot DB for snapshotId {}: {}",
+ evictionKey, e.getMessage());
}
}
+
+ dbMap.compute(evictionKey, (k, v) -> {
+ pendingEvictionQueue.remove(k);
+ if (v == null) {
+ throw new IllegalStateException("SnapshotId '" + k + "' does not exist in cache. The RocksDB " +
+ "instance of the Snapshot may not be closed properly.");
+ }
+
+ if (v.getTotalRefCount() > 0) {
+ LOG.debug("SnapshotId {} is still being referenced ({}), skipping its clean up.", k, v.getTotalRefCount());
+ return v;
+ } else {
+ LOG.debug("Closing SnapshotId {}. It is not being referenced anymore.", k);
+ // Close the instance, which also closes its DB handle.
+ try {
+ v.get().close();
+ } catch (IOException ex) {
+ throw new IllegalStateException("Error while closing snapshot DB.", ex);
+ }
+ omMetrics.decNumSnapshotCacheSize();
+ return null;
+ }
+ });
+ return null;
}
/**
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
index 9fabe5a46509..06b70dca9054 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
@@ -24,20 +24,26 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
+import static org.mockito.Mockito.mockStatic;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
import org.apache.hadoop.ozone.om.KeyManager;
@@ -51,11 +57,13 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.util.Time;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ratis.util.ExitUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.MockedStatic;
@@ -185,7 +193,7 @@ public void testMultithreadedDirectoryDeletion() throws Exception {
= new OmTestManagers(conf);
OzoneManager ozoneManager = omTestManagers.getOzoneManager();
AtomicBoolean isRunning = new AtomicBoolean(true);
- try (MockedStatic mockedStatic = Mockito.mockStatic(CompletableFuture.class, CALLS_REAL_METHODS)) {
+ try (MockedStatic mockedStatic = mockStatic(CompletableFuture.class, CALLS_REAL_METHODS)) {
List<Pair<?, CompletableFuture<?>>> futureList = new ArrayList<>();
Thread deletionThread = new Thread(() -> {
while (futureList.size() < threadCount) {
@@ -221,7 +229,7 @@ public void testMultithreadedDirectoryDeletion() throws Exception {
DirectoryDeletingService.DirDeletingTask dirDeletingTask =
ozoneManager.getKeyManager().getDirDeletingService().new DirDeletingTask(null);
- dirDeletingTask.processDeletedDirsForStore(null, ozoneManager.getKeyManager(), Long.MAX_VALUE, 1);
+ dirDeletingTask.processDeletedDirsForStore(null, ozoneManager.getKeyManager(), 1, 6000);
assertThat(futureList).hasSize(threadCount);
for (Pair<?, CompletableFuture<?>> pair : futureList) {
assertTrue(pair.getRight().isDone());
@@ -231,4 +239,77 @@ public void testMultithreadedDirectoryDeletion() throws Exception {
ozoneManager.stop();
}
}
+
+ @Test
+ @DisplayName("DirectoryDeletingService batches PurgeDirectories by Ratis byte limit (via submitRequest spy)")
+ void testPurgeDirectoriesBatching() throws Exception {
+ final int ratisLimitBytes = 2304;
+
+ OzoneConfiguration conf = new OzoneConfiguration();
+ File testDir = Files.createTempDirectory("TestDDS-SubmitSpy").toFile();
+ ServerUtils.setOzoneMetaDirPath(conf, testDir.toString());
+ conf.setTimeDuration(OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL, 100, TimeUnit.MILLISECONDS);
+ conf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, ratisLimitBytes, StorageUnit.BYTES);
+ conf.setQuietMode(false);
+
+ OmTestManagers managers = new OmTestManagers(conf);
+ om = managers.getOzoneManager();
+ KeyManager km = managers.getKeyManager();
+
+ DirectoryDeletingService real = (DirectoryDeletingService) km.getDirDeletingService();
+ DirectoryDeletingService dds = Mockito.spy(real);
+
+ List<OzoneManagerProtocolProtos.OMRequest> captured = new ArrayList<>();
+ Mockito.doAnswer(inv -> {
+ OzoneManagerProtocolProtos.OMRequest req = inv.getArgument(0);
+ captured.add(req);
+ return OzoneManagerProtocolProtos.OMResponse.newBuilder()
+ .setCmdType(OzoneManagerProtocolProtos.Type.PurgeDirectories).setStatus(OzoneManagerProtocolProtos.Status.OK)
+ .build();
+ }).when(dds).submitRequest(Mockito.any(OzoneManagerProtocolProtos.OMRequest.class));
+
+ final long volumeId = 1L, bucketId = 2L;
+ List<OzoneManagerProtocolProtos.PurgePathRequest> purgeList = new ArrayList<>();
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 30; i++) {
+ sb.append("0123456789");
+ }
+ final String longSuffix = sb.toString();
+
+ for (int i = 0; i < 20; i++) {
+ purgeList.add(OzoneManagerProtocolProtos.PurgePathRequest.newBuilder().setVolumeId(volumeId).setBucketId(bucketId)
+ .setDeletedDir("dir-" + longSuffix + "-" + i).build());
+ }
+
+ org.apache.hadoop.ozone.om.OMMetadataManager.VolumeBucketId vbId =
+ new org.apache.hadoop.ozone.om.OMMetadataManager.VolumeBucketId(volumeId, bucketId);
+ OzoneManagerProtocolProtos.BucketNameInfo bni =
+ OzoneManagerProtocolProtos.BucketNameInfo.newBuilder().setVolumeId(volumeId).setBucketId(bucketId)
+ .setVolumeName("v").setBucketName("b").build();
+ Map<org.apache.hadoop.ozone.om.OMMetadataManager.VolumeBucketId, OzoneManagerProtocolProtos.BucketNameInfo>
+ bucketNameInfoMap = new HashMap<>();
+ bucketNameInfoMap.put(vbId, bni);
+
+ dds.optimizeDirDeletesAndSubmitRequest(0L, 0L, 0L, new ArrayList<>(), purgeList, null, Time.monotonicNow(), km,
+ kv -> true, kv -> true, bucketNameInfoMap, null, 1L, new AtomicInteger(Integer.MAX_VALUE));
+
+ assertThat(captured.size())
+ .as("Expect batching to respect Ratis byte limit")
+ .isBetween(3, 5);
+
+ for (OzoneManagerProtocolProtos.OMRequest omReq : captured) {
+ assertThat(omReq.getCmdType()).isEqualTo(OzoneManagerProtocolProtos.Type.PurgeDirectories);
+
+ OzoneManagerProtocolProtos.PurgeDirectoriesRequest purge = omReq.getPurgeDirectoriesRequest();
+ int payloadBytes =
+ purge.getDeletedPathList().stream().mapToInt(OzoneManagerProtocolProtos.PurgePathRequest::getSerializedSize)
+ .sum();
+
+ assertThat(payloadBytes).as("Batch size should respect Ratis byte limit").isLessThanOrEqualTo(ratisLimitBytes);
+ }
+
+ org.apache.commons.io.FileUtils.deleteDirectory(testDir);
+ }
+
}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestMultiSnapshotLocks.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestMultiSnapshotLocks.java
index 9c358a9261b3..29f6a2d0efcb 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestMultiSnapshotLocks.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestMultiSnapshotLocks.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.om.snapshot;
+import static org.apache.hadoop.ozone.om.lock.FlatResource.SNAPSHOT_GC_LOCK;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -31,9 +32,11 @@
import static org.mockito.Mockito.when;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
import org.apache.hadoop.ozone.om.lock.OMLockDetails;
@@ -66,6 +69,21 @@ void setUp() {
multiSnapshotLocks = new MultiSnapshotLocks(mockLock, mockResource, true);
}
+ @Test
+ public void testMultiSnapshotLocksWithMultipleResourceLocksMultipleTimes() throws OMException {
+ OzoneManagerLock omLock = new OzoneManagerLock(new OzoneConfiguration());
+ MultiSnapshotLocks multiSnapshotLocks1 = new MultiSnapshotLocks(omLock, SNAPSHOT_GC_LOCK, true);
+ MultiSnapshotLocks multiSnapshotLocks2 = new MultiSnapshotLocks(omLock, SNAPSHOT_GC_LOCK, true);
+ Collection<UUID> uuid1 = Collections.singleton(UUID.randomUUID());
+ Collection<UUID> uuid2 = Collections.singleton(UUID.randomUUID());
+ for (int i = 0; i < 10; i++) {
+ assertTrue(multiSnapshotLocks1.acquireLock(uuid1).isLockAcquired());
+ assertTrue(multiSnapshotLocks2.acquireLock(uuid2).isLockAcquired());
+ multiSnapshotLocks1.releaseLock();
+ multiSnapshotLocks2.releaseLock();
+ }
+ }
+
@Test
void testAcquireLockSuccess() throws Exception {
List objects = Arrays.asList(obj1, obj2);
@@ -107,7 +125,8 @@ void testReleaseLock() throws Exception {
when(mockLock.acquireWriteLocks(eq(mockResource), anyCollection())).thenReturn(mockLockDetails);
multiSnapshotLocks.acquireLock(objects);
assertFalse(multiSnapshotLocks.getObjectLocks().isEmpty());
-
+ when(mockLockDetails.isLockAcquired()).thenReturn(false);
+ when(mockLock.releaseWriteLocks(eq(mockResource), anyCollection())).thenReturn(mockLockDetails);
// Now release locks
multiSnapshotLocks.releaseLock();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java
index 9406d74c5ff6..3be27c9012af 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java
@@ -18,12 +18,14 @@
package org.apache.hadoop.ozone.om.snapshot;
import static org.apache.hadoop.ozone.om.lock.FlatResource.SNAPSHOT_DB_LOCK;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.LeveledResource.VOLUME_LOCK;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
@@ -43,6 +45,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OMMetrics;
@@ -51,6 +54,7 @@
import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
import org.apache.hadoop.ozone.om.lock.OMLockDetails;
import org.apache.hadoop.ozone.om.lock.OmReadOnlyLock;
+import org.apache.hadoop.ozone.om.lock.OzoneManagerLock;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import org.junit.jupiter.api.AfterEach;
@@ -173,7 +177,7 @@ public void testGetHoldsReadLock(int numberOfLocks) throws IOException {
@ParameterizedTest
@ValueSource(ints = {0, 1, 5, 10})
@DisplayName("Tests lock() holds a write lock")
- public void testGetHoldsWriteLock(int numberOfLocks) {
+ public void testLockHoldsWriteLock(int numberOfLocks) {
clearInvocations(lock);
for (int i = 0; i < numberOfLocks; i++) {
snapshotCache.lock();
@@ -181,6 +185,29 @@ public void testGetHoldsWriteLock(int numberOfLocks) {
verify(lock, times(numberOfLocks)).acquireResourceWriteLock(eq(SNAPSHOT_DB_LOCK));
}
+ @Test
+ public void testLockSupplierReturnsLockWithAnotherLockReleased() {
+ IOzoneManagerLock ozoneManagerLock = new OzoneManagerLock(new OzoneConfiguration());
+ snapshotCache = new SnapshotCache(cacheLoader, CACHE_SIZE_LIMIT, omMetrics, 50, true, ozoneManagerLock);
+ try (UncheckedAutoCloseableSupplier<OMLockDetails> lockDetails = snapshotCache.lock()) {
+ ozoneManagerLock.acquireWriteLock(VOLUME_LOCK, "vol1");
+ ozoneManagerLock.releaseWriteLock(VOLUME_LOCK, "vol1");
+ assertTrue(lockDetails.get().isLockAcquired());
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {0, 1, 5, 10})
+ @DisplayName("Tests lock(snapshotId) holds a write lock")
+ public void testLockHoldsWriteLockSnapshotId(int numberOfLocks) {
+ clearInvocations(lock);
+ UUID snapshotId = UUID.randomUUID();
+ for (int i = 0; i < numberOfLocks; i++) {
+ snapshotCache.lock(snapshotId);
+ }
+ verify(lock, times(numberOfLocks)).acquireWriteLock(eq(SNAPSHOT_DB_LOCK), eq(snapshotId.toString()));
+ }
+
@Test
@DisplayName("get() same entry twice yields one cache entry only")
void testGetTwice() throws IOException {