diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java index 3a09ae6f20b6..3ded116b06b6 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java @@ -414,6 +414,11 @@ private OMConfigKeys() { public static final int OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT = 10; + public static final String OZONE_THREAD_NUMBER_KEY_DELETION = + "ozone.thread.number.key.deletion"; + + public static final int OZONE_THREAD_NUMBER_KEY_DELETION_DEFAULT = 10; + public static final String SNAPSHOT_SST_DELETING_LIMIT_PER_TASK = "ozone.snapshot.filtering.limit.per.task"; public static final int SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT = 2; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java index 9c040f9aa006..5949d22cacad 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java @@ -126,6 +126,7 @@ private void addPropertiesNotInXml() { OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_USER, OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD, OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION, + OMConfigKeys.OZONE_THREAD_NUMBER_KEY_DELETION, ScmConfigKeys.OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY, ScmConfigKeys.OZONE_SCM_HA_PREFIX, S3GatewayConfigKeys.OZONE_S3G_FSO_DIRECTORY_CREATION_ENABLED, diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java index fcd47b593a15..b31848056d50 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java @@ -135,7 +135,7 @@ public void testKeysPurgingByKeyDeletingService() throws Exception { GenericTestUtils.waitFor( () -> { try { - return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE) + return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE, keyDeletingService.getDeletedKeySupplier()) .getKeyBlocksList().size() == 0; } catch (IOException e) { return false; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java index a22ae2c1442d..5db2117c5429 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java @@ -25,6 +25,7 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; @@ -504,11 +505,11 @@ private KeyDeletingService getMockedKeyDeletingService(AtomicBoolean keyDeletion when(ozoneManager.getKeyManager()).thenReturn(keyManager); KeyDeletingService keyDeletingService = Mockito.spy(new KeyDeletingService(ozoneManager, ozoneManager.getScmClient().getBlockClient(), keyManager, 10000, - 100000, cluster.getConf(), false)); + 100000, cluster.getConf(), false, 1)); keyDeletingService.shutdown(); GenericTestUtils.waitFor(() -> keyDeletingService.getThreadCount() == 0, 1000, 100000); - when(keyManager.getPendingDeletionKeys(anyInt())).thenAnswer(i -> { + when(keyManager.getPendingDeletionKeys(anyInt(), any())).thenAnswer(i -> { // wait for SDS to reach the KDS wait block before processing any key. GenericTestUtils.waitFor(keyDeletionWaitStarted::get, 1000, 100000); keyDeletionStarted.set(true); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDirectoryCleaningService.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDirectoryCleaningService.java index a8f8aeb1ce8e..bb3ab721a2c4 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDirectoryCleaningService.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDirectoryCleaningService.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_ITERATE_BATCH_SIZE; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_KEY_DELETION; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -83,6 +84,7 @@ public static void init() throws Exception { conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 2500, TimeUnit.MILLISECONDS); conf.setBoolean(OZONE_ACL_ENABLED, true); + conf.setInt(OZONE_THREAD_NUMBER_KEY_DELETION, 1); cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(3) .build(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java index 157398530385..f647270ae86d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java @@ -116,11 +116,14 @@ ListKeysResult listKeys(String volumeName, String bucketName, String startKey, * Second is a Mapping of Key-Value pair which is updated in the deletedTable. * * @param count max number of keys to return. + * @param deletedKeySupplier DeletedKeySupplier object. * @return a Pair of list of {@link BlockGroup} representing keys and blocks, * and a hashmap for key-value pair to be updated in the deletedTable. * @throws IOException */ - PendingKeysDeletion getPendingDeletionKeys(int count) throws IOException; + PendingKeysDeletion getPendingDeletionKeys(int count, + KeyDeletingService.DeletedKeySupplier deletedKeySupplier) + throws IOException; /** * Returns a list rename entries from the snapshotRenamedTable. diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 54b4608e64f2..d79f2e8a3fda 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -58,6 +58,8 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_KEY_DELETION; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_KEY_DELETION_DEFAULT; import static org.apache.hadoop.ozone.om.OzoneManagerUtils.getBucketLayout; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND; @@ -238,9 +240,13 @@ public void start(OzoneConfiguration configuration) { OZONE_BLOCK_DELETING_SERVICE_TIMEOUT, OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); - keyDeletingService = new KeyDeletingService(ozoneManager, - scmClient.getBlockClient(), this, blockDeleteInterval, - serviceTimeout, configuration, isSnapshotDeepCleaningEnabled); + int keyDeletingCorePoolSize = + configuration.getInt(OZONE_THREAD_NUMBER_KEY_DELETION, + OZONE_THREAD_NUMBER_KEY_DELETION_DEFAULT); + keyDeletingService = + new KeyDeletingService(ozoneManager, scmClient.getBlockClient(), this, + blockDeleteInterval, serviceTimeout, configuration, + isSnapshotDeepCleaningEnabled, keyDeletingCorePoolSize); keyDeletingService.start(); } @@ -679,12 +685,13 @@ public ListKeysResult listKeys(String volumeName, String bucketName, } @Override - public PendingKeysDeletion getPendingDeletionKeys(final int count) + public PendingKeysDeletion getPendingDeletionKeys(final int count, + KeyDeletingService.DeletedKeySupplier deletedKeySupplier) throws IOException { OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl) metadataManager; - return omMetadataManager - .getPendingDeletionKeys(count, ozoneManager.getOmSnapshotManager()); + return omMetadataManager.getPendingDeletionKeys(count, + ozoneManager.getOmSnapshotManager(), deletedKeySupplier); } private List> getTableEntries(String startKey, diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java index a7793fdc9abe..9cedc92a379a 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java @@ -110,6 +110,7 @@ import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB; import org.apache.hadoop.ozone.om.request.file.OMFileRequest; import org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils; +import org.apache.hadoop.ozone.om.service.KeyDeletingService; import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted; import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadInfo; @@ -1591,135 +1592,136 @@ private PersistedUserVolumeInfo getVolumesByUser(String userNameKey) * @throws IOException */ public PendingKeysDeletion getPendingDeletionKeys(final int keyCount, - OmSnapshotManager omSnapshotManager) + OmSnapshotManager omSnapshotManager, + KeyDeletingService.DeletedKeySupplier deletedKeySupplier) throws IOException { List keyBlocksList = Lists.newArrayList(); HashMap keysToModify = new HashMap<>(); - try (TableIterator> - keyIter = getDeletedTable().iterator()) { - int currentCount = 0; - while (keyIter.hasNext() && currentCount < keyCount) { - RepeatedOmKeyInfo notReclaimableKeyInfo = new RepeatedOmKeyInfo(); - KeyValue kv = keyIter.next(); - if (kv != null) { - List blockGroupList = Lists.newArrayList(); - // Get volume name and bucket name - String[] keySplit = kv.getKey().split(OM_KEY_PREFIX); - String bucketKey = getBucketKey(keySplit[1], keySplit[2]); - OmBucketInfo bucketInfo = getBucketTable().get(bucketKey); - // If Bucket deleted bucketInfo would be null, thus making previous snapshot also null. - SnapshotInfo previousSnapshotInfo = bucketInfo == null ? null : - SnapshotUtils.getLatestSnapshotInfo(bucketInfo.getVolumeName(), - bucketInfo.getBucketName(), ozoneManager, snapshotChainManager); - // previous snapshot is not active or it has not been flushed to disk then don't process the key in this - // iteration. - if (previousSnapshotInfo != null && - (previousSnapshotInfo.getSnapshotStatus() != SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE || - !OmSnapshotManager.areSnapshotChangesFlushedToDB(ozoneManager.getMetadataManager(), - previousSnapshotInfo))) { - continue; - } - // Get the latest snapshot in snapshot path. - try (ReferenceCounted rcLatestSnapshot = previousSnapshotInfo == null ? null : - omSnapshotManager.getSnapshot(previousSnapshotInfo.getVolumeName(), - previousSnapshotInfo.getBucketName(), previousSnapshotInfo.getName())) { - - // Multiple keys with the same path can be queued in one DB entry - RepeatedOmKeyInfo infoList = kv.getValue(); - for (OmKeyInfo info : infoList.cloneOmKeyInfoList()) { - // Skip the key if it exists in the previous snapshot (of the same - // scope) as in this case its blocks should not be reclaimed - - // If the last snapshot is deleted and the keys renamed in between - // the snapshots will be cleaned up by KDS. So we need to check - // in the renamedTable as well. - String dbRenameKey = getRenameKey(info.getVolumeName(), - info.getBucketName(), info.getObjectID()); - - if (rcLatestSnapshot != null) { - Table prevKeyTable = - rcLatestSnapshot.get() - .getMetadataManager() - .getKeyTable(bucketInfo.getBucketLayout()); - - Table prevDeletedTable = - rcLatestSnapshot.get().getMetadataManager().getDeletedTable(); - String prevKeyTableDBKey = getSnapshotRenamedTable() - .get(dbRenameKey); - String prevDelTableDBKey = getOzoneKey(info.getVolumeName(), - info.getBucketName(), info.getKeyName()); - // format: /volName/bucketName/keyName/objId - prevDelTableDBKey = getOzoneDeletePathKey(info.getObjectID(), - prevDelTableDBKey); - - if (prevKeyTableDBKey == null && - bucketInfo.getBucketLayout().isFileSystemOptimized()) { - long volumeId = getVolumeId(info.getVolumeName()); - prevKeyTableDBKey = getOzonePathKey(volumeId, - bucketInfo.getObjectID(), - info.getParentObjectID(), - info.getFileName()); - } else if (prevKeyTableDBKey == null) { - prevKeyTableDBKey = getOzoneKey(info.getVolumeName(), - info.getBucketName(), + int currentCount = 0; + while (currentCount < keyCount) { + RepeatedOmKeyInfo notReclaimableKeyInfo = new RepeatedOmKeyInfo(); + KeyValue kv = deletedKeySupplier.get(); + if (kv != null) { + List blockGroupList = Lists.newArrayList(); + // Get volume name and bucket name + String[] keySplit = kv.getKey().split(OM_KEY_PREFIX); + String bucketKey = getBucketKey(keySplit[1], keySplit[2]); + OmBucketInfo bucketInfo = getBucketTable().get(bucketKey); + // If Bucket deleted bucketInfo would be null, thus making previous snapshot also null. + SnapshotInfo previousSnapshotInfo = bucketInfo == null ? null : + SnapshotUtils.getLatestSnapshotInfo(bucketInfo.getVolumeName(), + bucketInfo.getBucketName(), ozoneManager, snapshotChainManager); + // previous snapshot is not active or it has not been flushed to disk then don't process the key in this + // iteration. + if (previousSnapshotInfo != null && (previousSnapshotInfo.getSnapshotStatus() != + SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE || !OmSnapshotManager.areSnapshotChangesFlushedToDB( + ozoneManager.getMetadataManager(), previousSnapshotInfo))) { + continue; + } + // Get the latest snapshot in snapshot path. + try (ReferenceCounted rcLatestSnapshot = + previousSnapshotInfo == null ? null : omSnapshotManager.getSnapshot( + previousSnapshotInfo.getVolumeName(), + previousSnapshotInfo.getBucketName(), + previousSnapshotInfo.getName())) { + + // Multiple keys with the same path can be queued in one DB entry + RepeatedOmKeyInfo infoList = kv.getValue(); + for (OmKeyInfo info : infoList.cloneOmKeyInfoList()) { + // Skip the key if it exists in the previous snapshot (of the same + // scope) as in this case its blocks should not be reclaimed + + // If the last snapshot is deleted and the keys renamed in between + // the snapshots will be cleaned up by KDS. So we need to check + // in the renamedTable as well. + String dbRenameKey = + getRenameKey(info.getVolumeName(), info.getBucketName(), + info.getObjectID()); + + if (rcLatestSnapshot != null) { + Table prevKeyTable = + rcLatestSnapshot.get().getMetadataManager() + .getKeyTable(bucketInfo.getBucketLayout()); + + Table prevDeletedTable = + rcLatestSnapshot.get().getMetadataManager().getDeletedTable(); + String prevKeyTableDBKey = + getSnapshotRenamedTable().get(dbRenameKey); + String prevDelTableDBKey = + getOzoneKey(info.getVolumeName(), info.getBucketName(), info.getKeyName()); - } + // format: /volName/bucketName/keyName/objId + prevDelTableDBKey = + getOzoneDeletePathKey(info.getObjectID(), prevDelTableDBKey); + + if (prevKeyTableDBKey == null && bucketInfo.getBucketLayout() + .isFileSystemOptimized()) { + long volumeId = getVolumeId(info.getVolumeName()); + prevKeyTableDBKey = + getOzonePathKey(volumeId, bucketInfo.getObjectID(), + info.getParentObjectID(), info.getFileName()); + } else if (prevKeyTableDBKey == null) { + prevKeyTableDBKey = + getOzoneKey(info.getVolumeName(), info.getBucketName(), + info.getKeyName()); + } - OmKeyInfo omKeyInfo = prevKeyTable.get(prevKeyTableDBKey); - // When key is deleted it is no longer in keyTable, we also - // have to check deletedTable of previous snapshot - RepeatedOmKeyInfo delOmKeyInfo = - prevDeletedTable.get(prevDelTableDBKey); - if (versionExistsInPreviousSnapshot(omKeyInfo, - info, delOmKeyInfo)) { - // If the infoList size is 1, there is nothing to split. - // We either delete it or skip it. - if (!(infoList.getOmKeyInfoList().size() == 1)) { - notReclaimableKeyInfo.addOmKeyInfo(info); - } - continue; + OmKeyInfo omKeyInfo = prevKeyTable.get(prevKeyTableDBKey); + // When key is deleted it is no longer in keyTable, we also + // have to check deletedTable of previous snapshot + RepeatedOmKeyInfo delOmKeyInfo = + prevDeletedTable.get(prevDelTableDBKey); + if (versionExistsInPreviousSnapshot(omKeyInfo, info, + delOmKeyInfo)) { + // If the infoList size is 1, there is nothing to split. + // We either delete it or skip it. + if (!(infoList.getOmKeyInfoList().size() == 1)) { + notReclaimableKeyInfo.addOmKeyInfo(info); } + continue; } + } - // Add all blocks from all versions of the key to the deletion - // list - for (OmKeyLocationInfoGroup keyLocations : - info.getKeyLocationVersions()) { - List item = keyLocations.getLocationList().stream() - .map(b -> new BlockID(b.getContainerID(), b.getLocalID())) - .collect(Collectors.toList()); - BlockGroup keyBlocks = BlockGroup.newBuilder() - .setKeyName(kv.getKey()) - .addAllBlockIDs(item) - .build(); - blockGroupList.add(keyBlocks); - } - currentCount++; + // Add all blocks from all versions of the key to the deletion + // list + for (OmKeyLocationInfoGroup keyLocations : info.getKeyLocationVersions()) { + List item = keyLocations.getLocationList().stream() + .map(b -> new BlockID(b.getContainerID(), b.getLocalID())) + .collect(Collectors.toList()); + BlockGroup keyBlocks = + BlockGroup.newBuilder().setKeyName(kv.getKey()) + .addAllBlockIDs(item).build(); + blockGroupList.add(keyBlocks); } + currentCount++; + } - List notReclaimableKeyInfoList = - notReclaimableKeyInfo.getOmKeyInfoList(); - // If Bucket deleted bucketInfo would be null, thus making previous snapshot also null. - SnapshotInfo newPreviousSnapshotInfo = bucketInfo == null ? null : - SnapshotUtils.getLatestSnapshotInfo(bucketInfo.getVolumeName(), - bucketInfo.getBucketName(), ozoneManager, snapshotChainManager); - // Check if the previous snapshot in the chain hasn't changed. - if (Objects.equals(Optional.ofNullable(newPreviousSnapshotInfo).map(SnapshotInfo::getSnapshotId), - Optional.ofNullable(previousSnapshotInfo).map(SnapshotInfo::getSnapshotId))) { - // If all the versions are not reclaimable, then do nothing. - if (notReclaimableKeyInfoList.size() > 0 && - notReclaimableKeyInfoList.size() != - infoList.getOmKeyInfoList().size()) { - keysToModify.put(kv.getKey(), notReclaimableKeyInfo); - } + List notReclaimableKeyInfoList = + notReclaimableKeyInfo.getOmKeyInfoList(); + // If Bucket deleted bucketInfo would be null, thus making previous snapshot also null. + SnapshotInfo newPreviousSnapshotInfo = bucketInfo == null ? null : + SnapshotUtils.getLatestSnapshotInfo(bucketInfo.getVolumeName(), + bucketInfo.getBucketName(), ozoneManager, + snapshotChainManager); + // Check if the previous snapshot in the chain hasn't changed. + if (Objects.equals(Optional.ofNullable(newPreviousSnapshotInfo) + .map(SnapshotInfo::getSnapshotId), + Optional.ofNullable(previousSnapshotInfo) + .map(SnapshotInfo::getSnapshotId))) { + // If all the versions are not reclaimable, then do nothing. + if (notReclaimableKeyInfoList.size() > 0 && notReclaimableKeyInfoList.size() != infoList.getOmKeyInfoList() + .size()) { + keysToModify.put(kv.getKey(), notReclaimableKeyInfo); + } - if (notReclaimableKeyInfoList.size() != - infoList.getOmKeyInfoList().size()) { - keyBlocksList.addAll(blockGroupList); - } + if (notReclaimableKeyInfoList.size() != infoList.getOmKeyInfoList() + .size()) { + keyBlocksList.addAll(blockGroupList); } } } + } else { + break; } } return new PendingKeysDeletion(keyBlocksList, keysToModify); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java index 7b7d1e238639..5c21855f7e54 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java @@ -101,7 +101,7 @@ public AbstractKeyDeletingService(String serviceName, long interval, protected int processKeyDeletes(List keyBlocksList, KeyManager manager, HashMap keysToModify, - String snapTableKey, UUID expectedPreviousSnapshotId) throws IOException { + String snapTableKey, UUID expectedPreviousSnapshotId, long run) throws IOException { long startTime = Time.monotonicNow(); int delCount = 0; @@ -123,7 +123,7 @@ protected int processKeyDeletes(List keyBlocksList, if (blockDeletionResults != null) { startTime = Time.monotonicNow(); delCount = submitPurgeKeysRequest(blockDeletionResults, - keysToModify, snapTableKey, expectedPreviousSnapshotId); + keysToModify, snapTableKey, expectedPreviousSnapshotId, run); int limit = ozoneManager.getConfiguration().getInt(OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK, OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT); LOG.info("Blocks for {} (out of {}) keys are deleted from DB in {} ms. Limit per task is {}.", @@ -171,7 +171,8 @@ private int deleteAllKeys(List results, * @param keysToModify Updated list of RepeatedOmKeyInfo */ private int submitPurgeKeysRequest(List results, - HashMap keysToModify, String snapTableKey, UUID expectedPreviousSnapshotId) { + HashMap keysToModify, String snapTableKey, + UUID expectedPreviousSnapshotId, long run) { Map, List> purgeKeysMapPerBucket = new HashMap<>(); @@ -250,7 +251,7 @@ private int submitPurgeKeysRequest(List results, // Submit PurgeKeys request to OM try { - OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get()); + OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, run); } catch (ServiceException e) { LOG.error("PurgeKey request failed. Will retry at next run.", e); return 0; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java index 1683599d2a9c..8e7094e91f94 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java @@ -36,6 +36,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; @@ -99,13 +100,22 @@ public class KeyDeletingService extends AbstractKeyDeletingService { private final SnapshotChainManager snapshotChainManager; private DeletingServiceMetrics metrics; + public DeletedKeySupplier getDeletedKeySupplier() { + return deletedKeySupplier; + } + + private final DeletedKeySupplier deletedKeySupplier; + private AtomicInteger taskCount = new AtomicInteger(0); + private final int keyDeletingCorePoolSize; + + @SuppressWarnings("parameternumber") public KeyDeletingService(OzoneManager ozoneManager, ScmBlockLocationProtocol scmClient, KeyManager manager, long serviceInterval, long serviceTimeout, ConfigurationSource conf, - boolean deepCleanSnapshots) { + boolean deepCleanSnapshots, int keyDeletingCorePoolSize) { super(KeyDeletingService.class.getSimpleName(), serviceInterval, - TimeUnit.MILLISECONDS, KEY_DELETING_CORE_POOL_SIZE, + TimeUnit.MILLISECONDS, keyDeletingCorePoolSize, serviceTimeout, ozoneManager, scmClient); this.manager = manager; this.keyLimitPerTask = conf.getInt(OZONE_KEY_DELETING_LIMIT_PER_TASK, @@ -122,6 +132,9 @@ public KeyDeletingService(OzoneManager ozoneManager, this.deepCleanSnapshots = deepCleanSnapshots; this.snapshotChainManager = ((OmMetadataManagerImpl)manager.getMetadataManager()).getSnapshotChainManager(); this.metrics = ozoneManager.getDeletionMetrics(); + deletedKeySupplier = new DeletedKeySupplier(); + taskCount.set(0); + this.keyDeletingCorePoolSize = keyDeletingCorePoolSize; } /** @@ -141,10 +154,66 @@ public boolean isRunningOnAOS() { @Override public BackgroundTaskQueue getTasks() { BackgroundTaskQueue queue = new BackgroundTaskQueue(); - queue.add(new KeyDeletingTask(this)); + if (taskCount.get() > 0) { + LOG.info("{} Key deleting task(s) already in progress.", + taskCount.get()); + return queue; + } + try { + deletedKeySupplier.reInitItr(); + } catch (IOException ex) { + LOG.error("Unable to get the iterator.", ex); + return queue; + } + taskCount.set(keyDeletingCorePoolSize); + for (int i = 0; i < keyDeletingCorePoolSize; i++) { + queue.add(new KeyDeletingTask(this)); + } return queue; } + @Override + public void shutdown() { + super.shutdown(); + deletedKeySupplier.closeItr(); + } + + /** + * DeletedKeySupplier class. + */ + public final class DeletedKeySupplier { + + public DeletedKeySupplier() { + try { + reInitItr(); + } catch (IOException ex) { + + } + } + private TableIterator> + deleteKeyTableIterator; + + public synchronized Table.KeyValue get() + throws IOException { + if (deleteKeyTableIterator.hasNext()) { + return deleteKeyTableIterator.next(); + } + return null; + } + + public synchronized void closeItr() { + IOUtils.closeQuietly(deleteKeyTableIterator); + deleteKeyTableIterator = null; + } + + public synchronized void reInitItr() throws IOException { + closeItr(); + deleteKeyTableIterator = + manager.getMetadataManager().getDeletedTable().iterator(); + + } + } + private boolean shouldRun() { if (getOzoneManager() == null) { // OzoneManager can be null for testing @@ -215,13 +284,14 @@ public BackgroundTaskResult call() { // snapshotId since AOS could process multiple buckets in one iteration. UUID expectedPreviousSnapshotId = snapshotChainManager.getLatestGlobalSnapshotId(); PendingKeysDeletion pendingKeysDeletion = manager - .getPendingDeletionKeys(getKeyLimitPerTask()); + .getPendingDeletionKeys(getKeyLimitPerTask(), deletedKeySupplier); List keyBlocksList = pendingKeysDeletion .getKeyBlocksList(); if (keyBlocksList != null && !keyBlocksList.isEmpty()) { delCount = processKeyDeletes(keyBlocksList, getOzoneManager().getKeyManager(), - pendingKeysDeletion.getKeysToModify(), null, expectedPreviousSnapshotId); + pendingKeysDeletion.getKeysToModify(), null, + expectedPreviousSnapshotId, run); deletedKeyCount.addAndGet(delCount); metrics.incrNumKeysProcessed(keyBlocksList.size()); metrics.incrNumKeysSentForPurge(delCount); @@ -233,7 +303,7 @@ public BackgroundTaskResult call() { try { if (deepCleanSnapshots && delCount < keyLimitPerTask) { - processSnapshotDeepClean(delCount); + processSnapshotDeepClean(delCount, run); } } catch (Exception e) { LOG.error("Error while running deep clean on snapshots. Will " + @@ -241,6 +311,7 @@ public BackgroundTaskResult call() { } } + taskCount.getAndDecrement(); isRunningOnAOS.set(false); synchronized (deletingService) { this.deletingService.notify(); @@ -251,7 +322,7 @@ public BackgroundTaskResult call() { } @SuppressWarnings("checkstyle:MethodLength") - private void processSnapshotDeepClean(int delCount) + private void processSnapshotDeepClean(int delCount, long run) throws IOException { OmSnapshotManager omSnapshotManager = getOzoneManager().getOmSnapshotManager(); @@ -449,7 +520,8 @@ private void processSnapshotDeepClean(int delCount) if (!keysToPurge.isEmpty()) { processKeyDeletes(keysToPurge, currOmSnapshot.getKeyManager(), keysToModify, currSnapInfo.getTableKey(), - Optional.ofNullable(previousSnapshot).map(SnapshotInfo::getSnapshotId).orElse(null)); + Optional.ofNullable(previousSnapshot) + .map(SnapshotInfo::getSnapshotId).orElse(null), run); } } finally { IOUtils.closeQuietly(rcPrevOmSnapshot, rcPrevToPrevOmSnapshot); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java index 5670e4c2f657..3c5314d2bc08 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java @@ -23,11 +23,13 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_KEY_DELETION; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.when; @@ -142,6 +144,23 @@ private void createConfig(File testDir) { conf.setQuietMode(false); } + private void createConfigForSnapshot(File testDir) { + conf = new OzoneConfiguration(); + System.setProperty(DBConfigFromFile.CONFIG_DIR, "/"); + ServerUtils.setOzoneMetaDirPath(conf, testDir.toString()); + conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, + 100, TimeUnit.MILLISECONDS); + conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL, + 100, TimeUnit.MILLISECONDS); + conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, + 1, TimeUnit.SECONDS); + conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, + 200, TimeUnit.MILLISECONDS); + conf.setBoolean(OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED, true); + conf.setInt(OZONE_THREAD_NUMBER_KEY_DELETION, 1); + conf.setQuietMode(false); + } + private void createSubject() throws Exception { OmTestManagers omTestManagers = new OmTestManagers(conf, scmBlockTestingClient, null); keyManager = omTestManagers.getKeyManager(); @@ -198,8 +217,9 @@ void checkIfDeleteServiceIsDeletingKeys() () -> getDeletedKeyCount() >= initialDeletedCount + keyCount, 100, 10000); assertThat(getRunCount()).isGreaterThan(initialRunCount); - assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).getKeyBlocksList()) - .isEmpty(); + assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE, + keyDeletingService.getDeletedKeySupplier()) + .getKeyBlocksList()).isEmpty(); } @Test @@ -227,8 +247,9 @@ void checkDeletionForKeysWithMultipleVersions() throws Exception { 1000, 10000); assertThat(getRunCount()) .isGreaterThan(initialRunCount); - assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).getKeyBlocksList()) - .isEmpty(); + assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE, + keyDeletingService.getDeletedKeySupplier()) + .getKeyBlocksList()).isEmpty(); // The 1st version of the key has 1 block and the 2nd version has 2 // blocks. Hence, the ScmBlockClient should have received at least 3 @@ -269,8 +290,9 @@ void checkDeletedTableCleanUpForSnapshot() throws Exception { 1000, 10000); assertThat(getRunCount()) .isGreaterThan(initialRunCount); - assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).getKeyBlocksList()) - .isEmpty(); + assertThat(keyManager.getPendingDeletionKeys(Integer.MAX_VALUE, + keyDeletingService.getDeletedKeySupplier()) + .getKeyBlocksList()).isEmpty(); // deletedTable should have deleted key of the snapshot bucket assertFalse(metadataManager.getDeletedTable().isEmpty()); @@ -326,7 +348,7 @@ public void testAOSKeyDeletingWithSnapshotCreateParallelExecution() return omSnapshotManager; }); KeyDeletingService service = new KeyDeletingService(ozoneManager, scmBlockTestingClient, km, 10000, - 100000, conf, false); + 100000, conf, false, 1); service.shutdown(); final long initialSnapshotCount = metadataManager.countRowsInTable(snapshotInfoTable); final long initialDeletedCount = metadataManager.countRowsInTable(deletedTable); @@ -376,7 +398,7 @@ public void testAOSKeyDeletingWithSnapshotCreateParallelExecution() Assertions.assertNotEquals(deletePathKey[0], group.getGroupID()); } return pendingKeysDeletion; - }).when(km).getPendingDeletionKeys(anyInt()); + }).when(km).getPendingDeletionKeys(anyInt(), any()); service.runPeriodicalTaskNow(); service.runPeriodicalTaskNow(); assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 2, metadataManager); @@ -401,95 +423,30 @@ public void testAOSKeyDeletingWithSnapshotCreateParallelExecution() keyDeletingService.resume(); } - /* - * Create Snap1 - * Create 10 keys - * Create Snap2 - * Delete 10 keys - * Create 5 keys - * Delete 5 keys -> but stop KeyDeletingService so - that keys won't be reclaimed. - * Create snap3 - * Now wait for snap3 to be deepCleaned -> Deleted 5 - keys should be deep cleaned. - * Now delete snap2 -> Wait for snap3 to be deep cleaned so deletedTable - of Snap3 should be empty. - */ - @Test - void testSnapshotDeepClean() throws Exception { - Table snapshotInfoTable = - om.getMetadataManager().getSnapshotInfoTable(); - Table deletedTable = - om.getMetadataManager().getDeletedTable(); - Table keyTable = - om.getMetadataManager().getKeyTable(BucketLayout.DEFAULT); - - // Suspend KeyDeletingService - keyDeletingService.suspend(); - - final long initialSnapshotCount = metadataManager.countRowsInTable(snapshotInfoTable); - final long initialKeyCount = metadataManager.countRowsInTable(keyTable); - final long initialDeletedCount = metadataManager.countRowsInTable(deletedTable); - - final String volumeName = getTestName(); - final String bucketName = uniqueObjectName("bucket"); - - // Create Volume and Buckets - createVolumeAndBucket(volumeName, bucketName, false); - - writeClient.createSnapshot(volumeName, bucketName, uniqueObjectName("snap")); - assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 1, metadataManager); - List createdKeys = new ArrayList<>(); - for (int i = 1; i <= 10; i++) { - OmKeyArgs args = createAndCommitKey(volumeName, bucketName, - uniqueObjectName("key"), 3); - createdKeys.add(args); - } - assertTableRowCount(keyTable, initialKeyCount + 10, metadataManager); - - String snap2 = uniqueObjectName("snap"); - writeClient.createSnapshot(volumeName, bucketName, snap2); - assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 2, metadataManager); - - // Create 5 Keys - for (int i = 11; i <= 15; i++) { - OmKeyArgs args = createAndCommitKey(volumeName, bucketName, - uniqueObjectName("key"), 3); - createdKeys.add(args); - } - - // Delete all 15 keys. - for (int i = 0; i < 15; i++) { - writeClient.deleteKey(createdKeys.get(i)); - } + } - assertTableRowCount(deletedTable, initialDeletedCount + 15, metadataManager); + @Nested + @TestInstance(TestInstance.Lifecycle.PER_CLASS) + class Snapshot { + @BeforeAll + void setup(@TempDir File testDir) throws Exception { + // failCallsFrequency = 0 means all calls succeed + scmBlockTestingClient = new ScmBlockLocationTestingClient(null, null, 0); - // Create Snap3, traps all the deleted keys. - String snap3 = uniqueObjectName("snap"); - writeClient.createSnapshot(volumeName, bucketName, snap3); - assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 3, metadataManager); - checkSnapDeepCleanStatus(snapshotInfoTable, volumeName, false); + createConfigForSnapshot(testDir); + createSubject(); + } + @AfterEach + void resume() { keyDeletingService.resume(); + } - try (ReferenceCounted rcOmSnapshot = - om.getOmSnapshotManager().getSnapshot(volumeName, bucketName, snap3)) { - OmSnapshot snapshot3 = rcOmSnapshot.get(); - - Table snap3deletedTable = - snapshot3.getMetadataManager().getDeletedTable(); - - // 5 keys can be deep cleaned as it was stuck previously - assertTableRowCount(snap3deletedTable, initialDeletedCount + 10, metadataManager); - - writeClient.deleteSnapshot(volumeName, bucketName, snap2); - assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 2, metadataManager); - - assertTableRowCount(snap3deletedTable, initialDeletedCount, metadataManager); - assertTableRowCount(deletedTable, initialDeletedCount, metadataManager); - checkSnapDeepCleanStatus(snapshotInfoTable, volumeName, true); + @AfterAll + void cleanup() { + if (om.stop()) { + om.join(); } } @@ -593,7 +550,7 @@ void testSnapshotExclusiveSize() throws Exception { // Check if the exclusive size is set. try (TableIterator> - iterator = snapshotInfoTable.iterator()) { + iterator = snapshotInfoTable.iterator()) { while (iterator.hasNext()) { Table.KeyValue snapshotEntry = iterator.next(); String snapshotName = snapshotEntry.getValue().getName(); @@ -607,6 +564,99 @@ void testSnapshotExclusiveSize() throws Exception { } } } + + /* + * Create Snap1 + * Create 10 keys + * Create Snap2 + * Delete 10 keys + * Create 5 keys + * Delete 5 keys -> but stop KeyDeletingService so + that keys won't be reclaimed. + * Create snap3 + * Now wait for snap3 to be deepCleaned -> Deleted 5 + keys should be deep cleaned. + * Now delete snap2 -> Wait for snap3 to be deep cleaned so deletedTable + of Snap3 should be empty. + */ + @Test + void testSnapshotDeepClean() throws Exception { + conf.setInt(OZONE_THREAD_NUMBER_KEY_DELETION, 10); + Table snapshotInfoTable = + om.getMetadataManager().getSnapshotInfoTable(); + Table deletedTable = + om.getMetadataManager().getDeletedTable(); + Table keyTable = + om.getMetadataManager().getKeyTable(BucketLayout.DEFAULT); + + // Suspend KeyDeletingService + keyDeletingService.suspend(); + + final long initialSnapshotCount = metadataManager.countRowsInTable(snapshotInfoTable); + final long initialKeyCount = metadataManager.countRowsInTable(keyTable); + final long initialDeletedCount = metadataManager.countRowsInTable(deletedTable); + + final String volumeName = getTestName(); + final String bucketName = uniqueObjectName("bucket"); + + // Create Volume and Buckets + createVolumeAndBucket(volumeName, bucketName, false); + + writeClient.createSnapshot(volumeName, bucketName, uniqueObjectName("snap")); + assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 1, metadataManager); + + List createdKeys = new ArrayList<>(); + for (int i = 1; i <= 10; i++) { + OmKeyArgs args = createAndCommitKey(volumeName, bucketName, + uniqueObjectName("key"), 3); + createdKeys.add(args); + } + assertTableRowCount(keyTable, initialKeyCount + 10, metadataManager); + + String snap2 = uniqueObjectName("snap"); + writeClient.createSnapshot(volumeName, bucketName, snap2); + assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 2, metadataManager); + + // Create 5 Keys + for (int i = 11; i <= 15; i++) { + OmKeyArgs args = createAndCommitKey(volumeName, bucketName, + uniqueObjectName("key"), 3); + createdKeys.add(args); + } + + // Delete all 15 keys. + for (int i = 0; i < 15; i++) { + writeClient.deleteKey(createdKeys.get(i)); + } + + assertTableRowCount(deletedTable, initialDeletedCount + 15, metadataManager); + + // Create Snap3, traps all the deleted keys. + String snap3 = uniqueObjectName("snap"); + writeClient.createSnapshot(volumeName, bucketName, snap3); + assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 3, metadataManager); + checkSnapDeepCleanStatus(snapshotInfoTable, volumeName, false); + + keyDeletingService.resume(); + + try (ReferenceCounted rcOmSnapshot = + om.getOmSnapshotManager().getSnapshot(volumeName, bucketName, snap3)) { + OmSnapshot snapshot3 = rcOmSnapshot.get(); + + Table snap3deletedTable = + snapshot3.getMetadataManager().getDeletedTable(); + + // 5 keys can be deep cleaned as it was stuck previously + assertTableRowCount(snap3deletedTable, initialDeletedCount + 10, metadataManager); + + writeClient.deleteSnapshot(volumeName, bucketName, snap2); + assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 2, metadataManager); + + assertTableRowCount(snap3deletedTable, initialDeletedCount, metadataManager); + assertTableRowCount(deletedTable, initialDeletedCount, metadataManager); + checkSnapDeepCleanStatus(snapshotInfoTable, volumeName, true); + } + } } /** @@ -638,26 +688,34 @@ void cleanup() { @Test void checkIfDeleteServiceWithFailingSCM() throws Exception { + keyManager.getDeletingService().suspend(); + keyManager.getDeletingService().getDeletedKeySupplier().reInitItr(); final int initialCount = countKeysPendingDeletion(); final long initialRunCount = getRunCount(); final int keyCount = 100; createAndDeleteKeys(keyCount, 1); + GenericTestUtils.waitFor( () -> countKeysPendingDeletion() == initialCount + keyCount, 100, 2000); + + keyManager.getDeletingService().resume(); // Make sure that we have run the background thread 5 times more GenericTestUtils.waitFor( () -> getRunCount() >= initialRunCount + 5, 100, 10000); // Since SCM calls are failing, deletedKeyCount should be zero. + keyManager.getDeletingService().suspend(); + keyManager.getDeletingService().getDeletedKeySupplier().reInitItr(); assertEquals(0, getDeletedKeyCount()); assertEquals(initialCount + keyCount, countKeysPendingDeletion()); } @Test void checkDeletionForEmptyKey() throws Exception { + keyManager.getDeletingService().suspend(); final int initialCount = countKeysPendingDeletion(); final long initialRunCount = getRunCount(); final int keyCount = 100; @@ -666,19 +724,23 @@ void checkDeletionForEmptyKey() throws Exception { // the pre-allocated blocks are not committed, hence they will be deleted. GenericTestUtils.waitFor( - () -> countKeysPendingDeletion() == initialCount + keyCount, - 100, 2000); + () -> countKeysPendingDeletion() == initialCount + keyCount, 100, + 2000); + keyManager.getDeletingService().resume(); // Make sure that we have run the background thread 2 times or more - GenericTestUtils.waitFor( - () -> getRunCount() >= initialRunCount + 2, - 100, 1000); + GenericTestUtils.waitFor(() -> getRunCount() >= initialRunCount + 2, 100, + 1000); // the blockClient is set to fail the deletion of key blocks, hence no keys // will be deleted + keyManager.getDeletingService().suspend(); assertEquals(0, getDeletedKeyCount()); + } @Test void checkDeletionForPartiallyCommitKey() throws Exception { + keyManager.getDeletingService().suspend(); + keyManager.getDeletingService().getDeletedKeySupplier().reInitItr(); final String volumeName = getTestName(); final String bucketName = uniqueObjectName("bucket"); final String keyName = uniqueObjectName("key"); @@ -877,9 +939,10 @@ private long getRunCount() { private int countKeysPendingDeletion() { try { - final int count = keyManager.getPendingDeletionKeys(Integer.MAX_VALUE) + final int count = keyManager.getPendingDeletionKeys(Integer.MAX_VALUE, + keyManager.getDeletingService().getDeletedKeySupplier()) .getKeyBlocksList().size(); - LOG.debug("KeyManager keys pending deletion: {}", count); + LOG.info("KeyManager keys pending deletion: {}", count); return count; } catch (IOException e) { throw new UncheckedIOException(e); @@ -888,7 +951,7 @@ private int countKeysPendingDeletion() { private long countBlocksPendingDeletion() { try { - return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE) + return keyManager.getPendingDeletionKeys(Integer.MAX_VALUE, keyDeletingService.getDeletedKeySupplier()) .getKeyBlocksList() .stream() .map(BlockGroup::getBlockIDList)