|
107 | 107 | import java.util.concurrent.Executor; |
108 | 108 | import java.util.concurrent.atomic.AtomicBoolean; |
109 | 109 | import java.util.stream.Collectors; |
| 110 | +import java.util.stream.Stream; |
110 | 111 |
|
111 | 112 | import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName; |
112 | 113 |
|
    /**
     * Deletes {@code snapshotId} from the repository: writes the updated {@link RepositoryData} (index-N) first, then,
     * in parallel on the SNAPSHOT thread pool, (1) cleans up stale root-level and index blobs and (2) removes the
     * snapshot from every affected index's shard metadata and deletes the shard-level blobs plus per-index metadata
     * blobs that have become unreferenced as a result.
     *
     * NOTE(review): the parameter list between {@code repositoryStateId} and {@code listener} is truncated in this
     * view and has been reconstructed from the hunk header and from the body's use of {@code foundIndices},
     * {@code rootBlobs} and {@code repositoryData} — confirm against the full file.
     */
    private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateId,
                                        Map<String, BlobContainer> foundIndices, Map<String, BlobMetaData> rootBlobs,
                                        RepositoryData repositoryData, ActionListener<Void> listener) throws IOException {
        final RepositoryData updatedRepositoryData = repositoryData.removeSnapshot(snapshotId);
        // Persist the new repository generation before deleting anything, so a failure from here on can only leave
        // unreferenced (stale) blobs behind, never a dangling reference in the repository metadata.
        writeIndexGen(updatedRepositoryData, repositoryStateId);
        // Completes the outer listener only once BOTH the stale-blob cleanup and the shard-metadata delete path
        // below have counted down (group size 2).
        final ActionListener<Void> afterCleanupsListener =
            new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);

        // Run unreferenced blobs cleanup in parallel to snapshot deletion
        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(afterCleanupsListener,
            l -> cleanupStaleBlobs(foundIndices, rootBlobs, updatedRepositoryData, ActionListener.map(l, ignored -> null))));

        deleteIndices(
            updatedRepositoryData,
            repositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotId),
            snapshotId,
            // runAfter: count down afterCleanupsListener whether the blob deletes below succeed or fail
            ActionListener.runAfter(
                ActionListener.wrap(
                    deleteResults -> {
                        // Now that all metadata (RepositoryData at the repo root as well as index-N blobs in all shard paths)
                        // has been updated we can execute the delete operations for all blobs that have become unreferenced as a result
                        final String basePath = basePath().buildAsString();
                        final int basePathLen = basePath.length();
                        blobContainer().deleteBlobsIgnoringIfNotExists(
                            Stream.concat(
                                // Shard-level blobs each shard's metadata rewrite reported as unreferenced
                                deleteResults.stream().flatMap(shardResult -> {
                                    final String shardPath =
                                        shardContainer(shardResult.indexId, shardResult.shardId).path().buildAsString();
                                    return shardResult.blobsToDelete.stream().map(blob -> shardPath + blob);
                                }),
                                // This snapshot's per-index metadata blob, once per distinct index.
                                // NOTE(review): blob name is built via globalMetaDataFormat — correct only if it
                                // shares the index-metadata blob-name pattern; confirm, or use indexMetaDataFormat.
                                deleteResults.stream().map(shardResult -> shardResult.indexId).distinct().map(indexId ->
                                    indexContainer(indexId).path().buildAsString() + globalMetaDataFormat.blobName(snapshotId.getUUID()))
                            ).map(absolutePath -> {
                                // deleteBlobsIgnoringIfNotExists expects paths relative to the repository root
                                assert absolutePath.startsWith(basePath);
                                return absolutePath.substring(basePathLen);
                            }).collect(Collectors.toList()));
                    },
                    // Any exceptions after we have updated the root level RepositoryData are only logged but won't fail the delete request
                    e -> logger.warn(
                        () -> new ParameterizedMessage("[{}] Failed to delete some blobs during snapshot delete", snapshotId), e)),
                () -> afterCleanupsListener.onResponse(null))
        );
    }
