diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java index 1662904853fb..3c7b35dd23ed 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDeletingServiceIntegrationTest.java @@ -620,9 +620,9 @@ public void testParallelExcecutionOfKeyDeletionAndSnapshotDeletion() throws Exce om.getOmSnapshotManager().getSnapshot(testBucket.getVolumeName(), testBucket.getName(), testBucket.getName() + "snap2")) { renamesKeyEntries = snapshot.get().getKeyManager().getRenamesKeyEntries(testBucket.getVolumeName(), - testBucket.getName(), "", 1000); + testBucket.getName(), "", (kv) -> true, 1000); deletedKeyEntries = snapshot.get().getKeyManager().getDeletedKeyEntries(testBucket.getVolumeName(), - testBucket.getName(), "", 1000); + testBucket.getName(), "", (kv) -> true, 1000); deletedDirEntries = snapshot.get().getKeyManager().getDeletedDirEntries(testBucket.getVolumeName(), testBucket.getName(), 1000); } @@ -658,20 +658,20 @@ public void testParallelExcecutionOfKeyDeletionAndSnapshotDeletion() throws Exce testBucket.getName() + "snap2")) { Assertions.assertEquals(Collections.emptyList(), snapshot.get().getKeyManager().getRenamesKeyEntries(testBucket.getVolumeName(), - testBucket.getName(), "", 1000)); + testBucket.getName(), "", (kv) -> true, 1000)); Assertions.assertEquals(Collections.emptyList(), snapshot.get().getKeyManager().getDeletedKeyEntries(testBucket.getVolumeName(), - testBucket.getName(), "", 1000)); + testBucket.getName(), "", (kv) -> true, 1000)); Assertions.assertEquals(Collections.emptyList(), snapshot.get().getKeyManager().getDeletedDirEntries(testBucket.getVolumeName(), testBucket.getName(), 1000)); } List> aosRenamesKeyEntries = om.getKeyManager().getRenamesKeyEntries(testBucket.getVolumeName(), - testBucket.getName(), "", 1000); + testBucket.getName(), "", (kv) -> true, 1000); List>> aosDeletedKeyEntries = om.getKeyManager().getDeletedKeyEntries(testBucket.getVolumeName(), - testBucket.getName(), "", 1000); + testBucket.getName(), "", (kv) -> true, 1000); List> aosDeletedDirEntries = om.getKeyManager().getDeletedDirEntries(testBucket.getVolumeName(), testBucket.getName(), 1000); diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto index fa7a5caf443a..78d417367e3b 100644 --- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto +++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto @@ -1392,6 +1392,7 @@ message PurgeKeysRequest { repeated SnapshotMoveKeyInfos keysToUpdate = 3; // previous snapshotID can also be null & this field would be absent in older requests. optional NullableUUID expectedPreviousSnapshotID = 4; + repeated string renamedKeys = 5; } message PurgeKeysResponse { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DeletingServiceMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DeletingServiceMetrics.java index 3e6a4b937f47..baa4a34e774b 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DeletingServiceMetrics.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DeletingServiceMetrics.java @@ -65,6 +65,8 @@ public final class DeletingServiceMetrics { */ @Metric("Total no. of keys purged") private MutableGaugeLong numKeysPurged; + @Metric("Total no. of rename entries purged") + private MutableGaugeLong numRenameEntriesPurged; private DeletingServiceMetrics() { this.registry = new MetricsRegistry(METRICS_SOURCE_NAME); @@ -154,6 +156,10 @@ public void incrNumKeysPurged(long keysPurged) { this.numKeysPurged.incr(keysPurged); } + public void incrNumRenameEntriesPurged(long renameEntriesPurged) { + this.numRenameEntriesPurged.incr(renameEntriesPurged); + } + @VisibleForTesting public void resetDirectoryMetrics() { numDirsPurged.set(0); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java index 61f46634ece8..0af075035704 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java @@ -36,6 +36,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList; import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts; +import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; import org.apache.hadoop.ozone.om.service.CompactionService; import org.apache.hadoop.ozone.om.service.DirectoryDeletingService; import org.apache.hadoop.ozone.om.service.KeyDeletingService; @@ -148,13 +149,16 @@ PendingKeysDeletion getPendingDeletionKeys( /** * Returns a list rename entries from the snapshotRenamedTable. * - * @param size max number of keys to return. + * @param count max number of keys to return. + * @param filter filter to apply on the entries. * @return a Pair of list of {@link org.apache.hadoop.hdds.utils.db.Table.KeyValue} representing the keys in the * underlying metadataManager. * @throws IOException */ List> getRenamesKeyEntries( - String volume, String bucket, String startKey, int size) throws IOException; + String volume, String bucket, String startKey, + CheckedFunction, Boolean, IOException> filter, int count) + throws IOException; /** @@ -178,13 +182,16 @@ CheckedFunction getPreviousSnapshotOzoneKeyI /** * Returns a list deleted entries from the deletedTable. * - * @param size max number of keys to return. + * @param count max number of keys to return. + * @param filter filter to apply on the entries. * @return a Pair of list of {@link org.apache.hadoop.hdds.utils.db.Table.KeyValue} representing the keys in the * underlying metadataManager. * @throws IOException */ List>> getDeletedKeyEntries( - String volume, String bucket, String startKey, int size) throws IOException; + String volume, String bucket, String startKey, + CheckedFunction, Boolean, IOException> filter, + int count) throws IOException; /** * Returns the names of up to {@code count} open keys whose age is diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index a29e8fdfadba..b399d6bb9ceb 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -797,7 +797,9 @@ public PendingKeysDeletion getPendingDeletionKeys( private List> getTableEntries(String startKey, TableIterator> tableIterator, - Function valueFunction, int size) throws IOException { + Function valueFunction, + CheckedFunction, Boolean, IOException> filter, + int size) throws IOException { List> entries = new ArrayList<>(); /* Seek to the start key if it's not null. The next key in queue is ensured to start with the bucket prefix, {@link org.apache.hadoop.hdds.utils.db.Table#iterator(bucketPrefix)} would ensure this. @@ -810,7 +812,7 @@ private List> getTableEntries(String startKey, int currentCount = 0; while (tableIterator.hasNext() && currentCount < size) { Table.KeyValue kv = tableIterator.next(); - if (kv != null) { + if (kv != null && filter.apply(kv)) { entries.add(Table.newKeyValue(kv.getKey(), valueFunction.apply(kv.getValue()))); currentCount++; } @@ -832,11 +834,12 @@ private Optional getBucketPrefix(String volumeName, String bucketName, b @Override public List> getRenamesKeyEntries( - String volume, String bucket, String startKey, int size) throws IOException { + String volume, String bucket, String startKey, + CheckedFunction, Boolean, IOException> filter, int size) throws IOException { Optional bucketPrefix = getBucketPrefix(volume, bucket, false); try (TableIterator> renamedKeyIter = metadataManager.getSnapshotRenamedTable().iterator(bucketPrefix.orElse(""))) { - return getTableEntries(startKey, renamedKeyIter, Function.identity(), size); + return getTableEntries(startKey, renamedKeyIter, Function.identity(), filter, size); } } @@ -880,11 +883,13 @@ private CheckedFunction getPreviousSnapshotOzone @Override public List>> getDeletedKeyEntries( - String volume, String bucket, String startKey, int size) throws IOException { + String volume, String bucket, String startKey, + CheckedFunction, Boolean, IOException> filter, + int size) throws IOException { Optional bucketPrefix = getBucketPrefix(volume, bucket, false); try (TableIterator> delKeyIter = metadataManager.getDeletedTable().iterator(bucketPrefix.orElse(""))) { - return getTableEntries(startKey, delKeyIter, RepeatedOmKeyInfo::cloneOmKeyInfoList, size); + return getTableEntries(startKey, delKeyIter, RepeatedOmKeyInfo::cloneOmKeyInfoList, filter, size); } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java index 75d519f2b33f..3fd000e523a5 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java @@ -91,6 +91,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut List keysToBePurgedList = new ArrayList<>(); int numKeysDeleted = 0; + List renamedKeysToBePurged = new ArrayList<>(purgeKeysRequest.getRenamedKeysList()); for (DeletedKeys bucketWithDeleteKeys : bucketDeletedKeysList) { List keysList = bucketWithDeleteKeys.getKeysList(); keysToBePurgedList.addAll(keysList); @@ -98,8 +99,9 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut } DeletingServiceMetrics deletingServiceMetrics = ozoneManager.getDeletionMetrics(); deletingServiceMetrics.incrNumKeysPurged(numKeysDeleted); + deletingServiceMetrics.incrNumRenameEntriesPurged(renamedKeysToBePurged.size()); - if (keysToBePurgedList.isEmpty()) { + if (keysToBePurgedList.isEmpty() && renamedKeysToBePurged.isEmpty()) { return new OMKeyPurgeResponse(createErrorOMResponse(omResponse, new OMException("None of the keys can be purged be purged since a new snapshot was created for all the " + "buckets, making this request invalid", OMException.ResultCodes.KEY_DELETION_ERROR))); @@ -118,7 +120,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut } return new OMKeyPurgeResponse(omResponse.build(), - keysToBePurgedList, fromSnapshotInfo, keysToUpdateList); + keysToBePurgedList, renamedKeysToBePurged, fromSnapshotInfo, keysToUpdateList); } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java index 7a1aebe6a4f5..8571fa07741c 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java @@ -45,15 +45,18 @@ @CleanupTableInfo(cleanupTables = {DELETED_TABLE, SNAPSHOT_INFO_TABLE}) public class OMKeyPurgeResponse extends OmKeyResponse { private List purgeKeyList; + private List renamedList; private SnapshotInfo fromSnapshot; private List keysToUpdateList; public OMKeyPurgeResponse(@Nonnull OMResponse omResponse, @Nonnull List keyList, + @Nonnull List renamedList, SnapshotInfo fromSnapshot, List keysToUpdate) { super(omResponse); this.purgeKeyList = keyList; + this.renamedList = renamedList; this.fromSnapshot = fromSnapshot; this.keysToUpdateList = keysToUpdate; } @@ -103,19 +106,21 @@ private void processKeysToUpdate(BatchOperation batchOp, for (SnapshotMoveKeyInfos keyToUpdate : keysToUpdateList) { List keyInfosList = keyToUpdate.getKeyInfosList(); - RepeatedOmKeyInfo repeatedOmKeyInfo = - createRepeatedOmKeyInfo(keyInfosList); + RepeatedOmKeyInfo repeatedOmKeyInfo = createRepeatedOmKeyInfo(keyInfosList); metadataManager.getDeletedTable().putWithBatch(batchOp, keyToUpdate.getKey(), repeatedOmKeyInfo); } } - private void processKeys(BatchOperation batchOp, - OMMetadataManager metadataManager) throws IOException { + private void processKeys(BatchOperation batchOp, OMMetadataManager metadataManager) throws IOException { for (String key : purgeKeyList) { metadataManager.getDeletedTable().deleteWithBatch(batchOp, key); } + // Delete rename entries. + for (String key : renamedList) { + metadataManager.getSnapshotRenamedTable().deleteWithBatch(batchOp, key); + } } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java index 155ea9a37a65..536406111a96 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java @@ -102,7 +102,7 @@ public AbstractKeyDeletingService(String serviceName, long interval, } protected Pair processKeyDeletes(List keyBlocksList, - Map keysToModify, + Map keysToModify, List renameEntries, String snapTableKey, UUID expectedPreviousSnapshotId) throws IOException, InterruptedException { long startTime = Time.monotonicNow(); @@ -125,7 +125,7 @@ protected Pair processKeyDeletes(List keyBlocksLis if (blockDeletionResults != null) { long purgeStartTime = Time.monotonicNow(); purgeResult = submitPurgeKeysRequest(blockDeletionResults, - keysToModify, snapTableKey, expectedPreviousSnapshotId); + keysToModify, renameEntries, snapTableKey, expectedPreviousSnapshotId); int limit = ozoneManager.getConfiguration().getInt(OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK, OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT); LOG.info("Blocks for {} (out of {}) keys are deleted from DB in {} ms. Limit per task is {}.", @@ -142,8 +142,8 @@ protected Pair processKeyDeletes(List keyBlocksLis * @param keysToModify Updated list of RepeatedOmKeyInfo */ private Pair submitPurgeKeysRequest(List results, - Map keysToModify, String snapTableKey, UUID expectedPreviousSnapshotId) - throws InterruptedException { + Map keysToModify, List renameEntriesToBeDeleted, + String snapTableKey, UUID expectedPreviousSnapshotId) throws InterruptedException { List purgeKeys = new ArrayList<>(); // Put all keys to be purged in a list @@ -191,7 +191,10 @@ private Pair submitPurgeKeysRequest(List keysToUpdateList = new ArrayList<>(); if (keysToModify != null) { for (Map.Entry keyToModify : diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java index faf320ab85db..9dba72eb8d6c 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java @@ -55,6 +55,7 @@ import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock; import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils; import org.apache.hadoop.ozone.om.snapshot.filter.ReclaimableKeyFilter; +import org.apache.hadoop.ozone.om.snapshot.filter.ReclaimableRenameEntryFilter; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetSnapshotPropertyRequest; import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; @@ -230,20 +231,33 @@ private void processDeletedKeysForStore(SnapshotInfo currentSnapshotInfo, KeyMan // Purge deleted Keys in the deletedTable && rename entries in the snapshotRenamedTable which doesn't have a // reference in the previous snapshot. try (ReclaimableKeyFilter reclaimableKeyFilter = new ReclaimableKeyFilter(getOzoneManager(), - omSnapshotManager, snapshotChainManager, currentSnapshotInfo, keyManager, lock)) { + omSnapshotManager, snapshotChainManager, currentSnapshotInfo, keyManager, lock); + ReclaimableRenameEntryFilter renameEntryFilter = new ReclaimableRenameEntryFilter( + getOzoneManager(), omSnapshotManager, snapshotChainManager, currentSnapshotInfo, + keyManager, lock)) { + List renamedTableEntries = + keyManager.getRenamesKeyEntries(volume, bucket, null, renameEntryFilter, remainNum).stream() + .map(entry -> { + try { + return entry.getKey(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }).collect(Collectors.toList()); + remainNum -= renamedTableEntries.size(); + // Get pending keys that can be deleted PendingKeysDeletion pendingKeysDeletion = currentSnapshotInfo == null ? keyManager.getPendingDeletionKeys(reclaimableKeyFilter, remainNum) : keyManager.getPendingDeletionKeys(volume, bucket, null, reclaimableKeyFilter, remainNum); List keyBlocksList = pendingKeysDeletion.getKeyBlocksList(); //submit purge requests if there are renamed entries to be purged or keys to be purged. - if (keyBlocksList != null && !keyBlocksList.isEmpty()) { + if (!renamedTableEntries.isEmpty() || keyBlocksList != null && !keyBlocksList.isEmpty()) { // Validating if the previous snapshot is still the same before purging the blocks. SnapshotUtils.validatePreviousSnapshotId(currentSnapshotInfo, snapshotChainManager, expectedPreviousSnapshotId); - Pair purgeResult = processKeyDeletes(keyBlocksList, - pendingKeysDeletion.getKeysToModify(), snapshotTableKey, - expectedPreviousSnapshotId); + Pair purgeResult = processKeyDeletes(keyBlocksList, pendingKeysDeletion.getKeysToModify(), + renamedTableEntries, snapshotTableKey, expectedPreviousSnapshotId); remainNum -= purgeResult.getKey(); successStatus = purgeResult.getValue(); metrics.incrNumKeysProcessed(keyBlocksList.size()); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java index 4c2151baaffd..171cb2bb02e7 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java @@ -193,7 +193,7 @@ public BackgroundTaskResult call() throws InterruptedException { // Get all entries from deletedKeyTable. List>> deletedKeyEntries = snapshotKeyManager.getDeletedKeyEntries(snapInfo.getVolumeName(), snapInfo.getBucketName(), - null, remaining); + null, (kv) -> true, remaining); moveCount += deletedKeyEntries.size(); // Get all entries from deletedDirTable. List> deletedDirEntries = snapshotKeyManager.getDeletedDirEntries( @@ -201,7 +201,7 @@ public BackgroundTaskResult call() throws InterruptedException { moveCount += deletedDirEntries.size(); // Get all entries from snapshotRenamedTable. List> renameEntries = snapshotKeyManager.getRenamesKeyEntries( - snapInfo.getVolumeName(), snapInfo.getBucketName(), null, remaining - moveCount); + snapInfo.getVolumeName(), snapInfo.getBucketName(), null, (kv) -> true, remaining - moveCount); moveCount += renameEntries.size(); if (moveCount > 0) { List deletedKeys = new ArrayList<>(deletedKeyEntries.size()); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java index 22740426e29c..645e561a206e 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; +import org.apache.ratis.util.function.CheckedFunction; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -74,7 +75,8 @@ private List> mockTableIterator( Class valueClass, Table table, int numberOfVolumes, int numberOfBucketsPerVolume, int numberOfKeysPerBucket, String volumeNamePrefix, String bucketNamePrefix, String keyPrefix, Integer volumeNumberFilter, Integer bucketNumberFilter, Integer startVolumeNumber, Integer startBucketNumber, - Integer startKeyNumber, int numberOfEntries) throws IOException { + Integer startKeyNumber, CheckedFunction, Boolean, IOException> filter, + int numberOfEntries) throws IOException { TreeMap values = new TreeMap<>(); List> keyValues = new ArrayList<>(); String startKey = startVolumeNumber == null || startBucketNumber == null || startKeyNumber == null ? null @@ -98,7 +100,13 @@ private List> mockTableIterator( } when(table.iterator(anyString())).thenAnswer(i -> new MapBackedTableIterator<>(values, i.getArgument(0))); - return keyValues.subList(0, Math.min(numberOfEntries, keyValues.size())); + return keyValues.stream().filter(kv -> { + try { + return filter.apply(kv); + } catch (IOException e) { + throw new RuntimeException(e); + } + }).limit(numberOfEntries).collect(Collectors.toList()); } @ParameterizedTest @@ -119,10 +127,12 @@ public void testGetDeletedKeyEntries(int numberOfVolumes, int numberOfBucketsPer KeyManagerImpl km = new KeyManagerImpl(null, null, metadataManager, configuration, null, null, null); Table mockedDeletedTable = Mockito.mock(Table.class); when(metadataManager.getDeletedTable()).thenReturn(mockedDeletedTable); + CheckedFunction, Boolean, IOException> filter = + (kv) -> Long.parseLong(kv.getKey().split(keyPrefix)[1]) % 2 == 0; List>> expectedEntries = mockTableIterator( RepeatedOmKeyInfo.class, mockedDeletedTable, numberOfVolumes, numberOfBucketsPerVolume, numberOfKeysPerBucket, volumeNamePrefix, bucketNamePrefix, keyPrefix, volumeNumber, bucketNumber, startVolumeNumber, startBucketNumber, - startKeyNumber, numberOfEntries).stream() + startKeyNumber, filter, numberOfEntries).stream() .map(kv -> { try { String key = kv.getKey(); @@ -140,9 +150,10 @@ public void testGetDeletedKeyEntries(int numberOfVolumes, int numberOfBucketsPer : (String.format("/%s%010d/%s%010d/%s%010d", volumeNamePrefix, startVolumeNumber, bucketNamePrefix, startBucketNumber, keyPrefix, startKeyNumber)); if (expectedException != null) { - assertThrows(expectedException, () -> km.getDeletedKeyEntries(volumeName, bucketName, startKey, numberOfEntries)); + assertThrows(expectedException, () -> km.getDeletedKeyEntries(volumeName, bucketName, startKey, filter, + numberOfEntries)); } else { - assertEquals(expectedEntries, km.getDeletedKeyEntries(volumeName, bucketName, startKey, numberOfEntries)); + assertEquals(expectedEntries, km.getDeletedKeyEntries(volumeName, bucketName, startKey, filter, numberOfEntries)); } } @@ -164,19 +175,22 @@ public void testGetRenameKeyEntries(int numberOfVolumes, int numberOfBucketsPerV KeyManagerImpl km = new KeyManagerImpl(null, null, metadataManager, configuration, null, null, null); Table mockedRenameTable = Mockito.mock(Table.class); when(metadataManager.getSnapshotRenamedTable()).thenReturn(mockedRenameTable); + CheckedFunction, Boolean, IOException> filter = + (kv) -> Long.parseLong(kv.getKey().split("/")[3]) % 2 == 0; List> expectedEntries = mockTableIterator( String.class, mockedRenameTable, numberOfVolumes, numberOfBucketsPerVolume, numberOfKeysPerBucket, volumeNamePrefix, bucketNamePrefix, keyPrefix, volumeNumber, bucketNumber, startVolumeNumber, startBucketNumber, - startKeyNumber, numberOfEntries); + startKeyNumber, filter, numberOfEntries); String volumeName = volumeNumber == null ? null : (String.format("%s%010d", volumeNamePrefix, volumeNumber)); String bucketName = bucketNumber == null ? null : (String.format("%s%010d", bucketNamePrefix, bucketNumber)); String startKey = startVolumeNumber == null || startBucketNumber == null || startKeyNumber == null ? null : (String.format("/%s%010d/%s%010d/%s%010d", volumeNamePrefix, startVolumeNumber, bucketNamePrefix, startBucketNumber, keyPrefix, startKeyNumber)); if (expectedException != null) { - assertThrows(expectedException, () -> km.getRenamesKeyEntries(volumeName, bucketName, startKey, numberOfEntries)); + assertThrows(expectedException, () -> km.getRenamesKeyEntries(volumeName, bucketName, startKey, + filter, numberOfEntries)); } else { - assertEquals(expectedEntries, km.getRenamesKeyEntries(volumeName, bucketName, startKey, numberOfEntries)); + assertEquals(expectedEntries, km.getRenamesKeyEntries(volumeName, bucketName, startKey, filter, numberOfEntries)); } } @@ -202,7 +216,7 @@ public void testGetDeletedDirEntries(int numberOfVolumes, int numberOfBucketsPer List> expectedEntries = mockTableIterator( OmKeyInfo.class, mockedDeletedDirTable, numberOfVolumes, numberOfBucketsPerVolume, numberOfKeysPerBucket, volumeNamePrefix, bucketNamePrefix, keyPrefix, volumeNumber, bucketNumber, startVolumeNumber, startBucketNumber, - startKeyNumber, numberOfEntries); + startKeyNumber, (kv) -> true, numberOfEntries); String volumeName = volumeNumber == null ? null : (String.format("%s%010d", volumeNamePrefix, volumeNumber)); String bucketName = bucketNumber == null ? null : (String.format("%s%010d", bucketNamePrefix, bucketNumber)); if (expectedException != null) { diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java index 7b22c26c2bb4..cfde5ab8560f 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java @@ -275,6 +275,16 @@ public static void addKeyToTable(boolean openKeyTable, boolean addToCache, omMetadataManager); } + /** + * Add key entry to SnapshotRenamedTable. + */ + public static String addRenamedEntryToTable(long trxnLogIndex, String volumeName, String bucketName, String key, + OMMetadataManager omMetadataManager) throws Exception { + String renameKey = omMetadataManager.getRenameKey(volumeName, bucketName, trxnLogIndex); + omMetadataManager.getSnapshotRenamedTable().put(renameKey, key); + return renameKey; + } + /** * Add key entry to KeyTable. if openKeyTable flag is true, add's entries * to openKeyTable, else add's it to keyTable. diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java index 39c39953438f..3ca62ca3e340 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.utils.TransactionInfo; import org.apache.hadoop.hdds.utils.db.BatchOperation; import org.apache.hadoop.ozone.om.OmSnapshot; @@ -53,7 +54,7 @@ public class TestOMKeyPurgeRequestAndResponse extends TestOMKeyRequest { * Creates volume, bucket and key entries and adds to OM DB and then * deletes these keys to move them to deletedKeys table. */ - private List createAndDeleteKeys(Integer trxnIndex, String bucket) + private Pair, List> createAndDeleteKeysAndRenamedEntry(Integer trxnIndex, String bucket) throws Exception { if (bucket == null) { bucket = bucketName; @@ -63,11 +64,14 @@ private List createAndDeleteKeys(Integer trxnIndex, String bucket) omMetadataManager); List ozoneKeyNames = new ArrayList<>(numKeys); + List renamedEntries = new ArrayList<>(numKeys); for (int i = 1; i <= numKeys; i++) { String key = keyName + "-" + i; OMRequestTestUtils.addKeyToTable(false, false, volumeName, bucket, key, clientID, replicationConfig, trxnIndex++, omMetadataManager); + renamedEntries.add(OMRequestTestUtils.addRenamedEntryToTable(trxnIndex, volumeName, bucket, key, + omMetadataManager)); ozoneKeyNames.add(omMetadataManager.getOzoneKey( volumeName, bucket, key)); } @@ -79,14 +83,14 @@ private List createAndDeleteKeys(Integer trxnIndex, String bucket) deletedKeyNames.add(deletedKeyName); } - return deletedKeyNames; + return Pair.of(deletedKeyNames, renamedEntries); } /** * Create OMRequest which encapsulates DeleteKeyRequest. * @return OMRequest */ - private OMRequest createPurgeKeysRequest(List deletedKeys, + private OMRequest createPurgeKeysRequest(List deletedKeys, List renamedEntries, String snapshotDbKey) { DeletedKeys deletedKeysInBucket = DeletedKeys.newBuilder() .setVolumeName(volumeName) @@ -94,7 +98,7 @@ private OMRequest createPurgeKeysRequest(List deletedKeys, .addAllKeys(deletedKeys) .build(); PurgeKeysRequest.Builder purgeKeysRequest = PurgeKeysRequest.newBuilder() - .addDeletedKeys(deletedKeysInBucket); + .addDeletedKeys(deletedKeysInBucket).addAllRenamedKeys(renamedEntries); if (snapshotDbKey != null) { purgeKeysRequest.setSnapshotTableKey(snapshotDbKey); @@ -123,16 +127,20 @@ private OMRequest preExecute(OMRequest originalOmRequest) throws IOException { @Test public void testValidateAndUpdateCache() throws Exception { // Create and Delete keys. The keys should be moved to DeletedKeys table - List deletedKeyNames = createAndDeleteKeys(1, null); + Pair, List> deleteKeysAndRenamedEntry = createAndDeleteKeysAndRenamedEntry(1, null); // The keys should be present in the DeletedKeys table before purging - for (String deletedKey : deletedKeyNames) { + for (String deletedKey : deleteKeysAndRenamedEntry.getKey()) { assertTrue(omMetadataManager.getDeletedTable().isExist( deletedKey)); } + for (String renamedKey : deleteKeysAndRenamedEntry.getValue()) { + assertTrue(omMetadataManager.getSnapshotRenamedTable().isExist(renamedKey)); + } // Create PurgeKeysRequest to purge the deleted keys - OMRequest omRequest = createPurgeKeysRequest(deletedKeyNames, null); + OMRequest omRequest = createPurgeKeysRequest(deleteKeysAndRenamedEntry.getKey(), + deleteKeysAndRenamedEntry.getValue(), null); OMRequest preExecutedRequest = preExecute(omRequest); OMKeyPurgeRequest omKeyPurgeRequest = @@ -150,7 +158,8 @@ public void testValidateAndUpdateCache() throws Exception { omMetadataManager.getStore().initBatchOperation()) { OMKeyPurgeResponse omKeyPurgeResponse = new OMKeyPurgeResponse( - omResponse, deletedKeyNames, null, null); + omResponse, deleteKeysAndRenamedEntry.getKey(), deleteKeysAndRenamedEntry.getValue(), null, + null); omKeyPurgeResponse.addToDBBatch(omMetadataManager, batchOperation); // Do manual commit and see whether addToBatch is successful or not. @@ -158,37 +167,49 @@ public void testValidateAndUpdateCache() throws Exception { } // The keys should not exist in the DeletedKeys table - for (String deletedKey : deletedKeyNames) { + for (String deletedKey : deleteKeysAndRenamedEntry.getKey()) { assertFalse(omMetadataManager.getDeletedTable().isExist(deletedKey)); } + // Renamed entry should not exist + for (String renamedKey : deleteKeysAndRenamedEntry.getValue()) { + assertFalse(omMetadataManager.getSnapshotRenamedTable().isExist(renamedKey)); + } } @Test public void testKeyPurgeInSnapshot() throws Exception { // Create and Delete keys. The keys should be moved to DeletedKeys table - List deletedKeyNames = createAndDeleteKeys(1, null); + Pair, List> deleteKeysAndRenamedEntry = createAndDeleteKeysAndRenamedEntry(1, null); SnapshotInfo snapInfo = createSnapshot("snap1"); assertEquals(snapInfo.getLastTransactionInfo(), TransactionInfo.valueOf(TransactionInfo.getTermIndex(1L)).toByteString()); // The keys should be not present in the active Db's deletedTable - for (String deletedKey : deletedKeyNames) { + for (String deletedKey : deleteKeysAndRenamedEntry.getKey()) { assertFalse(omMetadataManager.getDeletedTable().isExist(deletedKey)); } + for (String renamedKey : deleteKeysAndRenamedEntry.getValue()) { + assertFalse(omMetadataManager.getSnapshotRenamedTable().isExist(renamedKey)); + } UncheckedAutoCloseableSupplier rcOmSnapshot = ozoneManager.getOmSnapshotManager() .getSnapshot(snapInfo.getVolumeName(), snapInfo.getBucketName(), snapInfo.getName()); OmSnapshot omSnapshot = rcOmSnapshot.get(); // The keys should be present in the snapshot's deletedTable - for (String deletedKey : deletedKeyNames) { + for (String deletedKey : deleteKeysAndRenamedEntry.getKey()) { assertTrue(omSnapshot.getMetadataManager() .getDeletedTable().isExist(deletedKey)); } + // The keys should be present in the snapshot's deletedTable + for (String renamedKey : deleteKeysAndRenamedEntry.getValue()) { + assertTrue(omSnapshot.getMetadataManager() + .getSnapshotRenamedTable().isExist(renamedKey)); + } // Create PurgeKeysRequest to purge the deleted keys - OMRequest omRequest = createPurgeKeysRequest(deletedKeyNames, - snapInfo.getTableKey()); + OMRequest omRequest = createPurgeKeysRequest(deleteKeysAndRenamedEntry.getKey(), + deleteKeysAndRenamedEntry.getValue(), snapInfo.getTableKey()); OMRequest preExecutedRequest = preExecute(omRequest); OMKeyPurgeRequest omKeyPurgeRequest = @@ -211,7 +232,8 @@ public void testKeyPurgeInSnapshot() throws Exception { try (BatchOperation batchOperation = omMetadataManager.getStore().initBatchOperation()) { - OMKeyPurgeResponse omKeyPurgeResponse = new OMKeyPurgeResponse(omResponse, deletedKeyNames, snapInfo, null); + OMKeyPurgeResponse omKeyPurgeResponse = new OMKeyPurgeResponse(omResponse, deleteKeysAndRenamedEntry.getKey(), + deleteKeysAndRenamedEntry.getValue(), snapInfo, null); omKeyPurgeResponse.addToDBBatch(omMetadataManager, batchOperation); // Do manual commit and see whether addToBatch is successful or not. @@ -220,11 +242,16 @@ public void testKeyPurgeInSnapshot() throws Exception { snapshotInfoOnDisk = omMetadataManager.getSnapshotInfoTable().getSkipCache(snapInfo.getTableKey()); assertEquals(snapshotInfoOnDisk, snapInfo); // The keys should not exist in the DeletedKeys table - for (String deletedKey : deletedKeyNames) { + for (String deletedKey : deleteKeysAndRenamedEntry.getKey()) { assertFalse(omSnapshot.getMetadataManager() .getDeletedTable().isExist(deletedKey)); } + for (String renamedEntry : deleteKeysAndRenamedEntry.getValue()) { + assertFalse(omSnapshot.getMetadataManager() + .getSnapshotRenamedTable().isExist(renamedEntry)); + } + omSnapshot = null; rcOmSnapshot.close(); } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java index 3b55255ee75b..f799d8af6190 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java @@ -58,7 +58,6 @@ import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.server.ServerUtils; import org.apache.hadoop.hdds.utils.db.DBConfigFromFile; @@ -104,6 +103,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentMatchers; import org.mockito.MockedStatic; import org.mockito.Mockito; @@ -371,7 +372,7 @@ public void testAOSKeyDeletingWithSnapshotCreateParallelExecution() metadataManager.getOzoneKey(volumeName, bucketName, "key2"))}; assertNotNull(deletedTable.get(deletePathKey[0])); - Mockito.doAnswer(i -> { + doAnswer(i -> { writeClient.createSnapshot(volumeName, bucketName, snap2); GenericTestUtils.waitFor(() -> { try { @@ -423,6 +424,73 @@ public void testAOSKeyDeletingWithSnapshotCreateParallelExecution() keyDeletingService.resume(); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testRenamedKeyReclaimation(boolean testForSnapshot) + throws IOException, InterruptedException, TimeoutException { + Table snapshotInfoTable = + om.getMetadataManager().getSnapshotInfoTable(); + Table deletedTable = + om.getMetadataManager().getDeletedTable(); + Table keyTable = + om.getMetadataManager().getKeyTable(BucketLayout.DEFAULT); + Table snapshotRenamedTable = om.getMetadataManager().getSnapshotRenamedTable(); + UncheckedAutoCloseableSupplier snapshot = null; + // Suspend KeyDeletingService + keyDeletingService.suspend(); + + final long initialSnapshotCount = metadataManager.countRowsInTable(snapshotInfoTable); + final long initialKeyCount = metadataManager.countRowsInTable(keyTable); + final long initialDeletedCount = metadataManager.countRowsInTable(deletedTable); + final long initialRenamedCount = metadataManager.countRowsInTable(snapshotRenamedTable); + final String volumeName = getTestName(); + final String bucketName = uniqueObjectName("bucket"); + + // Create Volume and Buckets + try { + createVolumeAndBucket(volumeName, bucketName, false); + OmKeyArgs key1 = createAndCommitKey(volumeName, bucketName, + uniqueObjectName("key"), 3); + OmKeyInfo keyInfo = writeClient.getKeyInfo(key1, false).getKeyInfo(); + assertTableRowCount(keyTable, initialKeyCount + 1, metadataManager); + writeClient.createSnapshot(volumeName, bucketName, uniqueObjectName("snap")); + assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 1, metadataManager); + OmKeyArgs key2 = createAndCommitKey(volumeName, bucketName, + uniqueObjectName("key"), 3); + assertTableRowCount(keyTable, initialKeyCount + 2, metadataManager); + + writeClient.renameKey(key1, key1.getKeyName() + "_renamed"); + writeClient.renameKey(key2, key2.getKeyName() + "_renamed"); + assertTableRowCount(keyTable, initialKeyCount + 2, metadataManager); + assertTableRowCount(snapshotRenamedTable, initialRenamedCount + 2, metadataManager); + assertTableRowCount(deletedTable, initialDeletedCount, metadataManager); + if (testForSnapshot) { + String snapshotName = writeClient.createSnapshot(volumeName, bucketName, uniqueObjectName("snap")); + assertTableRowCount(snapshotInfoTable, initialSnapshotCount + 2, metadataManager); + assertTableRowCount(snapshotRenamedTable, initialRenamedCount, metadataManager); + snapshot = om.getOmSnapshotManager().getSnapshot(volumeName, bucketName, snapshotName); + snapshotRenamedTable = snapshot.get().getMetadataManager().getSnapshotRenamedTable(); + } + assertTableRowCount(snapshotRenamedTable, initialRenamedCount + 2, metadataManager); + keyDeletingService.resume(); + assertTableRowCount(snapshotRenamedTable, initialRenamedCount + 1, metadataManager); + try (TableIterator> itr = snapshotRenamedTable.iterator()) { + itr.forEachRemaining(entry -> { + try { + String[] val = metadataManager.splitRenameKey(entry.getKey()); + Assertions.assertEquals(Long.valueOf(val[2]), keyInfo.getObjectID()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } + } finally { + if (snapshot != null) { + snapshot.close(); + } + } + } + /* * Create Snap1 * Create 10 keys @@ -680,6 +748,7 @@ public void testFailingModifiedKeyPurge() throws IOException, InterruptedExcepti }); List blockGroups = Collections.singletonList(BlockGroup.newBuilder().setKeyName("key1") .addAllBlockIDs(Collections.singletonList(new BlockID(1, 1))).build()); + List renameEntriesToBeDeleted = Collections.singletonList("key2"); OmKeyInfo omKeyInfo = new OmKeyInfo.Builder() .setBucketName("buck") .setVolumeName("vol") @@ -692,8 +761,9 @@ public void testFailingModifiedKeyPurge() throws IOException, InterruptedExcepti .build(); Map keysToModify = Collections.singletonMap("key1", new RepeatedOmKeyInfo(Collections.singletonList(omKeyInfo))); - keyDeletingService.processKeyDeletes(blockGroups, keysToModify, null, null); + keyDeletingService.processKeyDeletes(blockGroups, keysToModify, renameEntriesToBeDeleted, null, null); assertTrue(purgeRequest.get().getPurgeKeysRequest().getKeysToUpdateList().isEmpty()); + assertEquals(renameEntriesToBeDeleted, purgeRequest.get().getPurgeKeysRequest().getRenamedKeysList()); } } @@ -845,7 +915,7 @@ private void deleteKey(String volumeName, .setKeyName(keyName) .setAcls(Collections.emptyList()) .setReplicationConfig(StandaloneReplicationConfig.getInstance( - HddsProtos.ReplicationFactor.THREE)) + THREE)) .build(); writeClient.deleteKey(keyArg); } @@ -861,7 +931,7 @@ private void renameKey(String volumeName, .setKeyName(keyName) .setAcls(Collections.emptyList()) .setReplicationConfig(StandaloneReplicationConfig.getInstance( - HddsProtos.ReplicationFactor.THREE)) + THREE)) .build(); writeClient.renameKey(keyArg, toKeyName); }