diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java index a5df9a1776e7..3edadbc89a1a 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java @@ -50,6 +50,7 @@ public abstract class BackgroundService { private final int threadPoolSize; private final String threadNamePrefix; private final PeriodicalTask service; + private CompletableFuture<Void> future; public BackgroundService(String serviceName, long interval, TimeUnit unit, int threadPoolSize, long serviceTimeout) { @@ -68,6 +69,11 @@ public BackgroundService(String serviceName, long interval, this.threadNamePrefix = threadNamePrefix; initExecutorAndThreadGroup(); service = new PeriodicalTask(); + this.future = CompletableFuture.completedFuture(null); + } + + protected CompletableFuture<Void> getFuture() { + return future; } @VisibleForTesting @@ -138,7 +144,7 @@ public synchronized void run() { while (!tasks.isEmpty()) { BackgroundTask task = tasks.poll(); - CompletableFuture.runAsync(() -> { + future = future.thenCombine(CompletableFuture.runAsync(() -> { long startTime = System.nanoTime(); try { BackgroundTaskResult result = task.call(); @@ -157,7 +163,7 @@ public synchronized void run() { serviceName, endTime - startTime, serviceTimeoutInNanos); } } - }, exec); + }, exec), (Void1, Void) -> null); } } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestDirectoryDeletingServiceWithFSO.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java similarity index 99% rename from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestDirectoryDeletingServiceWithFSO.java rename to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java index fa1f6b9022e2..1933925384fc 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestDirectoryDeletingServiceWithFSO.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java @@ -15,7 +15,7 @@ * limitations under the License.
*/ -package org.apache.hadoop.fs.ozone; +package org.apache.hadoop.ozone.om.service; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED; @@ -77,8 +77,6 @@ import org.apache.hadoop.ozone.om.ratis.OzoneManagerDoubleBuffer; import org.apache.hadoop.ozone.om.ratis.OzoneManagerStateMachine; import org.apache.hadoop.ozone.om.request.file.OMFileRequest; -import org.apache.hadoop.ozone.om.service.DirectoryDeletingService; -import org.apache.hadoop.ozone.om.service.KeyDeletingService; import org.apache.hadoop.ozone.om.snapshot.filter.ReclaimableDirFilter; import org.apache.hadoop.ozone.om.snapshot.filter.ReclaimableKeyFilter; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedDDSWithFSO.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestRootedDDSWithFSO.java similarity index 97% rename from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedDDSWithFSO.java rename to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestRootedDDSWithFSO.java index 3c2cfa914edc..3fc7d15f2375 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedDDSWithFSO.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestRootedDDSWithFSO.java @@ -15,14 +15,14 @@ * limitations under the License. */ -package org.apache.hadoop.fs.ozone; +package org.apache.hadoop.ozone.om.service; -import static org.apache.hadoop.fs.ozone.TestDirectoryDeletingServiceWithFSO.assertSubPathsCount; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_ITERATE_BATCH_SIZE; import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; +import static org.apache.hadoop.ozone.om.service.TestDirectoryDeletingServiceWithFSO.assertSubPathsCount; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -51,7 +51,6 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; -import org.apache.hadoop.ozone.om.service.DirectoryDeletingService; import org.apache.ozone.test.GenericTestUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerHASnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerHASnapshot.java index 584a426716fb..8a5e4ed81ad6 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerHASnapshot.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerHASnapshot.java @@ -35,6 +35,7 @@ import java.util.Objects; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import 
java.util.concurrent.TimeoutException; import org.apache.commons.lang3.RandomStringUtils; @@ -314,7 +315,8 @@ private void createFileKey(OzoneBucket bucket, String keyName) * and purgeSnapshot in same batch. */ @Test - public void testKeyAndSnapshotDeletionService() throws IOException, InterruptedException, TimeoutException { + public void testKeyAndSnapshotDeletionService() + throws IOException, InterruptedException, TimeoutException, ExecutionException { OzoneManager omLeader = cluster.getOMLeader(); OzoneManager omFollower; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java index ee699e16c31d..3ee50ffd04f4 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java @@ -17,57 +17,23 @@ package org.apache.hadoop.ozone.om.service; -import static org.apache.hadoop.ozone.OzoneConsts.OBJECT_ID_RECLAIM_BLOCKS; -import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; -import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.isBlockLocationInfoSame; - import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ServiceException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.hadoop.hdds.HddsUtils; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.hdds.utils.BackgroundService; -import org.apache.hadoop.hdds.utils.db.Table; -import org.apache.hadoop.ozone.ClientVersion; -import org.apache.hadoop.ozone.common.BlockGroup; -import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; import org.apache.hadoop.ozone.lock.BootstrapStateHandler; -import org.apache.hadoop.ozone.om.DeleteKeysResult; import org.apache.hadoop.ozone.om.DeletingServiceMetrics; -import org.apache.hadoop.ozone.om.KeyManager; -import org.apache.hadoop.ozone.om.OMConfigKeys; -import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OMPerformanceMetrics; import org.apache.hadoop.ozone.om.OzoneManager; -import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; -import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; -import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; -import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeletedKeys; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest; -import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgePathRequest; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; -import org.apache.hadoop.util.Time; import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.util.function.CheckedFunction; /** * Abstracts common code from KeyDeletingService and DirectoryDeletingService @@ -79,456 +45,53 @@ public abstract class AbstractKeyDeletingService extends BackgroundService private final OzoneManager ozoneManager; private final DeletingServiceMetrics metrics; private final OMPerformanceMetrics perfMetrics; - private final ScmBlockLocationProtocol scmClient; private final ClientId clientId = ClientId.randomId(); - private final AtomicLong deletedDirsCount; - private final AtomicLong movedDirsCount; - private final AtomicLong movedFilesCount; private final AtomicLong runCount; private final AtomicLong callId; + private final AtomicBoolean suspended; private final BootstrapStateHandler.Lock lock = new BootstrapStateHandler.Lock(); public AbstractKeyDeletingService(String serviceName, long interval, TimeUnit unit, int threadPoolSize, long serviceTimeout, - OzoneManager ozoneManager, ScmBlockLocationProtocol scmClient) { + OzoneManager ozoneManager) { super(serviceName, interval, unit, threadPoolSize, serviceTimeout, ozoneManager.getThreadNamePrefix()); this.ozoneManager = ozoneManager; - this.scmClient = scmClient; - this.deletedDirsCount = new AtomicLong(0); - this.movedDirsCount = new AtomicLong(0); - this.movedFilesCount = new AtomicLong(0); this.runCount = new AtomicLong(0); this.metrics = ozoneManager.getDeletionMetrics(); this.perfMetrics = ozoneManager.getPerfMetrics(); this.callId = new AtomicLong(0); - } - - protected Pair processKeyDeletes(List keyBlocksList, - Map keysToModify, List renameEntries, - String snapTableKey, UUID expectedPreviousSnapshotId) throws IOException { - - long startTime = Time.monotonicNow(); - Pair purgeResult = Pair.of(0, false); - if (LOG.isDebugEnabled()) { - LOG.debug("Send {} key(s) to SCM: {}", - keyBlocksList.size(), keyBlocksList); - } else if (LOG.isInfoEnabled()) { - int logSize = 10; - if (keyBlocksList.size() < logSize) { - logSize = keyBlocksList.size(); - } - LOG.info("Send {} key(s) to SCM, first {} keys: {}", - keyBlocksList.size(), logSize, keyBlocksList.subList(0, logSize)); - } - List blockDeletionResults = - scmClient.deleteKeyBlocks(keyBlocksList); - LOG.info("{} BlockGroup deletion are acked by SCM in {} ms", - keyBlocksList.size(), Time.monotonicNow() - startTime); - if (blockDeletionResults != null) { - long purgeStartTime = Time.monotonicNow(); - purgeResult = submitPurgeKeysRequest(blockDeletionResults, - keysToModify, renameEntries, snapTableKey, expectedPreviousSnapshotId); - int limit = ozoneManager.getConfiguration().getInt(OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK, - OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT); - LOG.info("Blocks for {} (out of {}) keys are deleted from DB in {} ms. Limit per task is {}.", - purgeResult, blockDeletionResults.size(), Time.monotonicNow() - purgeStartTime, limit); - } - perfMetrics.setKeyDeletingServiceLatencyMs(Time.monotonicNow() - startTime); - return purgeResult; - } - - /** - * Submits PurgeKeys request for the keys whose blocks have been deleted - * by SCM. - * @param results DeleteBlockGroups returned by SCM. 
- * @param keysToModify Updated list of RepeatedOmKeyInfo - */ - private Pair submitPurgeKeysRequest(List results, - Map keysToModify, List renameEntriesToBeDeleted, - String snapTableKey, UUID expectedPreviousSnapshotId) { - List purgeKeys = new ArrayList<>(); - - // Put all keys to be purged in a list - int deletedCount = 0; - Set failedDeletedKeys = new HashSet<>(); - boolean purgeSuccess = true; - for (DeleteBlockGroupResult result : results) { - String deletedKey = result.getObjectKey(); - if (result.isSuccess()) { - // Add key to PurgeKeys list. - if (keysToModify != null && !keysToModify.containsKey(deletedKey)) { - // Parse Volume and BucketName - purgeKeys.add(deletedKey); - if (LOG.isDebugEnabled()) { - LOG.debug("Key {} set to be updated in OM DB, Other versions " + - "of the key that are reclaimable are reclaimed.", deletedKey); - } - } else if (keysToModify == null) { - purgeKeys.add(deletedKey); - if (LOG.isDebugEnabled()) { - LOG.debug("Key {} set to be purged from OM DB", deletedKey); - } - } - deletedCount++; - } else { - // If the block deletion failed, then the deleted keys should also not be modified. - failedDeletedKeys.add(deletedKey); - purgeSuccess = false; - } - } - - PurgeKeysRequest.Builder purgeKeysRequest = PurgeKeysRequest.newBuilder(); - if (snapTableKey != null) { - purgeKeysRequest.setSnapshotTableKey(snapTableKey); - } - OzoneManagerProtocolProtos.NullableUUID.Builder expectedPreviousSnapshotNullableUUID = - OzoneManagerProtocolProtos.NullableUUID.newBuilder(); - if (expectedPreviousSnapshotId != null) { - expectedPreviousSnapshotNullableUUID.setUuid(HddsUtils.toProtobuf(expectedPreviousSnapshotId)); - } - purgeKeysRequest.setExpectedPreviousSnapshotID(expectedPreviousSnapshotNullableUUID.build()); - DeletedKeys deletedKeys = DeletedKeys.newBuilder() - .setVolumeName("") - .setBucketName("") - .addAllKeys(purgeKeys) - .build(); - purgeKeysRequest.addDeletedKeys(deletedKeys); - // Adding rename entries to be purged. - if (renameEntriesToBeDeleted != null) { - purgeKeysRequest.addAllRenamedKeys(renameEntriesToBeDeleted); - } - List keysToUpdateList = new ArrayList<>(); - if (keysToModify != null) { - for (Map.Entry keyToModify : - keysToModify.entrySet()) { - if (failedDeletedKeys.contains(keyToModify.getKey())) { - continue; - } - SnapshotMoveKeyInfos.Builder keyToUpdate = - SnapshotMoveKeyInfos.newBuilder(); - keyToUpdate.setKey(keyToModify.getKey()); - List keyInfos = - keyToModify.getValue().getOmKeyInfoList().stream() - .map(k -> k.getProtobuf(ClientVersion.CURRENT_VERSION)) - .collect(Collectors.toList()); - keyToUpdate.addAllKeyInfos(keyInfos); - keysToUpdateList.add(keyToUpdate.build()); - } - - if (!keysToUpdateList.isEmpty()) { - purgeKeysRequest.addAllKeysToUpdate(keysToUpdateList); - } - } - - OMRequest omRequest = OMRequest.newBuilder() - .setCmdType(Type.PurgeKeys) - .setPurgeKeysRequest(purgeKeysRequest) - .setClientId(clientId.toString()) - .build(); - - // Submit PurgeKeys request to OM. Acquire bootstrap lock when processing deletes for snapshots. - try (BootstrapStateHandler.Lock lock = snapTableKey != null ? getBootstrapStateLock().lock() : null) { - OMResponse omResponse = submitRequest(omRequest); - if (omResponse != null) { - purgeSuccess = purgeSuccess && omResponse.getSuccess(); - } - } catch (ServiceException | InterruptedException e) { - LOG.error("PurgeKey request failed. 
Will retry at next run.", e); - return Pair.of(0, false); - } - - return Pair.of(deletedCount, purgeSuccess); + this.suspended = new AtomicBoolean(false); } protected OMResponse submitRequest(OMRequest omRequest) throws ServiceException { return OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, callId.incrementAndGet()); } - protected OMResponse submitPurgePaths(List requests, - String snapTableKey, UUID expectedPreviousSnapshotId) { - OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest = - OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder(); - - if (snapTableKey != null) { - purgeDirRequest.setSnapshotTableKey(snapTableKey); - } - OzoneManagerProtocolProtos.NullableUUID.Builder expectedPreviousSnapshotNullableUUID = - OzoneManagerProtocolProtos.NullableUUID.newBuilder(); - if (expectedPreviousSnapshotId != null) { - expectedPreviousSnapshotNullableUUID.setUuid(HddsUtils.toProtobuf(expectedPreviousSnapshotId)); - } - purgeDirRequest.setExpectedPreviousSnapshotID(expectedPreviousSnapshotNullableUUID.build()); - - purgeDirRequest.addAllDeletedPath(requests); - - OzoneManagerProtocolProtos.OMRequest omRequest = - OzoneManagerProtocolProtos.OMRequest.newBuilder() - .setCmdType(OzoneManagerProtocolProtos.Type.PurgeDirectories) - .setPurgeDirectoriesRequest(purgeDirRequest) - .setClientId(clientId.toString()) - .build(); - - // Submit Purge paths request to OM. Acquire bootstrap lock when processing deletes for snapshots. - try (BootstrapStateHandler.Lock lock = snapTableKey != null ? getBootstrapStateLock().lock() : null) { - return submitRequest(omRequest); - } catch (ServiceException | InterruptedException e) { - LOG.error("PurgePaths request failed. Will retry at next run.", e); - } - return null; - } - - private OzoneManagerProtocolProtos.PurgePathRequest wrapPurgeRequest( - final long volumeId, - final long bucketId, - final String purgeDeletedDir, - final List purgeDeletedFiles, - final List markDirsAsDeleted) { - // Put all keys to be purged in a list - PurgePathRequest.Builder purgePathsRequest = PurgePathRequest.newBuilder(); - purgePathsRequest.setVolumeId(volumeId); - purgePathsRequest.setBucketId(bucketId); - - if (purgeDeletedDir != null) { - purgePathsRequest.setDeletedDir(purgeDeletedDir); - } - - for (OmKeyInfo purgeFile : purgeDeletedFiles) { - purgePathsRequest.addDeletedSubFiles( - purgeFile.getProtobuf(true, ClientVersion.CURRENT_VERSION)); - } - - // Add these directories to deletedDirTable, so that its sub-paths will be - // traversed in next iteration to ensure cleanup all sub-children. 
- for (OmKeyInfo dir : markDirsAsDeleted) { - purgePathsRequest.addMarkDeletedSubDirs( - dir.getProtobuf(ClientVersion.CURRENT_VERSION)); - } - - return purgePathsRequest.build(); - } - - protected Optional prepareDeleteDirRequest( - OmKeyInfo pendingDeletedDirInfo, String delDirName, boolean purgeDir, - List> subDirList, - KeyManager keyManager, - CheckedFunction, Boolean, IOException> reclaimableFileFilter, - long remainingBufLimit) throws IOException { - // step-0: Get one pending deleted directory - if (LOG.isDebugEnabled()) { - LOG.debug("Pending deleted dir name: {}", - pendingDeletedDirInfo.getKeyName()); - } - - final String[] keys = delDirName.split(OM_KEY_PREFIX); - final long volumeId = Long.parseLong(keys[1]); - final long bucketId = Long.parseLong(keys[2]); - - // step-1: get all sub directories under the deletedDir - DeleteKeysResult subDirDeleteResult = - keyManager.getPendingDeletionSubDirs(volumeId, bucketId, - pendingDeletedDirInfo, keyInfo -> true, remainingBufLimit); - List subDirs = subDirDeleteResult.getKeysToDelete(); - remainingBufLimit -= subDirDeleteResult.getConsumedSize(); - - OMMetadataManager omMetadataManager = keyManager.getMetadataManager(); - for (OmKeyInfo dirInfo : subDirs) { - String ozoneDbKey = omMetadataManager.getOzonePathKey(volumeId, - bucketId, dirInfo.getParentObjectID(), dirInfo.getFileName()); - String ozoneDeleteKey = omMetadataManager.getOzoneDeletePathKey( - dirInfo.getObjectID(), ozoneDbKey); - subDirList.add(Pair.of(ozoneDeleteKey, dirInfo)); - LOG.debug("Moved sub dir name: {}", dirInfo.getKeyName()); - } - - // step-2: get all sub files under the deletedDir - // Only remove sub files if the parent directory is going to be deleted or can be reclaimed. - DeleteKeysResult subFileDeleteResult = - keyManager.getPendingDeletionSubFiles(volumeId, bucketId, - pendingDeletedDirInfo, keyInfo -> purgeDir || reclaimableFileFilter.apply(keyInfo), remainingBufLimit); - List subFiles = subFileDeleteResult.getKeysToDelete(); - - if (LOG.isDebugEnabled()) { - for (OmKeyInfo fileInfo : subFiles) { - LOG.debug("Moved sub file name: {}", fileInfo.getKeyName()); - } - } - - // step-3: If both sub-dirs and sub-files are exhausted under a parent - // directory, only then delete the parent. - String purgeDeletedDir = purgeDir && subDirDeleteResult.isProcessedKeys() && - subFileDeleteResult.isProcessedKeys() ? 
delDirName : null; - if (purgeDeletedDir == null && subFiles.isEmpty() && subDirs.isEmpty()) { - return Optional.empty(); - } - return Optional.of(wrapPurgeRequest(volumeId, bucketId, - purgeDeletedDir, subFiles, subDirs)); - } - - @SuppressWarnings("checkstyle:ParameterNumber") - public void optimizeDirDeletesAndSubmitRequest( - long dirNum, long subDirNum, long subFileNum, - List> allSubDirList, - List purgePathRequestList, - String snapTableKey, long startTime, - long remainingBufLimit, KeyManager keyManager, - CheckedFunction, Boolean, IOException> reclaimableDirChecker, - CheckedFunction, Boolean, IOException> reclaimableFileChecker, - UUID expectedPreviousSnapshotId, long rnCnt) { - - // Optimization to handle delete sub-dir and keys to remove quickly - // This case will be useful to handle when depth of directory is high - int subdirDelNum = 0; - int subDirRecursiveCnt = 0; - int consumedSize = 0; - while (subDirRecursiveCnt < allSubDirList.size() && remainingBufLimit > 0) { - try { - Pair stringOmKeyInfoPair = allSubDirList.get(subDirRecursiveCnt++); - Boolean subDirectoryReclaimable = reclaimableDirChecker.apply(Table.newKeyValue(stringOmKeyInfoPair.getKey(), - stringOmKeyInfoPair.getValue())); - Optional request = prepareDeleteDirRequest( - stringOmKeyInfoPair.getValue(), stringOmKeyInfoPair.getKey(), subDirectoryReclaimable, allSubDirList, - keyManager, reclaimableFileChecker, remainingBufLimit); - if (!request.isPresent()) { - continue; - } - PurgePathRequest requestVal = request.get(); - consumedSize += requestVal.getSerializedSize(); - remainingBufLimit -= consumedSize; - purgePathRequestList.add(requestVal); - // Count up the purgeDeletedDir, subDirs and subFiles - if (requestVal.hasDeletedDir() && !StringUtils.isBlank(requestVal.getDeletedDir())) { - subdirDelNum++; - } - subDirNum += requestVal.getMarkDeletedSubDirsCount(); - subFileNum += requestVal.getDeletedSubFilesCount(); - } catch (IOException e) { - LOG.error("Error while running delete directories and files " + - "background task. Will retry at next run for subset.", e); - break; - } - } - if (!purgePathRequestList.isEmpty()) { - submitPurgePaths(purgePathRequestList, snapTableKey, expectedPreviousSnapshotId); - } - - if (dirNum != 0 || subDirNum != 0 || subFileNum != 0) { - long subdirMoved = subDirNum - subdirDelNum; - deletedDirsCount.addAndGet(dirNum + subdirDelNum); - movedDirsCount.addAndGet(subdirMoved); - movedFilesCount.addAndGet(subFileNum); - long timeTakenInIteration = Time.monotonicNow() - startTime; - LOG.info("Number of dirs deleted: {}, Number of sub-dir " + - "deleted: {}, Number of sub-files moved:" + - " {} to DeletedTable, Number of sub-dirs moved {} to " + - "DeletedDirectoryTable, iteration elapsed: {}ms, " + - " totalRunCount: {}", - dirNum, subdirDelNum, subFileNum, (subDirNum - subdirDelNum), - timeTakenInIteration, rnCnt); - metrics.incrementDirectoryDeletionTotalMetrics(dirNum + subdirDelNum, subDirNum, subFileNum); - perfMetrics.setDirectoryDeletingServiceLatencyMs(timeTakenInIteration); + final boolean shouldRun() { + if (getOzoneManager() == null) { + // OzoneManager can be null for testing + return true; } + return !suspended.get() && getOzoneManager().isLeaderReady(); } /** - * To calculate Exclusive Size for current snapshot, Check - * the next snapshot deletedTable if the deleted key is - * referenced in current snapshot and not referenced in the - * previous snapshot then that key is exclusive to the current - * snapshot. 
Here since we are only iterating through - * deletedTable we can check the previous and previous to - * previous snapshot to achieve the same. - * previousSnapshot - Snapshot for which exclusive size is - * getting calculating. - * currSnapshot - Snapshot's deletedTable is used to calculate - * previousSnapshot snapshot's exclusive size. - * previousToPrevSnapshot - Snapshot which is used to check - * if key is exclusive to previousSnapshot. + * Suspend the service. */ - @SuppressWarnings("checkstyle:ParameterNumber") - public void calculateExclusiveSize( - SnapshotInfo previousSnapshot, - SnapshotInfo previousToPrevSnapshot, - OmKeyInfo keyInfo, - OmBucketInfo bucketInfo, long volumeId, - Table snapRenamedTable, - Table previousKeyTable, - Table prevRenamedTable, - Table previousToPrevKeyTable, - Map exclusiveSizeMap, - Map exclusiveReplicatedSizeMap) throws IOException { - String prevSnapKey = previousSnapshot.getTableKey(); - long exclusiveReplicatedSize = - exclusiveReplicatedSizeMap.getOrDefault( - prevSnapKey, 0L) + keyInfo.getReplicatedSize(); - long exclusiveSize = exclusiveSizeMap.getOrDefault( - prevSnapKey, 0L) + keyInfo.getDataSize(); - - // If there is no previous to previous snapshot, then - // the previous snapshot is the first snapshot. - if (previousToPrevSnapshot == null) { - exclusiveSizeMap.put(prevSnapKey, exclusiveSize); - exclusiveReplicatedSizeMap.put(prevSnapKey, - exclusiveReplicatedSize); - } else { - OmKeyInfo keyInfoPrevSnapshot = getPreviousSnapshotKeyName( - keyInfo, bucketInfo, volumeId, - snapRenamedTable, previousKeyTable); - OmKeyInfo keyInfoPrevToPrevSnapshot = getPreviousSnapshotKeyName( - keyInfoPrevSnapshot, bucketInfo, volumeId, - prevRenamedTable, previousToPrevKeyTable); - // If the previous to previous snapshot doesn't - // have the key, then it is exclusive size for the - // previous snapshot. - if (keyInfoPrevToPrevSnapshot == null) { - exclusiveSizeMap.put(prevSnapKey, exclusiveSize); - exclusiveReplicatedSizeMap.put(prevSnapKey, - exclusiveReplicatedSize); - } - } + @VisibleForTesting + public void suspend() throws ExecutionException, InterruptedException { + suspended.set(true); + getFuture().get(); } - private OmKeyInfo getPreviousSnapshotKeyName( - OmKeyInfo keyInfo, OmBucketInfo bucketInfo, long volumeId, - Table snapRenamedTable, - Table previousKeyTable) throws IOException { - - if (keyInfo == null) { - return null; - } - - String dbKeyPrevSnap; - if (bucketInfo.getBucketLayout().isFileSystemOptimized()) { - dbKeyPrevSnap = getOzoneManager().getMetadataManager().getOzonePathKey( - volumeId, - bucketInfo.getObjectID(), - keyInfo.getParentObjectID(), - keyInfo.getFileName()); - } else { - dbKeyPrevSnap = getOzoneManager().getMetadataManager().getOzoneKey( - keyInfo.getVolumeName(), - keyInfo.getBucketName(), - keyInfo.getKeyName()); - } - - String dbRenameKey = getOzoneManager().getMetadataManager().getRenameKey( - keyInfo.getVolumeName(), - keyInfo.getBucketName(), - keyInfo.getObjectID()); - - String renamedKey = snapRenamedTable.getIfExist(dbRenameKey); - OmKeyInfo prevKeyInfo = renamedKey != null ? - previousKeyTable.get(renamedKey) : - previousKeyTable.get(dbKeyPrevSnap); - - if (prevKeyInfo == null || - prevKeyInfo.getObjectID() != keyInfo.getObjectID()) { - return null; - } - - return isBlockLocationInfoSame(prevKeyInfo, keyInfo) ? - prevKeyInfo : null; + /** + * Resume the service if suspended. 
+ */ + @VisibleForTesting + public void resume() { + suspended.set(false); } protected boolean isBufferLimitCrossed( @@ -536,78 +99,20 @@ protected boolean isBufferLimitCrossed( return cLimit + increment >= maxLimit; } - protected boolean isKeyReclaimable( - Table previousKeyTable, - Table renamedTable, - OmKeyInfo deletedKeyInfo, OmBucketInfo bucketInfo, - long volumeId, HddsProtos.KeyValue.Builder renamedKeyBuilder) - throws IOException { - - String dbKey; - // Handle case when the deleted snapshot is the first snapshot. - if (previousKeyTable == null) { - return true; - } - - // These are uncommitted blocks wrapped into a pseudo KeyInfo - if (deletedKeyInfo.getObjectID() == OBJECT_ID_RECLAIM_BLOCKS) { - return true; - } - - // Construct keyTable or fileTable DB key depending on the bucket type - if (bucketInfo.getBucketLayout().isFileSystemOptimized()) { - dbKey = ozoneManager.getMetadataManager().getOzonePathKey( - volumeId, - bucketInfo.getObjectID(), - deletedKeyInfo.getParentObjectID(), - deletedKeyInfo.getFileName()); - } else { - dbKey = ozoneManager.getMetadataManager().getOzoneKey( - deletedKeyInfo.getVolumeName(), - deletedKeyInfo.getBucketName(), - deletedKeyInfo.getKeyName()); - } - - /* - snapshotRenamedTable: - 1) /volumeName/bucketName/objectID -> - /volumeId/bucketId/parentId/fileName (FSO) - 2) /volumeName/bucketName/objectID -> - /volumeName/bucketName/keyName (non-FSO) - */ - String dbRenameKey = ozoneManager.getMetadataManager().getRenameKey( - deletedKeyInfo.getVolumeName(), deletedKeyInfo.getBucketName(), - deletedKeyInfo.getObjectID()); - - // Condition: key should not exist in snapshotRenamedTable - // of the current snapshot and keyTable of the previous snapshot. - // Check key exists in renamedTable of the Snapshot - String renamedKey = renamedTable.getIfExist(dbRenameKey); - - if (renamedKey != null && renamedKeyBuilder != null) { - renamedKeyBuilder.setKey(dbRenameKey).setValue(renamedKey); - } - // previousKeyTable is fileTable if the bucket is FSO, - // otherwise it is the keyTable. - OmKeyInfo prevKeyInfo = renamedKey != null ? previousKeyTable - .get(renamedKey) : previousKeyTable.get(dbKey); - - if (prevKeyInfo == null || - prevKeyInfo.getObjectID() != deletedKeyInfo.getObjectID()) { - return true; - } + public OzoneManager getOzoneManager() { + return ozoneManager; + } - // For key overwrite the objectID will remain the same, In this - // case we need to check if OmKeyLocationInfo is also same. - return !isBlockLocationInfoSame(prevKeyInfo, deletedKeyInfo); + ClientId getClientId() { + return clientId; } - public OzoneManager getOzoneManager() { - return ozoneManager; + DeletingServiceMetrics getMetrics() { + return metrics; } - public ScmBlockLocationProtocol getScmClient() { - return scmClient; + OMPerformanceMetrics getPerfMetrics() { + return perfMetrics; } /** @@ -624,37 +129,6 @@ public AtomicLong getCallId() { return callId; } - /** - * Returns the number of dirs deleted by the background service. - * - * @return Long count. - */ - @VisibleForTesting - public long getDeletedDirsCount() { - return deletedDirsCount.get(); - } - - /** - * Returns the number of sub-dirs deleted by the background service. - * - * @return Long count. - */ - @VisibleForTesting - public long getMovedDirsCount() { - return movedDirsCount.get(); - } - - /** - * Returns the number of files moved to DeletedTable by the background - * service. - * - * @return Long count. 
- */ - @VisibleForTesting - public long getMovedFilesCount() { - return movedFilesCount.get(); - } - @Override public BootstrapStateHandler.Lock getBootstrapStateLock() { return lock; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java index 7edbe7761175..f96099323d54 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java @@ -17,11 +17,13 @@ package org.apache.hadoop.ozone.om.service; +import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; +import com.google.protobuf.ServiceException; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; @@ -38,10 +40,12 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.ReconfigurationHandler; import org.apache.hadoop.hdds.conf.StorageUnit; @@ -52,8 +56,12 @@ import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.Table.KeyValue; import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.ClientVersion; +import org.apache.hadoop.ozone.lock.BootstrapStateHandler; +import org.apache.hadoop.ozone.om.DeleteKeysResult; import org.apache.hadoop.ozone.om.KeyManager; import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; import org.apache.hadoop.ozone.om.OmSnapshot; import org.apache.hadoop.ozone.om.OmSnapshotManager; @@ -68,6 +76,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgePathRequest; import org.apache.hadoop.util.Time; +import org.apache.ratis.util.function.CheckedFunction; import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -140,18 +149,20 @@ public class DirectoryDeletingService extends AbstractKeyDeletingService { // from parent directory info from deleted directory table concurrently // and send deletion requests. 
private int ratisByteLimit; - private final AtomicBoolean suspended; private final AtomicBoolean isRunningOnAOS; private final SnapshotChainManager snapshotChainManager; private final boolean deepCleanSnapshots; private final ExecutorService deletionThreadPool; private final int numberOfParallelThreadsPerStore; + private final AtomicLong deletedDirsCount; + private final AtomicLong movedDirsCount; + private final AtomicLong movedFilesCount; public DirectoryDeletingService(long interval, TimeUnit unit, long serviceTimeout, OzoneManager ozoneManager, OzoneConfiguration configuration, int dirDeletingServiceCorePoolSize, boolean deepCleanSnapshots) { super(DirectoryDeletingService.class.getSimpleName(), interval, unit, - dirDeletingServiceCorePoolSize, serviceTimeout, ozoneManager, null); + dirDeletingServiceCorePoolSize, serviceTimeout, ozoneManager); int limit = (int) configuration.getStorageSize( OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT, @@ -162,11 +173,13 @@ public DirectoryDeletingService(long interval, TimeUnit unit, // always go to 90% of max limit for request as other header will be added this.ratisByteLimit = (int) (limit * 0.9); - this.suspended = new AtomicBoolean(false); this.isRunningOnAOS = new AtomicBoolean(false); registerReconfigCallbacks(ozoneManager.getReconfigurationHandler(), configuration); this.snapshotChainManager = ((OmMetadataManagerImpl)ozoneManager.getMetadataManager()).getSnapshotChainManager(); this.deepCleanSnapshots = deepCleanSnapshots; + this.deletedDirsCount = new AtomicLong(0); + this.movedDirsCount = new AtomicLong(0); + this.movedFilesCount = new AtomicLong(0); } public void registerReconfigCallbacks(ReconfigurationHandler handler, OzoneConfiguration conf) { @@ -187,38 +200,10 @@ private synchronized void updateAndRestart(OzoneConfiguration conf) { start(); } - private boolean shouldRun() { - if (getOzoneManager() == null) { - // OzoneManager can be null for testing - return true; - } - return getOzoneManager().isLeaderReady() && !suspended.get(); - } - public boolean isRunningOnAOS() { return isRunningOnAOS.get(); } - /** - * Suspend the service. - */ - @VisibleForTesting - public void suspend() { - suspended.set(true); - } - - /** - * Resume the service if suspended. 
- */ - @VisibleForTesting - public void resume() { - suspended.set(false); - } - - public void setRatisByteLimit(int ratisByteLimit) { - this.ratisByteLimit = ratisByteLimit; - } - @Override public BackgroundTaskQueue getTasks() { BackgroundTaskQueue queue = new BackgroundTaskQueue(); @@ -244,6 +229,71 @@ public void shutdown() { super.shutdown(); } + @SuppressWarnings("checkstyle:ParameterNumber") + void optimizeDirDeletesAndSubmitRequest( + long dirNum, long subDirNum, long subFileNum, + List> allSubDirList, + List purgePathRequestList, + String snapTableKey, long startTime, + long remainingBufLimit, KeyManager keyManager, + CheckedFunction, Boolean, IOException> reclaimableDirChecker, + CheckedFunction, Boolean, IOException> reclaimableFileChecker, + UUID expectedPreviousSnapshotId, long rnCnt) { + + // Optimization to handle delete sub-dir and keys to remove quickly + // This case will be useful to handle when depth of directory is high + int subdirDelNum = 0; + int subDirRecursiveCnt = 0; + int consumedSize = 0; + while (subDirRecursiveCnt < allSubDirList.size() && remainingBufLimit > 0) { + try { + Pair stringOmKeyInfoPair = allSubDirList.get(subDirRecursiveCnt++); + Boolean subDirectoryReclaimable = reclaimableDirChecker.apply(Table.newKeyValue(stringOmKeyInfoPair.getKey(), + stringOmKeyInfoPair.getValue())); + Optional request = prepareDeleteDirRequest( + stringOmKeyInfoPair.getValue(), stringOmKeyInfoPair.getKey(), subDirectoryReclaimable, allSubDirList, + keyManager, reclaimableFileChecker, remainingBufLimit); + if (!request.isPresent()) { + continue; + } + PurgePathRequest requestVal = request.get(); + consumedSize += requestVal.getSerializedSize(); + remainingBufLimit -= consumedSize; + purgePathRequestList.add(requestVal); + // Count up the purgeDeletedDir, subDirs and subFiles + if (requestVal.hasDeletedDir() && !StringUtils.isBlank(requestVal.getDeletedDir())) { + subdirDelNum++; + } + subDirNum += requestVal.getMarkDeletedSubDirsCount(); + subFileNum += requestVal.getDeletedSubFilesCount(); + } catch (IOException e) { + LOG.error("Error while running delete directories and files " + + "background task. Will retry at next run for subset.", e); + break; + } + } + if (!purgePathRequestList.isEmpty()) { + submitPurgePaths(purgePathRequestList, snapTableKey, expectedPreviousSnapshotId); + } + + if (dirNum != 0 || subDirNum != 0 || subFileNum != 0) { + long subdirMoved = subDirNum - subdirDelNum; + deletedDirsCount.addAndGet(dirNum + subdirDelNum); + movedDirsCount.addAndGet(subdirMoved); + movedFilesCount.addAndGet(subFileNum); + long timeTakenInIteration = Time.monotonicNow() - startTime; + LOG.info("Number of dirs deleted: {}, Number of sub-dir " + + "deleted: {}, Number of sub-files moved:" + + " {} to DeletedTable, Number of sub-dirs moved {} to " + + "DeletedDirectoryTable, iteration elapsed: {}ms, " + + " totalRunCount: {}", + dirNum, subdirDelNum, subFileNum, (subDirNum - subdirDelNum), + timeTakenInIteration, rnCnt); + getMetrics().incrementDirectoryDeletionTotalMetrics(dirNum + subdirDelNum, subDirNum, subFileNum); + getPerfMetrics().setDirectoryDeletingServiceLatencyMs(timeTakenInIteration); + } + } + private static final class DeletedDirSupplier implements Closeable { private TableIterator> deleteTableIterator; @@ -265,7 +315,159 @@ public void close() { } } - private final class DirDeletingTask implements BackgroundTask { + /** + * Returns the number of dirs deleted by the background service. + * + * @return Long count. 
+ */ + @VisibleForTesting + public long getDeletedDirsCount() { + return deletedDirsCount.get(); + } + + /** + * Returns the number of sub-dirs deleted by the background service. + * + * @return Long count. + */ + @VisibleForTesting + public long getMovedDirsCount() { + return movedDirsCount.get(); + } + + /** + * Returns the number of files moved to DeletedTable by the background + * service. + * + * @return Long count. + */ + @VisibleForTesting + public long getMovedFilesCount() { + return movedFilesCount.get(); + } + + private Optional prepareDeleteDirRequest( + OmKeyInfo pendingDeletedDirInfo, String delDirName, boolean purgeDir, + List> subDirList, + KeyManager keyManager, + CheckedFunction, Boolean, IOException> reclaimableFileFilter, + long remainingBufLimit) throws IOException { + // step-0: Get one pending deleted directory + if (LOG.isDebugEnabled()) { + LOG.debug("Pending deleted dir name: {}", + pendingDeletedDirInfo.getKeyName()); + } + + final String[] keys = delDirName.split(OM_KEY_PREFIX); + final long volumeId = Long.parseLong(keys[1]); + final long bucketId = Long.parseLong(keys[2]); + + // step-1: get all sub directories under the deletedDir + DeleteKeysResult subDirDeleteResult = + keyManager.getPendingDeletionSubDirs(volumeId, bucketId, + pendingDeletedDirInfo, keyInfo -> true, remainingBufLimit); + List subDirs = subDirDeleteResult.getKeysToDelete(); + remainingBufLimit -= subDirDeleteResult.getConsumedSize(); + + OMMetadataManager omMetadataManager = keyManager.getMetadataManager(); + for (OmKeyInfo dirInfo : subDirs) { + String ozoneDbKey = omMetadataManager.getOzonePathKey(volumeId, + bucketId, dirInfo.getParentObjectID(), dirInfo.getFileName()); + String ozoneDeleteKey = omMetadataManager.getOzoneDeletePathKey( + dirInfo.getObjectID(), ozoneDbKey); + subDirList.add(Pair.of(ozoneDeleteKey, dirInfo)); + LOG.debug("Moved sub dir name: {}", dirInfo.getKeyName()); + } + + // step-2: get all sub files under the deletedDir + // Only remove sub files if the parent directory is going to be deleted or can be reclaimed. + DeleteKeysResult subFileDeleteResult = + keyManager.getPendingDeletionSubFiles(volumeId, bucketId, + pendingDeletedDirInfo, keyInfo -> purgeDir || reclaimableFileFilter.apply(keyInfo), remainingBufLimit); + List subFiles = subFileDeleteResult.getKeysToDelete(); + + if (LOG.isDebugEnabled()) { + for (OmKeyInfo fileInfo : subFiles) { + LOG.debug("Moved sub file name: {}", fileInfo.getKeyName()); + } + } + + // step-3: If both sub-dirs and sub-files are exhausted under a parent + // directory, only then delete the parent. + String purgeDeletedDir = purgeDir && subDirDeleteResult.isProcessedKeys() && + subFileDeleteResult.isProcessedKeys() ? 
delDirName : null; + if (purgeDeletedDir == null && subFiles.isEmpty() && subDirs.isEmpty()) { + return Optional.empty(); + } + return Optional.of(wrapPurgeRequest(volumeId, bucketId, + purgeDeletedDir, subFiles, subDirs)); + } + + private OzoneManagerProtocolProtos.PurgePathRequest wrapPurgeRequest( + final long volumeId, + final long bucketId, + final String purgeDeletedDir, + final List purgeDeletedFiles, + final List markDirsAsDeleted) { + // Put all keys to be purged in a list + PurgePathRequest.Builder purgePathsRequest = PurgePathRequest.newBuilder(); + purgePathsRequest.setVolumeId(volumeId); + purgePathsRequest.setBucketId(bucketId); + + if (purgeDeletedDir != null) { + purgePathsRequest.setDeletedDir(purgeDeletedDir); + } + + for (OmKeyInfo purgeFile : purgeDeletedFiles) { + purgePathsRequest.addDeletedSubFiles( + purgeFile.getProtobuf(true, ClientVersion.CURRENT_VERSION)); + } + + // Add these directories to deletedDirTable, so that its sub-paths will be + // traversed in next iteration to ensure cleanup all sub-children. + for (OmKeyInfo dir : markDirsAsDeleted) { + purgePathsRequest.addMarkDeletedSubDirs( + dir.getProtobuf(ClientVersion.CURRENT_VERSION)); + } + + return purgePathsRequest.build(); + } + + private OzoneManagerProtocolProtos.OMResponse submitPurgePaths(List requests, + String snapTableKey, UUID expectedPreviousSnapshotId) { + OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest = + OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder(); + + if (snapTableKey != null) { + purgeDirRequest.setSnapshotTableKey(snapTableKey); + } + OzoneManagerProtocolProtos.NullableUUID.Builder expectedPreviousSnapshotNullableUUID = + OzoneManagerProtocolProtos.NullableUUID.newBuilder(); + if (expectedPreviousSnapshotId != null) { + expectedPreviousSnapshotNullableUUID.setUuid(HddsUtils.toProtobuf(expectedPreviousSnapshotId)); + } + purgeDirRequest.setExpectedPreviousSnapshotID(expectedPreviousSnapshotNullableUUID.build()); + + purgeDirRequest.addAllDeletedPath(requests); + + OzoneManagerProtocolProtos.OMRequest omRequest = + OzoneManagerProtocolProtos.OMRequest.newBuilder() + .setCmdType(OzoneManagerProtocolProtos.Type.PurgeDirectories) + .setPurgeDirectoriesRequest(purgeDirRequest) + .setClientId(getClientId().toString()) + .build(); + + // Submit Purge paths request to OM. Acquire bootstrap lock when processing deletes for snapshots. + try (BootstrapStateHandler.Lock lock = snapTableKey != null ? getBootstrapStateLock().lock() : null) { + return submitRequest(omRequest); + } catch (ServiceException | InterruptedException e) { + LOG.error("PurgePaths request failed. 
Will retry at next run.", e); + } + return null; + } + + @VisibleForTesting + final class DirDeletingTask implements BackgroundTask { private final DirectoryDeletingService directoryDeletingService; private final UUID snapshotId; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java index 5e34c1ff741a..c7078758143b 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java @@ -22,13 +22,16 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.protobuf.ServiceException; import java.io.IOException; import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -36,28 +39,36 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.hdds.utils.BackgroundTask; import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; import org.apache.hadoop.hdds.utils.BackgroundTaskResult; import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult; +import org.apache.hadoop.ozone.ClientVersion; import org.apache.hadoop.ozone.common.BlockGroup; -import org.apache.hadoop.ozone.om.DeletingServiceMetrics; +import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; +import org.apache.hadoop.ozone.lock.BootstrapStateHandler; import org.apache.hadoop.ozone.om.KeyManager; +import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; import org.apache.hadoop.ozone.om.OmSnapshot; import org.apache.hadoop.ozone.om.OmSnapshotManager; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.PendingKeysDeletion; import org.apache.hadoop.ozone.om.SnapshotChainManager; +import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock; import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils; import org.apache.hadoop.ozone.om.snapshot.filter.ReclaimableKeyFilter; import org.apache.hadoop.ozone.om.snapshot.filter.ReclaimableRenameEntryFilter; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.NullableUUID; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetSnapshotPropertyRequest; +import org.apache.hadoop.util.Time; import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,14 +82,13 @@ public class KeyDeletingService extends AbstractKeyDeletingService { private static final Logger LOG = LoggerFactory.getLogger(KeyDeletingService.class); + private final 
ScmBlockLocationProtocol scmClient; private int keyLimitPerTask; private final AtomicLong deletedKeyCount; - private final AtomicBoolean suspended; private final AtomicBoolean isRunningOnAOS; private final boolean deepCleanSnapshots; private final SnapshotChainManager snapshotChainManager; - private DeletingServiceMetrics metrics; public KeyDeletingService(OzoneManager ozoneManager, ScmBlockLocationProtocol scmClient, long serviceInterval, @@ -86,17 +96,16 @@ public KeyDeletingService(OzoneManager ozoneManager, boolean deepCleanSnapshots) { super(KeyDeletingService.class.getSimpleName(), serviceInterval, TimeUnit.MILLISECONDS, keyDeletionCorePoolSize, - serviceTimeout, ozoneManager, scmClient); + serviceTimeout, ozoneManager); this.keyLimitPerTask = conf.getInt(OZONE_KEY_DELETING_LIMIT_PER_TASK, OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT); Preconditions.checkArgument(keyLimitPerTask >= 0, OZONE_KEY_DELETING_LIMIT_PER_TASK + " cannot be negative."); this.deletedKeyCount = new AtomicLong(0); - this.suspended = new AtomicBoolean(false); this.isRunningOnAOS = new AtomicBoolean(false); this.deepCleanSnapshots = deepCleanSnapshots; this.snapshotChainManager = ((OmMetadataManagerImpl)ozoneManager.getMetadataManager()).getSnapshotChainManager(); - this.metrics = ozoneManager.getDeletionMetrics(); + this.scmClient = scmClient; } /** @@ -113,6 +122,141 @@ public boolean isRunningOnAOS() { return isRunningOnAOS.get(); } + Pair processKeyDeletes(List keyBlocksList, + Map keysToModify, List renameEntries, + String snapTableKey, UUID expectedPreviousSnapshotId) throws IOException { + long startTime = Time.monotonicNow(); + Pair purgeResult = Pair.of(0, false); + if (LOG.isDebugEnabled()) { + LOG.debug("Send {} key(s) to SCM: {}", + keyBlocksList.size(), keyBlocksList); + } else if (LOG.isInfoEnabled()) { + int logSize = 10; + if (keyBlocksList.size() < logSize) { + logSize = keyBlocksList.size(); + } + LOG.info("Send {} key(s) to SCM, first {} keys: {}", + keyBlocksList.size(), logSize, keyBlocksList.subList(0, logSize)); + } + List blockDeletionResults = + scmClient.deleteKeyBlocks(keyBlocksList); + LOG.info("{} BlockGroup deletion are acked by SCM in {} ms", + keyBlocksList.size(), Time.monotonicNow() - startTime); + if (blockDeletionResults != null) { + long purgeStartTime = Time.monotonicNow(); + purgeResult = submitPurgeKeysRequest(blockDeletionResults, + keysToModify, renameEntries, snapTableKey, expectedPreviousSnapshotId); + int limit = getOzoneManager().getConfiguration().getInt(OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK, + OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT); + LOG.info("Blocks for {} (out of {}) keys are deleted from DB in {} ms. Limit per task is {}.", + purgeResult, blockDeletionResults.size(), Time.monotonicNow() - purgeStartTime, limit); + } + getPerfMetrics().setKeyDeletingServiceLatencyMs(Time.monotonicNow() - startTime); + return purgeResult; + } + + /** + * Submits PurgeKeys request for the keys whose blocks have been deleted + * by SCM. + * @param results DeleteBlockGroups returned by SCM. 
+   * @param keysToModify Updated list of RepeatedOmKeyInfo
+   */
+  private Pair<Integer, Boolean> submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
+      Map<String, RepeatedOmKeyInfo> keysToModify, List<String> renameEntriesToBeDeleted,
+      String snapTableKey, UUID expectedPreviousSnapshotId) {
+    List<String> purgeKeys = new ArrayList<>();
+
+    // Put all keys to be purged in a list
+    int deletedCount = 0;
+    Set<String> failedDeletedKeys = new HashSet<>();
+    boolean purgeSuccess = true;
+    for (DeleteBlockGroupResult result : results) {
+      String deletedKey = result.getObjectKey();
+      if (result.isSuccess()) {
+        // Add key to PurgeKeys list.
+        if (keysToModify != null && !keysToModify.containsKey(deletedKey)) {
+          // Parse Volume and BucketName
+          purgeKeys.add(deletedKey);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Key {} set to be updated in OM DB, Other versions " +
+                "of the key that are reclaimable are reclaimed.", deletedKey);
+          }
+        } else if (keysToModify == null) {
+          purgeKeys.add(deletedKey);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Key {} set to be purged from OM DB", deletedKey);
+          }
+        }
+        deletedCount++;
+      } else {
+        // If the block deletion failed, then the deleted keys should also not be modified.
+        failedDeletedKeys.add(deletedKey);
+        purgeSuccess = false;
+      }
+    }
+
+    PurgeKeysRequest.Builder purgeKeysRequest = PurgeKeysRequest.newBuilder();
+    if (snapTableKey != null) {
+      purgeKeysRequest.setSnapshotTableKey(snapTableKey);
+    }
+    NullableUUID.Builder expectedPreviousSnapshotNullableUUID = NullableUUID.newBuilder();
+    if (expectedPreviousSnapshotId != null) {
+      expectedPreviousSnapshotNullableUUID.setUuid(HddsUtils.toProtobuf(expectedPreviousSnapshotId));
+    }
+    purgeKeysRequest.setExpectedPreviousSnapshotID(expectedPreviousSnapshotNullableUUID.build());
+    OzoneManagerProtocolProtos.DeletedKeys deletedKeys = OzoneManagerProtocolProtos.DeletedKeys.newBuilder()
+        .setVolumeName("")
+        .setBucketName("")
+        .addAllKeys(purgeKeys)
+        .build();
+    purgeKeysRequest.addDeletedKeys(deletedKeys);
+    // Adding rename entries to be purged.
+    if (renameEntriesToBeDeleted != null) {
+      purgeKeysRequest.addAllRenamedKeys(renameEntriesToBeDeleted);
+    }
+    List<OzoneManagerProtocolProtos.SnapshotMoveKeyInfos> keysToUpdateList = new ArrayList<>();
+    if (keysToModify != null) {
+      for (Map.Entry<String, RepeatedOmKeyInfo> keyToModify :
+          keysToModify.entrySet()) {
+        if (failedDeletedKeys.contains(keyToModify.getKey())) {
+          continue;
+        }
+        OzoneManagerProtocolProtos.SnapshotMoveKeyInfos.Builder keyToUpdate =
+            OzoneManagerProtocolProtos.SnapshotMoveKeyInfos.newBuilder();
+        keyToUpdate.setKey(keyToModify.getKey());
+        List<OzoneManagerProtocolProtos.KeyInfo> keyInfos =
+            keyToModify.getValue().getOmKeyInfoList().stream()
+                .map(k -> k.getProtobuf(ClientVersion.CURRENT_VERSION))
+                .collect(Collectors.toList());
+        keyToUpdate.addAllKeyInfos(keyInfos);
+        keysToUpdateList.add(keyToUpdate.build());
+      }
+
+      if (!keysToUpdateList.isEmpty()) {
+        purgeKeysRequest.addAllKeysToUpdate(keysToUpdateList);
+      }
+    }
+
+    OzoneManagerProtocolProtos.OMRequest omRequest = OzoneManagerProtocolProtos.OMRequest.newBuilder()
+        .setCmdType(OzoneManagerProtocolProtos.Type.PurgeKeys)
+        .setPurgeKeysRequest(purgeKeysRequest)
+        .setClientId(getClientId().toString())
+        .build();
+
+    // Submit PurgeKeys request to OM. Acquire bootstrap lock when processing deletes for snapshots.
+    try (BootstrapStateHandler.Lock lock = snapTableKey != null ? getBootstrapStateLock().lock() : null) {
+      OzoneManagerProtocolProtos.OMResponse omResponse = submitRequest(omRequest);
+      if (omResponse != null) {
+        purgeSuccess = purgeSuccess && omResponse.getSuccess();
+      }
+    } catch (ServiceException | InterruptedException e) {
+      LOG.error("PurgeKey request failed. Will retry at next run.", e);
+      return Pair.of(0, false);
+    }
+
+    return Pair.of(deletedCount, purgeSuccess);
+  }
+
   @Override
   public BackgroundTaskQueue getTasks() {
     BackgroundTaskQueue queue = new BackgroundTaskQueue();
@@ -133,30 +277,6 @@ public BackgroundTaskQueue getTasks() {
     return queue;
   }
 
-  private boolean shouldRun() {
-    if (getOzoneManager() == null) {
-      // OzoneManager can be null for testing
-      return true;
-    }
-    return !suspended.get() && getOzoneManager().isLeaderReady();
-  }
-
-  /**
-   * Suspend the service.
-   */
-  @VisibleForTesting
-  public void suspend() {
-    suspended.set(true);
-  }
-
-  /**
-   * Resume the service if suspended.
-   */
-  @VisibleForTesting
-  public void resume() {
-    suspended.set(false);
-  }
-
   public int getKeyLimitPerTask() {
     return keyLimitPerTask;
   }
@@ -261,8 +381,8 @@ private void processDeletedKeysForStore(SnapshotInfo currentSnapshotInfo, KeyMan
         renamedTableEntries, snapshotTableKey, expectedPreviousSnapshotId);
     remainNum -= purgeResult.getKey();
     successStatus = purgeResult.getValue();
-    metrics.incrNumKeysProcessed(keyBlocksList.size());
-    metrics.incrNumKeysSentForPurge(purgeResult.getKey());
+    getMetrics().incrNumKeysProcessed(keyBlocksList.size());
+    getMetrics().incrNumKeysSentForPurge(purgeResult.getKey());
     if (successStatus) {
       deletedKeyCount.addAndGet(purgeResult.getKey());
     }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
index 171cb2bb02e7..96ae98a19b6b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
@@ -32,7 +32,6 @@
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -82,7 +81,6 @@ public class SnapshotDeletingService extends AbstractKeyDeletingService {
   private final OzoneManager ozoneManager;
   private final OmSnapshotManager omSnapshotManager;
   private final SnapshotChainManager chainManager;
-  private final AtomicBoolean suspended;
   private final OzoneConfiguration conf;
   private final AtomicLong successRunCount;
   private final int keyLimitPerTask;
@@ -95,14 +93,13 @@ public SnapshotDeletingService(long interval, long serviceTimeout,
       throws IOException {
     super(SnapshotDeletingService.class.getSimpleName(), interval,
         TimeUnit.MILLISECONDS, SNAPSHOT_DELETING_CORE_POOL_SIZE,
-        serviceTimeout, ozoneManager, null);
+        serviceTimeout, ozoneManager);
     this.ozoneManager = ozoneManager;
     this.omSnapshotManager = ozoneManager.getOmSnapshotManager();
     OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl)
         ozoneManager.getMetadataManager();
     this.chainManager = omMetadataManager.getSnapshotChainManager();
     this.successRunCount = new AtomicLong(0);
-    this.suspended = new AtomicBoolean(false);
     this.conf = ozoneManager.getConfiguration();
     this.snapshotDeletionPerTask =
         conf.getInt(SNAPSHOT_DELETING_LIMIT_PER_TASK, SNAPSHOT_DELETING_LIMIT_PER_TASK_DEFAULT);
@@ -333,26 +330,6 @@ public BackgroundTaskQueue getTasks() {
     return queue;
   }
 
-  private boolean shouldRun() {
-    return !suspended.get() && ozoneManager.isLeaderReady();
-  }
-
-  /**
-   * Suspend the service.
-   */
-  @VisibleForTesting
-  public void suspend() {
-    suspended.set(true);
-  }
-
-  /**
-   * Resume the service if suspended.
-   */
-  @VisibleForTesting
-  public void resume() {
-    suspended.set(false);
-  }
-
   public long getSuccessfulRunCount() {
     return successRunCount.get();
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java
deleted file mode 100644
index a14003c2245b..000000000000
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java
+++ /dev/null
@@ -1,484 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package org.apache.hadoop.ozone.om.service; - -import static org.apache.hadoop.ozone.om.helpers.SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE; -import static org.apache.hadoop.ozone.om.request.file.OMFileRequest.getDirectoryInfo; -import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.getPreviousSnapshot; - -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ServiceException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Stack; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.hdds.client.BlockID; -import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; -import org.apache.hadoop.hdds.utils.BackgroundTask; -import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; -import org.apache.hadoop.hdds.utils.BackgroundTaskResult; -import org.apache.hadoop.hdds.utils.IOUtils; -import org.apache.hadoop.hdds.utils.db.Table; -import org.apache.hadoop.hdds.utils.db.TableIterator; -import org.apache.hadoop.ozone.common.BlockGroup; -import org.apache.hadoop.ozone.om.OMMetadataManager; -import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; -import org.apache.hadoop.ozone.om.OmSnapshot; -import org.apache.hadoop.ozone.om.OmSnapshotManager; -import org.apache.hadoop.ozone.om.OzoneManager; -import org.apache.hadoop.ozone.om.SnapshotChainManager; -import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; -import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo; -import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; -import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; -import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; -import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; -import org.apache.hadoop.ozone.om.request.file.OMFileRequest; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetSnapshotPropertyRequest; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotSize; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; -import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; - -/** - * Snapshot BG Service for deleted directory deep clean and exclusive size - * calculation for deleted directories. - */ -public class SnapshotDirectoryCleaningService - extends AbstractKeyDeletingService { - // Use only a single thread for DirDeletion. Multiple threads would read - // or write to same tables and can send deletion requests for same key - // multiple times. 
- private static final int SNAPSHOT_DIR_CORE_POOL_SIZE = 1; - - private final AtomicBoolean suspended; - private final Map exclusiveSizeMap; - private final Map exclusiveReplicatedSizeMap; - - public SnapshotDirectoryCleaningService(long interval, TimeUnit unit, - long serviceTimeout, - OzoneManager ozoneManager, - ScmBlockLocationProtocol scmClient) { - super(SnapshotDirectoryCleaningService.class.getSimpleName(), - interval, unit, SNAPSHOT_DIR_CORE_POOL_SIZE, serviceTimeout, - ozoneManager, scmClient); - this.suspended = new AtomicBoolean(false); - this.exclusiveSizeMap = new HashMap<>(); - this.exclusiveReplicatedSizeMap = new HashMap<>(); - } - - private boolean shouldRun() { - if (getOzoneManager() == null) { - // OzoneManager can be null for testing - return true; - } - return getOzoneManager().isLeaderReady() && !suspended.get(); - } - - /** - * Suspend the service. - */ - @VisibleForTesting - public void suspend() { - suspended.set(true); - } - - /** - * Resume the service if suspended. - */ - @VisibleForTesting - public void resume() { - suspended.set(false); - } - - @Override - public BackgroundTaskQueue getTasks() { - BackgroundTaskQueue queue = new BackgroundTaskQueue(); - queue.add(new SnapshotDirectoryCleaningService.SnapshotDirTask()); - return queue; - } - - private class SnapshotDirTask implements BackgroundTask { - - @Override - public BackgroundTaskResult call() { - if (!shouldRun()) { - return BackgroundTaskResult.EmptyTaskResult.newResult(); - } - LOG.debug("Running SnapshotDirectoryCleaningService"); - - getRunCount().incrementAndGet(); - OmSnapshotManager omSnapshotManager = - getOzoneManager().getOmSnapshotManager(); - Table snapshotInfoTable = - getOzoneManager().getMetadataManager().getSnapshotInfoTable(); - OmMetadataManagerImpl metadataManager = (OmMetadataManagerImpl) - getOzoneManager().getMetadataManager(); - SnapshotChainManager snapChainManager = metadataManager - .getSnapshotChainManager(); - - try (TableIterator> iterator = snapshotInfoTable.iterator()) { - - while (iterator.hasNext()) { - SnapshotInfo currSnapInfo = snapshotInfoTable.get(iterator.next().getKey()); - - // Expand deleted dirs only on active snapshot. Deleted Snapshots - // will be cleaned up by SnapshotDeletingService. - if (currSnapInfo == null || currSnapInfo.getSnapshotStatus() != SNAPSHOT_ACTIVE || - currSnapInfo.isDeepCleanedDeletedDir()) { - continue; - } - - UncheckedAutoCloseableSupplier rcPrevOmSnapshot = null; - UncheckedAutoCloseableSupplier rcPrevToPrevOmSnapshot = null; - try { - long volumeId = metadataManager - .getVolumeId(currSnapInfo.getVolumeName()); - // Get bucketInfo for the snapshot bucket to get bucket layout. - String dbBucketKey = metadataManager - .getBucketKey(currSnapInfo.getVolumeName(), - currSnapInfo.getBucketName()); - OmBucketInfo bucketInfo = metadataManager - .getBucketTable().get(dbBucketKey); - - if (bucketInfo == null) { - throw new IllegalStateException("Bucket " + "/" + - currSnapInfo.getVolumeName() + "/" + currSnapInfo - .getBucketName() + - " is not found. BucketInfo should not be " + - "null for snapshotted bucket. 
The OM is in " + - "unexpected state."); - } - - SnapshotInfo previousSnapshot = getPreviousSnapshot(getOzoneManager(), snapChainManager, currSnapInfo); - SnapshotInfo previousToPrevSnapshot = null; - - Table previousKeyTable = null; - Table prevRenamedTable = null; - - if (previousSnapshot != null) { - rcPrevOmSnapshot = omSnapshotManager.getActiveSnapshot( - previousSnapshot.getVolumeName(), - previousSnapshot.getBucketName(), - previousSnapshot.getName()); - OmSnapshot omPreviousSnapshot = rcPrevOmSnapshot.get(); - - previousKeyTable = omPreviousSnapshot.getMetadataManager() - .getKeyTable(bucketInfo.getBucketLayout()); - prevRenamedTable = omPreviousSnapshot - .getMetadataManager().getSnapshotRenamedTable(); - previousToPrevSnapshot = getPreviousSnapshot(getOzoneManager(), snapChainManager, previousSnapshot); - } - - Table previousToPrevKeyTable = null; - if (previousToPrevSnapshot != null) { - rcPrevToPrevOmSnapshot = omSnapshotManager.getActiveSnapshot( - previousToPrevSnapshot.getVolumeName(), - previousToPrevSnapshot.getBucketName(), - previousToPrevSnapshot.getName()); - OmSnapshot omPreviousToPrevSnapshot = rcPrevToPrevOmSnapshot.get(); - - previousToPrevKeyTable = omPreviousToPrevSnapshot - .getMetadataManager() - .getKeyTable(bucketInfo.getBucketLayout()); - } - - String dbBucketKeyForDir = metadataManager.getBucketKeyPrefixFSO( - currSnapInfo.getVolumeName(), currSnapInfo.getBucketName()); - try (UncheckedAutoCloseableSupplier - rcCurrOmSnapshot = omSnapshotManager.getActiveSnapshot( - currSnapInfo.getVolumeName(), - currSnapInfo.getBucketName(), - currSnapInfo.getName())) { - - OmSnapshot currOmSnapshot = rcCurrOmSnapshot.get(); - Table snapDeletedDirTable = - currOmSnapshot.getMetadataManager().getDeletedDirTable(); - - try (TableIterator> deletedDirIterator = snapDeletedDirTable - .iterator(dbBucketKeyForDir)) { - - while (deletedDirIterator.hasNext()) { - Table.KeyValue deletedDirInfo = - deletedDirIterator.next(); - - // For each deleted directory we do an in-memory DFS and - // do a deep clean and exclusive size calculation. - iterateDirectoryTree(deletedDirInfo, volumeId, bucketInfo, - previousSnapshot, previousToPrevSnapshot, - currOmSnapshot, previousKeyTable, prevRenamedTable, - previousToPrevKeyTable, dbBucketKeyForDir); - } - updateDeepCleanSnapshotDir(currSnapInfo.getTableKey()); - if (previousSnapshot != null) { - updateExclusiveSize(previousSnapshot.getTableKey()); - } - } - } - } finally { - IOUtils.closeQuietly(rcPrevOmSnapshot, rcPrevToPrevOmSnapshot); - } - } - } catch (IOException ex) { - LOG.error("Error while running directory deep clean on snapshots." 
+ - " Will retry at next run.", ex); - } - return BackgroundTaskResult.EmptyTaskResult.newResult(); - } - } - - @SuppressWarnings("checkstyle:ParameterNumber") - private void iterateDirectoryTree( - Table.KeyValue deletedDirInfo, long volumeId, - OmBucketInfo bucketInfo, - SnapshotInfo previousSnapshot, - SnapshotInfo previousToPrevSnapshot, - OmSnapshot currOmSnapshot, - Table previousKeyTable, - Table prevRenamedTable, - Table previousToPrevKeyTable, - String dbBucketKeyForDir) throws IOException { - - Table snapDirTable = - currOmSnapshot.getMetadataManager().getDirectoryTable(); - Table snapRenamedTable = - currOmSnapshot.getMetadataManager().getSnapshotRenamedTable(); - - Stack stackNodes = new Stack<>(); - OmDirectoryInfo omDeletedDirectoryInfo = - getDirectoryInfo(deletedDirInfo.getValue()); - String dirPathDbKey = currOmSnapshot.getMetadataManager() - .getOzonePathKey(volumeId, bucketInfo.getObjectID(), - omDeletedDirectoryInfo); - // Stack Init - StackNode topLevelDir = new StackNode(); - topLevelDir.setDirKey(dirPathDbKey); - topLevelDir.setDirValue(omDeletedDirectoryInfo); - stackNodes.push(topLevelDir); - - try (TableIterator> - directoryIterator = snapDirTable.iterator(dbBucketKeyForDir)) { - - while (!stackNodes.isEmpty()) { - StackNode stackTop = stackNodes.peek(); - // First process all the files in the current directory - // and then do a DFS for directory. - if (StringUtils.isEmpty(stackTop.getSubDirSeek())) { - processFilesUnderDir(previousSnapshot, - previousToPrevSnapshot, - volumeId, - bucketInfo, - stackTop.getDirValue(), - currOmSnapshot.getMetadataManager(), - snapRenamedTable, - previousKeyTable, - prevRenamedTable, - previousToPrevKeyTable); - // Format : /volId/bucketId/parentId/ - String seekDirInDB = currOmSnapshot.getMetadataManager() - .getOzonePathKey(volumeId, bucketInfo.getObjectID(), - stackTop.getDirValue().getObjectID(), ""); - stackTop.setSubDirSeek(seekDirInDB); - } else { - // Adding \0 to seek the next greater element. - directoryIterator.seek(stackTop.getSubDirSeek() + "\0"); - if (directoryIterator.hasNext()) { - - Table.KeyValue deletedSubDirInfo = directoryIterator.next(); - String deletedSubDirKey = deletedSubDirInfo.getKey(); - String prefixCheck = currOmSnapshot.getMetadataManager() - .getOzoneDeletePathDirKey(stackTop.getSubDirSeek()); - // Exit if it is out of the sub dir prefix scope. 
- if (!deletedSubDirKey.startsWith(prefixCheck)) { - stackNodes.pop(); - } else { - stackTop.setSubDirSeek(deletedSubDirKey); - StackNode nextSubDir = new StackNode(); - nextSubDir.setDirKey(deletedSubDirInfo.getKey()); - nextSubDir.setDirValue(deletedSubDirInfo.getValue()); - stackNodes.push(nextSubDir); - } - } else { - stackNodes.pop(); - } - } - } - } - } - - private void updateExclusiveSize(String prevSnapshotKeyTable) throws IOException { - ClientId clientId = ClientId.randomId(); - SnapshotSize snapshotSize = SnapshotSize.newBuilder() - .setExclusiveSize( - exclusiveSizeMap.getOrDefault(prevSnapshotKeyTable, 0L)) - .setExclusiveReplicatedSize( - exclusiveReplicatedSizeMap.getOrDefault( - prevSnapshotKeyTable, 0L)) - .build(); - exclusiveSizeMap.remove(prevSnapshotKeyTable); - exclusiveReplicatedSizeMap.remove(prevSnapshotKeyTable); - SetSnapshotPropertyRequest - setSnapshotPropertyRequest = - SetSnapshotPropertyRequest.newBuilder() - .setSnapshotKey(prevSnapshotKeyTable) - .setSnapshotSizeDeltaFromDirDeepCleaning(snapshotSize) - .build(); - - OMRequest omRequest = OMRequest.newBuilder() - .setCmdType(Type.SetSnapshotProperty) - .setSetSnapshotPropertyRequest(setSnapshotPropertyRequest) - .setClientId(clientId.toString()) - .build(); - - submitRequest(omRequest, clientId); - } - - @SuppressWarnings("checkstyle:ParameterNumber") - private void processFilesUnderDir( - SnapshotInfo previousSnapshot, - SnapshotInfo previousToPrevSnapshot, - long volumeId, - OmBucketInfo bucketInfo, - OmDirectoryInfo parentInfo, - OMMetadataManager metadataManager, - Table snapRenamedTable, - Table previousKeyTable, - Table prevRenamedTable, - Table previousToPrevKeyTable) - throws IOException { - String seekFileInDB = metadataManager.getOzonePathKey(volumeId, - bucketInfo.getObjectID(), - parentInfo.getObjectID(), ""); - List blocksForKeyDelete = new ArrayList<>(); - - Table fileTable = metadataManager.getFileTable(); - try (TableIterator> - iterator = fileTable.iterator(seekFileInDB)) { - - while (iterator.hasNext()) { - Table.KeyValue entry = iterator.next(); - OmKeyInfo fileInfo = entry.getValue(); - if (!OMFileRequest.isImmediateChild(fileInfo.getParentObjectID(), - parentInfo.getObjectID())) { - break; - } - - String ozoneDeletePathKey = metadataManager - .getOzoneDeletePathKey(fileInfo.getObjectID(), entry.getKey()); - if (isKeyReclaimable(previousKeyTable, snapRenamedTable, - fileInfo, bucketInfo, volumeId, null)) { - for (OmKeyLocationInfoGroup keyLocations : - fileInfo.getKeyLocationVersions()) { - List item = keyLocations.getLocationList().stream() - .map(b -> new BlockID(b.getContainerID(), b.getLocalID())) - .collect(Collectors.toList()); - BlockGroup keyBlocks = BlockGroup.newBuilder() - .setKeyName(ozoneDeletePathKey) - .addAllBlockIDs(item) - .build(); - blocksForKeyDelete.add(keyBlocks); - } - // TODO: Add Retry mechanism. 
- getScmClient().deleteKeyBlocks(blocksForKeyDelete); - } else if (previousSnapshot != null) { - calculateExclusiveSize(previousSnapshot, previousToPrevSnapshot, - fileInfo, bucketInfo, volumeId, snapRenamedTable, - previousKeyTable, prevRenamedTable, previousToPrevKeyTable, - exclusiveSizeMap, exclusiveReplicatedSizeMap); - } - } - } - } - - private void updateDeepCleanSnapshotDir(String snapshotKeyTable) { - ClientId clientId = ClientId.randomId(); - SetSnapshotPropertyRequest setSnapshotPropertyRequest = - SetSnapshotPropertyRequest.newBuilder() - .setSnapshotKey(snapshotKeyTable) - .setDeepCleanedDeletedDir(true) - .build(); - - OMRequest omRequest = OMRequest.newBuilder() - .setCmdType(Type.SetSnapshotProperty) - .setSetSnapshotPropertyRequest(setSnapshotPropertyRequest) - .setClientId(clientId.toString()) - .build(); - - submitRequest(omRequest, clientId); - } - - public void submitRequest(OMRequest omRequest, ClientId clientId) { - try { - OzoneManagerRatisUtils.submitRequest(getOzoneManager(), omRequest, clientId, getRunCount().get()); - } catch (ServiceException e) { - LOG.error("Snapshot deep cleaning request failed. " + - "Will retry at next run.", e); - } - } - - /** - * Stack node data for directory deep clean for snapshot. - */ - private static class StackNode { - private String dirKey; - private OmDirectoryInfo dirValue; - private String subDirSeek; - - public String getDirKey() { - return dirKey; - } - - public void setDirKey(String dirKey) { - this.dirKey = dirKey; - } - - public OmDirectoryInfo getDirValue() { - return dirValue; - } - - public void setDirValue(OmDirectoryInfo dirValue) { - this.dirValue = dirValue; - } - - public String getSubDirSeek() { - return subDirSeek; - } - - public void setSubDirSeek(String subDirSeek) { - this.subDirSeek = subDirSeek; - } - - @Override - public String toString() { - return "StackNode{" + - "dirKey='" + dirKey + '\'' + - ", dirObjectId=" + dirValue.getObjectID() + - ", subDirSeek='" + subDirSeek + '\'' + - '}'; - } - } -} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java index 42e76377e14d..3c32a70e047c 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java @@ -52,6 +52,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -169,6 +170,7 @@ private void createConfig(File testDir) { private void createSubject() throws Exception { OmTestManagers omTestManagers = new OmTestManagers(conf, scmBlockTestingClient, null); keyManager = omTestManagers.getKeyManager(); + keyDeletingService = keyManager.getDeletingService(); directoryDeletingService = keyManager.getDirDeletingService(); writeClient = omTestManagers.getWriteClient(); @@ -435,7 +437,7 @@ public void testAOSKeyDeletingWithSnapshotCreateParallelExecution() @ParameterizedTest @ValueSource(booleans = {true, false}) public void testRenamedKeyReclaimation(boolean testForSnapshot) - throws IOException, InterruptedException, TimeoutException { + throws IOException, InterruptedException, TimeoutException, ExecutionException { Table 
<String, SnapshotInfo> snapshotInfoTable = om.getMetadataManager().getSnapshotInfoTable(); Table<String, RepeatedOmKeyInfo> deletedTable =