397 | 429 |
|
398 | 430 | /** |
@@ -551,89 +583,57 @@ private DeleteResult cleanupStaleIndices(Map<String, BlobContainer> foundIndices |
551 | 583 | * @param listener Listener to invoke when finished |
552 | 584 | */ |
553 | 585 | private void deleteIndices(RepositoryData repositoryData, List<IndexId> indices, SnapshotId snapshotId, |
554 | | - ActionListener<Void> listener) { |
| 586 | + ActionListener<Collection<ShardSnapshotMetaDeleteResult>> listener) { |
| 587 | + |
555 | 588 | if (indices.isEmpty()) { |
556 | | - listener.onResponse(null); |
| 589 | + listener.onResponse(Collections.emptyList()); |
557 | 590 | return; |
558 | 591 | } |
559 | | - // listener to complete once all shards folders affected by this delete have been added new metadata blobs without this snapshot |
560 | | - final StepListener<Collection<ShardSnapshotMetaDeleteResult>> deleteFromMetaListener = new StepListener<>(); |
561 | 592 |
|
562 | 593 | // Listener that flattens out the delete results for each index |
563 | 594 | final ActionListener<Collection<ShardSnapshotMetaDeleteResult>> deleteIndexMetaDataListener = new GroupedActionListener<>( |
564 | | - ActionListener.map(deleteFromMetaListener, |
565 | | - results -> results.stream().flatMap(Collection::stream).collect(Collectors.toList())), indices.size()); |
| 595 | + ActionListener.map(listener, res -> res.stream().flatMap(Collection::stream).collect(Collectors.toList())), indices.size()); |
566 | 596 | final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); |
567 | 597 | for (IndexId indexId : indices) { |
568 | 598 | executor.execute(ActionRunnable.wrap(deleteIndexMetaDataListener, |
569 | 599 | deleteIdxMetaListener -> { |
570 | | - IndexMetaData indexMetaData = null; |
| 600 | + final IndexMetaData indexMetaData; |
571 | 601 | try { |
572 | 602 | indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId); |
573 | 603 | } catch (Exception ex) { |
574 | 604 | logger.warn(() -> |
575 | 605 | new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, indexId.getName()), ex); |
576 | | - } |
577 | | - deleteIndexMetaDataBlobIgnoringErrors(snapshotId, indexId); |
578 | | - if (indexMetaData != null) { |
579 | | - final int shardCount = indexMetaData.getNumberOfShards(); |
580 | | - assert shardCount > 0 : "index did not have positive shard count, get [" + shardCount + "]"; |
581 | | - // Listener for collecting the results of removing the snapshot from each shard's metadata in the current index |
582 | | - final ActionListener<ShardSnapshotMetaDeleteResult> allShardsListener = |
583 | | - new GroupedActionListener<>(deleteIdxMetaListener, shardCount); |
584 | | - final Index index = indexMetaData.getIndex(); |
585 | | - for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) { |
586 | | - final ShardId shard = new ShardId(index, shardId); |
587 | | - executor.execute(new AbstractRunnable() { |
588 | | - @Override |
589 | | - protected void doRun() throws Exception { |
590 | | - allShardsListener.onResponse( |
591 | | - deleteShardSnapshot(repositoryData, indexId, shard, snapshotId)); |
592 | | - } |
593 | | - |
594 | | - @Override |
595 | | - public void onFailure(Exception ex) { |
596 | | - logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]", |
597 | | - snapshotId, indexId.getName(), shard.id()), ex); |
598 | | - // Just passing null here to count down the listener instead of failing it, the stale data left behind |
599 | | - // here will be retried in the next delete or repository cleanup |
600 | | - allShardsListener.onResponse(null); |
601 | | - } |
602 | | - }); |
603 | | - } |
604 | | - } else { |
605 | 606 | // Just invoke the listener without any shard generations to count it down, this index will be cleaned up |
606 | 607 | // by the stale data cleanup in the end. |
607 | 608 | deleteIdxMetaListener.onResponse(null); |
| 609 | + return; |
608 | 610 | } |
609 | | - })); |
610 | | - } |
611 | | - |
612 | | - // Delete all the now unreferenced blobs in the shard paths |
613 | | - deleteFromMetaListener.whenComplete(newGens -> { |
614 | | - final String basePath = basePath().buildAsString(); |
615 | | - final int basePathLen = basePath.length(); |
616 | | - blobContainer().deleteBlobsIgnoringIfNotExists( |
617 | | - newGens.stream().flatMap(shardBlob -> { |
618 | | - final String shardPathAbs = shardContainer(shardBlob.indexId, shardBlob.shardId).path().buildAsString(); |
619 | | - assert shardPathAbs.startsWith(basePath); |
620 | | - final String pathToShard = shardPathAbs.substring(basePathLen); |
621 | | - return shardBlob.blobsToDelete.stream().map(blob -> pathToShard + blob); |
622 | | - }).collect(Collectors.toList()) |
623 | | - ); |
624 | | - listener.onResponse(null); |
625 | | - }, e -> { |
626 | | - logger.warn(() -> new ParameterizedMessage("[{}] Failed to delete some blobs during snapshot delete", snapshotId), e); |
627 | | - listener.onResponse(null); |
628 | | - }); |
629 | | - } |
| 611 | + final int shardCount = indexMetaData.getNumberOfShards(); |
| 612 | + assert shardCount > 0 : "index did not have positive shard count, get [" + shardCount + "]"; |
| 613 | + // Listener for collecting the results of removing the snapshot from each shard's metadata in the current index |
| 614 | + final ActionListener<ShardSnapshotMetaDeleteResult> allShardsListener = |
| 615 | + new GroupedActionListener<>(deleteIdxMetaListener, shardCount); |
| 616 | + final Index index = indexMetaData.getIndex(); |
| 617 | + for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) { |
| 618 | + final ShardId shard = new ShardId(index, shardId); |
| 619 | + executor.execute(new AbstractRunnable() { |
| 620 | + @Override |
| 621 | + protected void doRun() throws Exception { |
| 622 | + allShardsListener.onResponse( |
| 623 | + deleteShardSnapshot(repositoryData, indexId, shard, snapshotId)); |
| 624 | + } |
630 | 625 |
|
631 | | - private void deleteIndexMetaDataBlobIgnoringErrors(SnapshotId snapshotId, IndexId indexId) { |
632 | | - try { |
633 | | - indexMetaDataFormat.delete(indexContainer(indexId), snapshotId.getUUID()); |
634 | | - } catch (IOException ex) { |
635 | | - logger.warn(() -> new ParameterizedMessage("[{}] failed to delete metadata for index [{}]", |
636 | | - snapshotId, indexId.getName()), ex); |
| 626 | + @Override |
| 627 | + public void onFailure(Exception ex) { |
| 628 | + logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]", |
| 629 | + snapshotId, indexId.getName(), shard.id()), ex); |
| 630 | + // Just passing null here to count down the listener instead of failing it, the stale data left behind |
| 631 | + // here will be retried in the next delete or repository cleanup |
| 632 | + allShardsListener.onResponse(null); |
| 633 | + } |
| 634 | + }); |
| 635 | + } |
| 636 | + })); |
637 | 637 | } |
638 | 638 | } |
639 | 639 |
|
|
0 commit comments