Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ protected synchronized void setInterval(long newInterval, TimeUnit newUnit) {
this.unit = newUnit;
}

/**
 * Returns the configured run interval converted to milliseconds.
 *
 * <p>Declared {@code synchronized}, like {@code setInterval}, so the
 * interval and its unit are read under the same monitor that guards
 * their update.
 *
 * @return the current interval expressed in milliseconds
 */
protected synchronized long getIntervalMillis() {
  final long millis = unit.toMillis(this.interval);
  return millis;
}

public abstract BackgroundTaskQueue getTasks();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -152,7 +153,7 @@ public class DirectoryDeletingService extends AbstractKeyDeletingService {
private int ratisByteLimit;
private final SnapshotChainManager snapshotChainManager;
private final boolean deepCleanSnapshots;
private final ExecutorService deletionThreadPool;
private ExecutorService deletionThreadPool;
private final int numberOfParallelThreadsPerStore;
private final AtomicLong deletedDirsCount;
private final AtomicLong movedDirsCount;
Expand All @@ -168,9 +169,8 @@ public DirectoryDeletingService(long interval, TimeUnit unit,
OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
StorageUnit.BYTES);
this.numberOfParallelThreadsPerStore = dirDeletingServiceCorePoolSize;
this.deletionThreadPool = new ThreadPoolExecutor(0, numberOfParallelThreadsPerStore, interval, unit,
new LinkedBlockingDeque<>(Integer.MAX_VALUE));

this.deletionThreadPool = new ThreadPoolExecutor(0, numberOfParallelThreadsPerStore,
interval, unit, new LinkedBlockingDeque<>(Integer.MAX_VALUE));
// always go to 90% of max limit for request as other header will be added
this.ratisByteLimit = (int) (limit * 0.9);
registerReconfigCallbacks(ozoneManager.getReconfigurationHandler());
Expand Down Expand Up @@ -226,9 +226,33 @@ public BackgroundTaskQueue getTasks() {

/**
 * Stops the directory deletion worker pool and then shuts down the service.
 *
 * <p>The pool is first asked to shut down gracefully. If it has not
 * terminated within 60 seconds, or the waiting thread is interrupted,
 * outstanding work is cancelled with {@code shutdownNow()}.
 * {@code super.shutdown()} is always invoked, even when stopping the pool
 * throws.
 */
@Override
public void shutdown() {
  // Snapshot the field once: start() may install a new pool, and every call
  // below must target the same executor instance that passed the null check.
  final ExecutorService pool = deletionThreadPool;
  try {
    if (pool != null) {
      pool.shutdown();
      try {
        if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
          pool.shutdownNow();
        }
      } catch (InterruptedException e) {
        // Preserve the interrupt status for the caller, then cancel what is left.
        Thread.currentThread().interrupt();
        pool.shutdownNow();
      }
    }
  } finally {
    // The parent service must be stopped regardless of how the pool shutdown went.
    super.shutdown();
  }
}

/**
 * Starts the service, first making sure a usable deletion thread pool exists.
 *
 * <p>A new pool is created when the current one is {@code null} or has
 * already been shut down or terminated (for example by an earlier
 * {@code shutdown()} call), so the service can be started again.
 */
@Override
public synchronized void start() {
  if (!isThreadPoolActive(deletionThreadPool)) {
    // Idle worker keep-alive time is the service interval, in milliseconds.
    final long keepAliveMillis = super.getIntervalMillis();
    deletionThreadPool = new ThreadPoolExecutor(0, numberOfParallelThreadsPerStore,
        keepAliveMillis, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(Integer.MAX_VALUE));
  }
  super.start();
}

/**
 * Reports whether the given executor is present and has not been shut down
 * or terminated.
 *
 * @param threadPoolExecutor executor to inspect; may be {@code null}
 * @return {@code true} only if the executor is non-null and is neither
 *         shut down nor terminated
 */
private boolean isThreadPoolActive(ExecutorService threadPoolExecutor) {
  if (threadPoolExecutor == null) {
    return false;
  }
  final boolean stopped = threadPoolExecutor.isShutdown() || threadPoolExecutor.isTerminated();
  return !stopped;
}

@SuppressWarnings("checkstyle:ParameterNumber")
void optimizeDirDeletesAndSubmitRequest(
long dirNum, long subDirNum, long subFileNum,
Expand All @@ -238,7 +262,7 @@ void optimizeDirDeletesAndSubmitRequest(
long remainingBufLimit, KeyManager keyManager,
CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException> reclaimableDirChecker,
CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException> reclaimableFileChecker,
UUID expectedPreviousSnapshotId, long rnCnt) {
UUID expectedPreviousSnapshotId, long rnCnt) throws InterruptedException {

// Optimization to handle delete sub-dir and keys to remove quickly
// This case will be useful to handle when depth of directory is high
Expand Down Expand Up @@ -435,7 +459,7 @@ private OzoneManagerProtocolProtos.PurgePathRequest wrapPurgeRequest(
}

private OzoneManagerProtocolProtos.OMResponse submitPurgePaths(List<PurgePathRequest> requests,
String snapTableKey, UUID expectedPreviousSnapshotId) {
String snapTableKey, UUID expectedPreviousSnapshotId) throws InterruptedException {
OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest =
OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder();

Expand All @@ -461,7 +485,7 @@ private OzoneManagerProtocolProtos.OMResponse submitPurgePaths(List<PurgePathReq
// Submit Purge paths request to OM. Acquire bootstrap lock when processing deletes for snapshots.
try (BootstrapStateHandler.Lock lock = snapTableKey != null ? getBootstrapStateLock().lock() : null) {
return submitRequest(omRequest);
} catch (ServiceException | InterruptedException e) {
} catch (ServiceException e) {
LOG.error("PurgePaths request failed. Will retry at next run.", e);
}
return null;
Expand Down Expand Up @@ -524,10 +548,13 @@ private void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, KeyMan
try {
return processDeletedDirectories(currentSnapshotInfo, keyManager, dirSupplier, remainingBufLimit,
expectedPreviousSnapshotId, exclusiveSizeMap, rnCnt);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
} catch (Throwable e) {
return false;
}
}, deletionThreadPool);
}, isThreadPoolActive(deletionThreadPool) ? deletionThreadPool : ForkJoinPool.commonPool());
processedAllDeletedDirs = future.thenCombine(future, (a, b) -> a && b);
}
// If AOS or all directories have been processed for snapshot, update snapshot size delta and deep clean flag
Expand Down Expand Up @@ -572,7 +599,7 @@ private void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, KeyMan
*/
private boolean processDeletedDirectories(SnapshotInfo currentSnapshotInfo, KeyManager keyManager,
DeletedDirSupplier dirSupplier, long remainingBufLimit, UUID expectedPreviousSnapshotId,
Map<UUID, Pair<Long, Long>> totalExclusiveSizeMap, long runCount) {
Map<UUID, Pair<Long, Long>> totalExclusiveSizeMap, long runCount) throws InterruptedException {
OmSnapshotManager omSnapshotManager = getOzoneManager().getOmSnapshotManager();
IOzoneManagerLock lock = getOzoneManager().getMetadataManager().getLock();
String snapshotTableKey = currentSnapshotInfo == null ? null : currentSnapshotInfo.getTableKey();
Expand Down Expand Up @@ -676,9 +703,13 @@ public BackgroundTaskResult call() {
: omSnapshot.get().getKeyManager();
processDeletedDirsForStore(snapInfo, keyManager, ratisByteLimit, run);
}
} catch (IOException | ExecutionException | InterruptedException e) {
} catch (IOException | ExecutionException e) {
LOG.error("Error while running delete files background task for store {}. Will retry at next run.",
snapInfo, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Interruption running delete directory background task for store {}.",
snapInfo, e);
}
}
// By design, no one cares about the results of this call back.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public AtomicLong getDeletedKeyCount() {

