Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,11 @@ private OMConfigKeys() {
// resulting 24MB
public static final int OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT = 6000;

// Config key for the number of threads handed to the DirectoryDeletingService
// as its core pool size. The reader in the key manager start-up code falls
// back to 1 when the configured value is zero or negative.
public static final String OZONE_THREAD_NUMBER_DIR_DELETION =
"ozone.thread.number.dir.deletion";

// Default number of directory deleting threads when the key is not set.
public static final int OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT = 10;

public static final String SNAPSHOT_SST_DELETING_LIMIT_PER_TASK =
"ozone.snapshot.filtering.limit.per.task";
public static final int SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ public void testAOSKeyDeletingWithSnapshotCreateParallelExecution()
when(ozoneManager.getOmSnapshotManager()).thenAnswer(i -> omSnapshotManager);
DirectoryDeletingService service = Mockito.spy(new DirectoryDeletingService(1000, TimeUnit.MILLISECONDS, 1000,
ozoneManager,
cluster.getConf()));
cluster.getConf(), 1));
service.shutdown();
final int initialSnapshotCount =
(int) cluster.getOzoneManager().getMetadataManager().countRowsInTable(snapshotInfoTable);
Expand Down Expand Up @@ -563,7 +563,7 @@ public void testAOSKeyDeletingWithSnapshotCreateParallelExecution()
}
return i.callRealMethod();
}).when(service).optimizeDirDeletesAndSubmitRequest(anyLong(), anyLong(), anyLong(),
anyLong(), anyList(), anyList(), eq(null), anyLong(), anyInt(), Mockito.any(), any());
anyLong(), anyList(), anyList(), eq(null), anyLong(), anyInt(), Mockito.any(), any(), anyLong());

