Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Optimise snapshot deletion to speed up snapshot deletion and creation(#15568) #15724

Merged
merged 1 commit into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add canRemain method to TargetPoolAllocationDecider to move shards from local to remote pool for hot to warm tiering ([#15010](https://github.com/opensearch-project/OpenSearch/pull/15010))
- Add support for pluggable deciders for concurrent search ([#15363](https://github.com/opensearch-project/OpenSearch/pull/15363))
- Add support for comma-separated list of index names to be used with Snapshot Status API ([#15409](https://github.com/opensearch-project/OpenSearch/pull/15409))[SnapshotV2] Snapshot Status API changes (#15409))
- Optimise snapshot deletion to speed up snapshot deletion and creation ([#15568](https://github.com/opensearch-project/OpenSearch/pull/15568))
- [Remote Publication] Added checksum validation for cluster state behind a cluster setting ([#15218](https://github.com/opensearch-project/OpenSearch/pull/15218))
- Optimize NodeIndicesStats output behind flag ([#14454](https://github.com/opensearch-project/OpenSearch/pull/14454))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.Numbers;
import org.opensearch.common.Priority;
import org.opensearch.common.Randomness;
import org.opensearch.common.SetOnce;
import org.opensearch.common.UUIDs;
import org.opensearch.common.blobstore.BlobContainer;
Expand Down Expand Up @@ -1274,6 +1275,10 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
ActionListener<Void> listener
) {
final List<Tuple<BlobPath, String>> filesToDelete = resolveFilesToDelete(oldRepositoryData, snapshotIds, deleteResults);
long startTimeNs = System.nanoTime();
Randomness.shuffle(filesToDelete);
logger.debug("[{}] shuffled the filesToDelete with timeElapsedNs={}", metadata.name(), (System.nanoTime() - startTimeNs));

if (filesToDelete.isEmpty()) {
listener.onResponse(null);
return;
Expand All @@ -1291,8 +1296,8 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
staleFilesToDeleteInBatch.size()
);

// Start as many workers as fit into the snapshot pool at once at the most
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), staleFilesToDeleteInBatch.size());
// Start as many workers as fit into the snapshot_deletion pool at once at the most
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT_DELETION).getMax(), staleFilesToDeleteInBatch.size());
for (int i = 0; i < workers; ++i) {
executeStaleShardDelete(staleFilesToDeleteInBatch, remoteStoreLockManagerFactory, groupedListener);
}
Expand Down Expand Up @@ -1396,7 +1401,7 @@ private void executeStaleShardDelete(
if (filesToDelete == null) {
return;
}
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION).execute(ActionRunnable.wrap(listener, l -> {
try {
// filtering files for which remote store lock release and cleanup succeeded,
// remaining files for which it failed will be retried in next snapshot delete run.
Expand Down Expand Up @@ -1460,7 +1465,7 @@ private void writeUpdatedShardMetaDataAndComputeDeletes(
ActionListener<Collection<ShardSnapshotMetaDeleteResult>> onAllShardsCompleted
) {

final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION);
final List<IndexId> indices = oldRepositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotIds);

if (indices.isEmpty()) {
Expand Down Expand Up @@ -1648,7 +1653,7 @@ private void cleanupStaleBlobs(
listener.onResponse(deleteResult);
}, listener::onFailure), 2);

final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION);
final List<String> staleRootBlobs = staleRootBlobs(newRepoData, rootBlobs.keySet());
if (staleRootBlobs.isEmpty()) {
groupedListener.onResponse(DeleteResult.ZERO);
Expand Down Expand Up @@ -1851,7 +1856,7 @@ void cleanupStaleIndices(

// Start as many workers as fit into the snapshot pool at once at the most
final int workers = Math.min(
threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
threadPool.info(ThreadPool.Names.SNAPSHOT_DELETION).getMax(),
foundIndices.size() - survivingIndexIds.size()
);
for (int i = 0; i < workers; ++i) {
Expand Down Expand Up @@ -1903,7 +1908,7 @@ private void executeOneStaleIndexDelete(
return;
}
final String indexSnId = indexEntry.getKey();
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> {
threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION).execute(ActionRunnable.supply(listener, () -> {
try {
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
List<String> matchingShardPaths = findMatchingShardPaths(indexSnId, snapshotShardPaths);
Expand Down Expand Up @@ -2180,9 +2185,10 @@ public void finalizeSnapshot(
repositoryUpdatePriority,
ActionListener.wrap(newRepoData -> {
if (writeShardGens) {
cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
cleanupOldShardGens(existingRepositoryData, updatedRepositoryData, newRepoData, listener);
} else {
listener.onResponse(newRepoData);
}
listener.onResponse(newRepoData);
}, onUpdateFailure)
);
}, onUpdateFailure), 2 + indices.size());
Expand Down Expand Up @@ -2347,7 +2353,12 @@ private void logShardPathsOperationWarning(IndexId indexId, SnapshotId snapshotI
}

// Delete all old shard gen blobs that aren't referenced any longer as a result from moving to updated repository data
private void cleanupOldShardGens(RepositoryData existingRepositoryData, RepositoryData updatedRepositoryData) {
private void cleanupOldShardGens(
RepositoryData existingRepositoryData,
RepositoryData updatedRepositoryData,
RepositoryData newRepositoryData,
ActionListener<RepositoryData> listener
) {
final List<String> toDelete = new ArrayList<>();
updatedRepositoryData.shardGenerations()
.obsoleteShardGenerations(existingRepositoryData.shardGenerations())
Expand All @@ -2356,10 +2367,62 @@ private void cleanupOldShardGens(RepositoryData existingRepositoryData, Reposito
(shardId, oldGen) -> toDelete.add(shardPath(indexId, shardId).buildAsString() + INDEX_FILE_PREFIX + oldGen)
)
);
if (toDelete.isEmpty()) {
listener.onResponse(newRepositoryData);
return;
}
try {
deleteFromContainer(rootBlobContainer(), toDelete);
AtomicInteger counter = new AtomicInteger();
Collection<List<String>> subList = toDelete.stream()
.collect(Collectors.groupingBy(it -> counter.getAndIncrement() / maxShardBlobDeleteBatch))
.values();
final BlockingQueue<List<String>> staleFilesToDeleteInBatch = new LinkedBlockingQueue<>(subList);
logger.info(
"[{}] cleanupOldShardGens toDeleteSize={} groupSize={}",
metadata.name(),
toDelete.size(),
staleFilesToDeleteInBatch.size()
);
final GroupedActionListener<Void> groupedListener = new GroupedActionListener<>(ActionListener.wrap(r -> {
logger.info("[{}] completed cleanupOldShardGens", metadata.name());
listener.onResponse(newRepositoryData);
}, ex -> {
logger.error(new ParameterizedMessage("[{}] exception in cleanupOldShardGens", metadata.name()), ex);
listener.onResponse(newRepositoryData);
}), staleFilesToDeleteInBatch.size());

// Start as many workers as fit into the snapshot pool at once at the most
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT_DELETION).getMax(), staleFilesToDeleteInBatch.size());
for (int i = 0; i < workers; ++i) {
executeOldShardGensCleanup(staleFilesToDeleteInBatch, groupedListener);
}
} catch (Exception e) {
logger.warn("Failed to clean up old shard generation blobs", e);
logger.warn(new ParameterizedMessage(" [{}] Failed to clean up old shard generation blobs", metadata.name()), e);
listener.onResponse(newRepositoryData);
}
}

private void executeOldShardGensCleanup(BlockingQueue<List<String>> staleFilesToDeleteInBatch, GroupedActionListener<Void> listener)
throws InterruptedException {
List<String> filesToDelete = staleFilesToDeleteInBatch.poll(0L, TimeUnit.MILLISECONDS);
if (filesToDelete != null) {
threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION).execute(ActionRunnable.wrap(listener, l -> {
try {
deleteFromContainer(rootBlobContainer(), filesToDelete);
l.onResponse(null);
} catch (Exception e) {
logger.warn(
() -> new ParameterizedMessage(
"[{}] Failed to delete following blobs during cleanupOldFiles : {}",
metadata.name(),
filesToDelete
),
e
);
l.onFailure(e);
}
executeOldShardGensCleanup(staleFilesToDeleteInBatch, listener);
}));
}
}

Expand Down Expand Up @@ -2476,10 +2539,11 @@ public long getRemoteDownloadThrottleTimeInNanos() {
}

protected void assertSnapshotOrGenericThread() {
assert Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT + ']')
assert Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT_DELETION + ']')
|| Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT + ']')
|| Thread.currentThread().getName().contains('[' + ThreadPool.Names.GENERIC + ']') : "Expected current thread ["
+ Thread.currentThread()
+ "] to be the snapshot or generic thread.";
+ "] to be the snapshot_deletion or snapshot or generic thread.";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public static class Names {
public static final String REFRESH = "refresh";
public static final String WARMER = "warmer";
public static final String SNAPSHOT = "snapshot";
public static final String SNAPSHOT_DELETION = "snapshot_deletion";
public static final String FORCE_MERGE = "force_merge";
public static final String FETCH_SHARD_STARTED = "fetch_shard_started";
public static final String FETCH_SHARD_STORE = "fetch_shard_store";
Expand Down Expand Up @@ -175,6 +176,7 @@ public static ThreadPoolType fromType(String type) {
map.put(Names.REFRESH, ThreadPoolType.SCALING);
map.put(Names.WARMER, ThreadPoolType.SCALING);
map.put(Names.SNAPSHOT, ThreadPoolType.SCALING);
map.put(Names.SNAPSHOT_DELETION, ThreadPoolType.SCALING);
map.put(Names.FORCE_MERGE, ThreadPoolType.FIXED);
map.put(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING);
map.put(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING);
Expand Down Expand Up @@ -233,6 +235,7 @@ public ThreadPool(
final int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors);
final int halfProcMaxAt10 = halfAllocatedProcessorsMaxTen(allocatedProcessors);
final int genericThreadPoolMax = boundedBy(4 * allocatedProcessors, 128, 512);
final int snapshotDeletionPoolMax = boundedBy(4 * allocatedProcessors, 64, 256);
builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, allocatedProcessors, 10000));
builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, allocatedProcessors, 1000));
Expand Down Expand Up @@ -262,6 +265,10 @@ public ThreadPool(
builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));
builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
builders.put(
Names.SNAPSHOT_DELETION,
new ScalingExecutorBuilder(Names.SNAPSHOT_DELETION, 1, snapshotDeletionPoolMax, TimeValue.timeValueMinutes(5))
);
builders.put(
Names.FETCH_SHARD_STARTED,
new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso
sizes.put(ThreadPool.Names.REFRESH, ThreadPool::halfAllocatedProcessorsMaxTen);
sizes.put(ThreadPool.Names.WARMER, ThreadPool::halfAllocatedProcessorsMaxFive);
sizes.put(ThreadPool.Names.SNAPSHOT, ThreadPool::halfAllocatedProcessorsMaxFive);
sizes.put(ThreadPool.Names.SNAPSHOT_DELETION, n -> ThreadPool.boundedBy(4 * n, 64, 256));
sizes.put(ThreadPool.Names.FETCH_SHARD_STARTED, ThreadPool::twiceAllocatedProcessors);
sizes.put(ThreadPool.Names.FETCH_SHARD_STORE, ThreadPool::twiceAllocatedProcessors);
sizes.put(ThreadPool.Names.TRANSLOG_TRANSFER, ThreadPool::halfAllocatedProcessors);
Expand Down
Loading