diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 5264302d147f..387d46771425 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -1196,10 +1196,10 @@ message DeletedKeys {
   repeated string keys = 3;
 }
-
-
 message PurgeKeysRequest {
   repeated DeletedKeys deletedKeys = 1;
+  // if set, will purge keys in a snapshot DB instead of the active DB
+  optional string snapshotTableKey = 2;
 }

 message PurgeKeysResponse {
diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index 5d9bbf44202b..276403b3d9cf 100644
--- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -257,18 +257,6 @@ boolean recoverTrash(String volumeName, String bucketName,
  List<OmVolumeArgs> listVolumes(String userName, String prefix,
      String startKey, int maxKeys) throws IOException;

-  /**
-   * Returns a list of pending deletion key info that ups to the given count.
-   * Each entry is a {@link BlockGroup}, which contains the info about the key
-   * name and all its associated block IDs. A pending deletion key is stored
-   * with #deleting# prefix in OM DB.
-   *
-   * @param count max number of keys to return.
-   * @return a list of {@link BlockGroup} represent keys and blocks.
-   * @throws IOException
-   */
-  List<BlockGroup> getPendingDeletionKeys(int count) throws IOException;
-
  /**
   * Returns the names of up to {@code count} open keys whose age is
   * greater than or equal to {@code expireThreshold}.
@@ -544,4 +532,10 @@ String getMultipartKey(long volumeId, long bucketId,
   */
  long getBucketId(String volume, String bucket) throws IOException;

+  /**
+   * Returns the list of {@link BlockGroup} for a key in the deletedTable.
+   * @param deletedKey - key to be purged from the deletedTable
+   * @return a list of {@link BlockGroup} for the deleted key
+   */
+  List<BlockGroup> getBlocksForKeyDelete(String deletedKey) throws IOException;
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 12f7b89d125c..c2929fefd321 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -282,7 +282,8 @@ public void start(OzoneConfiguration configuration) {
         TimeUnit.MILLISECONDS);
     try {
       snapshotDeletingService = new SnapshotDeletingService(
-          snapshotServiceInterval, snapshotServiceTimeout, ozoneManager);
+          snapshotServiceInterval, snapshotServiceTimeout,
+          ozoneManager, scmClient.getBlockClient());
       snapshotDeletingService.start();
     } catch (IOException e) {
       LOG.error("Error starting Snapshot Deleting Service", e);
@@ -619,7 +620,10 @@ public List<RepeatedOmKeyInfo> listTrash(String volumeName,
   @Override
   public List<BlockGroup> getPendingDeletionKeys(final int count)
       throws IOException {
-    return metadataManager.getPendingDeletionKeys(count);
+    OmMetadataManagerImpl omMetadataManager =
+        (OmMetadataManagerImpl) metadataManager;
+    return omMetadataManager
+        .getPendingDeletionKeys(count, ozoneManager.getOmSnapshotManager());
   }

   @Override
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index ea40ba5be6f6..aa0e9d0c6e59 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -106,6 +106,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_FS_SNAPSHOT_MAX_LIMIT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_FS_SNAPSHOT_MAX_LIMIT_DEFAULT;
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
@@ -343,7 +344,7 @@ protected OmMetadataManagerImpl() {
       File checkpoint = Paths.get(metaDir.toPath().toString(), dbName).toFile();
       RDBCheckpointManager.waitForCheckpointDirectoryExist(checkpoint);
     }
-    setStore(loadDB(conf, metaDir, dbName, true,
+    setStore(loadDB(conf, metaDir, dbName, false,
         java.util.Optional.of(Boolean.TRUE)));
     initializeOmTables(false);
   }
@@ -1394,9 +1395,18 @@ private PersistedUserVolumeInfo getVolumesByUser(String userNameKey)
     }
   }

-  @Override
-  public List<BlockGroup> getPendingDeletionKeys(final int keyCount)
-      throws IOException {
+  /**
+   * Returns a list of pending deletion key info up to the limit.
+   * Each entry is a {@link BlockGroup}, which contains the info about the key
+   * name and all its associated block IDs.
+   *
+   * @param keyCount max number of keys to return.
+   * @param omSnapshotManager SnapshotManager
+   * @return a list of {@link BlockGroup} representing keys and blocks.
+   * @throws IOException
+   */
+  public List<BlockGroup> getPendingDeletionKeys(final int keyCount,
+      OmSnapshotManager omSnapshotManager) throws IOException {
     List<BlockGroup> keyBlocksList = Lists.newArrayList();
     try (TableIterator<String, ? extends KeyValue<String, RepeatedOmKeyInfo>>
         keyIter = getDeletedTable().iterator()) {
@@ -1404,6 +1414,15 @@ public List<BlockGroup> getPendingDeletionKeys(final int keyCount)
       while (keyIter.hasNext() && currentCount < keyCount) {
         KeyValue<String, RepeatedOmKeyInfo> kv = keyIter.next();
         if (kv != null) {
+          List<BlockGroup> blockGroupList = Lists.newArrayList();
+          // Get volume name and bucket name
+          String[] keySplit = kv.getKey().split(OM_KEY_PREFIX);
+          // Get the latest snapshot in snapshot path.
+          OmSnapshot latestSnapshot = getLatestSnapshot(keySplit[1],
+              keySplit[2], omSnapshotManager);
+          String bucketKey = getBucketKey(keySplit[1], keySplit[2]);
+          OmBucketInfo bucketInfo = getBucketTable().get(bucketKey);
+
           // Multiple keys with the same path can be queued in one DB entry
           RepeatedOmKeyInfo infoList = kv.getValue();
           for (OmKeyInfo info : infoList.cloneOmKeyInfoList()) {
@@ -1422,6 +1441,37 @@ public List<BlockGroup> getPendingDeletionKeys(final int keyCount)
             // 4. Further optimization: Skip all snapshotted keys altogether
             // e.g. by prefixing all unreclaimable keys, then calling seek

+            if (latestSnapshot != null) {
+              Table<String, OmKeyInfo> prevKeyTable =
+                  latestSnapshot.getMetadataManager().getKeyTable(
+                      bucketInfo.getBucketLayout());
+              String prevDbKey;
+              if (bucketInfo.getBucketLayout().isFileSystemOptimized()) {
+                long volumeId = getVolumeId(info.getVolumeName());
+                prevDbKey = getOzonePathKey(volumeId,
+                    bucketInfo.getObjectID(),
+                    info.getParentObjectID(),
+                    info.getKeyName());
+              } else {
+                prevDbKey = getOzoneKey(info.getVolumeName(),
+                    info.getBucketName(),
+                    info.getKeyName());
+              }
+
+              OmKeyInfo omKeyInfo = prevKeyTable.get(prevDbKey);
+              if (omKeyInfo != null &&
+                  info.getObjectID() == omKeyInfo.getObjectID()) {
+                // TODO: [SNAPSHOT] For now, we are not cleaning up a key in
+                // active DB's deletedTable if any one of the keys in
+                // RepeatedOmKeyInfo exists in last snapshot's key/fileTable.
+                // Might need to refactor OMKeyDeleteRequest first to take
+                // actual reclaimed key objectIDs as input
+                // in order to avoid any race condition.
+                blockGroupList.clear();
+                break;
+              }
+            }
+
             // Add all blocks from all versions of the key to the deletion list
             for (OmKeyLocationInfoGroup keyLocations :
                 info.getKeyLocationVersions()) {
@@ -1432,16 +1482,40 @@ public List<BlockGroup> getPendingDeletionKeys(final int keyCount)
                   .setKeyName(kv.getKey())
                   .addAllBlockIDs(item)
                   .build();
-              keyBlocksList.add(keyBlocks);
+              blockGroupList.add(keyBlocks);
             }
             currentCount++;
           }
+          keyBlocksList.addAll(blockGroupList);
         }
       }
     }
     return keyBlocksList;
   }

+  /**
+   * Get the latest OmSnapshot for a snapshot path.
+   */
+  private OmSnapshot getLatestSnapshot(String volumeName, String bucketName,
+      OmSnapshotManager snapshotManager)
+      throws IOException {
+
+    String latestPathSnapshot =
+        snapshotChainManager.getLatestPathSnapshot(volumeName +
+            OM_KEY_PREFIX + bucketName);
+    String snapTableKey = latestPathSnapshot != null ?
+        snapshotChainManager.getTableKey(latestPathSnapshot) : null;
+    SnapshotInfo snapInfo = snapTableKey != null ?
+        getSnapshotInfoTable().get(snapTableKey) : null;
+
+    OmSnapshot omSnapshot = null;
+    if (snapInfo != null) {
+      omSnapshot = (OmSnapshot) snapshotManager.checkForSnapshot(volumeName,
+          bucketName, getSnapshotPrefix(snapInfo.getName()));
+    }
+    return omSnapshot;
+  }
+
   @Override
   public ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold,
       int count, BucketLayout bucketLayout) throws IOException {
@@ -1786,4 +1860,30 @@ public long getBucketId(String volume, String bucket) throws IOException {
     }
     return omBucketInfo.getObjectID();
   }
+
+  @Override
+  public List<BlockGroup> getBlocksForKeyDelete(String deletedKey)
+      throws IOException {
+    RepeatedOmKeyInfo omKeyInfo = getDeletedTable().get(deletedKey);
+    if (omKeyInfo == null) {
+      return null;
+    }
+
+    List<BlockGroup> result = new ArrayList<>();
+    // Add all blocks from all versions of the key to the deletion list
+    for (OmKeyInfo info : omKeyInfo.cloneOmKeyInfoList()) {
+      for (OmKeyLocationInfoGroup keyLocations :
+          info.getKeyLocationVersions()) {
+        List<BlockID> item = keyLocations.getLocationList().stream()
+            .map(b -> new BlockID(b.getContainerID(), b.getLocalID()))
+            .collect(Collectors.toList());
+        BlockGroup keyBlocks = BlockGroup.newBuilder()
+            .setKeyName(deletedKey)
+            .addAllBlockIDs(item)
+            .build();
+        result.add(keyBlocks);
+      }
+    }
+    return result;
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshot.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshot.java
index 3f3935e0283a..ee6ec49712e5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshot.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshot.java
@@ -68,6 +68,7 @@ public class OmSnapshot implements IOmMetadataReader, Closeable {
   private final String bucketName;
   private final String snapshotName;
   private final OMMetadataManager omMetadataManager;
+  private final KeyManager keyManager;

   public OmSnapshot(KeyManager keyManager,
       PrefixManager prefixManager,
@@ -81,6 +82,7 @@ public OmSnapshot(KeyManager keyManager,
     this.snapshotName = snapshotName;
     this.bucketName = bucketName;
     this.volumeName = volumeName;
+    this.keyManager = keyManager;
     this.omMetadataManager = keyManager.getMetadataManager();
   }

@@ -135,7 +137,7 @@ public List<OmKeyInfo> listKeys(String vname, String bname,
   @Override
   public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException {
-    // TODO: handle denormalization
+    // TODO: [SNAPSHOT] handle denormalization
     return omMetadataReader.getAcl(normalizeOzoneObj(obj));
   }

@@ -262,4 +264,8 @@ public void close() throws IOException {
   public OMMetadataManager getMetadataManager() {
     return omMetadataManager;
   }
+
+  public KeyManager getKeyManager() {
+    return keyManager;
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index e05dc897dddb..89cb9b12c1ee 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -119,7 +119,7 @@ public final class OmSnapshotManager implements AutoCloseable {
   // TODO: [SNAPSHOT] create config for max allowed page size.
private final int maxPageSize = 1000; - OmSnapshotManager(OzoneManager ozoneManager) { + public OmSnapshotManager(OzoneManager ozoneManager) { this.options = new ManagedDBOptions(); this.options.setCreateIfMissing(true); this.columnFamilyOptions = new ManagedColumnFamilyOptions(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java index ecc1b288446c..f429afb95374 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyPurgeRequest.java @@ -18,8 +18,13 @@ package org.apache.hadoop.ozone.om.request.key; +import java.io.IOException; import java.util.ArrayList; + +import org.apache.hadoop.ozone.om.OmSnapshot; +import org.apache.hadoop.ozone.om.OmSnapshotManager; import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper; import org.apache.hadoop.ozone.om.request.util.OmResponseUtil; import org.apache.hadoop.ozone.om.response.OMClientResponse; @@ -33,6 +38,8 @@ import java.util.List; +import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix; + /** * Handles purging of keys from OM DB. */ @@ -51,6 +58,9 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, PurgeKeysRequest purgeKeysRequest = getOmRequest().getPurgeKeysRequest(); List bucketDeletedKeysList = purgeKeysRequest .getDeletedKeysList(); + OmSnapshotManager omSnapshotManager = ozoneManager.getOmSnapshotManager(); + String fromSnapshot = purgeKeysRequest.hasSnapshotTableKey() ? 
+ purgeKeysRequest.getSnapshotTableKey() : null; List keysToBePurgedList = new ArrayList<>(); OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder( @@ -64,10 +74,27 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, } } - omClientResponse = new OMKeyPurgeResponse(omResponse.build(), - keysToBePurgedList); - addResponseToDoubleBuffer(trxnLogIndex, omClientResponse, - omDoubleBufferHelper); + try { + OmSnapshot omFromSnapshot = null; + if (fromSnapshot != null) { + SnapshotInfo snapshotInfo = + ozoneManager.getMetadataManager().getSnapshotInfoTable() + .get(fromSnapshot); + omFromSnapshot = (OmSnapshot) omSnapshotManager + .checkForSnapshot(snapshotInfo.getVolumeName(), + snapshotInfo.getBucketName(), + getSnapshotPrefix(snapshotInfo.getName())); + } + + omClientResponse = new OMKeyPurgeResponse(omResponse.build(), + keysToBePurgedList, omFromSnapshot); + } catch (IOException ex) { + omClientResponse = new OMKeyPurgeResponse( + createErrorOMResponse(omResponse, ex)); + } finally { + addResponseToDoubleBuffer(trxnLogIndex, omClientResponse, + omDoubleBufferHelper); + } return omClientResponse; } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java index b20a5d54fc78..0103f925bbc9 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java @@ -18,7 +18,9 @@ package org.apache.hadoop.ozone.om.response.key; +import org.apache.hadoop.hdds.utils.db.DBStore; import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OmSnapshot; import org.apache.hadoop.ozone.om.response.CleanupTableInfo; import org.apache.hadoop.ozone.om.request.key.OMKeyPurgeRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; @@ -36,19 +38,44 @@ @CleanupTableInfo(cleanupTables = {DELETED_TABLE}) public class OMKeyPurgeResponse extends OmKeyResponse { private List purgeKeyList; + private OmSnapshot fromSnapshot; public OMKeyPurgeResponse(@Nonnull OMResponse omResponse, - @Nonnull List keyList) { + @Nonnull List keyList, OmSnapshot fromSnapshot) { super(omResponse); this.purgeKeyList = keyList; + this.fromSnapshot = fromSnapshot; + } + + /** + * For when the request is not successful. + * For a successful request, the other constructor should be used. + */ + public OMKeyPurgeResponse(@Nonnull OMResponse omResponse) { + super(omResponse); + checkStatusNotOK(); } @Override public void addToDBBatch(OMMetadataManager omMetadataManager, BatchOperation batchOperation) throws IOException { + if (fromSnapshot != null) { + DBStore fromSnapshotStore = fromSnapshot.getMetadataManager().getStore(); + // Init Batch Operation for snapshot db. 
+ try (BatchOperation writeBatch = fromSnapshotStore.initBatchOperation()) { + processKeys(writeBatch, fromSnapshot.getMetadataManager()); + fromSnapshotStore.commitBatchOperation(writeBatch); + } + } else { + processKeys(batchOperation, omMetadataManager); + } + } + + private void processKeys(BatchOperation batchOp, + OMMetadataManager metadataManager) throws IOException { for (String key : purgeKeyList) { - omMetadataManager.getDeletedTable().deleteWithBatch(batchOperation, + metadataManager.getDeletedTable().deleteWithBatch(batchOp, key); } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java new file mode 100644 index 000000000000..6af371086fca --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
<p>
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
<p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.om.KeyManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeletedKeys;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+/**
+ * Abstract base class for OM's key deleting background services.
+ */
+public abstract class AbstractKeyDeletingService extends BackgroundService {
+
+  private final OzoneManager ozoneManager;
+  private final ScmBlockLocationProtocol scmClient;
+  private static ClientId clientId = ClientId.randomId();
+  private final AtomicLong runCount;
+
+  public AbstractKeyDeletingService(String serviceName, long interval,
+      TimeUnit unit, int threadPoolSize, long serviceTimeout,
+      OzoneManager ozoneManager, ScmBlockLocationProtocol scmClient) {
+    super(serviceName, interval, unit, threadPoolSize, serviceTimeout);
+    this.ozoneManager = ozoneManager;
+    this.scmClient = scmClient;
+    this.runCount = new AtomicLong(0);
+  }
+
+  protected int processKeyDeletes(List<BlockGroup> keyBlocksList,
+      KeyManager manager,
+      String snapTableKey) throws IOException {
+
+    long startTime = Time.monotonicNow();
+    int delCount = 0;
+    List<DeleteBlockGroupResult> blockDeletionResults =
+        scmClient.deleteKeyBlocks(keyBlocksList);
+    if (blockDeletionResults != null) {
+      if (isRatisEnabled()) {
+        delCount = submitPurgeKeysRequest(blockDeletionResults, snapTableKey);
+      } else {
+        // TODO: Once HA and non-HA paths are merged, we should have
+        // only one code path here. Purge keys should go through an
+        // OMRequest model.
+        delCount = deleteAllKeys(blockDeletionResults, manager);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Blocks for {} (out of {}) keys are deleted in {} ms",
+            delCount, blockDeletionResults.size(),
+            Time.monotonicNow() - startTime);
+      }
+    }
+    return delCount;
+  }
+
+  /**
+   * Deletes all the keys that SCM has acknowledged and queued for delete.
+   *
+   * @param results DeleteBlockGroups returned by SCM.
+   * @throws IOException on Error
+   */
+  private int deleteAllKeys(List<DeleteBlockGroupResult> results,
+      KeyManager manager) throws IOException {
+    Table<String, RepeatedOmKeyInfo> deletedTable =
+        manager.getMetadataManager().getDeletedTable();
+    DBStore store = manager.getMetadataManager().getStore();
+
+    // Put all keys to delete in a single transaction and call for delete.
+    int deletedCount = 0;
+    try (BatchOperation writeBatch = store.initBatchOperation()) {
+      for (DeleteBlockGroupResult result : results) {
+        if (result.isSuccess()) {
+          // Purge key from OM DB.
+          deletedTable.deleteWithBatch(writeBatch,
+              result.getObjectKey());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Key {} deleted from OM DB", result.getObjectKey());
+          }
+          deletedCount++;
+        }
+      }
+      // Write a single transaction for delete.
+      store.commitBatchOperation(writeBatch);
+    }
+    return deletedCount;
+  }
+
+  /**
+   * Submits PurgeKeys request for the keys whose blocks have been deleted
+   * by SCM.
+   * @param results DeleteBlockGroups returned by SCM.
+   */
+  private int submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
+      String snapTableKey) {
+    Map<Pair<String, String>, List<String>> purgeKeysMapPerBucket =
+        new HashMap<>();
+
+    // Put all keys to be purged in a list
+    int deletedCount = 0;
+    for (DeleteBlockGroupResult result : results) {
+      if (result.isSuccess()) {
+        // Add key to PurgeKeys list.
+        String deletedKey = result.getObjectKey();
+        // Parse Volume and BucketName
+        addToMap(purgeKeysMapPerBucket, deletedKey);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Key {} set to be purged from OM DB", deletedKey);
+        }
+        deletedCount++;
+      }
+    }
+
+    PurgeKeysRequest.Builder purgeKeysRequest = PurgeKeysRequest.newBuilder();
+    if (snapTableKey != null) {
+      purgeKeysRequest.setSnapshotTableKey(snapTableKey);
+    }
+
+    // Add keys to PurgeKeysRequest bucket wise.
+    for (Map.Entry<Pair<String, String>, List<String>> entry :
+        purgeKeysMapPerBucket.entrySet()) {
+      Pair<String, String> volumeBucketPair = entry.getKey();
+      DeletedKeys deletedKeysInBucket = DeletedKeys.newBuilder()
+          .setVolumeName(volumeBucketPair.getLeft())
+          .setBucketName(volumeBucketPair.getRight())
+          .addAllKeys(entry.getValue())
+          .build();
+      purgeKeysRequest.addDeletedKeys(deletedKeysInBucket);
+    }
+
+    OMRequest omRequest = OMRequest.newBuilder()
+        .setCmdType(Type.PurgeKeys)
+        .setPurgeKeysRequest(purgeKeysRequest)
+        .setClientId(clientId.toString())
+        .build();
+
+    // Submit PurgeKeys request to OM
+    try {
+      RaftClientRequest raftClientRequest =
+          createRaftClientRequestForPurge(omRequest);
+      ozoneManager.getOmRatisServer().submitRequest(omRequest,
+          raftClientRequest);
+    } catch (ServiceException e) {
+      LOG.error("PurgeKey request failed. Will retry at next run.", e);
+      return 0;
+    }
+
+    return deletedCount;
+  }
+
+  private RaftClientRequest createRaftClientRequestForPurge(
+      OMRequest omRequest) {
+    return RaftClientRequest.newBuilder()
+        .setClientId(clientId)
+        .setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
+        .setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
+        .setCallId(runCount.get())
+        .setMessage(
+            Message.valueOf(
+                OMRatisHelper.convertRequestToByteString(omRequest)))
+        .setType(RaftClientRequest.writeRequestType())
+        .build();
+  }
+
+  /**
+   * Parse Volume and Bucket Name from ObjectKey and add it to given map of
+   * keys to be purged per bucket.
+   */
+  private void addToMap(Map<Pair<String, String>, List<String>> map,
+      String objectKey) {
+    // Parse volume and bucket name
+    String[] split = objectKey.split(OM_KEY_PREFIX);
+    Preconditions.assertTrue(split.length > 3, "Volume and/or Bucket Name " +
+        "missing from Key Name.");
+    Pair<String, String> volumeBucketPair = Pair.of(split[1], split[2]);
+    if (!map.containsKey(volumeBucketPair)) {
+      map.put(volumeBucketPair, new ArrayList<>());
+    }
+    map.get(volumeBucketPair).add(objectKey);
+  }
+
+  public boolean isRatisEnabled() {
+    if (ozoneManager == null) {
+      return false;
+    }
+    return ozoneManager.isRatisEnabled();
+  }
+
+  public OzoneManager getOzoneManager() {
+    return ozoneManager;
+  }
+
+  public ScmBlockLocationProtocol getScmClient() {
+    return scmClient;
+  }
+
+  /**
+   * Returns the number of times this Background service has run.
+   *
+   * @return Long, run count.
+   */
+  @VisibleForTesting
+  public AtomicLong getRunCount() {
+    return runCount;
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
index cf8db7297d75..284eb9aa1162 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
@@ -17,29 +17,15 @@
 package org.apache.hadoop.ozone.om.service;

 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;

-import com.google.protobuf.ServiceException;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.ozone.common.BlockGroup;
-import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
 import org.apache.hadoop.ozone.om.KeyManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
-import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeletedKeys;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.hdds.utils.BackgroundService;
 import org.apache.hadoop.hdds.utils.BackgroundTask;
 import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
 import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
@@ -47,17 +33,10 @@
 import com.google.common.annotations.VisibleForTesting;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT; -import org.apache.hadoop.hdds.utils.db.BatchOperation; -import org.apache.hadoop.hdds.utils.db.DBStore; -import org.apache.hadoop.hdds.utils.db.Table; import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.Message; -import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,7 +46,7 @@ * metadata accordingly, if scm returns success for keys, then clean up those * keys. */ -public class KeyDeletingService extends BackgroundService { +public class KeyDeletingService extends AbstractKeyDeletingService { private static final Logger LOG = LoggerFactory.getLogger(KeyDeletingService.class); @@ -76,37 +55,22 @@ public class KeyDeletingService extends BackgroundService { // times. private static final int KEY_DELETING_CORE_POOL_SIZE = 1; - private final OzoneManager ozoneManager; - private final ScmBlockLocationProtocol scmClient; private final KeyManager manager; private static ClientId clientId = ClientId.randomId(); private final int keyLimitPerTask; private final AtomicLong deletedKeyCount; - private final AtomicLong runCount; public KeyDeletingService(OzoneManager ozoneManager, ScmBlockLocationProtocol scmClient, KeyManager manager, long serviceInterval, long serviceTimeout, ConfigurationSource conf) { - super("KeyDeletingService", serviceInterval, TimeUnit.MILLISECONDS, - KEY_DELETING_CORE_POOL_SIZE, serviceTimeout); - this.ozoneManager = ozoneManager; - this.scmClient = scmClient; + super(KeyDeletingService.class.getSimpleName(), serviceInterval, + TimeUnit.MILLISECONDS, KEY_DELETING_CORE_POOL_SIZE, + serviceTimeout, ozoneManager, scmClient); this.manager = manager; this.keyLimitPerTask = conf.getInt(OZONE_KEY_DELETING_LIMIT_PER_TASK, OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT); this.deletedKeyCount = new AtomicLong(0); - this.runCount = new AtomicLong(0); - } - - /** - * Returns the number of times this Background service has run. - * - * @return Long, run count. - */ - @VisibleForTesting - public AtomicLong getRunCount() { - return runCount; } /** @@ -127,18 +91,11 @@ public BackgroundTaskQueue getTasks() { } private boolean shouldRun() { - if (ozoneManager == null) { + if (getOzoneManager() == null) { // OzoneManager can be null for testing return true; } - return ozoneManager.isLeaderReady(); - } - - private boolean isRatisEnabled() { - if (ozoneManager == null) { - return false; - } - return ozoneManager.isRatisEnabled(); + return getOzoneManager().isLeaderReady(); } /** @@ -160,9 +117,8 @@ public BackgroundTaskResult call() throws Exception { // Check if this is the Leader OM. If not leader, no need to execute this // task. if (shouldRun()) { - runCount.incrementAndGet(); + getRunCount().incrementAndGet(); try { - long startTime = Time.monotonicNow(); // TODO: [SNAPSHOT] HDDS-7968. Reclaim eligible key blocks in // snapshot's deletedTable when active DB's deletedTable // doesn't have enough entries left. 
@@ -172,24 +128,9 @@ public BackgroundTaskResult call() throws Exception { List keyBlocksList = manager .getPendingDeletionKeys(keyLimitPerTask); if (keyBlocksList != null && !keyBlocksList.isEmpty()) { - List results = - scmClient.deleteKeyBlocks(keyBlocksList); - if (results != null) { - int delCount; - if (isRatisEnabled()) { - delCount = submitPurgeKeysRequest(results); - } else { - // TODO: Once HA and non-HA paths are merged, we should have - // only one code path here. Purge keys should go through an - // OMRequest model. - delCount = deleteAllKeys(results); - } - if (delCount > 0) { - LOG.info("Number of keys deleted: {}, elapsed time: {}ms", - delCount, Time.monotonicNow() - startTime); - } - deletedKeyCount.addAndGet(delCount); - } + int delCount = processKeyDeletes(keyBlocksList, + getOzoneManager().getKeyManager(), null); + deletedKeyCount.addAndGet(delCount); } } catch (IOException e) { LOG.error("Error while running delete keys background task. Will " + @@ -199,126 +140,5 @@ public BackgroundTaskResult call() throws Exception { // By design, no one cares about the results of this call back. return EmptyTaskResult.newResult(); } - - /** - * Deletes all the keys that SCM has acknowledged and queued for delete. - * - * @param results DeleteBlockGroups returned by SCM. - * @throws IOException on Error - */ - private int deleteAllKeys(List results) - throws IOException { - Table deletedTable = - manager.getMetadataManager().getDeletedTable(); - DBStore store = manager.getMetadataManager().getStore(); - - // Put all keys to delete in a single transaction and call for delete. - int deletedCount = 0; - try (BatchOperation writeBatch = store.initBatchOperation()) { - for (DeleteBlockGroupResult result : results) { - if (result.isSuccess()) { - // Purge key from OM DB. - deletedTable.deleteWithBatch(writeBatch, - result.getObjectKey()); - if (LOG.isDebugEnabled()) { - LOG.debug("Key {} deleted from OM DB", result.getObjectKey()); - } - deletedCount++; - } - } - // Write a single transaction for delete. - store.commitBatchOperation(writeBatch); - } - return deletedCount; - } - - /** - * Submits PurgeKeys request for the keys whose blocks have been deleted - * by SCM. - * @param results DeleteBlockGroups returned by SCM. - */ - public int submitPurgeKeysRequest(List results) { - Map, List> purgeKeysMapPerBucket = - new HashMap<>(); - - // Put all keys to be purged in a list - int deletedCount = 0; - for (DeleteBlockGroupResult result : results) { - if (result.isSuccess()) { - // Add key to PurgeKeys list. - String deletedKey = result.getObjectKey(); - // Parse Volume and BucketName - addToMap(purgeKeysMapPerBucket, deletedKey); - if (LOG.isDebugEnabled()) { - LOG.debug("Key {} set to be purged from OM DB", deletedKey); - } - deletedCount++; - } - } - - PurgeKeysRequest.Builder purgeKeysRequest = PurgeKeysRequest.newBuilder(); - - // Add keys to PurgeKeysRequest bucket wise. 
- for (Map.Entry, List> entry : - purgeKeysMapPerBucket.entrySet()) { - Pair volumeBucketPair = entry.getKey(); - DeletedKeys deletedKeysInBucket = DeletedKeys.newBuilder() - .setVolumeName(volumeBucketPair.getLeft()) - .setBucketName(volumeBucketPair.getRight()) - .addAllKeys(entry.getValue()) - .build(); - purgeKeysRequest.addDeletedKeys(deletedKeysInBucket); - } - - OMRequest omRequest = OMRequest.newBuilder() - .setCmdType(Type.PurgeKeys) - .setPurgeKeysRequest(purgeKeysRequest) - .setClientId(clientId.toString()) - .build(); - - // Submit PurgeKeys request to OM - try { - RaftClientRequest raftClientRequest = - createRaftClientRequestForPurge(omRequest); - ozoneManager.getOmRatisServer().submitRequest(omRequest, - raftClientRequest); - } catch (ServiceException e) { - LOG.error("PurgeKey request failed. Will retry at next run."); - return 0; - } - - return deletedCount; - } - } - - private RaftClientRequest createRaftClientRequestForPurge( - OMRequest omRequest) { - return RaftClientRequest.newBuilder() - .setClientId(clientId) - .setServerId(ozoneManager.getOmRatisServer().getRaftPeerId()) - .setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId()) - .setCallId(runCount.get()) - .setMessage( - Message.valueOf( - OMRatisHelper.convertRequestToByteString(omRequest))) - .setType(RaftClientRequest.writeRequestType()) - .build(); - } - - /** - * Parse Volume and Bucket Name from ObjectKey and add it to given map of - * keys to be purged per bucket. - */ - private void addToMap(Map, List> map, - String objectKey) { - // Parse volume and bucket name - String[] split = objectKey.split(OM_KEY_PREFIX); - Preconditions.assertTrue(split.length > 3, "Volume and/or Bucket Name " + - "missing from Key Name."); - Pair volumeBucketPair = Pair.of(split[1], split[2]); - if (!map.containsKey(volumeBucketPair)) { - map.put(volumeBucketPair, new ArrayList<>()); - } - map.get(volumeBucketPair).add(objectKey); } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java index 0b61f40c1478..446881bfecc4 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java @@ -22,7 +22,7 @@ import com.google.protobuf.ServiceException; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.utils.BackgroundService; +import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.hdds.utils.BackgroundTask; import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; import org.apache.hadoop.hdds.utils.BackgroundTaskResult; @@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.utils.db.TableIterator; import org.apache.hadoop.ozone.ClientVersion; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.common.BlockGroup; import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; import org.apache.hadoop.ozone.om.OmSnapshot; import org.apache.hadoop.ozone.om.OmSnapshotManager; @@ -71,7 +72,7 @@ /** * Background Service to clean-up deleted snapshot and reclaim space. 
*/ -public class SnapshotDeletingService extends BackgroundService { +public class SnapshotDeletingService extends AbstractKeyDeletingService { private static final Logger LOG = LoggerFactory.getLogger(SnapshotDeletingService.class); @@ -80,7 +81,6 @@ public class SnapshotDeletingService extends BackgroundService { // multiple times. private static final int SNAPSHOT_DELETING_CORE_POOL_SIZE = 1; private final ClientId clientId = ClientId.randomId(); - private final AtomicLong runCount; private final OzoneManager ozoneManager; private final OmSnapshotManager omSnapshotManager; @@ -92,16 +92,16 @@ public class SnapshotDeletingService extends BackgroundService { private final int keyLimitPerSnapshot; public SnapshotDeletingService(long interval, long serviceTimeout, - OzoneManager ozoneManager) throws IOException { + OzoneManager ozoneManager, ScmBlockLocationProtocol scmClient) + throws IOException { super(SnapshotDeletingService.class.getSimpleName(), interval, TimeUnit.MILLISECONDS, SNAPSHOT_DELETING_CORE_POOL_SIZE, - serviceTimeout); + serviceTimeout, ozoneManager, scmClient); this.ozoneManager = ozoneManager; this.omSnapshotManager = ozoneManager.getOmSnapshotManager(); OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl) ozoneManager.getMetadataManager(); this.chainManager = omMetadataManager.getSnapshotChainManager(); - this.runCount = new AtomicLong(0); this.successRunCount = new AtomicLong(0); this.suspended = new AtomicBoolean(false); this.conf = ozoneManager.getConfiguration(); @@ -115,13 +115,14 @@ public SnapshotDeletingService(long interval, long serviceTimeout, private class SnapshotDeletingTask implements BackgroundTask { + @SuppressWarnings("checkstyle:MethodLength") @Override public BackgroundTaskResult call() throws Exception { if (!shouldRun()) { return BackgroundTaskResult.EmptyTaskResult.newResult(); } - runCount.incrementAndGet(); + getRunCount().incrementAndGet(); Table snapshotInfoTable = ozoneManager.getMetadataManager().getSnapshotInfoTable(); @@ -150,6 +151,7 @@ public BackgroundTaskResult call() throws Exception { Table snapshotDeletedTable = omSnapshot.getMetadataManager().getDeletedTable(); + // TODO: [SNAPSHOT] Check if deletedDirTable is empty. if (snapshotDeletedTable.isEmpty()) { continue; } @@ -197,12 +199,13 @@ public BackgroundTaskResult call() throws Exception { RepeatedOmKeyInfo>> deletedIterator = snapshotDeletedTable .iterator()) { + List keysToPurge = new ArrayList<>(); String snapshotBucketKey = dbBucketKey + OzoneConsts.OM_KEY_PREFIX; iterator.seek(snapshotBucketKey); int deletionCount = 0; while (deletedIterator.hasNext() && - deletionCount <= keyLimitPerSnapshot) { + deletionCount < keyLimitPerSnapshot) { Table.KeyValue deletedKeyValue = deletedIterator.next(); String deletedKey = deletedKeyValue.getKey(); @@ -227,7 +230,7 @@ public BackgroundTaskResult call() throws Exception { HddsProtos.KeyValue.Builder renamedKey = HddsProtos.KeyValue .newBuilder(); - for (OmKeyInfo keyInfo: repeatedOmKeyInfo.getOmKeyInfoList()) { + for (OmKeyInfo keyInfo : repeatedOmKeyInfo.getOmKeyInfoList()) { splitRepeatedOmKeyInfo(toReclaim, toNextDb, renamedKey, keyInfo, previousKeyTable, renamedKeyTable, bucketInfo, volumeId); @@ -238,16 +241,29 @@ public BackgroundTaskResult call() throws Exception { if (!(toReclaim.getKeyInfosCount() == repeatedOmKeyInfo.getOmKeyInfoList().size())) { toReclaimList.add(toReclaim.build()); + toNextDBList.add(toNextDb.build()); + } else { + // The key can be reclaimed here. 
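+                // All versions of this deleted key are absent from the
+                // previous snapshot's key/file table, so its blocks can be
+                // handed to SCM and the deletedTable entry purged.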
+                List<BlockGroup> blocksForKeyDelete = omSnapshot
+                    .getMetadataManager()
+                    .getBlocksForKeyDelete(deletedKey);
+                if (blocksForKeyDelete != null) {
+                  keysToPurge.addAll(blocksForKeyDelete);
+                }
               }
-              toNextDBList.add(toNextDb.build());
-              deletionCount++;
+
               if (renamedKey.hasKey() && renamedKey.hasValue()) {
                 renamedKeysList.add(renamedKey.build());
               }
+              deletionCount++;
             }

             // Submit Move request to OM.
             submitSnapshotMoveDeletedKeys(snapInfo, toReclaimList,
                 toNextDBList, renamedKeysList);
+
+            // Delete keys from the deletedTable
+            processKeyDeletes(keysToPurge, omSnapshot.getKeyManager(),
+                snapInfo.getTableKey());
             snapshotLimit--;
             successRunCount.incrementAndGet();
           } catch (IOException ex) {
@@ -410,7 +426,7 @@ private void submitRequest(OMRequest omRequest) {
           .setClientId(clientId)
           .setServerId(server.getRaftPeerId())
           .setGroupId(server.getRaftGroupId())
-          .setCallId(runCount.get())
+          .setCallId(getRunCount().get())
           .setMessage(Message.valueOf(
               OMRatisHelper.convertRequestToByteString(omRequest)))
           .setType(RaftClientRequest.writeRequestType())
@@ -425,11 +441,6 @@ private void submitRequest(OMRequest omRequest) {
             "Will retry at next run.", e);
       }
     }
-
-    private boolean isRatisEnabled() {
-      return ozoneManager.isRatisEnabled();
-    }
-
   }

   @Override
@@ -459,10 +470,6 @@ void resume() {
     suspended.set(false);
   }

-  public long getRunCount() {
-    return runCount.get();
-  }
-
   public long getSuccessfulRunCount() {
     return successRunCount.get();
   }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
index 4b2b46794f16..19df5dee21b7 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
@@ -23,7 +23,12 @@ import java.util.List;
 import java.util.UUID;

+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.hadoop.ozone.om.request.snapshot.OMSnapshotCreateRequest;
+import org.apache.hadoop.ozone.om.request.snapshot.TestOMSnapshotCreateRequest;
+import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotCreateResponse;
 import org.junit.Assert;
 import org.junit.Test;
@@ -36,6 +41,11 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.junit.jupiter.api.Assertions;
+
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;

 /**
  * Tests {@link OMKeyPurgeRequest} and {@link OMKeyPurgeResponse}.
@@ -81,15 +91,20 @@ private List<String> createAndDeleteKeys(Integer trxnIndex, String bucket)
   /**
    * Create OMRequest which encapsulates DeleteKeyRequest.
   * @return OMRequest
   */
-  private OMRequest createPurgeKeysRequest(List<String> deletedKeys) {
+  private OMRequest createPurgeKeysRequest(List<String> deletedKeys,
+      String snapshotDbKey) {
     DeletedKeys deletedKeysInBucket = DeletedKeys.newBuilder()
         .setVolumeName(volumeName)
         .setBucketName(bucketName)
         .addAllKeys(deletedKeys)
         .build();
-    PurgeKeysRequest purgeKeysRequest = PurgeKeysRequest.newBuilder()
-        .addDeletedKeys(deletedKeysInBucket)
-        .build();
+    PurgeKeysRequest.Builder purgeKeysRequest = PurgeKeysRequest.newBuilder()
+        .addDeletedKeys(deletedKeysInBucket);
+
+    if (snapshotDbKey != null) {
+      purgeKeysRequest.setSnapshotTableKey(snapshotDbKey);
+    }
+    purgeKeysRequest.build();

     return OMRequest.newBuilder()
         .setPurgeKeysRequest(purgeKeysRequest)
@@ -98,6 +113,36 @@ private OMRequest createPurgeKeysRequest(List<String> deletedKeys) {
         .build();
   }

+  /**
+   * Create snapshot and checkpoint directory.
+   */
+  private SnapshotInfo createSnapshot(String snapshotName) throws Exception {
+    when(ozoneManager.isAdmin(any())).thenReturn(true);
+    BatchOperation batchOperation = omMetadataManager.getStore()
+        .initBatchOperation();
+    OMRequest omRequest = OMRequestTestUtils
+        .createSnapshotRequest(volumeName, bucketName, snapshotName);
+    // Pre-Execute OMSnapshotCreateRequest.
+    OMSnapshotCreateRequest omSnapshotCreateRequest =
+        TestOMSnapshotCreateRequest.doPreExecute(omRequest, ozoneManager);
+
+    // validateAndUpdateCache OMSnapshotCreateResponse.
+    OMSnapshotCreateResponse omClientResponse = (OMSnapshotCreateResponse)
+        omSnapshotCreateRequest.validateAndUpdateCache(ozoneManager, 1L,
+            ozoneManagerDoubleBufferHelper);
+    // Add to batch and commit to DB.
+    omClientResponse.addToDBBatch(omMetadataManager, batchOperation);
+    omMetadataManager.getStore().commitBatchOperation(batchOperation);
+    batchOperation.close();
+
+    String key = SnapshotInfo.getTableKey(volumeName,
+        bucketName, snapshotName);
+    SnapshotInfo snapshotInfo =
+        omMetadataManager.getSnapshotInfoTable().get(key);
+    Assertions.assertNotNull(snapshotInfo);
+    return snapshotInfo;
+  }
+
   private OMRequest preExecute(OMRequest originalOmRequest) throws IOException {
     OMKeyPurgeRequest omKeyPurgeRequest =
         new OMKeyPurgeRequest(originalOmRequest);
@@ -122,7 +167,7 @@ public void testValidateAndUpdateCache() throws Exception {
     }

     // Create PurgeKeysRequest to purge the deleted keys
-    OMRequest omRequest = createPurgeKeysRequest(deletedKeyNames);
+    OMRequest omRequest = createPurgeKeysRequest(deletedKeyNames, null);

     OMRequest preExecutedRequest = preExecute(omRequest);
     OMKeyPurgeRequest omKeyPurgeRequest =
@@ -141,7 +186,7 @@ public void testValidateAndUpdateCache() throws Exception {
         omMetadataManager.getStore().initBatchOperation()) {

       OMKeyPurgeResponse omKeyPurgeResponse = new OMKeyPurgeResponse(
-          omResponse, deletedKeyNames);
+          omResponse, deletedKeyNames, null);
       omKeyPurgeResponse.addToDBBatch(omMetadataManager, batchOperation);

       // Do manual commit and see whether addToBatch is successful or not.
@@ -154,4 +199,61 @@ public void testValidateAndUpdateCache() throws Exception {
           deletedKey));
     }
   }
+
+  @Test
+  public void testKeyPurgeInSnapshot() throws Exception {
+    // Create and delete keys. The keys should be moved to the deletedTable
+    List<String> deletedKeyNames = createAndDeleteKeys(1, null);
+
+    SnapshotInfo snapInfo = createSnapshot("snap1");
+    // The keys should not be present in the active DB's deletedTable
+    for (String deletedKey : deletedKeyNames) {
+      Assert.assertFalse(omMetadataManager.getDeletedTable().isExist(
+          deletedKey));
+    }
+
+    OmSnapshot omSnapshot = (OmSnapshot) ozoneManager.getOmSnapshotManager()
+        .checkForSnapshot(volumeName, bucketName,
+            getSnapshotPrefix("snap1"));
+
+    // The keys should be present in the snapshot's deletedTable
+    for (String deletedKey : deletedKeyNames) {
+      Assert.assertTrue(omSnapshot.getMetadataManager()
+          .getDeletedTable().isExist(deletedKey));
+    }
+
+    // Create PurgeKeysRequest to purge the deleted keys
+    OMRequest omRequest = createPurgeKeysRequest(deletedKeyNames,
+        snapInfo.getTableKey());
+
+    OMRequest preExecutedRequest = preExecute(omRequest);
+    OMKeyPurgeRequest omKeyPurgeRequest =
+        new OMKeyPurgeRequest(preExecutedRequest);
+
+    omKeyPurgeRequest.validateAndUpdateCache(ozoneManager, 100L,
+        ozoneManagerDoubleBufferHelper);
+
+    OMResponse omResponse = OMResponse.newBuilder()
+        .setPurgeKeysResponse(PurgeKeysResponse.getDefaultInstance())
+        .setCmdType(Type.PurgeKeys)
+        .setStatus(Status.OK)
+        .build();
+
+    try (BatchOperation batchOperation =
+        omMetadataManager.getStore().initBatchOperation()) {
+
+      OMKeyPurgeResponse omKeyPurgeResponse = new OMKeyPurgeResponse(
+          omResponse, deletedKeyNames, omSnapshot);
+      omKeyPurgeResponse.addToDBBatch(omMetadataManager, batchOperation);
+
+      // Do manual commit and see whether addToBatch is successful or not.
+      omMetadataManager.getStore().commitBatchOperation(batchOperation);
+    }
+
+    // The keys should no longer exist in the snapshot's deletedTable
+    for (String deletedKey : deletedKeyNames) {
+      Assert.assertFalse(omSnapshot.getMetadataManager()
+          .getDeletedTable().isExist(deletedKey));
+    }
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
index 6b81d5b2eb0f..c7a963e40f4c 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.om.OMPerformanceMetrics;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.OzoneManagerPrepareState;
 import org.apache.hadoop.ozone.om.ResolvedBucket;
 import org.apache.hadoop.ozone.om.KeyManager;
@@ -208,6 +209,9 @@ public void setup() throws Exception {
     when(ozoneManager.resolveBucketLink(any(Pair.class),
         any(OMClientRequest.class)))
         .thenReturn(new ResolvedBucket(volumeAndBucket, volumeAndBucket));
+    OmSnapshotManager omSnapshotManager = new OmSnapshotManager(ozoneManager);
+    when(ozoneManager.getOmSnapshotManager())
+        .thenReturn(omSnapshotManager);
   }

   @NotNull
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotCreateRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotCreateRequest.java
index 2d6cb2813b44..2ef6965fd6a8 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotCreateRequest.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotCreateRequest.java @@ -352,7 +352,7 @@ private OMSnapshotCreateRequest doPreExecute( /** * Static helper method so this could be used in TestOMSnapshotDeleteRequest. */ - static OMSnapshotCreateRequest doPreExecute( + public static OMSnapshotCreateRequest doPreExecute( OMRequest originalRequest, OzoneManager ozoneManager) throws Exception { OMSnapshotCreateRequest omSnapshotCreateRequest = new OMSnapshotCreateRequest(originalRequest); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java index 540eddf1a46b..ca6765345971 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java @@ -31,6 +31,7 @@ import java.util.stream.Collectors; import org.apache.hadoop.ozone.om.KeyManager; +import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OmTestManagers; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.ScmBlockLocationTestingClient; @@ -371,6 +372,68 @@ private void createAndDeleteKeys(KeyManager keyManager, int keyCount, } } + @Test + public void checkDeletedTableCleanUpForSnapshot() + throws Exception { + OzoneConfiguration conf = createConfAndInitValues(); + OmTestManagers omTestManagers + = new OmTestManagers(conf); + KeyManager keyManager = omTestManagers.getKeyManager(); + writeClient = omTestManagers.getWriteClient(); + om = omTestManagers.getOzoneManager(); + OMMetadataManager metadataManager = omTestManagers.getMetadataManager(); + + String volumeName = String.format("volume%s", + RandomStringUtils.randomAlphanumeric(5)); + String bucketName1 = String.format("bucket%s", + RandomStringUtils.randomAlphanumeric(5)); + String bucketName2 = String.format("bucket%s", + RandomStringUtils.randomAlphanumeric(5)); + String keyName = String.format("key%s", + RandomStringUtils.randomAlphanumeric(5)); + + // Create Volume and Buckets + createVolumeAndBucket(keyManager, volumeName, bucketName1, false); + createVolumeAndBucket(keyManager, volumeName, bucketName2, false); + + // Create the keys + OmKeyArgs key1 = createAndCommitKey(keyManager, volumeName, bucketName1, + keyName, 3); + OmKeyArgs key2 = createAndCommitKey(keyManager, volumeName, bucketName2, + keyName, 3); + + // Create snapshot + String snapName = "snap1"; + writeClient.createSnapshot(volumeName, bucketName1, snapName); + + // Delete the key + writeClient.deleteKey(key1); + writeClient.deleteKey(key2); + + // Run KeyDeletingService + KeyDeletingService keyDeletingService = + (KeyDeletingService) keyManager.getDeletingService(); + GenericTestUtils.waitFor( + () -> keyDeletingService.getDeletedKeyCount().get() >= 1, + 1000, 10000); + Assert.assertTrue(keyDeletingService.getRunCount().get() > 1); + Assert.assertEquals(0, + keyManager.getPendingDeletionKeys(Integer.MAX_VALUE).size()); + + // deletedTable should have deleted key of the snapshot bucket + Assert.assertFalse(metadataManager.getDeletedTable().isEmpty()); + String ozoneKey1 = + metadataManager.getOzoneKey(volumeName, bucketName1, keyName); + String ozoneKey2 = + 
metadataManager.getOzoneKey(volumeName, bucketName2, keyName); + + // key1 belongs to snapshot, so it should not be deleted when + // KeyDeletingService runs. But key2 can be reclaimed as it doesn't + // belong to any snapshot scope. + Assert.assertTrue(metadataManager.getDeletedTable().isExist(ozoneKey1)); + Assert.assertFalse(metadataManager.getDeletedTable().isExist(ozoneKey2)); + } + private void createVolumeAndBucket(KeyManager keyManager, String volumeName, String bucketName, boolean isVersioningEnabled) throws IOException { // cheat here, just create a volume and bucket entry so that we can
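
For reference, the reclaim rule that getPendingDeletionKeys() and SnapshotDeletingService apply in this patch boils down to the sketch below. It is illustrative only: isReclaimable is a hypothetical helper that does not exist in this change, and it simplifies the real code, which recomputes the previous snapshot's DB key per key version and per bucket layout (FSO vs. legacy).

  // A deleted key may be purged only if none of its versions is still
  // referenced, by objectID, in the latest snapshot on the bucket's
  // snapshot path.
  private boolean isReclaimable(RepeatedOmKeyInfo deletedVersions,
      Table<String, OmKeyInfo> prevKeyTable, String prevDbKey)
      throws IOException {
    OmKeyInfo prevKeyInfo = prevKeyTable.get(prevDbKey);
    if (prevKeyInfo == null) {
      // The snapshot never saw this key; its blocks are safe to delete.
      return true;
    }
    for (OmKeyInfo version : deletedVersions.cloneOmKeyInfoList()) {
      if (version.getObjectID() == prevKeyInfo.getObjectID()) {
        // Still visible in the snapshot; keep the deletedTable entry and
        // its blocks until the snapshot itself is reclaimed.
        return false;
      }
    }
    return true;
  }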