Mockito.doAnswer(i -> {
store.createSnapshot(testVolumeName, testBucketName, snap2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ private void addPropertiesNotInXml() {
OMConfigKeys.OZONE_RANGER_HTTPS_ADDRESS_KEY,
OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_USER,
OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD,
OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION,
ScmConfigKeys.OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY,
ScmConfigKeys.OZONE_SCM_HA_PREFIX,
S3GatewayConfigKeys.OZONE_S3G_FSO_DIRECTORY_CREATION_ENABLED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,22 +480,22 @@ public void testSnapshotWithFSO() throws Exception {

private DirectoryDeletingService getMockedDirectoryDeletingService(AtomicBoolean dirDeletionWaitStarted,
AtomicBoolean dirDeletionStarted)
throws InterruptedException, TimeoutException {
throws InterruptedException, TimeoutException, IOException {
OzoneManager ozoneManager = Mockito.spy(om);
om.getKeyManager().getDirDeletingService().shutdown();
GenericTestUtils.waitFor(() -> om.getKeyManager().getDirDeletingService().getThreadCount() == 0, 1000,
100000);
DirectoryDeletingService directoryDeletingService = Mockito.spy(new DirectoryDeletingService(10000,
TimeUnit.MILLISECONDS, 100000, ozoneManager, cluster.getConf()));
TimeUnit.MILLISECONDS, 100000, ozoneManager, cluster.getConf(), 1));
directoryDeletingService.shutdown();
GenericTestUtils.waitFor(() -> directoryDeletingService.getThreadCount() == 0, 1000,
100000);
when(ozoneManager.getMetadataManager()).thenAnswer(i -> {
doAnswer(i -> {
// Wait for SDS to reach DDS wait block before processing any deleted directories.
GenericTestUtils.waitFor(dirDeletionWaitStarted::get, 1000, 100000);
dirDeletionStarted.set(true);
return i.callRealMethod();
});
}).when(directoryDeletingService).getPendingDeletedDirInfo();
return directoryDeletingService;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_SERVICE_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT;
import static org.apache.hadoop.ozone.om.OzoneManagerUtils.getBucketLayout;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
Expand Down Expand Up @@ -257,8 +259,16 @@ public void start(OzoneConfiguration configuration) {
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
dirDeletingService = new DirectoryDeletingService(dirDeleteInterval,
TimeUnit.MILLISECONDS, serviceTimeout, ozoneManager, configuration);
int dirDeletingServiceCorePoolSize =
configuration.getInt(OZONE_THREAD_NUMBER_DIR_DELETION,
OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT);
if (dirDeletingServiceCorePoolSize <= 0) {
dirDeletingServiceCorePoolSize = 1;
}
dirDeletingService =
new DirectoryDeletingService(dirDeleteInterval, TimeUnit.MILLISECONDS,
serviceTimeout, ozoneManager, configuration,
dirDeletingServiceCorePoolSize);
dirDeletingService.start();
}

Expand Down Expand Up @@ -2042,7 +2052,7 @@ public List<OmKeyInfo> getPendingDeletionSubDirs(long volumeId, long bucketId,
parentInfo.getObjectID(), "");
long countEntries = 0;

Table dirTable = metadataManager.getDirectoryTable();
Table<String, OmDirectoryInfo> dirTable = metadataManager.getDirectoryTable();
try (TableIterator<String,
? extends Table.KeyValue<String, OmDirectoryInfo>>
iterator = dirTable.iterator()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ private void addToMap(Map<Pair<String, String>, List<String>> map, String object

protected void submitPurgePaths(List<PurgePathRequest> requests,
String snapTableKey,
UUID expectedPreviousSnapshotId) {
UUID expectedPreviousSnapshotId, long rnCnt) {
OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest =
OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder();

Expand All @@ -305,7 +305,7 @@ protected void submitPurgePaths(List<PurgePathRequest> requests,

// Submit Purge paths request to OM
try {
OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get());
OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, rnCnt);
} catch (ServiceException e) {
LOG.error("PurgePaths request failed. Will retry at next run.");
}
Expand Down Expand Up @@ -400,7 +400,7 @@ public long optimizeDirDeletesAndSubmitRequest(long remainNum,
List<PurgePathRequest> purgePathRequestList,
String snapTableKey, long startTime,
int remainingBufLimit, KeyManager keyManager,
UUID expectedPreviousSnapshotId) {
UUID expectedPreviousSnapshotId, long rnCnt) {

// Optimization to handle delete sub-dir and keys to remove quickly
// This case will be useful to handle when depth of directory is high
Expand Down Expand Up @@ -442,7 +442,7 @@ public long optimizeDirDeletesAndSubmitRequest(long remainNum,
}

if (!purgePathRequestList.isEmpty()) {
submitPurgePaths(purgePathRequestList, snapTableKey, expectedPreviousSnapshotId);
submitPurgePaths(purgePathRequestList, snapTableKey, expectedPreviousSnapshotId, rnCnt);
}

if (dirNum != 0 || subDirNum != 0 || subFileNum != 0) {
Expand All @@ -455,7 +455,7 @@ public long optimizeDirDeletesAndSubmitRequest(long remainNum,
"DeletedDirectoryTable, iteration elapsed: {}ms," +
" totalRunCount: {}",
dirNum, subdirDelNum, subFileNum, (subDirNum - subdirDelNum),
Time.monotonicNow() - startTime, getRunCount());
Time.monotonicNow() - startTime, rnCnt);
}
return remainNum;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
import org.apache.hadoop.hdds.utils.db.TableIterator;
Expand All @@ -49,6 +50,7 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT;
Expand All @@ -74,10 +76,10 @@ public class DirectoryDeletingService extends AbstractKeyDeletingService {
public static final Logger LOG =
LoggerFactory.getLogger(DirectoryDeletingService.class);

// Use only a single thread for DirDeletion. Multiple threads would read
// or write to same tables and can send deletion requests for same key
// multiple times.
private static final int DIR_DELETING_CORE_POOL_SIZE = 1;
// DirDeletion uses multiple threads. The threads concurrently read parent
// directory info from the deleted directory table and send deletion
// requests.
private final int dirDeletingCorePoolSize;
private static final int MIN_ERR_LIMIT_PER_TASK = 1000;

// Number of items(dirs/files) to be batched in an iteration.
Expand All @@ -86,11 +88,15 @@ public class DirectoryDeletingService extends AbstractKeyDeletingService {
private final AtomicBoolean suspended;
private AtomicBoolean isRunningOnAOS;

private final DeletedDirSupplier deletedDirSupplier;

private AtomicInteger taskCount = new AtomicInteger(0);

public DirectoryDeletingService(long interval, TimeUnit unit,
long serviceTimeout, OzoneManager ozoneManager,
OzoneConfiguration configuration) {
OzoneConfiguration configuration, int dirDeletingServiceCorePoolSize) {
super(DirectoryDeletingService.class.getSimpleName(), interval, unit,
DIR_DELETING_CORE_POOL_SIZE, serviceTimeout, ozoneManager, null);
dirDeletingServiceCorePoolSize, serviceTimeout, ozoneManager, null);
this.pathLimitPerTask = configuration
.getInt(OZONE_PATH_DELETING_LIMIT_PER_TASK,
OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT);
Expand All @@ -102,6 +108,9 @@ public DirectoryDeletingService(long interval, TimeUnit unit,
this.ratisByteLimit = (int) (limit * 0.9);
this.suspended = new AtomicBoolean(false);
this.isRunningOnAOS = new AtomicBoolean(false);
this.dirDeletingCorePoolSize = dirDeletingServiceCorePoolSize;
deletedDirSupplier = new DeletedDirSupplier();
taskCount.set(0);
}

private boolean shouldRun() {
Expand All @@ -116,6 +125,10 @@ public boolean isRunningOnAOS() {
return isRunningOnAOS.get();
}

/**
 * Returns the counter of directory deleting tasks that are still outstanding.
 * getTasks() sets it to the core pool size when it schedules a new batch, and
 * each task decrements it when its call() finishes.
 *
 * @return the live counter instance (not a copy), so callers observe updates.
 */
public AtomicInteger getTaskCount() {
return taskCount;
}

/**
* Suspend the service.
*/
Expand All @@ -135,10 +148,54 @@ public void resume() {
/**
 * Builds the batch of directory deleting tasks for the next run.
 *
 * <p>An empty queue is returned when tasks from the previous batch are still
 * in progress, or when the deleted directory table iterator cannot be
 * (re)opened. Otherwise the shared iterator is re-initialised and one task
 * per configured thread is queued; all tasks pull entries from that single
 * iterator through the supplier.
 *
 * @return the queue of tasks to execute, possibly empty.
 */
@Override
public BackgroundTaskQueue getTasks() {
  BackgroundTaskQueue queue = new BackgroundTaskQueue();
  int inProgress = taskCount.get();
  if (inProgress > 0) {
    // Never queue extra work while a batch is running: every task decrements
    // taskCount on completion, so an uncounted task would skew the counter.
    LOG.info("{} Directory deleting task(s) already in progress.",
        inProgress);
    return queue;
  }
  try {
    deletedDirSupplier.reInitItr();
  } catch (IOException ex) {
    // Without an iterator the tasks have nothing to read; skip this run and
    // retry on the next invocation.
    LOG.error("Unable to get the iterator.", ex);
    return queue;
  }
  // Set the counter before handing out tasks so each decrement is matched.
  taskCount.set(dirDeletingCorePoolSize);
  for (int i = 0; i < dirDeletingCorePoolSize; i++) {
    queue.add(new DirectoryDeletingService.DirDeletingTask(this));
  }
  return queue;
}

/**
 * Shuts the service down through the parent implementation and then closes
 * the deleted directory table iterator held by the supplier.
 */
@Override
public void shutdown() {
super.shutdown();
// Close after the parent shutdown; closeItr() also clears the iterator
// reference inside the supplier.
// NOTE(review): presumably no task is still reading the iterator once
// super.shutdown() returns - confirm against the parent implementation.
deletedDirSupplier.closeItr();
}

/**
 * Hands out entries of the deleted directory table from one shared iterator.
 * All methods are synchronized on this supplier, so concurrent callers each
 * receive a distinct entry and never observe a half-replaced iterator.
 */
private final class DeletedDirSupplier {
  // Null until reInitItr() succeeds and again after closeItr().
  private TableIterator<String, ? extends KeyValue<String, OmKeyInfo>>
      deleteTableIterator;

  /**
   * Returns the next entry of the deleted directory table.
   *
   * @return the next entry, or null when the iterator is exhausted or no
   *         iterator is currently open (never initialised, closed, or the
   *         last re-initialisation failed).
   * @throws IOException declared for callers; kept for compatibility.
   */
  private synchronized Table.KeyValue<String, OmKeyInfo> get()
      throws IOException {
    if (deleteTableIterator == null) {
      // Treat a missing iterator like an exhausted one instead of throwing
      // a NullPointerException in the calling task.
      return null;
    }
    if (deleteTableIterator.hasNext()) {
      return deleteTableIterator.next();
    }
    return null;
  }

  /** Closes the current iterator, if any, and clears the reference. */
  private synchronized void closeItr() {
    IOUtils.closeQuietly(deleteTableIterator);
    deleteTableIterator = null;
  }

  /**
   * Closes the current iterator and opens a fresh one over the deleted
   * directory table.
   *
   * @throws IOException if the new iterator cannot be created; the supplier
   *         is then left without an open iterator.
   */
  private synchronized void reInitItr() throws IOException {
    closeItr();
    deleteTableIterator =
        getOzoneManager().getMetadataManager().getDeletedDirTable()
            .iterator();
  }
}

private final class DirDeletingTask implements BackgroundTask {
private final DirectoryDeletingService directoryDeletingService;

Expand All @@ -158,7 +215,8 @@ public BackgroundTaskResult call() {
LOG.debug("Running DirectoryDeletingService");
}
isRunningOnAOS.set(true);
getRunCount().incrementAndGet();
// rnCnt will be same for each thread to maintain same CallId.
long rnCnt = getRunCount().incrementAndGet();
long dirNum = 0L;
long subDirNum = 0L;
long subFileNum = 0L;
Expand All @@ -169,27 +227,27 @@ public BackgroundTaskResult call() {
= new ArrayList<>((int) remainNum);

Table.KeyValue<String, OmKeyInfo> pendingDeletedDirInfo;

try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>>
deleteTableIterator = getOzoneManager().getMetadataManager().
getDeletedDirTable().iterator()) {
// This is to avoid race condition b/w purge request and snapshot chain updation. For AOS taking the global
// snapshotId since AOS could process multiple buckets in one iteration.
// This is to avoid race condition b/w purge request and snapshot chain updation. For AOS taking the global
// snapshotId since AOS could process multiple buckets in one iteration.
try {
UUID expectedPreviousSnapshotId =
((OmMetadataManagerImpl)getOzoneManager().getMetadataManager()).getSnapshotChainManager()
((OmMetadataManagerImpl) getOzoneManager().getMetadataManager()).getSnapshotChainManager()
.getLatestGlobalSnapshotId();

long startTime = Time.monotonicNow();
while (remainNum > 0 && deleteTableIterator.hasNext()) {
pendingDeletedDirInfo = deleteTableIterator.next();
while (remainNum > 0) {
pendingDeletedDirInfo = getPendingDeletedDirInfo();
if (pendingDeletedDirInfo == null) {
break;
}
// Do not reclaim if the directory is still being referenced by
// the previous snapshot.
if (previousSnapshotHasDir(pendingDeletedDirInfo)) {
continue;
}

PurgePathRequest request = prepareDeleteDirRequest(
remainNum, pendingDeletedDirInfo.getValue(),
PurgePathRequest request = prepareDeleteDirRequest(remainNum,
pendingDeletedDirInfo.getValue(),
pendingDeletedDirInfo.getKey(), allSubDirList,
getOzoneManager().getKeyManager());
if (isBufferLimitCrossed(ratisByteLimit, consumedSize,
Expand All @@ -202,8 +260,8 @@ public BackgroundTaskResult call() {
// if directory itself is having a lot of keys / files,
// reduce capacity to minimum level
remainNum = MIN_ERR_LIMIT_PER_TASK;
request = prepareDeleteDirRequest(
remainNum, pendingDeletedDirInfo.getValue(),
request = prepareDeleteDirRequest(remainNum,
pendingDeletedDirInfo.getValue(),
pendingDeletedDirInfo.getKey(), allSubDirList,
getOzoneManager().getKeyManager());
}
Expand All @@ -214,19 +272,18 @@ public BackgroundTaskResult call() {
remainNum = remainNum - request.getDeletedSubFilesCount();
remainNum = remainNum - request.getMarkDeletedSubDirsCount();
// Count up the purgeDeletedDir, subDirs and subFiles
if (request.getDeletedDir() != null
&& !request.getDeletedDir().isEmpty()) {
if (request.getDeletedDir() != null && !request.getDeletedDir()
.isEmpty()) {
dirNum++;
}
subDirNum += request.getMarkDeletedSubDirsCount();
subFileNum += request.getDeletedSubFilesCount();
}

optimizeDirDeletesAndSubmitRequest(
remainNum, dirNum, subDirNum, subFileNum,
allSubDirList, purgePathRequestList, null, startTime,
ratisByteLimit - consumedSize,
getOzoneManager().getKeyManager(), expectedPreviousSnapshotId);
getOzoneManager().getKeyManager(), expectedPreviousSnapshotId, rnCnt);

} catch (IOException e) {
LOG.error("Error while running delete directories and files " +
Expand All @@ -238,6 +295,7 @@ public BackgroundTaskResult call() {
}
}
// place holder by returning empty results of this call back.
taskCount.getAndDecrement();
return BackgroundTaskResult.EmptyTaskResult.newResult();
}

Expand Down Expand Up @@ -301,4 +359,9 @@ private boolean previousSnapshotHasDir(
}
}

/**
 * Fetches the next pending entry of the deleted directory table from the
 * shared supplier.
 *
 * @return the next entry, or null when the supplier's iterator has no more
 *         entries.
 * @throws IOException propagated from the supplier.
 */
public KeyValue<String, OmKeyInfo> getPendingDeletedDirInfo()
throws IOException {
return deletedDirSupplier.get();
}

}
Loading
Loading