Pair<Integer, Boolean> processKeyDeletes(List<BlockGroup> keyBlocksList,
Map<String, RepeatedOmKeyInfo> keysToModify, List<String> renameEntries,
String snapTableKey, UUID expectedPreviousSnapshotId) throws IOException {
String snapTableKey, UUID expectedPreviousSnapshotId) throws IOException, InterruptedException {
long startTime = Time.monotonicNow();
Pair<Integer, Boolean> purgeResult = Pair.of(0, false);
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -166,7 +166,7 @@ Pair<Integer, Boolean> processKeyDeletes(List<BlockGroup> keyBlocksList,
*/
private Pair<Integer, Boolean> submitPurgeKeysRequest(List<DeleteBlockGroupResult> results,
Map<String, RepeatedOmKeyInfo> keysToModify, List<String> renameEntriesToBeDeleted,
String snapTableKey, UUID expectedPreviousSnapshotId) {
String snapTableKey, UUID expectedPreviousSnapshotId) throws InterruptedException {
List<String> purgeKeys = new ArrayList<>();

// Put all keys to be purged in a list
Expand Down Expand Up @@ -252,7 +252,7 @@ private Pair<Integer, Boolean> submitPurgeKeysRequest(List<DeleteBlockGroupResul
if (omResponse != null) {
purgeSuccess = purgeSuccess && omResponse.getSuccess();
}
} catch (ServiceException | InterruptedException e) {
} catch (ServiceException e) {
LOG.error("PurgeKey request failed. Will retry at next run.", e);
return Pair.of(0, false);
}
Expand Down Expand Up @@ -325,7 +325,7 @@ private OzoneManagerProtocolProtos.SetSnapshotPropertyRequest getSetSnapshotRequ
* @param keyManager KeyManager of the underlying store.
*/
private void processDeletedKeysForStore(SnapshotInfo currentSnapshotInfo, KeyManager keyManager,
int remainNum, int ratisLimit) throws IOException {
int remainNum, int ratisLimit) throws IOException, InterruptedException {
String volume = null, bucket = null, snapshotTableKey = null;
if (currentSnapshotInfo != null) {
volume = currentSnapshotInfo.getVolumeName();
Expand Down Expand Up @@ -471,6 +471,9 @@ public BackgroundTaskResult call() {
} catch (IOException e) {
LOG.error("Error while running delete files background task for store {}. Will retry at next run.",
snapInfo, e);
} catch (InterruptedException e) {
LOG.error("Interruption while running delete files background task for store {}.", snapInfo, e);
Thread.currentThread().interrupt();
}
}
// By design, no one cares about the results of this call back.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,7 @@ void cleanup() {

@Test
@DisplayName("Should not update keys when purge request times out during key deletion")
public void testFailingModifiedKeyPurge() throws IOException {
public void testFailingModifiedKeyPurge() throws IOException, InterruptedException {

try (MockedStatic<OzoneManagerRatisUtils> mocked = mockStatic(OzoneManagerRatisUtils.class,
CALLS_REAL_METHODS)) {
Expand Down