+ Optional<PurgePathRequest> request = prepareDeleteDirRequest(
+ stringOmKeyInfoPair.getValue(), stringOmKeyInfoPair.getKey(), subDirectoryReclaimable, allSubDirList,
+ keyManager, reclaimableFileChecker, remainingBufLimit);
+ if (!request.isPresent()) {
+ continue;
+ }
+ PurgePathRequest requestVal = request.get();
+ consumedSize += requestVal.getSerializedSize();
remainingBufLimit -= consumedSize;
- purgePathRequestList.add(request);
+ purgePathRequestList.add(requestVal);
// Count up the purgeDeletedDir, subDirs and subFiles
- if (request.getDeletedDir() != null
- && !request.getDeletedDir().isEmpty()) {
+ if (requestVal.hasDeletedDir() && !StringUtils.isBlank(requestVal.getDeletedDir())) {
subdirDelNum++;
}
- subDirNum += request.getMarkDeletedSubDirsCount();
- subFileNum += request.getDeletedSubFilesCount();
- subDirRecursiveCnt++;
+ subDirNum += requestVal.getMarkDeletedSubDirsCount();
+ subFileNum += requestVal.getDeletedSubFilesCount();
} catch (IOException e) {
LOG.error("Error while running delete directories and files " +
"background task. Will retry at next run for subset.", e);
break;
}
}
-
if (!purgePathRequestList.isEmpty()) {
submitPurgePaths(purgePathRequestList, snapTableKey, expectedPreviousSnapshotId);
}
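The hunk above batches purge requests against a serialized-byte budget derived from the Ratis limit and skips entries for which prepareDeleteDirRequest yields nothing. A minimal, self-contained sketch of that budget-based batching pattern — Request, buildRequest, and the byte payloads are hypothetical stand-ins, not the OM API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of budget-based request batching: keep building purge
// requests until the serialized bytes would exceed the remaining byte budget.
public final class BatchBudgetSketch {

  // Stand-in for PurgePathRequest; only its serialized size matters here.
  static final class Request {
    final byte[] payload;
    Request(byte[] payload) { this.payload = payload; }
    int getSerializedSize() { return payload.length; }
  }

  // Stand-in for prepareDeleteDirRequest: may yield nothing for an entry,
  // e.g. when nothing is reclaimable within the remaining budget.
  static Optional<Request> buildRequest(byte[] entry, long remainingBudget) {
    return entry.length <= remainingBudget
        ? Optional.of(new Request(entry)) : Optional.empty();
  }

  static List<Request> batch(List<byte[]> entries, long budget) {
    List<Request> batch = new ArrayList<>();
    long consumed = 0;
    for (byte[] entry : entries) {
      Optional<Request> request = buildRequest(entry, budget - consumed);
      if (!request.isPresent()) {
        continue; // nothing to purge for this entry; move on
      }
      consumed += request.get().getSerializedSize();
      batch.add(request.get());
      if (consumed >= budget) {
        break; // budget exhausted; the rest waits for the next run
      }
    }
    return batch;
  }

  public static void main(String[] args) {
    List<byte[]> entries = Arrays.asList(new byte[40], new byte[70], new byte[10]);
    System.out.println("requests batched: " + batch(entries, 100).size()); // 2
  }
}
```

In the service itself the budget is the Ratis log-appender byte limit scaled to 90%, so a batch of purge requests always fits in one purge transaction.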
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
index ad90490101c4..7edbe7761175 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
@@ -21,15 +21,26 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
-import java.util.Objects;
+import java.util.Map;
import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
@@ -41,15 +52,20 @@
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.KeyManager;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OmSnapshot;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.SnapshotChainManager;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
+import org.apache.hadoop.ozone.om.snapshot.filter.ReclaimableDirFilter;
+import org.apache.hadoop.ozone.om.snapshot.filter.ReclaimableKeyFilter;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgePathRequest;
import org.apache.hadoop.util.Time;
import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
@@ -57,21 +73,64 @@
import org.slf4j.LoggerFactory;
/**
- * This is a background service to delete orphan directories and its
- * sub paths(sub-dirs and sub-files).
+ * Background service responsible for purging deleted directories and files
+ * in the Ozone Manager (OM) and associated snapshots.
*
- * This will scan the metadata of om periodically to get the orphan dirs from
- * DeletedDirectoryTable and find its sub paths. It will fetch all sub-files
- * from FileTable and move those to DeletedTable so that OM's
- * KeyDeletingService will cleanup those files later. It will fetch all
- * sub-directories from the DirectoryTable and move those to
- * DeletedDirectoryTable so that these will be visited in next iterations.
- *
- * After moving all sub-files and sub-dirs the parent orphan directory will be
- * deleted by this service. It will continue traversing until all the leaf path
- * components of an orphan directory is visited.
+ * This service periodically scans the deleted directory table and submits
+ * purge requests for directories and their sub-entries (subdirectories and files).
+ * It operates in both the active object store (AOS) and across all deep-clean enabled
+ * snapshots. The service supports parallel processing using a thread pool and
+ * coordinates exclusive size calculations and cleanup status updates for
+ * snapshots.
+ *
+ * Key Features:
+ * - Processes deleted directories in both the active OM and all snapshots
+ *   with deep cleaning enabled.
+ * - Uses a thread pool to parallelize deletion tasks within each store or snapshot.
+ * - Employs filters to determine reclaimability of directories and files,
+ *   ensuring safety with respect to snapshot chains.
+ * - Tracks and updates exclusive size and replicated exclusive size for each
+ *   snapshot as directories and files are reclaimed.
+ * - Updates the "deep cleaned" flag for snapshots after a successful run.
+ * - Handles error and race conditions gracefully, deferring work if necessary.
+ *
+ * Constructor Parameters:
+ * - interval - How often the service runs.
+ * - unit - Time unit for the interval.
+ * - serviceTimeout - Service timeout in the given time unit.
+ * - ozoneManager - The OzoneManager instance.
+ * - configuration - Ozone configuration object.
+ * - dirDeletingServiceCorePoolSize - Number of parallel threads for deletion per store or snapshot.
+ * - deepCleanSnapshots - Whether to enable deep cleaning for snapshots.
+ *
+ * Threading and Parallelism:
+ * - Uses a configurable thread pool for parallel deletion tasks within each store/snapshot.
+ * - Each snapshot, and the AOS, gets a separate background task for deletion.
+ *
+ * Snapshot Integration:
+ * - Iterates all snapshots in the chain if deep cleaning is enabled.
+ * - Skips snapshots that are already deep-cleaned or not yet flushed to disk.
+ * - Updates snapshot metadata to reflect size changes and cleaning status.
+ *
+ * Usage:
+ * - Should be scheduled as a background service in OM.
+ * - Intended to be run only on the OM leader node.
+ *
+ * @see org.apache.hadoop.ozone.om.snapshot.filter.ReclaimableDirFilter
+ * @see org.apache.hadoop.ozone.om.snapshot.filter.ReclaimableKeyFilter
+ * @see org.apache.hadoop.ozone.om.SnapshotChainManager
*/
public class DirectoryDeletingService extends AbstractKeyDeletingService {
private static final Logger LOG =
@@ -80,32 +139,34 @@ public class DirectoryDeletingService extends AbstractKeyDeletingService {
// Using multiple threads for directory deletion. Multiple threads read
// parent directory info from the deleted directory table concurrently
// and send deletion requests.
- private final int dirDeletingCorePoolSize;
private int ratisByteLimit;
private final AtomicBoolean suspended;
- private AtomicBoolean isRunningOnAOS;
-
- private final DeletedDirSupplier deletedDirSupplier;
-
- private AtomicInteger taskCount = new AtomicInteger(0);
+ private final AtomicBoolean isRunningOnAOS;
+ private final SnapshotChainManager snapshotChainManager;
+ private final boolean deepCleanSnapshots;
+ private final ExecutorService deletionThreadPool;
+ private final int numberOfParallelThreadsPerStore;
public DirectoryDeletingService(long interval, TimeUnit unit,
long serviceTimeout, OzoneManager ozoneManager,
- OzoneConfiguration configuration, int dirDeletingServiceCorePoolSize) {
+ OzoneConfiguration configuration, int dirDeletingServiceCorePoolSize, boolean deepCleanSnapshots) {
super(DirectoryDeletingService.class.getSimpleName(), interval, unit,
dirDeletingServiceCorePoolSize, serviceTimeout, ozoneManager, null);
int limit = (int) configuration.getStorageSize(
OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT,
OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
StorageUnit.BYTES);
+ this.numberOfParallelThreadsPerStore = dirDeletingServiceCorePoolSize;
+ this.deletionThreadPool = new ThreadPoolExecutor(0, numberOfParallelThreadsPerStore, interval, unit,
+ new LinkedBlockingDeque<>(Integer.MAX_VALUE));
+
// always go to 90% of the max limit for the request as other headers will be added
this.ratisByteLimit = (int) (limit * 0.9);
this.suspended = new AtomicBoolean(false);
this.isRunningOnAOS = new AtomicBoolean(false);
- this.dirDeletingCorePoolSize = dirDeletingServiceCorePoolSize;
- deletedDirSupplier = new DeletedDirSupplier();
registerReconfigCallbacks(ozoneManager.getReconfigurationHandler(), configuration);
- taskCount.set(0);
+ this.snapshotChainManager = ((OmMetadataManagerImpl)ozoneManager.getMetadataManager()).getSnapshotChainManager();
+ this.deepCleanSnapshots = deepCleanSnapshots;
}
public void registerReconfigCallbacks(ReconfigurationHandler handler, OzoneConfiguration conf) {
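A java.util.concurrent subtlety worth noting next to the constructor hunk above: a ThreadPoolExecutor only creates threads beyond its core size when the work queue rejects an offer, so a core size of 0 with an effectively unbounded LinkedBlockingDeque admits at most one worker at a time. A minimal demonstration of that JDK behavior (general executor semantics, not a claim about the intended pool sizing here):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Demonstrates that maximumPoolSize is ineffective with an unbounded queue:
// extra threads are only created when the queue refuses to accept a task.
public final class PoolSizingDemo {
  public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 10, 60, TimeUnit.SECONDS,
        new LinkedBlockingDeque<>(Integer.MAX_VALUE));
    CountDownLatch done = new CountDownLatch(4);
    for (int i = 0; i < 4; i++) {
      pool.execute(() -> {
        try {
          Thread.sleep(200); // hold the worker so tasks would overlap if they could
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        done.countDown();
      });
    }
    Thread.sleep(100);
    // Prints 1: all four tasks queue behind a single worker thread.
    System.out.println("threads created: " + pool.getPoolSize());
    done.await();
    pool.shutdown();
  }
}
```

Where full parallelism is wanted, the usual remedies are a positive core size (optionally with allowCoreThreadTimeOut(true)) or a bounded queue.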
@@ -138,10 +199,6 @@ public boolean isRunningOnAOS() {
return isRunningOnAOS.get();
}
- public AtomicInteger getTaskCount() {
- return taskCount;
- }
-
/**
* Suspend the service.
*/
@@ -165,20 +222,19 @@ public void setRatisByteLimit(int ratisByteLimit) {
@Override
public BackgroundTaskQueue getTasks() {
BackgroundTaskQueue queue = new BackgroundTaskQueue();
- if (taskCount.get() > 0) {
- LOG.info("{} Directory deleting task(s) already in progress.",
- taskCount.get());
- return queue;
- }
- try {
- deletedDirSupplier.reInitItr();
- } catch (IOException ex) {
- LOG.error("Unable to get the iterator.", ex);
- return queue;
- }
- taskCount.set(dirDeletingCorePoolSize);
- for (int i = 0; i < dirDeletingCorePoolSize; i++) {
- queue.add(new DirectoryDeletingService.DirDeletingTask(this));
+ queue.add(new DirDeletingTask(this, null));
+ if (deepCleanSnapshots) {
+ Iterator<UUID> iterator = null;
+ try {
+ iterator = snapshotChainManager.iterator(true);
+ } catch (IOException e) {
+ LOG.error("Error while initializing snapshot chain iterator.");
+ return queue;
+ }
+ while (iterator.hasNext()) {
+ UUID snapshotId = iterator.next();
+ queue.add(new DirDeletingTask(this, snapshotId));
+ }
}
return queue;
}
@@ -186,39 +242,36 @@ public BackgroundTaskQueue getTasks() {
@Override
public void shutdown() {
super.shutdown();
- deletedDirSupplier.closeItr();
}
- private final class DeletedDirSupplier {
+ private static final class DeletedDirSupplier implements Closeable {
private TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
deleteTableIterator;
- private synchronized Table.KeyValue<String, OmKeyInfo> get()
- throws IOException {
+ private DeletedDirSupplier(TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> deleteTableIterator) {
+ this.deleteTableIterator = deleteTableIterator;
+ }
+
+ private synchronized Table.KeyValue<String, OmKeyInfo> get() {
if (deleteTableIterator.hasNext()) {
return deleteTableIterator.next();
}
return null;
}
- private synchronized void closeItr() {
+ @Override
+ public void close() {
IOUtils.closeQuietly(deleteTableIterator);
- deleteTableIterator = null;
- }
-
- private synchronized void reInitItr() throws IOException {
- closeItr();
- deleteTableIterator =
- getOzoneManager().getMetadataManager().getDeletedDirTable()
- .iterator();
}
}
private final class DirDeletingTask implements BackgroundTask {
private final DirectoryDeletingService directoryDeletingService;
+ private final UUID snapshotId;
- private DirDeletingTask(DirectoryDeletingService service) {
+ private DirDeletingTask(DirectoryDeletingService service, UUID snapshotId) {
this.directoryDeletingService = service;
+ this.snapshotId = snapshotId;
}
@Override
@@ -226,147 +279,217 @@ public int getPriority() {
return 0;
}
- @Override
- public BackgroundTaskResult call() {
- try {
- if (shouldRun()) {
- isRunningOnAOS.set(true);
- long rnCnt = getRunCount().incrementAndGet();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Running DirectoryDeletingService. {}", rnCnt);
- }
- long dirNum = 0L;
- long subDirNum = 0L;
- long subFileNum = 0L;
- long remainingBufLimit = ratisByteLimit;
- int consumedSize = 0;
- List<PurgePathRequest> purgePathRequestList = new ArrayList<>();
- List<Pair<String, OmKeyInfo>> allSubDirList =
- new ArrayList<>();
-
- Table.KeyValue<String, OmKeyInfo> pendingDeletedDirInfo;
- // This is to avoid race condition b/w purge request and snapshot chain updation. For AOS taking the global
- // snapshotId since AOS could process multiple buckets in one iteration.
- try {
- UUID expectedPreviousSnapshotId =
- ((OmMetadataManagerImpl) getOzoneManager().getMetadataManager()).getSnapshotChainManager()
- .getLatestGlobalSnapshotId();
-
- long startTime = Time.monotonicNow();
- while (remainingBufLimit > 0) {
- pendingDeletedDirInfo = getPendingDeletedDirInfo();
- if (pendingDeletedDirInfo == null) {
- break;
- }
- // Do not reclaim if the directory is still being referenced by
- // the previous snapshot.
- if (previousSnapshotHasDir(pendingDeletedDirInfo)) {
- continue;
- }
-
- PurgePathRequest request = prepareDeleteDirRequest(
- pendingDeletedDirInfo.getValue(),
- pendingDeletedDirInfo.getKey(), allSubDirList,
- getOzoneManager().getKeyManager(), remainingBufLimit);
-
- consumedSize += request.getSerializedSize();
- remainingBufLimit -= consumedSize;
- purgePathRequestList.add(request);
- // Count up the purgeDeletedDir, subDirs and subFiles
- if (request.getDeletedDir() != null && !request.getDeletedDir()
- .isEmpty()) {
- dirNum++;
- }
- subDirNum += request.getMarkDeletedSubDirsCount();
- subFileNum += request.getDeletedSubFilesCount();
- }
+ private OzoneManagerProtocolProtos.SetSnapshotPropertyRequest getSetSnapshotRequestUpdatingExclusiveSize(
+ long exclusiveSize, long exclusiveReplicatedSize, UUID snapshotID) {
+ OzoneManagerProtocolProtos.SnapshotSize snapshotSize = OzoneManagerProtocolProtos.SnapshotSize.newBuilder()
+ .setExclusiveSize(exclusiveSize)
+ .setExclusiveReplicatedSize(exclusiveReplicatedSize)
+ .build();
+ return OzoneManagerProtocolProtos.SetSnapshotPropertyRequest.newBuilder()
+ .setSnapshotKey(snapshotChainManager.getTableKey(snapshotID))
+ .setSnapshotSizeDeltaFromDirDeepCleaning(snapshotSize)
+ .build();
+ }
- optimizeDirDeletesAndSubmitRequest(dirNum, subDirNum,
- subFileNum, allSubDirList, purgePathRequestList, null,
- startTime, remainingBufLimit,
- getOzoneManager().getKeyManager(), expectedPreviousSnapshotId,
- rnCnt);
+ /**
+ * Processes deleted directories for a single store: the active object store (AOS) or one snapshot.
+ *
+ * @param currentSnapshotInfo if null, deleted directories in AOS should be processed.
+ * @param keyManager KeyManager of the underlying store.
+ */
+ private void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, KeyManager keyManager,
+ long remainingBufLimit, long rnCnt) throws IOException, ExecutionException, InterruptedException {
+ String volume, bucket, snapshotTableKey;
+ if (currentSnapshotInfo != null) {
+ volume = currentSnapshotInfo.getVolumeName();
+ bucket = currentSnapshotInfo.getBucketName();
+ snapshotTableKey = currentSnapshotInfo.getTableKey();
+ } else {
+ volume = null;
+ bucket = null;
+ snapshotTableKey = null;
+ }
- } catch (IOException e) {
- LOG.error(
- "Error while running delete directories and files " + "background task. Will retry at next run.",
- e);
+ try (DeletedDirSupplier dirSupplier = new DeletedDirSupplier(currentSnapshotInfo == null ?
+ keyManager.getDeletedDirEntries() : keyManager.getDeletedDirEntries(volume, bucket))) {
+ // This is to avoid a race condition b/w the purge request and a snapshot chain update. For AOS we take the
+ // global snapshotId since AOS could process multiple buckets in one iteration, while for a snapshot we use
+ // the path-previous snapshotId since it processes only one bucket.
+ UUID expectedPreviousSnapshotId = currentSnapshotInfo == null ?
+ snapshotChainManager.getLatestGlobalSnapshotId() :
+ SnapshotUtils.getPreviousSnapshotId(currentSnapshotInfo, snapshotChainManager);
+ Map<UUID, Pair<Long, Long>> exclusiveSizeMap = Maps.newConcurrentMap();
+
+ CompletableFuture<Boolean> processedAllDeletedDirs = CompletableFuture.completedFuture(true);
+ for (int i = 0; i < numberOfParallelThreadsPerStore; i++) {
+ CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
+ try {
+ return processDeletedDirectories(currentSnapshotInfo, keyManager, dirSupplier, remainingBufLimit,
+ expectedPreviousSnapshotId, exclusiveSizeMap, rnCnt);
+ } catch (Throwable e) {
+ LOG.error("Error while processing deleted directories.", e);
+ return false;
+ }
+ }, deletionThreadPool);
+ processedAllDeletedDirs = processedAllDeletedDirs.thenCombine(future, (a, b) -> a && b);
+ }
+ // If AOS or all directories have been processed for snapshot, update snapshot size delta and deep clean flag
+ // if it is a snapshot.
+ if (processedAllDeletedDirs.get()) {
+ List<OzoneManagerProtocolProtos.SetSnapshotPropertyRequest> setSnapshotPropertyRequests = new ArrayList<>();
+
+ for (Map.Entry<UUID, Pair<Long, Long>> entry : exclusiveSizeMap.entrySet()) {
+ UUID snapshotID = entry.getKey();
+ long exclusiveSize = entry.getValue().getLeft();
+ long exclusiveReplicatedSize = entry.getValue().getRight();
+ setSnapshotPropertyRequests.add(getSetSnapshotRequestUpdatingExclusiveSize(
+ exclusiveSize, exclusiveReplicatedSize, snapshotID));
}
- isRunningOnAOS.set(false);
- synchronized (directoryDeletingService) {
- this.directoryDeletingService.notify();
+
+ // Updating directory deep clean flag of snapshot.
+ if (currentSnapshotInfo != null) {
+ setSnapshotPropertyRequests.add(OzoneManagerProtocolProtos.SetSnapshotPropertyRequest.newBuilder()
+ .setSnapshotKey(snapshotTableKey)
+ .setDeepCleanedDeletedDir(true)
+ .build());
}
+ submitSetSnapshotRequests(setSnapshotPropertyRequests);
}
- } finally {
- taskCount.getAndDecrement();
}
- // place holder by returning empty results of this call back.
- return BackgroundTaskResult.EmptyTaskResult.newResult();
}
- private boolean previousSnapshotHasDir(
- KeyValue<String, OmKeyInfo> pendingDeletedDirInfo) throws IOException {
- String key = pendingDeletedDirInfo.getKey();
- OmKeyInfo deletedDirInfo = pendingDeletedDirInfo.getValue();
- OmSnapshotManager omSnapshotManager =
- getOzoneManager().getOmSnapshotManager();
- OmMetadataManagerImpl metadataManager = (OmMetadataManagerImpl)
- getOzoneManager().getMetadataManager();
- SnapshotInfo previousSnapshotInfo = SnapshotUtils.getLatestSnapshotInfo(deletedDirInfo.getVolumeName(),
- deletedDirInfo.getBucketName(), getOzoneManager(), metadataManager.getSnapshotChainManager());
- if (previousSnapshotInfo == null) {
+ /**
+ * Processes deleted directories for snapshot management, determining whether
+ * directories and files can be purged, and calculates exclusive size mappings
+ * for snapshots.
+ *
+ * @param currentSnapshotInfo Information about the current snapshot whose deleted directories are being processed.
+ * @param keyManager Key manager of the underlying storage system to handle key operations.
+ * @param dirSupplier Supplier for fetching pending deleted directories to be processed.
+ * @param remainingBufLimit Remaining buffer limit for processing directories and files.
+ * @param expectedPreviousSnapshotId The UUID of the previous snapshot expected in the chain.
+ * @param totalExclusiveSizeMap A map for storing total exclusive size and exclusive replicated size
+ * for each snapshot.
+ * @param runCount The number of times the processing task has been executed.
+ * @return true if this pass generated no purge requests (the deleted-directory supplier was exhausted);
+ * false if requests were submitted or an error occurred.
+ */
+ private boolean processDeletedDirectories(SnapshotInfo currentSnapshotInfo, KeyManager keyManager,
+ DeletedDirSupplier dirSupplier, long remainingBufLimit, UUID expectedPreviousSnapshotId,
+ Map<UUID, Pair<Long, Long>> totalExclusiveSizeMap, long runCount) {
+ OmSnapshotManager omSnapshotManager = getOzoneManager().getOmSnapshotManager();
+ IOzoneManagerLock lock = getOzoneManager().getMetadataManager().getLock();
+ String snapshotTableKey = currentSnapshotInfo == null ? null : currentSnapshotInfo.getTableKey();
+ try (ReclaimableDirFilter reclaimableDirFilter = new ReclaimableDirFilter(getOzoneManager(),
+ omSnapshotManager, snapshotChainManager, currentSnapshotInfo, keyManager, lock);
+ ReclaimableKeyFilter reclaimableFileFilter = new ReclaimableKeyFilter(getOzoneManager(),
+ omSnapshotManager, snapshotChainManager, currentSnapshotInfo, keyManager, lock)) {
+ long startTime = Time.monotonicNow();
+ long dirNum = 0L;
+ long subDirNum = 0L;
+ long subFileNum = 0L;
+ int consumedSize = 0;
+ List<PurgePathRequest> purgePathRequestList = new ArrayList<>();
+ List<Pair<String, OmKeyInfo>> allSubDirList = new ArrayList<>();
+ while (remainingBufLimit > 0) {
+ KeyValue<String, OmKeyInfo> pendingDeletedDirInfo = dirSupplier.get();
+ if (pendingDeletedDirInfo == null) {
+ break;
+ }
+ boolean isDirReclaimable = reclaimableDirFilter.apply(pendingDeletedDirInfo);
+ Optional<PurgePathRequest> request = prepareDeleteDirRequest(
+ pendingDeletedDirInfo.getValue(),
+ pendingDeletedDirInfo.getKey(), isDirReclaimable, allSubDirList,
+ getOzoneManager().getKeyManager(), reclaimableFileFilter, remainingBufLimit);
+ if (!request.isPresent()) {
+ continue;
+ }
+ PurgePathRequest purgePathRequest = request.get();
+ consumedSize += purgePathRequest.getSerializedSize();
+ remainingBufLimit -= consumedSize;
+ purgePathRequestList.add(purgePathRequest);
+ // Count up the purgeDeletedDir, subDirs and subFiles
+ if (purgePathRequest.hasDeletedDir() && !StringUtils.isBlank(purgePathRequest.getDeletedDir())) {
+ dirNum++;
+ }
+ subDirNum += purgePathRequest.getMarkDeletedSubDirsCount();
+ subFileNum += purgePathRequest.getDeletedSubFilesCount();
+ }
+
+ optimizeDirDeletesAndSubmitRequest(dirNum, subDirNum,
+ subFileNum, allSubDirList, purgePathRequestList, snapshotTableKey,
+ startTime, remainingBufLimit, getOzoneManager().getKeyManager(),
+ reclaimableDirFilter, reclaimableFileFilter, expectedPreviousSnapshotId,
+ runCount);
+ Map<UUID, Long> exclusiveReplicatedSizeMap = reclaimableFileFilter.getExclusiveReplicatedSizeMap();
+ Map<UUID, Long> exclusiveSizeMap = reclaimableFileFilter.getExclusiveSizeMap();
+ List<UUID> previousPathSnapshotsInChain =
+ Stream.of(exclusiveSizeMap.keySet(), exclusiveReplicatedSizeMap.keySet())
+ .flatMap(Collection::stream).distinct().collect(Collectors.toList());
+ for (UUID snapshot : previousPathSnapshotsInChain) {
+ totalExclusiveSizeMap.compute(snapshot, (k, v) -> {
+ long exclusiveSize = exclusiveSizeMap.getOrDefault(snapshot, 0L);
+ long exclusiveReplicatedSize = exclusiveReplicatedSizeMap.getOrDefault(snapshot, 0L);
+ if (v == null) {
+ return Pair.of(exclusiveSize, exclusiveReplicatedSize);
+ }
+ return Pair.of(v.getLeft() + exclusiveSize, v.getRight() + exclusiveReplicatedSize);
+ });
+ }
+
+ return purgePathRequestList.isEmpty();
+ } catch (IOException e) {
+ LOG.error("Error while running delete directories for store : {} and files background task. " +
+ "Will retry at next run. ", snapshotTableKey, e);
return false;
}
- // previous snapshot is not active or it has not been flushed to disk then don't process the key in this
- // iteration.
- if (previousSnapshotInfo.getSnapshotStatus() != SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE ||
- !OmSnapshotManager.areSnapshotChangesFlushedToDB(getOzoneManager().getMetadataManager(),
- previousSnapshotInfo)) {
- return true;
- }
- try (UncheckedAutoCloseableSupplier<OmSnapshot> rcLatestSnapshot =
- omSnapshotManager.getSnapshot(
- deletedDirInfo.getVolumeName(),
- deletedDirInfo.getBucketName(),
- previousSnapshotInfo.getName())) {
-
- if (rcLatestSnapshot != null) {
- String dbRenameKey = metadataManager
- .getRenameKey(deletedDirInfo.getVolumeName(),
- deletedDirInfo.getBucketName(), deletedDirInfo.getObjectID());
- Table<String, OmDirectoryInfo> prevDirTable =
- rcLatestSnapshot.get().getMetadataManager().getDirectoryTable();
- Table<String, OmKeyInfo> prevDeletedDirTable =
- rcLatestSnapshot.get().getMetadataManager().getDeletedDirTable();
- OmKeyInfo prevDeletedDirInfo = prevDeletedDirTable.get(key);
- if (prevDeletedDirInfo != null) {
- return true;
+ }
+
+ @Override
+ public BackgroundTaskResult call() {
+ // Check if this is the Leader OM. If not leader, no need to execute this
+ // task.
+ if (shouldRun()) {
+ final long run = getRunCount().incrementAndGet();
+ if (snapshotId == null) {
+ LOG.debug("Running DirectoryDeletingService for active object store, {}", run);
+ isRunningOnAOS.set(true);
+ } else {
+ LOG.debug("Running DirectoryDeletingService for snapshot : {}, {}", snapshotId, run);
+ }
+ OmSnapshotManager omSnapshotManager = getOzoneManager().getOmSnapshotManager();
+ SnapshotInfo snapInfo = null;
+ try {
+ snapInfo = snapshotId == null ? null :
+ SnapshotUtils.getSnapshotInfo(getOzoneManager(), snapshotChainManager, snapshotId);
+ if (snapInfo != null) {
+ if (snapInfo.isDeepCleanedDeletedDir()) {
+ LOG.info("Snapshot {} has already been deep cleaned directory. Skipping the snapshot in this iteration.",
+ snapInfo.getSnapshotId());
+ return BackgroundTaskResult.EmptyTaskResult.newResult();
+ }
+ if (!OmSnapshotManager.areSnapshotChangesFlushedToDB(getOzoneManager().getMetadataManager(), snapInfo)) {
+ LOG.info("Skipping snapshot processing since changes to snapshot {} have not been flushed to disk",
+ snapInfo);
+ return BackgroundTaskResult.EmptyTaskResult.newResult();
+ }
+ }
+ try (UncheckedAutoCloseableSupplier<OmSnapshot> omSnapshot = snapInfo == null ? null :
+ omSnapshotManager.getActiveSnapshot(snapInfo.getVolumeName(), snapInfo.getBucketName(),
+ snapInfo.getName())) {
+ KeyManager keyManager = snapInfo == null ? getOzoneManager().getKeyManager()
+ : omSnapshot.get().getKeyManager();
+ processDeletedDirsForStore(snapInfo, keyManager, ratisByteLimit, run);
+ }
+ } catch (IOException | ExecutionException | InterruptedException e) {
+ LOG.error("Error while running delete files background task for store {}. Will retry at next run.",
+ snapInfo, e);
+ } finally {
+ if (snapshotId == null) {
+ isRunningOnAOS.set(false);
+ synchronized (directoryDeletingService) {
+ this.directoryDeletingService.notify();
+ }
}
- String prevDirTableDBKey = metadataManager.getSnapshotRenamedTable()
- .get(dbRenameKey);
- // In OMKeyDeleteResponseWithFSO OzonePathKey is converted to
- // OzoneDeletePathKey. Changing it back to check the previous DirTable
- String prevDbKey = prevDirTableDBKey == null ?
- metadataManager.getOzoneDeletePathDirKey(key) : prevDirTableDBKey;
- OmDirectoryInfo prevDirInfo = prevDirTable.get(prevDbKey);
- //Checking if the previous snapshot in the chain hasn't changed while checking if the deleted directory is
- // present in the previous snapshot. If the chain has changed, the deleted directory could have been moved
- // to the newly created snapshot.
- SnapshotInfo newPreviousSnapshotInfo = SnapshotUtils.getLatestSnapshotInfo(deletedDirInfo.getVolumeName(),
- deletedDirInfo.getBucketName(), getOzoneManager(), metadataManager.getSnapshotChainManager());
- return (!Objects.equals(Optional.ofNullable(newPreviousSnapshotInfo).map(SnapshotInfo::getSnapshotId),
- Optional.ofNullable(previousSnapshotInfo).map(SnapshotInfo::getSnapshotId))) || (prevDirInfo != null &&
- prevDirInfo.getObjectID() == deletedDirInfo.getObjectID());
}
}
-
- return false;
+ // By design, no one cares about the results of this call back.
+ return BackgroundTaskResult.EmptyTaskResult.newResult();
}
}
-
- public KeyValue<String, OmKeyInfo> getPendingDeletedDirInfo()
- throws IOException {
- return deletedDirSupplier.get();
- }
-
}
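processDeletedDirsForStore above fans out up to numberOfParallelThreadsPerStore workers over a shared DeletedDirSupplier and ANDs their results into processedAllDeletedDirs before any snapshot flags are updated. A compact, self-contained sketch of that fan-out-and-AND idiom, using a plain counter in place of the deleted-directory iterator (all names here are hypothetical stand-ins):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: N workers drain a shared supply of work items; the
// combined future is true only if every worker finished cleanly, mirroring
// how processedAllDeletedDirs gates the snapshot property updates.
public final class FanOutAndCombineSketch {
  public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    AtomicInteger remaining = new AtomicInteger(1_000); // stand-in for DeletedDirSupplier

    CompletableFuture<Boolean> allDone = CompletableFuture.completedFuture(true);
    for (int i = 0; i < 4; i++) {
      CompletableFuture<Boolean> worker = CompletableFuture.supplyAsync(() -> {
        try {
          while (remaining.getAndDecrement() > 0) {
            // process one item; a real worker would build purge requests here
          }
          return true;
        } catch (Throwable t) {
          return false; // one failed worker vetoes the "fully processed" flag
        }
      }, pool);
      // Fold each worker into the running result; combining `worker` with
      // itself would silently drop every earlier worker's outcome.
      allDone = allDone.thenCombine(worker, (a, b) -> a && b);
    }
    System.out.println("all workers succeeded: " + allDone.get());
    pool.shutdown();
  }
}
```

The same shape explains why a single failed worker defers the exclusive-size update and the deep-clean flag to a later run rather than recording a partial result.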
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
index d89726fd35ef..5e34c1ff741a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
@@ -122,7 +122,7 @@ public BackgroundTaskQueue getTasks() {
try {
iterator = snapshotChainManager.iterator(true);
} catch (IOException e) {
- LOG.error("Error while initializing snapshot chain iterator.");
+ LOG.error("Error while initializing snapshot chain iterator. DirDeletingTask will only process AOS this run.");
return queue;
}
while (iterator.hasNext()) {
@@ -204,7 +204,7 @@ private OzoneManagerProtocolProtos.SetSnapshotPropertyRequest getSetSnapshotRequ
* @param keyManager KeyManager of the underlying store.
*/
private void processDeletedKeysForStore(SnapshotInfo currentSnapshotInfo, KeyManager keyManager,
- int remainNum) throws IOException, InterruptedException {
+ int remainNum) throws IOException {
String volume = null, bucket = null, snapshotTableKey = null;
if (currentSnapshotInfo != null) {
volume = currentSnapshotInfo.getVolumeName();
@@ -323,8 +323,8 @@ public BackgroundTaskResult call() {
SnapshotUtils.getSnapshotInfo(getOzoneManager(), snapshotChainManager, snapshotId);
if (snapInfo != null) {
if (snapInfo.isDeepCleaned()) {
- LOG.info("Snapshot {} has already been deep cleaned. Skipping the snapshot in this iteration.",
- snapInfo.getSnapshotId());
+ LOG.info("Snapshot '{}' ({}) has already been deep cleaned. Skipping the snapshot in this iteration.",
+ snapInfo.getTableKey(), snapInfo.getSnapshotId());
return EmptyTaskResult.newResult();
}
if (!OmSnapshotManager.areSnapshotChangesFlushedToDB(getOzoneManager().getMetadataManager(), snapInfo)) {
@@ -345,7 +345,7 @@ public BackgroundTaskResult call() {
: omSnapshot.get().getKeyManager();
processDeletedKeysForStore(snapInfo, keyManager, remainNum);
}
- } catch (IOException | InterruptedException e) {
+ } catch (IOException e) {
LOG.error("Error while running delete files background task for store {}. Will retry at next run.",
snapInfo, e);
} finally {
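Both services resolve their KeyManager the same way: the AOS key manager when no snapshot is being processed, otherwise the key manager of a reference-counted snapshot handle that stays open for the duration of the work. A sketch of that null-tolerant try-with-resources shape — SnapshotHandle and the helpers are hypothetical stand-ins for UncheckedAutoCloseableSupplier<OmSnapshot> and the OM accessors:

```java
// Hypothetical sketch of the AOS-vs-snapshot key manager selection pattern.
interface KeyManagerLike {
  // marker for whichever store's key manager is in use
}

final class SnapshotHandle implements AutoCloseable {
  KeyManagerLike keyManager() {
    return new KeyManagerLike() { };
  }

  @Override
  public void close() {
    // release the reference so the snapshot can be evicted/purged later
  }
}

public final class StoreSelectionSketch {
  // try-with-resources skips close() for a null resource, so the AOS path
  // simply binds null and uses the active store's key manager.
  static void processStore(String snapshotName, KeyManagerLike aosKeyManager) throws Exception {
    try (SnapshotHandle handle = snapshotName == null ? null : openSnapshot(snapshotName)) {
      KeyManagerLike keyManager = handle == null ? aosKeyManager : handle.keyManager();
      // ... run deletion against keyManager; the snapshot stays pinned until close()
    }
  }

  static SnapshotHandle openSnapshot(String name) {
    return new SnapshotHandle(); // stand-in for omSnapshotManager.getActiveSnapshot(...)
  }

  public static void main(String[] args) throws Exception {
    KeyManagerLike aos = new KeyManagerLike() { };
    processStore(null, aos);      // AOS path: no snapshot handle opened
    processStore("snap1", aos);   // snapshot path: handle opened and auto-closed
  }
}
```

Keeping the handle open for the whole pass matters because the reference count is what keeps the snapshot's store from being evicted mid-read.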
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
index 68d9306584ae..42e76377e14d 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
@@ -21,8 +21,8 @@
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEEP_CLEANING_ENABLED;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -140,7 +140,7 @@ class TestKeyDeletingService extends OzoneTestBase {
private KeyManager keyManager;
private OMMetadataManager metadataManager;
private KeyDeletingService keyDeletingService;
- private SnapshotDirectoryCleaningService snapshotDirectoryCleaningService;
+ private DirectoryDeletingService directoryDeletingService;
private ScmBlockLocationTestingClient scmBlockTestingClient;
@BeforeAll
@@ -156,7 +156,7 @@ private void createConfig(File testDir) {
100, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL,
100, TimeUnit.MILLISECONDS);
- conf.setTimeDuration(OZONE_SNAPSHOT_DIRECTORY_SERVICE_INTERVAL,
+ conf.setTimeDuration(OZONE_DIR_DELETING_SERVICE_INTERVAL,
100, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL,
1, TimeUnit.SECONDS);
@@ -170,7 +170,7 @@ private void createSubject() throws Exception {
OmTestManagers omTestManagers = new OmTestManagers(conf, scmBlockTestingClient, null);
keyManager = omTestManagers.getKeyManager();
keyDeletingService = keyManager.getDeletingService();
- snapshotDirectoryCleaningService = keyManager.getSnapshotDirectoryService();
+ directoryDeletingService = keyManager.getDirDeletingService();
writeClient = omTestManagers.getWriteClient();
om = omTestManagers.getOzoneManager();
metadataManager = omTestManagers.getMetadataManager();
@@ -524,6 +524,7 @@ void testSnapshotDeepClean() throws Exception {
// Suspend KeyDeletingService
keyDeletingService.suspend();
+ directoryDeletingService.suspend();
final long initialSnapshotCount = metadataManager.countRowsInTable(snapshotInfoTable);
final long initialKeyCount = metadataManager.countRowsInTable(keyTable);
@@ -571,6 +572,7 @@ void testSnapshotDeepClean() throws Exception {
checkSnapDeepCleanStatus(snapshotInfoTable, volumeName, false);
keyDeletingService.resume();
+ directoryDeletingService.resume();
try (UncheckedAutoCloseableSupplier<OmSnapshot> rcOmSnapshot =
om.getOmSnapshotManager().getSnapshot(volumeName, bucketName, snap3)) {
@@ -640,6 +642,7 @@ void testSnapshotExclusiveSize() throws Exception {
// Suspend KDS
keyDeletingService.suspend();
+ directoryDeletingService.suspend();
final long initialSnapshotCount = metadataManager.countRowsInTable(snapshotInfoTable);
final long initialKeyCount = metadataManager.countRowsInTable(keyTable);
@@ -711,10 +714,11 @@ void testSnapshotExclusiveSize() throws Exception {
createAndCommitKey(testVolumeName, testBucketName, uniqueObjectName("key"), 3);
long prevKdsRunCount = getRunCount();
- long prevSnapshotDirectorServiceCnt = snapshotDirectoryCleaningService.getRunCount().get();
+ long prevDirDeletingServiceRunCount = directoryDeletingService.getRunCount().get();
+ directoryDeletingService.resume();
- // Let SnapshotDirectoryCleaningService to run for some iterations
+ // Let DirectoryDeletingService run for some iterations
GenericTestUtils.waitFor(
- () -> (snapshotDirectoryCleaningService.getRunCount().get() > prevSnapshotDirectorServiceCnt + 20),
+ () -> (directoryDeletingService.getRunCount().get() > prevDirDeletingServiceRunCount + 100),
100, 100000);
keyDeletingService.resume();
@@ -779,7 +783,7 @@ void cleanup() {
@Test
@DisplayName("Should not update keys when purge request times out during key deletion")
- public void testFailingModifiedKeyPurge() throws IOException, InterruptedException {
+ public void testFailingModifiedKeyPurge() throws IOException {
try (MockedStatic<OzoneManagerRatisUtils> mocked = mockStatic(OzoneManagerRatisUtils.class,
CALLS_REAL_METHODS)) {
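The exclusive-size test above parks both deletion services while fixtures are built, then resumes them and waits on the services' run counters before asserting; the wait bound grows from 20 to 100 extra runs because each run now covers the AOS plus every snapshot. A stripped-down sketch of that gate-and-wait pattern, with a local waitFor standing in for GenericTestUtils.waitFor:

```java
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the suspend/resume plus run-counter wait used in the tests.
public final class RunCounterWaitSketch {
  static final AtomicLong runCount = new AtomicLong(); // stand-in for service.getRunCount()

  // Minimal stand-in for GenericTestUtils.waitFor(check, checkEveryMillis, waitForMillis).
  static void waitFor(BooleanSupplier check, long checkEveryMillis, long waitForMillis)
      throws InterruptedException, TimeoutException {
    long deadline = System.currentTimeMillis() + waitForMillis;
    while (!check.getAsBoolean()) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException("condition not met within " + waitForMillis + " ms");
      }
      Thread.sleep(checkEveryMillis);
    }
  }

  public static void main(String[] args) throws Exception {
    long before = runCount.get();
    // resume() the service here, then demand many full iterations before asserting,
    // so every store (AOS and each snapshot) has had a chance to be processed.
    new Thread(() -> {
      for (int i = 0; i < 200; i++) {
        runCount.incrementAndGet(); // simulated background-service runs
      }
    }).start();
    waitFor(() -> runCount.get() > before + 100, 10, 10_000);
    System.out.println("service ran enough iterations: " + runCount.get());
  }
}
```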